瀏覽代碼

正式-判重

apple 5 年之前
父節點
當前提交
9a90f9a879
共有 3 個文件被更改,包括 150 次插入30 次删除
  1. 8 26
      udpfilterdup/src/main.go
  2. 142 4
      udpprojectset/src/heavy_test.go
  3. 二進制
      udpprojectset/src/zheng.xlsx

+ 8 - 26
udpfilterdup/src/main.go

@@ -98,6 +98,7 @@ func main() {
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
+
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -149,17 +150,10 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	//区间id
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	//q := map[string]interface{}{
-	//	"_id": map[string]interface{}{
-	//		"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-	//		"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
-	//	},
-	//}
-
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt":  mapInfo["gtid"].(string),
-			"$lte": mapInfo["lteid"].(string),
+			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
 		},
 	}
 
@@ -211,8 +205,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 					var newData = &Info{}    //更换新的数据池数据
 					var id_map = map[string]interface{}{}
 					repeat_id := source.id
-					//id_map["_id"] = util.StringTOBsonId(info.id)
-					id_map["_id"] = info.id
+					id_map["_id"] = util.StringTOBsonId(info.id)
 					if isMerger {
 						//需要合并相关操作
 						//合并操作--评功权重打分-合并完替换原始数据池
@@ -272,7 +265,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						}
 
 					}
-
 					//构建数据库更新用到的
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						id_map,
@@ -403,7 +395,6 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 				}
 				mapLock.Unlock()
 			} else {
-				mapLock.Lock()
 				b, source, reason := HM.checkHistory(info)
 				if b { //有重复,生成更新语句,更新抽取和更新招标
 					if reason == "未判重记录" {
@@ -421,11 +412,6 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 								},
 							},
 						})
-						if len(updateExtract) > 500 {
-							mgo.UpdateBulk(extract, updateExtract...)
-							updateExtract = [][]map[string]interface{}{}
-						}
-						mapLock.Unlock()
 					} else {
 						repeateN++
 						var mergeArr = []int64{} //更改合并数组记录
@@ -493,23 +479,19 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 							}
 
 						}
-
 						//构建数据库更新用到的
 						updateExtract = append(updateExtract, []map[string]interface{}{
 							id_map,
 							update_map,
 						})
-						if len(updateExtract) > 500 {
-							mgo.UpdateBulk(extract, updateExtract...)
-							updateExtract = [][]map[string]interface{}{}
-						}
-						mapLock.Unlock()
 					}
-				} else {
-					mapLock.Unlock()
 				}
 			}
 		}(tmp)
