zhengkun vor 3 Jahren
Ursprung
Commit
326b4413ac
13 geänderte Dateien mit 4060 neuen und 0 gelöschten Zeilen
  1. 792 0
      src/README.md
  2. 39 0
      src/config.json
  3. 366 0
      src/dataMethod.go
  4. 483 0
      src/dataMethodHeavy.go
  5. 421 0
      src/dataMethodMerge.go
  6. 616 0
      src/datamap.go
  7. 111 0
      src/fullRepeat.go
  8. 370 0
      src/historyRepeat.go
  9. 166 0
      src/increaseRepeat.go
  10. 246 0
      src/main.go
  11. 328 0
      src/mgo.go
  12. 60 0
      src/udptaskmap.go
  13. 62 0
      src/updateMethod.go

+ 792 - 0
src/README.md

@@ -0,0 +1,792 @@
+
+{
+    "udpport": ":19097",
+    "dupdays": 7,
+    "mongodb": {
+        "addr": "172.17.4.85:27080",
+        "pool": 10,
+        "db": "qfw",
+        "extract": "result_20200715",
+        "extract_back": "result_20200714",
+        "site": {
+            "dbname": "qfw",
+            "coll": "site"
+        }
+    },
+    "task_mongodb": {
+        "task_addrName": "172.17.4.187:27081",
+        "task_dbName": "qfw",
+        "task_collName": "ocr_flie_over",
+        "pool": 5
+        
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "nextNode": [
+        {
+            "addr": "172.17.4.194",
+            "port": 1782,
+            "stype": "project",
+            "memo": "合并项目"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+    "threads": 1,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 5,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}
+
+
+
+
+
+
+mgo = &MongodbSim{
+		MongodbAddr: "172.17.4.187:27083",
+		DbName:      "qfw",
+		Size:        10,
+	}
+mgo.InitPool()
+	return
+	
+// moveTimeoutData creates a cron scheduler, registers moveOnceTimeOut under
+// the spec "0 0 0 * * ?" and starts the scheduler.
+// NOTE(review): the spec presumably means "every day at midnight" — confirm
+// against the cron library actually imported by the project.
+func moveTimeoutData()  {
+	log.Println("部署迁移定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
+	c.Start()
+}
+
+// moveOnceTimeOut performs one migration pass: every document in collection
+// "result_20200714" whose "comeintime" is lower than the cut-off is saved
+// into "result_20200713" and then deleted from "result_20200714" by id.
+// NOTE(review): the results of mgo.Save and mgo.DeleteById are not inspected
+// here, and the iterator is neither closed nor checked for an error in this
+// function — confirm whether the wrappers handle that.
+func moveOnceTimeOut()  {
+	log.Println("执行一次迁移超时数据")
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	now:=time.Now()
+	// Cut-off: same month/day/hour two calendar years ago, local time, as Unix seconds.
+	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": move_time,
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		// Progress log every 10000 documents.
+		if index%10000 == 0 {
+			log.Println("index", index)
+		}
+		del_id:=BsonTOStringId(tmp["_id"])
+		// Copy first, then remove from the source collection.
+		mgo.Save("result_20200713", tmp)
+		mgo.DeleteById("result_20200714",del_id)
+		// Fresh map so fields of this document do not leak into the next decode.
+		tmp = map[string]interface{}{}
+	}
+	log.Println("save and delete", " ok index", index)
+}
+
+
+
+
+
+{
+    "udpport": ":1785",
+    "dupdays": 7,
+    "mongodb": {
+        "addr": "172.17.4.85:27080",
+        "pool": 10,
+        "db": "qfw",
+        "extract": "result_20200715",
+        "extract_back": "result_20200714",
+        "site": {
+            "dbname": "qfw",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://172.17.145.179:19281/_send/_mail"
+    },
+    "nextNode": [
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+    "threads": 1,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 5,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}
+
+func historyTaskDay() {
+	defer util.Catch()
+
+	for {
+		start:=time.Now().Unix()
+
+		if gtid=="" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
+		}
+		if lteid!="" {
+			//先进行数据迁移
+			log.Println("开启一次迁移任务",gtid,lteid)
+			moveHistoryData(gtid,lteid)
+			gtid = lteid //替换数据
+		}
+
+		//查询表最后一个id
+		task_sess := task_mgo.GetMgoConn()
+		defer task_mgo.DestoryMongoConn(task_sess)
+		q:=map[string]interface{}{
+			"isused":true,
+		}
+		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+			lteid = util.ObjToString(tmp["gtid"])
+			log.Println("查询的最后一个任务Id:",lteid)
+			break
+		}
+
+		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
+		time.Sleep(5 * time.Minute)
+
+		sess := mgo.GetMgoConn()//连接器
+		defer mgo.DestoryMongoConn(sess)
+		//开始判重
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+		log.Println("历史判重查询条件:",q,"时间:", between_time)
+		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
+		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+		dayArr := []map[string]interface{}{}
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
+			}
+			source := util.ObjToMap(tmp["jsondata"])
+			if util.IntAll((*source)["sourcewebsite"]) == 1 {
+				outnum++
+				updatelock.Lock()
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat": 1,
+							"dataging": 0,
+							"repeat_reason": "sourcewebsite为1 重复",
+						},
+					},
+				})
+				if len(updateExtract) >= 200 {
+					log.Println("sourcewebsite,批量更新")
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+
+				updatelock.Unlock()
+
+
+				tmp = make(map[string]interface{})
+				continue
+			}
+
+			//取-符合-发布时间X年内的数据
+			updatelock.Lock()
+			if util.IntAll(tmp["dataging"]) == 1 {
+				pubtime := util.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time {
+					oknum++
+					if deterTime==0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						if pubtime-deterTime >timingSpanDay*86400 {
+							//新数组重新构建,当前组数据加到全部组数据
+							pendAllArr = append(pendAllArr,dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = util.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr,tmp)
+						}else {
+							dayArr = append(dayArr,tmp)
+						}
+					}
+				}else {
+					outnum++
+					//不在两年内的也清标记
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging": 0,
+							},
+						},
+					})
+					if len(updateExtract) >= 200 {
+						log.Println("不在周期内符合dataging==1,批量更新")
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+
+				}
+			}
+
+			updatelock.Unlock()
+
+			tmp = make(map[string]interface{})
+		}
+
+
+		//批量更新标记
+		updatelock.Lock()
+
+		if len(updateExtract) > 0 {
+			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+
+		updatelock.Unlock()
+
+
+		if len(dayArr)>0 {
+			pendAllArr = append(pendAllArr,dayArr)
+			dayArr = []map[string]interface{}{}
+		}
+
+		log.Println("查询数量:",num,"符合条件:",oknum)
+
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
+
+		//测试分组数量是否正确
+		testNum:=0
+		for k,v:=range pendAllArr {
+			log.Println("第",k,"组--","数量:",len(v))
+			testNum = testNum+len(v)
+		}
+		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+		n, repeateN := 0, 0
+		log.Println("线程数:",threadNum)
+		pool := make(chan bool, threadNum)
+		wg := &sync.WaitGroup{}
+		for k,v:=range pendAllArr { //每组结束更新一波数据
+			pool <- true
+			wg.Add(1)
+			go func(k int, v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				//每组临时数组 -  互不干扰
+				groupUpdateExtract := [][]map[string]interface{}{}
+				//
+				groupOtherExtract := [][]map[string]interface{}{}
+
+				//构建当前组的数据池
+				log.Println("构建第", k, "组---(数据池)")
+				//当前组的第一个发布时间
+				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				n = n + len(v)
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				for _, tmp := range v {
+					info := NewInfo(tmp)
+					b, source, reason := curTM.check(info)
+					if b { //有重复,生成更新语句,更新抽取和更新招标
+						repeateN++
+						//重复数据打标签
+						repeat_ids:=source.repeat_ids
+						repeat_ids =  append(repeat_ids,info.id)
+						source.repeat_ids = repeat_ids
+						//替换数据池-更新
+						DM.replacePoolData(source)
+						updatelock.Lock()
+
+
+						//更新数据源-   14 或者 15
+						//判断是否在当前段落
+						if judgeIsCurIds(gtid,lteid,source.id) {
+							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}else {
+							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":        1,
+									"repeat_reason": reason,
+									"repeat_id":     source.id,
+									"dataging":      0,
+								},
+							},
+						})
+
+						if len(groupUpdateExtract) >= 500 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
+						}
+
+						if len(groupOtherExtract) >= 500 {
+							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							groupOtherExtract = [][]map[string]interface{}{}
+						}
+
+						updatelock.Unlock()
+
+
+					} else {
+						updatelock.Lock()
+
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"dataging": 0, //符合条件的都为dataging==0
+								},
+							},
+						})
+
+						if len(groupUpdateExtract) >= 500 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
+						}
+						updatelock.Unlock()
+					}
+				}
+				//每组数据结束-更新数据
+				updatelock.Lock()
+				if len(groupUpdateExtract) > 0 {
+					mgo.UpSertBulk(extract, groupUpdateExtract...)
+				}
+
+				if len(groupOtherExtract) > 0 {
+					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+				}
+				updatelock.Unlock()
+
+			}(k, v)
+
+		}
+
+		wg.Wait()
+
+
+		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+		if n >= repeateN && gtid!=lteid{
+			for _, to := range nextNode {
+				next_sid := util.BsonIdToSId(gtid)
+				next_eid := util.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+
+		end:=time.Now().Unix()
+
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+		log.Println(gtid,lteid)
+		if end-start<60*5 {
+			log.Println("睡眠.............")
+			time.Sleep(5 * time.Minute)
+		}
+		log.Println("继续下一段的历史判重")
+	}
+}
+
+func historyTaskDay() {
+ 	defer util.Catch()
+ 
+ 	for {
+ 		start:=time.Now().Unix()
+ 
+ 		if gtid=="" {
+ 			log.Println("请传gtid,否则无法运行")
+ 			os.Exit(0)
+ 			return
+ 		}
+ 		if lteid!="" {
+ 			//先进行数据迁移
+ 			log.Println("开启一次迁移任务",gtid,lteid)
+ 			moveHistoryData(gtid,lteid)
+ 			gtid = lteid //替换数据
+ 		}
+ 
+ 		//查询表最后一个id
+ 		task_sess := task_mgo.GetMgoConn()
+ 		defer task_mgo.DestoryMongoConn(task_sess)
+ 		q:=map[string]interface{}{
+ 			"isused":true,
+ 		}
+ 		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+ 		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+ 		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+ 			lteid = util.ObjToString(tmp["gtid"])
+ 			log.Println("查询的最后一个任务Id:",lteid)
+ 			break
+ 		}
+ 
+ 		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
+ 		time.Sleep(5 * time.Minute)
+ 
+ 		sess := mgo.GetMgoConn()//连接器
+ 		defer mgo.DestoryMongoConn(sess)
+ 		//开始判重
+ 		q = map[string]interface{}{
+ 			"_id": map[string]interface{}{
+ 				"$gt": StringTOBsonId(gtid),
+ 				"$lte": StringTOBsonId(lteid),
+ 			},
+ 		}
+ 		log.Println("历史判重查询条件:",q,"时间:", between_time)
+ 		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+ 		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
+ 		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+ 		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+ 		dayArr := []map[string]interface{}{}
+ 		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+ 			if num%10000 == 0 {
+ 				log.Println("正序遍历:", num)
+ 			}
+ 			source := util.ObjToMap(tmp["jsondata"])
+ 			if util.IntAll((*source)["sourcewebsite"]) == 1 {
+ 				outnum++
+ 				updatelock.Lock()
+ 				updateExtract = append(updateExtract, []map[string]interface{}{
+ 					map[string]interface{}{
+ 						"_id": tmp["_id"],
+ 					},
+ 					map[string]interface{}{
+ 						"$set": map[string]interface{}{
+ 							"repeat": 1,
+ 							"dataging": 0,
+ 							"repeat_reason": "sourcewebsite为1 重复",
+ 						},
+ 					},
+ 				})
+ 				if len(updateExtract) >= 200 {
+ 					log.Println("sourcewebsite,批量更新")
+ 					mgo.UpSertBulk(extract, updateExtract...)
+ 					updateExtract = [][]map[string]interface{}{}
+ 				}
+ 
+ 				updatelock.Unlock()
+ 
+ 
+ 				tmp = make(map[string]interface{})
+ 				continue
+ 			}
+ 
+ 			//取-符合-发布时间X年内的数据
+ 			updatelock.Lock()
+ 			if util.IntAll(tmp["dataging"]) == 1 {
+ 				pubtime := util.Int64All(tmp["publishtime"])
+ 				if pubtime > 0 && pubtime >= between_time {
+ 					oknum++
+ 					if deterTime==0 {
+ 						log.Println("找到第一条符合条件的数据")
+ 						deterTime = util.Int64All(tmp["publishtime"])
+ 						dayArr = append(dayArr,tmp)
+ 					}else {
+ 						if pubtime-deterTime >timingSpanDay*86400 {
+ 							//新数组重新构建,当前组数据加到全部组数据
+ 							pendAllArr = append(pendAllArr,dayArr)
+ 							dayArr = []map[string]interface{}{}
+ 							deterTime = util.Int64All(tmp["publishtime"])
+ 							dayArr = append(dayArr,tmp)
+ 						}else {
+ 							dayArr = append(dayArr,tmp)
+ 						}
+ 					}
+ 				}else {
+ 					outnum++
+ 					//不在两年内的也清标记
+ 					updateExtract = append(updateExtract, []map[string]interface{}{
+ 						map[string]interface{}{
+ 							"_id": tmp["_id"],
+ 						},
+ 						map[string]interface{}{
+ 							"$set": map[string]interface{}{
+ 								"dataging": 0,
+ 							},
+ 						},
+ 					})
+ 					if len(updateExtract) >= 200 {
+ 						log.Println("不在周期内符合dataging==1,批量更新")
+ 						mgo.UpSertBulk(extract, updateExtract...)
+ 						updateExtract = [][]map[string]interface{}{}
+ 					}
+ 
+ 				}
+ 			}
+ 
+ 			updatelock.Unlock()
+ 
+ 			tmp = make(map[string]interface{})
+ 		}
+ 
+ 
+ 		//批量更新标记
+ 		updatelock.Lock()
+ 
+ 		if len(updateExtract) > 0 {
+ 			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
+ 			mgo.UpSertBulk(extract, updateExtract...)
+ 			updateExtract = [][]map[string]interface{}{}
+ 		}
+ 
+ 		updatelock.Unlock()
+ 
+ 
+ 		if len(dayArr)>0 {
+ 			pendAllArr = append(pendAllArr,dayArr)
+ 			dayArr = []map[string]interface{}{}
+ 		}
+ 
+ 		log.Println("查询数量:",num,"符合条件:",oknum)
+ 
+ 		if len(pendAllArr) <= 0 {
+ 			log.Println("没找到dataging==1的数据")
+ 		}
+ 
+ 		//测试分组数量是否正确
+ 		testNum:=0
+ 		for k,v:=range pendAllArr {
+ 			log.Println("第",k,"组--","数量:",len(v))
+ 			testNum = testNum+len(v)
+ 		}
+ 		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+ 
+ 		n, repeateN := 0, 0
+ 		log.Println("线程数:",threadNum)
+ 		pool := make(chan bool, threadNum)
+ 		wg := &sync.WaitGroup{}
+ 		for k,v:=range pendAllArr { //每组结束更新一波数据
+ 			pool <- true
+ 			wg.Add(1)
+ 			go func(k int, v []map[string]interface{}) {
+ 				defer func() {
+ 					<-pool
+ 					wg.Done()
+ 				}()
+ 				//每组临时数组 -  互不干扰
+ 				groupUpdateExtract := [][]map[string]interface{}{}
+ 				//
+ 				groupOtherExtract := [][]map[string]interface{}{}
+ 
+ 				//构建当前组的数据池
+ 				log.Println("构建第", k, "组---(数据池)")
+ 				//当前组的第一个发布时间
+ 				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+ 				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+ 				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+ 				n = n + len(v)
+ 				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+ 				for _, tmp := range v {
+ 					info := NewInfo(tmp)
+ 					b, source, reason := curTM.check(info)
+ 					if b { //有重复,生成更新语句,更新抽取和更新招标
+ 						repeateN++
+ 						//重复数据打标签
+ 						repeat_ids:=source.repeat_ids
+ 						repeat_ids =  append(repeat_ids,info.id)
+ 						source.repeat_ids = repeat_ids
+ 						//替换数据池-更新
+ 						DM.replacePoolData(source)
+ 						updatelock.Lock()
+ 
+ 
+ 						//更新数据源-   14 或者 15
+ 						//判断是否在当前段落
+ 						if judgeIsCurIds(gtid,lteid,source.id) {
+ 							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+ 								map[string]interface{}{
+ 									"_id": StringTOBsonId(source.id),
+ 								},
+ 								map[string]interface{}{
+ 									"$set": map[string]interface{}{
+ 										"repeat_ids": repeat_ids,
+ 									},
+ 								},
+ 							})
+ 						}else {
+ 							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+ 								map[string]interface{}{
+ 									"_id": StringTOBsonId(source.id),
+ 								},
+ 								map[string]interface{}{
+ 									"$set": map[string]interface{}{
+ 										"repeat_ids": repeat_ids,
+ 									},
+ 								},
+ 							})
+ 						}
+ 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+ 							map[string]interface{}{
+ 								"_id": tmp["_id"],
+ 							},
+ 							map[string]interface{}{
+ 								"$set": map[string]interface{}{
+ 									"repeat":        1,
+ 									"repeat_reason": reason,
+ 									"repeat_id":     source.id,
+ 									"dataging":      0,
+ 								},
+ 							},
+ 						})
+ 
+ 						if len(groupUpdateExtract) >= 500 {
+ 							mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 							groupUpdateExtract = [][]map[string]interface{}{}
+ 						}
+ 
+ 						if len(groupOtherExtract) >= 500 {
+ 							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+ 							groupOtherExtract = [][]map[string]interface{}{}
+ 						}
+ 
+ 						updatelock.Unlock()
+ 
+ 
+ 					} else {
+ 						updatelock.Lock()
+ 
+ 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+ 							map[string]interface{}{
+ 								"_id": tmp["_id"],
+ 							},
+ 							map[string]interface{}{
+ 								"$set": map[string]interface{}{
+ 									"dataging": 0, //符合条件的都为dataging==0
+ 								},
+ 							},
+ 						})
+ 
+ 						if len(groupUpdateExtract) >= 500 {
+ 							mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 							groupUpdateExtract = [][]map[string]interface{}{}
+ 						}
+ 						updatelock.Unlock()
+ 					}
+ 				}
+ 				//每组数据结束-更新数据
+ 				updatelock.Lock()
+ 				if len(groupUpdateExtract) > 0 {
+ 					mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 				}
+ 
+ 				if len(groupOtherExtract) > 0 {
+ 					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+ 				}
+ 				updatelock.Unlock()
+ 
+ 			}(k, v)
+ 
+ 		}
+ 
+ 		wg.Wait()
+ 
+ 
+ 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+ 		if n >= repeateN && gtid!=lteid{
+ 			for _, to := range nextNode {
+ 				next_sid := util.BsonIdToSId(gtid)
+ 				next_eid := util.BsonIdToSId(lteid)
+ 				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+ 				by, _ := json.Marshal(map[string]interface{}{
+ 					"gtid":  next_sid,
+ 					"lteid": next_eid,
+ 					"stype": util.ObjToString(to["stype"]),
+ 					"key":   key,
+ 				})
+ 				addr := &net.UDPAddr{
+ 					IP:   net.ParseIP(to["addr"].(string)),
+ 					Port: util.IntAll(to["port"]),
+ 				}
+ 				node := &udpNode{by, addr, time.Now().Unix(), 0}
+ 				udptaskmap.Store(key, node)
+ 				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+ 			}
+ 		}
+ 
+ 		end:=time.Now().Unix()
+ 
+ 		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+ 		log.Println(gtid,lteid)
+ 		if end-start<60*5 {
+ 			log.Println("睡眠.............")
+ 			time.Sleep(5 * time.Minute)
+ 		}
+ 		log.Println("继续下一段的历史判重")
+ 	}
+ }	       					

