Эх сурвалжийг харах

历史-任务-功能-备份

apple 5 жил өмнө
parent
commit
4799d43fcf

+ 312 - 0
udpfilterdup/src/README.md

@@ -76,3 +76,315 @@
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
 }
+
+
+"nextNode": [
+            {
+                "addr": "172.17.4.189",
+                "port": 1482,
+                "stype": "project",
+                "memo": "合并项目"
+            },
+            {
+                "addr": "127.0.0.1",
+                "port": 1483,
+                "stype": "bidding",
+                "memo": "创建招标数据索引new"
+            }
+   ],
+
+
+
+
+
+//定时任务--定时任务--定时任务--暂未用
+func timedTaskDay() {
+	log.Println("部署定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
+	c.Start()
+	//timedTaskOnce()
+}
+func timedTaskOnce() {
+
+	defer util.Catch()
+	log.Println("开始一次迁移任务")
+	movedata()
+	log.Println("开始一次任务判重")
+	//当前时间-8   -4 小时
+	now := time.Now()
+	log.Println(now)
+	preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
+	log.Println(preTime,curTime)
+	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
+	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	between_time := curTime.Unix() - (86400 * timingPubScope)
+	log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
+	//区间id
+	q_start := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt": StringTOBsonId(task_sid),
+			"$lte": StringTOBsonId(task_eid),
+		},
+	}
+	//q_start = map[string]interface{}{
+	//	"_id": map[string]interface{}{
+	//		"$gte": StringTOBsonId("5f184cd552c1d9fbf84519d3"),
+	//		"$lte": StringTOBsonId("5f184d3852c1d9fbf8451a2a"),
+	//	},
+	//}
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
+	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
+	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+	pendAllArr:=[][]map[string]interface{}{}//待处理数组
+	dayArr := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
+		if num%10000 == 0 {
+			log.Println("正序遍历:", num)
+		}
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"dataging": 0,
+						"repeat_reason": "sourcewebsite为1 重复",
+					},
+				},
+			})
+			if len(updateExtract) > 50 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+
+
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		//取-符合-发布时间X年内的数据
+		if util.IntAll(tmp["dataging"]) == 1 {
+			pubtime := util.Int64All(tmp["publishtime"])
+			if pubtime > 0 && pubtime >= between_time {
+				oknum++
+				if deterTime==0 {
+					log.Println("找到第一条符合条件的数据")
+					deterTime = util.Int64All(tmp["publishtime"])
+					dayArr = append(dayArr,tmp)
+				}else {
+					if pubtime-deterTime >timingSpanDay*86400 {
+						//新数组重新构建,当前组数据加到全部组数据
+						pendAllArr = append(pendAllArr,dayArr)
+						dayArr = []map[string]interface{}{}
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						dayArr = append(dayArr,tmp)
+					}
+				}
+			}else {
+				//不在两年内的也清标记
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"dataging": 0,
+						},
+					},
+				})
+				if len(updateExtract) > 50 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+	//批量更新标记
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+		updateExtract = [][]map[string]interface{}{}
+	}
+
+	if len(dayArr)>0 {
+		pendAllArr = append(pendAllArr,dayArr)
+		dayArr = []map[string]interface{}{}
+	}
+
+	log.Println("查询数量:",num,"符合条件:",oknum)
+
+	if len(pendAllArr) <= 0 {
+		log.Println("没找到dataging==1的数据")
+		return
+	}
+
+	//测试分组数量是否正确
+	testNum:=0
+	for k,v:=range pendAllArr {
+		log.Println("第",k,"组--","数量:",len(v))
+		testNum = testNum+len(v)
+	}
+	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+	n, repeateN := 0, 0
+	pool := make(chan bool, 4)
+	wg := &sync.WaitGroup{}
+	for k,v:=range pendAllArr { //每组结束更新一波数据
+		pool <- true
+		wg.Add(1)
+		go func(k int,v []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			//构建当前组的数据池
+			log.Println("构建第",k,"组---(数据池)")
+			//当前组的第一个发布时间
+			first_pt :=util.Int64All(v[0]["publishtime"])
+			curTM := TimedTaskDatamap(dupdays, first_pt,int(k))
+			log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
+			n = n+len(v)
+			log.Println("统计目前总数量:",n,"重复数量:",repeateN)
+			for _,tmp:=range v {
+				info := NewInfo(tmp)
+				if !LowHeavy { //是否进行低质量数据判重
+					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+						log.Println("无效数据")
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":   -1, //无效数据标签
+									"dataging": 0,
+								},
+							},
+						})
+						if len(updateExtract) > 50 {
+							mgo.UpSertBulk(extract, updateExtract...)
+							updateExtract = [][]map[string]interface{}{}
+						}
+						return
+					}
+				}
+				b, source, reason := curTM.check(info)
+				if b { //有重复,生成更新语句,更新抽取和更新招标
+					log.Println("判重结果", b, reason,"目标id",info.id)
+					repeateN++
+					//重复数据打标签
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat":        1,
+								"repeat_reason": reason,
+								"repeat_id":     source.id,
+								"dataging":      0,
+							},
+						},
+					})
+				}else {
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging": 0,//符合条件的都为dataging==0
+							},
+						},
+					})
+				}
+				if len(updateExtract) > 50 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+			}
+		}(k,v)
+
+		//每组数据结束-更新数据
+		if len(updateExtract) > 0 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+	}
+
+
+
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+		updateExtract = [][]map[string]interface{}{}
+	}
+	log.Println("this timeTask over.", n, "repeateN:", repeateN)
+
+	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+	if n > repeateN {
+		for _, to := range nextNode {
+			next_sid := util.BsonIdToSId(task_sid)
+			next_eid := util.BsonIdToSId(task_eid)
+			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  next_sid,
+				"lteid": next_eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+//迁移数据dupdays+5之前的数据
+func movedata() {
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	year, month, day := time.Now().Date()
+	now:=time.Now()
+	move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": move_time,
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		mgo.Save(extract_back, tmp)
+		tmp = map[string]interface{}{}
+		if index%1000 == 0 {
+			log.Println("index", index)
+		}
+	}
+	log.Println("save to", extract_back, " ok index", index)
+	qv := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
+		},
+	}
+	delnum := mgo.Delete(extract, qv)
+	log.Println("remove from ", extract, delnum)
+}

