Parcourir la source

更新用户行业分类,采用 UDP 方式

wcc il y a 2 ans
Parent
commit
d3e93c7895
4 fichiers modifiés avec 41 ajouts et 157 suppressions
  1. 4 0
      src/config.json
  2. 17 143
      src/task/task.go
  3. 1 0
      src/udptask/udptask.go
  4. 19 14
      src/util/charge_rule.go

+ 4 - 0
src/config.json

@@ -112,6 +112,10 @@
 		    }
 		]
     },
+    "yonghuhangye": {
+        "name": "用户行业分类",
+        "taskid": "641c34903d7382f00e3ec755"
+    },
     "biaoqian":{
         "name": "标签分类",
         "taskid": "624110979b906b76a7439654",

+ 17 - 143
src/task/task.go

@@ -531,149 +531,8 @@ OVER:
 }
 
 func newtaskrun(tt *TTask) {
-	//针对用户行业标签,需要单独处理
-	if tt.S_name == "用户行业分类" {
-		log.Println("执行任务:->", tt.S_name)
-		DealUserKey(tt)
-	} else {
-		NewTaskRunAll(tt, false, nil)
-	}
-
-}
-
-//DealUserKey 用户行业标签分类前预处理
-func DealUserKey(tt *TTask) {
-	//最终更新的数据
-	//var updateUserPool [][]map[string]interface{}
-	var saveUserPool = make([]map[string]interface{}, 0)
-	//开始识别
-	pool := make(chan bool, tt.I_thread)
-	wg := &sync.WaitGroup{}
-	lock := &sync.Mutex{}
-	q := make(map[string]interface{})
-
-	var lastID string
-	//1.获取查询条件
-	comeintime := time.Now().Unix() - 5*60
-	query := map[string]interface{}{
-		"l_registedate": map[string]interface{}{
-			"$lt": comeintime,
-		},
-	}
-	qId := tt.MgoTask.GetMgoConn()
-	defer tt.MgoTask.DestoryMongoConn(qId)
-	tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
-	eId := ""
-	for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
-		eId = u.BsonIdToSId(tmp["_id"])
-	}
-
-	if tt.LastId != "" {
-		sid := tt.LastId
-		if eId <= sid || eId == "" {
-			return
-		}
-		q["_id"] = map[string]interface{}{
-			"$gt":  u.StringTOBsonId(sid),
-			"$lte": u.StringTOBsonId(eId),
-		}
-	} else {
-		q["_id"] = map[string]interface{}{
-			"$lte": u.StringTOBsonId(eId),
-		}
-	}
-	//2.条件封装完毕,开始查询数据
-	sess := tt.MgoTask.GetMgoConn()
-	defer tt.MgoTask.DestoryMongoConn(sess)
-
-	log.Println(tt.S_name, " 查询条件:=>", q)
-	extractquery := sess.DB(tt.S_mgodb).C(tt.S_coll).Find(q).Select(nil).Sort("_id").Iter()
-
-	sum := 0
-	for tmp := make(map[string]interface{}); extractquery.Next(&tmp); sum++ {
-		lastID = u.BsonIdToSId(tmp["_id"])
-
-		pool <- true
-		wg.Add(1)
-
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-
-			keys := u.GetUserKeys(tmp)
-			tags := []string{}
-			for _, v := range keys {
-				tag := util.ObjToString(v)
-				tags = append(tags, tag)
-			}
-			//按顺序识别
-			update := map[string]interface{}{}
-			if len(keys) > 0 {
-				//用户关键词
-				update["key_list"] = strings.Join(tags, ",")
-				update["i_appid"] = tmp["i_appid"]
-				update["_id"] = tmp["_id"]
-			}
-
-			lock.Lock()
-			SMap := NewClassificationRun(tt, tmp)
-			subtype := SMap.Map["subscope_dy"]
-			// 存储到结果表
-			if subtype != nil {
-				if subs, ok := subtype.([]string); ok {
-					tops := []string{}
-					for _, v := range subs {
-						top := strings.Split(v, "_")[0]
-						tops = append(tops, top)
-					}
-					update["subscope_dy"] = strings.Join(subs, ",")
-					update["topscope_dy"] = strings.Join(tops, ",")
-				}
-			}
-
-			if len(update) > 0 {
-				saveUserPool = append(saveUserPool, update)
-			}
-
-			if len(saveUserPool) > NN {
-				//存储到新表
-				if tt.S_table != "" {
-					tt.MgoTask.SaveBulk(tt.S_table, saveUserPool...)
-					saveUserPool = []map[string]interface{}{}
-				}
-			}
-			lock.Unlock()
-
-		}(tmp)
-
-		tmp = make(map[string]interface{})
-	}
-
-	wg.Wait()
-
-	if len(saveUserPool) > 0 {
-		//存储到新表
-		if tt.S_table != "" {
-			tt.MgoTask.SaveBulk(tt.S_table, saveUserPool...)
-			saveUserPool = []map[string]interface{}{}
-		}
-	}
-
-	if lastID > tt.LastId {
-		tt.LastId = lastID
-		setid := map[string]interface{}{
-			"$set": map[string]interface{}{
-				"s_startid":   tt.LastId,
-				"s_starttime": time.Now().Unix(),
-			},
-		}
-		go tools.MgoClass.Update("rc_task", `{"_id":"`+tt.ID+`"}`, setid, false, false)
-
-	}
-	log.Println("运行", tt.S_name, "over", sum)
 
