线程提交策略配置

This commit is contained in:
mxd 2020-03-26 16:59:59 +08:00
parent 8b3cf43eb0
commit 9cef96a04e
11 changed files with 218 additions and 33 deletions

View File

@ -0,0 +1,36 @@
package org.spiderflow.concurrent;
import java.util.PriorityQueue;
public class ChildPriorThreadSubmitStrategy implements ThreadSubmitStrategy{
private Object mutex = this;
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> {
if(o1.getNode().hasLeftNode(o2.getNode().getNodeId())){
return -1;
}
return 1;
});
@Override
public void add(SpiderFutureTask<?> task) {
synchronized (mutex){
priorityQueue.add(task);
}
}
@Override
public boolean isEmpty() {
synchronized (mutex){
return priorityQueue.isEmpty();
}
}
@Override
public SpiderFutureTask<?> get() {
synchronized (mutex){
return priorityQueue.poll();
}
}
}

View File

@ -0,0 +1,24 @@
package org.spiderflow.concurrent;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class LinkedThreadSubmitStrategy implements ThreadSubmitStrategy{
private List<SpiderFutureTask<?>> taskList = new CopyOnWriteArrayList<>();
@Override
public void add(SpiderFutureTask<?> task) {
taskList.add(task);
}
@Override
public boolean isEmpty() {
return taskList.isEmpty();
}
@Override
public SpiderFutureTask<?> get() {
return taskList.remove(0);
}
}

View File

@ -0,0 +1,37 @@
package org.spiderflow.concurrent;
import java.util.PriorityQueue;
public class ParentPriorThreadSubmitStrategy implements ThreadSubmitStrategy{
private Object mutex = this;
private PriorityQueue<SpiderFutureTask<?>> priorityQueue = new PriorityQueue<>((o1, o2) -> {
if(o1.getNode().hasLeftNode(o2.getNode().getNodeId())){
return 1;
}
return -1;
});
@Override
public void add(SpiderFutureTask<?> task) {
synchronized (mutex) {
priorityQueue.add(task);
}
}
@Override
public boolean isEmpty() {
synchronized (mutex) {
return priorityQueue.isEmpty();
}
}
@Override
public SpiderFutureTask<?> get() {
synchronized (mutex) {
return priorityQueue.poll();
}
}
}

View File

@ -0,0 +1,26 @@
package org.spiderflow.concurrent;
import org.apache.commons.lang3.RandomUtils;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class RandomThreadSubmitStrategy implements ThreadSubmitStrategy{
private List<SpiderFutureTask<?>> taskList = new CopyOnWriteArrayList<>();
@Override
public void add(SpiderFutureTask<?> task) {
taskList.add(task);
}
@Override
public boolean isEmpty() {
return taskList.isEmpty();
}
@Override
public SpiderFutureTask<?> get() {
return taskList.remove(RandomUtils.nextInt(0, taskList.size()));
}
}

View File

