修复多个循环的聚合变量BUG

This commit is contained in:
mxd 2019-10-27 20:56:54 +08:00
parent 42b1b7e769
commit 89c0bf0416
3 changed files with 27 additions and 19 deletions

View File

@ -165,6 +165,7 @@ public class Spider {
String loopVariableName = node.getStringJsonValue(ShapeExecutor.LOOP_VARIABLE_NAME); String loopVariableName = node.getStringJsonValue(ShapeExecutor.LOOP_VARIABLE_NAME);
if(executor instanceof LoopExecutor){ if(executor instanceof LoopExecutor){
variables.put(LoopExecutor.BEFORE_LOOP_VARIABLE, variables);
variables.put(LoopExecutor.LOOP_NODE_KEY + node.getNodeId(), new CountDownLatch(loopCount)); variables.put(LoopExecutor.LOOP_NODE_KEY + node.getNodeId(), new CountDownLatch(loopCount));
variables.put(LoopJoinExecutor.VARIABLE_CONTEXT + node.getNodeId(), new LinkedBlockingQueue<>(loopCount)); variables.put(LoopJoinExecutor.VARIABLE_CONTEXT + node.getNodeId(), new LinkedBlockingQueue<>(loopCount));
} }

View File

@ -17,6 +17,8 @@ public class LoopExecutor implements ShapeExecutor{
public static final String LOOP_NODE_KEY = "__loop_node_"; public static final String LOOP_NODE_KEY = "__loop_node_";
public static final String BEFORE_LOOP_VARIABLE = "__loop_before_variable";
@Override @Override
public void execute(SpiderNode node, SpiderContext context, Map<String,Object> variables) { public void execute(SpiderNode node, SpiderContext context, Map<String,Object> variables) {
} }

View File

@ -40,26 +40,31 @@ public class LoopJoinExecutor implements ShapeExecutor {
variableCollection.add(variables); variableCollection.add(variables);
CountDownLatch countDownLatch = (CountDownLatch) variables.get(LoopExecutor.LOOP_NODE_KEY + joinNodeId); CountDownLatch countDownLatch = (CountDownLatch) variables.get(LoopExecutor.LOOP_NODE_KEY + joinNodeId);
if (countDownLatch != null) { if (countDownLatch != null) {
synchronized (countDownLatch) { countDownLatch.countDown();
countDownLatch.countDown(); boolean isDone = countDownLatch.getCount() == 0L;
boolean isDone = countDownLatch.getCount() == 0L; if(isDone){
if(isDone){ Map<String, Object> beforeLoopVariable = (Map<String, Object>) variables.get(LoopExecutor.BEFORE_LOOP_VARIABLE);
//清空原有 variableCollection.stream()
variableCollection.stream() .flatMap(map -> map.entrySet().stream())
.flatMap(map -> map.entrySet().stream()) .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toList())))
.collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toList()))) .forEach((k,v)->{
.forEach((k,v)->{ String key = "@" + k;
String key = "@" + k; if(variables.containsKey(key) == false || k.startsWith("@")){
if(variables.containsKey(key) == false || k.startsWith("@")){ if(key.startsWith("@@")){
if(key.startsWith("@@")){ key = k;
key = k;
}
variables.put(key, v);
} }
}); //清除掉原有变量
} variables.remove(k);
variables.put(key, v);
return isDone; }
});
//与循环前的变量进行合并
variables.putAll(beforeLoopVariable);
//删除掉多余出来的聚合变量
beforeLoopVariable.forEach((k,v)->{
variables.remove("@" + k);
});
return isDone;
} }
} else { } else {
context.error("找不到等待节点:{}" + node.getStringJsonValue(JOIN_NODE_ID)); context.error("找不到等待节点:{}" + node.getStringJsonValue(JOIN_NODE_ID));