+ 39 - 0
src/config.json

@@ -0,0 +1,39 @@
+{
+    "udpport": ":17859",
+    "dupdays": 7,
+    "mongodb": {
+        "addr": "127.0.0.1:27017",
+        "pool": 10,
+        "db": "zhengkun",
+        "extract": "repeat_test",
+        "extract_back": "repeat_test",
+        "site": {
+            "dbname": "zhengkun",
+            "coll": "site"
+        }
+    },
+    "task_mongodb": {
+        "task_addrName": "127.0.0.1:27017",
+        "task_dbName": "zhengkun",
+        "task_collName": "repeat_test",
+        "pool": 10
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://172.17.145.179:19281/_send/_mail"
+    },
+    "nextNode": [
+    ],
+    "userName": "",
+    "password": "",
+    "threads": 1,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 4,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}

+ 366 - 0
src/dataMethod.go

@@ -0,0 +1,366 @@
+package main
+
+import (
+	"math"
+	qutil "qfw/util"
+	"regexp"
+	"strings"
+)
+
+
+// convertArabicNumeralsAndLetters is a pre-processing step of the duplicate
+// check. It upper-cases every ASCII letter and replaces every ASCII digit
+// 0-9 with the matching Chinese numeral (零 一 二 三 四 五 六 七 八 九).
+// All other bytes, including multi-byte UTF-8 sequences, are copied through
+// unchanged.
+//
+// The earlier implementation compiled up to twelve regular expressions on
+// every call and discarded the compile errors; this single byte-wise pass
+// yields the same output without any regexp work.
+func convertArabicNumeralsAndLetters(data string) string {
+	numerals := [...]string{"零", "一", "二", "三", "四", "五", "六", "七", "八", "九"}
+	out := make([]byte, 0, len(data))
+	// Walking bytes is safe: an ASCII byte never occurs inside a multi-byte
+	// UTF-8 sequence, so non-ASCII text is left untouched.
+	for i := 0; i < len(data); i++ {
+		c := data[i]
+		switch {
+		case c >= 'a' && c <= 'z':
+			out = append(out, c-'a'+'A')
+		case c >= '0' && c <= '9':
+			out = append(out, numerals[c-'0']...)
+		default:
+			out = append(out, c)
+		}
+	}
+	return string(out)
+}
+
+func dealWithSpecialPhrases(str1 string,str2 string) (string,string) {
+	newStr1:=str1
+	newStr2:=str2
+	res, _ := regexp.Compile("重新招标");
+	if res.MatchString(newStr1) {
+		newStr1 = res.ReplaceAllString(newStr1,"重招");
+	}
+	if res.MatchString(newStr2) {
+		newStr2 = res.ReplaceAllString(newStr2,"重招");
+	}
+	return newStr1,newStr2
+}
+// dealWithSpecialWordNumber counts how many of the two records (info, v)
+// have titleSpecialWord or specialWord set. The result is 0, 1 or 2.
+func dealWithSpecialWordNumber(info *Info, v *Info) int {
+	flagged := 0
+	for _, rec := range []*Info{info, v} {
+		if rec.titleSpecialWord || rec.specialWord {
+			flagged++
+		}
+	}
+	return flagged
+}
+
+// againRepeat is the keyword re-check between two records. It returns true
+// as soon as one of the following differences is found, false if none is:
+//   - site is true and isPublishtimeInterval reports different publish times
+//   - isBidopentimeInterval reports different bid-opening times
+//   - both budgets are non-zero and unequal
+//   - both bid amounts are non-zero and isBidWinningAmount reports a difference
+//   - both winners are non-empty and differ after deleteExtraSpace
+//   - both contract numbers are non-empty and differ
+//   - both project codes are non-empty and differ
+//   - both project names are non-empty and differ
+//
+// The former "title differs AND project name differs" branch was removed: it
+// could only fire when the project-name check below also fires, so it never
+// changed the result. Cheap emptiness/zero tests now run before the helper
+// calls they guard.
+func againRepeat(v *Info, info *Info, site bool) bool {
+	if site && isPublishtimeInterval(info.publishtime, v.publishtime) {
+		return true
+	}
+	if isBidopentimeInterval(info.bidopentime, v.bidopentime) {
+		return true
+	}
+	if v.budget != 0 && info.budget != 0 && v.budget != info.budget {
+		return true
+	}
+	if v.bidamount != 0 && info.bidamount != 0 && isBidWinningAmount(v.bidamount, info.bidamount) {
+		return true
+	}
+	if v.winner != "" && info.winner != "" && deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) {
+		return true
+	}
+	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+		return true
+	}
+	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+		return true
+	}
+	if v.projectname != "" && info.projectname != "" && v.projectname != info.projectname {
+		return true
+	}
+	return false
+}
+
+// againContainSpecialWord is the re-check used when both records contain
+// special keywords (per the original comment). It returns true on the first
+// difference found among bid-opening time, budget, bid amount, winner,
+// contract number and project code; otherwise the result is whatever
+// dealTitleSpecial reports for the two titles.
+func againContainSpecialWord(v *Info, info *Info) bool {
+	switch {
+	case isBidopentimeInterval(info.bidopentime, v.bidopentime):
+		// bid-opening times differ
+		return true
+	case v.budget != info.budget && v.budget != 0 && info.budget != 0:
+		// both budgets present and unequal
+		return true
+	case isBidWinningAmount(v.bidamount, info.bidamount) && v.bidamount != 0 && info.bidamount != 0:
+		// both bid amounts present and different
+		return true
+	case deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "":
+		// both winners present and different after whitespace clean-up
+		return true
+	case v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber:
+		return true
+	case v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode:
+		return true
+	}
+	// Last resort: compare the section/package tags extracted from the titles.
+	return dealTitleSpecial(v.title, info.title)
+}
+
+//提取标题-标段号处理
+func dealTitleSpecial(title1 string,title2 string) bool{
+
+	regular1 := "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789]+[))]?"
+	regular2 := "[0-9a-zA-Z一二三四五六七八九十零123456789]+(包|标段|标包)"
+	regx1_1,_ := regexp.Compile(regular1)
+	str1:=regx1_1.FindString(title1)
+	if str1!="" {
+		//log.Println("标题1,规则一提取:",str1)
+	}else {
+		regx1_2,_ := regexp.Compile(regular2)
+		str1=regx1_2.FindString(title1)
+		if str1!="" {
+			//log.Println("标题1,规则二提取:",str1)
+		}
+	}
+
+	regx2_1,_ := regexp.Compile(regular1)
+	str2:=regx2_1.FindString(title2)
+	if str2!="" {
+		//log.Println("标题2,规则一提取:",str2)
+	}else {
+		regx2_2,_ := regexp.Compile(regular2)
+		str2=regx2_2.FindString(title2)
+		if str2!="" {
+			//log.Println("标题2,规则二提取:",str2)
+		}
+	}
+
+	//根据提取的结果,在进行清洗
+	if str1!="" {
+		str1 = deleteExtraSpace(str1)
+		str1= strings.Replace(str1, "(", "", -1)
+		str1= strings.Replace(str1, "(", "", -1)
+		str1= strings.Replace(str1, ")", "", -1)
+		str1= strings.Replace(str1, ")", "", -1)
+		str1 = convertArabicNumeralsAndLetters(str1)
+	}
+
+	if str2!="" {
+		str2 = deleteExtraSpace(str2)
+		str2= strings.Replace(str2, "(", "", -1)
+		str2= strings.Replace(str2, "(", "", -1)
+		str2= strings.Replace(str2, ")", "", -1)
+		str2= strings.Replace(str2, ")", "", -1)
+		str2 = convertArabicNumeralsAndLetters(str2)
+	}
+
+	//log.Println("最终:",str1,str2)
+	if str1!=str2 {
+		//log.Println("不一致")
+		return true
+	}else {
+		//log.Println("一致")
+		return false
+	}
+}
+
+
+// deleteExtraSpace collapses runs of whitespace in s (used on winner names,
+// per the original comment). Every run of two or more whitespace bytes
+// (space, \t, \n, \f, \r — the set matched by the regexp class \s) is
+// reduced to its first byte; single whitespace bytes and all other content
+// are kept unchanged.
+//
+// The earlier version compiled a regular expression on every call and
+// rescanned the whole string, converting between string and []byte, once
+// per run. This version produces the same output in a single pass.
+func deleteExtraSpace(s string) string {
+	// NOTE(review): the original comment says this step turns tabs into
+	// spaces, but the literal replaced here is two spaces — confirm which
+	// was intended.
+	s1 := strings.Replace(s, "  ", " ", -1)
+	out := make([]byte, 0, len(s1))
+	prevSpace := false
+	for i := 0; i < len(s1); i++ {
+		c := s1[i]
+		isSpace := c == ' ' || c == '\t' || c == '\n' || c == '\f' || c == '\r'
+		if isSpace && prevSpace {
+			// Second or later whitespace byte of a run: drop it.
+			continue
+		}
+		out = append(out, c)
+		prevSpace = isSpace
+	}
+	return string(out)
+}
+
+// isBidWinningAmount reports whether two winning-bid amounts differ.
+// It returns false when f1 and f2 are equal, or when one equals the other
+// multiplied by 10000; otherwise it returns true.
+// NOTE(review): the factor 10000 presumably covers the same amount expressed
+// in two units — confirm.
+//
+// Comparisons use a small relative tolerance instead of ==, because the
+// product of a decimal fraction and 10000 is often not exactly representable
+// (0.07*10000 evaluates to 700.0000000000001), which made equal amounts look
+// different.
+func isBidWinningAmount(f1 float64, f2 float64) bool {
+	if sameAmount(f1, f2) || sameAmount(f1*10000, f2) || sameAmount(f2*10000, f1) {
+		return false
+	}
+	return true
+}
+
+// sameAmount reports whether a and b are equal within a relative tolerance
+// of 1e-12. NaN never equals anything; infinities only equal themselves.
+func sameAmount(a float64, b float64) bool {
+	if a == b {
+		return true
+	}
+	diff := math.Abs(a - b)
+	if math.IsNaN(diff) || math.IsInf(diff, 0) {
+		return false
+	}
+	return diff <= 1e-12*math.Max(math.Abs(a), math.Abs(b))
+}
+
+
+// isTimeIntervalPeriod reports whether two Unix timestamps (seconds) are
+// less than 172800 seconds (48 hours) apart.
+func isTimeIntervalPeriod(i1 int64, i2 int64) bool {
+	gap := math.Abs(float64(i1 - i2))
+	return gap < 172800.0
+}
+
+
+// isBidopentimeInterval reports whether two bid-opening timestamps count as
+// different. It returns false when either timestamp is 0. Otherwise it
+// returns true when the two fall on different calendar days (as formatted
+// with qutil.Date_yyyyMMdd), or on the same day but more than 43200 seconds
+// (12 hours) apart.
+func isBidopentimeInterval(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
+		return false
+	}
+	// Format copies so the caller's values are never handed out by pointer.
+	first, second := i1, i2
+	dayFirst := qutil.FormatDateByInt64(&first, qutil.Date_yyyyMMdd)
+	daySecond := qutil.FormatDateByInt64(&second, qutil.Date_yyyyMMdd)
+	if dayFirst != daySecond {
+		return true
+	}
+	// Same day: different only when strictly more than 12 hours apart.
+	return math.Abs(float64(i1-i2)) > 43200.0
+}
+
+// isPublishtimeInterval reports whether two publish timestamps count as
+// different. It returns false when either timestamp is 0. Otherwise it
+// returns true when the two fall on different calendar days (as formatted
+// with qutil.Date_yyyyMMdd), or on the same day and at least 43200 seconds
+// (12 hours) apart.
+func isPublishtimeInterval(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
+		return false
+	}
+	// Format copies so the caller's values are never handed out by pointer.
+	first, second := i1, i2
+	dayFirst := qutil.FormatDateByInt64(&first, qutil.Date_yyyyMMdd)
+	daySecond := qutil.FormatDateByInt64(&second, qutil.Date_yyyyMMdd)
+	if dayFirst != daySecond {
+		return true
+	}
+	// Same day: different when 12 hours or more apart (note >=, unlike
+	// isBidopentimeInterval which uses >).
+	return math.Abs(float64(i1-i2)) >= 43200.0
+}
+
+// isTheSameDay reports whether two non-zero timestamps format to the same
+// calendar day under qutil.Date_yyyyMMdd. It returns false when either
+// timestamp is 0.
+// An alternative "within 86400 seconds" rule existed only as commented-out
+// code in the original and is not applied.
+func isTheSameDay(i1 int64, i2 int64) bool {
+	if i1 == 0 || i2 == 0 {
+		return false
+	}
+	first, second := i1, i2
+	dayFirst := qutil.FormatDateByInt64(&first, qutil.Date_yyyyMMdd)
+	daySecond := qutil.FormatDateByInt64(&second, qutil.Date_yyyyMMdd)
+	return dayFirst == daySecond
+}
+
+
+
+//前置0 五要素均相等认为重复
+func leadingElementSame(v *Info, info *Info) bool {
+
+	isok:= 0
+	if info.projectname != "" && v.projectname == info.projectname {
+		isok++
+	}
+	if info.buyer != "" && v.buyer == info.buyer {
+		isok++
+	}
+	if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
+		if info.contractnumber != "" && v.contractnumber == info.contractnumber {
+			isok++
+		}
+	}else {
+		if info.projectcode != "" && v.projectcode == info.projectcode {
+			isok++
+		}
+	}
+	if info.title != "" && v.title == info.title {
+		isok++
+	}
+	if v.agency == info.agency &&info.agency != "" {
+		isok++
+	}
+	if v.winner == info.winner&&info.winner != "" {
+		isok++
+	}
+
+	if isok>=5 {
+		return true
+	}
+
+
+
+	return false
+}
+
+// buyerIsContinue is the check labelled "buyer priority" in the original
+// comment. It returns true as soon as one of the following holds, false
+// otherwise:
+//   - isTheSameDay reports the publish times are not on the same day
+//   - both project names are non-empty and differ
+//   - both contract numbers are non-empty and differ
+//   - both project codes are non-empty and differ
+//
+// The former "title differs AND project name differs" branch was removed: it
+// could only fire when the project-name check also fires, so it never
+// changed the result. Budget, bid amount and winner are not compared here;
+// those checks were already commented out in the original.
+func buyerIsContinue(v *Info, info *Info) bool {
+	if !isTheSameDay(info.publishtime, v.publishtime) {
+		return true
+	}
+	if v.projectname != "" && info.projectname != "" && v.projectname != info.projectname {
+		return true
+	}
+	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+		return true
+	}
+	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+		return true
+	}
+	return false
+}
+
+
+
+//无效数据
+func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
+	var n int
+	if d1 != "" {
+		n++
+	}
+	if d2 != "" {
+		n++
+	}
+	if d3 != "" {
+		n++
+	}
+	if d4 != "" {
+		n++
+	}
+	if n == 0 {
+		return true
+	}
+	return false
+}

