Browse Source

blocks内存优化及数据存储丢失

fengweiqiang 6 years ago
parent
commit
ca455e4015
4 changed files with 110 additions and 37 deletions
  1. 28 8
      src/jy/extract/clearesult.go
  2. 12 4
      src/jy/extract/extract.go
  3. 62 22
      src/jy/extract/extractInit.go
  4. 8 3
      src/jy/extract/extractudp.go

+ 28 - 8
src/jy/extract/clearesult.go

@@ -63,9 +63,9 @@ func (c *ClearTask) ClearProcess(doc *map[string]interface{}) {
 				"$set": set,
 			},
 		}
-		lock.Lock()
+		c.RWMutex.Lock()
 		c.UpdateResult = append(c.UpdateResult, updatearr)
-		lock.Unlock()
+		c.RWMutex.Unlock()
 	}, func(err interface{}) {
 		log.Debug((*doc)["_id"], err)
 		<-c.ClearTaskInfo.ProcessPool
@@ -96,21 +96,25 @@ func SaveCltLog() {
 			}
 		}
 	}
-	time.AfterFunc(10*time.Second, SaveCltLog)
+	time.AfterFunc(3*time.Second, SaveCltLog)
 }
 
 //批量更新抽取结果的值
 func (c *ClearTask) UpdateResultVal(init bool) {
 	defer qu.Catch()
+	c.RWMutex.Lock()
 	if c.UpdateResult == nil {
 		c.UpdateResult = [][]map[string]interface{}{}
 	}
+	c.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(c.UpdateResult) > 500 {
-					arr := c.UpdateResult[:500]
-					c.UpdateResult = c.UpdateResult[500:]
+				c.RWMutex.Lock()
+				if len(c.UpdateResult) > 100 {
+					arr := c.UpdateResult[:100]
+					c.UpdateResult = c.UpdateResult[100:]
+					c.RWMutex.Unlock()
 					qu.Try(func() {
 						c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 					}, func(err interface{}) {
@@ -119,20 +123,36 @@ func (c *ClearTask) UpdateResultVal(init bool) {
 				} else {
 					arr := c.UpdateResult
 					c.UpdateResult = [][]map[string]interface{}{}
+					c.RWMutex.Unlock()
 					qu.Try(func() {
 						c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
-					time.Sleep(10 * time.Second)
+					time.Sleep(3 * time.Second)
 				}
 			}
 		}()
 	} else {
+		c.RWMutex.Lock()
 		arr := c.UpdateResult
 		c.UpdateResult = [][]map[string]interface{}{}
+		c.RWMutex.Unlock()
 		qu.Try(func() {
-			c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > 100 {
+					arr2 := arr[:100]
+					arr = arr[100:]
+					lenarr = len(arr)
+					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr2...)
+					time.Sleep(1*time.Second)
+				} else {
+					c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
+					break
+				}
+			}
+			//c.ClearTaskInfo.FDB.UpdateBulk(c.ClearTaskInfo.FromColl, arr...)
 		}, func(err interface{}) {
 			log.Debug(err)
 		})

+ 12 - 4
src/jy/extract/extract.go

@@ -28,7 +28,7 @@ var (
 	ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
 	TaskList      map[string]*ExtractTask          //任务列表
 	ClearTaskList map[string]*ClearTask            //清理任务列表
-	saveLimit     = 200                            //抽取日志批量保存
+	saveLimit     = 100                            //抽取日志批量保存
 	PageSize      = 5000                           //查询分页
 	Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1}`
 	Fields2       = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
@@ -1269,7 +1269,11 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			tmp["kvtext"] = kvtext.String()
 		}
 		if len(blocks) > 0 {
-			tmp["blocks"] = blocks
+			if blocksBytes, err := json.Marshal(blocks);err == nil{
+				if utf8.RuneCount(blocksBytes) < 100000{
+					tmp["blocks"] = string(blocksBytes)
+				}
+			}
 		}
 		//tmp["extract_content"] = j.Content
 		if e.TaskInfo.TestColl == "" {
@@ -1286,7 +1290,9 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					},
 					map[string]interface{}{"$set": tmp},
 				}
+				e.RWMutex.Lock()
 				e.BidArr = append(e.BidArr, tmparr)
+				e.RWMutex.Unlock()
 				e.BidTotal++
 			}
 			if b, ok := ju.Config["saveresult"].(bool); ok && b {
@@ -1300,7 +1306,9 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					},
 					map[string]interface{}{"$set": tmp},
 				}
+				e.RWMutex.Lock()
 				e.ResultArr = append(e.ResultArr, tmparr)
+				e.RWMutex.Unlock()
 			}
 		} else { //测试结果
 			delete(tmp, "_id")
@@ -1356,7 +1364,7 @@ func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask)
 				"firstid":    j.SourceMid,
 				"createtime": now,
 			})
-			if len(datas) == 200 {
+			if len(datas) == saveLimit {
 				db.Mgo.SaveBulk(coll, datas...)
 				datas = []map[string]interface{}{}
 			}
@@ -1370,7 +1378,7 @@ func otherNeedSave(j *ju.Job, result map[string][]*ju.ExtField, e *ExtractTask)
 			"firstid":    j.SourceMid,
 			"createtime": now,
 		})
-		if len(datas) == 200 {
+		if len(datas) == saveLimit {
 			db.Mgo.SaveBulk(coll, datas...)
 			datas = []map[string]interface{}{}
 		}

+ 62 - 22
src/jy/extract/extractInit.go

@@ -15,7 +15,8 @@ import (
 	log "github.com/donnie4w/go-logger/logger"
 )
 
-type RegLuaInfo struct { //正则或脚本信息
+type RegLuaInfo struct {
+	//正则或脚本信息
 	Code, Name, Field string  //
 	RuleText          string  //
 	IsLua             bool    //
@@ -75,16 +76,17 @@ type ExtractTask struct {
 	IsFileField bool      //是否开启附件抽取
 	FileFields  *sync.Map //抽取附件属性组
 
-	ResultChanel chan bool                  //抽取结果详情
-	ResultArr    [][]map[string]interface{} //抽取结果详情
-	BidChanel    chan bool                  //抽取结果
-	BidArr       [][]map[string]interface{} //抽取结果
-	BidTotal     int                        //结果数量
+	ResultChanel chan bool //抽取结果详情
+	sync.RWMutex
+	ResultArr [][]map[string]interface{} //抽取结果详情
+	BidChanel chan bool                  //抽取结果
+	BidArr    [][]map[string]interface{} //抽取结果
+	BidTotal int                         //结果数量
 
 	RecogFieldMap map[string]map[string]interface{}   //识别字段
 	FidClassMap   map[string][]map[string]interface{} //分类
-	CidRuleMap    map[string][]map[string]interface{} //规则
-	AuditFields   []string                            //需要审核的字段名称
+	CidRuleMap map[string][]map[string]interface{}    //规则
+	AuditFields []string                              //需要审核的字段名称
 
 	ProvinceMap       map[string]string    //省全称简称(key:浙江省 val:浙江)
 	ProvinceBriefMap  map[string]*Province //省简称对应的省信息(key:浙江 val:&Province{})
@@ -128,6 +130,7 @@ type ClearLua struct {
 }
 
 type ClearTask struct {
+	sync.RWMutex
 	Id            string                 //任务id
 	Content       string                 //信息内容
 	ClearTaskInfo *ClearTaskInfo         //任务信息
@@ -553,7 +556,7 @@ func (e *ExtractTask) InitTag() {
 			}
 			sort.Sort(tab.Items)
 			//ju.TagdbTable[fname] = &tab
-			ju.TagdbTable.Store(fname,&tab)
+			ju.TagdbTable.Store(fname, &tab)
 		}
 	}
 	//正则标签库
@@ -571,7 +574,7 @@ func (e *ExtractTask) InitTag() {
 			}
 			sort.Sort(tab.Items)
 			//ju.TagdbTable[fname+"_reg"] = &tab
-			ju.TagdbTable.Store(fname+"_reg",&tab)
+			ju.TagdbTable.Store(fname+"_reg", &tab)
 		}
 	}
 }
@@ -806,15 +809,19 @@ func (e *ExtractTask) InitAreaCode() {
 //保存抽取详情数据
 func (e *ExtractTask) ResultSave(init bool) {
 	defer qu.Catch()
+	e.RWMutex.Lock()
 	if e.ResultArr == nil {
 		e.ResultArr = [][]map[string]interface{}{}
 	}
+	e.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(e.ResultArr) > 500 {
-					arr := e.ResultArr[:500]
-					e.ResultArr = e.ResultArr[500:]
+				e.RWMutex.Lock()
+				if len(e.ResultArr) > 100 {
+					arr := e.ResultArr[:100]
+					e.ResultArr = e.ResultArr[100:]
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						db.Mgo.UpSertBulk("extract_result", arr...)
 					}, func(err interface{}) {
@@ -823,21 +830,37 @@ func (e *ExtractTask) ResultSave(init bool) {
 				} else {
 					arr := e.ResultArr
 					e.ResultArr = [][]map[string]interface{}{}
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						db.Mgo.UpSertBulk("extract_result", arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
 				}
-				time.Sleep(10 * time.Second)
+
+				time.Sleep(3 * time.Second)
 			}
 		}()
 	} else {
+		e.RWMutex.Lock()
 		arr := e.ResultArr
 		e.ResultArr = [][]map[string]interface{}{}
+		e.RWMutex.Unlock()
 		qu.Try(func() {
-			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > 100 {
+					arr2 := arr[:100]
+					arr = arr[100:]
+					lenarr = len(arr)
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
+				} else {
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+					break
+				}
+			}
 		}, func(err interface{}) {
+			defer e.RWMutex.Unlock()
 			log.Debug(err)
 		})
 	}
@@ -846,15 +869,19 @@ func (e *ExtractTask) ResultSave(init bool) {
 //保存抽取数据
 func (e *ExtractTask) BidSave(init bool) {
 	defer qu.Catch()
+	e.RWMutex.Lock()
 	if e.BidArr == nil {
 		e.BidArr = [][]map[string]interface{}{}
 	}
+	e.RWMutex.Unlock()
 	if init {
 		go func() {
 			for {
-				if len(e.BidArr) > 500 {
-					arr := e.BidArr[:500]
-					e.BidArr = e.BidArr[500:]
+				e.RWMutex.Lock()
+				if len(e.BidArr) > 100 {
+					arr := e.BidArr[:100]
+					e.BidArr = e.BidArr[100:]
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
 					}, func(err interface{}) {
@@ -863,21 +890,34 @@ func (e *ExtractTask) BidSave(init bool) {
 				} else {
 					arr := e.BidArr
 					e.BidArr = [][]map[string]interface{}{}
+					e.RWMutex.Unlock()
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
-
 				}
-				time.Sleep(10 * time.Second)
+				time.Sleep(3 * time.Second)
 			}
 		}()
 	} else {
+		e.RWMutex.Lock()
 		arr := e.BidArr
 		e.BidArr = [][]map[string]interface{}{}
+		e.RWMutex.Unlock()
 		qu.Try(func() {
-			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+			lenarr := len(arr)
+			for {
+				if lenarr > 100 {
+					arr2 := arr[:100]
+					arr = arr[100:]
+					lenarr = len(arr)
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
+				} else {
+					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+					break
+				}
+			}
 		}, func(err interface{}) {
 			log.Debug(err)
 		})
@@ -953,7 +993,7 @@ func (e *ExtractTask) InitAuditRule() {
 func (e *ExtractTask) InitAuditFields() {
 	if len(e.AuditFields) == 0 {
 		v, _ := db.Mgo.FindOne("version", `{"isuse":true,"delete":false}`) //查找当前使用版本
-		if v != nil && len(*v) > 0 {                                       //查找当前使用版本中属性配置需要审核的字段
+		if v != nil && len(*v) > 0 { //查找当前使用版本中属性配置需要审核的字段
 			vid := qu.BsonIdToSId((*v)["_id"])
 			query := map[string]interface{}{
 				"isaudit": true,

+ 8 - 3
src/jy/extract/extractudp.go

@@ -56,6 +56,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 								"extstatus": "ok",
 							},
 						}, true, false)
+					//DeleteInstance("")
 					log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"]))
 				} else {
 					udpinfo, _ := rep["key"].(string)
@@ -211,10 +212,12 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					"pagecurrent": i + 1,
 				}}, true, false)
 		}
+		//DeleteInstance("")
 		log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
 	} else { //普通抽取
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 		count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+		fmt.Println("查询条件为:",query,"查询条数:",count)
 		pageNum := (count + PageSize - 1) / PageSize
 		limit := PageSize
 		if count < PageSize {
@@ -223,10 +226,11 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		wg := sync.WaitGroup{}
 		for i := 0; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
-			fmt.Printf("page=%d,query=%v", i+1, query)
+			fmt.Printf("page=%d,query=%v\n", i+1, query)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 			for _, v := range *list {
 				if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
+				log.Debug(index,qu.BsonIdToSId(v["_id"]),"//去除含敏感词数据")
 					continue
 				}
 				_id := qu.BsonIdToSId(v["_id"])
@@ -239,10 +243,11 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 				}
 				ext.TaskInfo.ProcessPool <- true
 				wg.Add(1)
-				go func() {
+				go func(wg *sync.WaitGroup,j, jf *ju.Job) {
 					defer wg.Done()
+					//log.Debug(index,j.SourceMid,)
 					ext.ExtractProcess(j, jf)
-				}()
+				}(&wg,j, jf)
 				index++
 				if index%1000 == 0 {
 					log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)