Răsfoiți Sursa

日志调整、抽取线程调整

zhangjinkun 6 ani în urmă
părinte
comite
85d29936d8

+ 26 - 26
src/jy/extract/extract.go

@@ -8,7 +8,6 @@ import (
 	db "jy/mongodbutil"
 	"jy/pretreated"
 	ju "jy/util"
-	"log"
 	qu "qfw/util"
 	"qfw/util/redis"
 	"reflect"
@@ -18,6 +17,7 @@ import (
 	"time"
 	"unicode/utf8"
 
+	log "github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
@@ -134,8 +134,8 @@ func StartExtractTaskId(taskId string) bool {
 	ext.InitFile()
 
 	ext.IsRun = true
-	go ext.ResultSave()
-	go ext.BidSave()
+	go ext.ResultSave(true)
+	go ext.BidSave(true)
 	if isgo {
 		go RunExtractTask(taskId)
 	}
@@ -167,17 +167,17 @@ func RunExtractTask(taskId string) {
 	if count < PageSize {
 		limit = count
 	}
-	log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
+	fmt.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
 	for i := 0; i < pageNum; i++ {
 		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
-		log.Printf("page=%d,query=%v", i+1, query)
+		fmt.Printf("page=%d,query=%v", i+1, query)
 		list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 		for _, v := range *list {
 			if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
 				continue
 			}
 			_id := qu.BsonIdToSId(v["_id"])
-			log.Println(_id)
+			log.Debug(_id)
 			if !ext.IsRun {
 				break
 			}
@@ -266,7 +266,7 @@ func PreInfo(doc map[string]interface{}) (j, jf *ju.Job) {
 			pretreated.AnalyStart(jf)
 		}
 	}, func(err interface{}) {
-		log.Println("pretreated.AnalyStart", err)
+		log.Debug("pretreated.AnalyStart", err)
 	})
 	return j, jf
 }
@@ -332,13 +332,13 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job) {
 			for _, v := range vc.RulePres {
 				tmp = ExtRegPre(tmp, j, v, e.TaskInfo)
 			}
-			//log.Println("抽取-前置规则", tmp)
+			// log.Debug("抽取-前置规则", tmp)
 
 			//抽取-规则
 			for _, v := range vc.RuleCores {
 				ExtRegCore(vc.ExtFrom, tmp, j, v, e)
 			}
-			//log.Println("抽取-规则", tmp)
+			// log.Debug("抽取-规则", tmp)
 
 			//项目名称未能抽取到,标题来凑
 			if vc.Field == "projectname" {
@@ -351,7 +351,7 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job) {
 			for _, v := range vc.RuleBacks {
 				ExtRegBack(j, v, e.TaskInfo)
 			}
-			//log.Println("抽取-后置规则", tmp)
+			// log.Debug("抽取-后置规则", tmp)
 		}
 
 		//全局后置规则
@@ -403,9 +403,9 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job) {
 		}
 		PackageDetail(j, e) //处理分包信息
 		//		bs, _ := json.Marshal(j.Result)
-		//		log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
+		//		 log.Debug("抽取结果", j.Title, j.SourceMid, string(bs))
 	}, func(err interface{}) {
-		log.Println("ExtractProcess err", err)
+		log.Debug("ExtractProcess err", err)
 	})
 }
 func (e *ExtractTask) ExtractFile(j *ju.Job) {
@@ -430,7 +430,7 @@ func (e *ExtractTask) ExtractFile(j *ju.Job) {
 					tmp = ExtRegPre(tmp, j, v, e.TaskInfo)
 				}
 			}
-			//log.Println("抽取-前置规则", tmp)
+			// log.Debug("抽取-前置规则", tmp)
 
 			//抽取-规则
 			for _, v := range vc.RuleCores {
@@ -438,7 +438,7 @@ func (e *ExtractTask) ExtractFile(j *ju.Job) {
 					ExtRegCore(vc.ExtFrom, tmp, j, v, e)
 				}
 			}
-			//log.Println("抽取-规则", tmp)
+			// log.Debug("抽取-规则", tmp)
 
 			//抽取-后置规则
 			for _, v := range vc.RuleBacks {
@@ -446,7 +446,7 @@ func (e *ExtractTask) ExtractFile(j *ju.Job) {
 					ExtRegBack(j, v, e.TaskInfo)
 				}
 			}
