自动去重(根据URL),暂不支持参数 close #I193U2

This commit is contained in:
mxd 2020-04-11 11:07:07 +08:00
parent 54cdef911a
commit 2c78a1809b
4 changed files with 84 additions and 6 deletions

View File

@ -54,7 +54,7 @@ CREATE TABLE `sp_task` (
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;
/* v0.4.0 新增*/ /* v0.4.0 新增 */
DROP TABLE IF EXISTS `sp_function`; DROP TABLE IF EXISTS `sp_function`;
CREATE TABLE `sp_function` ( CREATE TABLE `sp_function` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
@ -65,6 +65,7 @@ CREATE TABLE `sp_function` (
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
/* v0.5.0 新增 */
DROP TABLE IF EXISTS `sp_flow_notice`; DROP TABLE IF EXISTS `sp_flow_notice`;
CREATE TABLE `sp_flow_notice` ( CREATE TABLE `sp_flow_notice` (
`id` varchar(32) NOT NULL, `id` varchar(32) NOT NULL,

View File

@ -40,6 +40,11 @@
<groupId>org.spiderflow</groupId> <groupId>org.spiderflow</groupId>
<artifactId>spider-flow-api</artifactId> <artifactId>spider-flow-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId> <artifactId>commons-csv</artifactId>

View File

@ -1,5 +1,8 @@
package org.spiderflow.core.executor.shape; package org.spiderflow.core.executor.shape;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.Funnels;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
@ -9,21 +12,21 @@ import org.slf4j.LoggerFactory;
import org.spiderflow.Grammerable; import org.spiderflow.Grammerable;
import org.spiderflow.context.CookieContext; import org.spiderflow.context.CookieContext;
import org.spiderflow.context.SpiderContext; import org.spiderflow.context.SpiderContext;
import org.spiderflow.core.executor.function.MD5FunctionExecutor;
import org.spiderflow.core.io.HttpRequest; import org.spiderflow.core.io.HttpRequest;
import org.spiderflow.core.io.HttpResponse; import org.spiderflow.core.io.HttpResponse;
import org.spiderflow.core.utils.ExpressionUtils; import org.spiderflow.core.utils.ExpressionUtils;
import org.spiderflow.executor.ShapeExecutor; import org.spiderflow.executor.ShapeExecutor;
import org.spiderflow.io.SpiderResponse; import org.spiderflow.io.SpiderResponse;
import org.spiderflow.listener.SpiderListener;
import org.spiderflow.model.Grammer; import org.spiderflow.model.Grammer;
import org.spiderflow.model.SpiderNode; import org.spiderflow.model.SpiderNode;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream; import java.io.*;
import java.io.File; import java.nio.charset.Charset;
import java.io.IOException;
import java.io.InputStream;
import java.util.*; import java.util.*;
/** /**
@ -32,7 +35,7 @@ import java.util.*;
* *
*/ */
@Component @Component
public class RequestExecutor implements ShapeExecutor,Grammerable{ public class RequestExecutor implements ShapeExecutor,Grammerable, SpiderListener {
public static final String SLEEP = "sleep"; public static final String SLEEP = "sleep";
@ -84,9 +87,19 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
public static final String COOKIE_AUTO_SET = "cookie-auto-set"; public static final String COOKIE_AUTO_SET = "cookie-auto-set";
public static final String REPEAT_ENABLE = "repeat-enable";
public static final String BLOOM_FILTER_KEY = "_bloomfilter";
@Value("${spider.workspace}") @Value("${spider.workspace}")
private String workspcace; private String workspcace;
@Value("${spider.bloomfilter.capacity:5000000}")
private Integer capacity;
@Value("${spider.bloomfilter.error-rate:0.00001}")
private Double errorRate;
private static final Logger logger = LoggerFactory.getLogger(RequestExecutor.class); private static final Logger logger = LoggerFactory.getLogger(RequestExecutor.class);
@Override @Override
@ -127,6 +140,7 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
logger.error("设置延迟时间失败", t); logger.error("设置延迟时间失败", t);
} }
} }
BloomFilter<String> bloomFilter = null;
//重试次数 //重试次数
int retryCount = NumberUtils.toInt(node.getStringJsonValue(RETRY_COUNT), 0) + 1; int retryCount = NumberUtils.toInt(node.getStringJsonValue(RETRY_COUNT), 0) + 1;
//重试间隔时间单位毫秒 //重试间隔时间单位毫秒
@ -142,6 +156,15 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
logger.error("设置请求url出错异常信息", e); logger.error("设置请求url出错异常信息", e);
ExceptionUtils.wrapAndThrow(e); ExceptionUtils.wrapAndThrow(e);
} }
if("1".equalsIgnoreCase(node.getStringJsonValue(REPEAT_ENABLE,"0"))){
bloomFilter = createBloomFilter(context);
synchronized (bloomFilter){
if(bloomFilter.mightContain(MD5FunctionExecutor.string(url))){
logger.info("过滤重复URL:{}",url);
return;
}
}
}
context.pause(node.getNodeId(),"common",URL,url); context.pause(node.getNodeId(),"common",URL,url);
logger.info("设置请求url:{}", url); logger.info("设置请求url:{}", url);
request.url(url); request.url(url);
@ -236,6 +259,11 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
HttpResponse response = request.execute(); HttpResponse response = request.execute();
successed = response.getStatusCode() == 200; successed = response.getStatusCode() == 200;
if(successed){ if(successed){
if(bloomFilter != null){
synchronized (bloomFilter){
bloomFilter.put(MD5FunctionExecutor.string(url));
}
}
String charset = node.getStringJsonValue(RESPONSE_CHARSET); String charset = node.getStringJsonValue(RESPONSE_CHARSET);
if(StringUtils.isNotBlank(charset)){ if(StringUtils.isNotBlank(charset)){
response.setCharset(charset); response.setCharset(charset);
@ -407,4 +435,47 @@ public class RequestExecutor implements ShapeExecutor,Grammerable{
grammers.add(grammer); grammers.add(grammer);
return grammers; return grammers;
} }
@Override
public void beforeStart(SpiderContext context) {
}
private BloomFilter<String> createBloomFilter(SpiderContext context){
BloomFilter<String> filter = context.get(BLOOM_FILTER_KEY);
if(filter == null){
Funnel<CharSequence> funnel = Funnels.stringFunnel(Charset.forName("UTF-8"));
String fileName = context.getFlowId() + File.separator + "url.bf";
File file = new File(workspcace,fileName);
if(file.exists()){
try(FileInputStream fis = new FileInputStream(file)){
filter = BloomFilter.readFrom(fis,funnel);
} catch (IOException e) {
logger.error("读取布隆过滤器出错",e);
}
}else{
filter = BloomFilter.create(funnel,capacity,errorRate);
}
context.put(BLOOM_FILTER_KEY,filter);
}
return filter;
}
@Override
public void afterEnd(SpiderContext context) {
BloomFilter<String> filter = context.get(BLOOM_FILTER_KEY);
if(filter != null){
File file = new File(workspcace,context.getFlowId() + File.separator + "url.bf");
if(!file.getParentFile().exists()){
file.getParentFile().mkdirs();
}
try(FileOutputStream fos = new FileOutputStream(file)){
filter.writeTo(fos);
fos.flush();
}catch(IOException e){
logger.error("保存布隆过滤器出错",e);
}
}
}
} }

View File

@ -116,6 +116,7 @@
<input type="checkbox" title="跟随重定向" value="follow-redirect" lay-skin="primary" {{d.data.object['follow-redirect'] == '0' ? '' : 'checked'}}/> <input type="checkbox" title="跟随重定向" value="follow-redirect" lay-skin="primary" {{d.data.object['follow-redirect'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="TLS证书验证" value="tls-validate" lay-skin="primary" {{d.data.object['tls-validate'] == '0' ? '' : 'checked'}}/> <input type="checkbox" title="TLS证书验证" value="tls-validate" lay-skin="primary" {{d.data.object['tls-validate'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="自动管理Cookie" value="cookie-auto-set" lay-skin="primary" {{d.data.object['cookie-auto-set'] == '0' ? '' : 'checked'}}/> <input type="checkbox" title="自动管理Cookie" value="cookie-auto-set" lay-skin="primary" {{d.data.object['cookie-auto-set'] == '0' ? '' : 'checked'}}/>
<input type="checkbox" title="自动去重" value="repeat-enable" lay-skin="primary" {{d.data.object['repeat-enable'] == '1' ? 'checked' : ''}}/>
</div> </div>
</div> </div>
</div> </div>