+ 483 - 0
src/dataMethodHeavy.go

@@ -0,0 +1,483 @@
+package main
+
+import (
+	"strings"
+)
+
+//判重方法1
+func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet := false
+	if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
+		info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
+		info.subtype == "变更" || info.subtype == "其他" {
+		//招标结果
+		if isMeet, reason = tenderRepeat_A(v, info, reason); isMeet {
+			if tenderRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---招标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+
+	} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" || info.subtype == "流标" {
+		//中标结果
+		if isMeet, reason = winningRepeat_A(v, info, reason); isMeet {
+			if winningRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---中标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+
+	} else if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
+		//合同
+		if isMeet, reason = contractRepeat_A(v, info, reason); isMeet {
+			if contractRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---合同类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+	} else {
+		//招标结果
+		if isMeet, reason = tenderRepeat_A(v, info, reason); isMeet {
+			if tenderRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---类别空-招标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+	}
+
+	return false, reason
+}
+
+// quickHeavyMethodTwo is duplicate-detection method 2, which takes the agency
+// into account:
+//   - if at least one agency is empty, it defers to quickHeavyMethodOne and
+//     tags the reason on success;
+//   - if both agencies are set, they must be equal or one must contain the
+//     other, otherwise the records are not duplicates;
+//   - with compatible agencies, the looser "B" matcher for info.subtype is
+//     used, followed by the same "C" conflict check as method 1.
+func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
+	var hit bool
+
+	// At least one agency missing: fall back to the stricter method 1.
+	if v.agency == "" || info.agency == "" {
+		if hit, reason = quickHeavyMethodOne(v, info, reason); hit {
+			return true, reason + "---机构最少一个空"
+		}
+		return false, reason
+	}
+
+	// Both agencies present: accept equality or a substring relationship.
+	if v.agency != info.agency &&
+		!strings.Contains(v.agency, info.agency) &&
+		!strings.Contains(info.agency, v.agency) {
+		return false, reason
+	}
+
+	var (
+		matches   func(*Info, *Info, string) (bool, string)
+		conflicts func(*Info, *Info) bool
+		suffix    string
+	)
+	switch info.subtype {
+	case "招标", "邀标", "询价", "竞谈", "单一", "竞价", "变更", "其他":
+		matches, conflicts, suffix = tenderRepeat_B, tenderRepeat_C, "---招标类"
+	case "中标", "成交", "废标", "流标":
+		matches, conflicts, suffix = winningRepeat_B, winningRepeat_C, "---中标类"
+	case "合同", "验收", "违规":
+		matches, conflicts, suffix = contractRepeat_B, contractRepeat_C, "---合同类"
+	default:
+		matches, conflicts, suffix = tenderRepeat_B, tenderRepeat_C, "---类别空-招标类"
+	}
+
+	hit, reason = matches(v, info, reason)
+	// A conflict found by the "C" check vetoes the match.
+	if !hit || conflicts(v, info) {
+		return false, reason
+	}
+	return true, reason + suffix
+}
+
+// tenderRepeat_A is the tender "A" rule. It evaluates seven elements
+// (p1 project name, p2 buyer, p3 project code or contract number of at least
+// five bytes, p4 budget, p9 bid-opening time, p10 bid-opening address,
+// p11 title containment for titles longer than ten runes) and reports a match
+// when one of the accepted three-element combinations is satisfied. On a
+// match, the labels of every satisfied element are appended to reason.
+func tenderRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+	const (
+		pName = iota
+		pBuyer
+		pCode
+		pBudget
+		pOpenTime
+		pOpenAddr
+		pTitle
+		pCount
+	)
+
+	var hit [pCount]bool
+	var labels []string
+	mark := func(idx int, label string) {
+		hit[idx] = true
+		labels = append(labels, label)
+	}
+
+	if v.projectname != "" && v.projectname == info.projectname {
+		mark(pName, "p1-名称-")
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		mark(pBuyer, "p2-单位-")
+	}
+	sameCode := v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5
+	sameContract := v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5
+	if sameCode || sameContract {
+		mark(pCode, "p3-编号组-")
+	}
+	if v.budget != 0 && v.budget == info.budget {
+		mark(pBudget, "p4-预算-")
+	}
+	if v.bidopentime != 0 && v.bidopentime == info.bidopentime {
+		mark(pOpenTime, "p9-开标时间相同-")
+	}
+	if v.bidopenaddress != "" && v.bidopenaddress == info.bidopenaddress {
+		mark(pOpenAddr, "p10-开标地点-")
+	}
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		mark(pTitle, "p11-标题-")
+	}
+
+	matched := "满足招标A,3要素组合-" + strings.Join(labels, "") + ","
+
+	// name + code + title is accepted only when the subtype is known.
+	if info.subtype != "" && hit[pName] && hit[pCode] && hit[pTitle] {
+		return true, reason + matched
+	}
+
+	// Accepted triples; this is deliberately not every possible combination.
+	combos := [...][3]int{
+		{pName, pBuyer, pCode}, {pName, pBuyer, pBudget}, {pName, pBuyer, pOpenTime},
+		{pName, pBuyer, pOpenAddr}, {pName, pBuyer, pTitle}, {pName, pCode, pOpenTime}, {pName, pCode, pOpenAddr},
+		{pName, pBudget, pOpenTime}, {pName, pBudget, pOpenAddr}, {pBuyer, pCode, pBudget},
+		{pBuyer, pCode, pOpenTime}, {pBuyer, pCode, pOpenAddr}, {pBuyer, pCode, pTitle},
+		{pBuyer, pBudget, pOpenTime}, {pBuyer, pBudget, pOpenAddr}, {pBuyer, pBudget, pTitle},
+		{pCode, pBudget, pOpenTime}, {pCode, pBudget, pOpenAddr}, {pCode, pBudget, pTitle},
+		{pBudget, pOpenTime, pOpenAddr}, {pBudget, pOpenTime, pTitle}, {pOpenTime, pOpenAddr, pTitle},
+	}
+	for _, c := range combos {
+		if hit[c[0]] && hit[c[1]] && hit[c[2]] {
+			return true, reason + matched
+		}
+	}
+	return false, reason
+}
+
+// tenderRepeat_B is the looser tender "B" rule. It counts how many of these
+// elements agree: project name, buyer, project code / contract number (at
+// least five bytes), budget, bid-opening time, and title containment (titles
+// longer than ten runes). Two or more agreeing elements are a match, unless
+// the only two are project name and title. The bid-opening address is not
+// counted.
+func tenderRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+	sameName := v.projectname != "" && v.projectname == info.projectname
+	sameTitle := len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title))
+
+	elements := [...]bool{
+		sameName,
+		v.buyer != "" && v.buyer == info.buyer,
+		(v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+			(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5),
+		v.budget != 0 && v.budget == info.budget,
+		v.bidopentime != 0 && v.bidopentime == info.bidopentime,
+		sameTitle,
+	}
+
+	score := 0
+	for _, ok := range elements {
+		if ok {
+			score++
+		}
+	}
+
+	// Name and title alone are not independent enough evidence.
+	onlyNameAndTitle := score == 2 && sameName && sameTitle
+	if score < 2 || onlyNameAndTitle {
+		return false, reason
+	}
+	return true, reason + "满足招标B,六选二,"
+}
+
+// tenderRepeat_C is the tender conflict check. It returns true when the two
+// records contradict each other: both budgets are set and differ, or both
+// bid-opening times are set and isBidopentimeInterval reports true for them
+// (presumably meaning the times are too far apart; confirm against that
+// helper).
+func tenderRepeat_C(v *Info, info *Info) bool {
+	budgetConflict := v.budget != 0 && info.budget != 0 && v.budget != info.budget
+	if budgetConflict {
+		return true
+	}
+	return v.bidopentime != 0 && info.bidopentime != 0 &&
+		isBidopentimeInterval(info.bidopentime, v.bidopentime)
+}
+
+// winningRepeat_A is the award "A" rule. It evaluates six elements
+// (p1 project name, p2 buyer, p3 project code or contract number of at least
+// five bytes, p5 bid amount, p6 winner compared after deleteExtraSpace,
+// p11 title containment for titles longer than ten runes) and reports a match
+// when one of the accepted three-element combinations is satisfied. On a
+// match, the labels of every satisfied element are appended to reason.
+func winningRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+	const (
+		pName = iota
+		pBuyer
+		pCode
+		pAmount
+		pWinner
+		pTitle
+		pCount
+	)
+
+	var hit [pCount]bool
+	var labels []string
+	mark := func(idx int, label string) {
+		hit[idx] = true
+		labels = append(labels, label)
+	}
+
+	if v.projectname != "" && v.projectname == info.projectname {
+		mark(pName, "p1-项目名称-")
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		mark(pBuyer, "p2-单位-")
+	}
+	sameCode := v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5
+	sameContract := v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5
+	if sameCode || sameContract {
+		mark(pCode, "p3-编号组--")
+	}
+	// The amount element counts when isBidWinningAmount reports false.
+	if v.bidamount != 0 && !isBidWinningAmount(v.bidamount, info.bidamount) {
+		mark(pAmount, "p5-中标金-")
+	}
+	if v.winner != "" && deleteExtraSpace(v.winner) == deleteExtraSpace(info.winner) {
+		mark(pWinner, "p6-中标人-")
+	}
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		mark(pTitle, "p11-标题-")
+	}
+
+	// Accepted triples; this is deliberately not every possible combination.
+	combos := [...][3]int{
+		{pName, pBuyer, pCode}, {pName, pBuyer, pAmount}, {pName, pBuyer, pWinner},
+		{pName, pBuyer, pTitle}, {pName, pCode, pTitle},
+		{pName, pCode, pAmount}, {pName, pCode, pWinner}, {pName, pAmount, pWinner},
+		{pBuyer, pCode, pAmount}, {pBuyer, pCode, pWinner}, {pBuyer, pCode, pTitle},
+		{pBuyer, pAmount, pWinner}, {pBuyer, pAmount, pTitle}, {pBuyer, pWinner, pTitle},
+		{pCode, pAmount, pWinner}, {pCode, pAmount, pTitle}, {pCode, pWinner, pTitle},
+		{pAmount, pWinner, pTitle},
+	}
+	for _, c := range combos {
+		if hit[c[0]] && hit[c[1]] && hit[c[2]] {
+			return true, reason + "满足中标A,3要素组合-" + strings.Join(labels, "") + ","
+		}
+	}
+	return false, reason
+}
+
+// winningRepeat_B is the looser award "B" rule. It counts how many of these
+// elements agree: project name, buyer, project code / contract number (at
+// least five bytes), bid amount, winner (compared after deleteExtraSpace),
+// and title containment (titles longer than ten runes). Two or more agreeing
+// elements are a match, unless the only two are project name and title.
+func winningRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+	sameName := v.projectname != "" && v.projectname == info.projectname
+	sameBuyer := v.buyer != "" && v.buyer == info.buyer
+	sameCode := (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5)
+	// The amount element counts when isBidWinningAmount reports false.
+	sameAmount := v.bidamount != 0 && !isBidWinningAmount(v.bidamount, info.bidamount)
+	sameWinner := v.winner != "" && deleteExtraSpace(v.winner) == deleteExtraSpace(info.winner)
+	sameTitle := len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title))
+
+	score := 0
+	for _, ok := range [...]bool{sameName, sameBuyer, sameCode, sameAmount, sameWinner, sameTitle} {
+		if ok {
+			score++
+		}
+	}
+
+	// Name and title alone are not independent enough evidence.
+	onlyNameAndTitle := score == 2 && sameName && sameTitle
+	if score < 2 || onlyNameAndTitle {
+		return false, reason
+	}
+	return true, reason + "满足中标B.六选二,"
+}
+
+// winningRepeat_C is the award conflict check. It returns true when the two
+// records contradict each other:
+//   - both bid amounts are set and isBidWinningAmount reports true for them,
+//     unless an identical project code or contract number together with an
+//     identical winner suggests the amount was merely mis-extracted, in which
+//     case the result is false;
+//   - otherwise, both winners are set and differ after deleteExtraSpace.
+func winningRepeat_C(v *Info, info *Info) bool {
+	if v.bidamount != 0 && info.bidamount != 0 && isBidWinningAmount(v.bidamount, info.bidamount) {
+		// Guard against wrongly extracted amounts.
+		sameCode := v.projectcode != "" && info.projectcode != "" && v.projectcode == info.projectcode
+		sameContract := v.contractnumber != "" && info.contractnumber != "" && v.contractnumber == info.contractnumber
+		sameWinner := v.winner != "" && info.winner != "" && v.winner == info.winner
+		return !((sameCode || sameContract) && sameWinner)
+	}
+	return v.winner != "" && info.winner != "" &&
+		deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner)
+}
+
+//合同_A
+func contractRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet_1 := false
+	if isMeet_1, reason = tenderRepeat_A(v, info, reason); isMeet_1 {
+		return true, reason
+	}
+
+	isMeet_2 := false
+	if isMeet_2, reason = winningRepeat_A(v, info, reason); isMeet_2 {
+		return true, reason
+	}
+	return false, reason
+}
+
+//合同_B
+func contractRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet_1 := false
+	if isMeet_1, reason = tenderRepeat_B(v, info, reason); isMeet_1 {
+		return true, reason
+	}
+	isMeet_2 := false
+	if isMeet_2, reason = winningRepeat_B(v, info, reason); isMeet_2 {
+		return true, reason
+	}
+	return false, reason
+}
+
+// contractRepeat_C is the contract conflict check. It returns true when the
+// tender conflict check or the award conflict check fires, or when the
+// contract numbers or the project codes are set on both records and differ.
+func contractRepeat_C(v *Info, info *Info) bool {
+	// Contract-specific addition: identifiers that are both present must agree.
+	contractDiffers := func() bool {
+		return v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber
+	}
+	codeDiffers := func() bool {
+		return v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode
+	}
+	return tenderRepeat_C(v, info) ||
+		winningRepeat_C(v, info) ||
+		contractDiffers() ||
+		codeDiffers()
+}
+
+
+
+
+
+
+
+
+
+//快速低质量数据判重
+func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
+	//if !isTheSameDay(v.publishtime,info.publishtime) {
+	//	return false,reason
+	//}
+
+	//区间间隔24小时
+	if !isTimeIntervalPeriod(v.publishtime,info.publishtime) {
+		return false,reason
+	}
+
+
+	//首先判定是否为低质量数据    info目标数据
+	if info.title!=""&&(info.agency==""||v.agency=="")&&
+		info.title==v.title&&info.projectcode==""&&info.contractnumber==""&&info.buyer=="" {
+		isValue:=0//五要素判断
+		if info.projectname != "" {//项目名称
+			isValue++
+		}
+		if info.budget != 0 {//预算
+			isValue++
+		}
+		if info.winner != ""{//中标单位
+			isValue++
+		}
+		if info.bidamount != 0 {//中标金额
+			isValue++
+		}
+		if isValue==0 {
+			reason = reason + "---低质量-要素均为空-标题满足"
+			return true, reason
+		}else if isValue==1 {
+			isMeet := false
+			if isMeet, reason = judgeLowQualityData(v, info, reason); isMeet {
+				reason = reason + "---低质量-有且一个要素组合"
+				return true, reason
+			}
+		}else {
+
+		}
+	}
+	return false,reason
+}
+
+
+// judgeLowQualityData records which element made two low-quality records
+// match. It checks, in order, project name, budget, winner and bid amount,
+// and returns true with the matching element's label appended to reason for
+// the first one that is set and equal on both records. If none matches it
+// returns false and the unchanged reason.
+func judgeLowQualityData(v *Info, info *Info, reason string) (bool, string) {
+	switch {
+	case info.projectname != "" && info.projectname == v.projectname:
+		return true, reason + "---项目名称"
+	case info.budget != 0 && info.budget == v.budget:
+		return true, reason + "---预算"
+	case v.winner != "" && info.winner == v.winner:
+		return true, reason + "---中标单位"
+	case v.bidamount != 0 && info.bidamount == v.bidamount:
+		return true, reason + "---中标金额"
+	}
+	return false, reason
+}