@ -1,8 +1,7 @@
package org.spiderflow.concurrent;
import org.apache.commons.lang3.RandomUtils;
import org.spiderflow.model.SpiderNode;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@ -49,8 +48,8 @@ public class SpiderFlowThreadPoolExecutor {
* @param threads 线程池大小
* @return
*/
public SubThreadPoolExecutor createSubThreadPoolExecutor(int threads){
return new SubThreadPoolExecutor(Math.min(maxThreads, threads));
public SubThreadPoolExecutor createSubThreadPoolExecutor(int threads,ThreadSubmitStrategy submitStrategy){
return new SubThreadPoolExecutor(Math.min(maxThreads, threads),submitStrategy);
}
/**
@ -83,12 +82,13 @@ public class SpiderFlowThreadPoolExecutor {
*/
private volatile boolean submitting = false;
private List<SpiderFutureTask<?>> futureTaskList = new CopyOnWriteArrayList<>();
private ThreadSubmitStrategy submitStrategy;
public SubThreadPoolExecutor(int threads) {
public SubThreadPoolExecutor(int threads,ThreadSubmitStrategy submitStrategy) {
super();
this.threads = threads;
this.futures = new Future[threads];
this.submitStrategy = submitStrategy;
}
/**
@ -100,8 +100,8 @@ public class SpiderFlowThreadPoolExecutor {
}
running = false;
//当停止时,唤醒提交任务线程使其结束
synchronized (futureTaskList){
futureTaskList.notifyAll();
synchronized (submitStrategy){
submitStrategy.notifyAll();
}
}
@ -141,7 +141,7 @@ public class SpiderFlowThreadPoolExecutor {
/**
* 异步提交任务
*/
public <T> Future<T> submitAsync(Runnable runnable,T value){
public <T> Future<T> submitAsync(Runnable runnable, T value, SpiderNode node){
SpiderFutureTask<T> future = new SpiderFutureTask<>(()-> {
try {
//执行任务
@ -150,16 +150,18 @@ public class SpiderFlowThreadPoolExecutor {
//正在执行的线程数-1
executing.decrementAndGet();
}
}, value,this);
synchronized (futureTaskList){
futureTaskList.add(future);
//如果是第一次调用submitSync方法则启动提交任务线程
if(!submitting){
submitting = true;
CompletableFuture.runAsync(this::submit);
}
}, value,node,this);
submitStrategy.add(future);
//如果是第一次调用submitSync方法则启动提交任务线程
if(!submitting){
submitting = true;
CompletableFuture.runAsync(this::submit);
}
synchronized (submitStrategy){
//通知继续从集合中取任务提交到线程池中
futureTaskList.notifyAll();
submitStrategy.notifyAll();
}
return future;
}
@ -167,16 +169,16 @@ public class SpiderFlowThreadPoolExecutor {
private void submit(){
while(running){
try {
synchronized (futureTaskList){
synchronized (submitStrategy){
//如果集合是空的则等待提交
if(futureTaskList.isEmpty()){
futureTaskList.wait(); //等待唤醒
if(submitStrategy.isEmpty()){
submitStrategy.wait(); //等待唤醒
}
}
//当该线程被唤醒时把集合中所有任务都提交到线程池中
while(!futureTaskList.isEmpty()){
//随机从集合中获取任务提交到线程池中
SpiderFutureTask<?> futureTask = futureTaskList.remove(RandomUtils.nextInt(0, futureTaskList.size()));
while(!submitStrategy.isEmpty()){
//从提交策略中获取任务提交到线程池中
SpiderFutureTask<?> futureTask = submitStrategy.get();
//如果没有空闲线程且在线程池中提交则直接运行
if(index() == -1 && Thread.currentThread().getThreadGroup() == SPIDER_FLOW_THREAD_GROUP){
futureTask.run();

View File

@ -2,17 +2,25 @@ package org.spiderflow.concurrent;
import java.util.concurrent.FutureTask;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
import org.spiderflow.model.SpiderNode;
public class SpiderFutureTask<V> extends FutureTask {
private SubThreadPoolExecutor executor;
public SpiderFutureTask(Runnable runnable, V result, SubThreadPoolExecutor executor) {
private SpiderNode node;
public SpiderFutureTask(Runnable runnable, V result, SpiderNode node,SubThreadPoolExecutor executor) {
super(runnable,result);
this.executor = executor;
this.node = node;
}
public SubThreadPoolExecutor getExecutor() {
return executor;
}
public SpiderNode getNode() {
return node;
}
}

View File

@ -0,0 +1,10 @@
package org.spiderflow.concurrent;
public interface ThreadSubmitStrategy {
void add(SpiderFutureTask<?> task);
boolean isEmpty();
SpiderFutureTask<?> get();
}

View File

@ -27,6 +27,12 @@ public class SpiderNode {
* 节点列表中的上一个节点
*/
private List<SpiderNode> prevNodes = new ArrayList<>();
/**
* 父级节点ID
*/
private Set<String> parentNodes;
/**
* 节点流转条件
*/
@ -153,6 +159,23 @@ public class SpiderNode {
counter.decrementAndGet();
}
public boolean hasLeftNode(String nodeId){
if(parentNodes == null){
Set<String> parents = new HashSet<>();
generateParents(parents);
this.parentNodes = parents;
}
return this.parentNodes.contains(nodeId);
}
private void generateParents(Set<String> parents){
for (SpiderNode prevNode : prevNodes) {
if(parents.add(prevNode.nodeId)){
prevNode.generateParents(parents);
}
}
}
public boolean isDone(){
return isDone(new HashSet<>());
}

View File

@ -5,7 +5,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor;
import org.spiderflow.concurrent.*;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
import org.spiderflow.context.SpiderContext;
import org.spiderflow.context.SpiderContextHolder;
@ -95,8 +95,20 @@ public class Spider {
private void executeRoot(SpiderNode root, SpiderContext context, Map<String, Object> variables) {
//获取当前流程执行线程数
int nThreads = NumberUtils.toInt(root.getStringJsonValue(ShapeExecutor.THREAD_COUNT), defaultThreads);
String strategy = root.getStringJsonValue("submit-strategy");
ThreadSubmitStrategy submitStrategy;
//选择提交策略这里一定要使用new,不能与其他实例共享
if("linked".equalsIgnoreCase(strategy)){
submitStrategy = new LinkedThreadSubmitStrategy();
}else if("child".equalsIgnoreCase(strategy)){
submitStrategy = new ChildPriorThreadSubmitStrategy();
}else if("parent".equalsIgnoreCase(strategy)){
submitStrategy = new ParentPriorThreadSubmitStrategy();
}else{
submitStrategy = new RandomThreadSubmitStrategy();
}
//创建子线程池采用一父多子的线程池,子线程数不能超过总线程数超过时进入队列等待,+1是因为会占用一个线程用来调度执行下一级
SubThreadPoolExecutor pool = threadPoolExecutor.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1);
SubThreadPoolExecutor pool = threadPoolExecutor.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy);
context.setRootNode(root);
context.setThreadPool(pool);
//触发监听器
@ -142,7 +154,7 @@ public class Spider {
listeners.forEach(listener -> listener.afterEnd(context));
}
}
}), null);
}), null, root);
try {
f.get(); //阻塞等待所有任务执行完毕
} catch (InterruptedException | ExecutionException ignored) {}
@ -235,7 +247,7 @@ public class Spider {
for (SpiderTask task : tasks) {
if(executor.isThread()){ //判断节点是否是异步运行
//提交任务至线程池中,并将Future添加到队列末尾
futureQueue.add(context.getThreadPool().submitAsync(task.runnable, task));
futureQueue.add(context.getThreadPool().submitAsync(task.runnable, task, node));
}else{
FutureTask<SpiderTask> futureTask = new FutureTask<>(task.runnable, task);
futureTask.run();

View File

@ -205,8 +205,4 @@ public class OutputExecutor implements ShapeExecutor{
return "output";
}
@Override
public boolean isThread() {
return false;
}
}

View File

@ -14,6 +14,17 @@
<input type="text" name="spiderName" placeholder="请输入爬虫名称" autocomplete="off" class="layui-input" value="{{d.data.object.spiderName || '未定义名称'}}">
</div>
</div>
<div class="layui-col-md4">
<label class="layui-form-label">提交策略</label>
<div class="layui-input-block">
<select name="submit-strategy">
<option value="random" {{d.data.object['submit-strategy'] == 'random' ? 'selected':''}}>随机</option>
<option value="linked" {{d.data.object['submit-strategy'] == 'linked' ? 'selected':''}}>顺序</option>
<option value="child" {{d.data.object['submit-strategy'] == 'child' ? 'selected':''}}>子优先</option>
<option value="parent" {{d.data.object['submit-strategy'] == 'parent' ? 'selected':''}}>父优先</option>
</select>
</div>
</div>
<div class="layui-col-md4">
<label class="layui-form-label">最大线程数</label>
<div class="layui-input-block">