+ 2 - 2
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk_zk_test",
-        "extract_back": "zk_zk_test",
+        "extract": "zk",
+        "extract_back": "zk",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"

+ 7 - 7
udpfilterdup/src/datamap.go

@@ -54,7 +54,7 @@ type datamap struct {
 }
 
 //历史
-func TimedTaskDatamap(days int,lasttime int64) *datamap {
+func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	log.Println("数据池开始重新构建")
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
@@ -115,12 +115,12 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 			}
 		}
 		if n%50000 == 0 {
-			log.Println("当前数据池:", n, continuSum)
+			log.Println("当前第",numIndex,"组数据池构建中:", n, continuSum,tmp["_id"])
 		}
 		tmp = make(map[string]interface{})
 	}
 
-	log.Printf("数据池构建完成:%d秒,%d个\n", int(time.Now().Unix())-start, n)
+	log.Printf("第%d组:数据池构建完成:%d秒,%d个\n",numIndex ,int(time.Now().Unix())-start, n)
 
 	return dm
 }
@@ -188,7 +188,7 @@ func NewDatamap(days int, lastid string) *datamap {
 			}
 		}
 		if n%10000 == 0 {
-			log.Println("当前 n:", n,"数量:" ,continuSum)
+			log.Println("当前 n:", n,"数量:" ,continuSum,tmp["_id"])
 		}
 		tmp = make(map[string]interface{})
 	}