+ 421 - 0
src/dataMethodMerge.go

@@ -0,0 +1,421 @@
+package main
+
+import "qfw/util"
+
+
+// mergeDataFields fills empty fields of source with the corresponding
+// non-empty values from info. The fields considered are project name, project
+// code, buyer, budget, winner, bid amount, bid-opening time, contract number
+// and agency.
+//
+// For every field taken over it records {field: value, "id": info.id} in
+// source.mergemap and adds the new value to a "$set" update document. The
+// merge map itself is always added to the update under the key "merge".
+//
+// It returns source (modified in place), the update document, and whether any
+// field was filled. source.mergemap must be non-nil when a field is filled.
+func mergeDataFields(source *Info, info *Info) (*Info, map[string]interface{}, bool) {
+	setDoc := map[string]interface{}{}
+	merged := source.mergemap
+	changed := false
+
+	// record notes that field was taken from info with the given value.
+	record := func(field string, value interface{}) {
+		merged[field] = map[string]interface{}{
+			field: value,
+			"id":  info.id,
+		}
+		setDoc[field] = value
+		changed = true
+	}
+
+	if source.projectname == "" && info.projectname != "" {
+		record("projectname", info.projectname)
+		source.projectname = info.projectname
+	}
+	if source.projectcode == "" && info.projectcode != "" {
+		record("projectcode", info.projectcode)
+		source.projectcode = info.projectcode
+	}
+	if source.buyer == "" && info.buyer != "" {
+		record("buyer", info.buyer)
+		source.buyer = info.buyer
+	}
+	if source.budget == 0 && info.budget != 0 {
+		record("budget", info.budget)
+		source.budget = info.budget
+	}
+	if source.winner == "" && info.winner != "" {
+		record("winner", info.winner)
+		source.winner = info.winner
+	}
+	if source.bidamount == 0 && info.bidamount != 0 {
+		record("bidamount", info.bidamount)
+		source.bidamount = info.bidamount
+	}
+	if source.bidopentime == 0 && info.bidopentime != 0 {
+		record("bidopentime", info.bidopentime)
+		source.bidopentime = info.bidopentime
+	}
+	if source.contractnumber == "" && info.contractnumber != "" {
+		record("contractnumber", info.contractnumber)
+		source.contractnumber = info.contractnumber
+	}
+	if source.agency == "" && info.agency != "" {
+		record("agency", info.agency)
+		source.agency = info.agency
+	}
+
+	source.mergemap = merged
+	setDoc["merge"] = merged
+
+	return source, map[string]interface{}{"$set": setDoc}, changed
+}
+
+
+
+
+
+
+
+// mergeDataFieldsArr fills empty fields of source from info and keeps a record
+// of what was replaced.
+//
+// Each replacement appends a numeric code to the returned slice:
+// 1 area/city, 0 site-derived area/city flag cleared, 2 project name,
+// 3 project code, 4 buyer, 5 budget, 6 winner, 7 bid amount,
+// 8 bid-opening time, 9 contract number, 10 publish time, 11 agency.
+//
+// When anything changed, source.mergemap["total_num"] is incremented and the
+// previous values of the replaced fields (plus the counter as "num") are
+// stored in source.mergemap under info.id.
+//
+// It returns source (modified in place), the list of codes, and whether any
+// replacement happened.
+func mergeDataFieldsArr(source *Info, info *Info) (*Info, []int64, bool) {
+	previous := map[string]interface{}{}
+	codes := []int64{}
+	changed := false
+
+	// note registers one replacement under the given code.
+	note := func(code int64) {
+		codes = append(codes, code)
+		changed = true
+	}
+
+	// 1. Area and city.
+	switch {
+	case source.area == "" || source.area == "全国":
+		// No usable area yet: take info's if it is specific.
+		if info.area != "全国" && info.area != "" {
+			previous["area"] = source.area
+			previous["city"] = source.city
+			source.area, source.city = info.area, info.city
+			note(1)
+		}
+	case source.is_site:
+		// The area came from the site table: record it and clear the flag.
+		previous["site_area"] = source.area
+		previous["site_city"] = source.city
+		source.is_site = false
+		note(0)
+	}
+
+	// 2. Project name.
+	if source.projectname == "" && info.projectname != "" {
+		previous["projectname"] = source.projectname
+		source.projectname = info.projectname
+		note(2)
+	}
+	// 3. Project code.
+	if source.projectcode == "" && info.projectcode != "" {
+		previous["projectcode"] = source.projectcode
+		source.projectcode = info.projectcode
+		note(3)
+	}
+	// 4. Buyer.
+	if source.buyer == "" && info.buyer != "" {
+		previous["buyer"] = source.buyer
+		source.buyer = info.buyer
+		note(4)
+	}
+	// 5. Budget.
+	if source.budget == 0 && info.budget != 0 {
+		previous["budget"] = source.budget
+		source.budget = info.budget
+		note(5)
+	}
+	// 6. Winner.
+	if source.winner == "" && info.winner != "" {
+		previous["winner"] = source.winner
+		source.winner = info.winner
+		note(6)
+	}
+	// 7. Bid amount.
+	if source.bidamount == 0 && info.bidamount != 0 {
+		previous["bidamount"] = source.bidamount
+		source.bidamount = info.bidamount
+		note(7)
+	}
+	// 8. Bid-opening time.
+	if source.bidopentime == 0 && info.bidopentime != 0 {
+		previous["bidopentime"] = source.bidopentime
+		source.bidopentime = info.bidopentime
+		note(8)
+	}
+	// 9. Contract number.
+	if source.contractnumber == "" && info.contractnumber != "" {
+		previous["contractnumber"] = source.contractnumber
+		source.contractnumber = info.contractnumber
+		note(9)
+	}
+	// 10. Publish time.
+	if source.publishtime == 0 && info.publishtime != 0 {
+		previous["publishtime"] = source.publishtime
+		source.publishtime = info.publishtime
+		note(10)
+	}
+	// 11. Agency.
+	if source.agency == "" && info.agency != "" {
+		previous["agency"] = source.agency
+		source.agency = info.agency
+		note(11)
+	}
+
+	if changed {
+		// Bump the overall merge counter and file this merge under the id of
+		// the record the values came from.
+		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
+		previous["num"] = util.Int64All(source.mergemap["total_num"])
+		source.mergemap[info.id] = previous
+	}
+
+	return source, codes, changed
+}
+
+// basicDataScore reports whether record v should be preferred over record
+// info. The comparison proceeds in stages and stops at the first stage that
+// separates the two records:
+//
+//  1. site level: national > provincial > municipal > county
+//  2. site type: government procurement > public resources >
+//     official website / government portal >
+//     public or enterprise bidding platform
+//  3. the site's configured weight
+//  4. field completeness (agency and city count double)
+//  5. publish time (v wins ties)
+//
+// Site attributes are read from SiteMap by site name. A record whose site is
+// not in SiteMap starts with score -1. An empty level or site type leaves the
+// score from the previous stage unchanged; an unrecognised value scores 0.
+//
+// Fix: an unrecognised level on info's site now sets info's score to 0. It
+// previously overwrote v's score instead.
+func basicDataScore(v *Info, info *Info) bool {
+	dictV := SiteMap[v.site]
+	dictInfo := SiteMap[info.site]
+	vScore, infoScore := -1, -1
+
+	// Stage 1: administrative level of the site.
+	if dictV != nil {
+		vScore = siteLevelRank(util.ObjToString(dictV["level"]), vScore)
+	}
+	if dictInfo != nil {
+		infoScore = siteLevelRank(util.ObjToString(dictInfo["level"]), infoScore)
+	}
+	if vScore != infoScore {
+		return vScore > infoScore
+	}
+
+	// Stage 2: site type. The (equal) stage-1 scores are the starting point.
+	if dictV != nil {
+		vScore = siteTypeRank(util.ObjToString(dictV["sitetype"]), vScore)
+	}
+	if dictInfo != nil {
+		infoScore = siteTypeRank(util.ObjToString(dictInfo["sitetype"]), infoScore)
+	}
+	if vScore != infoScore {
+		return vScore > infoScore
+	}
+
+	// Stage 3: same site type, so compare the configured weight.
+	vWeight := util.IntAll(dictV["weight"])
+	infoWeight := util.IntAll(dictInfo["weight"])
+	if vWeight != infoWeight {
+		return vWeight > infoWeight
+	}
+
+	// Stage 4: field completeness; stage 5: publish time, v winning ties.
+	m, n := infoCompleteness(v), infoCompleteness(info)
+	if m != n {
+		return m > n
+	}
+	return v.publishtime >= info.publishtime
+}
+
+// siteLevelRank maps a site level to its rank. An empty level returns current
+// unchanged; an unrecognised level returns 0.
+func siteLevelRank(level string, current int) int {
+	switch level {
+	case "国家":
+		return 4
+	case "省级":
+		return 3
+	case "市级":
+		return 2
+	case "县区":
+		return 1
+	case "":
+		return current
+	default:
+		return 0
+	}
+}
+
+// siteTypeRank maps a site type to its rank. An empty type returns current
+// unchanged; an unrecognised type returns 0.
+func siteTypeRank(sitetype string, current int) int {
+	switch sitetype {
+	case "政府采购":
+		return 4
+	case "公共资源":
+		return 3
+	case "官方网站", "政府门户":
+		return 2
+	case "社会公共招标平台", "企业招标平台":
+		return 1
+	case "":
+		return current
+	default:
+		return 0
+	}
+}
+
+// infoCompleteness scores how many key fields of r are populated. Most fields
+// score 1 (project code and contract number share one point); agency and city
+// score 2 each.
+func infoCompleteness(r *Info) int {
+	score := 0
+	if r.projectname != "" {
+		score++
+	}
+	if r.buyer != "" {
+		score++
+	}
+	if r.projectcode != "" || r.contractnumber != "" {
+		score++
+	}
+	if r.budget != 0 {
+		score++
+	}
+	if r.bidamount != 0 {
+		score++
+	}
+	if r.winner != "" {
+		score++
+	}
+	if r.bidopentime != 0 {
+		score++
+	}
+	if r.bidopenaddress != "" {
+		score++
+	}
+	if r.agency != "" {
+		score += 2
+	}
+	if r.city != "" {
+		score += 2
+	}
+	return score
+}

+ 616 - 0
src/datamap.go

