123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- // extractudp
- package extract
- import (
- "encoding/json"
- "fmt"
- db "jy/mongodbutil"
- ju "jy/util"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "sync"
- "time"
- log "github.com/donnie4w/go-logger/logger"
- "gopkg.in/mgo.v2/bson"
- )
- var Udpclient mu.UdpClient //udp对象
- var nextNodes []map[string]interface{}
- var IsExtStop bool
- //新增机器节点
- func ExtractUdpUpdateMachine() {
- machine := *qu.ObjToMap(ju.Config["udpmachine"])
- if len(machine) == 0 || machine == nil {
- return
- }
- skey := fmt.Sprintf("%s:%d:%s", machine["addr"], qu.IntAll(machine["port"]), machine["stype"])
- machine["skey"] = skey
- db.Mgo.Update("extract_control_center", map[string]interface{}{"skey": skey}, machine, true, false)
- }
- //udp通知抽取
- func ExtractUdp() {
- nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
- Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
- Udpclient.Listen(processUdpMsg)
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Debug(err)
- } else {
- stype, _ := rep["stype"].(string)
- if stype == "distributed" { //分布式抽取分支
- go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
- InstanceId := qu.ObjToString(rep["InstanceId"])
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "running",
- },
- }, true, false)
- ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "extstatus": "ok",
- },
- }, true, false)
- log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
- } else if stype == "stop_extract" {
- IsExtStop = true
- } else if stype == "heart_extract" {
- skey, _ := rep["skey"].(string)
- Udpclient.WriteUdp([]byte(skey), mu.OP_NOOP, ra)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- if sid == "" || eid == "" {
- log.Debug("err", "sid=", sid, ",eid=", eid)
- } else {
- udpinfo, _ := rep["stype"].(string)
- if udpinfo == "" {
- udpinfo = "udpok"
- }
- IsExtStop = false
- //新版本控制抽取
- ExtractByUdp(sid, eid, ra)
- if !IsExtStop {
- log.Debug("抽取完成udp通知抽取id段-控制台", udpinfo, sid, "~", eid)
- Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
- } else {
- log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid)
- }
- //适配重采抽取-发送udp-必须替换
- //go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
- //发布数据~测试流程
- //key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
- //go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //
- //log.Debug("udp通知抽取id段", sid, " ", eid)
- //ExtractByUdp(sid, eid, ra)
- //for _, m := range nextNodes {
- // by, _ := json.Marshal(map[string]interface{}{
- // "gtid": sid,
- // "lteid": eid,
- // "stype": qu.ObjToString(m["stype"]),
- // })
- // err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- // IP: net.ParseIP(m["addr"].(string)),
- // Port: qu.IntAll(m["port"]),
- // })
- // if err != nil {
- // log.Debug(err)
- // }
- //}
- //log.Debug("udp通知抽取完成,eid=", eid)
- }
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Debug(string(data))
- }
- }
- var ext *ExtractTask
- //根据id区间抽取-udp模式
- func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
- defer qu.Catch()
- if ext == nil {
- ext = &ExtractTask{}
- ext.Id = qu.ObjToString(ju.Config["udptaskid"])
- ext.InitTaskInfo()
- ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
- ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
- ext.InitSite()
- ext.InitRulePres()
- ext.InitRuleBacks(false)
- ext.InitRuleBacks(true)
- ext.InitRuleCore(false)
- ext.InitRuleCore(true)
- ext.InitBlockRule()
- ext.InitPkgCore()
- ext.InitTag(false)
- ext.InitTag(true)
- ext.InitClearFn(false)
- ext.InitClearFn(true)
- ext.Lock()
- //ext.IsExtractCity = false
- if ext.IsExtractCity { //版本上控制是否开始城市抽取
- //初始化城市DFA信息
- //ext.InitCityDFA()
- ext.InitCityInfo()
- ext.InitAreaCode()
- ext.InitPostCode()
- }
- ext.Unlock()
- //质量审核
- ext.InitAuditFields()
- ext.InitAuditRule()
- ext.InitAuditClass()
- ext.InitAuditRecogField()
- //品牌抽取是否开启
- ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
- ext.ResultSave(true)
- ext.BidSave(true)
- ext.IsRun = true
- ext.InitFile()
- } else {
- ext.BidTotal = 0 //是否更新站点数据~~~
- if ju.IsUpdateSite && ext.IsExtractCity {
- ext.InitUpdateSite()
- ju.IsUpdateSite = false
- }
- //更新规则~标签~~
- if ju.IsUpdateRuleTag {
- ju.IsUpdateRuleTag = false
- ext.InitRulePres()
- ext.InitRuleBacks(false)
- ext.InitRuleBacks(true)
- ext.InitRuleCore(false)
- ext.InitRuleCore(true)
- ext.InitBlockRule()
- ext.InitPkgCore()
- ext.InitTag(false)
- ext.InitTag(true)
- ext.InitClearFn(false)
- ext.InitClearFn(true)
- ext.Lock()
- if ext.IsExtractCity { //版本上控制是否开始城市抽取
- //初始化城市DFA信息
- //ext.InitCityDFA()
- ext.InitCityInfo()
- ext.InitAreaCode()
- ext.InitPostCode()
- }
- ext.Unlock()
- }
- }
- index := 0
- if len(instanceId) > 0 { //分布式抽取进度
- go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
- for {
- tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
- if tsk != nil && !b {
- break
- }
- db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
- "$set": map[string]interface{}{
- "InstanceId": instanceId[0],
- "state": 1,
- "runtime": time.Now().Format(qu.Date_Full_Layout),
- },
- })
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
- count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
- count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
- log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
- list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
- for _, v := range *list {
- if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
- continue
- }
- var j, jf *ju.Job
- var isSite bool
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
- v["isextFile"] = true
- j, jf, isSite = ext.PreInfo(v)
- } else {
- j, _, isSite = ext.PreInfo(v)
- }
- go ext.ExtractProcess(j, jf, isSite)
- index++
- ext.TaskInfo.ProcessPool <- true
- }
- list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
- for _, v := range *list2 {
- if spidercode[qu.ObjToString(v["spidercode"])] {
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
- continue
- }
- var j, jf *ju.Job
- var isSite bool
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
- v["isextFile"] = true
- j, jf, isSite = ext.PreInfo(v)
- } else {
- j, _, isSite = ext.PreInfo(v)
- }
- go ext.ExtractProcess(j, jf, isSite)
- index++
- ext.TaskInfo.ProcessPool <- true
- }
- db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
- "$set": map[string]interface{}{
- "InstanceId": instanceId[0],
- "oktime": time.Now().Format(qu.Date_Full_Layout),
- "state": 1,
- },
- })
- db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
- map[string]interface{}{
- "$inc": map[string]interface{}{
- "totalnum": count1 + count2,
- "step": 1,
- },
- }, true, false)
- }
- log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
- } else {
- //普通抽取
- query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
- count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
- log.Debug("查询条件为:", query, "查询条数:", count)
- pageNum := (count + PageSize - 1) / PageSize
- limit := PageSize
- if count < PageSize {
- limit = count
- }
- wg := sync.WaitGroup{}
- for i := 0; i < pageNum; i++ {
- query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
- fmt.Printf("page=%d,query=%v\n", i+1, query)
- list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
- for _, v := range *list {
- if IsExtStop {
- break
- }
- if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
- continue
- }
- _id := qu.BsonIdToSId(v["_id"])
- var j, jf *ju.Job
- var isSite bool
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
- v["isextFile"] = true
- j, jf, isSite = ext.PreInfo(v)
- } else {
- j, _, isSite = ext.PreInfo(v)
- }
- ext.TaskInfo.ProcessPool <- true
- wg.Add(1)
- go func(wg *sync.WaitGroup, j, jf *ju.Job) {
- defer wg.Done()
- //log.Debug(index,j.SourceMid,)
- ext.ExtractProcess(j, jf, isSite)
- }(&wg, j, jf)
- index++
- if index%1000 == 0 {
- log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
- }
- sid = _id
- if sid >= eid {
- break
- }
- }
- }
- wg.Wait()
- ext.BidSave(false)
- log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
- }
- }
- //中标预测信息抽取,ossid为附件识别后的id
- var exF *ExtractTask
- func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
- defer qu.Catch()
- if exF == nil {
- exF = &ExtractTask{}
- exF.Id = qu.ObjToString(ju.Config["udptaskid"])
- exF.InitTaskInfo()
- exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
- exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
- exF.InitSite()
- exF.InitRulePres()
- exF.InitRuleBacks(false)
- exF.InitRuleBacks(true)
- exF.InitRuleCore(false)
- exF.InitRuleCore(true)
- exF.InitBlockRule()
- exF.InitPkgCore()
- exF.InitTag(false)
- exF.InitTag(true)
- exF.InitClearFn(false)
- exF.InitClearFn(true)
- if exF.IsExtractCity { //版本上控制是否开始城市抽取
- //初始化城市DFA信息
- //exF.InitCityDFA()
- exF.InitCityInfo()
- exF.InitAreaCode()
- exF.InitPostCode()
- }
- //质量审核
- exF.InitAuditFields()
- exF.InitAuditRule()
- exF.InitAuditClass()
- exF.InitAuditRecogField()
- //品牌抽取是否开启
- ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
- exF.ResultSave(true)
- exF.BidSave(true)
- exF.IsRun = true
- exF.InitFile()
- }
- tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
- if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
- (*tmp)["isextFile"] = true
- }
- exF.TaskInfo.ProcessPool <- true
- j, jf, _ := exF.PreInfo(*tmp)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func(wg *sync.WaitGroup, j, jf *ju.Job) {
- defer wg.Done()
- exF.ExtractProcess(j, jf, false)
- }(&wg, j, jf)
- wg.Wait()
- exF.BidSave(false)
- return nil
- }
|