优化线程池,避免线程数过多

This commit is contained in:
mxd 2019-10-13 18:15:27 +08:00
parent 34c9e1d077
commit 92a7ed1f89
5 changed files with 146 additions and 129 deletions

View File

@ -0,0 +1,116 @@
package org.spiderflow.concurrent;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class SpiderFlowThreadPoolExecutor {
private int maxThreads;
private ThreadPoolExecutor executor;
private final AtomicInteger poolNumber = new AtomicInteger(1);
private static final ThreadGroup SPIDER_FLOW_THREAD_GROUP = new ThreadGroup("spider-flow-group");
private static final String THREAD_POOL_NAME_PREFIX = "spider-flow-";
public SpiderFlowThreadPoolExecutor(int maxThreads) {
super();
this.maxThreads = maxThreads;
this.executor = new ThreadPoolExecutor(maxThreads, maxThreads, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(SPIDER_FLOW_THREAD_GROUP, runnable, THREAD_POOL_NAME_PREFIX + poolNumber.getAndIncrement());
}
});
}
public SubThreadPoolExecutor createSubThreadPoolExecutor(int threads){
return new SubThreadPoolExecutor(Math.min(maxThreads, threads));
}
public class SubThreadPoolExecutor{
private int threads;
private Future<?>[] futures;
private AtomicInteger executing = new AtomicInteger(0);
public SubThreadPoolExecutor(int threads) {
super();
this.threads = threads;
this.futures = new Future[threads];
}
/**
* 等待所有线程执行完毕
*/
public void awaitTermination(){
while(executing.get() > 0){
removeDoneFuture();
}
}
private int index(){
for (int i = 0; i < threads; i++) {
if(futures[i] == null || futures[i].isDone()){
return i;
}
}
return -1;
}
/**
* 清除已完成的任务
*/
private void removeDoneFuture(){
for (int i = 0; i < threads; i++) {
try {
if(futures[i] != null && futures[i].get(10,TimeUnit.MILLISECONDS) == null){
futures[i] = null;
}
} catch (Throwable t) {
//忽略异常
}
}
}
private void await(){
while(index() == -1){
removeDoneFuture();
}
}
public void submit(Runnable runnable){
//正在执行的线程数+1
executing.incrementAndGet();
Runnable task = ()->{
try{
//执行任务
runnable.run();
} finally{
//正在执行的线程数-1
executing.decrementAndGet();
}
};
//如果没有空闲线程且在线程池中提交则直接运行
if(index() == -1 && Thread.currentThread().getThreadGroup() == SPIDER_FLOW_THREAD_GROUP){
task.run();
return;
}
//等待有空闲线程时在提交
await();
futures[index()] = executor.submit(task);
}
}
}

View File