+	NewTaskRunAll(tt, false, nil)
 }
 
 //NewTaskRunAll 常规任务和udp非合并数据处理方法
@@ -782,7 +641,8 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 							"$lte": u.StringTOBsonId(eId),
 						}
 					}
-					time.Sleep(time.Minute * 2) //按id查询,为了保证有新数据入库,每次休息2分钟
+					//按id查询,为了保证有新数据入库,每次休息2分钟
+					time.Sleep(time.Minute * 2)
 					//测试环境q的赋值执行下述代码
 					//if tt.LastId != "" && q["_id"] == nil {
 					//	q["_id"] = map[string]interface{}{
@@ -894,6 +754,19 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 							}
 						}
 					}
+					// 针对用户行业分类,单独处理数据
+					if mapInfo["stype"] == "yonghuhangye" || strings.TrimSpace(tt.S_name) == "用户行业分类" {
+						subs := SMap.Map["subscope_dy"]
+						delete(SMap.Map, "topscope_dy")
+						var tops []string
+						if subscopes, ok := subs.([]string); ok {
+							for _, sub := range subscopes {
+								top := strings.Split(sub, "_")[0]
+								tops = append(tops, top)
+							}
+							SMap.Map["topscope_dy"] = u.RemoveDuplicateString(tops)
+						}
+					}
 
 					//追加时处理,//更新字段 I_fieldUpdate  0:覆盖 1:追加
 					if tt.I_fieldUpdate == 1 && tt.I_multiclass == 1 {
@@ -1427,6 +1300,7 @@ func FindId(coll string) (gtid, lteid string) {
 	return gtid, lteid
 }
 
+//NewLoadTestTask 测试任务
 func NewLoadTestTask(_id, s_mgourl, s_mgodb, s_coll, i_poolsize, s_startid, s_endid, s_query string) (bs bool, filename string) {
 	defer tools.Catch()
 	r, t, _ := NewAnalyTask(_id, s_mgourl, s_mgodb, s_coll, tools.IntAllDef(i_poolsize, 5))

+ 1 - 0
src/udptask/udptask.go

@@ -115,6 +115,7 @@ func RunningHangyeClass() {
 	}
 }
 
+//UdpTask udp 任务
 func UdpTask(stype string, mapInfo map[string]interface{}) int {
 	total := 0
 	defer qutil.Catch()

+ 19 - 14
src/util/charge_rule.go

@@ -43,20 +43,6 @@ func ChargeDetailZB(detail string) bool {
 	return false
 }
 
-////ChargeSP 判断拟建数据是否 属于审批,三个以上属于 拟建,true 审批,false 拟建
-//func ChargeSP(tmp map[string]interface{}) bool {
-//	num := 0
-//	for _, v := range spFields {
-//		if val, ok := tmp[v]; ok && val != nil {
-//			num++
-//		}
-//	}
-//	if num >= 3 {
-//		return false
-//	}
-//	return true
-//}
-
 //GetJyKey 免费订阅:o_jy.a_key.key/appendkey
 func GetJyKey(data map[string]interface{}) (res []interface{}) {
 	// 获取o_jy.a_key.key的值
@@ -160,3 +146,22 @@ func GetUserKeys(data map[string]interface{}) (res []interface{}) {
 
 	return
 }
+
+//RemoveDuplicateString returns a new slice holding the elements of arr with duplicates dropped; the first occurrence of each string is kept, in input order.
+func RemoveDuplicateString(arr []string) []string {
+	encountered := map[string]bool{}
+	result := []string{}
+
+	for _, v := range arr {
+		if encountered[v] == true {
+			// Already seen: skip the duplicate.
+		} else {
+			// First occurrence: keep it in the result.
+			result = append(result, v)
+			// Mark the value as seen so later copies are skipped.
+			encountered[v] = true
+		}
+	}
+	// result is an empty, non-nil slice when arr has no elements.
+	return result
+}