소스 검색

整理代码-优化判重

zhengkun 3 년 전
부모
커밋
f17a6ece8d

+ 1 - 1
udpfilterdup/src/config.json

@@ -26,7 +26,7 @@
     ],
     "userName": "",
     "password": "",
-    "threads": 1,
+    "threads": 3,
     "isMerger": false,
     "lowHeavy":true,
     "timingTask":false,

+ 12 - 8
udpfilterdup/src/datamap.go

@@ -254,13 +254,11 @@ func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 	keys := []string{}
 	d.lock.Lock()
 	for k, _ := range d.keys { //不同时间段
-		if info.area=="全国" {
-			//匹配所有省
+		if info.area=="全国" {//匹配所有省
 			for _,v := range d.areakeys{
 				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
 			}
-		}else {
-			//匹配指定省
+		}else {//匹配指定省
 			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
 		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
@@ -287,7 +285,6 @@ L:
 						continue
 					}
 				}
-
 				if info.site != "" {//站点临时赋值
 					sitelock.Lock()
 					dict := SiteMap[info.site]
@@ -546,7 +543,15 @@ func (d *datamap) replacePoolData(newData *Info) {
 
 
 
-//替换 - A-B - 原始数据池
+
+
+
+
+
+
+
+
+//相互替换数据池-暂时弃用
 func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 	//删除数据池的老数据
 	ct_old := oldData.publishtime
@@ -593,8 +598,7 @@ func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 
 	d.lock.Unlock()
 }
-
-
+//总计条数-暂时弃用
 func (d *datamap) currentTotalCount() int {
 	num:=qutil.IntAll(0)
 	for _,v:=range d.data {

+ 107 - 0
udpfilterdup/src/fullRepeat.go

@@ -0,0 +1,107 @@
+package main
+
+import (
+	"log"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
+	"sync"
+	"time"
+)
+
+//开始判重程序
+func fullRepeat(sid,eid string) {
+	defer qu.Catch()
+	//区间id-是否分段
+	if IsFull && sec_gtid!="" && sec_lteid!=""{
+		sid = sec_gtid
+		eid = sec_lteid
+	}
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(sid),
+			"$lte": StringTOBsonId(eid),
+		},
+	}
+	log.Println("开始全量数据判重~查询条件:",mgo.DbName, extract, q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	n, isok ,repeatN:= 0,0,0
+	dataAllDict := make(map[string][]map[string]interface{},0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%1000 == 0 {
+			log.Println("index: ", n, isok)
+		}
+		if util.IntAll(tmp["repeat"]) == 1 {
+			repeatN++
+			tmp = make(map[string]interface{})
+			continue
+		}
+		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
+			tmp = make(map[string]interface{})
+			continue
+		}
+		//优化空间-相同天-划分一组(在分类别)
+
+
+		isok++
+		//数据分组-按照类别分组
+		subtype := qu.ObjToString(tmp["subtype"])
+		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
+			subtype=="竞谈"||subtype=="竞价" {
+			subtype = "招标"
+		}
+		dataArr := dataAllDict[subtype]
+		if dataArr==nil {
+			dataArr = []map[string]interface{}{}
+		}
+		dataArr = append(dataArr,tmp)
+		dataAllDict[subtype] = dataArr
+		tmp = make(map[string]interface{})
+	}
+	log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	for _,dataArr := range dataAllDict {
+		pool <- true
+		wg.Add(1)
+		go func(dataArr []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			num := 0
+			for _,tmp := range dataArr{
+				info := NewInfo(tmp)
+				b, source, _ := DM.check(info)
+				if b {
+					num++
+					var updateID = map[string]interface{}{} //记录更新判重的
+					updateID["_id"] = StringTOBsonId(info.id)
+					repeat_ids:=source.repeat_ids
+					repeat_ids =  append(repeat_ids,info.id)
+					source.repeat_ids = repeat_ids
+					DM.replacePoolData(source)//替换数据池-更新
+					//Update.updatePool <- []map[string]interface{}{//重复数据打标签
+					//	updateID,
+					//	map[string]interface{}{
+					//		"$set": map[string]interface{}{
+					//			"repeat":        1,
+					//			"repeat_reason": reason,
+					//			"repeat_id":     source.id,
+					//			"dataging":		 0,
+					//			"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+					//		},
+					//	},
+					//}
+				}
+			}
+			updatelock.Lock()
+			repeatN+=num
+			updatelock.Unlock()
+		}(dataArr)
+	}
+	wg.Wait()
+	log.Println("this full data is over.", n, "repeateN:", repeatN)
+	time.Sleep(15 * time.Second)
+}

+ 2 - 2
udpfilterdup/src/historyRepeat.go

@@ -15,7 +15,7 @@ import (
 )
 
 //历史判重
-func historyTaskDay() {
+func historyRepeat() {
 	defer util.Catch()
 
 	for {
@@ -254,7 +254,7 @@ func historyTaskDay() {
 
 		time.Sleep(30 * time.Second)
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-		if n >= repeateN && gtid!=lteid{
+		if gtid!=lteid{
 			for _, to := range nextNode {
 				next_sid := util.BsonIdToSId(gtid)
 				next_eid := util.BsonIdToSId(lteid)

+ 76 - 79
udpfilterdup/src/increaseRepeat.go

@@ -5,14 +5,15 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	"qfw/util"
+	"qfw/common/src/qfw/util"
+	qu "qfw/util"
 	"sync"
 	"time"
 )
 
-//开始判重程序
-func taskRepeat(mapInfo map[string]interface{}) {
-	defer util.Catch()
+//开始增量判重程序
+func increaseRepeat(mapInfo map[string]interface{}) {
+	defer qu.Catch()
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -20,31 +21,18 @@ func taskRepeat(mapInfo map[string]interface{}) {
 			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 		},
 	}
-	//全量
-	if IsFull && gtept!="" && ltept!=""{
-		log.Println("执行全量分段模式:",gtept,"---",ltept)
-		q = map[string]interface{}{
-			"publishtime": map[string]interface{}{
-				"$gte": util.Int64All(gtept),
-				"$lte": util.Int64All(ltept),
-			},
-		}
-	}
-	//临时赋值
 	log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
-
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
-	n, repeateN := 0, 0
+	n, isok ,repeatN:= 0,0,0
+	dataAllDict := make(map[string][]map[string]interface{},0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%1000 == 0 {
-			log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
+			log.Println("index: ", n, isok)
 		}
 		if util.IntAll(tmp["repeat"]) == 1 {
-			repeateN++
+			repeatN++
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -52,79 +40,88 @@ func taskRepeat(mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-
+		isok++
 		//数据分组-按照类别分组
-
-
-
+		subtype := qu.ObjToString(tmp["subtype"])
+		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
+			subtype=="竞谈"||subtype=="竞价" {
+			subtype = "招标"
+		}
+		dataArr := dataAllDict[subtype]
+		if dataArr==nil {
+			dataArr = []map[string]interface{}{}
+		}
+		dataArr = append(dataArr,tmp)
+		dataAllDict[subtype] = dataArr
+		tmp = make(map[string]interface{})
+	}
+	log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	for _,dataArr := range dataAllDict {
 		pool <- true
 		wg.Add(1)
-		go func(tmp map[string]interface{}) {
+		go func(dataArr []map[string]interface{}) {
 			defer func() {
 				<-pool
 				wg.Done()
 			}()
-			info := NewInfo(tmp)
-			//正常判重
-			b, source, reason := DM.check(info)
-			if b {
-				repeateN++
-				var updateID = map[string]interface{}{} //记录更新判重的
-				updateID["_id"] = StringTOBsonId(info.id)
-				repeat_ids:=source.repeat_ids
-				repeat_ids =  append(repeat_ids,info.id)
-				source.repeat_ids = repeat_ids
-				//替换数据池-更新
-				DM.replacePoolData(source)
-				Update.updatePool <- []map[string]interface{}{//重复数据打标签
-					updateID,
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"repeat":        1,
-							"repeat_reason": reason,
-							"repeat_id":     source.id,
-							"dataging":		 0,
-							"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+			num := 0
+			for _,tmp := range dataArr{
+				info := NewInfo(tmp)
+				b, source, reason := DM.check(info)
+				if b {
+					num++
+					var updateID = map[string]interface{}{} //记录更新判重的
+					updateID["_id"] = StringTOBsonId(info.id)
+					repeat_ids:=source.repeat_ids
+					repeat_ids =  append(repeat_ids,info.id)
+					source.repeat_ids = repeat_ids
+					DM.replacePoolData(source)//替换数据池-更新
+					Update.updatePool <- []map[string]interface{}{//重复数据打标签
+						updateID,
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat":        1,
+								"repeat_reason": reason,
+								"repeat_id":     source.id,
+								"dataging":		 0,
+								"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+							},
 						},
-					},
+					}
 				}
 			}
-		}(tmp)
-		tmp = make(map[string]interface{})
+			updatelock.Lock()
+			repeatN+=num
+			updatelock.Unlock()
+		}(dataArr)
 	}
 	wg.Wait()
-
-	log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
-	//log.Println("当前数据池的数量:",DM.currentTotalCount())
-	//睡眠时间30s 目的是让数据池更新所有数据...
-	time.Sleep(15 * time.Second)
+	log.Println("this cur task over.", n, "repeateN:", repeatN)
 	//更新Ocr的标记
-	if !IsFull {
-		updateOcrFileData(mapInfo["lteid"].(string))
-		//任务完成,开始发送广播通知下面节点
-		if n >= repeateN && mapInfo["stop"] == nil {
-			log.Println("判重任务完成发送udp")
-			for _, to := range nextNode {
-				sid, _ := mapInfo["gtid"].(string)
-				eid, _ := mapInfo["lteid"].(string)
-				key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
-				by, _ := json.Marshal(map[string]interface{}{
-					"gtid":  sid,
-					"lteid": eid,
-					"stype": util.ObjToString(to["stype"]),
-					"key":   key,
-				})
-				addr := &net.UDPAddr{
-					IP:   net.ParseIP(to["addr"].(string)),
-					Port: util.IntAll(to["port"]),
-				}
-				node := &udpNode{by, addr, time.Now().Unix(), 0}
-				udptaskmap.Store(key, node)
-				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-			}
+	updateOcrFileData(mapInfo["lteid"].(string))
+	time.Sleep(15 * time.Second)
+	//任务完成,开始发送广播通知下面节点
+	log.Println("判重任务完成发送udp")
+	for _, to := range nextNode {
+		sid, _ := mapInfo["gtid"].(string)
+		eid, _ := mapInfo["lteid"].(string)
+		key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": util.ObjToString(to["stype"]),
+			"key":   key,
+		})
+		addr := &net.UDPAddr{
+			IP:   net.ParseIP(to["addr"].(string)),
+			Port: util.IntAll(to["port"]),
 		}
+		node := &udpNode{by, addr, time.Now().Unix(), 0}
+		udptaskmap.Store(key, node)
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 	}
-
 }
 //更新ocr表
 func updateOcrFileData(cur_lteid string)  {

+ 34 - 78
udpfilterdup/src/main.go

@@ -10,14 +10,12 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	"os"
 	"qfw/util"
 	"regexp"
 	"sync"
 	"time"
 )
 
-
 var (
 	Sysconfig    map[string]interface{} 	//配置文件
 	mconf        map[string]interface{} 	//mongodb配置信息
@@ -44,7 +42,7 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	gtid,lastid,gtept,ltept string					 //命令输入
+	gtid,lastid,sec_gtid,sec_lteid string					 //命令输入
 	lteid	string									 //历史增量属性
 	IsFull		   bool								 //是否全量
 	updatelock 		sync.Mutex         				 //锁4
@@ -52,15 +50,11 @@ var (
 	taskList		[]map[string]interface{}		 //任务池
 )
 
-//udp通道
-var udptask chan struct{} = make(chan struct{}, 1)
-
-
 func init() {
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
-	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
-	flag.StringVar(&ltept, "ltept", "", "全量lte发布时间") //全量区间pt
+	flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
+	flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
 
 	flag.Parse()
 
@@ -78,7 +72,6 @@ func init() {
 		Size:        util.IntAllDef(task_mconf["task_pool"], 10),
 		UserName:	 userName,
 		Password:	 passWord,
-
 	}
 	task_mgo.InitPool()
 	task_collName = task_mconf["task_collName"].(string)
@@ -112,8 +105,6 @@ func init() {
 	TimingTask = Sysconfig["timingTask"].(bool)
 	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
 	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
-
-
 	//站点配置
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -134,62 +125,7 @@ func init() {
 	}
 	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
 }
-func mainT() {
-	go checkMapJob()
-	updport := Sysconfig["udpport"].(string)
-	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", updport)
-	if TimingTask {
-		log.Println("正常历史部署")
-		go historyTaskDay()
-	}else {
-		if gtept!=""&&ltept!="" {
-			log.Println("全量判重-准备开始")
-			IsFull = true	//全量判重
-			sid := "1fffffffffffffffffffffff"
-			eid := "9fffffffffffffffffffffff"
-			mapinfo := map[string]interface{}{}
-			if sid == "" || eid == "" {
-				log.Println("sid,eid参数不能为空")
-				os.Exit(0)
-			}
-			mapinfo["gtid"] = sid
-			mapinfo["lteid"] = eid
-			mapinfo["stop"] = "true"
-			taskRepeat(mapinfo)
-			time.Sleep(99999 * time.Hour)
-		}else {
-			//正常增量
-			log.Println("正常增量部署,监听任务")
-			go getRepeatTask()
-		}
-	}
-	time.Sleep(99999 * time.Hour)
-}
-//测试组人员使用
-func main() {
-	if TimingTask {
-		go historyTaskDay()
-		time.Sleep(99999 * time.Hour)
-	} else {
-		IsFull = true	//全量判重
-		sid := "1fffffffffffffffffffffff"
-		eid := "9fffffffffffffffffffffff"
-		mapinfo := map[string]interface{}{}
-		if sid == "" || eid == "" {
-			log.Println("sid,eid参数不能为空")
-			os.Exit(0)
-		}
-		mapinfo["gtid"] = sid
-		mapinfo["lteid"] = eid
-		mapinfo["stop"] = "true"
 
-		log.Println("测试:全量判重-准备开始")
-		taskRepeat(mapinfo)
-		time.Sleep(99999 * time.Hour)
-	}
-}
 //udp接收
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -219,34 +155,54 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-
 //监听-获取-分发判重任务
 func getRepeatTask()  {
 	for  {
 		if len(taskList)>0 {
 			updatelock.Lock()
-			//log.Println("准备执行判重任务...")
 			mapInfo := taskList[0]
 			if mapInfo != nil  {
-				taskRepeat(mapInfo) //判重方法
+				increaseRepeat(mapInfo) //判重方法
 			}
 			taskList = taskList[1:]
 			log.Println("此段落结束当前任务池...",len(taskList),taskList)
 			updatelock.Unlock()
 		}else {
-			//log.Println("无任务...睡眠15s")
 			time.Sleep(15 * time.Second)
 		}
 	}
 }
 
-
-
-
-
-
-
-
+//主函数
+func main() {
+	go checkMapJob()
+	updport := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	if TimingTask {
+		log.Println("正常历史部署")
+		go historyRepeat()
+	}else {
+		IsFull = true
+		if !IsFull {//正常增量
+			log.Println("正常增量部署,监听任务")
+			go getRepeatTask()
+			//新增调试
+			//sid := "1fffffffffffffffffffffff"
+			//eid := "9fffffffffffffffffffffff"
+			//increaseRepeat(map[string]interface{}{
+			//	"gtid":sid,
+			//	"lteid":eid,
+			//})
+		}else {
+			sid := "1fffffffffffffffffffffff"
+			eid := "9fffffffffffffffffffffff"
+			fullRepeat(sid,eid)
+		}
+	}
+	time.Sleep(99999 * time.Hour)
+}
 
 
 

+ 1 - 1
udpfilterdup/src/udptaskmap.go

@@ -40,7 +40,7 @@ func checkMapJob() {
 				if node.retry > 5 {
 					log.Println("udp重试失败", k)
 					udptaskmap.Delete(k)
-					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract-send-fail", k.(string)))
+					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "repeat-send-fail", k.(string)))
 					if err == nil {
 						defer res.Body.Close()
 						read, err := ioutil.ReadAll(res.Body)