@ -7,10 +7,10 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
import org.spiderflow.model.SpiderLog;
import org.spiderflow.model.SpiderNode;
import org.spiderflow.model.SpiderOutput;
import org.spiderflow.utils.ThreadPool;
/**
* 爬虫上下文集合
@ -29,7 +29,7 @@ public class SpiderContext extends HashMap<String, Object>{
*/
private List<SpiderOutput> outputs = new ArrayList<>();
private ThreadPool threadPool;
private SubThreadPoolExecutor threadPool;
private SpiderNode rootNode;
@ -57,11 +57,11 @@ public class SpiderContext extends HashMap<String, Object>{
}
public ThreadPool getThreadPool() {
public SubThreadPoolExecutor getThreadPool() {
return threadPool;
}
public void setThreadPool(ThreadPool threadPool) {
public void setThreadPool(SubThreadPoolExecutor threadPool) {
this.threadPool = threadPool;
}

View File

@ -1,115 +0,0 @@
package org.spiderflow.utils;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池
* @author Administrator
*
*/
public class ThreadPool implements Runnable{
private ThreadPoolExecutor executor;
private LinkedBlockingQueue<ExecutingRunnable> threadQueue;
private LinkedBlockingQueue<ExecutingRunnable> executingQueue;
private boolean running = true;
private ThreadPool(int nThreads){
executingQueue = new LinkedBlockingQueue<>();
threadQueue = new LinkedBlockingQueue<>(nThreads << 3);
//实际线程数=提交任务线程+线程数
this.executor = new ThreadPoolExecutor(nThreads + 1, nThreads + 1, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
this.executor.submit(this);
}
public static ThreadPool create(int nThreads){
return new ThreadPool(nThreads);
}
public void submit(Runnable runnable){
try {
/**
* 提交任务时重新建立ExecutingRunnable对象放入到线程队列执行队列中
*/
ExecutingRunnable executingRunnable = new ExecutingRunnable(runnable);
executingQueue.put(executingRunnable);
threadQueue.put(executingRunnable);
} catch (InterruptedException e) {
}
}
public void shutdown(){
ExecutingRunnable runnable;
try {
/**
* 循环从执行队列中取正在执行的线程当线程状态不在执行中时则从队列中移除直到队列为空
*/
while((runnable= executingQueue.peek()) != null){
if(!runnable.isExecuting()){
executingQueue.remove();
}
}
} catch (NoSuchElementException e) {
}
running = false;
this.executor.shutdown();
while(!this.executor.isTerminated()){
try {
this.executor.awaitTermination(50, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
}
@Override
public void run() {
ExecutingRunnable runnable;
try {
/**
* 循环从线程队列中取要执行的任务提交到线程池中
* 当调用shutdown之后且执行队列为空则退出当前线程
*/
while(running){
if((runnable = threadQueue.poll(100, TimeUnit.MILLISECONDS)) != null && running){
executor.submit(runnable);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
class ExecutingRunnable implements Runnable{
private boolean executing = true;
private Runnable runnable;
public ExecutingRunnable(Runnable runnable) {
super();
this.runnable = runnable;
}
public boolean isExecuting() {
return executing;
}
@Override
public void run() {
try {
runnable.run();
} finally{
//当线程执行完毕时设置当前线程状态
this.executing = false;
}
}
}
}

View File

@ -12,6 +12,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.spiderflow.ExpressionEngine;
import org.spiderflow.ExpressionHolder;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor;
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
import org.spiderflow.context.SpiderContext;
import org.spiderflow.core.model.SpiderFlow;
import org.spiderflow.core.utils.SpiderFlowUtils;
@ -20,8 +22,8 @@ import org.spiderflow.listener.SpiderListener;
import org.spiderflow.model.SpiderNode;
import org.spiderflow.model.SpiderOutput;
import org.spiderflow.utils.Maps;
import org.spiderflow.utils.ThreadPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
@ -47,11 +49,20 @@ public class Spider {
@Autowired(required = false)
private List<SpiderListener> listeners;
@Value("${spider.thread.max:64}")
private Integer totalThreads;
@Value("${spider.thread.default:8}")
private Integer defaultThreads;
private static Map<String, ShapeExecutor> executorMap = new HashMap<String, ShapeExecutor>();
private static SpiderFlowThreadPoolExecutor executor;
@PostConstruct
private void init() {
executorMap = executors.stream().collect(Collectors.toMap(ShapeExecutor::supportShape, v -> v));
executor = new SpiderFlowThreadPoolExecutor(totalThreads);
}
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context,Map<String, Object> variables) {
@ -75,8 +86,8 @@ public class Spider {
}
private void executeRoot(SpiderNode root, SpiderContext context, Map<String, Object> variables) {
int nThreads = NumberUtils.toInt(root.getStringJsonValue(ShapeExecutor.THREAD_COUNT), 8);
ThreadPool pool = ThreadPool.create(nThreads);
int nThreads = NumberUtils.toInt(root.getStringJsonValue(ShapeExecutor.THREAD_COUNT), defaultThreads);
SubThreadPoolExecutor pool = executor.createSubThreadPoolExecutor(nThreads);
context.setRootNode(root);
context.setThreadPool(pool);
if (listeners != null) {
@ -84,7 +95,7 @@ public class Spider {
}
try {
executeNode(pool, null, root, context, variables);
pool.shutdown();
pool.awaitTermination();
} finally {
if (listeners != null) {
listeners.forEach(listener -> listener.afterEnd(context));
@ -94,13 +105,13 @@ public class Spider {
}
public void execute(int nThreads, SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
ThreadPool pool = ThreadPool.create(nThreads);
SubThreadPoolExecutor pool = executor.createSubThreadPoolExecutor(nThreads);
context.setThreadPool(pool);
executeNode(pool, fromNode, node, context, variables);
pool.shutdown();
pool.awaitTermination();
}
private void executeaNextNodes(ThreadPool pool, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
private void executeaNextNodes(SubThreadPoolExecutor pool, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
List<SpiderNode> nextNodes = node.getNextNodes();
if (nextNodes != null) {
for (SpiderNode nextNode : nextNodes) {
@ -109,7 +120,7 @@ public class Spider {
}
}
public void executeNode(ThreadPool pool, SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
public void executeNode(SubThreadPoolExecutor pool, SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
String shape = node.getStringJsonValue("shape");
if (StringUtils.isBlank(shape)) {
executeaNextNodes(pool, node, context, variables);

View File

@ -2,7 +2,10 @@ server.port=8088
logging.level.root=INFO
#logging.level.org.spiderflow=DEBUG
#平台最大线程数
spider.thread.max=64
#单任务默认最大线程数
spider.thread.default=8
#设置为true时定时任务才生效
spider.job.enable=false
#爬虫任务的执行日志保存路径
@ -16,3 +19,5 @@ spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456789
spring.datasource.url=jdbc:mysql://localhost:3306/spiderflow?useSSL=false&useUnicode=true&characterEncoding=UTF8&autoReconnect=true
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration