Sfoglia il codice sorgente

抽取总开关程序-初步完善

zhengkun 3 anni fa
parent
commit
89e8135b94
4 ha cambiato i file con 600 aggiunte e 0 eliminazioni
  1. 410 0
      src/mark
  2. 24 0
      udpextract/src/config.json
  3. 155 0
      udpextract/src/main.go
  4. 11 0
      udpextract/src/mark

+ 410 - 0
src/mark

@@ -0,0 +1,410 @@
+{
+    "port": "9090",
+    "mgodb": "127.0.0.1:27017",
+    "dbsize": 3,
+    "dbname": "extract_local",
+    "dbname_addrs": "extract_local",
+    "dbname_addrs_c": "address_new_2020",
+    "redis": "qyk_redis=192.168.3.207:6379",
+    "elasticsearch": "http://127.0.0.1:9800",
+    "elasticsearch_index": "winner_enterprise_tmp",
+    "elasticsearch_type": "winnerent",
+    "elasticsearch_db": "winner_enterprise",
+    "elasticsearch_buyer_index": "buyer_enterprise_tmp",
+    "elasticsearch_buyer_type": "buyerent",
+    "elasticsearch_buyer_db": "buyer_enterprise",
+    "elasticsearch_agency_index": "agency_enterprise_tmp",
+    "elasticsearch_agency_type": "agencyent",
+    "elasticsearch_agency_db": "agency_enterprise",
+    "redis_qyk": "qyk_redis",
+    "redis_winner_db": "1",
+    "redis_buyer_db": "2",
+    "redis_agency_db": "3",
+    "elasticPoolSize": 1,
+    "mergetable": "projectset",
+    "mergetablealias": "projectset_v1",
+ 	"ffield": true,
+    "saveresult": false,
+    "fieldsfind": false,
+    "qualityaudit": false,
+    "saveblock": false,
+    "filelength": 150000,
+    "iscltlog": false,
+    "brandgoods": false,
+    "pricenumber":true,
+    "udptaskid": "60b493c2e138234cb4adb640",
+    "udpport": "1484",
+    "nextNode": [
+        {
+            "addr": "127.0.0.1",
+            "port": 1485,
+            "memo": "抽取城市"
+        }
+    ],
+    "esconfig": {
+        "available": false,
+        "AccessID": "",
+        "AccessSecret": "",
+        "ZoneIds": [
+            {
+                "zoneid": "cn-beijing-f",
+                "LaunchTemplateId4": "lt-2zejb8ayql48hn0hcjpy",
+                "LaunchTemplateId8": "lt-2zegx87hj07phcdtoh61",
+                "vswitchid": "vsw-2zei6snkgmqxcnnx6g04d"
+            },
+            {
+                "zoneid": "cn-beijing-g",
+                "LaunchTemplateId4": "lt-2ze5ktfgopayi48ok0hu",
+                "LaunchTemplateId8": "lt-2ze0qfrxdnkuwldj9s0u",
+                "vswitchid": "vsw-2ze586sxfwsaov4s5w88d"
+            },
+            {
+                "zoneid": "cn-beijing-h",
+                "LaunchTemplateId4": "lt-2ze5ir54gy4ui8okr71f",
+                "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
+                "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
+            }
+        ]
+    },
+    "istest": true,
+    "isSaveTag": false,
+    "tomail": "zhengkun@topnet.net.cn",
+    "api": "http://10.171.112.160:19281/_send/_mail",
+    "deleteInstanceTimeHour": 1,
+    "jsondata_extweight": 1
+}
+
+
+
+
+// extractudp
+package extract
+
+import (
+	"encoding/json"
+	"fmt"
+	db "jy/mongodbutil"
+	ju "jy/util"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"sync"
+	"time"
+
+	log "github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+var Udpclient mu.UdpClient // UDP client object; assigned and started by ExtractUdp
+// nextNodes is filled by ExtractUdp from the "nextNode" config entry; each
+// element is read by processUdpMsg for its "addr", "port" and "stype" keys.
+var nextNodes []map[string]interface{}
+
+// ExtractUdp prepares UDP-triggered extraction: it loads the downstream node
+// list from the "nextNode" config entry, builds the UDP client bound to the
+// configured "udpport" and registers processUdpMsg as its message handler.
+func ExtractUdp() {
+	rawNodes := ju.Config["nextNode"].([]interface{})
+	nextNodes = qu.ObjArrToMapArr(rawNodes)
+
+	localAddr := ":" + qu.ObjToString(ju.Config["udpport"])
+	Udpclient = mu.UdpClient{
+		Local:   localAddr,
+		BufSize: 1024,
+	}
+	Udpclient.Listen(processUdpMsg)
+}
+
+// processUdpMsg handles one inbound UDP message.
+//
+// OP_TYPE_DATA payloads are decoded as a JSON object:
+//   - "stype" == "distributed": the sender is acknowledged, the "ecs" record
+//     for "InstanceId" is marked "running", ExtractByUdp runs in distributed
+//     mode, and the record is then marked "ok".
+//   - otherwise "gtid"/"lteid" give an id range: the sender is acknowledged
+//     (with "key" if present, else "udpok"), the range is extracted and every
+//     entry of nextNodes is sent the same range.
+//
+// OP_NOOP payloads (replies from downstream nodes) are only logged.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var msg map[string]interface{}
+		if err := json.Unmarshal(data, &msg); err != nil {
+			log.Debug(err)
+			return
+		}
+
+		if kind, _ := msg["stype"].(string); kind == "distributed" { // distributed extraction branch
+			ip := qu.ObjToString(msg["ip"])
+			go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+ip+"udpok"), mu.OP_NOOP, ra)
+
+			InstanceId := qu.ObjToString(msg["InstanceId"])
+			// NOTE(review): the selector is built by string concatenation from
+			// a value taken straight out of the UDP payload — confirm the
+			// sender is trusted or switch to a structured query.
+			selector := `{"InstanceId":"` + InstanceId + `"}`
+			markStatus := func(status string) {
+				db.Mgo.Update("ecs", selector,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"extstatus": status,
+						},
+					}, true, false)
+			}
+
+			markStatus("running")
+			ExtractByUdp("", "", ra, InstanceId, ip)
+			markStatus("ok")
+			log.Debug("分布式抽取完成,可以释放esc实例", ip)
+			return
+		}
+
+		startID, _ := msg["gtid"].(string)
+		endID, _ := msg["lteid"].(string)
+		if startID == "" || endID == "" {
+			log.Debug("err", "sid=", startID, ",eid=", endID)
+			return
+		}
+
+		ack, _ := msg["key"].(string)
+		if ack == "" {
+			ack = "udpok"
+		}
+		go Udpclient.WriteUdp([]byte(ack), mu.OP_NOOP, ra)
+		log.Debug("udp通知抽取id段", startID, " ", endID)
+		ExtractByUdp(startID, endID, ra)
+
+		// Forward the same id range to every configured downstream node.
+		for _, node := range nextNodes {
+			payload, _ := json.Marshal(map[string]interface{}{
+				"gtid":  startID,
+				"lteid": endID,
+				"stype": qu.ObjToString(node["stype"]),
+			})
+			target := &net.UDPAddr{
+				IP:   net.ParseIP(node["addr"].(string)),
+				Port: qu.IntAll(node["port"]),
+			}
+			if err := Udpclient.WriteUdp(payload, mu.OP_TYPE_DATA, target); err != nil {
+				log.Debug(err)
+			}
+		}
+		log.Debug("udp通知抽取完成,eid=", endID)
+	case mu.OP_NOOP: // reply from a downstream node
+		log.Debug(string(data))
+	}
+}
+
+// ext is the extraction task shared by every ExtractByUdp call; it is built on
+// the first call and reused afterwards.
+var ext *ExtractTask
+
+// ExtractByUdp runs extraction in UDP-triggered mode.
+//
+// Without instanceId it pages through the task's source collection for
+// documents whose _id lies in (sid, eid], runs ExtractProcess on each one,
+// waits for all of them and then calls BidSave(false).
+//
+// With instanceId (instanceId[0] is used as the ECS instance id and
+// instanceId[1], when present, as the ip shown in the acknowledgement) it runs
+// in distributed mode: it repeatedly fetches an "esctask" record whose state
+// is below 1, marks it as taken and extracts that record's sid..eid range
+// from the source collection and from its "_back" counterpart. sid and eid
+// are not used in this mode.
+//
+// ra is the UDP peer that receives the acknowledgement in distributed mode.
+// qu.Catch is deferred for the whole call (presumably to recover panics —
+// confirm against qfw/util).
+func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
+	defer qu.Catch()
+	if ext == nil {
+		// First call: build the shared task and load every rule set.
+		ext = &ExtractTask{}
+		ext.Id = qu.ObjToString(ju.Config["udptaskid"])
+		ext.InitTaskInfo()
+		ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
+		ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
+		ext.InitSite()
+		ext.InitRulePres()
+		ext.InitRuleBacks(false)
+		ext.InitRuleBacks(true)
+		ext.InitRuleCore(false)
+		ext.InitRuleCore(true)
+		ext.InitBlockRule()
+		ext.InitPkgCore()
+		ext.InitTag(false)
+		ext.InitTag(true)
+		ext.InitClearFn(false)
+		ext.InitClearFn(true)
+		ext.Lock()
+		//ext.IsExtractCity = false
+		if ext.IsExtractCity { // whether city extraction runs is controlled by the version
+			// initialise city DFA data
+			//ext.InitCityDFA()
+			ext.InitCityInfo()
+			ext.InitAreaCode()
+			ext.InitPostCode()
+		}
+		ext.Unlock()
+		// quality audit
+		ext.InitAuditFields()
+		ext.InitAuditRule()
+		ext.InitAuditClass()
+		ext.InitAuditRecogField()
+
+		// whether brand extraction is enabled
+		ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
+
+		ext.ResultSave(true)
+		ext.BidSave(true)
+		ext.IsRun = true
+		ext.InitFile()
+	} else {
+		ext.BidTotal = 0
+	}
+	index := 0
+	if len(instanceId) > 0 { // distributed extraction
+		// The ip is the optional second value; guard the index so a call
+		// with only the instance id cannot panic.
+		ip := ""
+		if len(instanceId) > 1 {
+			ip = instanceId[1]
+		}
+		go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", ip)), mu.OP_NOOP, ra)
+		for {
+			tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
+			// NOTE(review): this relies on FindOne returning a non-nil
+			// pointer together with b == false when nothing matches —
+			// confirm against jy/mongodbutil.
+			if tsk != nil && !b {
+				break
+			}
+			db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
+				"$set": map[string]interface{}{
+					"InstanceId": instanceId[0],
+					"state":      1,
+					"runtime":    time.Now().Format(qu.Date_Full_Layout),
+				},
+			})
+			query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
+			count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+			count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
+			log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
+			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
+			for _, v := range *list {
+				// (disabled) filter for documents containing sensitive words:
+				//if qu.ObjToString(v["sensitive"]) != "" {
+				//	log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
+				//	continue
+				//}
+				if spidercode[qu.ObjToString(v["spidercode"])] { // temporary: skip bid-opening records
+					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
+					continue
+				}
+				var j, jf *ju.Job
+				var isSite bool
+				if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
+					v["isextFile"] = true
+					j, jf, isSite = ext.PreInfo(v)
+				} else {
+					j, _, isSite = ext.PreInfo(v)
+				}
+				go ext.ExtractProcess(j, jf, isSite)
+				index++
+				ext.TaskInfo.ProcessPool <- true
+			}
+			list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
+			for _, v := range *list2 {
+				if qu.ObjToString(v["sensitive"]) != "" { // drop documents containing sensitive words
+					continue
+				}
+				var j, jf *ju.Job
+				var isSite bool
+				if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
+					v["isextFile"] = true
+					j, jf, isSite = ext.PreInfo(v)
+				} else {
+					j, _, isSite = ext.PreInfo(v)
+				}
+				go ext.ExtractProcess(j, jf, isSite)
+				index++
+				ext.TaskInfo.ProcessPool <- true
+			}
+			// NOTE(review): state is written as 1 again here, the same value
+			// set when the task was taken — confirm whether a distinct
+			// "finished" value was intended.
+			db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
+				"$set": map[string]interface{}{
+					"InstanceId": instanceId[0],
+					"oktime":     time.Now().Format(qu.Date_Full_Layout),
+					"state":      1,
+				},
+			})
+			db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
+				map[string]interface{}{
+					"$inc": map[string]interface{}{
+						"totalnum": count1 + count2,
+						"step":     1,
+					},
+				}, true, false)
+		}
+		log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
+	} else {
+		// regular extraction over (sid, eid]
+		query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+		count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
+		log.Debug("查询条件为:", query, "查询条数:", count)
+		pageNum := (count + PageSize - 1) / PageSize
+		limit := PageSize
+		if count < PageSize {
+			limit = count
+		}
+		wg := sync.WaitGroup{}
+		for i := 0; i < pageNum; i++ {
+			// Keep the upper bound on every page so a page can never read
+			// past eid; sid is the moving lower bound (last _id seen).
+			query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+			fmt.Printf("page=%d,query=%v\n", i+1, query)
+			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
+			if list == nil || len(*list) == 0 {
+				// Query failed or the range is exhausted: stop paging.
+				break
+			}
+			for _, v := range *list {
+				_id := qu.BsonIdToSId(v["_id"])
+				// Advance the cursor for every fetched document, skipped
+				// ones included; otherwise skipped documents are fetched
+				// again on the next page and the tail of the range is
+				// never reached within pageNum pages.
+				sid = _id
+				// (disabled) filter for documents containing sensitive words:
+				//if qu.ObjToString(v["sensitive"]) != "" {
+				//	log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
+				//	continue
+				//}
+				if spidercode[qu.ObjToString(v["spidercode"])] { // temporary: skip bid-opening records
+					log.Debug(index, _id, "//开标记录")
+					continue
+				}
+				var j, jf *ju.Job
+				var isSite bool
+				if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
+					v["isextFile"] = true
+					j, jf, isSite = ext.PreInfo(v)
+				} else {
+					j, _, isSite = ext.PreInfo(v)
+				}
+				// Take a pool slot before starting the worker.
+				ext.TaskInfo.ProcessPool <- true
+				wg.Add(1)
+				go func(wg *sync.WaitGroup, j, jf *ju.Job) {
+					defer wg.Done()
+					//log.Debug(index,j.SourceMid,)
+					ext.ExtractProcess(j, jf, isSite)
+				}(&wg, j, jf)
+				index++
+				if index%1000 == 0 {
+					log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
+				}
+			}
+			if sid >= eid {
+				// The end of the requested range has been reached.
+				break
+			}
+		}
+		wg.Wait()
+		ext.BidSave(false)
+		log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
+	}
+}
+
+// Bid-award forecast extraction; ossid is the id produced after attachment recognition.
+// exF is the extraction task used by ExtractByBidForecast; it is built on the
+// first call and reused afterwards.
+var exF *ExtractTask
+
+// ExtractByBidForecast extracts the single document identified by infoid.
+//
+// On the first call it builds exF using the "udptaskid" config entry and runs
+// the full initialisation sequence. It then loads the document by id from the
+// task's source collection, flags it with "isextFile" when file-field
+// extraction is enabled and the document has "projectinfo" or "attach_text",
+// runs ExtractProcess on it in a goroutine, waits for that goroutine and calls
+// BidSave(false).
+//
+// ossid is not read anywhere in this function. The function always returns nil.
+func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
+	defer qu.Catch()
+	if exF == nil {
+		exF = &ExtractTask{}
+		exF.Id = qu.ObjToString(ju.Config["udptaskid"])
+		exF.InitTaskInfo()
+		exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
+		exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
+		exF.InitSite()
+		exF.InitRulePres()
+		exF.InitRuleBacks(false)
+		exF.InitRuleBacks(true)
+		exF.InitRuleCore(false)
+		exF.InitRuleCore(true)
+		exF.InitBlockRule()
+		exF.InitPkgCore()
+		exF.InitTag(false)
+		exF.InitTag(true)
+		exF.InitClearFn(false)
+		exF.InitClearFn(true)
+
+		if exF.IsExtractCity { // whether city extraction runs is controlled by the version
+			// initialise city DFA data
+			//exF.InitCityDFA()
+			exF.InitCityInfo()
+			exF.InitAreaCode()
+			exF.InitPostCode()
+		}
+		// quality audit
+		exF.InitAuditFields()
+		exF.InitAuditRule()
+		exF.InitAuditClass()
+		exF.InitAuditRecogField()
+
+		// whether brand extraction is enabled
+		ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
+
+		exF.ResultSave(true)
+		exF.BidSave(true)
+		exF.IsRun = true
+		exF.InitFile()
+	}
+	// NOTE(review): tmp is dereferenced without a nil check; if FindById can
+	// return nil for a missing id this panics (qu.Catch is deferred above).
+	tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
+	if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
+		(*tmp)["isextFile"] = true
+	}
+	// Take a pool slot before the worker starts, as ExtractByUdp does.
+	exF.TaskInfo.ProcessPool <- true
+	j, jf, _ := exF.PreInfo(*tmp)
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func(wg *sync.WaitGroup, j, jf *ju.Job) {
+		defer wg.Done()
+		exF.ExtractProcess(j, jf, false)
+	}(&wg, j, jf)
+	wg.Wait()
+	exF.BidSave(false)
+
+	return nil
+}