-			//log.Println("抽取-后置规则", tmp)
+			// log.Debug("抽取-后置规则", tmp)
 		}
 
 		//全局后置规则
@@ -501,9 +501,9 @@ func (e *ExtractTask) ExtractFile(j *ju.Job) {
 
 		PackageDetail(j, e) //处理分包信息
 		//		bs, _ := json.Marshal(j.Result)
-		//		log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
+		//		 log.Debug("抽取结果", j.Title, j.SourceMid, string(bs))
 	}, func(err interface{}) {
-		log.Println("ExtractProcess err", err)
+		log.Debug("ExtractProcess err", err)
 	})
 }
 
@@ -591,8 +591,8 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
 			if bl.ColonKV != nil {
 				kvs := bl.ColonKV.Kvs
 				kvs2 := bl.ColonKV.Kvs_2
-				//log.Println("ColonKV1", kvs)
-				//log.Println("ColonKV2", kvs2)
+				// log.Debug("ColonKV1", kvs)
+				// log.Debug("ColonKV2", kvs2)
 				for _, tag := range tags {
 					for _, kv := range kvs {
 						if tag.Type == "string" {
@@ -669,7 +669,7 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
 			//空格kv
 			if bl.SpaceKV != nil {
 				kvs := bl.SpaceKV.Kvs
-				//log.Println("SpaceKV", kvs)
+				// log.Debug("SpaceKV", kvs)
 				for _, tag := range tags {
 					for _, kv := range kvs {
 						if tag.Type == "string" {
@@ -711,7 +711,7 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
 			//表格kv
 			if bl.TableKV != nil {
 				tkv := bl.TableKV
-				//log.Println("tkv", tkv)
+				// log.Debug("tkv", tkv)
 				for k, v := range tkv.Kv {
 					if k == fieldname {
 						if len(tags) > -tkv.KvIndex[fieldname] {
@@ -731,7 +731,7 @@ func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]
 								"matchtype": "tag_string",
 							})
 						} else { //涉及其他待处理
-							//log.Println(tags)
+							// log.Debug(tags)
 						}
 					}
 				}
@@ -1031,7 +1031,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		}
 		if e.IsExtractCity { //城市抽取
 			b, p, c, d := e.TransmitData(tmp, _id) //抽取省份城市
-			//log.Println("省份---", p, "城市---", c, "区---", d)
+			// log.Debug("省份---", p, "城市---", c, "区---", d)
 			tmp["district"] = d
 			if b {
 				tmp["city"] = c
@@ -1049,7 +1049,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			if len(j.BrandData) > 0 {
 				tmp["tablebrand"] = j.BrandData
 			}
-			//log.Println("============", j.HasBrand, j.HasGoods, j.HasKey, j.HasTable, j.BrandData)
+			// log.Debug("============", j.HasBrand, j.HasGoods, j.HasKey, j.HasTable, j.BrandData)
 		}
 		if e.TaskInfo.TestColl == "" {
 			if len(tmp) > 0 { //保存抽取结果
@@ -1090,11 +1090,11 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			tmp["resultf"] = resultf
 			b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 			if !b {
-				log.Println(e.TaskInfo.TestColl, _id)
+				log.Debug(e.TaskInfo.TestColl, _id)
 			}
 		}
 	}, func(err interface{}) {
-		log.Println("AnalysisSaveResult err", err)
+		log.Debug("AnalysisSaveResult err", err)
 	})
 }
 

+ 93 - 91
src/jy/extract/extractInit.go

@@ -4,13 +4,14 @@ package extract
 import (
 	db "jy/mongodbutil"
 	ju "jy/util"
-	"log"
 	qu "qfw/util"
 	"regexp"
 	"sort"
 	"strconv"
 	"strings"
 	"time"
+
+	log "github.com/donnie4w/go-logger/logger"
 )
 
 type RegLuaInfo struct { //正则或脚本信息
@@ -66,8 +67,8 @@ type ExtractTask struct {
 	IsExtractCity bool                //是否开启城市抽取
 	Fields        map[string]int      //抽取属性组
 
-	IsFileField       bool      //是否开启附件抽取
-	FileFields        map[string]int      //抽取附件属性组
+	IsFileField bool           //是否开启附件抽取
+	FileFields  map[string]int //抽取附件属性组
 
 	ResultChanel chan bool                  //抽取结果详情
 	ResultArr    [][]map[string]interface{} //抽取结果详情
@@ -155,11 +156,11 @@ func (e *ExtractTask) InitTestTaskInfo(resultcoll, trackcoll string) {
 //加载任务信息
 func (e *ExtractTask) InitTaskInfo() {
 	task, _ := db.Mgo.FindById("task", e.Id, nil)
-	log.Println("task", task)
+	log.Debug("task", task)
 	if len(*task) > 1 {
 		v, _ := db.Mgo.FindOne("version", `{"version":"`+(*task)["s_version"].(string)+`","delete":false}`)
 		strs := strings.Split((*task)["s_mgosavecoll"].(string), "/")
-		log.Println("s_mgosavecoll", strs)
+		log.Debug("s_mgosavecoll", strs)
 		if len(strs) < 3 {
 			return
 		} else {
@@ -182,7 +183,7 @@ func (e *ExtractTask) InitTaskInfo() {
 				e.IsExtractCity = (*v)["isextractcity"].(bool)
 			}
 		}
-		log.Println(e.TaskInfo.Name, "thread:", qu.IntAllDef((*task)["i_process"], 1))
+		log.Debug(e.TaskInfo.Name, "thread:", qu.IntAllDef((*task)["i_process"], 1))
 	} else {
 		return
 	}
@@ -219,7 +220,7 @@ func (e *ExtractTask) InitRulePres() {
 				}
 				e.RulePres = append(e.RulePres, rinfo)
 			}, func(err interface{}) {
-				log.Println(rinfo.Code, rinfo.Field, err)
+				log.Debug(rinfo.Code, rinfo.Field, err)
 			})
 		}
 	}
@@ -256,7 +257,7 @@ func (e *ExtractTask) InitRuleBacks() {
 				}
 				e.RuleBacks = append(e.RuleBacks, rinfo)
 			}, func(err interface{}) {
-				log.Println(rinfo.Code, rinfo.Field, err)
+				log.Debug(rinfo.Code, rinfo.Field, err)
 			})
 		}
 	}
