Browse Source

数据-调整

zhengkun 11 months ago
parent
commit
a9e12d286d
9 changed files with 134 additions and 45 deletions
  1. 0 23
      clean/c_all.go
  2. 13 1
      clean/c_pname.go
  3. 12 2
      config.json
  4. 44 15
      extract/extract.go
  5. 3 1
      main.go
  6. 35 0
      udp/udprocess.go
  7. 3 3
      udp/udptask.go
  8. 2 0
      ul/attr.go
  9. 22 0
      ul/init.go

+ 0 - 23
clean/c_all.go

@@ -36,9 +36,6 @@ func CleanFieldInfo(zhipu map[string]interface{}) map[string]interface{} {
 	if s_winner := CleanWinner(qu.ObjToString(zhipu["中标单位"])); s_winner != "" {
 		data["s_winner"] = s_winner
 	}
-	//if s_biddiscount := CleanBiddiscount(qu.ObjToString(zhipu["投标折扣系数"])); s_biddiscount != "" {
-	//	data["s_biddiscount"] = s_biddiscount
-	//}
 	//分包字段
 	if zhipu["s_pkg"] != nil {
 		data["s_pkg"] = zhipu["s_pkg"]
@@ -61,23 +58,3 @@ func CleanFieldInfo(zhipu map[string]interface{}) map[string]interface{} {
 
 	return data
 }
-
-// 最终逻辑校验--暂时舍弃...最终判断在需要
-func CleanFinallyInfo(data map[string]interface{}) map[string]interface{} {
-	s_toptype := qu.ObjToString(data["s_toptype"])
-	s_subtype := qu.ObjToString(data["s_subtype"])
-
-	if s_subtype == "合同" || s_subtype == "中标" || s_subtype == "成交" {
-		if qu.Float64All(data["s_budget"]) == qu.Float64All(data["s_bidamount"]) && qu.Float64All(data["s_budget"]) > 0.0 {
-			delete(data, "s_budget")
-		}
-	}
-	//删除不删除均可···与抽取值进行合并时判断也行···
-	if s_subtype == "单一" || s_subtype == "合同" || s_subtype == "中标" || s_subtype == "成交" || (s_subtype == "" && s_toptype == "") {
-
-	} else {
-		//delete(data, "s_winner")
-		//delete(data, "s_bidamount")
-	}
-	return data
-}

+ 13 - 1
clean/c_pname.go

@@ -1,19 +1,31 @@
 package clean
 
 import (
+	"regexp"
 	"strings"
 	"unicode/utf8"
 )
 
+var pname_reg1 = regexp.MustCompile("([\\.。]+)$")
+var pname_reg2 = regexp.MustCompile("(中标候选人公示)$")
+var pname_reg3 = regexp.MustCompile("(采购项目采购项目)$")
+var pname_reg4 = regexp.MustCompile("(XX项目)$")
+
 // 清洗项目名称
 func CleanPname(pname string) string {
 	if pname == "无" {
 		return ""
 	}
 	pname = fieldReg1.ReplaceAllString(pname, "")
-	pname = pcodeReg1.ReplaceAllString(pname, "")
+	pname = pname_reg1.ReplaceAllString(pname, "")
+	pname = pname_reg2.ReplaceAllString(pname, "")
+	pname = pname_reg3.ReplaceAllString(pname, "采购项目")
+	if pname_reg4.MatchString(pname) {
+		return ""
+	}
 	pname = strings.ReplaceAll(pname, "(", "(")
 	pname = strings.ReplaceAll(pname, ")", ")")
+
 	if utf8.RuneCountInString(pname) < 5 {
 		pname = ""
 	}

+ 12 - 2
config.json

@@ -1,10 +1,20 @@
 {
-  "udpport": ":1777",
+  "udpport": ":1791",
+  "bid_name": "bidding",
+  "ext_name": "result_20220218",
   "smail": {
     "to": "zhengkun@topnet.net.cn,xuzhiheng@topnet.net.cn",
     "api": "http://172.17.145.179:19281/_send/_mail"
   },
   "s_mgo": {
+    "local": true,
+    "l_addr": "127.0.0.1:12005",
+    "addr": "172.17.189.140:27080,172.17.189.141:27081",
+    "dbname" : "qfw_ai",
+    "username": "zhengkun",
+    "password": "zk@123123"
+  },
+  "b_mgo": {
     "local": true,
     "l_addr": "127.0.0.1:12005",
     "addr": "172.17.189.140:27080,172.17.189.141:27081",
@@ -23,7 +33,7 @@
   "nextNode": [
     {
       "addr": "127.0.0.1",
-      "port": 1784,
+      "port": 1792,
       "stype": ""
     }
   ]

+ 44 - 15
extract/extract.go

@@ -11,42 +11,54 @@ import (
 )
 
 // 识别结构化字段
-func ExtractFieldInfo(sid string, eid string, name string) {
+func ExtractFieldInfo(sid string, eid string) {
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt":  ul.StringTOBsonId(sid),
 			"$lte": ul.StringTOBsonId(eid),
 		},
 	}
-	pool_mgo := make(chan bool, 50)
+	//先查询抽取表-确定大模型需要识别到范围
+	dict := ConfrimExtractInfo(q)
+	log.Debug("查询语句...", q, "~", len(dict))
+
+	pool_mgo := make(chan bool, 90)
 	wg_mgo := &sync.WaitGroup{}
-	dataArr, _ := ul.SourceMgo.Find(name, q, nil, nil)
-	for k, v := range dataArr {
-		if k%100 == 0 {
-			log.Debug(k, "~", ul.BsonTOStringId(v["_id"]))
+
+	sess := ul.SourceMgo.GetMgoConn()
+	defer ul.SourceMgo.DestoryMongoConn(sess)
+	total, isok := 0, 0
+	it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%5000 == 0 {
+			log.Debug("cur index ", total)
 		}
-		infoformat := qu.IntAll(v["infoformat"])
-		if infoformat != 1 {
+		tmpid := ul.BsonTOStringId(tmp["_id"])
+		infoformat := qu.IntAll(tmp["infoformat"])
+		if infoformat != 1 || dict[tmpid] == nil {
+			tmp = make(map[string]interface{})
 			continue
 		}
+		isok++
 		pool_mgo <- true
 		wg_mgo.Add(1)
-		go func(v map[string]interface{}) {
+		go func(tmp map[string]interface{}) {
 			defer func() {
 				<-pool_mgo
 				wg_mgo.Done()
 			}()
-			tmpid := ul.BsonTOStringId(v["_id"])
-			data := ResolveInfo(v)
-			if len(data) > 0 || tmpid == "" {
-				ul.SourceMgo.UpdateById(name, tmpid, map[string]interface{}{
+			u_id := ul.BsonTOStringId(tmp["_id"])
+			data := ResolveInfo(tmp)
+			if len(data) > 0 || u_id == "" {
+				ul.SourceMgo.UpdateById(ul.Ext_Name, u_id, map[string]interface{}{
 					"$set": map[string]interface{}{"ai_zhipu": data},
 				})
 			}
-		}(v)
+		}(tmp)
+		tmp = make(map[string]interface{})
 	}
 	wg_mgo.Wait()
-	log.Debug("is over ...", sid, "~", eid)
+	log.Debug("ai is over ...", sid, "~", eid)
 }
 
 // 获取处理数据...
@@ -84,6 +96,23 @@ func ResolveInfo(v map[string]interface{}) map[string]interface{} {
 	return f_data
 }
 
+func ConfrimExtractInfo(q map[string]interface{}) map[string]interface{} {
+	dict := map[string]interface{}{}
+	sess := ul.SourceMgo.GetMgoConn()
+	defer ul.SourceMgo.DestoryMongoConn(sess)
+	total := 0
+	it := sess.DB(ul.SourceMgo.DbName).C(ul.Ext_Name).Find(&q).Select(map[string]interface{}{"_id": 1}).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%100000 == 0 {
+			log.Debug("cur index ", total)
+		}
+		tmpid := ul.BsonTOStringId(tmp["_id"])
+		dict[tmpid] = tmpid
+		tmp = make(map[string]interface{})
+	}
+	return dict
+}
+
 // 暂时不启用...无限重试
 func RunResetUpdateFieldInfo(arr []string, name string, s_name string) {
 	//log.Debug("开始重置更新...", len(arr))

+ 3 - 1
main.go

@@ -4,6 +4,7 @@ import (
 	"data_ai/extract"
 	"data_ai/udp"
 	"data_ai/ul"
+	log "github.com/donnie4w/go-logger/logger"
 )
 
 func init() {
@@ -13,7 +14,8 @@ func init() {
 }
 
 func main() {
-	extract.ExtractFieldInfo("100000000000000000000000", "900000000000000000000000", "zktest_bidding_ai")
+	log.Debug("main ...")
+	extract.ExtractFieldInfo("100000000000000000000000", "900000000000000000000000")
 	lock := make(chan bool)
 	<-lock
 }

+ 35 - 0
udp/udprocess.go

@@ -104,6 +104,9 @@ func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 
 // 下节点发送
 func sendNextNode(sid string, eid string) {
+	//更新记录状态
+	updateProcessUdpIdsInfo(sid, eid)
+
 	for _, node := range nextNode {
 		key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"])
 		by, _ := json.Marshal(map[string]interface{}{
@@ -124,3 +127,35 @@ func sendNextNode(sid string, eid string) {
 	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
 	isGetask = false //此段落彻底完毕~继续获取任务
 }
+
+// 更新流程记录id段落
+func updateProcessUdpIdsInfo(sid string, eid string) {
+	query := map[string]interface{}{
+		"gtid": map[string]interface{}{
+			"$gte": sid,
+		},
+		"lteid": map[string]interface{}{
+			"$lte": eid,
+		},
+	}
+	task_coll := "bidding_processing_ids"
+	datas, _ := ul.BidMgo.Find(task_coll, query, nil, nil)
+	if len(datas) > 0 {
+		log.Debug("开始更新流程段落记录~~", len(datas), "段")
+		for _, v := range datas {
+			up_id := ul.BsonTOStringId(v["_id"])
+			if up_id != "" {
+				update := map[string]interface{}{
+					"$set": map[string]interface{}{
+						"dataprocess_ai": 2,
+						"updatetime":     time.Now().Unix(),
+					},
+				}
+				ul.BidMgo.UpdateById(task_coll, up_id, update)
+				log.Debug("流程段落记录~~更新完毕~", update)
+			}
+		}
+	} else {
+		log.Debug("未查询到记录id段落~", query)
+	}
+}

+ 3 - 3
udp/udptask.go

@@ -21,7 +21,7 @@ func getRepeatTask() {
 				if first_id != "" && end_id != "" {
 					taskList = taskList[len_list:]
 					log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
-					extract.ExtractFieldInfo(first_id, end_id, "bidding")
+					extract.ExtractFieldInfo(first_id, end_id)
 					log.Debug("AI识别数据完成...发送下节点udp...")
 					sendNextNode(first_id, end_id)
 				} else {
@@ -32,7 +32,7 @@ func getRepeatTask() {
 						log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
 						sid := qu.ObjToString(mapInfo["sid"])
 						eid := qu.ObjToString(mapInfo["eid"])
-						extract.ExtractFieldInfo(sid, eid, "bidding")
+						extract.ExtractFieldInfo(sid, eid)
 						log.Debug("AI识别数据完成...发送下节点udp...")
 						sendNextNode(sid, eid)
 					} else {
@@ -47,7 +47,7 @@ func getRepeatTask() {
 					log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
 					sid := qu.ObjToString(mapInfo["sid"])
 					eid := qu.ObjToString(mapInfo["eid"])
-					extract.ExtractFieldInfo(sid, eid, "bidding")
+					extract.ExtractFieldInfo(sid, eid)
 					log.Debug("AI识别数据完成...发送下节点udp...")
 					sendNextNode(sid, eid)
 				} else {

+ 2 - 0
ul/attr.go

@@ -4,7 +4,9 @@ import "regexp"
 
 var (
 	SourceMgo, QyxyMgo *MongodbSim
+	BidMgo             *MongodbSim
 	SysConfig          map[string]interface{}
+	Bid_Name, Ext_Name string
 	Url                = "https://www.jianyu360.cn/article/content/%s.html"
 	CleanResultReg     = regexp.MustCompile("((\\s|\n| |\\[|\\]|\\`|json)+)")
 	SaveResultReg      = regexp.MustCompile("([{].*[}])")

+ 22 - 0
ul/init.go

@@ -15,6 +15,28 @@ func InitGlobalVar() {
 
 // 初始化mgo
 func initMgo() {
+
+	Bid_Name, Ext_Name = qu.ObjToString(SysConfig["bid_name"]), qu.ObjToString(SysConfig["ext_name"])
+	//源数据
+	b_cfg := *qu.ObjToMap(SysConfig["b_mgo"])
+	b_local := b_cfg["local"].(bool)
+	b_addr := qu.ObjToString(b_cfg["addr"])
+	if b_local {
+		b_addr = qu.ObjToString(b_cfg["l_addr"])
+	}
+	BidMgo = &MongodbSim{
+		MongodbAddr: b_addr,
+		DbName:      qu.ObjToString(b_cfg["dbname"]),
+		Size:        10,
+		UserName:    qu.ObjToString(b_cfg["username"]),
+		Password:    qu.ObjToString(b_cfg["password"]),
+	}
+	if b_local {
+		BidMgo.InitPoolDirect()
+	} else {
+		BidMgo.InitPool()
+	}
+
 	//源数据
 	s_cfg := *qu.ObjToMap(SysConfig["s_mgo"])
 	s_local := s_cfg["local"].(bool)