@@ -464,10 +464,10 @@ L:
 func (d *datamap) update(t int64) {
 
 	if TimingTask {
-		d.keymap = d.GetLatelyFiveDay(t)
+		//d.keymap = d.GetLatelyFiveDay(t)
 	}else {
-		d.keymap = d.GetLatelyFiveDay(t)//测试全量数据采用
-		//d.keymap = d.GetLatelyFiveDayDouble(t)
+		//d.keymap = d.GetLatelyFiveDay(t)//测试全量数据采用
+		d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	m := map[string]bool{}
 	for _, v := range d.keymap {

+ 251 - 414
udpfilterdup/src/main.go

@@ -8,7 +8,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"github.com/cron"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -18,8 +17,6 @@ import (
 	"strings"
 	"sync"
 	"time"
-
-	"gopkg.in/mgo.v2/bson"
 )
 
 var (
@@ -46,15 +43,15 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	sid,eid,lastid string                     		 //测试人员判重使用
+	gtid,lteid,lastid string
 	IdType         bool   							 //默认object类型
 )
 
 func init() {
 
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
+	flag.StringVar(&gtid, "gtid", "", "历史的起始id")
+
 	flag.Parse()
 	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
@@ -113,21 +110,23 @@ func main() {
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	if TimingTask {
-		go timedTaskDay()
+		go historyTaskDay()
 	}
 	time.Sleep(99999 * time.Hour)
 }
 
 //测试组人员使用
 func mainT() {
+
 	if TimingTask {
-		log.Println("定时任务测试开始")
-		go timedTaskDay()
+		log.Println("新历史任务测试开始")
+		go historyTaskDay()
+		//go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid = "5f15bf800000000000000000"
-		eid = "5f1efa000000000000000000"
+		sid := "4f16936d52c1d9fbf843c60e"
+		eid := "6f16936d52c1d9fbf843c60e"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -138,7 +137,7 @@ func mainT() {
 		mapinfo["gtid"] = sid
 		mapinfo["lteid"] = eid
 		mapinfo["stop"] = "true"
-		taskRepair([]byte{}, mapinfo)
+		task([]byte{}, mapinfo)
 		time.Sleep(99999 * time.Hour)
 	}
 }
@@ -177,8 +176,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-func taskRepair(data []byte, mapInfo map[string]interface{}) {
-	log.Println("开始修复数据判重")
+//开始判重程序
+func task(data []byte, mapInfo map[string]interface{}) {
+	log.Println("开始数据判重")
 	defer util.Catch()
 	//区间id
 	q := map[string]interface{}{
@@ -202,7 +202,6 @@ func taskRepair(data []byte, mapInfo map[string]interface{}) {
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
-	updateSave := []map[string]interface{}{}
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
@@ -265,16 +264,6 @@ func taskRepair(data []byte, mapInfo map[string]interface{}) {
 						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
-
-					//存新表
-					log.Println("修复数据:",info.id)
-					updateSave = append(updateSave, map[string]interface{}{
-						"id":info.id,
-					})
-					if len(updateSave) >= 100 {
-						mgo.SaveBulk("repair_repeat_0728", updateSave...)
-						updateSave = []map[string]interface{}{}
-					}
 					return
 				}
 			}
@@ -299,18 +288,6 @@ func taskRepair(data []byte, mapInfo map[string]interface{}) {
 					},
 				})
 
-
-				//存新表
-				log.Println("修复-数据:",info.id)
-				updateSave = append(updateSave, map[string]interface{}{
-					"id":info.id,
-				})
-				if len(updateSave) >= 100 {
-					mgo.SaveBulk("repair_repeat_0728", updateSave...)
-					updateSave = []map[string]interface{}{}
-				}
-
-
 				//是否合并-低质量数据不合并
 				if isMerger && !strings.Contains(reason,"低质量"){
 					newData, update_map ,isReplace := mergeDataFields(source, info)
@@ -351,10 +328,6 @@ func taskRepair(data []byte, mapInfo map[string]interface{}) {
 	if len(updateExtract) > 0 {
 		mgo.UpSertBulk(extract, updateExtract...)
 	}
-	if len(updateSave) >= 0 {
-		mgo.SaveBulk("repair_repeat_0728", updateSave...)
-	}
-
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 
 	//任务完成,开始发送广播通知下面节点
@@ -382,283 +355,65 @@ func taskRepair(data []byte, mapInfo map[string]interface{}) {
 }
 
 
-
-
-//开始判重程序
-func task(data []byte, mapInfo map[string]interface{}) {
-	log.Println("开始数据判重")
+func historyTaskDay() {
 	defer util.Catch()
-	//区间id
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
-			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
-		},
-	}
-	if IdType {
-		q = map[string]interface{}{
-			"_id": map[string]interface{}{
-				"$gt":  mapInfo["gtid"].(string),
-				"$lte": mapInfo["lteid"].(string),
-			},
-		}
-	}
 
+	for {
+		start:=time.Now().Unix()
 
-	log.Println(mgo.DbName, extract, q)
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	updateExtract := [][]map[string]interface{}{}
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
-	n, repeateN := 0, 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		if n%10000 == 0 {
-			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
+		if gtid=="" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
 		}
-		source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
-		if util.IntAll((*source)["sourcewebsite"]) == 1 {
-			repeateN++
-			updateExtract = append(updateExtract, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"repeat": 1,
-						"dataging":0,
-						"repeat_reason": "sourcewebsite为1,重复",
-					},
-				},
-			})
-			if len(updateExtract) >= 200 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
-			tmp = make(map[string]interface{})
-			continue
+		if lteid!="" {
+			//先进行数据迁移
+			log.Println("开启一次迁移任务-上一个时间段")
+			moveHistoryData(gtid,lteid)
+			gtid = lteid //替换数据
 		}
 
-		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
-			util.IntAll(tmp["dataging"]) == 1 {
-			if util.IntAll(tmp["repeat"]) == 1 {
-				repeateN++
-			}
-			tmp = make(map[string]interface{})
-			continue
-		}
-		pool <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			info := NewInfo(tmp)
-			if !LowHeavy { //是否进行低质量数据判重
-				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-					updateExtract = append(updateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"repeat": -1, //无效数据标签
-							},
-						},
-					})
-					if len(updateExtract) >= 200 {
-						mgo.UpSertBulk(extract, updateExtract...)
-						updateExtract = [][]map[string]interface{}{}
-					}
-					return
-				}
-			}
-			//正常判重
-			b, source, reason := DM.check(info)
-			if b { //有重复,生成更新语句,更新抽取和更新招标
-				repeateN++
-				var updateID = map[string]interface{}{} //记录更新判重的
-				updateID["_id"] = StringTOBsonId(info.id)
-				if IdType {
-					updateID["_id"] = info.id
-				}
-
-				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
-					updateID,
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"repeat":        1,
-							"repeat_reason": reason,
-							"repeat_id":     source.id,
-						},
-					},
-				})
-
-				//是否合并-低质量数据不合并
-				if isMerger && !strings.Contains(reason,"低质量"){
-					newData, update_map ,isReplace := mergeDataFields(source, info)
-					if isReplace {//替换-数据池
-						fmt.Println("合并更新的id:",source.id)
-						//数据池 - 替换
-						DM.replacePoolData(newData)
-						//mongo更新 - 具体字段 - merge
-						mgo.UpdateById(extract,source.id,update_map)
-						//发udp  更新索引
-						//for _, to := range nextNode {
-						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-						//	by, _ := json.Marshal(map[string]interface{}{
-						//		"gtid":  source.id,
-						//		"lteid": source.id,
-						//		"stype": "biddingall",
-						//		"key":   key,
-						//	})
-						//	addr := &net.UDPAddr{
-						//		IP:   net.ParseIP(to["addr"].(string)),
-						//		Port: util.IntAll(to["port"]),
-						//	}
-						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
-						//	udptaskmap.Store(key, node)
-						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-						//}
-					}
-				}
-			}
-		}(tmp)
-		if len(updateExtract) >= 200 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
-		tmp = make(map[string]interface{})
-	}
-	wg.Wait()
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-	}
-	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
-
-	//任务完成,开始发送广播通知下面节点
-	if n > repeateN && mapInfo["stop"] == nil {
-		log.Println("判重任务完成发送udp")
-		for _, to := range nextNode {
-			sid, _ := mapInfo["gtid"].(string)
-			eid, _ := mapInfo["lteid"].(string)
-			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
-			by, _ := json.Marshal(map[string]interface{}{
-				"gtid":  sid,
-				"lteid": eid,
-				"stype": util.ObjToString(to["stype"]),
-				"key":   key,
-			})
-			addr := &net.UDPAddr{
-				IP:   net.ParseIP(to["addr"].(string)),
-				Port: util.IntAll(to["port"]),
-			}
-			node := &udpNode{by, addr, time.Now().Unix(), 0}
-			udptaskmap.Store(key, node)
-			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		//查询表最后一个id
+		sess := mgo.GetMgoConn()
+		defer mgo.DestoryMongoConn(sess)
+		q:=map[string]interface{}{}
+		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+		it_last := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("-_id").Iter()
+		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+			lteid = BsonTOStringId(tmp["_id"])
+			log.Println("取出最后一个id:",lteid)
+			break
 		}
-	}
-}
-
-
 
