Browse Source

删除 IdType为 string的更新方式 , 新增个别记录打印

apple 4 years ago
parent
commit
2caedc2c85
3 changed files with 14 additions and 37 deletions
  1. 2 2
      udpfilterdup/src/config.json
  2. 1 7
      udpfilterdup/src/datamap.go
  3. 11 28
      udpfilterdup/src/main.go

+ 2 - 2
udpfilterdup/src/config.json

@@ -3,7 +3,7 @@
     "dupdays": 7,
     "mongodb": {
         "addr": "192.168.3.207:27092",
-        "pool": 5,
+        "pool": 10,
         "db": "extract_kf",
         "extract": "zk_zk_test",
         "extract_back": "zk_zk_test",
@@ -16,7 +16,7 @@
         "task_addrName": "172.17.4.187:27081",
         "task_dbName": "qfw",
         "task_collName": "ocr_flie_over",
-        "pool": 5
+        "pool": 10
     },
     "jkmail": {
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",

+ 1 - 7
udpfilterdup/src/datamap.go

@@ -197,12 +197,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 		area = "全国"
 	}
 	info := &Info{}
-	if IdType {
-		info.id = qutil.ObjToString(tmp["_id"])
-	}else  {
-		info.id = BsonTOStringId(tmp["_id"])
-	}
-
+	info.id = BsonTOStringId(tmp["_id"])
 	info.title = qutil.ObjToString(tmp["title"])
 	info.area = area
 	info.subtype = subtype
@@ -222,7 +217,6 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.site = qutil.ObjToString(tmp["site"])
 	info.href = qutil.ObjToString(tmp["href"])
 	info.repeatid = qutil.ObjToString(tmp["repeatid"])
-
 	info.specialWord = FilterRegTitle.MatchString(info.title)
 	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) ||FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
 	info.mergemap = *qutil.ObjToMap(tmp["merge"])

+ 11 - 28
udpfilterdup/src/main.go

@@ -48,7 +48,6 @@ var (
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
 	gtid,lteid,lastid,gtept,ltept string			//命令输入
-	IdType         bool   							 //默认object类型
 	IsFull		   bool
 	updatelock 		sync.Mutex         //锁
 )
@@ -200,15 +199,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 		},
 	}
-	if IdType {
-		q = map[string]interface{}{
-			"_id": map[string]interface{}{
-				"$gt":  mapInfo["gtid"].(string),
-				"$lte": mapInfo["lteid"].(string),
-			},
-		}
-	}
-	//增量
+	//全量
 	if IsFull && gtept!="" && ltept!=""{
 		log.Println("执行分段模式")
 		q = map[string]interface{}{
@@ -219,7 +210,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 
-	log.Println(mgo.DbName, extract, q)
+	log.Println("增量查询条件:",mgo.DbName, extract, q)
+
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
@@ -277,15 +269,12 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				repeateN++
 				var updateID = map[string]interface{}{} //记录更新判重的
 				updateID["_id"] = StringTOBsonId(info.id)
-				if IdType {
-					updateID["_id"] = info.id
-				}
 				repeat_ids:=source.repeat_ids
 				repeat_ids =  append(repeat_ids,info.id)
 				source.repeat_ids = repeat_ids
 				//替换数据池-更新
 				DM.replacePoolData(source)
-				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
+				updateExtract = append(updateExtract, []map[string]interface{}{//原始数据打标签
 					map[string]interface{}{
 						"_id": StringTOBsonId(source.id),
 					},
@@ -379,7 +368,8 @@ func historyTaskDay() {
 			log.Println("查询的最后一个任务Id:",lteid)
 			break
 		}
-		//
+
+		
 		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
 		time.Sleep(5 * time.Minute)
 
@@ -394,7 +384,6 @@ func historyTaskDay() {
 			},
 		}
 		log.Println("历史判重查询条件:",q,"时间:", between_time)
-
 		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
 		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
@@ -420,7 +409,7 @@ func historyTaskDay() {
 						},
 					},
 				})
-				if len(updateExtract) > 200 {
+				if len(updateExtract) >= 200 {
 					log.Println("sourcewebsite,批量更新")
 					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
@@ -467,7 +456,7 @@ func historyTaskDay() {
 							},
 						},
 					})
-					if len(updateExtract) > 200 {
+					if len(updateExtract) >= 200 {
 						log.Println("不在周期内符合dataging==1,批量更新")
 						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
@@ -591,12 +580,12 @@ func historyTaskDay() {
 							},
 						})
 
-						if len(groupUpdateExtract) > 200 {
+						if len(groupUpdateExtract) >= 500 {
 							mgo.UpSertBulk(extract, groupUpdateExtract...)
 							groupUpdateExtract = [][]map[string]interface{}{}
 						}
 
-						if len(groupOtherExtract) > 200 {
+						if len(groupOtherExtract) >= 500 {
 							mgo.UpSertBulk(extract_back, groupOtherExtract...)
 							groupOtherExtract = [][]map[string]interface{}{}
 						}
@@ -618,18 +607,12 @@ func historyTaskDay() {
 							},
 						})
 
-						if len(groupUpdateExtract) > 200 {
+						if len(groupUpdateExtract) >= 500 {
 							mgo.UpSertBulk(extract, groupUpdateExtract...)
 							groupUpdateExtract = [][]map[string]interface{}{}
 						}
 						updatelock.Unlock()
-
-
-
 					}
-
-
-
 				}
 				//每组数据结束-更新数据
 				updatelock.Lock()