Browse Source

备份-判重-新增判断过滤

apple 4 years ago
parent
commit
eaa6df317d

+ 9 - 3
udpfilterdup/src/datamap.go

@@ -312,7 +312,13 @@ L:
 						reasons = reason
 						break L
 					}
-					if info.href != "" && info.href != v.href { //待优化
+					//相同发布时间-标题无包含关系
+					if isTheSameDay(info.publishtime,v.publishtime) &&
+						!(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+						continue
+					}
+					//不同href
+					if info.href != "" && info.href != v.href {
 						if v.title==info.title{
 							if !againRepeat(v, info) {//进行同站点二次判断
 								reason = "同站点-href不同-标题相同等"
@@ -324,13 +330,13 @@ L:
 								continue
 							}
 						}else {
-							if againRepeat(v, info) {//进行同站点二次判断
+							if againRepeat(v, info) {
 								continue
 							}
 						}
 					}
 				}
-
+				//特殊词处理
 				specialNum:= dealWithSpecialWordNumber(info,v)
 				//前置条件 - 标题相关,有且一个关键词
 				if specialNum==1 {

+ 4 - 8
udpfilterdup/src/main.go

@@ -59,8 +59,7 @@ var (
 
 
 func init() {
-
-	return
+	
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
@@ -140,7 +139,7 @@ func init() {
 }
 
 
-func mainT() {
+func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -174,12 +173,8 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
-
-
-	exportFusionRecordExcleData()
+func mainT() {
 
-	return
 
 	if TimingTask {
 		go historyTaskDay()
@@ -329,6 +324,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 	log.Println("当前数据池的数量:",DM.currentTotalCount())
 
+	//睡眠时间 增+
 	time.Sleep(30 * time.Second)
 
 	//更新Ocr的标记

+ 19 - 18
udpfusion/src/fusionFullData.go

@@ -85,19 +85,20 @@ func startFusionData()  {
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record  批量新增-经测试-批量新增与多线程新增 速度306s-236s 相差20%的耗时
 					saveRecordData["_id"] = saveid
-					UpdateRecord.add_pool <- saveRecordData
+					UpdateRecord.add_pool <- saveRecordData //批量新增
 
+					//批量更新Es -
+					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
+					//UpdateElastic.update_pool <- map[string]string{
+					//	"id":sourceid,
+					//	"updateStr":updateStr1+updateStr2,
+					//}
 
-					//批量更新Es -问题耗时
-					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
-					UpdateElastic.update_pool <- map[string]string{
-						"id":sourceid,
-						"updateStr":updateStr1+updateStr2,
-					}
 
+					UpdateRecord.add_pool <- saveRecordData
 				}else {
 					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
@@ -106,14 +107,14 @@ func startFusionData()  {
 					UpdateRecord.add_pool <- saveRecordData //批量新增
 
 					//批量更新Es -
-					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
-					UpdateElastic.update_pool <- map[string]string{
-						"id":sourceid,
-						"updateStr":updateStr1+updateStr2,
-					}
+					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
+					//UpdateElastic.update_pool <- map[string]string{
+					//	"id":sourceid,
+					//	"updateStr":updateStr1+updateStr2,
+					//}
 
 				}
 			}(sourceid, fusionArr)

+ 18 - 0
udpfusion/src/updateRecord.go

@@ -52,6 +52,24 @@ func (update *updateRecordInfo) addRecordData() {
 					}()
 					//批量新增
 					mgo.SaveBulk(record_coll_name, dataArr...)
+
+					/*
+
+
+					*/
+
+					//批量更新Es -问题耗时
+					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
+					//UpdateElastic.update_pool <- map[string]string{
+					//	"id":sourceid,
+					//	"updateStr":updateStr1+updateStr2,
+					//}
+
+
+
 				}(tmpArr)
 				tmpArr = make([]map[string]interface{}, update.saveSize)
 				tmpIndex = 0