@@ -313,7 +314,7 @@ func (e *ExtractTask) InitRuleCore() {
 						}
 						rulePres = append(rulePres, rinfo)
 					}, func(err interface{}) {
-						log.Println(rinfo.Code, rinfo.Field, err)
+						log.Debug(rinfo.Code, rinfo.Field, err)
 					})
 				}
 			}
@@ -349,7 +350,7 @@ func (e *ExtractTask) InitRuleCore() {
 						}
 						ruleBacks = append(ruleBacks, rinfo)
 					}, func(err interface{}) {
-						log.Println(rinfo.Code, rinfo.Field, err)
+						log.Debug(rinfo.Code, rinfo.Field, err)
 					})
 				}
 			}
@@ -402,7 +403,7 @@ func (e *ExtractTask) InitRuleCore() {
 						}
 						ruleCores = append(ruleCores, rinfo)
 					}, func(err interface{}) {
-						log.Println(rinfo.Code, rinfo.Field, err)
+						log.Debug(rinfo.Code, rinfo.Field, err)
 					})
 				}
 			}
@@ -463,7 +464,7 @@ func (e *ExtractTask) InitPkgCore() {
 						}
 						ruleBacks = append(ruleBacks, rinfo)
 					}, func(err interface{}) {
-						log.Println(rinfo.Code, rinfo.Field, err)
+						log.Debug(rinfo.Code, rinfo.Field, err)
 					})
 				}
 			}
@@ -745,82 +746,83 @@ func (e *ExtractTask) InitDFA() {
 }
 
 // ResultSave persists buffered extraction-detail records (e.ResultArr); init selects a background flusher or a one-shot flush.
