unknown 6 jaren geleden
bovenliggende
commit
43c0d411e8
3 gewijzigde bestanden met toevoegingen van 76 en 69 verwijderingen
  1. 65 62
      src/jy/extract/clearesult.go
  2. 2 1
      src/jy/extract/clearudp.go
  3. 9 6
      src/jy/extract/extractInit.go

+ 65 - 62
src/jy/extract/clearesult.go

@@ -9,8 +9,7 @@ import (
 	"time"
 )
 
-var CltLogs []map[string]interface{}        //清理日志
-var UpdateResult [][]map[string]interface{} //更新抽取结果
+var CltLogs []map[string]interface{} //清理日志
 
 func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
 	qu.Try(func() {
@@ -18,54 +17,56 @@ func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
 		_id := qu.BsonIdToSId((*doc)["_id"])
 		data := elastic.Get(qu.ObjToString(ju.Config["mergetablealias"]), qu.ObjToString(ju.Config["mergetable"]), `{"query":{"term":{"list.infoid":"`+qu.BsonIdToSId((*doc)["_id"])+`"}}}`)
 		kvMap := make(map[string][]map[string]interface{})
-		tmparr := []map[string]interface{}{}
 		tmpmaparr := make(map[string]interface{})
+		set := make(map[string]interface{})
 		if len(*data) > 0 {
 			list := (*data)[0]["list"].([]interface{})
 			listArr := qu.ObjArrToMapArr(list)
 			kvMap["list"] = listArr
 		}
-		field := ""
-		for _, l := range c.ClearLuas {
-			beforeval := (*doc)[l.Field]
-			if field == "" {
-				field = l.Field
-			}
-			lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
-			clearResult := lua.ClearRunScript()
-			(*doc)[l.Field] = clearResult[l.Field]                //覆盖原来要清理的字段的值
-			if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
-				logdata := map[string]interface{}{
-					"code":    l.Code,
-					"name":    l.Name,
-					"field":   l.Field,
-					"type":    "clear",
-					"luatext": l.LuaText,
-					"before":  beforeval,
-					"value":   clearResult[l.Field],
+		for field, luas := range c.ClearLuas {
+			tmparr := []map[string]interface{}{}
+			for _, l := range luas {
+				beforeval := (*doc)[l.Field]
+				lua := ju.LuaScript{Code: l.Code, Name: l.Name, Doc: *doc, Script: l.LuaText, KvMap: kvMap}
+				clearResult := lua.ClearRunScript()                   //清理后结果
+				(*doc)[l.Field] = clearResult[l.Field]                //覆盖原来要清理的字段的值
+				if len(clearResult) > 0 && c.ClearTaskInfo.IsCltLog { //封装清理日志
+					logdata := map[string]interface{}{
+						"code":    l.Code,
+						"name":    l.Name,
+						"field":   l.Field,
+						"type":    "clear",
+						"luatext": l.LuaText,
+						"before":  beforeval,
+						"value":   clearResult[l.Field],
+					}
+					tmparr = append(tmparr, logdata)
 				}
-				tmparr = append(tmparr, logdata)
 			}
+			tmpmaparr[field] = tmparr
+			log.Println(field, "====", len((*doc)[field]), (*doc)[field])
+			log.Println("tmpmaparr=====", len(tmpmaparr), tmpmaparr)
+			set[field] = (*doc)[field]
 		}
-		//封装日志信息
-		tmpmaparr[field] = tmparr
 		tmpmaparr["resultid"] = _id
 		lock.Lock()
-		CltLogs = append(CltLogs, tmpmaparr)
+		CltLogs = append(CltLogs, tmpmaparr) //日志信息
 		lock.Unlock()
+
 		//封装更新信息
 		updatearr := []map[string]interface{}{
 			map[string]interface{}{
 				"_id": _id,
 			},
 			map[string]interface{}{
-				"$set": map[string]interface{}{
-					field: (*doc)[field],
-				},
+				"$set": set,
 			},
 		}
+		log.Println("updatearr====", updatearr)
 		lock.Lock()
-		UpdateResult = append(UpdateResult, updatearr)
+		c.UpdateResult = append(c.UpdateResult, updatearr)
+		log.Println("len(updatearr)=====", len(c.UpdateResult))
 		lock.Unlock()
 	}, func(err interface{}) {
 		log.Println((*doc)["_id"], err)
@@ -100,37 +101,39 @@ func SaveCltLog() {
 
 //批量更新抽取结果的值(todo)
 func (c *ClearTask) UpdateResultVal() {
-	//	defer qu.Catch()
-	//	e.ResultChanel = make(chan bool, 5)
-	//	e.ResultArr = [][]map[string]interface{}{}
-	//	for {
-	//		if len(e.ResultArr) > 500 {
-	//			e.ResultChanel <- true
-	//			arr := e.ResultArr[:500]
-	//			go func(tmp *[][]map[string]interface{}) {
-	//				qu.Try(func() {
-	//					db.Mgo.UpSertBulk("extract_result", *tmp...)
-	//					<-e.ResultChanel
-	//				}, func(err interface{}) {
-	//					log.Println(err)
-	//					<-e.ResultChanel
-	//				})
-	//			}(&arr)
-	//			e.ResultArr = e.ResultArr[500:]
-	//		} else {
-	//			e.ResultChanel <- true
-	//			arr := e.ResultArr
-	//			go func(tmp *[][]map[string]interface{}) {
-	//				qu.Try(func() {
-	//					db.Mgo.UpSertBulk("extract_result", *tmp...)
-	//					<-e.ResultChanel
-	//				}, func(err interface{}) {
-	//					log.Println(err)
-	//					<-e.ResultChanel
-	//				})
-	//			}(&arr)
-	//			e.ResultArr = [][]map[string]interface{}{}
-	//			time.Sleep(10 * time.Second)
-	//		}
-	//	}
+	defer qu.Catch()
+	c.ClearChannel = make(chan bool, 5)
+	c.UpdateResult = [][]map[string]interface{}{}
+	for {
+		if len(c.UpdateResult) > 500 {
+			c.ClearChannel <- true
+			arr := c.UpdateResult[:500]
+			go func(tmp *[][]map[string]interface{}) {
+				qu.Try(func() {
+					db.Mgo.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
+					//db.Mgo.UpSertBulk("extract_result", *tmp...)
+					<-c.ClearChannel
+				}, func(err interface{}) {
+					log.Println(err)
+					<-c.ClearChannel
+				})
+			}(&arr)
+			c.UpdateResult = c.UpdateResult[500:]
+		} else {
+			c.ClearChannel <- true
+			arr := c.UpdateResult
+			go func(tmp *[][]map[string]interface{}) {
+				qu.Try(func() {
+					db.Mgo.UpdateBulk(c.ClearTaskInfo.FromColl, *tmp...)
+					//db.Mgo.UpSertBulk("extract_result", *tmp...)
+					<-c.ClearChannel
+				}, func(err interface{}) {
+					log.Println(err)
+					<-c.ClearChannel
+				})
+			}(&arr)
+			c.UpdateResult = [][]map[string]interface{}{}
+			time.Sleep(10 * time.Second)
+		}
+	}
 }

+ 2 - 1
src/jy/extract/clearudp.go

@@ -30,6 +30,7 @@ func clearProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		err := json.Unmarshal(data, &rep)
 		if err != nil {
 			log.Println(err)
+			Udpclient.WriteUdp([]byte("false"), mu.OP_NOOP, ra) //回应上一个节点
 		} else {
 			sid, _ := rep["gtid"].(string)
 			eid, _ := rep["lteid"].(string)
@@ -59,7 +60,7 @@ func ClearByUdp(sid, eid string) {
 	clt.ClearTaskInfo.FDB = db.MgoFactory(2, 3, 120, clt.ClearTaskInfo.FromDbAddr, clt.ClearTaskInfo.FromDB)
 	//初始化脚本信息
 	clt.InitClearLuas()
-
+	//更新结果表清理后的字段值
 	go clt.UpdateResultVal()
 
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}

+ 9 - 6
src/jy/extract/extractInit.go

@@ -106,10 +106,13 @@ type ClearLua struct {
 }
 
 type ClearTask struct {
-	Id            string         //任务id
-	Content       string         //信息内容
-	ClearTaskInfo *ClearTaskInfo //任务信息
-	ClearLuas     []*ClearLua    //清理脚本
+	Id            string                 //任务id
+	Content       string                 //信息内容
+	ClearTaskInfo *ClearTaskInfo         //任务信息
+	ClearLuas     map[string][]*ClearLua //清理脚本
+
+	UpdateResult [][]map[string]interface{} //清理后结果
+	ClearChannel chan bool
 }
 
 func init() {
@@ -884,7 +887,7 @@ func (c *ClearTask) InitClearTaskInfo() {
 //加载清理脚本
 func (c *ClearTask) InitClearLuas() {
 	defer qu.Catch()
-	c.ClearLuas = []*ClearLua{}
+	c.ClearLuas = make(map[string][]*ClearLua)
 	list, _ := db.Mgo.Find("clearversioninfo", `{"vid":"`+c.ClearTaskInfo.VersionId+`","delete":false}`, nil, nil, false, -1, -1)
 	for _, l := range *list {
 		if b, _ := l["isuse"].(bool); !b { //仅使用启用的属性
@@ -904,7 +907,7 @@ func (c *ClearTask) InitClearLuas() {
 				LuaText: vv["s_luascript"].(string),
 				LFields: getALLFields(),
 			}
-			c.ClearLuas = append(c.ClearLuas, clearLua)
+			c.ClearLuas[s_field] = append(c.ClearLuas[s_field], clearLua)
 		}
 	}
 }