瀏覽代碼

数据清洗+数据合并(ai+抽取)

zhengkun 1 年之前
父節點
當前提交
e01f61ffb3
共有 6 個文件被更改,包括 258 次插入10 次删除
  1. 24 4
      sensitive/src/main.go
  2. 12 1
      sensitive/src/util/init.go
  3. 49 0
      sensitive/src/util/udpdata.go
  4. 159 0
      src/check_ai.go
  5. 1 1
      src/config.json
  6. 13 4
      src/main.go

+ 24 - 4
sensitive/src/main.go

@@ -1,7 +1,11 @@
 package main
 package main
 
 
 import (
 import (
+	"github.com/nats-io/nats.go"
+	"go.mongodb.org/mongo-driver/bson"
+	"jynats/jnats"
 	"log"
 	"log"
+	qu "qfw/util"
 	"time"
 	"time"
 	ul "util"
 	ul "util"
 )
 )
@@ -10,14 +14,30 @@ func init() {
 	ul.InitC()
 	ul.InitC()
 }
 }
 
 
-func main() {
-	
+func mainT() {
+	go RunFlowSystem()
 	lock := make(chan bool)
 	lock := make(chan bool)
 	<-lock
 	<-lock
 }
 }
 
 
-func mainT() {
-	//ul.SentiveUdp() //数据流程-增量
+func RunFlowSystem() {
+	addr := qu.ObjToString("192.168.3.240:19090")
+	jn := jnats.NewJnats(addr)
+	jn.SubZip("dataprocess.clean", func(msg *nats.Msg) {
+		msgInfo := &ul.MsgInfo{}
+		err := bson.Unmarshal(msg.Data, &msgInfo)
+		if err != nil {
+			msgInfo.Err = err.Error()
+		} else {
+			ul.FlowHandleInfo(msgInfo)
+		}
+		bs, _ := bson.Marshal(msgInfo)
+		msg.Respond(bs)
+	})
+}
+
+func main() {
+	ul.SentiveUdp() //数据流程-增量
 	//go ul.AddTaskSensitiveWordsData() //词库增量-
 	//go ul.AddTaskSensitiveWordsData() //词库增量-
 	//ul.TestData() //临时测试数据
 	//ul.TestData() //临时测试数据
 	lock := make(chan bool)
 	lock := make(chan bool)

+ 12 - 1
sensitive/src/util/init.go

@@ -48,7 +48,7 @@ func InitC() {
 
 
 	//初始化Es qyxy
 	//初始化Es qyxy
 	//Qyxy_Client_Es, _ = elastic.NewClient(http.DefaultClient, Sysconfig["qyxy_client_es"].(string))
 	//Qyxy_Client_Es, _ = elastic.NewClient(http.DefaultClient, Sysconfig["qyxy_client_es"].(string))
-	es7.InitElasticSize(Sysconfig["qyxy_client_es"].(string), 10, "es_all", "TopJkO2E_d1x")
+	es7.InitElasticSize(Sysconfig["qyxy_client_es"].(string), 10, "jybid", "Top2023_JEB01i@31")
 	Qyxy_Es_type, Qyxy_Es_index = Sysconfig["qyxy_es_type"].(string), Sysconfig["qyxy_es_index"].(string)
 	Qyxy_Es_type, Qyxy_Es_index = Sysconfig["qyxy_es_type"].(string), Sysconfig["qyxy_es_index"].(string)
 
 
 }
 }
@@ -62,3 +62,14 @@ var Es_type, Es_index string
 var Client_Es *es1.Client
 var Client_Es *es1.Client
 var Qyxy_Es_type, Qyxy_Es_index string
 var Qyxy_Es_type, Qyxy_Es_index string
 var BuyerFilter = sensitive.New()
 var BuyerFilter = sensitive.New()
+
+type MsgInfo struct {
+	Id       string                 //消息唯一id
+	CurrSetp string                 //当前步骤
+	NextSetp string                 //下个步骤,特殊流程增加
+	IsEnd    int                    //当前流程后结束 1-结束
+	Data     map[string]interface{} //数据内容
+	Err      string                 //错误信息 有错误会告警并终止流程
+	Stime    int64
+	Etime    int64
+}

+ 49 - 0
sensitive/src/util/udpdata.go

@@ -255,3 +255,52 @@ func handleData(datas []string) string {
 	rstr := strings.Join(rdata, ",")
 	rstr := strings.Join(rdata, ",")
 	return rstr
 	return rstr
 }
 }