@@ -0,0 +1,616 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	qutil "qfw/util"
+	"reflect"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+)
+
+// Info is the in-memory form of one extracted notice, holding the fields the
+// duplicate-detection rules compare. It is built from a database document by
+// NewInfo.
+type Info struct {
+	id    string // document id
+	title string // title
+
+	area           string  // province
+	city           string  // city
+	subtype        string  // notice type
+	buyer          string  // purchasing entity
+	agency         string  // agency
+	winner         string  // winning bidder
+	budget         float64 // budget amount
+	bidamount      float64 // winning bid amount
+	projectname    string  // project name
+	projectcode    string  // project code
+	contractnumber string  // contract number
+	publishtime    int64   // publish time
+	comeintime     int64   // time the record entered the database
+	bidopentime    int64   // bid-opening time
+	bidopenaddress string  // bid-opening address
+	site 		   string // source site
+	href 		     string // URL of the notice body
+	repeatid         string                 // id of the record this one duplicates
+	titleSpecialWord bool                   // title contains a special word
+	specialWord      bool                   // special word used for the second-pass check
+	mergemap         map[string]interface{} // merge record
+	is_site          bool                   // whether area/city were taken from the site table
+	repeat_ids        []string               // ids of all duplicates
+
+}
+
+// datelimit is the size of the data-pool window in seconds. The initial value
+// is five days; TimedTaskDatamap and NewDatamap overwrite it.
+var datelimit = float64(432000) // five days
+// sitelock is a package-level mutex; check holds it while reading SiteMap.
+var sitelock sync.Mutex         // lock
+
+// datamap is the in-memory data pool used for general duplicate detection.
+// Records are grouped under keys of the form "<date>_<subtype>_<area>".
+type datamap struct {
+	lock   sync.Mutex // guards the fields below
+	days   int        // number of days of data to keep
+	data   map[string][]*Info // records grouped by "<date>_<subtype>_<area>"
+	keymap []string // not populated by the constructors in this file section
+	areakeys []string // distinct areas seen while loading
+	keys   map[string]bool // set of date keys present in data
+}
+
+// TimedTaskDatamap builds the data pool for a historical (timed-task) run.
+//
+// It sets the package-level datelimit to days*86400 seconds, then loads from
+// the extract_back collection every record whose publishtime is below
+// lasttime and within datelimit of it. Records are read in descending
+// publishtime order and the scan stops at the first record outside the
+// window. A negative lasttime yields an empty pool.
+//
+// Records already marked as duplicates (repeat == 1 or -1) or with
+// dataging == 1 are not added; records whose publishtime is a string or lies
+// in the future are skipped. numIndex is only used in the final log line.
+//
+// NOTE(review): the iterator is never closed and its error is not inspected
+// in this function — confirm whether the driver requires that.
+func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
+	datelimit = qutil.Float64All(days * 86400)
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
+	if lasttime <0 {
+		log.Println("数据池空数据")
+		return dm
+	}
+	start := int(time.Now().Unix())
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{"publishtime": map[string]interface{}{
+		"$lt": lasttime,
+	}}
+	log.Println("query", query)
+	it := sess.DB(data_mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
+	// n counts every iterated record; continuSum counts those added to the pool.
+	n, continuSum := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
+			qutil.IntAll(tmp["dataging"]) == 1 {
+
+		} else {
+			// A string publishtime cannot be used as a timestamp; skip it.
+			// Note that continue bypasses the tmp reset at the bottom of the loop.
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+				continue
+			}
+			pt := tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+
+			// Ignore records dated in the future.
+			if pt_time > time.Now().Unix() {
+				continue
+			}
+			if qutil.Float64All(lasttime-pt_time) < datelimit {
+				continuSum++
+				info := NewInfo(tmp)
+				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
+				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+				data := dm.data[k]
+				if data == nil {
+					data = []*Info{}
+				}
+				data = append(data, info)
+				dm.data[k] = data
+				dm.keys[dkey] = true
+				// Record the area (province) if it has not been seen yet.
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
+			} else {
+				// Results are sorted newest first, so everything after this
+				// record is outside the window as well.
+				break
+			}
+		}
+
+		// Use a fresh map for the next record.
+		tmp = make(map[string]interface{})
+	}
+
+	log.Printf("第%d组:数据池构建完成:%d秒,%d个\n",numIndex ,int(time.Now().Unix())-start, n)
+
+	return dm
+}
+
+// NewDatamap builds the data pool for an incremental run.
+//
+// It sets the package-level datelimit to days*86400*2 seconds, then loads
+// from the extract collection every record whose _id is at most lastid and
+// whose publishtime is within datelimit of the current time. Records are read
+// in descending publishtime order and the scan stops at the first record
+// outside the window. An empty lastid yields an empty pool.
+//
+// Records already marked as duplicates (repeat == 1 or -1) are not added;
+// records whose publishtime is a string or lies in the future are skipped.
+//
+// NOTE(review): the iterator is never closed and its error is not inspected
+// in this function — confirm whether the driver requires that.
+func NewDatamap(days int, lastid string) *datamap {
+	datelimit = qutil.Float64All(days * 86400 * 2)
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
+	if lastid == "" {
+		log.Println("不构建数据池")
+		return dm
+	}
+	// Load the initial data.
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{"_id": map[string]interface{}{
+		"$lte": StringTOBsonId(lastid),
+	}}
+	log.Println("query", query)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
+	nowTime := time.Now().Unix()// current Unix timestamp
+	// n counts every iterated record; continuSum counts those added to the pool.
+	n, continuSum := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+
+		//source := util.ObjToMap(tmp["jsondata"]) // temporary fix
+		//if util.IntAll((*source)["sourcewebsite"]) == 1 {
+		//	continue
+		//}
+
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{
+
+		} else {
+			// A string publishtime cannot be used as a timestamp; skip it.
+			// Note that continue bypasses the progress log and the tmp reset
+			// at the bottom of the loop.
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+				continue
+			}
+			pt:= tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+			// Ignore records dated in the future.
+			if pt_time > time.Now().Unix() {
+				continue
+			}
+			if qutil.Float64All(nowTime-pt_time) <= datelimit {
+				continuSum++
+				info := NewInfo(tmp)
+				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
+				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+				data := dm.data[k]
+				if data == nil {
+					data = []*Info{}
+				}
+				data = append(data, info)
+				dm.data[k] = data
+				dm.keys[dkey] = true
+				// Record the area (province) if it has not been seen yet.
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
+			} else {
+				// Results are sorted newest first, so everything after this
+				// record is outside the window as well.
+				break
+			}
+		}
+		// Progress log every 10000 records.
+		if n%10000 == 0 {
+			log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"])
+		}
+		// Use a fresh map for the next record.
+		tmp = make(map[string]interface{})
+	}
+	log.Println("load data:", n,"总数:",continuSum)
+	return dm
+}
+
+// NewInfo builds an Info from a raw database document.
+//
+// Two values are normalised on the way in: the tender-like subtypes (tender,
+// invitation, inquiry, competitive negotiation, bidding) all become "招标",
+// and the area code "A" becomes "全国" (nationwide). The special-word flags
+// are computed by matching the title against the package-level title regular
+// expressions. mergemap is taken from the document's "merge" field and
+// replaced by an empty map when that is nil; repeat_ids starts empty and
+// is_site starts false.
+func NewInfo(tmp map[string]interface{}) *Info {
+	// str reads a document field as a string.
+	str := func(key string) string {
+		return qutil.ObjToString(tmp[key])
+	}
+
+	subtype := str("subtype")
+	switch subtype {
+	case "招标", "邀标", "询价", "竞谈", "竞价":
+		subtype = "招标"
+	}
+
+	area := str("area")
+	if area == "A" {
+		area = "全国"
+	}
+
+	title := str("title")
+	info := &Info{
+		id:             BsonTOStringId(tmp["_id"]),
+		title:          title,
+		area:           area,
+		subtype:        subtype,
+		buyer:          str("buyer"),
+		projectname:    str("projectname"),
+		contractnumber: str("contractnumber"),
+		projectcode:    str("projectcode"),
+		city:           str("city"),
+		agency:         str("agency"),
+		winner:         str("winner"),
+		budget:         qutil.Float64All(tmp["budget"]),
+		bidamount:      qutil.Float64All(tmp["bidamount"]),
+		publishtime:    qutil.Int64All(tmp["publishtime"]),
+		comeintime:     qutil.Int64All(tmp["comeintime"]),
+		bidopentime:    qutil.Int64All(tmp["bidopentime"]),
+		bidopenaddress: str("bidopenaddress"),
+		site:           str("site"),
+		href:           str("href"),
+		repeatid:       str("repeatid"),
+		specialWord:    FilterRegTitle.MatchString(title),
+		titleSpecialWord: FilterRegTitle_0.MatchString(title) ||
+			FilterRegTitle_1.MatchString(title) ||
+			FilterRegTitle_2.MatchString(title),
+		repeat_ids: make([]string, 0),
+		is_site:    false,
+	}
+
+	info.mergemap = *qutil.ObjToMap(tmp["merge"])
+	if info.mergemap == nil {
+		info.mergemap = make(map[string]interface{}, 0)
+	}
+
+	return info
+}
+
+//判重方法
+//判重方法
+//判重方法
+func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
+	reason := ""
+	isTestLog := false
+	keys := []string{}
+	d.lock.Lock()
+	for k, _ := range d.keys { //不同时间段
+		if info.area=="全国" {//匹配所有省
+			for _,v := range d.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
+		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+	}
+	d.lock.Unlock()
+L:
+	for _, k := range keys {
+		d.lock.Lock()
+		data := d.data[k]
+		d.lock.Unlock()
+		if len(data) > 0 { //对比v   找到同类型,同省或全国的数据作对比
+			for _, v := range data {
+				reason = ""
+				isTestLog = false
+				if v.id == info.id { //正常重复
+					return false, v, ""
+				}
+				//buyer 优先级高,有值且不相等过滤
+				if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer {
+					if v.title != info.title && v.title != "" && info.title != "" {
+						isTestLog = true
+					}
+					if buyerIsContinue(v,info) {
+						continue
+					}
+				}
+				if info.site != "" {//站点临时赋值
+					sitelock.Lock()
+					dict := SiteMap[info.site]
+					sitelock.Unlock()
+					if dict != nil {
+						if (info.area == "全国" && dict["area"] != "")||
+							(info.city == "" && dict["city"] != ""){
+							info.is_site = true
+							info.area = qutil.ObjToString(dict["area"])
+							info.city = qutil.ObjToString(dict["city"])
+						}
+					}
+				}
+
+
+				//前置条件 - 站点相关
+				if info.site != "" && info.site == v.site {
+					if info.href != "" && info.href == v.href {
+						reason = "同站点-href相同"
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+					//相同发布时间-标题无包含关系 - 项目名称不等
+					if isTheSameDay(info.publishtime,v.publishtime) &&
+						!(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+						continue
+					}
+					//不同href
+					if info.href != "" && info.href != v.href {
+						if v.title==info.title{
+							if !againRepeat(v, info,true)   {//进行同站点二次判断
+								reason = "同站点-href不同-标题相同等"
+								b = true
+								source = v
+								reasons = reason
+								break L
+							}else {
+								continue
+							}
+						}else {
+							if againRepeat(v, info,true)  {
+								continue
+							}
+						}
+					}
+				}
+				//特殊词处理
+				specialNum:= dealWithSpecialWordNumber(info,v)
+				//前置条件 - 标题相关,有且一个关键词
+				if specialNum==1 {
+					if againRepeat(v, info,false) {
+						continue
+					}
+				}
+				//前置条件3 - 标题相关,均含有关键词
+				if specialNum==2 {
+					if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+						v.title != "" && info.title != "" {
+						letter1,letter2:=v.title,info.title
+						res, _ := regexp.Compile("[0-9a-zA-Z]+");
+						if res.MatchString(letter1)||res.MatchString(letter2) {
+							letter1=convertArabicNumeralsAndLetters(letter1)
+							letter2=convertArabicNumeralsAndLetters(letter2)
+						}
+						if strings.Contains(letter1,"重新招标")|| strings.Contains(letter2,"重新招标"){
+							letter1,letter2=dealWithSpecialPhrases(letter1,letter2)
+						}
+						if letter1==letter2 {
+							reason = reason + "标题关键词相等关系"
+							if !againRepeat(v, info,false) {//进行二级金额判断
+								b = true
+								source = v
+								reasons = reason
+								break L
+							}
+						}else {
+							if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
+								//无包含关系-即不相等
+								if againContainSpecialWord(v, info) {
+									continue
+								}
+							}
+						}
+					}
+				}
+
+
+				//前置条件-五要素均相等
+				if leadingElementSame(v,info) {
+					reason = "五要素-相同-满足"
+					b = true
+					source = v
+					reasons = reason
+					break L
+				}
+
+
+
+				//新增快速数据过少判重
+				if LowHeavy {
+					repeat := false
+					if repeat, reason = fastLowQualityHeavy(v, info, reason); repeat {
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+				}
+
+				//代理机构相同-非空相等
+				if v.agency != "" && info.agency != "" && v.agency == info.agency {
+					reason = reason + "同机构-"
+					repeat := false
+					if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+				} else {
+					reason = reason + "非同机构-"
+					if info.city != "" && info.city == v.city {
+						reason = reason + "同城-"
+						repeat := false
+						if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
+							b = true
+							source = v
+							reasons = reason
+							break L
+						}
+					} else {
+						reason = reason + "不同城-"
+						repeat := false
+						if repeat, reason = quickHeavyMethodOne(v, info, reason); repeat {
+							b = true
+							source = v
+							reasons = reason
+							break L
+						}
+					}
+				}
+			}
+
+		}
+	}
+
+	//往预存数据 d 添加
+	if !b {
+		ct := info.publishtime
+		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
+		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+		d.lock.Lock()
+		data := d.data[k]
+		if data == nil {
+			data = []*Info{info}
+			d.data[k] = data
+			if !d.keys[dkey] {
+				d.keys[dkey] = true
+				d.update(ct)
+			}
+		} else {
+			data = append(data, info)
+			d.data[k] = data
+		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range d.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := d.areakeys
+			areaArr = append(areaArr,info.area)
+			d.areakeys = areaArr
+		}
+
+		d.lock.Unlock()
+	}
+
+	if isTestLog {
+		reasons = reasons+"-新修改"
+	}
+	return
+}
+
+// update recomputes the retained day window anchored at t and evicts every
+// pool bucket and day marker that falls outside of it. It is a no-op when
+// TimingTask is set. It takes no lock itself; the callers visible in this file
+// invoke it while already holding d.lock.
+func (d *datamap) update(t int64) {
+	if TimingTask {
+		return
+	}
+	if IsFull {
+		d.keymap = d.GetLatelyFiveDay(t) // full run
+	} else {
+		d.keymap = d.GetLatelyFiveDayDouble(t) // incremental run
+	}
+	keep := map[string]bool{}
+	for _, day := range d.keymap {
+		keep[day] = true
+	}
+	// Bucket keys are built as "<day>_<subtype>_<area>"; the first eight
+	// characters are the day.
+	for bucket := range d.data {
+		if day := bucket[:8]; !keep[day] {
+			delete(d.data, bucket)
+		}
+	}
+	for day := range d.keys {
+		if keep[day] {
+			continue
+		}
+		delete(d.keys, day)
+	}
+}
+
+// GetLatelyFiveDay returns d.days day strings formatted with
+// qutil.Date_yyyyMMdd. The first entry is the day containing Unix time t and
+// each following entry is one calendar day earlier.
+func (d *datamap) GetLatelyFiveDay(t int64) []string {
+	days := make([]string, 0, d.days)
+	cur := time.Unix(t, 0)
+	for len(days) < d.days {
+		days = append(days, cur.Format(qutil.Date_yyyyMMdd))
+		cur = cur.AddDate(0, 0, -1)
+	}
+	return days
+}
+
+// GetLatelyFiveDayDouble is the incremental-mode window: it returns 2*d.days
+// day strings formatted with qutil.Date_yyyyMMdd, counting back one calendar
+// day at a time from the current time.
+// NOTE(review): the t argument is ignored and time.Now() is the anchor —
+// confirm that this is intentional.
+func (d *datamap) GetLatelyFiveDayDouble(t int64) []string {
+	span := d.days * 2
+	days := make([]string, 0, span)
+	cur := time.Now()
+	for len(days) < span {
+		days = append(days, cur.Format(qutil.Date_yyyyMMdd))
+		cur = cur.AddDate(0, 0, -1)
+	}
+	return days
+}
+
+
+
+// replacePoolData swaps the pooled entry whose id equals newData.id for
+// newData. Only the bucket derived from newData's own publish day, subtype and
+// area is searched; when nothing matches, the bucket is stored back unchanged.
+func (d *datamap) replacePoolData(newData *Info) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	pub := newData.publishtime
+	day := qutil.FormatDateByInt64(&pub, qutil.Date_yyyyMMdd)
+	bucketKey := fmt.Sprintf("%s_%s_%s", day, newData.subtype, newData.area)
+	bucket := d.data[bucketKey]
+	for i := range bucket {
+		if bucket[i].id == newData.id { // replace in place
+			bucket[i] = newData
+			break
+		}
+	}
+	d.data[bucketKey] = bucket
+}
+
+
+
+
+
+
+
+
+
+
+
+// replaceSourceData removes oldData from the pool and inserts newData into the
+// bucket for newData's own publish day, subtype and area (currently unused).
+//
+// The whole operation runs under d.lock: the removal half reads and writes
+// d.data just like the insertion half, so it must not run concurrently with
+// the other locked accessors of the map.
+func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	// Drop the stale entry from its bucket.
+	oldTime := oldData.publishtime
+	oldDay := qutil.FormatDateByInt64(&oldTime, qutil.Date_yyyyMMdd)
+	oldKey := fmt.Sprintf("%s_%s_%s", oldDay, oldData.subtype, oldData.area)
+	oldBucket := d.data[oldKey]
+	for i, v := range oldBucket {
+		if v.id == oldData.id {
+			oldBucket = append(oldBucket[:i], oldBucket[i+1:]...)
+			break
+		}
+	}
+	d.data[oldKey] = oldBucket
+
+	// Insert the new entry.
+	ct := newData.publishtime
+	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
+	k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area)
+	data := d.data[k]
+	if data == nil {
+		d.data[k] = []*Info{newData}
+		// First bucket seen for this day: record the day and refresh the
+		// retained window (update expects d.lock to be held already).
+		if !d.keys[dkey] {
+			d.keys[dkey] = true
+			d.update(ct)
+		}
+	} else {
+		d.data[k] = append(data, newData)
+	}
+
+	// Register the area if it has not been seen before.
+	isAreaExist := false
+	for _, v := range d.areakeys {
+		if v == newData.area {
+			isAreaExist = true
+			break
+		}
+	}
+	if !isAreaExist {
+		d.areakeys = append(d.areakeys, newData.area)
+	}
+}
+// currentTotalCount adds up the sizes of all pool buckets (currently unused).
+// NOTE(review): d.data is read here without taking d.lock.
+func (d *datamap) currentTotalCount() int {
+	total := qutil.IntAll(0)
+	for _, bucket := range d.data {
+		total += qutil.IntAll(len(bucket))
+	}
+	return total
+}
+
+
+
+
+
+
+
+

