ソースを参照

增量-更新方式变更

apple 4 年 前
コミット
5b12d7fc68
2 ファイル変更76 行追加24 行削除
  1. 14 24
      udpfilterdup/src/main.go
  2. 62 0
      udpfilterdup/src/updateMethod.go

+ 14 - 24
udpfilterdup/src/main.go

@@ -21,6 +21,7 @@ import (
 	"time"
 )
 
+
 var (
 	Sysconfig    map[string]interface{} //配置文件
 	mconf        map[string]interface{} //mongodb配置信息
@@ -33,7 +34,7 @@ var (
 	nextNode     []map[string]interface{} //下节点数组
 	dupdays      = 7                      //初始化判重范围
 	DM           *datamap                 //
-
+	Update		 *updateInfo
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
 	FilterRegTitle_0 = regexp.MustCompile("^_$")
@@ -89,6 +90,10 @@ func init() {
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
+	//更新池
+	Update = newUpdatePool()
+	go Update.updateData()
+
 	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
 	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
@@ -215,7 +220,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	updateExtract := [][]map[string]interface{}{}
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
@@ -226,8 +230,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
 		if util.IntAll((*source)["sourcewebsite"]) == 1 {
 			repeateN++
-			updatelock.Lock()
-			updateExtract = append(updateExtract, []map[string]interface{}{
+			Update.updatePool <- []map[string]interface{}{
 				map[string]interface{}{
 					"_id": tmp["_id"],
 				},
@@ -238,12 +241,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						"repeat_reason": "sourcewebsite为1,重复",
 					},
 				},
-			})
-			if len(updateExtract) >= 500 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
 			}
-			updatelock.Unlock()
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -274,7 +272,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				source.repeat_ids = repeat_ids
 				//替换数据池-更新
 				DM.replacePoolData(source)
-				updateExtract = append(updateExtract, []map[string]interface{}{//原始数据打标签
+
+				Update.updatePool <- []map[string]interface{}{//原始数据打标签
 					map[string]interface{}{
 						"_id": StringTOBsonId(source.id),
 					},
@@ -283,8 +282,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 							"repeat_ids": repeat_ids,
 						},
 					},
-				})
-				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
+				}
+				Update.updatePool <- []map[string]interface{}{//重复数据打标签
 					updateID,
 					map[string]interface{}{
 						"$set": map[string]interface{}{
@@ -293,21 +292,13 @@ func task(data []byte, mapInfo map[string]interface{}) {
 							"repeat_id":     source.id,
 						},
 					},
-				})
+				}
 			}
 		}(tmp)
-		updatelock.Lock()
-		if len(updateExtract) >=500 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
-		updatelock.Unlock()
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-	}
+
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 
 	time.Sleep(30 * time.Second)
@@ -368,8 +359,7 @@ func historyTaskDay() {
 			log.Println("查询的最后一个任务Id:",lteid)
 			break
 		}
-
-
+		
 		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
 		time.Sleep(5 * time.Minute)
 

+ 62 - 0
udpfilterdup/src/updateMethod.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+type updateInfo struct {
+
+	//更新或新增通道
+	updatePool chan []map[string]interface{}
+	//数量
+	saveSize   	int
+
+}
+
+var sp = make(chan bool, 5)
+
+func newUpdatePool() *updateInfo {
+	update:=&updateInfo{make(chan []map[string]interface{}, 50000),500}
+	return update
+}
+
+
+func (update *updateInfo) updateData() {
+	log.Println("开始不断监听--待更新数据")
+	tmpArr := make([][]map[string]interface{}, update.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-update.updatePool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			log.Println(value,tmpIndex)
+			if tmpIndex == update.saveSize {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.UpSertBulk(extract, dataArr...)
+
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.UpSertBulk(extract, dataArr...)
+
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}