-
-//定时任务--定时任务--定时任务
-func timedTaskDay() {
-	log.Println("部署定时任务")
-	c := cron.New()
-	c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
-	c.Start()
-}
-func timedTaskOnce() {
-
-	defer util.Catch()
-	log.Println("开始一次迁移任务")
-	movedata()
-	log.Println("开始一次任务判重")
-	//当前时间-8   -4 小时
-	now := time.Now()
-	log.Println(now)
-	preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
-	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
-	log.Println(preTime,curTime)
-	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
-	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	between_time := curTime.Unix() - (86400 * timingPubScope)
-	log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
-	//区间id
-	q_start := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gte": StringTOBsonId(task_sid),
-			"$lte": StringTOBsonId(task_eid),
-		},
-	}
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
-	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
-	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
-	pendAllArr:=[][]map[string]interface{}{}//待处理数组
-	dayArr := []map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
-		if num%10000 == 0 {
-			log.Println("正序遍历:", num)
+		//开始判重
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
 		}
-		source := util.ObjToMap(tmp["jsondata"])
-		if util.IntAll((*source)["sourcewebsite"]) == 1 {
-			updateExtract = append(updateExtract, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"repeat": 1,
-						"dataging": 0,
-						"repeat_reason": "sourcewebsite为1 重复",
-					},
-				},
-			})
-			if len(updateExtract) > 50 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
+		log.Println("历史判重查询条件:",q,"时间:", between_time)
+
+		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
+		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+		dayArr := []map[string]interface{}{}
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
 			}