+ 111 - 0
src/fullRepeat.go

@@ -0,0 +1,111 @@
+package main
+
+import (
+	"log"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"sync"
+	"time"
+)
+
+// fullRepeat runs the full duplicate check over the extract collection for
+// documents with sid < _id <= eid.
+//
+// When IsFull is set and both sec_gtid and sec_lteid are non-empty they
+// replace sid/eid. Documents are read sorted by publishtime; those already
+// flagged repeat == 1 are only counted, and those flagged dataging == 1 are
+// skipped unless IsFull is set. The remaining documents are grouped by subtype
+// and every group is checked against DM in its own goroutine, with at most
+// threadNum goroutines in flight. For a detected duplicate only the source's
+// repeat_ids in the pool is extended; the database update is commented out.
+func fullRepeat(sid,eid string) {
+	defer qu.Catch()
+	// Id segment: optionally overridden by the command-line section ids.
+	if IsFull && sec_gtid!="" && sec_lteid!=""{
+		sid = sec_gtid
+		eid = sec_lteid
+	}
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
+	}
+	log.Println("开始全量数据判重~查询条件:",data_mgo.DbName, extract, q)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	// n: documents read, isok: documents queued for checking, repeatN: duplicates.
+	n, isok ,repeatN:= 0,0,0
+	dataAllDict := make(map[string][]map[string]interface{},0)
+	// tmp is re-allocated after every document so that the maps stored in
+	// dataAllDict are not overwritten by the next decode.
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%1000 == 0 {
+			log.Println("index: ", n, isok)
+		}
+		if util.IntAll(tmp["repeat"]) == 1 {
+			repeatN++
+			tmp = make(map[string]interface{})
+			continue
+		}
+		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
+			tmp = make(map[string]interface{})
+			continue
+		}
+		// Room for optimisation: group by day first, then by category.
+
+
+
+
+
+
+		isok++
+		// Group the data by category; the five tender-like subtypes share one group.
+		subtype := qu.ObjToString(tmp["subtype"])
+		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
+			subtype=="竞谈"||subtype=="竞价" {
+			subtype = "招标"
+		}
+		dataArr := dataAllDict[subtype]
+		if dataArr==nil {
+			dataArr = []map[string]interface{}{}
+		}
+		dataArr = append(dataArr,tmp)
+		dataAllDict[subtype] = dataArr
+		tmp = make(map[string]interface{})
+	}
+	log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
+	// pool is a counting semaphore limiting the number of running goroutines.
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	for _,dataArr := range dataAllDict {
+		pool <- true
+		wg.Add(1)
+		go func(dataArr []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			num := 0
+			for _,tmp := range dataArr{
+				info := NewInfo(tmp)
+				b, source, _ := DM.check(info)
+				if b {
+					num++
+					var updateID = map[string]interface{}{} // filter for the (disabled) duplicate update
+					updateID["_id"] = StringTOBsonId(info.id)
+					repeat_ids:=source.repeat_ids
+					repeat_ids =  append(repeat_ids,info.id)
+					source.repeat_ids = repeat_ids
+					DM.replacePoolData(source)// write the updated source back into the pool
+					//Update.updatePool <- []map[string]interface{}{//重复数据打标签
+					//	updateID,
+					//	map[string]interface{}{
+					//		"$set": map[string]interface{}{
+					//			"repeat":        1,
+					//			"repeat_reason": reason,
+					//			"repeat_id":     source.id,
+					//			"dataging":		 0,
+					//			"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+					//		},
+					//	},
+					//}
+				}
+			}
+			// repeatN is shared between the goroutines, hence the lock.
+			numberlock.Lock()
+			repeatN+=num
+			numberlock.Unlock()
+		}(dataArr)
+	}
+	wg.Wait()
+	log.Println("this full data is over.", n, "repeateN:", repeatN)
+	time.Sleep(15 * time.Second)
+}

+ 370 - 0
src/historyRepeat.go

@@ -0,0 +1,370 @@
+package main
+
+import (
+	"encoding/json"
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"qfw/util"
+	"strconv"
+	"sync"
+	"time"
+)
+
+// historyRepeat is the endless history duplicate-check loop. Each round it
+//  1. migrates the previous (gtid, lteid] segment to extract_back when lteid
+//     is already known, and advances gtid to lteid,
+//  2. scans the task collection from the newest _id for the first record with
+//     is_repeat_status == 1 and takes its lteid as the end of the new segment
+//     (sleeping five minutes and retrying when none exists),
+//  3. reads the segment sorted by publishtime, keeps the dataging == 1
+//     documents published within the last timingPubScope days and splits them
+//     into groups whose publishtime span stays within timingSpanDay days;
+//     older dataging == 1 documents only get their flag cleared,
+//  4. checks every group against its own datamap, at most threadNum groups at
+//     a time, and queues the resulting updates,
+//  5. notifies every node in nextNode over UDP.
+//
+// Both Mongo sessions are released explicitly as soon as their cursor has been
+// consumed. A defer inside this loop would never run, because the function
+// does not return, and would keep one more session checked out per round.
+// The counters shared by the worker goroutines are guarded by numberlock.
+func historyRepeat() {
+	defer util.Catch()
+	for {
+		start := time.Now().Unix()
+		if gtid == "" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
+		}
+		if lteid != "" {
+			// Migrate the previous segment first.
+			log.Println("开启一次迁移任务", gtid, lteid)
+			moveHistoryData(gtid, lteid)
+			gtid = lteid // the old end becomes the new start
+		}
+		// Find the newest task record that has been marked as finished.
+		task_sess := task_mgo.GetMgoConn()
+		q := map[string]interface{}{}
+		between_time := time.Now().Unix() - (86400 * timingPubScope) // oldest publishtime still considered
+		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+
+		isRepeatStatus := false
+		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+			is_repeat_status := util.IntAll(tmp["is_repeat_status"])
+			if is_repeat_status == 1 {
+				lteid = util.ObjToString(tmp["lteid"])
+				log.Println("查询的最后一个已标记的任务lteid:", lteid)
+				isRepeatStatus = true
+				tmp = make(map[string]interface{})
+				break
+			} else {
+				tmp = make(map[string]interface{})
+			}
+		}
+		// The task cursor is not used past this point.
+		task_mgo.DestoryMongoConn(task_sess)
+
+		if !isRepeatStatus {
+			log.Println("查询不到有标记的lteid数据")
+			log.Println("睡眠5分钟 gtid:", gtid, "lteid:", lteid)
+			time.Sleep(5 * time.Minute)
+			continue
+		}
+
+		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟", gtid, lteid)
+		time.Sleep(5 * time.Minute)
+
+		sess := data_mgo.GetMgoConn()
+		// Load the segment.
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+		log.Println("历史判重查询条件:", q, "时间:", between_time)
+		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters; deterTime is the first publishtime of the current group
+		pendAllArr := [][]map[string]interface{}{}                              // all groups waiting to be checked
+		dayArr := []map[string]interface{}{}                                    // group currently being filled
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
+			}
+			// Only dataging == 1 documents inside the publish-time scope are checked.
+			if util.IntAll(tmp["dataging"]) == 1 {
+				pubtime := util.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time {
+					oknum++
+					if deterTime == 0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr, tmp)
+					} else {
+						if pubtime-deterTime > timingSpanDay*86400 {
+							// Span exceeded: close the current group and start a new one.
+							pendAllArr = append(pendAllArr, dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = util.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr, tmp)
+						} else {
+							dayArr = append(dayArr, tmp)
+						}
+					}
+				} else {
+					outnum++
+					// Outside the scope: only clear the dataging flag.
+					Update.updatePool <- []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging":           0,
+								"history_updatetime": util.Int64All(time.Now().Unix()),
+							},
+						},
+					}
+				}
+			}
+			tmp = make(map[string]interface{})
+		}
+		// The data cursor is not used past this point.
+		data_mgo.DestoryMongoConn(sess)
+
+		if len(dayArr) > 0 {
+			pendAllArr = append(pendAllArr, dayArr)
+			dayArr = []map[string]interface{}{}
+		}
+
+		log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
+
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
+
+		// Sanity check of the grouping.
+		testNum := 0
+		for k, v := range pendAllArr {
+			log.Println("第", k, "组--", "数量:", len(v))
+			testNum = testNum + len(v)
+		}
+		log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
+
+		n, repeateN := 0, 0
+		log.Println("线程数:", threadNum)
+		pool := make(chan bool, threadNum) // counting semaphore for the workers
+		wg := &sync.WaitGroup{}
+		for k, v := range pendAllArr {
+			pool <- true
+			wg.Add(1)
+			go func(k int, v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				// Updates for sources outside the current segment; they are
+				// written to extract_back in bulk.
+				groupOtherExtract := [][]map[string]interface{}{}
+
+				// Build the data pool of this group.
+				log.Println("构建第", k, "组---(数据池)")
+				// publishtime of the last element of the group, i.e. the latest
+				// one, since the segment was read in ascending publishtime order.
+				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				numberlock.Lock()
+				n = n + len(v)
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				numberlock.Unlock()
+				for _, tmp := range v {
+					info := NewInfo(tmp)
+					b, source, reason := curTM.check(info)
+					if b { // duplicate: update the source and flag the document
+						numberlock.Lock()
+						repeateN++
+						numberlock.Unlock()
+						repeat_ids := source.repeat_ids
+						repeat_ids = append(repeat_ids, info.id)
+						source.repeat_ids = repeat_ids
+
+						updatelock.Lock()
+						// NOTE(review): source was found in curTM, yet the
+						// replacement targets the global DM pool — confirm that
+						// DM (not curTM) is intended.
+						DM.replacePoolData(source)
+						// Sources inside the current segment are updated through
+						// the update pool, the others through extract_back.
+						if judgeIsCurIds(gtid, lteid, source.id) {
+							Update.updatePool <- []map[string]interface{}{
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							}
+						} else {
+							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}
+						// Flag the duplicate itself.
+						Update.updatePool <- []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":             1,
+									"repeat_reason":      reason,
+									"repeat_id":          source.id,
+									"dataging":           0,
+									"history_updatetime": util.Int64All(time.Now().Unix()),
+								},
+							},
+						}
+						if len(groupOtherExtract) >= 500 {
+							data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							groupOtherExtract = [][]map[string]interface{}{}
+						}
+
+						updatelock.Unlock()
+
+					} else {
+						// Not a duplicate: just clear the dataging flag.
+						Update.updatePool <- []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"dataging":           0,
+									"history_updatetime": util.Int64All(time.Now().Unix()),
+								},
+							},
+						}
+					}
+				}
+				// Flush what is left for extract_back at the end of the group.
+				updatelock.Lock()
+				if len(groupOtherExtract) > 0 {
+					data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
+				}
+				updatelock.Unlock()
+
+			}(k, v)
+
+		}
+
+		wg.Wait()
+
+		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
+
+		time.Sleep(30 * time.Second)
+		// Round finished: notify the downstream nodes over UDP.
+		if gtid != lteid {
+			for _, to := range nextNode {
+				next_sid := util.BsonIdToSId(gtid)
+				next_eid := util.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+
+		end := time.Now().Unix()
+
+		log.Println(gtid, lteid)
+
+		// Keep at least five minutes between two rounds.
+		if end-start < 60*5 {
+			log.Println("睡眠.............")
+			time.Sleep(5 * time.Minute)
+		}
+		log.Println("继续下一段的历史判重")
+	}
+}
+// judgeIsCurIds reports whether curid lies inside the id segment bounded by
+// gtid and lteid. Only the first 8 characters of each id are compared, parsed
+// as a hexadecimal number; both bounds are inclusive.
+//
+// An id that is shorter than 8 characters, or whose prefix is not valid hex,
+// yields false rather than a slice-bounds panic or a silent zero.
+func judgeIsCurIds(gtid string, lteid string, curid string) bool {
+	prefix := func(id string) (int64, bool) {
+		if len(id) < 8 {
+			return 0, false
+		}
+		v, err := strconv.ParseInt(id[:8], 16, 64)
+		return v, err == nil
+	}
+	gtTime, gtOK := prefix(gtid)
+	lteTime, lteOK := prefix(lteid)
+	curTime, curOK := prefix(curid)
+	if !gtOK || !lteOK || !curOK {
+		return false
+	}
+	return curTime >= gtTime && curTime <= lteTime
+}
+// moveHistoryData copies every extract document with startid < _id <= endid
+// into extract_back, then deletes from extract all documents whose comeintime
+// is older than local midnight of the day the function started, minus
+// 2*(dupdays+1) days.
+func moveHistoryData(startid string, endid string) {
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+
+	// The reference day is taken before the copy starts.
+	y, m, dd := time.Now().Date()
+
+	segment := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(startid),
+			"$lte": StringTOBsonId(endid),
+		},
+	}
+	log.Println(segment)
+
+	cursor := sess.DB(data_mgo.DbName).C(extract).Find(&segment).Iter()
+	copied := 0
+	doc := make(map[string]interface{})
+	for cursor.Next(&doc) {
+		data_mgo.Save(extract_back, doc)
+		doc = map[string]interface{}{} // fresh map for the next decode
+		if copied%1000 == 0 {
+			log.Println("index", copied)
+		}
+		copied++
+	}
+	log.Println("save to", extract_back, " ok index", copied)
+
+	midnight := time.Date(y, m, dd, 0, 0, 0, 0, time.Local)
+	cutoff := midnight.Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix()
+	expired := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": cutoff,
+		},
+	}
+	removed := data_mgo.Delete(extract, expired)
+	log.Println("remove from ", extract, removed)
+
+}
+
+
+
+
+
+
+
+
+// moveTimeoutData registers moveOnceTimeOut under the cron spec
+// "0 0 0 * * ?" and starts the scheduler (currently unused).
+func moveTimeoutData() {
+	log.Println("部署迁移定时任务")
+	scheduler := cron.New()
+	scheduler.AddFunc("0 0 0 * * ?", moveOnceTimeOut)
+	scheduler.Start()
+}
+func moveOnceTimeOut()  {
+	log.Println("执行一次迁移超时数据")
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	now:=time.Now()
+
+	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
+	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$lt": StringTOBsonId(task_id),
+		},
+	}
+
+	it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("index", index)
+		}
+		del_id:=BsonTOStringId(tmp["_id"])
+		data_mgo.Save("result_20200713", tmp)
+		data_mgo.DeleteById("result_20200714",del_id)
+		tmp = map[string]interface{}{}
+	}
+	log.Println("save and delete", " ok index", index)
+
+}