-func (e *ExtractTask) ResultSave() {
+func (e *ExtractTask) ResultSave(init bool) {
 	defer qu.Catch()
-	e.ResultChanel = make(chan bool, 5)
-	e.ResultArr = [][]map[string]interface{}{}
-	for {
-		if len(e.ResultArr) > 500 {
-			e.ResultChanel <- true
-			arr := e.ResultArr[:500]
-			go func(tmp *[][]map[string]interface{}) {
-				qu.Try(func() {
-					db.Mgo.UpSertBulk("extract_result", *tmp...)
-					<-e.ResultChanel
-				}, func(err interface{}) {
-					log.Println(err)
-					<-e.ResultChanel
-				})
-			}(&arr)
-			e.ResultArr = e.ResultArr[500:]
-		} else {
-			e.ResultChanel <- true
-			arr := e.ResultArr
-			go func(tmp *[][]map[string]interface{}) {
-				qu.Try(func() {
-					db.Mgo.UpSertBulk("extract_result", *tmp...)
-					<-e.ResultChanel
-				}, func(err interface{}) {
-					log.Println(err)
-					<-e.ResultChanel
-				})
-			}(&arr)
-			e.ResultArr = [][]map[string]interface{}{}
-			time.Sleep(10 * time.Second)
-		}
-		if !e.IsRun {
-			break
-		}
+	if e.ResultArr == nil { // make sure the buffer is a non-nil slice before it is used
+		e.ResultArr = [][]map[string]interface{}{}
+	}
+	if init { // init=true: start the periodic flusher goroutine, then return
+		go func() {
+			for { // no exit condition here; the removed version left its loop once e.IsRun was false
+				if len(e.ResultArr) > 500 { // more than 500 buffered: write only the first 500 this pass
+					arr := e.ResultArr[:500] // NOTE(review): e.ResultArr is accessed without any visible locking — confirm producers are synchronized
+					qu.Try(func() {
+						db.Mgo.UpSertBulk("extract_result", arr...)
+					}, func(err interface{}) {
+						log.Debug(err)
+					})
+					e.ResultArr = e.ResultArr[500:] // remove the first 500 entries from the buffer
+				} else { // 500 or fewer buffered: write everything and reset the buffer
+					arr := e.ResultArr
+					qu.Try(func() {
+						db.Mgo.UpSertBulk("extract_result", arr...)
+					}, func(err interface{}) {
+						log.Debug(err)
+					})
+					e.ResultArr = [][]map[string]interface{}{}
+				}
+				time.Sleep(10 * time.Second) // pause 10 seconds between passes
+			}
+		}()
+	} else { // init=false: flush the whole buffer once, in the calling goroutine
+		arr := e.ResultArr
+		qu.Try(func() {
+			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...) // NOTE(review): targets TDB/ToColl, not db.Mgo "extract_result" as in the init branch — confirm intended
+		}, func(err interface{}) {
+			log.Debug(err)
+		})
+		e.ResultArr = [][]map[string]interface{}{}
 	}
 }
 
 // BidSave persists buffered extraction records (e.BidArr) into e.TaskInfo.ToColl; init selects a background flusher or a one-shot flush.
-func (e *ExtractTask) BidSave() {
+func (e *ExtractTask) BidSave(init bool) {
 	defer qu.Catch()
-	e.BidChanel = make(chan bool, 5)
-	e.BidArr = [][]map[string]interface{}{}
-	for {
-		if len(e.BidArr) > 500 {
-			e.BidChanel <- true
-			arr := e.BidArr[:500]
-			go func(tmp *[][]map[string]interface{}) {
-				qu.Try(func() {
-					db.Mgo.UpSertBulk(e.TaskInfo.ToColl, *tmp...)
-					<-e.BidChanel
-				}, func(err interface{}) {
-					log.Println(err)
-					<-e.BidChanel
-				})
-			}(&arr)
-			e.BidArr = e.BidArr[500:]
-		} else {
-			e.BidChanel <- true
-			arr := e.BidArr
-			go func(tmp *[][]map[string]interface{}) {
-				qu.Try(func() {
-					db.Mgo.UpSertBulk(e.TaskInfo.ToColl, *tmp...)
-					<-e.BidChanel
-				}, func(err interface{}) {
-					log.Println(err)
-					<-e.BidChanel
-				})
-			}(&arr)
-			e.BidArr = [][]map[string]interface{}{}
-		}
-		if !e.IsRun {
-			break
-		}
-		time.Sleep(10 * time.Second)
+	if e.BidArr == nil { // make sure the buffer is a non-nil slice before it is used
+		e.BidArr = [][]map[string]interface{}{}
+	}
+	if init { // init=true: start the periodic flusher goroutine, then return
+		go func() {
+			for { // no exit condition here; the removed version left its loop once e.IsRun was false
+				if len(e.BidArr) > 500 { // more than 500 buffered: write only the first 500 this pass
+					arr := e.BidArr[:500] // NOTE(review): e.BidArr is accessed without any visible locking — confirm producers are synchronized
+					qu.Try(func() {
+						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...) // writes through TaskInfo.TDB; the removed version used db.Mgo
+					}, func(err interface{}) {
+						log.Debug(err)
+					})
+					e.BidArr = e.BidArr[500:] // remove the first 500 entries from the buffer
+				} else { // 500 or fewer buffered: write everything and reset the buffer
+					arr := e.BidArr
+					qu.Try(func() {
+						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+					}, func(err interface{}) {
+						log.Debug(err)
+					})
+					e.BidArr = [][]map[string]interface{}{}
+				}
+				time.Sleep(10 * time.Second) // pause 10 seconds between passes
+			}
+		}()
+	} else { // init=false: flush the whole buffer once, in the calling goroutine
+		arr := e.BidArr
+		qu.Try(func() {
+			e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
+		}, func(err interface{}) {
+			log.Debug(err)
+		})
+		e.BidArr = [][]map[string]interface{}{}
+		time.Sleep(1 * time.Second) // NOTE(review): fixed 1s pause after the one-shot flush; purpose is not visible here — confirm it is needed
 	}
 }
 
@@ -867,7 +869,7 @@ func (e *ExtractTask) InitAuditRule() {
 				ru = string(rs[1 : len(rs)-1])
 				rureg, err = regexp.Compile(ru)
 				if err != nil {
-					log.Println("error---rule:", r)
+					log.Debug("error---rule:", r)
 					continue
 				}
 				i_rule = append(i_rule, []interface{}{rureg}...)
@@ -914,16 +916,16 @@ func (e *ExtractTask) InitFile() {
 	//query:=bson.M{"version":e.TaskInfo.Version,"delete":false}
 	ve, _ := db.Mgo.FindOne("version", `{"version":"`+e.TaskInfo.Version+`","delete":false}`)
 	//ve, _ := db.Mgo.FindOne("version", query)
-	if ve == nil{
+	if ve == nil {
 		return
 	}
-	if (*ve)["isfiles"]!=nil && (*ve)["isfiles"].(bool){
-		e.IsFileField =true
+	if (*ve)["isfiles"] != nil && (*ve)["isfiles"].(bool) {
+		e.IsFileField = true
 	}
-	efiled := make(map[string]int,0)
-	if (*ve)["s_filefileds"] != nil{
-		for _,vff :=range (*ve)["s_filefileds"].([]interface{}) {
-			efiled[vff.(string)]=1
+	efiled := make(map[string]int, 0)
+	if (*ve)["s_filefileds"] != nil {
+		for _, vff := range (*ve)["s_filefileds"].([]interface{}) {
+			efiled[vff.(string)] = 1
 		}
 	}
 	e.FileFields = efiled
@@ -944,7 +946,7 @@ func (c *ClearTask) InitClearTaskInfo() {
 			IsCltLog:    ju.Config["iscltlog"].(bool),
 			ProcessPool: make(chan bool, qu.IntAllDef((*cleartask)["i_process"], 1)),
 		}
-		log.Println(c.ClearTaskInfo.Name, "thread:", qu.IntAllDef((*cleartask)["i_process"], 1))
+		log.Debug(c.ClearTaskInfo.Name, "thread:", qu.IntAllDef((*cleartask)["i_process"], 1))
 	} else {
 		return
 	}

+ 89 - 72
src/jy/extract/extractudp.go

@@ -3,13 +3,15 @@ package extract
 
 import (
 	"encoding/json"
+	"fmt"
 	db "jy/mongodbutil"
 	ju "jy/util"
-	"log"
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
+	"sync"
 
+	log "github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
@@ -29,44 +31,49 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		var rep map[string]interface{}
 		err := json.Unmarshal(data, &rep)
 		if err != nil {
-			log.Println(err)
+			log.Debug(err)
 		} else {
 			sid, _ := rep["gtid"].(string)
 			eid, _ := rep["lteid"].(string)
 			stype, _ := rep["stype"].(string)
-			if stype == "distributed" { //分布式抽取分支
-				log.Println("分布式抽取id段", sid, eid)
-				InstanceId := qu.ObjToString(rep["InstanceId"])
-				db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"extstatus": "running",
-						},
-					}, true, false)
-				ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
-				db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"extstatus": "ok",
-						},
-					}, true, false)
-				log.Println("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"]))
+			if sid == "" || eid == "" {
+				log.Debug("err", "sid=", sid, "eid=", eid)
 			} else {
-				log.Println("udp通知抽取id段", sid, eid)
-				ExtractByUdp(sid, eid)
-				log.Println("udp通知抽取完成,eid=", eid)
-				for _, m := range nextNodes {
-					by, _ := json.Marshal(map[string]interface{}{
-						"gtid":  sid,
-						"lteid": eid,
-						"stype": qu.ObjToString(m["stype"]),
-					})
-					err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-						IP:   net.ParseIP(m["addr"].(string)),
-						Port: qu.IntAll(m["port"]),
-					})
-					if err != nil {
-						log.Println(err)
+				go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+				if stype == "distributed" { //分布式抽取分支
+					log.Debug("分布式抽取id段", sid, eid)
+					InstanceId := qu.ObjToString(rep["InstanceId"])
+					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"extstatus": "running",
+							},
+						}, true, false)
+					ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
+					db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"extstatus": "ok",
+							},
+						}, true, false)
+					log.Debug("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"]))
+				} else {
+					log.Debug("udp通知抽取id段", sid, eid)
+					ExtractByUdp(sid, eid)
+					log.Debug("udp通知抽取完成,eid=", eid)
+					for _, m := range nextNodes {
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  sid,
+							"lteid": eid,
+							"stype": qu.ObjToString(m["stype"]),
+						})
+						err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP(m["addr"].(string)),
+							Port: qu.IntAll(m["port"]),
+						})
+						if err != nil {
+							log.Debug(err)
+						}
 					}
 				}
 			}