-
-
-			tmp = make(map[string]interface{})
-			continue
-		}
-
-		//取-符合-发布时间X年内的数据
-		if util.IntAll(tmp["dataging"]) == 1 {
-			pubtime := util.Int64All(tmp["publishtime"])
-			if pubtime > 0 && pubtime >= between_time {
-				oknum++
-				if deterTime==0 {
-					log.Println("找到第一条符合条件的数据")
-					deterTime = util.Int64All(tmp["publishtime"])
-					dayArr = append(dayArr,tmp)
-				}else {
-					if pubtime-deterTime >timingSpanDay*86400 {
-						//新数组重新构建,当前组数据加到全部组数据
-						pendAllArr = append(pendAllArr,dayArr)
-						dayArr = []map[string]interface{}{}
-						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						dayArr = append(dayArr,tmp)
-					}
-				}
-			}else {
-				//不在两年内的也清标记
+			source := util.ObjToMap(tmp["jsondata"])
+			if util.IntAll((*source)["sourcewebsite"]) == 1 {
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
 						"_id": tmp["_id"],
 					},
 					map[string]interface{}{
 						"$set": map[string]interface{}{
+							"repeat": 1,
 							"dataging": 0,
+							"repeat_reason": "sourcewebsite为1 重复",
 						},
 					},
 				})
