apple 5 年之前
父節點
當前提交
b409d536e3
共有 1 個文件被更改,包括 69 次插入和 13 次删除
  1. 69 13
      udpfilterdup/src/main.go

+ 69 - 13
udpfilterdup/src/main.go

@@ -47,8 +47,12 @@ var (
 	timingPubScope int64                             //发布时间周期
 	gtid,lteid,lastid string
 	IdType         bool   							 //默认object类型
+
+	updatelock 		sync.Mutex         //锁
 )
 
+
+
 func init() {
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
 	flag.StringVar(&gtid, "gtid", "", "历史的起始id")
@@ -420,7 +424,7 @@ func historyTaskDay() {
 		log.Println("历史判重查询条件:",q,"时间:", between_time)
 
 		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-		num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
+		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
 		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
 		pendAllArr:=[][]map[string]interface{}{}//待处理数组
 		dayArr := []map[string]interface{}{}
@@ -430,6 +434,8 @@ func historyTaskDay() {
 			}
 			source := util.ObjToMap(tmp["jsondata"])
 			if util.IntAll((*source)["sourcewebsite"]) == 1 {
+				outnum++
+				updatelock.Lock()
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
 						"_id": tmp["_id"],
@@ -442,15 +448,21 @@ func historyTaskDay() {
 						},
 					},
 				})
-				if len(updateExtract) > 50 {
+				if len(updateExtract) > 200 {
+					log.Println("sourcewebsite,批量更新")
 					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
 				}
+
+				updatelock.Unlock()
+
+
 				tmp = make(map[string]interface{})
 				continue
 			}
 
 			//取-符合-发布时间X年内的数据
+			updatelock.Lock()
 			if util.IntAll(tmp["dataging"]) == 1 {
 				pubtime := util.Int64All(tmp["publishtime"])
 				if pubtime > 0 && pubtime >= between_time {
@@ -471,6 +483,7 @@ func historyTaskDay() {
 						}
 					}
 				}else {
+					outnum++
 					//不在两年内的也清标记
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
@@ -482,26 +495,33 @@ func historyTaskDay() {
 							},
 						},
 					})
-					if len(updateExtract) > 50 {
-
+					if len(updateExtract) > 200 {
+						log.Println("不在周期内符合dataging==1,批量更新")
 						mgo.UpSertBulk(extract, updateExtract...)
-
 						updateExtract = [][]map[string]interface{}{}
 					}
 
 				}
 			}
+
+			updatelock.Unlock()
+
 			tmp = make(map[string]interface{})
 		}
 
 
 		//批量更新标记
+		updatelock.Lock()
+
 		if len(updateExtract) > 0 {
+			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
 			mgo.UpSertBulk(extract, updateExtract...)
-
 			updateExtract = [][]map[string]interface{}{}
 		}
 
+		updatelock.Unlock()
+
+
 		if len(dayArr)>0 {
 			pendAllArr = append(pendAllArr,dayArr)
 			dayArr = []map[string]interface{}{}
@@ -548,6 +568,8 @@ func historyTaskDay() {
 					if !LowHeavy { //是否进行低质量数据判重
 						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 							log.Println("无效数据")
+							updatelock.Lock()
+
 							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 								map[string]interface{}{
 									"_id": tmp["_id"],
@@ -559,10 +581,13 @@ func historyTaskDay() {
 									},
 								},
 							})
-							if len(groupUpdateExtract) > 50 {
+							if len(groupUpdateExtract) > 200 {
 								mgo.UpSertBulk(extract, groupUpdateExtract...)
 								groupUpdateExtract = [][]map[string]interface{}{}
 							}
+
+							updatelock.Unlock()
+
 							return
 						}
 					}
@@ -575,6 +600,8 @@ func historyTaskDay() {
 						source.repeat_ids = repeat_ids
 						//替换数据池-更新
 						DM.replacePoolData(source)
+						updatelock.Lock()
+
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
 							map[string]interface{}{
 								"_id": StringTOBsonId(source.id),
@@ -585,8 +612,6 @@ func historyTaskDay() {
 								},
 							},
 						})
-
-
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
@@ -600,7 +625,17 @@ func historyTaskDay() {
 								},
 							},
 						})
+
+						if len(groupUpdateExtract) > 200 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
+						}
+						updatelock.Unlock()
+
+
 					} else {
+						updatelock.Lock()
+
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
@@ -611,16 +646,27 @@ func historyTaskDay() {
 								},
 							},
 						})
+
+						if len(groupUpdateExtract) > 200 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
+						}
+						updatelock.Unlock()
+
+
+
 					}
-					if len(groupUpdateExtract) > 50 {
-						mgo.UpSertBulk(extract, groupUpdateExtract...)
-						groupUpdateExtract = [][]map[string]interface{}{}
-					}
+
+
+
 				}
 				//每组数据结束-更新数据
+				updatelock.Lock()
 				if len(groupUpdateExtract) > 0 {
 					mgo.UpSertBulk(extract, groupUpdateExtract...)
 				}
+				updatelock.Unlock()
+
 			}(k, v)
 
 		}
@@ -751,3 +797,13 @@ func moveOnceTimeOut()  {
 
 
 
+
+
+
+
+
+
+
+
+
+