
Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang 5 years ago
parent
commit
245f0fb01b

+ 9 - 0
udpcreateindex/src/biddingall.go

@@ -30,6 +30,15 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
+	} else {
+		idMap := q["_id"].(map[string]interface{})
+		tmpQ := map[string]interface{}{}
+		for c, id := range idMap {
+			if idStr, ok := id.(string); ok && id != "" {
+				tmpQ[c] = qutil.StringTOBsonId(idStr)
+			}
+		}
+		q["_id"] = tmpQ
 	}
 	//bidding库
 	session := mgo.GetMgoConn()
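
The new else branch converts any plain string values already sitting under the `_id` key of the query into BSON ObjectIds before the bidding collection is queried. A minimal sketch of that conversion, assuming qutil.StringTOBsonId wraps the classic mgo bson.ObjectIdHex helper (the hex values below are illustrative only):

package main

import (
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

// toBsonIds rewrites every valid hex string under q["_id"] into a bson.ObjectId,
// mirroring the new else branch; IsObjectIdHex guards against the panic that
// ObjectIdHex raises on malformed input (qutil.StringTOBsonId is assumed to do similar work).
func toBsonIds(q map[string]interface{}) {
	idMap, ok := q["_id"].(map[string]interface{})
	if !ok {
		return
	}
	tmpQ := map[string]interface{}{}
	for op, id := range idMap {
		if idStr, ok := id.(string); ok && bson.IsObjectIdHex(idStr) {
			tmpQ[op] = bson.ObjectIdHex(idStr)
		}
	}
	q["_id"] = tmpQ
}

func main() {
	// sample range query with string ids
	q := map[string]interface{}{"_id": map[string]interface{}{
		"$gte": "5eca4d52511b120337790325",
		"$lte": "5eca4d55511b120337790329",
	}}
	toBsonIds(q)
	fmt.Println(q)
}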

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,10 +5,10 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk_xiufu_test01",
-        "extract_back": "zk_xiufu_test01",
+        "extract": "zk_task_test",
+        "extract_back": "zk_task_test",
         "site": {
-            "dbname": "extract_kf",
+            "dbname": "zhaolongyue",
             "coll": "site"
         }
     },

+ 10 - 5
udpfilterdup/src/datamap.go

@@ -53,7 +53,7 @@ type datamap struct {
 	keys   map[string]bool
 }
 
-func TimedTaskDatamap(days int,lasttime int64) *datamap {
+func TimedTaskDatamap(days int,lasttime int64 ,coll string) *datamap {
 	log.Println("数据池开始重新构建")
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
@@ -69,11 +69,12 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 		"$lt": lasttime,
 	}}
 	log.Println("query", query)
-	it := sess.DB(mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
+	it := sess.DB(mgo.DbName).C(coll).Find(query).Sort("-publishtime").Iter()
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		//qutil.IntAll(tmp["dataging"]) == 1
-		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 {
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
+			qutil.IntAll(tmp["dataging"]) == 1 {
 
 		} else {
 			pt := tmp["publishtime"]
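
For context, TimedTaskDatamap now receives the collection to read as a third argument instead of always hitting extract_back, and it additionally drops records flagged dataging == 1 while filling the pool. A rough sketch of that loop shape under those assumptions, with intAll standing in for qutil.IntAll:

package datapool

import (
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// intAll is a stand-in for qutil.IntAll: best-effort conversion to int.
func intAll(v interface{}) int {
	switch n := v.(type) {
	case int:
		return n
	case int64:
		return int(n)
	case float64:
		return int(n)
	}
	return 0
}

// buildPool sketches the reworked pool build: the caller chooses the collection
// (extract or extract_back), and records already marked repeat == ±1 or dataging == 1
// never enter the pool.
func buildPool(sess *mgo.Session, db, coll string, lasttime int64) []map[string]interface{} {
	query := bson.M{"publishtime": bson.M{"$lt": lasttime}}
	it := sess.DB(db).C(coll).Find(query).Sort("-publishtime").Iter()

	pool := []map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(&tmp); tmp = make(map[string]interface{}) {
		r := intAll(tmp["repeat"])
		if r == 1 || r == -1 || intAll(tmp["dataging"]) == 1 {
			continue // already judged, or still aging: keep it out of the pool
		}
		pool = append(pool, tmp)
	}
	if err := it.Close(); err != nil {
		log.Println("iter close:", err)
	}
	return pool
}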
@@ -113,13 +114,14 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	}
 
 
-	log.Printf("数据池构建完成:%d秒,%d个\n", int(time.Now().Unix())-start, n)
+	log.Printf("数据池构建完成:%d秒,%d个\n", int(time.Now().Unix())-start, n)
 
 
 	return dm
 }
 
 
+
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400 * 2)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
@@ -614,6 +616,9 @@ func dealWithSpecialWordNumber(info*Info,v*Info) int {
 
  //快速低质量数据判重
 func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
+	if !isTheSameDay(v.publishtime,info.publishtime) {
+		return false,reason
+	}
 	//首先判定是否为低质量数据    info目标数据
 	if info.agency==v.agency&&info.title!=""&&
 		info.title==v.title &&
@@ -635,7 +640,7 @@ func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
 			isValue++
 		}
 		if isValue==0 {
-			reason = reason + "---低质量-要素均为空标题包含关系"
+			reason = reason + "---低质量-要素均为空-标题包含关系"
 			return true, reason
 		}else if isValue==1 {
 			isMeet := false
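
fastLowQualityHeavy now returns early unless both records were published on the same day. isTheSameDay itself is not part of this diff, so the following is only an assumption about its behavior, comparing the local calendar date of two Unix timestamps:

package main

import (
	"fmt"
	"time"
)

// isTheSameDay reports whether two Unix timestamps fall on the same local calendar day.
// Assumed implementation; the real helper lives elsewhere in udpfilterdup.
func isTheSameDay(a, b int64) bool {
	ya, ma, da := time.Unix(a, 0).Local().Date()
	yb, mb, db := time.Unix(b, 0).Local().Date()
	return ya == yb && ma == mb && da == db
}

func main() {
	now := time.Now().Unix()
	fmt.Println(isTheSameDay(now, now)) // true
}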

+ 73 - 26
udpfilterdup/src/main.go

@@ -135,8 +135,8 @@ func mainT() {
 		5e933b1a50b5ea296ef0e839
 		*/
 		//IdType = true
-		sid = "5eca4d52511b120337790325"
-		eid = "5eca4d55511b120337790329"
+		sid = "5ece4b1b9e628c59915eb257"
+		eid = "5ed55b6d9e628c599161977c"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -229,31 +229,36 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
 
-		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
-			util.IntAll(tmp["dataging"]) == 1 ||util.IntAll(tmp["sourcewebsite"]) == 1{
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			repeateN++
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"repeat_reason": "sourcewebsite为1,重复",
+					},
+				},
+			})
+			if len(updateExtract) >= 200 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+
+
 			tmp = make(map[string]interface{})
+			continue
+		}
+
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
+			util.IntAll(tmp["dataging"]) == 1 {
 			if util.IntAll(tmp["repeat"]) == 1 {
 				repeateN++
 			}
-			if util.IntAll(tmp["sourcewebsite"]) == 1 {
-				repeateN++
-				updateExtract = append(updateExtract, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"repeat": 1,
-							"repeat_reason": "sourcewebsite为1,重复",
-						},
-					},
-				})
-				if len(updateExtract) >= 200 {
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
-				}
-
-			}
+			tmp = make(map[string]interface{})
 			continue
 		}
 		pool <- true
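
The sourcewebsite flag is now read from the record's jsondata sub-document (via util.ObjToMap) instead of the top level, and any hit is marked repeat = 1 and skipped before it ever reaches the worker pool; updates are still buffered and written through mgo.UpSertBulk in batches of 200, with a final flush per group further down. A sketch of that buffering pattern, with a hypothetical batcher type and flush callback standing in for updateExtract and mgo.UpSertBulk:

package main

import "fmt"

// updatePair mirrors the [filter, $set] pairs appended to updateExtract above.
type updatePair [2]map[string]interface{}

// batcher buffers upsert pairs and hands them to flush once the buffer reaches size,
// the same shape as the "len(updateExtract) >= 200 -> mgo.UpSertBulk" pattern in task().
type batcher struct {
	size  int
	buf   []updatePair
	flush func([]updatePair)
}

func (b *batcher) add(id interface{}, reason string) {
	b.buf = append(b.buf, updatePair{
		{"_id": id},
		{"$set": map[string]interface{}{"repeat": 1, "repeat_reason": reason}},
	})
	if len(b.buf) >= b.size {
		b.flush(b.buf)
		b.buf = nil
	}
}

// done flushes whatever is left, as the per-group flush at the end of timedTaskOnce does.
func (b *batcher) done() {
	if len(b.buf) > 0 {
		b.flush(b.buf)
		b.buf = nil
	}
}

func main() {
	b := &batcher{size: 200, flush: func(p []updatePair) { fmt.Println("bulk upsert", len(p), "records") }}
	for i := 0; i < 450; i++ {
		b.add(i, "sourcewebsite为1,重复")
	}
	b.done() // prints 200, 200, then 50
}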
@@ -513,7 +518,9 @@ func timedTaskOnce() {
 		if num%10000 == 0 {
 			log.Println("正序遍历:", num)
 		}
-		if util.IntAll(tmp["sourcewebsite"]) == 1 {
+
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
 			updateExtract = append(updateExtract, []map[string]interface{}{
 				map[string]interface{}{
 					"_id": tmp["_id"],
@@ -530,8 +537,14 @@ func timedTaskOnce() {
 				mgo.UpSertBulk(extract, updateExtract...)
 				updateExtract = [][]map[string]interface{}{}
 			}
+
+
+			tmp = make(map[string]interface{})
 			continue
 		}
+
+
+
 		//取-符合-发布时间X年内的数据
 		if util.IntAll(tmp["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp["publishtime"])
@@ -602,10 +615,16 @@ func timedTaskOnce() {
 	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
 
 	n, repeateN := 0, 0
-	for k,v:=range pendAllArr {
+	for k,v:=range pendAllArr { //每组结束更新一波数据
 		//构建当前组的数据池
 		log.Println("构建第",k,"组---(数据池)")
-		DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
+		//当前组的第一个发布时间
+		first_pt :=util.Int64All(v[0]["publishtime"])
+		coll :=extract_back
+		if isTaskTimeCycle(first_pt) {
+			coll  = extract
+		}
+		DM = TimedTaskDatamap(dupdays, first_pt,coll)
 		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
 		n = n+len(v)
 		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
@@ -667,6 +686,13 @@ func timedTaskOnce() {
 				updateExtract = [][]map[string]interface{}{}
 			}
 		}
+
+		//每组数据结束-更新数据
+		if len(updateExtract) > 0 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+
 	}
 
 
@@ -699,6 +725,27 @@ func timedTaskOnce() {
 		}
 	}
 }
+//判断是否在周期天内
+func isTaskTimeCycle(pt int64) bool {
+
+	year, month, day := time.Now().Date()
+	predur_pt:=time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix()
+	log.Println(predur_pt)
+
+	if pt >= predur_pt {
+		return true
+	}else  {
+		return false
+	}
+
+}
+
+
+
+
+
+
+
 
 //合并字段-并更新merge字段的值
 func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint16(m.Size))
+	opts.SetMaxPoolSize(uint64(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
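
The cast changes because ClientOptions.SetMaxPoolSize takes a uint64 in released versions of the official Go driver (early betas used uint16). A minimal pool setup along the same lines, assuming go.mongodb.org/mongo-driver and a placeholder address:

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	size := 5 // pool size, as in config.json's "pool" field
	opts := options.Client().
		ApplyURI("mongodb://127.0.0.1:27017"). // placeholder address
		SetConnectTimeout(3 * time.Second).
		SetMaxPoolSize(uint64(size)). // uint64 in current driver releases
		SetMaxConnIdleTime(2 * time.Hour)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.Background())
	log.Println("client configured with max pool size", size)
}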