循环结束后聚合变量xxx->@xxx
This commit is contained in:
parent
9650d53534
commit
42b1b7e769
@ -5,6 +5,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -18,13 +19,13 @@ import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor;
|
||||
import org.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
|
||||
import org.spiderflow.context.SpiderContext;
|
||||
import org.spiderflow.core.executor.shape.LoopExecutor;
|
||||
import org.spiderflow.core.executor.shape.LoopJoinExecutor;
|
||||
import org.spiderflow.core.model.SpiderFlow;
|
||||
import org.spiderflow.core.utils.SpiderFlowUtils;
|
||||
import org.spiderflow.executor.ShapeExecutor;
|
||||
import org.spiderflow.listener.SpiderListener;
|
||||
import org.spiderflow.model.SpiderNode;
|
||||
import org.spiderflow.model.SpiderOutput;
|
||||
import org.spiderflow.utils.Maps;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -161,13 +162,15 @@ public class Spider {
|
||||
}
|
||||
}
|
||||
if (loopCount > 0) {
|
||||
Map<String, Object> nVariables = new HashMap<>(variables);
|
||||
|
||||
String loopVariableName = node.getStringJsonValue(ShapeExecutor.LOOP_VARIABLE_NAME);
|
||||
if(executor instanceof LoopExecutor){
|
||||
nVariables.put(LoopExecutor.LOOP_NODE_KEY + node.getNodeId(), new CountDownLatch(loopCount));
|
||||
variables.put(LoopExecutor.LOOP_NODE_KEY + node.getNodeId(), new CountDownLatch(loopCount));
|
||||
variables.put(LoopJoinExecutor.VARIABLE_CONTEXT + node.getNodeId(), new LinkedBlockingQueue<>(loopCount));
|
||||
}
|
||||
for (int i = 0; i < loopCount; i++) {
|
||||
if (context.isRunning()) {
|
||||
Map<String, Object> nVariables = new HashMap<>(variables);
|
||||
// 存入下标变量
|
||||
if(loopVariableName != null){
|
||||
nVariables.put(loopVariableName, i);
|
||||
|
@ -1,7 +1,9 @@
|
||||
package org.spiderflow.core.executor.shape;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.spiderflow.context.SpiderContext;
|
||||
import org.spiderflow.executor.ShapeExecutor;
|
||||
@ -18,10 +20,11 @@ import org.springframework.stereotype.Component;
|
||||
public class LoopJoinExecutor implements ShapeExecutor {
|
||||
|
||||
private static final String JOIN_NODE_ID = "joinNode";
|
||||
|
||||
|
||||
public static final String VARIABLE_CONTEXT = "__variable_context";
|
||||
|
||||
@Override
|
||||
public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -29,13 +32,35 @@ public class LoopJoinExecutor implements ShapeExecutor {
|
||||
return "loopJoin";
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public boolean allowExecuteNext(SpiderNode node, SpiderContext context, Map<String, Object> variables) {
|
||||
String joinNodeId = node.getStringJsonValue(JOIN_NODE_ID);
|
||||
Collection<Map<String, Object>> variableCollection = (Collection<Map<String, Object>>) variables.get(VARIABLE_CONTEXT + joinNodeId);
|
||||
variableCollection.add(variables);
|
||||
CountDownLatch countDownLatch = (CountDownLatch) variables.get(LoopExecutor.LOOP_NODE_KEY + joinNodeId);
|
||||
if (countDownLatch != null) {
|
||||
countDownLatch.countDown();
|
||||
return countDownLatch.getCount() == 0L;
|
||||
synchronized (countDownLatch) {
|
||||
countDownLatch.countDown();
|
||||
boolean isDone = countDownLatch.getCount() == 0L;
|
||||
if(isDone){
|
||||
//清空原有
|
||||
variableCollection.stream()
|
||||
.flatMap(map -> map.entrySet().stream())
|
||||
.collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toList())))
|
||||
.forEach((k,v)->{
|
||||
String key = "@" + k;
|
||||
if(variables.containsKey(key) == false || k.startsWith("@")){
|
||||
if(key.startsWith("@@")){
|
||||
key = k;
|
||||
}
|
||||
variables.put(key, v);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return isDone;
|
||||
}
|
||||
} else {
|
||||
context.error("找不到等待节点:{}" + node.getStringJsonValue(JOIN_NODE_ID));
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ public class CharacterStream {
|
||||
public boolean matchIdentifierStart (boolean consume) {
|
||||
if (index >= end) return false;
|
||||
char c = source.charAt(index);
|
||||
if (Character.isJavaIdentifierStart(c)) {
|
||||
if (Character.isJavaIdentifierStart(c) || c == '@') {
|
||||
if (consume) index++;
|
||||
return true;
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
<div class="layui-tab layui-tab-fixed layui-tab-brief">
|
||||
<ul class="layui-tab-title">
|
||||
<li class="layui-this">配置</li>
|
||||
</ul>
|
||||
<div class="layui-tab-content">
|
||||
<div class="layui-tab-item layui-show">
|
||||
<form class="layui-form editor-form-node">
|
||||
<div class="layui-form-item">
|
||||
<label class="layui-form-label">节点名称</label>
|
||||
<div class="layui-input-block">
|
||||
<input type="text" name="value" placeholder="请输入节点名称" value="{{=d.value}}" autocomplete="off" class="layui-input">
|
||||
</div>
|
||||
</div>
|
||||
<div class="layui-form-item">
|
||||
<label class="layui-form-label">等待节点</label>
|
||||
<div class="layui-input-block">
|
||||
<select name="joinNode">
|
||||
{{# layui.each(d.model.cells,function(i,item){ }}
|
||||
{{# if(item.data&&item.data.object.shape == 'loop'){ }}
|
||||
<option value="{{item.id}}" {{item.id == d.data.object.joinNode ? 'selected' : ''}}>{{item.value}}</option>
|
||||
{{# } }}
|
||||
{{# }) }}
|
||||
</select>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
Loading…
Reference in New Issue
Block a user