+
+// 流式修补...
+func FlowHandleInfo(msgInfo *MsgInfo) {
+	if data := msgInfo.Data; data != nil {
+		if tmp := *qu.ObjToMap(data["ext"]); tmp != nil {
+			up := make(map[string]interface{})
+			id := BsonTOStringId(tmp["_id"])
+			agency := qu.ObjToString(tmp["agency"])
+			winner := qu.ObjToString(tmp["winner"])
+			s_winner := qu.ObjToString(tmp["s_winner"])
+			if agency != "" {
+				if fok, flog, fname := cheakname(agency); fok && flog != "" && flog != "termQuery" {
+					tmp["agency"] = fname
+					up["log"] = map[string]interface{}{
+						"agency": fmt.Sprintf("%s_%s", flog, agency),
+					}
+					up["agency"] = fname
+				}
+			}
+			if winner != "" && !specReg.MatchString(winner) {
+				if fok, flog, fname := cheakname(winner); fok && flog != "" && flog != "termQuery" {
+					tmp["winner"] = fname
+					up["log"] = map[string]interface{}{
+						"winner": fmt.Sprintf("%s_%s", flog, winner),
+					}
+					up["winner"] = fname
+				}
+			}
+
+			if s_winner != "" && !specReg.MatchString(s_winner) {
+				if fok, flog, fname := cheakname(s_winner); fok && flog != "" && flog != "termQuery" {
+					tmp["s_winner"] = fname
+					up["log"] = map[string]interface{}{
+						"s_winner": fmt.Sprintf("%s_%s", flog, s_winner),
+					}
+					up["s_winner"] = fname
+				}
+			}
+			if len(up) > 0 {
+				for k, v := range up {
+					tmp[k] = v
+				}
+				data["ext"] = tmp
+				msgInfo.Data = data
+				Save_Mgo.UpdateById(SaveCollName, id, map[string]interface{}{"$set": up})
+			}
+		}
+	}
+}

+ 159 - 0
src/check_ai.go

@@ -0,0 +1,159 @@
+package main
+
+import qu "qfw/util"
+
+// 大模型与抽取数据合并计算
+func getCheckDataAI(tmp map[string]interface{}, update_check *map[string]interface{}) {
+	if tmp["ai_zhipu"] == nil {
+		return
+	}
+	//记录抽取原值
+	ext_ai := map[string]interface{}{}
+	ai_zhipu := *qu.ObjToMap(tmp["ai_zhipu"])
+	//分类字段···
+	s_toptype, s_subtype := qu.ObjToString(ai_zhipu["s_toptype"]), qu.ObjToString(ai_zhipu["s_subtype"])
+	s_toptype, s_subtype = CheckClassByOtherFileds(s_toptype, s_subtype, tmp)
+	if s_toptype != "" && s_subtype != "" {
+		(*update_check)["toptype"] = s_toptype
+		(*update_check)["subtype"] = s_subtype
+		ext_ai["toptype"] = tmp["toptype"]
+		ext_ai["subtype"] = tmp["subtype"]
+	} else {
+		s_toptype = qu.ObjToString(tmp["toptype"])
+		s_subtype = qu.ObjToString(tmp["subtype"])
+	}
+
+	//基础字段···
+	if s_buyer := qu.ObjToString(ai_zhipu["s_buyer"]); s_buyer != "" {
+		(*update_check)["buyer"] = s_buyer
+		ext_ai["buyer"] = tmp["buyer"]
+	}
+	if s_projectname := qu.ObjToString(ai_zhipu["s_projectname"]); s_projectname != "" {
+		(*update_check)["projectname"] = s_projectname
+		ext_ai["projectname"] = tmp["projectname"]
+	}
+	if s_projectcode := qu.ObjToString(ai_zhipu["s_projectcode"]); s_projectcode != "" {
+		(*update_check)["projectcode"] = s_projectcode
+		ext_ai["projectcode"] = tmp["projectcode"]
+	}
+	if s_budget := qu.Float64All(ai_zhipu["s_budget"]); s_budget > 0.0 && s_budget < 1000000000.0 {
+		(*update_check)["budget"] = s_budget
+		ext_ai["budget"] = tmp["budget"]
+	}
+
+	//地域字段···
+	o_area, o_district := qu.ObjToString(tmp["area"]), qu.ObjToString(tmp["district"])
+	s_area, s_city := qu.ObjToString(ai_zhipu["s_area"]), qu.ObjToString(ai_zhipu["s_city"])
+	if s_area != "" && s_area != "全国" {
+		(*update_check)["area"] = s_area
+		if s_city != "" {
+			(*update_check)["city"] = s_city
+			if o_district != "" {
+				//判断抽取的区县是否合理···
+				isT := false
+				if ds := S_DistrictDict[o_district]; ds != nil {
+					for _, v := range ds {
+						if v.C_Name == s_city && v.P_Name == s_area {
+							isT = true
+							break
+						}
+					}
+				}
+				if !isT {
+					(*update_check)["district"] = ""
+				}
+			}
+		} else {
+			if o_area != s_area {
+				(*update_check)["city"] = ""
+				(*update_check)["district"] = ""
+			}
+		}
+		ext_ai["area"] = tmp["area"]
+		ext_ai["city"] = tmp["city"]
+		ext_ai["district"] = tmp["district"]
+	}
+
+	//中标字段···
+	isRulePkg := false
+	if pkg := *qu.ObjToMap(tmp["package"]); len(pkg) > 1 && (s_subtype == "中标" || s_subtype == "成交" || s_subtype == "合同") {
+		if !staffInfo(pkg) {
+			isRulePkg = true
+		}
+	}
+
+	if isRulePkg {
+
+	}
+
+}
+
+// 核算分包信息
+func staffInfo(pkg map[string]interface{}) bool {
+	//鉴定中标单位
+	is_w := 0
+	for _, v := range pkg {
+		info := *qu.ObjToMap(v)
+		if winner := qu.ObjToString(info["winner"]); winner != "" {
+			is_w++
+		}
+	}
+	//鉴定中标金额
+	is_b := 0
+	for _, v := range pkg {
+		info := *qu.ObjToMap(v)
+		if bidamount := qu.Float64All(info["bidamount"]); bidamount > 0.0 {
+			is_b++
+		}
+	}
+	if is_w != len(pkg) && is_w > 0 {
+		return false
+	}
+	if is_b != len(pkg) && is_b > 0 {
+		return false
+	}
+	if is_w == 0 || is_b == 0 {
+		return false
+	}
+	return true
+}
+
+func CheckClassByOtherFileds(toptype_ai, subtype_ai string, data map[string]interface{}) (string, string) {
+	toptype_rule := qu.ObjToString(data["toptype"])
+	subtype_rule := qu.ObjToString(data["subtype"])
+	//1、结果类 中标和成交错误校正
+	s_winner := qu.ObjToString(data["s_winner"])
+	winnerorder, _ := data["winnerorder"].([]interface{})
+	if toptype_ai == "结果" && toptype_rule == "结果" {
+		if (subtype_ai == "中标" && subtype_rule == "成交") || (subtype_ai == "成交" && subtype_rule == "中标") {
+			if len(winnerorder) > 0 { //有中标候选人->中标
+				return toptype_ai, "中标"
+			}
+			if s_winner != "" || data["bidamount"] != nil {
+				return toptype_ai, "成交"
+			}
+		}
+	}
+	//2、招标、结果错误校正
+	if toptype_ai != "结果" && toptype_rule == "结果" {
+		//return toptype_rule,subtype_rule//默认规则为准
+		if len(winnerorder) > 0 { //有中标候选人->中标
+			//return toptype_rule, "中标"//这里subtype是否返回"中标"?
+			return toptype_rule, subtype_rule //默认规则是正确的
+		} else if s_winner != "" || data["bidamount"] != nil {
+			return toptype_rule, subtype_rule
+		} else {
+			return toptype_ai, subtype_ai
+		}
+	} else if toptype_ai == "结果" && toptype_rule != "结果" {
+		//return toptype_rule,subtype_rule//默认规则为准
+		if len(winnerorder) > 0 { //有中标候选人->中标
+			return toptype_ai, "中标" //这里subtype返回"中标",避免ai识别错误
+		} else if s_winner != "" || data["bidamount"] != nil {
+			return toptype_ai, "成交" //这里subtype返回"成交",避免ai识别错误
+		} else {
+			return toptype_ai, subtype_ai
+		}
+	}
+	return toptype_ai, subtype_ai
+}