@@ -75,44 +82,46 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		var rep map[string]interface{}
 		err := json.Unmarshal(data, &rep)
 		if err != nil {
-			log.Println(err)
+			log.Debug(err)
 		} else {
-			log.Println(rep)
+			log.Debug(rep)
 		}
 	}
 }
 
+var ext *ExtractTask
+
 //根据id区间抽取
 func ExtractByUdp(sid, eid string, instanceId ...string) {
 	defer qu.Catch()
-	ext := &ExtractTask{}
-	ext.Id = qu.ObjToString(ju.Config["udptaskid"])
-	ext.InitTaskInfo()
-	ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
-	ext.InitRulePres()
-	ext.InitRuleBacks()
-	ext.InitRuleCore()
-	ext.InitTag()
-	ext.InitClearFn()
-	if ext.IsExtractCity { //版本上控制是否开始城市抽取
-		//初始化城市DFA信息
-		ext.InitDFA()
-	}
-	//质量审核
-	ext.InitAuditFields()
-	ext.InitAuditRule()
-	ext.InitAuditClass()
-	ext.InitAuditRecogField()
-
-	//品牌抽取是否开启
-	ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
-	//附件抽取是否开启
-	ext.InitFile()
+	if ext == nil {
+		ext = &ExtractTask{}
+		ext.Id = qu.ObjToString(ju.Config["udptaskid"])
+		ext.InitTaskInfo()
+		ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+		ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
+		ext.InitRulePres()
+		ext.InitRuleBacks()
+		ext.InitRuleCore()
+		ext.InitTag()
+		ext.InitClearFn()
+		if ext.IsExtractCity { //版本上控制是否开始城市抽取
+			//初始化城市DFA信息
+			ext.InitDFA()
+		}
+		//质量审核
+		ext.InitAuditFields()
+		ext.InitAuditRule()
+		ext.InitAuditClass()
+		ext.InitAuditRecogField()
 
-	go ext.ResultSave()
-	go ext.BidSave()
-	ext.IsRun = true
+		//品牌抽取是否开启
+		ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
 
+		ext.ResultSave(true)
+		ext.BidSave(true)
+		ext.IsRun = true
+	}
 	if len(instanceId) > 0 { //分布式抽取进度
 		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 		count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
@@ -123,7 +132,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		if count < PageSize {
 			limit = count
 		}
-		log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
+		fmt.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
 
 		startI := 0 //接着上次任务执行
 		sidback := sid
@@ -138,7 +147,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 
 		for i := startI; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-			log.Printf("page=%d,query=%v", i+1, query)
+			fmt.Printf("page=%d,query=%v", i+1, query)
 			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
 				list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
 				for _, v := range *list {
@@ -146,7 +155,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Println(_id)
+					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -164,7 +173,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					}}, true, false)
 			}
 			queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
