From 7761eee3ee83ccc326d1dec29488ade16275b0b8 Mon Sep 17 00:00:00 2001 From: mxd <838425805@qq.com> Date: Thu, 9 Apr 2020 09:02:40 +0800 Subject: [PATCH] =?UTF-8?q?rest=E6=9C=8D=E5=8A=A1=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SpiderFlowThreadPoolExecutor.java | 4 ++ .../main/java/org/spiderflow/core/Spider.java | 6 +- .../core/expression/parsing/Tokenizer.java | 1 - .../org/spiderflow/core/job/SpiderJob.java | 14 ++-- .../spiderflow/core/job/SpiderJobManager.java | 5 +- .../controller/SpiderRestController.java | 70 ++++++++++++++++++- 6 files changed, 88 insertions(+), 12 deletions(-) diff --git a/spider-flow-api/src/main/java/org/spiderflow/concurrent/SpiderFlowThreadPoolExecutor.java b/spider-flow-api/src/main/java/org/spiderflow/concurrent/SpiderFlowThreadPoolExecutor.java index 1dcb7d9..8b75e88 100644 --- a/spider-flow-api/src/main/java/org/spiderflow/concurrent/SpiderFlowThreadPoolExecutor.java +++ b/spider-flow-api/src/main/java/org/spiderflow/concurrent/SpiderFlowThreadPoolExecutor.java @@ -42,6 +42,10 @@ public class SpiderFlowThreadPoolExecutor { }); } + public Future submit(Runnable runnable){ + return this.executor.submit(runnable); + } + /** * 创建子线程池 diff --git a/spider-flow-core/src/main/java/org/spiderflow/core/Spider.java b/spider-flow-core/src/main/java/org/spiderflow/core/Spider.java index 3b3e008..c20a17d 100644 --- a/spider-flow-core/src/main/java/org/spiderflow/core/Spider.java +++ b/spider-flow-core/src/main/java/org/spiderflow/core/Spider.java @@ -54,7 +54,7 @@ public class Spider { @Autowired private FlowNoticeService flowNoticeService; - private static SpiderFlowThreadPoolExecutor threadPoolExecutor; + public static SpiderFlowThreadPoolExecutor executorInstance; private static final String ATOMIC_DEAD_CYCLE = "__atomic_dead_cycle"; @@ -62,7 +62,7 @@ public class Spider { @PostConstruct private void init() { - threadPoolExecutor = new SpiderFlowThreadPoolExecutor(totalThreads); + executorInstance = new SpiderFlowThreadPoolExecutor(totalThreads); } public List run(SpiderFlow spiderFlow, SpiderContext context, Map variables) { @@ -120,7 +120,7 @@ public class Spider { submitStrategy = new RandomThreadSubmitStrategy(); } //创建子线程池,采用一父多子的线程池,子线程数不能超过总线程数(超过时进入队列等待),+1是因为会占用一个线程用来调度执行下一级 - SubThreadPoolExecutor pool = threadPoolExecutor.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy); + SubThreadPoolExecutor pool = executorInstance.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy); context.setRootNode(root); context.setThreadPool(pool); //触发监听器 diff --git a/spider-flow-core/src/main/java/org/spiderflow/core/expression/parsing/Tokenizer.java b/spider-flow-core/src/main/java/org/spiderflow/core/expression/parsing/Tokenizer.java index 6b222f5..244901c 100644 --- a/spider-flow-core/src/main/java/org/spiderflow/core/expression/parsing/Tokenizer.java +++ b/spider-flow-core/src/main/java/org/spiderflow/core/expression/parsing/Tokenizer.java @@ -21,7 +21,6 @@ public class Tokenizer { stream.startSpan(); RuntimeException re = null; - // TODO: this will fall on its face if we have something like {{ "}}" }}. while (stream.hasMore()) { if (stream.match("${", false)) { if (!stream.isSpanEmpty()) tokens.add(new Token(TokenType.TextBlock, stream.endSpan())); diff --git a/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJob.java b/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJob.java index 352a043..2792fa5 100644 --- a/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJob.java +++ b/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJob.java @@ -55,7 +55,9 @@ public class SpiderJob extends QuartzJobBean { } JobDataMap dataMap = context.getMergedJobDataMap(); SpiderFlow spiderFlow = (SpiderFlow) dataMap.get(SpiderJobManager.JOB_PARAM_NAME); - run(spiderFlow, context.getNextFireTime()); + if("1".equalsIgnoreCase(spiderFlow.getEnabled())){ + run(spiderFlow, context.getNextFireTime()); + } } public void run(String id) { @@ -63,13 +65,17 @@ public class SpiderJob extends QuartzJobBean { } public void run(SpiderFlow spiderFlow, Date nextExecuteTime) { - SpiderJobContext context = null; - Date now = new Date(); Task task = new Task(); task.setFlowId(spiderFlow.getId()); task.setBeginTime(new Date()); + taskService.save(task); + run(spiderFlow,task,nextExecuteTime); + } + + public void run(SpiderFlow spiderFlow, Task task,Date nextExecuteTime) { + SpiderJobContext context = null; + Date now = new Date(); try { - taskService.save(task); context = SpiderJobContext.create(this.workspace, spiderFlow.getId(),task.getId(),false); SpiderContextHolder.set(context); contextMap.put(task.getId(), context); diff --git a/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJobManager.java b/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJobManager.java index 2cc98c9..745fa27 100644 --- a/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJobManager.java +++ b/spider-flow-core/src/main/java/org/spiderflow/core/job/SpiderJobManager.java @@ -11,6 +11,7 @@ import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.spiderflow.core.Spider; import org.spiderflow.core.model.SpiderFlow; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -70,9 +71,9 @@ public class SpiderJobManager { } public void run(String id){ - new Thread(()->{ + Spider.executorInstance.submit(()->{ spiderJob.run(id); - }).start(); + }); } public boolean remove(String id){ diff --git a/spider-flow-web/src/main/java/org/spiderflow/controller/SpiderRestController.java b/spider-flow-web/src/main/java/org/spiderflow/controller/SpiderRestController.java index cb9f77a..894f54c 100644 --- a/spider-flow-web/src/main/java/org/spiderflow/controller/SpiderRestController.java +++ b/spider-flow-web/src/main/java/org/spiderflow/controller/SpiderRestController.java @@ -2,10 +2,14 @@ package org.spiderflow.controller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.spiderflow.context.SpiderContext; import org.spiderflow.core.Spider; +import org.spiderflow.core.job.SpiderJob; import org.spiderflow.core.job.SpiderJobContext; import org.spiderflow.core.model.SpiderFlow; +import org.spiderflow.core.model.Task; import org.spiderflow.core.service.SpiderFlowService; +import org.spiderflow.core.service.TaskService; import org.spiderflow.model.JsonBean; import org.spiderflow.model.SpiderOutput; import org.springframework.beans.factory.annotation.Autowired; @@ -15,6 +19,7 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.Date; import java.util.List; import java.util.Map; @@ -32,14 +37,75 @@ public class SpiderRestController { @Value("${spider.workspace}") private String workspace; - + + @Autowired + private SpiderJob spiderJob; + + @Autowired + private TaskService taskService; + + /** + * 异步运行 + * @param id + * @return + */ + @RequestMapping("/runAsync/{id}") + public JsonBean runAsync(@PathVariable("id")String id){ + SpiderFlow flow = spiderFlowService.getById(id); + if(flow == null){ + return new JsonBean<>(0, "找不到此爬虫信息"); + } + Task task = new Task(); + task.setFlowId(flow.getId()); + task.setBeginTime(new Date()); + taskService.save(task); + Spider.executorInstance.submit(()->{ + spiderJob.run(flow,task,null); + }); + return new JsonBean<>(task.getId()); + } + + /** + * 停止运行任务 + * @param taskId + */ + @RequestMapping("/stop/{taskId}") + public JsonBean stop(@PathVariable("taskId")Integer taskId){ + SpiderContext context = SpiderJob.getSpiderContext(taskId); + if(context == null){ + return new JsonBean<>(0,"任务不存在!"); + } + context.setRunning(false); + return new JsonBean<>(1,"停止成功!"); + + } + + /** + * 查询任务状态 + * @param taskId + */ + @RequestMapping("/status/{taskId}") + public JsonBean status(@PathVariable("taskId")Integer taskId){ + SpiderContext context = SpiderJob.getSpiderContext(taskId); + if(context == null){ + return new JsonBean<>(0); // + } + return new JsonBean<>(1); //正在运行中 + } + + /** + * 同步运行 + * @param id + * @param params + * @return + */ @RequestMapping("/run/{id}") public JsonBean> run(@PathVariable("id")String id,@RequestBody(required = false)Map params){ SpiderFlow flow = spiderFlowService.getById(id); if(flow == null){ return new JsonBean<>(0, "找不到此爬虫信息"); } - List outputs = null; + List outputs; Integer maxId = spiderFlowService.getFlowMaxTaskId(id); SpiderJobContext context = SpiderJobContext.create(workspace, id,maxId,true); try{