+ 1 - 1
src/config.json

@@ -1,5 +1,5 @@
 {
 {
-  "udpport": ":11109",
+  "udpport": ":1166",
   "mongodb": {
   "mongodb": {
     "addrName": "127.0.0.1:27017",
     "addrName": "127.0.0.1:27017",
     "dbName": "zhengkun",
     "dbName": "zhengkun",

+ 13 - 4
src/main.go

@@ -211,10 +211,19 @@ func startCheckData(sid, eid string) {
 			}()
 			}()
 			//更新-
 			//更新-
 			update_check := make(map[string]interface{}, 0)
 			update_check := make(map[string]interface{}, 0)
-			//getCheckDataCity(tmp, &update_check)		//审查-城市
-			//getCheckDataBidamount(tmp, &update_check)	//审查-中标金额
-			getCheckDataPublishtime(tmp, &update_check) //修复-发布时间
-			//getCheckDataCategory(tmp,&update_check)	//修复分类
+
+			//审查-城市-迁移
+			//getCheckDataCity(tmp, &update_check)
+			//审查-金额-迁移
+			//getCheckDataBidamount(tmp, &update_check)
+			//审查-分类-弃用
+			//getCheckDataCategory(tmp,&update_check)
+
+			//审查-发布时间
+			getCheckDataPublishtime(tmp, &update_check)
+			//审查-大模型与抽取
+			getCheckDataAI(tmp, &update_check)
+
 			//最终计算是否清洗
 			//最终计算是否清洗
 			update_dict := make(map[string]interface{}, 0)
 			update_dict := make(map[string]interface{}, 0)
 			if len(update_check) > 0 {
 			if len(update_check) > 0 {