apple 5 жил өмнө
parent
commit
8731514d18
1 өөрчлөгдсөн 44 нэмэгдсэн , 10 устгасан
  1. 44 10
      udpfilterdup/src/main.go

+ 44 - 10
udpfilterdup/src/main.go

@@ -557,6 +557,9 @@ func historyTaskDay() {
 				}()
 				//每组临时数组 -  互不干扰
 				groupUpdateExtract := [][]map[string]interface{}{}
+				//
+				groupOtherExtract := [][]map[string]interface{}{}
+
 				//构建当前组的数据池
 				log.Println("构建第", k, "组---(数据池)")
 				//当前组的第一个发布时间
@@ -604,16 +607,32 @@ func historyTaskDay() {
 						DM.replacePoolData(source)
 						updatelock.Lock()
 
-						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
-							map[string]interface{}{
-								"_id": StringTOBsonId(source.id),
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat_ids": repeat_ids,
+
+						//更新数据源-   14 或者 15
+						//判断是否在当前段落
+						if judgeIsCurIds(gtid,lteid,source.id) {
+							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
 								},
-							},
-						})
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}else {
+							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
@@ -632,6 +651,12 @@ func historyTaskDay() {
 							mgo.UpSertBulk(extract, groupUpdateExtract...)
 							groupUpdateExtract = [][]map[string]interface{}{}
 						}
+
+						if len(groupOtherExtract) > 200 {
+							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							groupOtherExtract = [][]map[string]interface{}{}
+						}
+
 						updatelock.Unlock()
 
 
@@ -667,6 +692,10 @@ func historyTaskDay() {
 				if len(groupUpdateExtract) > 0 {
 					mgo.UpSertBulk(extract, groupUpdateExtract...)
 				}
+
+				if len(groupOtherExtract) > 0 {
+					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+				}
 				updatelock.Unlock()
 
 			}(k, v)
@@ -710,7 +739,12 @@ func historyTaskDay() {
 	}
 }
 
+//判断是否在当前id段落
+func judgeIsCurIds (gtid string,lteid string,curid string) bool {
+
 
+	return false
+}
 
 
 
@@ -739,7 +773,7 @@ func moveHistoryData(startid string,endid string) {
 
 	qv := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
 		},
 	}
 	delnum := mgo.Delete(extract, qv)