@@ -666,61 +421,38 @@ func timedTaskOnce() {
 					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
 				}
-
+				tmp = make(map[string]interface{})
+				continue
 			}
-		}
-		tmp = make(map[string]interface{})
-	}
-
 
-	//批量更新标记
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
-
-	if len(dayArr)>0 {
-		pendAllArr = append(pendAllArr,dayArr)
-		dayArr = []map[string]interface{}{}
-	}
-
-	log.Println("查询数量:",num,"符合条件:",oknum)
-
-	if len(pendAllArr) <= 0 {
-		log.Println("没找到dataging==1的数据")
-		return
-	}
-
-	//测试分组数量是否正确
-	testNum:=0
-	for k,v:=range pendAllArr {
-		log.Println("第",k,"组--","数量:",len(v))
-		testNum = testNum+len(v)
-	}
-	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
-
-	n, repeateN := 0, 0
-	for k,v:=range pendAllArr { //每组结束更新一波数据
-		//构建当前组的数据池
-		log.Println("构建第",k,"组---(数据池)")
-		//当前组的第一个发布时间
-		first_pt :=util.Int64All(v[0]["publishtime"])
-		curTM := TimedTaskDatamap(dupdays, first_pt)
-		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
-		n = n+len(v)
-		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
-		for _,tmp:=range v {
-			info := NewInfo(tmp)
-			if !LowHeavy { //是否进行低质量数据判重
-				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-					log.Println("无效数据")
+			//取-符合-发布时间X年内的数据
+			if util.IntAll(tmp["dataging"]) == 1 {
+				pubtime := util.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time {
+					oknum++
+					if deterTime==0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						if pubtime-deterTime >timingSpanDay*86400 {
+							//新数组重新构建,当前组数据加到全部组数据
+							pendAllArr = append(pendAllArr,dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = util.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr,tmp)
+						}else {
+							dayArr = append(dayArr,tmp)
+						}
+					}
+				}else {
+					//不在两年内的也清标记
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"repeat":   -1, //无效数据标签
 								"dataging": 0,
 							},
 						},
@@ -729,94 +461,180 @@ func timedTaskOnce() {
 						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
-					continue
+
 				}
 			}
-			b, source, reason := curTM.check(info)
-			if b { //有重复,生成更新语句,更新抽取和更新招标
-				log.Println("判重结果", b, reason,"目标id",info.id)
-				repeateN++
-				//重复数据打标签
-				updateExtract = append(updateExtract, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"repeat":        1,
-							"repeat_reason": reason,
-							"repeat_id":     source.id,
-							"dataging":      0,
-						},
-					},
-				})
-			}else {
-				updateExtract = append(updateExtract, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"dataging": 0,//符合条件的都为dataging==0
-						},
-					},
-				})
-			}
-			if len(updateExtract) > 50 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
+			tmp = make(map[string]interface{})
 		}
 
-		//每组数据结束-更新数据
+
+		//批量更新标记
 		if len(updateExtract) > 0 {
 			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
-	}
 
+		if len(dayArr)>0 {
+			pendAllArr = append(pendAllArr,dayArr)
+			dayArr = []map[string]interface{}{}
+		}
 
+		log.Println("查询数量:",num,"符合条件:",oknum)
 
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
-	log.Println("this timeTask over.", n, "repeateN:", repeateN)
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
 