+		if len(updateExtract) > 500 {
+			mgo.UpdateBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()

+ 142 - 4
udpprojectset/src/heavy_test.go

@@ -6,6 +6,7 @@ import (
 	"log"
 	"qfw/util"
 	"qfw/util/mongodb"
+	"sync"
 	"testing"
 	"time"
 )
@@ -17,7 +18,138 @@ var (
 
 func Test_heavy(t *testing.T) {
 
+	//mapinfo := map[string]interface{}{
+	//	"gtid":  "586b6d7061a0721f15b8f264",
+	//	"lteid": "5e0b2b780cf41612e0639460",
+	//}
+	//task([]byte{}, mapinfo)
 
+
+	//log.Println("1")
+	//代码copy数据
+	//sessTest :=mgoTest.GetMgoConn()
+	//defer sessTest.Close()
+	//
+	//sess := mgo.GetMgoConn()
+	//defer sess.Close()
+	//
+	////var arr []map[string]interface{}
+	//
+	//res_test := sessTest.DB("qfw").C("bidding").Find(mongodb.ObjToMQ(`{"comeintime":{"$gte": 1571025600, "$lte": 1571976000}}`, true)).Iter()
+	//res :=sess.DB("extract_kf").C("a_testbidding")
+	//5
+	//
+	//
+	//
+	//
+	//i:=0
+	//for dict := make(map[string]interface{}); res_test.Next(&dict); i++{
+	//
+	//	//插入
+	//	if i%2000==0 {
+	//		log.Println("当前:",i)
+	//	}
+	//	res.Insert(dict)
+	//	//if len(arr)>=500 {
+	//	//	arr = make([]map[string]interface{},0)
+	//	//}else {
+	//	//	arr = append(arr,dict)
+	//	//}
+	//}
+	//
+
+	//extract,extract_copy:="a_testbidding_new","a_testbidding"
+	//
+	//sess := mgo.GetMgoConn()
+	//defer mgo.DestoryMongoConn(sess)
+	//res_copy := sess.DB("extract_kf").C(extract_copy).Find(nil).Iter()
+	//
+	//m1 :=map[string]int{} //老版本
+	//m2 :=map[string]int{} //新版本
+	//
+	//i:=0
+	//j:=0
+	//for v1 := make(map[string]interface{}); res_copy.Next(&v1); i++{
+	//	if i%2000==0 {
+	//		log.Println("当前i:",i)
+	//	}
+	//	m1[(v1["_id"].(bson.ObjectId).Hex())]= util.IntAll(v1["repeat"])
+	//}
+	//
+	//sesss := mgo.GetMgoConn()
+	//defer mgo.DestoryMongoConn(sesss)
+	//res := sesss.DB("extract_kf").C(extract).Find(nil).Iter()
+	//
+	//
+	//for v2 := make(map[string]interface{}); res.Next(&v2); j++{
+	//	if j%2000==0 {
+	//		log.Println("当前j:",j)
+	//	}
+	//	m2[(v2["_id"].(bson.ObjectId).Hex())]= util.IntAll(v2["repeat"])
+	//}
+	//
+	//fmt.Println(len(m1),len(m2))
+	//n1:=0
+	//n2:=0
+	//n3:=0
+	//n4:=0
+	//n5:=0
+	//n6:=0
+	//
+	//var arr1 []string
+	//var arr2 []string
+	//for k,v:=range m1{
+	//
+	//	if m2[k]==1&&v==0{//0:1
+	//		n1++
+	//		arr2 = append(arr2,fmt.Sprintf("目标_id:%s",k))
+	//	}
+	//	if m2[k]==0&&v==1{ //1:0
+	//		n2++
+	//		arr1 = append(arr1,fmt.Sprintf("目标_id:%s",k))
+	//	}
+	//	if m2[k]==0&&v==0{ //0:0
+	//		n3++
+	//	}
+	//	if m2[k]==1&&v==1{//1:1
+	//		n4++
+	//	}
+	//	if m2[k]==-1&&v==0{ //0:-1
+	//		n5++
+	//	}
+	//	if m2[k]==-1&&v==1{//1:-1
+	//		n6++
+	//	}
+	//
+	//}
+	////打印 1:0情况    ;
+	//mm:=0
+	//for _,v:=range arr1 {
+	//	mm++
+	//	if mm%200==0 {
+	//		log.Println(v)
+	//	}
+	//}
+	//
+	//log.Println("分割线---------------")
+	//log.Println("分割线---------------")
+	//
+	//
+	////打印 0:1情况
+	//nn:=0
+	//for _,v:=range arr2 {
+	//	nn++
+	//	if nn%200==0 {
+	//		log.Println(v)
+	//	}
+	//}
+	//
+	//log.Println("V1 0:1---",n1)
+	//log.Println("V1 1:0---",n2)
+	//log.Println("V1 0:0---",n3)
+	//log.Println("V1 1:1---",n4)
+	//log.Println("V1 0:-1---",n5)
+	//log.Println("V1 1:-1---",n6)
 }
 
 
@@ -48,6 +180,11 @@ func Test_field(t *testing.T) {
 		field_map[dict["s_field"].(string)] = "1"
 	}
 
+	//固定死的需要分析的字段
+
+
+
+
 	/*	ObjectId("5da3f2c5a5cb26b9b79847fc")
 		ObjectId("5da3fd6da5cb26b9b7a8683c")
 		ObjectId("5da40bdaa5cb26b9b7bea472")
@@ -132,7 +269,7 @@ func Test_field(t *testing.T) {
 	row1.AddCell().Value = "爬虫类"
 	row1.AddCell().Value = "字段有效数"
 
-
+	mapLock := &sync.Mutex{}
 	limit :=0
 	for _,v :=range arr  {
 		limit++
@@ -141,7 +278,7 @@ func Test_field(t *testing.T) {
 		row.AddCell().SetString(v["key"].(string))
 		row.AddCell().SetInt(v["total"].(int))
 
-
+		mapLock.Lock()
 		sheetName := "排名:"+util.ObjToString(v["key"])
 		sheet_detail, err := f.AddSheet(sheetName)
 		if err==nil {
@@ -150,7 +287,6 @@ func Test_field(t *testing.T) {
 				if a,ok :=v1.([]string);ok {
 					for k2, v2 := range a {
 						if k2==0 {
-							log.Println(k1)
 							sheet_detail.Cell(row_num, col_num).Value = util.ObjToString(k1)
 							row_num++
 							sheet_detail.Cell(row_num, col_num).Value = v2
@@ -165,7 +301,9 @@ func Test_field(t *testing.T) {
 			}
 		}
 
-		if limit >3{
+		mapLock.Unlock()
+
+		if limit >10{
 			break
 		}
 	}

二進制
udpprojectset/src/zheng.xlsx