+ 166 - 0
src/increaseRepeat.go

@@ -0,0 +1,166 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"sync"
+	"time"
+)
+
+// increaseRepeat runs one incremental duplicate-check task.
+//
+// mapInfo is the task as decoded from a UDP message. Its "gtid" and "lteid"
+// entries must be strings; they bound the _id segment (gtid, lteid] read from
+// the extract collection. A task lacking either string is logged and dropped
+// up front, since asserting the type unchecked would panic on malformed input.
+//
+// Documents already flagged repeat == 1 are only counted, and documents
+// flagged dataging == 1 are skipped unless IsFull is set. The rest are grouped
+// by subtype and every group is checked against DM in its own goroutine, with
+// at most threadNum goroutines in flight. Each duplicate is queued on
+// Update.updatePool. Afterwards the task record is marked through
+// updateOcrFileData and every node in nextNode is notified over UDP.
+func increaseRepeat(mapInfo map[string]interface{}) {
+	defer qu.Catch()
+	sid, sidOK := mapInfo["gtid"].(string)
+	eid, eidOK := mapInfo["lteid"].(string)
+	if !sidOK || !eidOK {
+		log.Println("increaseRepeat: task without string gtid/lteid dropped:", mapInfo)
+		return
+	}
+	// Id segment of this task.
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
+	}
+	log.Println("开始增量数据判重~查询条件:", data_mgo.DbName, extract, q)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	// n: documents read, isok: documents queued for checking, repeatN: duplicates.
+	n, isok, repeatN := 0, 0, 0
+	dataAllDict := make(map[string][]map[string]interface{}, 0)
+	// tmp is re-allocated after every document so that the maps stored in
+	// dataAllDict are not overwritten by the next decode.
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%1000 == 0 {
+			log.Println("index: ", n, isok)
+		}
+		if util.IntAll(tmp["repeat"]) == 1 {
+			repeatN++
+			tmp = make(map[string]interface{})
+			continue
+		}
+		if util.IntAll(tmp["dataging"]) == 1 && !IsFull {
+			tmp = make(map[string]interface{})
+			continue
+		}
+		// Group by category; the five tender-like subtypes share one group.
+		isok++
+		subtype := qu.ObjToString(tmp["subtype"])
+		if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
+			subtype == "竞谈" || subtype == "竞价" {
+			subtype = "招标"
+		}
+		dataArr := dataAllDict[subtype]
+		if dataArr == nil {
+			dataArr = []map[string]interface{}{}
+		}
+		dataArr = append(dataArr, tmp)
+		dataAllDict[subtype] = dataArr
+		tmp = make(map[string]interface{})
+	}
+	log.Println("类别组划分完毕:", len(dataAllDict), "组", "~", "需要判重:", isok, "条")
+	pool := make(chan bool, threadNum) // counting semaphore for the workers
+	wg := &sync.WaitGroup{}
+	for _, dataArr := range dataAllDict {
+		pool <- true
+		wg.Add(1)
+		go func(dataArr []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			num := 0
+			for _, tmp := range dataArr {
+				info := NewInfo(tmp)
+				b, source, reason := DM.check(info)
+				if b {
+					num++
+					var updateID = map[string]interface{}{} // filter selecting the duplicate
+					updateID["_id"] = StringTOBsonId(info.id)
+					repeat_ids := source.repeat_ids
+					repeat_ids = append(repeat_ids, info.id)
+					source.repeat_ids = repeat_ids
+					DM.replacePoolData(source) // write the updated source back into the pool
+					// Flag the duplicate.
+					Update.updatePool <- []map[string]interface{}{
+						updateID,
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat":            1,
+								"repeat_reason":     reason,
+								"repeat_id":         source.id,
+								"dataging":          0,
+								"updatetime_repeat": util.Int64All(time.Now().Unix()),
+							},
+						},
+					}
+				}
+			}
+			// repeatN is shared between the goroutines, hence the lock.
+			numberlock.Lock()
+			repeatN += num
+			numberlock.Unlock()
+		}(dataArr)
+	}
+	wg.Wait()
+	log.Println("this cur task over.", n, "repeateN:", repeatN)
+	// Mark the task record as checked.
+	updateOcrFileData(eid)
+	time.Sleep(15 * time.Second)
+	// Task finished: notify the downstream nodes.
+	log.Println("判重任务完成发送udp")
+	for _, to := range nextNode {
+		key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": util.ObjToString(to["stype"]),
+			"key":   key,
+		})
+		addr := &net.UDPAddr{
+			IP:   net.ParseIP(to["addr"].(string)),
+			Port: util.IntAll(to["port"]),
+		}
+		node := &udpNode{by, addr, time.Now().Unix(), 0}
+		udptaskmap.Store(key, node)
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+	}
+}
+//更新ocr表
+func updateOcrFileData(cur_lteid string)  {
+	//更新ocr 分类表-判重的状态
+	log.Println("开始更新Ocr表-标记",cur_lteid)
+	task_sess := task_mgo.GetMgoConn()
+	defer task_mgo.DestoryMongoConn(task_sess)
+	q_task:=map[string]interface{}{}
+	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
+	isUpdateOcr:=false
+	updateOcrFile:=[][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+		cur_id := BsonTOStringId(tmp["_id"])
+		lteid:=util.ObjToString(tmp["lteid"])
+		if (lteid==cur_lteid) { //需要更新
+			log.Println("找到该lteid数据",cur_lteid,cur_id)
+			isUpdateOcr = true
+			updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"is_repeat_status": 1,
+						"is_repeat_time" : util.Int64All(time.Now().Unix()),
+					},
+				},
+			})
+			tmp = make(map[string]interface{})
+			break
+		}else {
+			tmp = make(map[string]interface{})
+		}
+	}
+	if !isUpdateOcr {
+		log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
+	}else {
+		if len(updateOcrFile) > 0 {
+			task_mgo.UpSertBulk(task_collName, updateOcrFile...)
+		}
+	}
+}

+ 246 - 0
src/main.go