-	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-	if n > repeateN {
-		for _, to := range nextNode {
-			next_sid := util.BsonIdToSId(task_sid)
-			next_eid := util.BsonIdToSId(task_eid)
-			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
-			by, _ := json.Marshal(map[string]interface{}{
-				"gtid":  next_sid,
-				"lteid": next_eid,
-				"stype": util.ObjToString(to["stype"]),
-				"key":   key,
-			})
-			addr := &net.UDPAddr{
-				IP:   net.ParseIP(to["addr"].(string)),
-				Port: util.IntAll(to["port"]),
+		//测试分组数量是否正确
+		testNum:=0
+		for k,v:=range pendAllArr {
+			log.Println("第",k,"组--","数量:",len(v))
+			testNum = testNum+len(v)
+		}
+		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+		n, repeateN := 0, 0
+		pool := make(chan bool, 5)
+		wg := &sync.WaitGroup{}
+		for k,v:=range pendAllArr { //每组结束更新一波数据
+			pool <- true
+			wg.Add(1)
+			go func(k int,v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				//构建当前组的数据池
+				log.Println("构建第",k,"组---(数据池)")
+				//当前组的第一个发布时间
+				first_pt :=util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400,int(k))
+				log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
+				n = n+len(v)
+				log.Println("统计目前总数量:",n,"重复数量:",repeateN)
+				for _,tmp:=range v {
+					info := NewInfo(tmp)
+					if !LowHeavy { //是否进行低质量数据判重
+						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+							log.Println("无效数据")
+							updateExtract = append(updateExtract, []map[string]interface{}{
+								map[string]interface{}{
+									"_id": tmp["_id"],
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat":   -1, //无效数据标签
+										"dataging": 0,
+									},
+								},
+							})
+							if len(updateExtract) > 50 {
+								mgo.UpSertBulk(extract, updateExtract...)
+								updateExtract = [][]map[string]interface{}{}
+							}
+							return
+						}
+					}
+					b, source, reason := curTM.check(info)
+					if b { //有重复,生成更新语句,更新抽取和更新招标
+						log.Println("判重结果", b, reason,"目标id",info.id)
+						repeateN++
+						//重复数据打标签
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":        1,
+									"repeat_reason": reason,
+									"repeat_id":     source.id,
+									"dataging":      0,
+								},
+							},
+						})
+					}else {
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"dataging": 0,//符合条件的都为dataging==0
+								},
+							},
+						})
+					}
+					if len(updateExtract) > 50 {
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+				}
+			}(k,v)
+
+			//每组数据结束-更新数据
+			if len(updateExtract) > 0 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
 			}
-			node := &udpNode{by, addr, time.Now().Unix(), 0}
-			udptaskmap.Store(key, node)
-			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 		}
+
+		wg.Wait()
+
+
+		if len(updateExtract) > 0 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+		if n > repeateN {
+			for _, to := range nextNode {
+				next_sid := util.BsonIdToSId(gtid)
+				next_eid := util.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+
+		end:=time.Now().Unix()
+
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+		log.Println(gtid,lteid)
+		if end-start<60*5 {
+			log.Println("睡眠.............")
+			time.Sleep(5 * time.Minute)
+		}
+		log.Println("继续下一段的历史判重")
 	}
+
+
 }
 
 
-//迁移数据dupdays+5之前的数据
-func movedata() {
+
+
+
+//迁移上一段数据
+func moveHistoryData(startid string,endid string) {
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	year, month, day := time.Now().Date()
-	now:=time.Now()
-	move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
 	q := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$lt": move_time,
+		"_id": map[string]interface{}{
+			"$gt": StringTOBsonId(startid),
+			"$lte": StringTOBsonId(endid),
 		},
 	}
 	log.Println(q)
@@ -830,6 +648,7 @@ func movedata() {
 		}
 	}
 	log.Println("save to", extract_back, " ok index", index)
+
 	qv := map[string]interface{}{
 		"comeintime": map[string]interface{}{
 			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
@@ -840,3 +659,21 @@ func movedata() {
 }
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+