+ 24 - 0
udpextract/src/config.json

@@ -0,0 +1,24 @@
+{
+    "udpport": ":1784",
+    "extractNode": [
+        {
+            "addr": "127.0.0.1",
+            "port": 6601,
+            "stype": "extract_1"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 6601,
+            "stype": "extract_2"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 6603,
+            "stype": "extract_3"
+        }
+    ],
+    "nextNode": [
+
+    ]
+
+} 

+ 155 - 0
udpextract/src/main.go

@@ -0,0 +1,155 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	mu "mfw/util"
+	"net"
+	qu "qfw/util"
+	"sync"
+)
+var (
+	Config    		map[string]interface{} 		// configuration file contents
+	nextNode     	[]map[string]interface{} 	// downstream node list
+	extractNode     []map[string]interface{} 	// extract node list
+	udpclient    	mu.UdpClient             	// UDP client object
+	extractLevel    map[string]interface{} 		// per-extract-node status, plus the current "sid"/"eid"
+	udplock 		sync.Mutex         			// lock guarding extractLevel
+)
+// init reads the configuration into Config, converts its "nextNode" and
+// "extractNode" arrays into nextNode and extractNode, and zeroes the
+// per-node status table.
+func init() {
+	qu.ReadConfig(&Config)
+
+	rawNext := Config["nextNode"].([]interface{})
+	nextNode = qu.ObjArrToMapArr(rawNext)
+
+	rawExtract := Config["extractNode"].([]interface{})
+	extractNode = qu.ObjArrToMapArr(rawExtract)
+
+	resetExtractLevel()
+}
+// resetExtractLevel replaces extractLevel with a fresh map holding one entry
+// per extract node, keyed by the node's "stype" and set to 0.
+func resetExtractLevel() {
+	extractLevel = make(map[string]interface{}, 0)
+	for i := range extractNode {
+		stype := qu.ObjToString(extractNode[i]["stype"])
+		name := fmt.Sprintf("%s", stype)
+		extractLevel[name] = 0
+	}
+}
+// main starts the UDP listener on the configured "udpport" and then keeps the
+// process alive so that processUdpMsg can go on handling messages.
+func main() {
+	updport := Config["udpport"].(string)
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	// The log line above is only reachable if Listen returns; in that case
+	// main must not return, or the process exits right after start-up.
+	// If Listen blocks instead, this statement is never reached.
+	select {}
+}
+
+// processUdpMsg is the UDP message handler of the dispatcher.
+//
+// OP_TYPE_DATA: the payload is decoded as a JSON object carrying "gtid" and
+// "lteid". The sender is acknowledged (with "key" if present, else "udpok"),
+// the node status table is reset and the range recorded under "sid"/"eid",
+// the range is split by splitIdMethod and handed to sendExtractNode.
+//
+// OP_NOOP: the payload text is looked up in extractLevel; a known key is
+// marked 1 and, once validExtractFinish reports true, the recorded range is
+// forwarded with sendNextNode. Unknown replies are only logged.
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var req map[string]interface{}
+		if err := json.Unmarshal(data, &req); err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+			return
+		}
+		if req == nil {
+			return
+		}
+
+		startID, _ := req["gtid"].(string)
+		endID, _ := req["lteid"].(string)
+		if startID == "" || endID == "" {
+			log.Println("接收id段-err ", "sid=", startID, ",eid=", endID)
+			return
+		}
+
+		ack, _ := req["key"].(string)
+		if ack == "" {
+			ack = "udpok"
+		}
+		go udpclient.WriteUdp([]byte(ack), mu.OP_NOOP, ra)
+		log.Println("接收当前段落,udp通知抽取-需拆分", len(extractNode), "组", startID, "~", endID)
+
+		udplock.Lock()
+		resetExtractLevel() // reset node status
+		extractLevel["sid"] = startID
+		extractLevel["eid"] = endID
+		udplock.Unlock()
+
+		// split the range into one segment per extract node
+		segments := splitIdMethod(startID, endID)
+		if len(segments) != len(extractNode) {
+			// The original note here reads "send the whole range directly";
+			// no such fallback is implemented, the mismatch is only logged.
+			log.Println("段落划分异常...请检查程序...")
+		}
+		sendExtractNode(segments) // notify the extract nodes
+	case mu.OP_NOOP: // reply from a node
+		// multiple extract nodes report back here
+		reply := string(data)
+		udplock.Lock()
+		if extractLevel[reply] == nil {
+			log.Println("其他节点回应:", reply)
+			udplock.Unlock()
+			return
+		}
+		extractLevel[reply] = 1
+		log.Println("抽取节点回应:", reply)
+		if validExtractFinish() { // every segment has been extracted
+			// forward the whole range downstream (city completion, sensitive words, ...)
+			startID := qu.ObjToString(extractLevel["sid"])
+			endID := qu.ObjToString(extractLevel["eid"])
+			if startID != "" && endID != "" {
+				sendNextNode(startID, endID)
+			}
+		}
+		udplock.Unlock()
+	}
+}
+
+
+
+// validExtractFinish reports whether extraction is complete: it returns false
+// as soon as any extractLevel entry other than the "sid" and "eid" keys
+// converts (via qu.Int64All) to 0, and true otherwise.
+func validExtractFinish() bool {
+	for name, state := range extractLevel {
+		switch name {
+		case "sid", "eid":
+			// range bookkeeping, not a node status
+		default:
+			if qu.Int64All(state) == 0 {
+				return false
+			}
+		}
+	}
+	return true
+}
+// splitIdMethod is meant to split the id range sid..eid into segments.
+// It is currently a stub: sid and eid are ignored and an empty, non-nil
+// slice is always returned.
+// TODO: implement the split; processUdpMsg compares the result's length with
+// len(extractNode), and sendExtractNode reads "sid"/"eid" from each element.
+func splitIdMethod(sid string,eid string)([]map[string]interface{}) {
+
+
+	return []map[string]interface{}{}
+}
+
+// sendExtractNode sends every extract node its id segment over UDP.
+//
+// splitArr must hold exactly one element per entry of extractNode, in the
+// same order; each element's "sid"/"eid" become the "gtid"/"lteid" of the
+// message sent to the matching node. When the lengths differ nothing is sent
+// and the mismatch is logged, instead of panicking on an out-of-range index.
+func sendExtractNode(splitArr []map[string]interface{}) {
+	if len(splitArr) != len(extractNode) {
+		log.Println("发送段落异常:", "segments=", len(splitArr), "nodes=", len(extractNode))
+		return
+	}
+	for index, node := range extractNode {
+		tmp := splitArr[index]
+		// Marshal cannot fail here: the map holds only string values.
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  qu.ObjToString(tmp["sid"]),
+			"lteid": qu.ObjToString(tmp["eid"]),
+			"stype": qu.ObjToString(node["stype"]),
+		})
+		err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(node["addr"].(string)),
+			Port: qu.IntAll(node["port"]),
+		})
+		if err != nil {
+			log.Println("发送段落异常:", node, tmp, err)
+		}
+	}
+	log.Println("通知抽取udp...等待抽取...回应...")
+}
+
+// sendNextNode forwards the id range sid..eid to every node in nextNode as an
+// OP_TYPE_DATA message carrying "gtid", "lteid" and the node's "stype".
+// Send errors are logged and do not stop the loop.
+func sendNextNode(sid string, eid string) {
+	for i := range nextNode {
+		target := nextNode[i]
+		payload, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": qu.ObjToString(target["stype"]),
+		})
+		dest := &net.UDPAddr{
+			IP:   net.ParseIP(target["addr"].(string)),
+			Port: qu.IntAll(target["port"]),
+		}
+		if err := udpclient.WriteUdp(payload, mu.OP_TYPE_DATA, dest); err != nil {
+			log.Println(err)
+		}
+	}
+	log.Println("udp通知抽取完成...", sid, "~", eid)
+}

+ 11 - 0
udpextract/src/mark

@@ -0,0 +1,11 @@
+用于控制抽取的总开关
+
+端口采用原本,其他程序不做调整
+仅调整 抽取程序+总控制开关程序
+
+原本:  上承:分类
+       下接:补城市,敏感词
+
+调整:  上承:分类
+       下接:多程序抽取(完成) , 补城市,敏感词
+