调整队列出队时机

This commit is contained in:
mxd 2020-03-27 11:44:21 +08:00
parent 562ad6dfbf
commit dcab892dd8
6 changed files with 69 additions and 17 deletions

View File

@ -1,17 +1,27 @@
package org.spiderflow.concurrent;
import org.spiderflow.model.SpiderNode;
import java.util.Comparator;
import java.util.PriorityQueue;
public class ChildPriorThreadSubmitStrategy implements ThreadSubmitStrategy{
private Object mutex = this;
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> {
if(o1.getNode().hasLeftNode(o2.getNode().getNodeId())){
private Comparator<SpiderNode> comparator = (o1, o2) -> {
if(o1.hasLeftNode(o2.getNodeId())){
return -1;
}
return 1;
});
}
return 1;
};
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> comparator.compare(o1.getNode(),o2.getNode()));
@Override
public Comparator<SpiderNode> comparator() {
return comparator;
}
@Override
public void add(SpiderFutureTask<?> task) {

View File

@ -1,5 +1,8 @@
package org.spiderflow.concurrent;
import org.spiderflow.model.SpiderNode;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -7,6 +10,11 @@ public class LinkedThreadSubmitStrategy implements ThreadSubmitStrategy{
private List<SpiderFutureTask<?>> taskList = new CopyOnWriteArrayList<>();
@Override
public Comparator<SpiderNode> comparator() {
return (o1, o2) -> -1;
}
@Override
public void add(SpiderFutureTask<?> task) {
taskList.add(task);

View File

@ -1,17 +1,27 @@
package org.spiderflow.concurrent;
import org.spiderflow.model.SpiderNode;
import java.util.Comparator;
import java.util.PriorityQueue;
public class ParentPriorThreadSubmitStrategy implements ThreadSubmitStrategy{
public class ParentPriorThreadSubmitStrategy implements ThreadSubmitStrategy {
private Object mutex = this;
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> {
if(o1.getNode().hasLeftNode(o2.getNode().getNodeId())){
private Comparator<SpiderNode> comparator = (o1, o2) -> {
if (o1.hasLeftNode(o2.getNodeId())) {
return 1;
}
return -1;
});
};
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> comparator.compare(o1.getNode(), o2.getNode()));
@Override
public Comparator<SpiderNode> comparator() {
return comparator;
}
@Override
public void add(SpiderFutureTask<?> task) {

View File

@ -1,7 +1,9 @@
package org.spiderflow.concurrent;
import org.apache.commons.lang3.RandomUtils;
import org.spiderflow.model.SpiderNode;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -9,6 +11,11 @@ public class RandomThreadSubmitStrategy implements ThreadSubmitStrategy{
private List<SpiderFutureTask<?>> taskList = new CopyOnWriteArrayList<>();
@Override
public Comparator<SpiderNode> comparator() {
return (o1, o2) -> RandomUtils.nextInt(0,3) - 1;
}
@Override
public void add(SpiderFutureTask<?> task) {
taskList.add(task);

View File

@ -1,7 +1,13 @@
package org.spiderflow.concurrent;
import org.spiderflow.model.SpiderNode;
import java.util.Comparator;
public interface ThreadSubmitStrategy {
Comparator<SpiderNode> comparator();
void add(SpiderFutureTask<?> task);
boolean isEmpty();

View File

@ -23,7 +23,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -115,31 +118,39 @@ public class Spider {
if (listeners != null) {
listeners.forEach(listener -> listener.beforeStart(context));
}
Comparator<SpiderNode> comparator = submitStrategy.comparator();
//启动一个线程开始执行任务,并监听其结束并执行下一级
Future<?> f = pool.submitAsync(TtlRunnable.get(() -> {
try {
//执行具体节点
Spider.this.executeNode(null, root, context, variables);
Queue<Future<?>> queue = context.getFutureQueue();
//循环从队列中获取Future,直到队列为空结束,当任务完成时则执行下一级如未完成把Future加到队列末尾
//循环从队列中获取Future,直到队列为空结束,当任务完成时则执行下一级
while (!queue.isEmpty()) {
try {
Future<?> future = queue.poll();
if (future.isDone()) { //判断任务是否完成
//TODO 这里应该是取出最先执行完毕的任务
Optional<Future<?>> max = queue.stream().filter(Future::isDone).max((o1, o2) -> {
try {
return comparator.compare(((SpiderTask) o1.get()).node, ((SpiderTask) o2.get()).node);
} catch (InterruptedException | ExecutionException e) {
}
return 0;
});
if (max.isPresent()) { //判断任务是否完成
queue.remove(max.get());
if (context.isRunning()) { //检测是否运行中(当在页面中点击"停止",此值为false,其余为true)
SpiderTask task = (SpiderTask) future.get();
SpiderTask task = (SpiderTask) max.get().get();
task.node.decrement(); //任务执行完毕,计数器减一(该计数器是给Join节点使用)
if (task.executor.allowExecuteNext(task.node, context, task.variables)) { //判断是否允许执行下一级
logger.debug("执行节点[{}:{}]完毕", task.node.getNodeName(), task.node.getNodeId());
System.out.println("执行下一级:" + task.node.getNodeName());
//执行下一级
Spider.this.executeNextNodes(task.node, context, task.variables);
} else {
logger.debug("执行节点[{}:{}]完毕,忽略执行下一节点", task.node.getNodeName(), task.node.getNodeId());
}
}
} else {
//将Future加到队列末尾
queue.add(future);
}
//睡眠1ms,让出cpu
Thread.sleep(1);