Bläddra i källkod

BI 用户行业分类-存入新表

wcc 2 år sedan
förälder
incheckning
dbcf59c7d0
1 ändrade filer med 19 tillägg och 14 borttagningar
  1. 19 14
      src/task/task.go

+ 19 - 14
src/task/task.go

@@ -544,7 +544,8 @@ func newtaskrun(tt *TTask) {
 //DealUserKey 用户行业标签分类前预处理
 func DealUserKey(tt *TTask) {
 	//最终更新的数据
-	var updateUserPool [][]map[string]interface{}
+	//var updateUserPool [][]map[string]interface{}
+	var saveUserPool = make([]map[string]interface{}, 0)
 	//开始识别
 	pool := make(chan bool, tt.I_thread)
 	wg := &sync.WaitGroup{}
@@ -613,6 +614,7 @@ func DealUserKey(tt *TTask) {
 			if len(keys) > 0 {
 				//用户关键词
 				update["key_list"] = strings.Join(tags, ",")
+				update["_id"] = tmp["_id"]
 			}
 
 			lock.Lock()
@@ -621,23 +623,21 @@ func DealUserKey(tt *TTask) {
 			//fmt.Println("SMap=>", SMap)
 			subtype := SMap.Map["subscope_dy"]
 
-			if subtype != nil {
-				if subs, ok := subtype.([]string); ok {
-					update["subscope_dy"] = strings.Join(subs, ",")
-					updatePool := []map[string]interface{}{
-						{"_id": tmp["_id"]},
-						{"$set": update},
+			if tt.S_table != "" {
+				// 存储到结果表
+				if subtype != nil {
+					if subs, ok := subtype.([]string); ok {
+						update["subscope_dy"] = strings.Join(subs, ",")
 					}
-					updateUserPool = append(updateUserPool, updatePool)
+					saveUserPool = append(saveUserPool, update)
 					realCount++
 				}
+				if len(saveUserPool) > NN {
+					tt.MgoTask.SaveBulk(tt.S_table, saveUserPool...)
+					saveUserPool = []map[string]interface{}{}
+				}
 			}
 
-			if len(updateUserPool) > 10 {
-				tt.MgoTask.UpdateBulk(tt.S_collection, updateUserPool...)
-				updateUserPool = [][]map[string]interface{}{}
-				//log.Println("current:", tt.S_name, "number:", sum)
-			}
 			lock.Unlock()
 
 		}(tmp)
@@ -647,6 +647,11 @@ func DealUserKey(tt *TTask) {
 
 	wg.Wait()
 
+	if len(saveUserPool) > 0 {
+		tt.MgoTask.SaveBulk(tt.S_table, saveUserPool...)
+		saveUserPool = []map[string]interface{}{}
+	}
+
 	if lastID > tt.LastId {
 		tt.LastId = lastID
 		setid := map[string]interface{}{
@@ -782,7 +787,7 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 					}
 				}
 			}
-		} else {                  //udp查询条件
+		} else { //udp查询条件
 			if tt.S_query != "" { //有查询条件
 				json.Unmarshal([]byte(strings.Replace(tt.S_query, "'", "\"", -1)), &q)
 			}