-			log.Printf("page=%d,queryback=%v", i+1, queryback)
+			fmt.Printf("page=%d,queryback=%v", i+1, queryback)
 			if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
 				list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
 				for _, v := range *list2 {
@@ -172,7 +181,7 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 						continue
 					}
 					_id := qu.BsonIdToSId(v["_id"])
-					log.Println(_id)
+					log.Debug(_id)
 					var j, jf *ju.Job
 					if ext.IsFileField && v["projectinfo"] != nil {
 						v["isextFile"] = true
@@ -203,16 +212,16 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		if count < PageSize {
 			limit = count
 		}
+		wg := sync.WaitGroup{}
 		for i := 0; i < pageNum; i++ {
 			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
-			log.Printf("page=%d,query=%v", i+1, query)
+			fmt.Printf("page=%d,query=%v", i+1, query)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
-			for _, v := range *list {
+			for k, v := range *list {
 				if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
 					continue
 				}
 				_id := qu.BsonIdToSId(v["_id"])
-				log.Println(_id)
 				var j, jf *ju.Job
 				if ext.IsFileField && v["projectinfo"] != nil {
 					v["isextFile"] = true
@@ -221,10 +230,18 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 					j, _ = PreInfo(v)
 				}
 				ext.TaskInfo.ProcessPool <- true
-				go ext.ExtractProcess(j, jf)
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					ext.ExtractProcess(j, jf)
+				}()
+				if k%1000 == 0 {
+					log.Debug(i, k, _id)
+				}
 				sid = _id
 			}
-
 		}
