// extractudp package extract import ( "encoding/json" "fmt" db "jy/mongodbutil" ju "jy/util" mu "mfw/util" "net" qu "qfw/util" "sync" "time" log "github.com/donnie4w/go-logger/logger" "gopkg.in/mgo.v2/bson" ) var Udpclient mu.UdpClient //udp对象 var nextNodes []map[string]interface{} var IsExtStop bool //新增机器节点 func ExtractUdpUpdateMachine() { machine := *qu.ObjToMap(ju.Config["udpmachine"]) if len(machine) == 0 || machine == nil { return } skey := fmt.Sprintf("%s:%d:%s", machine["addr"], qu.IntAll(machine["port"]), machine["stype"]) machine["skey"] = skey db.Mgo.Update("extract_control_center", map[string]interface{}{"skey": skey}, machine, true, false) } //udp通知抽取 func ExtractUdp() { nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{})) Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024} Udpclient.Listen(processUdpMsg) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Debug(err) } else { stype, _ := rep["stype"].(string) if stype == "distributed" { //分布式抽取分支 go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra) InstanceId := qu.ObjToString(rep["InstanceId"]) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "running", }, }, true, false) ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"])) db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`, map[string]interface{}{ "$set": map[string]interface{}{ "extstatus": "ok", }, }, true, false) log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"])) } else if stype == "stop_extract" { IsExtStop = true } else if stype == "heart_extract" { skey, _ := rep["skey"].(string) Udpclient.WriteUdp([]byte(skey), mu.OP_NOOP, ra) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) if sid == "" || eid == "" { log.Debug("err", "sid=", sid, ",eid=", eid) } else { udpinfo, _ := rep["stype"].(string) if udpinfo == "" { udpinfo = "udpok" } IsExtStop = false //新版本控制抽取 ExtractByUdp(sid, eid, ra) if !IsExtStop { log.Debug("抽取完成udp通知抽取id段-控制台", udpinfo, sid, "~", eid) Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra) } else { log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid) } //适配重采抽取-发送udp-必须替换 //go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra) //发布数据~测试流程 //key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"]) //go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) // //log.Debug("udp通知抽取id段", sid, " ", eid) //ExtractByUdp(sid, eid, ra) //for _, m := range nextNodes { // by, _ := json.Marshal(map[string]interface{}{ // "gtid": sid, // "lteid": eid, // "stype": qu.ObjToString(m["stype"]), // }) // err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ // IP: net.ParseIP(m["addr"].(string)), // Port: qu.IntAll(m["port"]), // }) // if err != nil { // log.Debug(err) // } //} //log.Debug("udp通知抽取完成,eid=", eid) } } } case mu.OP_NOOP: //下个节点回应 log.Debug(string(data)) } } var ext *ExtractTask //根据id区间抽取-udp模式 func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) { defer qu.Catch() if ext == nil { ext = &ExtractTask{} ext.Id = qu.ObjToString(ju.Config["udptaskid"]) ext.InitTaskInfo() ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB) ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB) ext.InitSite() ext.InitRulePres() ext.InitRuleBacks(false) ext.InitRuleBacks(true) ext.InitRuleCore(false) ext.InitRuleCore(true) ext.InitBlockRule() ext.InitPkgCore() ext.InitTag(false) ext.InitTag(true) ext.InitClearFn(false) ext.InitClearFn(true) ext.Lock() //ext.IsExtractCity = false if ext.IsExtractCity { //版本上控制是否开始城市抽取 //初始化城市DFA信息 //ext.InitCityDFA() ext.InitCityInfo() ext.InitAreaCode() ext.InitPostCode() } ext.Unlock() //质量审核 ext.InitAuditFields() ext.InitAuditRule() ext.InitAuditClass() ext.InitAuditRecogField() //品牌抽取是否开启 ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool) ext.ResultSave(true) ext.BidSave(true) ext.IsRun = true ext.InitFile() } else { ext.BidTotal = 0 //是否更新站点数据~~~ if ju.IsUpdateSite && ext.IsExtractCity { ext.InitUpdateSite() ju.IsUpdateSite = false } //更新规则~标签~~ if ju.IsUpdateRuleTag { ju.IsUpdateRuleTag = false ext.InitRulePres() ext.InitRuleBacks(false) ext.InitRuleBacks(true) ext.InitRuleCore(false) ext.InitRuleCore(true) ext.InitBlockRule() ext.InitPkgCore() ext.InitTag(false) ext.InitTag(true) ext.InitClearFn(false) ext.InitClearFn(true) ext.Lock() if ext.IsExtractCity { //版本上控制是否开始城市抽取 //初始化城市DFA信息 //ext.InitCityDFA() ext.InitCityInfo() ext.InitAreaCode() ext.InitPostCode() } ext.Unlock() } } index := 0 if len(instanceId) > 0 { //分布式抽取进度 go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra) for { tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`) if tsk != nil && !b { break } db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{ "$set": map[string]interface{}{ "InstanceId": instanceId[0], "state": 1, "runtime": time.Now().Format(qu.Date_Full_Layout), }, }) query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}} count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query) log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2) list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1) for _, v := range *list { //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据 // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据") // continue //} if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录 log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录") continue } //if qu.ObjToString(v["subtype"])!="中标" && // qu.ObjToString(v["subtype"])!="成交" && // qu.ObjToString(v["subtype"])!="合同" { // continue //} var j, jf *ju.Job var isSite bool if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) { v["isextFile"] = true j, jf, isSite = ext.PreInfo(v) } else { j, _, isSite = ext.PreInfo(v) } go ext.ExtractProcess(j, jf, isSite) index++ ext.TaskInfo.ProcessPool <- true } list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1) for _, v := range *list2 { //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据 // continue //} if spidercode[qu.ObjToString(v["spidercode"])] { log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录") continue } //if qu.ObjToString(v["subtype"])!="中标" && // qu.ObjToString(v["subtype"])!="成交" && // qu.ObjToString(v["subtype"])!="合同" { // continue //} var j, jf *ju.Job var isSite bool if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) { v["isextFile"] = true j, jf, isSite = ext.PreInfo(v) } else { j, _, isSite = ext.PreInfo(v) } go ext.ExtractProcess(j, jf, isSite) index++ ext.TaskInfo.ProcessPool <- true } db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{ "$set": map[string]interface{}{ "InstanceId": instanceId[0], "oktime": time.Now().Format(qu.Date_Full_Layout), "state": 1, }, }) db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, map[string]interface{}{ "$inc": map[string]interface{}{ "totalnum": count1 + count2, "step": 1, }, }, true, false) } log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal) } else { //普通抽取 query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}} count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) log.Debug("查询条件为:", query, "查询条数:", count) pageNum := (count + PageSize - 1) / PageSize limit := PageSize if count < PageSize { limit = count } wg := sync.WaitGroup{} for i := 0; i < pageNum; i++ { query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}} fmt.Printf("page=%d,query=%v\n", i+1, query) list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit) for _, v := range *list { if IsExtStop { break } if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录 log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录") continue } _id := qu.BsonIdToSId(v["_id"]) var j, jf *ju.Job var isSite bool if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) { v["isextFile"] = true j, jf, isSite = ext.PreInfo(v) } else { j, _, isSite = ext.PreInfo(v) } ext.TaskInfo.ProcessPool <- true wg.Add(1) go func(wg *sync.WaitGroup, j, jf *ju.Job) { defer wg.Done() //log.Debug(index,j.SourceMid,) ext.ExtractProcess(j, jf, isSite) }(&wg, j, jf) index++ if index%1000 == 0 { log.Debug("index:", index, ",页码:", i+1, ",_id:", _id) } sid = _id if sid >= eid { break } } } wg.Wait() ext.BidSave(false) log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid) } } //中标预测信息抽取,ossid为附件识别后的id var exF *ExtractTask func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} { defer qu.Catch() if exF == nil { exF = &ExtractTask{} exF.Id = qu.ObjToString(ju.Config["udptaskid"]) exF.InitTaskInfo() exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB) exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB) exF.InitSite() exF.InitRulePres() exF.InitRuleBacks(false) exF.InitRuleBacks(true) exF.InitRuleCore(false) exF.InitRuleCore(true) exF.InitBlockRule() exF.InitPkgCore() exF.InitTag(false) exF.InitTag(true) exF.InitClearFn(false) exF.InitClearFn(true) if exF.IsExtractCity { //版本上控制是否开始城市抽取 //初始化城市DFA信息 //exF.InitCityDFA() exF.InitCityInfo() exF.InitAreaCode() exF.InitPostCode() } //质量审核 exF.InitAuditFields() exF.InitAuditRule() exF.InitAuditClass() exF.InitAuditRecogField() //品牌抽取是否开启 ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool) exF.ResultSave(true) exF.BidSave(true) exF.IsRun = true exF.InitFile() } tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil) if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) { (*tmp)["isextFile"] = true } exF.TaskInfo.ProcessPool <- true j, jf, _ := exF.PreInfo(*tmp) wg := sync.WaitGroup{} wg.Add(1) go func(wg *sync.WaitGroup, j, jf *ju.Job) { defer wg.Done() exF.ExtractProcess(j, jf, false) }(&wg, j, jf) wg.Wait() exF.BidSave(false) return nil }