代码优化
This commit is contained in:
parent
6f484753f5
commit
0379b02e7b
@ -24,10 +24,6 @@ public class SpiderContext extends HashMap<String, Object>{
|
||||
private String flowId;
|
||||
|
||||
private static final long serialVersionUID = 8379177178417619790L;
|
||||
/**
|
||||
* 爬虫输出参数列表
|
||||
*/
|
||||
private List<SpiderOutput> outputs = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 流程执行线程
|
||||
@ -55,7 +51,7 @@ public class SpiderContext extends HashMap<String, Object>{
|
||||
private CookieContext cookieContext = new CookieContext();
|
||||
|
||||
public List<SpiderOutput> getOutputs() {
|
||||
return outputs;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
public <T> T get(String key){
|
||||
@ -88,9 +84,7 @@ public class SpiderContext extends HashMap<String, Object>{
|
||||
}
|
||||
|
||||
public void addOutput(SpiderOutput output){
|
||||
synchronized (this.outputs){
|
||||
this.outputs.add(output);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public SubThreadPoolExecutor getThreadPool() {
|
||||
|
@ -264,7 +264,7 @@ public class Spider {
|
||||
}
|
||||
LinkedBlockingQueue<Future<?>> futureQueue = context.getFutureQueue();
|
||||
for (SpiderTask task : tasks) {
|
||||
if(executor.isThread()){ //【判断节点是否是异步运行
|
||||
if(executor.isThread()){ //判断节点是否是异步运行
|
||||
//提交任务至线程池中,并将Future添加到队列末尾
|
||||
futureQueue.add(context.getThreadPool().submitAsync(task.runnable, task, node));
|
||||
}else{
|
||||
|
@ -124,7 +124,7 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.error("设置延迟时间失败:{}", t);
|
||||
logger.error("设置延迟时间失败", t);
|
||||
}
|
||||
}
|
||||
//重试次数
|
||||
@ -139,7 +139,7 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
|
||||
try {
|
||||
url = ExpressionUtils.execute(node.getStringJsonValue(URL), variables).toString();
|
||||
} catch (Exception e) {
|
||||
logger.error("设置请求url出错,异常信息:{}", e);
|
||||
logger.error("设置请求url出错,异常信息", e);
|
||||
ExceptionUtils.wrapAndThrow(e);
|
||||
}
|
||||
context.pause(node.getNodeId(),"common",URL,url);
|
||||
@ -204,7 +204,7 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
|
||||
request.data(requestBody);
|
||||
logger.info("设置请求Body:{}", requestBody);
|
||||
} catch (Exception e) {
|
||||
logger.debug("设置请求Body出错:{}", e);
|
||||
logger.debug("设置请求Body出错", e);
|
||||
}
|
||||
}else if("form-data".equals(bodyType)){
|
||||
List<Map<String, String>> formParameters = node.getListJsonValue(PARAMETER_FORM_NAME,PARAMETER_FORM_VALUE,PARAMETER_FORM_TYPE,PARAMETER_FORM_FILENAME);
|
||||
|
@ -70,7 +70,7 @@ public class SpiderJob extends QuartzJobBean {
|
||||
task.setBeginTime(new Date());
|
||||
try {
|
||||
taskService.save(task);
|
||||
context = SpiderJobContext.create(this.workspace, spiderFlow.getId(),task.getId());
|
||||
context = SpiderJobContext.create(this.workspace, spiderFlow.getId(),task.getId(),false);
|
||||
SpiderContextHolder.set(context);
|
||||
contextMap.put(task.getId(), context);
|
||||
logger.info("开始执行任务{}", spiderFlow.getName());
|
||||
|
@ -3,10 +3,13 @@ package org.spiderflow.core.job;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.spiderflow.context.SpiderContext;
|
||||
import org.spiderflow.model.SpiderOutput;
|
||||
|
||||
public class SpiderJobContext extends SpiderContext{
|
||||
|
||||
@ -16,9 +19,14 @@ public class SpiderJobContext extends SpiderContext{
|
||||
|
||||
private OutputStream outputstream;
|
||||
|
||||
public SpiderJobContext(OutputStream outputstream) {
|
||||
private List<SpiderOutput> outputs = new ArrayList<>();
|
||||
|
||||
private boolean output;
|
||||
|
||||
public SpiderJobContext(OutputStream outputstream,boolean output) {
|
||||
super();
|
||||
this.outputstream = outputstream;
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public void close(){
|
||||
@ -28,11 +36,25 @@ public class SpiderJobContext extends SpiderContext{
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOutput(SpiderOutput output) {
|
||||
if(this.output){
|
||||
synchronized (this.outputs){
|
||||
this.outputs.add(output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SpiderOutput> getOutputs() {
|
||||
return outputs;
|
||||
}
|
||||
|
||||
public OutputStream getOutputstream(){
|
||||
return this.outputstream;
|
||||
}
|
||||
|
||||
public static SpiderJobContext create(String directory,String id,Integer taskId){
|
||||
public static SpiderJobContext create(String directory,String id,Integer taskId,boolean output){
|
||||
OutputStream os = null;
|
||||
try {
|
||||
File file = new File(new File(directory),id + File.separator + "logs" + File.separator + taskId + ".log");
|
||||
@ -44,7 +66,7 @@ public class SpiderJobContext extends SpiderContext{
|
||||
} catch (Exception e) {
|
||||
logger.error("创建日志文件出错",e);
|
||||
}
|
||||
SpiderJobContext context = new SpiderJobContext(os);
|
||||
SpiderJobContext context = new SpiderJobContext(os, output);
|
||||
context.setFlowId(id);
|
||||
return context;
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ public class SpiderRestController {
|
||||
}
|
||||
List<SpiderOutput> outputs = null;
|
||||
Integer maxId = spiderFlowService.getFlowMaxTaskId(id);
|
||||
SpiderJobContext context = SpiderJobContext.create(workspace, id,maxId);
|
||||
SpiderJobContext context = SpiderJobContext.create(workspace, id,maxId,true);
|
||||
try{
|
||||
outputs = spider.run(flow,context, params);
|
||||
}catch(Exception e){
|
||||
|
@ -37,7 +37,6 @@ public class SpiderWebSocketContext extends SpiderContext {
|
||||
|
||||
@Override
|
||||
public void addOutput(SpiderOutput output) {
|
||||
super.addOutput(output);
|
||||
this.write(new WebSocketEvent<>("output", output));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user