+		wg.Wait()
+		ext.BidSave(false)
 	}
 }

+ 11 - 0
src/jy/util/logger.go

@@ -0,0 +1,11 @@
+package util
+
+import (
+	"github.com/donnie4w/go-logger/logger"
+)
+
+// Debug forwards its arguments to the package-level logger.Debug.
+// The variadic slice is spread so each value is passed as its own argument.
+func Debug(v ...interface{}) {
+	logger.Debug(v...)
+}

+ 0 - 19
src/jy/util/util.go

@@ -2,14 +2,10 @@ package util
 
 import (
 	"fmt"
-	"io"
 	. "jy/mongodbutil"
-	"log"
-	"os"
 	qu "qfw/util"
 
 	. "gopkg.in/mgo.v2/bson"
-	"gopkg.in/natefinch/lumberjack.v2"
 )
 
 //敏感词
@@ -30,21 +26,6 @@ var BrandGet *DFA     //品牌
 var IsBrandGoods bool //是否开启品牌抽取
 
 func init() { // package initialisation; with the removed lines gone it only creates syncint
-	// Log output configuration: multiple output destinations
-	filelog := &lumberjack.Logger{
-		Filename:   "./out.log",
-		MaxSize:    500, // megabytes
-		MaxBackups: 3,
-		MaxAge:     20,   //days
-		Compress:   true, // disabled by default
-	}
-	writers := []io.Writer{
-		filelog,
-		os.Stdout,
-	}
-	fileAndStdoutWriter := io.MultiWriter(writers...)
-	log.SetOutput(fileAndStdoutWriter)
-
 	syncint = make(chan bool, 1) // channel of bool with a buffer of one
 }
 

+ 0 - 3
src/jy/util/util2.go

@@ -256,6 +256,3 @@ func FirstKeyValueInMap(m interface{}) (string, interface{}) {
 	}
 	return "", nil
 }
-func Debug(v ...interface{}) {
-
-}

+ 5 - 0
src/main.go

@@ -13,9 +13,14 @@ import (
 	qu "qfw/util"
 	"qfw/util/elastic"
 	redis "qfw/util/redis"
+
+	"github.com/donnie4w/go-logger/logger"
 )
 
 func init() {
+	logger.SetConsole(false)
+	logger.SetLevel(logger.DEBUG)
+	logger.SetRollingDaily("./", "out.log")
 	qu.ReadConfig(&util.Config)
 	qu.ReadConfig("./res/brandrule.json", &util.BrandRules)
 	qu.ReadConfig("./res/goods.json", &util.GoodsConfig)