@@ -0,0 +1,246 @@
+package main
+
+/**
+招标信息判重
+**/
+
+import (
+	"encoding/json"
+	"flag"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"regexp"
+	"sync"
+	"time"
+)
+
+// Package-level state, filled in by init and shared by all run modes.
+var (
+	Sysconfig    map[string]interface{} 	// configuration file contents
+	mconf        map[string]interface{} 	// "mongodb" section of the configuration
+	data_mgo          *MongodbSim            	// Mongo handle built from the "mongodb" section
+	task_mgo     *MongodbSim            	// Mongo handle built from the "task_mongodb" section
+	task_collName	string
+	extract      string
+	extract_back string
+	udpclient    mu.UdpClient             	// UDP client
+	nextNode     []map[string]interface{} 	// downstream nodes
+	dupdays      = 7                      	// initial duplicate-check range; init overwrites it from the configuration
+	DM           *datamap                 	//
+	Update		 *updateInfo
+	// Title filter regexps; the placeholders are replaced from the configuration in init.
+	FilterRegTitle   = regexp.MustCompile("^_$")
+	FilterRegTitle_0 = regexp.MustCompile("^_$")
+	FilterRegTitle_1 = regexp.MustCompile("^_$")
+	FilterRegTitle_2 = regexp.MustCompile("^_$")
+	threadNum      int                               // number of worker goroutines
+	SiteMap        map[string]map[string]interface{} // site name -> site attributes
+	LowHeavy       bool                              // enable the low-quality-data duplicate check
+	TimingTask     bool                              // run as timed (history) task
+	timingSpanDay  int64                             // time span
+	timingPubScope int64                             // publish-time period
+	gtid,lastid,sec_gtid,sec_lteid string					 // command-line input
+	lteid	string									 // end id of the history segment
+	IsFull		   bool								 // full run instead of incremental
+	updatelock 		sync.Mutex         				 // lock
+	numberlock 		sync.Mutex         				 // lock
+	userName,passWord 	string						 // mongo user name / password
+	taskList		[]map[string]interface{}		 // pending tasks
+)
+
+// init parses the command-line flags, reads the configuration into Sysconfig,
+// creates both Mongo pools, builds the duplicate-check pool DM and the update
+// pool, compiles the title regexps from the configuration and loads the site
+// collection into SiteMap.
+//
+// Only the Mongo user name is logged; the password is a secret and is kept out
+// of the log.
+func init() {
+	flag.StringVar(&lastid, "id", "", "增量加载的lastid") // incremental mode
+	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")   // history mode
+	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
+	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
+
+	flag.Parse()
+
+	qu.ReadConfig(&Sysconfig)
+
+	userName = qu.ObjToString(Sysconfig["userName"])
+	passWord = qu.ObjToString(Sysconfig["passWord"])
+
+	log.Println("集群用户:", userName)
+
+	task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
+	// NOTE(review): the pool size is read from "task_pool"; the sample
+	// configuration only shows a "pool" key in this section, in which case the
+	// default of 10 applies — confirm which key is intended.
+	task_mgo = &MongodbSim{
+		MongodbAddr: task_mconf["task_addrName"].(string),
+		DbName:      task_mconf["task_dbName"].(string),
+		Size:        util.IntAllDef(task_mconf["task_pool"], 10),
+		UserName:    userName,
+		Password:    passWord,
+	}
+	task_mgo.InitPool()
+	task_collName = task_mconf["task_collName"].(string)
+
+	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
+	mconf = Sysconfig["mongodb"].(map[string]interface{})
+	data_mgo = &MongodbSim{
+		MongodbAddr: mconf["addr"].(string),
+		DbName:      mconf["db"].(string),
+		Size:        util.IntAllDef(mconf["pool"], 10),
+	}
+	data_mgo.InitPool()
+
+	extract = mconf["extract"].(string)
+	extract_back = mconf["extract_back"].(string)
+
+	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
+	// Load the duplicate-check pool.
+	DM = NewDatamap(dupdays, lastid)
+	// Start the update pool consumer.
+	Update = newUpdatePool()
+	go Update.updateData()
+
+	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
+	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
+	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
+	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
+	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
+	LowHeavy = Sysconfig["lowHeavy"].(bool)
+	TimingTask = Sysconfig["timingTask"].(bool)
+	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
+	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
+	// Site table: site name -> attributes.
+	site := mconf["site"].(map[string]interface{})
+	SiteMap = make(map[string]map[string]interface{}, 0)
+	start := int(time.Now().Unix())
+	sess_site := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess_site)
+	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
+	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
+		data_map := map[string]interface{}{
+			"area":     util.ObjToString(site_dict["area"]),
+			"city":     util.ObjToString(site_dict["city"]),
+			"district": util.ObjToString(site_dict["district"]),
+			"sitetype": util.ObjToString(site_dict["sitetype"]),
+			"level":    util.ObjToString(site_dict["level"]),
+			"weight":   util.ObjToString(site_dict["weight"]),
+		}
+		SiteMap[util.ObjToString(site_dict["site"])] = data_map
+	}
+	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
+}
+
+// processUdpMsg is the UDP receive callback.
+//
+// An mu.OP_NOOP packet is the acknowledgement of a downstream node: its
+// non-empty payload is the task key, which is removed from udptaskmap.
+// An mu.OP_TYPE_DATA packet carries a JSON task: it is acknowledged with its
+// "key" ("udpok" when missing) and appended to taskList under updatelock; a
+// decode error is answered with "err:<message>". Other actions are ignored.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	if act == mu.OP_NOOP {
+		if ack := string(data); ack != "" {
+			log.Println("ok:", ack)
+			udptaskmap.Delete(ack)
+		}
+		return
+	}
+	if act != mu.OP_TYPE_DATA {
+		return
+	}
+
+	var task map[string]interface{}
+	if err := json.Unmarshal(data, &task); err != nil {
+		udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		return
+	}
+	if task == nil { // e.g. a JSON null payload
+		return
+	}
+
+	key, _ := task["key"].(string)
+	if key == "" {
+		key = "udpok"
+	}
+	udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+
+	// Queue the task for getRepeatTask.
+	updatelock.Lock()
+	taskList = append(taskList, task)
+	log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
+	updatelock.Unlock()
+}
+
+//监听-获取-分发判重任务
+func getRepeatTask()  {
+	for  {
+		if len(taskList)>0 {
+			updatelock.Lock()
+			mapInfo := taskList[0]
+			if mapInfo != nil  {
+				increaseRepeat(mapInfo) //判重方法
+			}
+			taskList = taskList[1:]
+			log.Println("此段落结束当前任务池...",len(taskList),taskList)
+			updatelock.Unlock()
+		}else {
+			time.Sleep(15 * time.Second)
+		}
+	}
+}
+
+// main starts the UDP listener and then exactly one run mode:
+//   - TimingTask: the history loop (historyRepeat) in a goroutine,
+//   - IsFull: one synchronous full run over the widest id range,
+//   - otherwise: the incremental task loop (getRepeatTask) in a goroutine.
+// It then sleeps so that the background goroutines keep running.
+func main() {
+	go checkMapJob()
+	listenAddr := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: listenAddr, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", listenAddr)
+
+	switch {
+	case TimingTask:
+		log.Println("正常历史部署")
+		go historyRepeat()
+	case IsFull:
+		fullRepeat("1fffffffffffffffffffffff", "9fffffffffffffffffffffff")
+	default:
+		// Regular incremental deployment. For debugging, increaseRepeat can be
+		// called here directly with a {"gtid", "lteid"} map.
+		log.Println("正常增量部署,监听任务")
+		go getRepeatTask()
+	}
+	time.Sleep(99999 * time.Hour)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+ 328 - 0
src/mgo.go

@@ -0,0 +1,328 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+// MgoSess is a chainable query description. The DB/C/Find/Select/Limit/Skip/
+// Sort methods fill in its fields and Iter executes the query through M.
+type MgoSess struct {
+	// Db and Coll name the database and collection Iter queries.
+	Db     string
+	Coll   string
+	// Query is the filter passed to the driver's Find.
+	Query  interface{}
+	// Sorts holds sort keys; a leading "-" means descending, a leading "+"
+	// or no prefix means ascending (see Iter).
+	Sorts  []string
+	// fields is the projection; nil means no projection is set.
+	fields interface{}
+	// limit and skip are applied only when greater than zero.
+	limit  int64
+	skip   int64
+	// M supplies the client and context used by Iter.
+	M      *MongodbSim
+}
+
+// MgoIter wraps a driver cursor. Cursor is nil when the underlying Find
+// failed (see MgoSess.Iter); Next then reports false immediately.
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+// Next advances the cursor and decodes the next document into result.
+//
+// It returns true when a document was decoded. It returns false when the
+// iterator has no cursor, when the cursor is exhausted or failed, or when
+// decoding fails; in the latter cases the cursor is closed and any error is
+// logged.
+//
+// A real context is passed to the driver instead of nil (the context package
+// forbids nil Contexts), and a cursor error that ends iteration is now logged
+// rather than being indistinguishable from normal exhaustion.
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor == nil {
+		return false
+	}
+	ctx := context.Background()
+	if !mt.Cursor.Next(ctx) {
+		if err := mt.Cursor.Err(); err != nil {
+			log.Println("mgo cur err", err.Error())
+		}
+		mt.Cursor.Close(ctx)
+		return false
+	}
+	if err := mt.Cursor.Decode(result); err != nil {
+		log.Println("mgo cur err", err.Error())
+		mt.Cursor.Close(ctx)
+		return false
+	}
+	return true
+}
+
+// DB sets the database name used by Iter and returns the receiver for
+// chaining.
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+// C sets the collection name used by Iter and returns the receiver for
+// chaining.
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+// Find stores q as the query filter and returns the receiver for chaining.
+// Nothing is sent to the server until Iter is called.
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+// Select stores fields as the projection applied by Iter and returns the
+// receiver for chaining.
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+// Limit stores the maximum number of documents to return; Iter applies it
+// only when it is greater than zero. Returns the receiver for chaining.
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+// Skip stores the number of documents to skip; Iter applies it only when it
+// is greater than zero. Returns the receiver for chaining.
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+// Sort replaces the stored sort keys and returns the receiver for chaining.
+// Iter interprets a leading "-" as descending and a leading "+" (or no
+// prefix) as ascending.
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+// Iter executes the stored query against ms.Db/ms.Coll and returns an
+// iterator over the results.
+//
+// Skip and limit are applied only when positive, the batch size is fixed at
+// 100, and the projection is applied when one was set. If the driver's Find
+// fails, the error is logged and the returned iterator has a nil Cursor, so
+// its Next reports false.
+//
+// Sort keys are emitted as an ordered bson.D in the order they were given.
+// The previous bson.M lost that order for multi-key sorts (Go map iteration
+// order is random, and the driver documents that a multi-key map is rejected
+// for sort). Empty sort keys are skipped instead of panicking on k[:1].
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := make(bson.D, 0, len(ms.Sorts))
+		for _, k := range ms.Sorts {
+			if k == "" {
+				continue
+			}
+			field, dir := k, 1
+			switch k[0] {
+			case '-':
+				field, dir = k[1:], -1
+			case '+':
+				field = k[1:]
+			}
+			if field == "" {
+				continue
+			}
+			sort = append(sort, bson.E{Key: field, Value: dir})
+		}
+		if len(sort) > 0 {
+			find.SetSort(sort)
+		}
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+// MongodbSim bundles a MongoDB client with the settings used to create it.
+// Set MongodbAddr, Size, DbName (and optionally UserName/Password), then call
+// InitPool before using any other method.
+type MongodbSim struct {
+	// MongodbAddr is appended to "mongodb://" to form the connection URI.
+	MongodbAddr string
+	// Size is the driver's max pool size and the capacity of the pool
+	// channel used by Open/Close.
+	Size        int
+	//	MinSize     int
+	// DbName is the database used by the CRUD helper methods.
+	DbName   string
+	// C is the connected client; it stays nil if InitPool failed to connect.
+	C        *mongo.Client
+	// Ctx is the long-lived context passed to every operation.
+	Ctx      context.Context
+	// ShortCtx is a one-minute context created in InitPool and used for the
+	// initial connect.
+	ShortCtx context.Context
+	// pool bounds concurrent operations: Open sends, Close receives.
+	pool     chan bool
+	// UserName and Password enable authentication when both are non-empty.
+	UserName string
+	Password string
+}
+
+// GetMgoConn returns a fresh query builder bound to m. It does not take a
+// slot from the pool channel (the Open call is intentionally disabled).
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	return &MgoSess{M: m}
+}
+
+// DestoryMongoConn detaches ms from its MongodbSim so the session can no
+// longer be used to run queries. It does not release a pool slot (the Close
+// call is intentionally disabled). A nil ms is ignored.
+//
+// The former "ms = nil" only reassigned the local parameter and had no effect
+// on the caller, so it was removed.
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	if ms == nil {
+		return
+	}
+	ms.M = nil
+}
+
+// InitPool builds the client options (3s connect timeout, URI from
+// MongodbAddr, max pool size from Size, optional credentials, 2h max idle
+// time), creates the pool channel and the two contexts, and connects. On
+// success the client is stored in m.C; on failure the error is logged and m.C
+// is left unset.
+//
+// The pool channel always gets a capacity of at least 1. With Size == 0 the
+// channel used to be unbuffered, so every Open blocked forever, and a
+// negative Size made make() panic.
+//
+// NOTE(review): the cancel functions of both contexts are discarded, and
+// ShortCtx expires one minute after InitPool runs; any later use of ShortCtx
+// will fail with a deadline error. Left as is because the fields are
+// exported and their other users are not visible here.
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+
+	slots := m.Size
+	if slots < 1 {
+		slots = 1
+	}
+	m.pool = make(chan bool, slots)
+
+	if m.UserName != "" && m.Password != "" {
+		cre := options.Credential{
+			Username: m.UserName,
+			Password: m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+// Open takes one slot from the pool channel, blocking while the channel is
+// full. Every Open must be paired with a Close.
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+// Close releases a slot previously taken by Open.
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+// Save inserts doc into collection c and returns the inserted id, or nil
+// when the insert fails. The failure is now logged, matching the bulk
+// helpers, instead of being dropped silently.
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		log.Println("mgo save error:", err.Error())
+		return nil
+	}
+	return r.InsertedID
+}
+
+// UpdateById updates the document in collection c whose _id is the ObjectID
+// parsed from the hex string id. doc is passed unchanged to the driver as the
+// update document. It reports whether the driver call succeeded; a failure
+// is now logged instead of being dropped silently.
+//
+// An id that is not valid hex is converted by StringTOBsonId to the zero
+// ObjectID, which is then used as the filter value.
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		log.Println("mgo updatebyid error:", err.Error())
+		return false
+	}
+	return true
+}
+
+// DeleteById deletes the document in collection c whose _id is the ObjectID
+// parsed from the hex string id and returns the number of deleted documents.
+// On failure it returns 0; the error is now logged instead of being dropped
+// silently.
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		log.Println("mgo deletebyid error:", err.Error())
+		return 0
+	}
+	return r.DeletedCount
+}
+
+// Delete removes every document in collection c that matches query and
+// returns the number of deleted documents. On failure it returns 0; the
+// error is now logged instead of being dropped silently.
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		log.Println("mgo delete error:", err.Error())
+		return 0
+	}
+	return r.DeletedCount
+}
+
+// FindById looks up the document in collection c whose _id is the ObjectID
+// parsed from the hex string id. It always returns a non-nil map, which is
+// empty when nothing was decoded.
+//
+// Lookup or decode failures other than "no documents" are now logged; they
+// used to be indistinguishable from a missing document.
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	if err := r.Decode(&v); err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findbyid error:", err.Error())
+	}
+	return v
+}
+
+// FindOne returns a single document of collection c matching query. It
+// always returns a non-nil map, which is empty when nothing was decoded.
+//
+// Lookup or decode failures other than "no documents" are now logged; they
+// used to be indistinguishable from a missing document.
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	if err := r.Decode(&v); err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findone error:", err.Error())
+	}
+	return v
+}
+
+// Find returns all documents of collection c matching query, using sort as
+// the sort specification and fields as the projection.
+//
+// On failure it logs the error and returns (nil, err). The previous
+// log.Fatal call terminated the whole process on any query or decode error
+// and made the error returns unreachable.
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find().SetSort(sort).SetProjection(fields)
+	r, err := coll.Find(m.Ctx, query, op)
+	if err != nil {
+		log.Println("mgo find error:", err.Error())
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Println("mgo find error:", err.Error())
+		return nil, err
+	}
+	return results, nil
+}
+
+// NewObjectId returns a newly generated ObjectID for use as a document _id.
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+// StringTOBsonId converts a hex string to an ObjectID. A string that cannot
+// be parsed yields the zero ObjectID; the parse error is not reported.
+func StringTOBsonId(id string) primitive.ObjectID {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return primitive.NilObjectID
+	}
+	return oid
+}
+
+// BsonTOStringId returns the hex form of id when id holds a
+// primitive.ObjectID. Any other value (including nil) yields "" instead of
+// panicking on the type assertion.
+func BsonTOStringId(id interface{}) string {
+	oid, ok := id.(primitive.ObjectID)
+	if !ok {
+		return ""
+	}
+	return oid.Hex()
+}

+ 60 - 0
src/udptaskmap.go

@@ -0,0 +1,60 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"sync"
+	"time"
+
+	mu "mfw/util"
+)
+
+// udptaskmap holds outstanding UDP sends keyed by task key; checkMapJob reads
+// the values as *udpNode, and processUdpMsg deletes a key when the next node
+// acknowledges it.
+var udptaskmap = &sync.Map{}
+// tomail is the alert recipient list, loaded from the "jkmail"."to" config.
+var tomail string
+// api is the alert mail endpoint, loaded from the "jkmail"."api" config.
+var api string
+
+// udpNode is one pending UDP message tracked in udptaskmap.
+type udpNode struct {
+	// data is the payload that checkMapJob re-sends.
+	data      []byte
+	// addr is the destination of the re-send.
+	addr      *net.UDPAddr
+	// timestamp is compared against time.Now().Unix() (seconds) to decide
+	// whether the message has timed out.
+	timestamp int64
+	// retry counts the timeout checks that found this node overdue.
+	retry     int
+}
+
+// checkMapJob loads the alert-mail settings from Sysconfig["jkmail"] and then
+// scans udptaskmap once a minute, forever. For each pending node:
+//
+//   - older than 120s: the retry counter is incremented; up to 5 retries the
+//     payload is re-sent, after that the entry is deleted and an alert mail
+//     is requested through the configured HTTP API;
+//   - older than 10s: a "still waiting" line is logged.
+//
+// Changes from the previous version: the query parameters of the mail URL
+// are escaped (recipients and keys were inserted raw), values that are not a
+// *udpNode are skipped instead of causing a nil dereference, the key is
+// formatted with fmt.Sprint instead of an unchecked string assertion, and a
+// failed HTTP request is logged.
+func checkMapJob() {
+	// Original note: mail cannot be sent from inside the Aliyun intranet.
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			node, ok := v.(*udpNode)
+			if !ok || node == nil {
+				return true
+			}
+			age := time.Now().Unix() - node.timestamp
+			switch {
+			case age > 120:
+				node.retry++
+				if node.retry <= 5 {
+					log.Println("udp重发", k)
+					udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+					return true
+				}
+				log.Println("udp重试失败", k)
+				udptaskmap.Delete(k)
+				mailURL := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
+					api, url.QueryEscape(tomail), "repeat-send-fail", url.QueryEscape(fmt.Sprint(k)))
+				res, err := http.Get(mailURL)
+				if err != nil {
+					log.Println("mail request error:", err)
+					return true
+				}
+				read, err := ioutil.ReadAll(res.Body)
+				res.Body.Close()
+				log.Println("邮件发发送:", string(read), err)
+			case age > 10:
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}

+ 62 - 0
src/updateMethod.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+// updateInfo buffers pending upserts and flushes them in batches (see
+// updateData).
+type updateInfo struct {
+
+	// updatePool is the channel of pending items; each item is forwarded to
+	// UpSertBulk as one element of its variadic argument.
+	updatePool chan []map[string]interface{}
+	// saveSize is the number of items collected before a batch is flushed.
+	saveSize   	int
+
+}
+
+
+
+
+// sp is a semaphore with 5 slots; updateData takes a slot before starting
+// each background bulk upsert and the goroutine releases it when done.
+var sp = make(chan bool, 5)
+
+// newUpdatePool returns an updateInfo with a 50000-item channel buffer and a
+// batch size of 200.
+func newUpdatePool() *updateInfo {
+	return &updateInfo{
+		updatePool: make(chan []map[string]interface{}, 50000),
+		saveSize:   200,
+	}
+}
+
+
+// updateData consumes update.updatePool forever, collecting items into a
+// batch. The batch is flushed when it reaches update.saveSize items, or when
+// no item has arrived for 5 seconds and the batch is non-empty. Each flush
+// takes a slot from sp, then runs data_mgo.UpSertBulk on the collected items
+// in its own goroutine, which releases the slot when it finishes.
+func (update *updateInfo) updateData() {
+	log.Println("开始不断监听--待更新数据")
+	batch := make([][]map[string]interface{}, update.saveSize)
+	filled := 0
+
+	// flush hands the collected items to a background bulk upsert and starts
+	// a fresh buffer, so the goroutine never shares storage with the loop.
+	flush := func() {
+		sp <- true
+		go func(pending [][]map[string]interface{}) {
+			defer func() {
+				<-sp
+			}()
+			data_mgo.UpSertBulk(extract, pending...)
+		}(batch[:filled])
+		batch = make([][]map[string]interface{}, update.saveSize)
+		filled = 0
+	}
+
+	for {
+		select {
+		case item := <-update.updatePool:
+			batch[filled] = item
+			filled++
+			if filled == update.saveSize {
+				flush()
+			}
+		case <-time.After(5 * time.Second): // idle: flush whatever is buffered
+			if filled > 0 {
+				flush()
+			}
+		}
+	}
+}