Jelajahi Sumber

各种修改

zhengkun 1 tahun lalu
induk
melakukan
93fb517360

+ 2 - 1
src/config.json

@@ -30,5 +30,6 @@
     "tomail": "zhengkun@topnet.net.cn",
     "api": "http://10.171.112.160:19281/_send/_mail",
     "deleteInstanceTimeHour": 1,
-    "jsondata_extweight": 1
+    "jsondata_extweight": 1,
+    "flowaddr": "192.168.3.240:19090"
 }

+ 2 - 2
src/jy/cluster/ssh.go

@@ -43,13 +43,13 @@ func ssHConnect(user, password, host string, port int) (*ssh.Session, error) {
 	return session, nil
 }
 
-//wget https://www.jianyu360.com/upload/extract_v3.tgz
+// wget https://www.jianyu360.com/upload/extract_v3.tgz
 var sshstr = `
 #!/bin/bash
 cd /opt
 kill -9 $(pidof extract_fbs)
 rm -rf extract_fbs*
-wget http://172.17.145.179:9080/res/extract_fbs.tgz
+wget http://172.17.162.36:9080/res/extract_fbs.tgz
 tar -xzvf extract_fbs.tgz
 cd /opt/extract_fbs
 chmod 777 extract_fbs

+ 93 - 0
src/jy/extract/extract.go

@@ -834,3 +834,96 @@ func (e *ExtractTask) StartMatch(field, text string) *pretreated.SortMap {
 	}
 	return SMap
 }
+
+// ExtractProcessFlow extracts fields for a flow-mode job and returns the map
+// produced by AnalysisSaveFlowResult.
+//
+// j is the job for the notice body. When jf is non-nil and flagged IsFile it
+// is extracted too and its results are merged into j for fields j has not
+// filled. When isSite is set and e.SiteMerge holds true for the spider code,
+// a reduced copy of j is analysed and extracted a second time with
+// isSite=false and merged in as well.
+func (e *ExtractTask) ExtractProcessFlow(j, jf *ju.Job, isSite bool) map[string]interface{} {
+	e.ExtractDetail(j, isSite, j.SpiderCode) // extract fields from the notice body
+	if jf != nil && jf.IsFile {              // merge the attachment job jf into j
+		e.ExtractDetail(jf, isSite, j.SpiderCode)
+		// Tender notices ("招标") other than single-source ("单一") never take
+		// winner data from the attachment.
+		skipWinner := j.Category == "招标" && j.CategorySecond != "单一"
+		for field, attached := range jf.Result {
+			switch {
+			case len(j.Result[field]) > 0:
+				// The body already has entries: only "winner" may be topped up,
+				// and only when none of the existing entries carries a value.
+				if field != "winner" {
+					continue
+				}
+				hasValue := false
+				for _, cur := range j.Result[field] {
+					if cur.Value != "" {
+						hasValue = true
+						break
+					}
+				}
+				if hasValue || skipWinner {
+					continue
+				}
+				j.Result[field] = append(j.Result[field], attached...)
+			case field == "budget" || field == "bidamount":
+				// Amounts are accepted only as float64 strictly between 100 and 50000000000.
+				for _, cand := range attached {
+					if amount, ok := cand.Value.(float64); ok && amount > 100 && amount < 50000000000 {
+						j.Result[field] = append(j.Result[field], cand)
+					}
+				}
+			case field == "winner" && skipWinner:
+				// not taken over
+			default:
+				j.Result[field] = append(j.Result[field], attached...)
+			}
+		}
+		if len(j.Winnerorder) == 0 && len(jf.Winnerorder) > 0 && !skipWinner {
+			j.Winnerorder = append(j.Winnerorder, jf.Winnerorder...)
+		}
+		if len(j.PackageInfo) == 0 && isUsedPackageJF(jf.PackageInfo) {
+			j.PackageInfo = jf.PackageInfo
+		}
+	}
+	if isSite {
+		if flag, found := e.SiteMerge.Load(j.SpiderCode); found && flag.(bool) {
+			generic := &ju.Job{
+				SourceMid:      j.SourceMid,
+				Category:       j.Category,
+				CategorySecond: j.CategorySecond,
+				Content:        j.Content,
+				SpiderCode:     j.SpiderCode,
+				//Domain:     qu.ObjToString(doc["domain"]),
+				//Href:       qu.ObjToString(doc["href"]),
+				Title:     j.Title,
+				Data:      j.Data,
+				City:      j.City,
+				Province:  j.Province,
+				Jsondata:  j.Jsondata,
+				Result:    map[string][]*ju.ExtField{},
+				BuyerAddr: j.BuyerAddr,
+				RuleBlock: e.RuleBlock,
+			}
+			qu.Try(func() {
+				pretreated.AnalyStart(generic, false, "") // split the job into blocks
+			}, func(err interface{}) {
+				log.Debug("pretreated.AnalyStart.ExtractProcess", err, j.SourceMid)
+			})
+			e.ExtractDetail(generic, false, "")
+			// Merge the second pass into j.
+			j.Block = append(j.Block, generic.Block...)
+			j.Winnerorder = append(j.Winnerorder, generic.Winnerorder...)
+			for key := range j.Result {
+				if extra := generic.Result[key]; len(extra) > 0 {
+					j.Result[key] = append(j.Result[key], extra...)
+				}
+			}
+			for key, extra := range generic.Result {
+				if len(j.Result[key]) == 0 {
+					j.Result[key] = append(j.Result[key], extra...)
+				}
+			}
+		}
+	}
+	// Analyse the extraction results and save them.
+	return AnalysisSaveFlowResult(j, jf, e)
+}

+ 38 - 1
src/jy/extract/extractcheck.go

@@ -153,7 +153,6 @@ func checkFields(tmp map[string]interface{}, j_data map[string]interface{}) map[
 	projectperiod := qu.ObjToString(tmp["projectperiod"])
 	if projectperiod != "" {
 		//项目周期包含日期,数字及日期单位可保留,其余可清洗
-		isNeedValueReg := regexp.MustCompile(`([0-9俩两一二三四五六七八九年月日天周]|合同)`)
 		if !isNeedValueReg.MatchString(projectperiod) {
 			delete(tmp, "projectperiod")
 		}
@@ -310,5 +309,43 @@ func checkFields(tmp map[string]interface{}, j_data map[string]interface{}) map[
 		}
 	}
 
+	//发布时间--判断修复
+	publishtime = qu.Int64All(tmp["publishtime"])
+	bidopentime = qu.Int64All(tmp["bidopentime"])
+	bidendtime = qu.Int64All(tmp["bidendtime"])
+	if (bidopentime > 0 || bidendtime > 0) && publishtime > 0 {
+		//验证-是否超过周期16天
+		bid_time := int64(0)
+		if bidopentime > 0 {
+			bid_time = bidopentime
+		} else {
+			bid_time = bidendtime
+		}
+		if bidendtime < bidopentime && bidendtime > 0 && bidopentime > 0 {
+			bid_time = bidendtime
+		}
+		if publishtime-bid_time > 16*86400 && publishtime > 0 && bid_time > 0 {
+			tmp["dataging"] = 1
+			tmp["publishtime"] = bid_time
+			tmp["pt_modify"] = publishtime
+		}
+	}
+
+	//合同周期...校验
+	contractperiod := qu.ObjToString(tmp["contractperiod"])
+	if contractperiod != "" {
+		//项目周期包含日期,数字及日期单位可保留,其余可清洗
+		if !isNeedValueReg.MatchString(contractperiod) {
+			delete(tmp, "contractperiod")
+		}
+	}
+
+	//计算合同签订时间-结束时间-是否异常
+	signaturedate := qu.Int64All(tmp["signaturedate"])
+	expiredate := qu.Int64All(tmp["expiredate"])
+	if expiredate > 0 && signaturedate > 0 && expiredate < signaturedate {
+		delete(tmp, "expiredate")
+	}
+
 	return tmp
 }

+ 95 - 0
src/jy/extract/extractflow.go

@@ -0,0 +1,95 @@
+package extract
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	db "jy/mongodbutil"
+	ju "jy/util"
+	qu "qfw/util"
+)
+
+// Flow (streaming) mode.
+// ExtFlow is the shared task instance for flow mode; InitExtractFlowTask populates it.
+var ExtFlow *ExtractTask
+
+// InitExtractFlowTask creates the package-level ExtFlow task and loads its
+// configuration, databases, rules, tags, clean functions, region data and
+// audit data. Calls made once ExtFlow is non-nil return immediately.
+func InitExtractFlowTask() {
+	if ExtFlow != nil {
+		return
+	}
+	task := &ExtractTask{}
+	ExtFlow = task
+	task.Id = qu.ObjToString(ju.Config["udptaskid"])
+	task.InitTaskInfo()
+	task.TaskInfo.FDB = db.MgoFactory(3, 5, 600, task.TaskInfo.FromDbAddr, task.TaskInfo.FromDB)
+	task.TaskInfo.TDB = db.MgoFactory(3, 5, 600, task.TaskInfo.ToDbAddr, task.TaskInfo.ToDB)
+	task.InitSite()
+	task.InitRulePres()
+	// Each of the following loaders is run for false first, then for true.
+	variants := [...]bool{false, true}
+	for _, flag := range variants {
+		task.InitRuleBacks(flag)
+	}
+	for _, flag := range variants {
+		task.InitRuleCore(flag)
+	}
+	task.InitBlockRule()
+	task.InitPkgCore()
+	for _, flag := range variants {
+		task.InitTag(flag)
+	}
+	for _, flag := range variants {
+		task.InitClearFn(flag)
+	}
+	task.Lock()
+	if task.IsExtractCity { // city extraction is switched on per version
+		task.InitCityInfo()
+		task.InitAreaCode()
+		task.InitPostCode()
+	}
+	task.Unlock()
+	// Quality audit.
+	task.InitAuditFields()
+	task.InitAuditRule()
+	task.InitAuditClass()
+	task.InitAuditRecogField()
+	// Whether brand extraction is enabled.
+	brandGoods, _ := ju.Config["brandgoods"].(bool)
+	ju.IsBrandGoods = brandGoods
+	task.InitFile()
+	task.IsRun = true
+	task.BidTotal = 0
+}
+
+// ExtractByExtFlow runs flow-mode extraction on one source document v using
+// the package-level ExtFlow task and returns the resulting field map.
+//
+// When ju.IsUpdateRule is set, the flag is cleared and the rules, tags, clean
+// functions and (if IsExtractCity) region data of ExtFlow are reloaded first.
+// Documents whose spidercode is listed in spidercode are not extracted; a
+// shallow copy of v is returned for them. qu.Catch is deferred, presumably to
+// recover panics, in which case the zero-value (nil) map would be returned.
+func ExtractByExtFlow(v map[string]interface{}) map[string]interface{} {
+	defer qu.Catch()
+	if ju.IsUpdateRule {
+		ju.IsUpdateRule = false
+		log.Debug("每天更新一次规则......")
+		// Reset rules.
+		ExtFlow.InitSite()
+		ExtFlow.InitRulePres()
+		ExtFlow.InitRuleBacks(false)
+		ExtFlow.InitRuleBacks(true)
+		ExtFlow.InitRuleCore(false)
+		ExtFlow.InitRuleCore(true)
+		ExtFlow.InitBlockRule()
+		ExtFlow.InitPkgCore()
+		ExtFlow.InitTag(false)
+		ExtFlow.InitTag(true)
+		ExtFlow.InitClearFn(false)
+		ExtFlow.InitClearFn(true)
+		// Reset region data.
+		ExtFlow.Lock()
+		if ExtFlow.IsExtractCity { // city extraction is switched on per version
+			ExtFlow.InitCityInfo()
+			ExtFlow.InitAreaCode()
+			ExtFlow.InitPostCode()
+		}
+		ExtFlow.Unlock()
+	}
+	if spidercode[qu.ObjToString(v["spidercode"])] { // temporary: bid-opening records are skipped
+		log.Debug(qu.BsonIdToSId(v["_id"]), "//开标记录")
+		// Return a copy, never v itself: a caller that stores the result
+		// under a key of v (RunFlowSystem writes it to v["ext"]) would make
+		// the map contain itself, which can no longer be serialised.
+		skipped := make(map[string]interface{}, len(v))
+		for key, val := range v {
+			skipped[key] = val
+		}
+		return skipped
+	}
+	//......
+	var j, jf *ju.Job
+	var isSite bool
+	if ExtFlow.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
+		v["isextFile"] = true
+		j, jf, isSite = ExtFlow.PreInfo(v)
+	} else {
+		// Without attachment fields the attachment job is discarded.
+		j, _, isSite = ExtFlow.PreInfo(v)
+	}
+	return ExtFlow.ExtractProcessFlow(j, jf, isSite)
+}

+ 516 - 0
src/jy/extract/extractsave.go

@@ -728,3 +728,519 @@ func rangeBlockToJson(j *ju.Block, tmpblock ju.TmpBlock) (b *ju.TmpBlock) {
 	}
 	return &tmpblock
 }
+
+// AnalysisSaveFlowResult turns the extraction results of j (notice body) and
+// jf (attachment job, may be nil) into the final field map, queues it on e for
+// saving (or upserts it into the test collection when e.TaskInfo.TestColl is
+// set) and returns it.
+//
+// All work runs inside qu.Try; its second callback logs the error, and the map
+// is returned with whatever fields had been written up to that point.
+func AnalysisSaveFlowResult(j, jf *ju.Job, e *ExtractTask) map[string]interface{} {
+	tmp := map[string]interface{}{}
+	qu.Try(func() {
+		//(j.Category == "招标" || j.Category == "预告")
+		if (j.Category == "招标" || j.Category == "预告") && (len(j.BlockPackage) > 0 || len(j.PackageInfo) > 0 || len(j.Result) > 0) {
+			if j.CategorySecond != "单一" {
+				// Tender / pre-notice that is not single-source: strip every winner and bid amount.
+				delete(j.Result, "winner")
+				delete(j.Result, "bidamount")
+				for _, v := range j.BlockPackage {
+					v.Bidamount = 0
+					v.IsTrueBidamount = false
+					if v.Winner != "" {
+						v.Winner = ""
+						if v.SpaceKV != nil {
+							delete(v.SpaceKV.KvTags, "中标单位")
+						}
+						if v.TableKV != nil {
+							delete(v.TableKV.KvTags, "中标单位")
+						}
+						if v.ColonKV != nil {
+							delete(v.ColonKV.KvTags, "中标单位")
+						}
+					}
+				}
+				for _, v := range j.PackageInfo {
+					delete(v, "winner")
+					delete(v, "bidamount")
+				}
+				j.Winnerorder = nil
+				if jf != nil && jf.Winnerorder != nil {
+					jf.Winnerorder = nil
+				}
+			}
+		}
+		// Rebuild the winner candidates from the cleaned data.
+		e.ResetWinnerorder(j)
+		// Scoring.
+		doc, result, _id := funcAnalysis(j, e)
+		// Optionally save tags.
+		if ju.IsSaveTag {
+			go otherNeedSave(j, result, e)
+		}
+		// Pick values from the ranked results.
+		tmp["spidercode"] = j.SpiderCode
+		tmp["site"] = j.Site
+		if len(*j.Jsondata) > 0 {
+			tmp["jsondata"] = j.Jsondata
+		}
+		// Extraction source per field.
+		fieldSource := make(map[string]interface{}, 0)
+		for k, val := range result {
+			if k == "qualifies" {
+				squalifies := make([]interface{}, 0)
+				squalifiesMap := make(map[string]*scoreIndex, 0)
+				for _, kv := range val {
+					skey := kv.RuleText
+					if kv.Score > 0 {
+						if squalifiesMap[skey] == nil {
+							// Add to the index instead of replacing the whole map, so that
+							// earlier keys stay known and a later duplicate is compared by
+							// score rather than appended a second time.
+							squalifiesMap[skey] = &scoreIndex{
+								Score: kv.Score,
+								Index: len(squalifies),
+							}
+							squalifies = append(squalifies, map[string]interface{}{
+								"key":   skey,
+								"value": kv.Value,
+							})
+						} else {
+							if squalifiesMap[skey].Score < kv.Score {
+								// Remember the new best score so later entries compare against it.
+								squalifiesMap[skey].Score = kv.Score
+								squalifies[squalifiesMap[skey].Index] = map[string]interface{}{
+									"key":   skey,
+									"value": kv.Value,
+								}
+							}
+						}
+					}
+				}
+				tmp[k] = squalifies
+				continue
+			}
+
+			// Budget / bid amount: special-case handling for abnormal amounts.
+			if k == "bidamount" || k == "budget" {
+				b, index := calculateAbnormalMoney(val)
+				if b {
+					new_v := val[index]
+					tmp[new_v.Field] = new_v.Value
+					fieldSource[new_v.Field] = map[string]interface{}{
+						"ext_type": new_v.Type,
+						"ext_from": new_v.ExtFrom,
+						"ext_tag":  new_v.BlockTitle,
+					}
+					tmp["is_dif_ratioMoney"] = true
+					continue
+				}
+			}
+
+			for _, v := range val { // take the first entry with a non-negative score (project name excepted) // is storing 0 valid?
+				if (v.Field == "bidamount" || v.Field == "budget") && v.IsTrue && v.Score > -1 {
+					tmp[v.Field] = v.Value
+					fieldSource[v.Field] = map[string]interface{}{
+						"ext_type": v.Type,
+						"ext_from": v.ExtFrom,
+						"ext_tag":  v.BlockTitle,
+					}
+					break
+				}
+				if v.Score > -1 && (v.Field != "bidamount" && v.Field != "budget") && len(strings.TrimSpace(fmt.Sprint(v.Value))) > 0 {
+					tmp[v.Field] = v.Value
+					fieldSource[v.Field] = map[string]interface{}{
+						"ext_type": v.Type,
+						"ext_from": v.ExtFrom,
+						"ext_tag":  v.BlockTitle,
+					}
+					// Winner/buyer whose source value matches letter_entity: prefer the
+					// source value when the enterprise collection knows that name.
+					if (v.Field == "winner" || v.Field == "buyer") && letter_entity.MatchString(qu.ObjToString(v.SourceValue)) {
+						qyxy_data := ju.Qyxy_Mgo.FindOne("qyxy_std", map[string]interface{}{
+							"company_name": qu.ObjToString(v.SourceValue),
+						})
+						if qyxy_data != nil && len(qyxy_data) > 0 {
+							tmp[v.Field] = v.SourceValue
+						}
+					}
+
+					break
+				}
+			}
+		}
+		tmp["winner"] = strings.ReplaceAll(qu.ObjToString(tmp["winner"]), ",,", ",")
+		if len(j.PackageInfo) > 15 {
+			// More than 15 packages: keep a single (arbitrary, map-order) package.
+			for k, v := range j.PackageInfo {
+				j.PackageInfo = map[string]map[string]interface{}{}
+				j.PackageInfo[k] = v
+				break
+			}
+		}
+		if len(j.PackageInfo) > 0 { // package info
+			tmp["package"] = j.PackageInfo
+			// Package budget / bid amount: the summed value overrides the extracted one when larger.
+			tmpBidamount, tmpBudget, tmpAgencyfee := qu.Float64All(0), qu.Float64All(0), qu.Float64All(0)
+			// s_winner: package winners joined with commas.
+			var tmpstr, savewinner []string
+			// Collect the package keys that have a winner (sorted further below).
+			for b, v := range j.PackageInfo {
+				if v["winner"] != nil && v["winner"] != "" {
+					tmpstr = append(tmpstr, b)
+				}
+			}
+			// Package budget / bid amount: the summed value overrides the extracted one when larger.
+			if len(j.PackageInfo) > 1 {
+				// More than one package: accumulate.
+				for _, v := range j.PackageInfo {
+					if v["budget"] != nil {
+						tmpBudget = precisionAddFloat(tmpBudget, qu.Float64All(v["budget"]))
+					}
+					if v["bidamount"] != nil {
+						tmpBidamount = precisionAddFloat(tmpBidamount, qu.Float64All(v["bidamount"]))
+					}
+					if v["agencyfee"] != nil {
+						tmpAgencyfee = precisionAddFloat(tmpAgencyfee, qu.Float64All(v["agencyfee"]))
+					}
+				}
+				if qu.Float64All(tmp["budget"]) < tmpBudget {
+					fieldSource["budget"] = map[string]interface{}{
+						"ext_type": "",
+						"ext_from": "package",
+						"ext_tag":  "",
+					}
+					tmp["budget"] = tmpBudget
+				}
+				if qu.Float64All(tmp["agencyfee"]) < tmpAgencyfee {
+					fieldSource["agencyfee"] = map[string]interface{}{
+						"ext_type": "",
+						"ext_from": "package",
+						"ext_tag":  "",
+					}
+					tmp["agencyfee"] = tmpAgencyfee
+				}
+				if qu.Float64All(tmp["bidamount"]) < tmpBidamount {
+					// Special spider: its package amounts are not used when a bid amount was extracted.
+					if j.SpiderCode == "sx_sxgzszcgxt_fzb_cjhxrgs_bu" && qu.Float64All(tmp["bidamount"]) > 0.0 {
+
+					} else {
+						fieldSource["bidamount"] = map[string]interface{}{
+							"ext_type": "",
+							"ext_from": "package",
+							"ext_tag":  "",
+						}
+						tmp["bidamount"] = tmpBidamount
+					}
+				}
+				//if qu.Float64All(tmp["bidamount"]) > 0 && qu.Float64All(tmp["budget"]) > 0 && (qu.Float64All(tmp["bidamount"])/10 > qu.Float64All(tmp["budget"])) {
+				//	fieldSource["bidamount"] = map[string]interface{}{
+				//		"ext_type": "",
+				//		"ext_from": "package",
+				//	}
+				//	tmp["bidamount"] = tmpBidamount
+				//} else if qu.Float64All(tmp["bidamount"]) < tmpBidamount {
+				//	fieldSource["bidamount"] = map[string]interface{}{
+				//		"ext_type": "",
+				//		"ext_from": "package",
+				//	}
+				//	tmp["bidamount"] = tmpBidamount
+				//}
+			} else {
+				// Exactly one package: fall back to the package value when tmp has none.
+				if tmp["budget"] == nil || tmp["budget"] == 0 {
+					for _, v := range j.PackageInfo {
+						if v["budget"] != nil {
+							fieldSource["budget"] = map[string]interface{}{
+								"ext_type": "",
+								"ext_from": "package",
+								"ext_tag":  "",
+							}
+							tmp["budget"] = v["budget"]
+						}
+					}
+				}
+				if tmp["agencyfee"] == nil || tmp["agencyfee"] == 0 {
+					for _, v := range j.PackageInfo {
+						if v["agencyfee"] != nil {
+							fieldSource["agencyfee"] = map[string]interface{}{
+								"ext_type": "",
+								"ext_from": "package",
+								"ext_tag":  "",
+							}
+							tmp["agencyfee"] = v["agencyfee"]
+						}
+					}
+				}
+				if tmp["bidamount"] == nil || tmp["bidamount"] == 0 {
+					for _, v := range j.PackageInfo {
+						if v["bidamount"] != nil {
+							fieldSource["bidamount"] = map[string]interface{}{
+								"ext_type": "",
+								"ext_from": "package",
+								"ext_tag":  "",
+							}
+							tmp["bidamount"] = v["bidamount"]
+						}
+					}
+				}
+			}
+			// s_winner: package winners joined with commas.
+			sort.Strings(tmpstr)
+			for _, v := range tmpstr {
+				winner := qu.ObjToString(j.PackageInfo[v]["winner"])
+				new_winner := clearWinnerReg.ReplaceAllString(winner, "")
+				if new_winner == "" {
+					continue
+				}
+				// Name blacklist.
+				if unPackageWinnerReg.MatchString(new_winner) {
+					continue
+				}
+				savewinner = append(savewinner, new_winner)
+			}
+			if (savewinner == nil || len(savewinner) == 0) && tmp["winner"] != nil {
+				tmp["s_winner"] = tmp["winner"]
+				fieldSource["s_winner"] = fieldSource["winner"]
+			} else if savewinner != nil {
+				if len(savewinner) == 1 && tmp["winner"] != nil {
+					tmp["s_winner"] = tmp["winner"]
+					fieldSource["s_winner"] = fieldSource["winner"]
+				} else {
+					savewinner = RemoveReplicaSliceString(savewinner)
+					tmp["s_winner"] = strings.Join(savewinner, ",")
+					fieldSource["s_winner"] = map[string]interface{}{
+						"ext_type": "",
+						"ext_from": "package",
+						"ext_tag":  "",
+					}
+				}
+			}
+		} else if tmp["winner"] != nil {
+			// No packages: s_winner is the winner.
+			tmp["s_winner"] = tmp["winner"]
+			fieldSource["s_winner"] = fieldSource["winner"]
+		}
+
+		if len(j.Winnerorder) > 0 { // candidate info
+			for i, v := range j.Winnerorder {
+				if v["price"] != nil {
+					tmpPrice := clear.ObjToMoney([]interface{}{v["price"], ""}, j.SpiderCode, j.IsClearnMoney)
+					// The last element of the result is read as a validity flag.
+					if tmpPrice[len(tmpPrice)-1].(bool) {
+						j.Winnerorder[i]["price"] = tmpPrice[0]
+					} else {
+						delete(j.Winnerorder[i], "price")
+					}
+				}
+			}
+			tmp["winnerorder"] = j.Winnerorder
+		}
+		// Handle the attachment job.
+		var resultf map[string][]*ju.ExtField
+		ffield := map[string]interface{}{}
+		if jf != nil {
+			e.ResetWinnerorder(jf)
+			_, resultf, _ = funcAnalysis(jf, e)
+			for _, val := range resultf {
+				for _, v := range val { // take the first entry with a non-negative score
+					if v.Score > -1 {
+						ffield[v.Field] = v.Value
+						if tmp[v.Field] == nil || tmp[v.Field] == "" {
+							if v.Field == "addressing" {
+								break
+							}
+							// Comma-ok assertion: a non-float64 amount is skipped instead of
+							// panicking and aborting the whole record.
+							if fv, isNum := v.Value.(float64); (v.Field == "bidamount" || v.Field == "budget") && v.IsTrue && isNum && fv > 100 && fv < 50000000000 {
+								tmp[v.Field] = v.Value
+								fieldSource[v.Field] = map[string]interface{}{
+									"ext_type": v.Type,
+									"ext_from": "ff",
+									"ext_tag":  v.BlockTitle,
+								}
+								break
+							}
+							if v.Score > -1 && (v.Field != "bidamount" && v.Field != "budget") && len(strings.TrimSpace(fmt.Sprint(v.Value))) > 0 {
+								if v.Field == "winner" && j.Category == "招标" && j.CategorySecond != "单一" {
+									break // logically no longer reachable for winner in this method
+								}
+								tmp[v.Field] = v.Value
+								fieldSource[v.Field] = map[string]interface{}{
+									"ext_type": v.Type,
+									"ext_from": "ff",
+									"ext_tag":  v.BlockTitle,
+								}
+								// Winner/buyer whose source value matches letter_entity: prefer the
+								// source value when the enterprise collection knows that name.
+								if (v.Field == "winner" || v.Field == "buyer") && letter_entity.MatchString(qu.ObjToString(v.SourceValue)) {
+									qyxy_data := ju.Qyxy_Mgo.FindOne("qyxy_std", map[string]interface{}{
+										"company_name": qu.ObjToString(v.SourceValue),
+									})
+									if qyxy_data != nil && len(qyxy_data) > 0 {
+										tmp[v.Field] = v.SourceValue
+									}
+								}
+
+								break
+							}
+						}
+						break
+					}
+				}
+			}
+			if len(jf.PackageInfo) > 0 { // package info
+				ffield["package"] = jf.PackageInfo
+			}
+			if len(jf.Winnerorder) > 0 { // candidate info
+				ffield["winnerorder"] = jf.Winnerorder
+			}
+		}
+
+		// Attach the field sources.
+		tmp["field_source"] = fieldSource
+		// Flag data that came from an irregular table.
+		if j.IsUnRulesTab {
+			tmp["is_UnRules_Tab"] = j.IsUnRulesTab
+		}
+		// Fill in fields from the source document.
+		for k, v := range *doc {
+			if utf8.RuneCountInString(qu.ObjToString(v)) > 100000 {
+				// NOTE(review): this stores a []rune in doc, while v (the untruncated
+				// value) is what may be copied into tmp below - confirm intent.
+				(*doc)[k] = []rune(qu.ObjToString(v))[:100000]
+			}
+			// Skip redundant fields.
+			if delFiled(k) {
+				continue
+			}
+			if tmp[k] == nil && BiddingFields[k] != nil {
+				tmp[k] = v
+			}
+		}
+		// Quality audit.
+		if ju.QualityAudit {
+			e.QualityAudit(tmp)
+		}
+		// Fill in the buyer from the site's owning organisation.
+		if site := e.SiteCityMap[j.Site]; site != nil && qu.ObjToString(tmp["buyer"]) == "" {
+			if site.B != "" {
+				tmp["buyer"] = site.B
+			}
+		}
+		// Recognise signature-block text and similar.
+		jf_text := ""
+		if jf != nil {
+			jf_text = jf.ContentClean
+		}
+		e.inscribeRecognize(&tmp, *j.Data, jf_text)
+		// Derive qualification requirements from the body text.
+		e.getQualifications(&tmp, *j.Data)
+		// City extraction.
+		if e.IsExtractCity {
+			e.ExtractRegionInfo(j, jf, &tmp, true)
+			e.ExtractRegionClean(&tmp) // normal standard cleaning
+			// NOTE(review): nothing visible in this function writes tmp["全国"], so the
+			// second operand looks always true - confirm the intended condition.
+			if qu.ObjToString(tmp["area"]) == "" || qu.ObjToString(tmp["全国"]) == "" {
+				// needs debugging...
+				e.ExtractRegionOtherInfo(j, &tmp)
+			}
+		}
+		// Brand extraction.
+		if ju.IsBrandGoods {
+			tmp["checkhas"] = map[string]int{
+				"hastable": j.HasTable,
+				"hasgoods": j.HasGoods,
+				"hasbrand": j.HasBrand,
+				"haskey":   j.HasKey,
+			}
+			if len(j.BrandData) > 0 {
+				tmp["tablebrand"] = j.BrandData
+			}
+		}
+		// Price and number extraction.
+		if ju.IsPriceNumber {
+			priceNumberLen := len(j.PriceNumberData)
+			if priceNumberLen > 1 { // de-duplicate tables by their JSON form
+				tmpPriceNumberData := []map[string]interface{}{}
+				tableStrs := map[string]bool{}
+				for _, tb := range j.PriceNumberData {
+					has := false
+					bytes, _ := json.Marshal(tb)
+					str := string(bytes)
+					if len(tableStrs) > 0 && tableStrs[str] {
+						has = true
+					} else {
+						tableStrs[str] = true
+					}
+					if !has {
+						for _, data := range tb {
+							tmpPriceNumberData = append(tmpPriceNumberData, data)
+						}
+					}
+				}
+				tmp["pricenumber"] = tmpPriceNumberData
+			} else if priceNumberLen == 1 {
+				tmp["pricenumber"] = j.PriceNumberData[0]
+			}
+		}
+		// String assembled from all key/value pairs.
+		assembleKVText(j, &tmp)
+		// Check fields.
+		tmp["dataging"] = j.Dataging
+		tmp = checkFields(tmp, *j.Data)
+
+		if tmp["projectname"] == nil || tmp["projectname"] == "" {
+			tmp["projectname"] = j.Title
+		}
+		tmp["repeat"] = 0
+		if ju.Ffield {
+			if len(ffield) > 0 {
+				tmp["ffield"] = ffield
+			}
+		}
+		// Temporarily save only selected fields.
+		//new_tmp := map[string]interface{}{}
+		//new_tmp["area"] = qu.ObjToString(tmp["area"])
+		//new_tmp["city"] = qu.ObjToString(tmp["city"])
+		//new_tmp["district"] = qu.ObjToString(tmp["district"])
+		//tmp = new_tmp
+
+		if e.TaskInfo.TestColl == "" {
+			if len(tmp) > 0 { // queue the extraction result for saving
+				delete(tmp, "_id")
+				tmparr := []map[string]interface{}{
+					map[string]interface{}{
+						"_id": qu.StringTOBsonId(_id),
+					},
+					map[string]interface{}{"$set": tmp},
+				}
+				e.RWMutex.Lock()
+				e.BidArr = append(e.BidArr, tmparr)
+				e.BidTotal++
+				e.RWMutex.Unlock()
+			}
+			if ju.SaveResult {
+				// tmp["_id"] has already been deleted above, so reading it here always
+				// gave nil; build the selector from the document id instead.
+				id := qu.StringTOBsonId(_id)
+				tmp["result"] = result
+				tmp["resultf"] = resultf
+				delete(tmp, "_id")
+				tmparr := []map[string]interface{}{
+					map[string]interface{}{
+						"_id": id,
+					},
+					map[string]interface{}{"$set": tmp},
+				}
+				e.RWMutex.Lock()
+				e.ResultArr = append(e.ResultArr, tmparr)
+				e.RWMutex.Unlock()
+			}
+		} else { // test run: result tracing
+			delete(tmp, "_id")
+			delete(tmp, "fieldall")
+			if len(j.BlockPackage) > 0 { // package details
+				if len(j.BlockPackage) > 10 {
+					tmp["epackage"] = "分包异常"
+				} else {
+					bs, _ := json.Marshal(j.BlockPackage)
+					tmp["epackage"] = string(bs)
+				}
+			}
+			tmp["result"] = result
+			//tmp["resultf"] = resultf
+			//_,err :=db.Mgo.Get().DB("zhengkun").C("result_data").Upsert(`{"_id":"`+_id+`"}`,map[string]interface{}{"$set": tmp})
+			//log.Debug("save:",err)
+			b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
+			if !b {
+				log.Debug(e.TaskInfo.TestColl, _id)
+			}
+		}
+	}, func(err interface{}) {
+		log.Debug("AnalysisSaveResult err", err)
+	})
+
+	return tmp
+}

+ 10 - 2
src/jy/extract/extractudp.go

@@ -214,10 +214,13 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 			log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
 			list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
 			for _, v := range *list {
-				if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
+				if spidercode[qu.ObjToString(v["spidercode"])] {
 					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
 					continue
 				}
+				//if qu.ObjToString(v["subtype"]) != "合同" { //临时调整···
+				//	continue
+				//}
 				var j, jf *ju.Job
 				var isSite bool
 				if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
@@ -236,6 +239,9 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
 					continue
 				}
+				//if qu.ObjToString(v["subtype"]) != "合同" { //临时调整···
+				//	continue
+				//}
 				var j, jf *ju.Job
 				var isSite bool
 				if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
@@ -263,6 +269,7 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 					},
 				}, true, false)
 		}
+
 		log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
 	} else {
 		//普通抽取
@@ -283,10 +290,11 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 				if IsExtStop {
 					break
 				}
-				if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
+				if spidercode[qu.ObjToString(v["spidercode"])] {
 					log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
 					continue
 				}
+
 				_id := qu.BsonIdToSId(v["_id"])
 				//......
 				var j, jf *ju.Job

+ 3 - 0
src/jy/extract/extraxtmethod.go

@@ -131,6 +131,9 @@ var entdfa_entity = regexp.MustCompile("^([\u4E00-\u9FA5]{4,25}(公司|集团|
 var entdfa_clean = regexp.MustCompile("([\\s \n]+)")
 var entdfa_filtration = regexp.MustCompile("(开标记录)")
 
+// isNeedValueReg matches a period string that contains a digit, one of the listed Chinese numerals or date units, or the word "合同".
+var isNeedValueReg = regexp.MustCompile(`([0-9俩两一二三四五六七八九年月日天周]|合同)`)
+
 // 清洗正文
 func CleanDetailText(detail string, summary string) string {
 	detail = regexp.MustCompile(`<!--[\w\W]*?-->`).ReplaceAllString(detail, "")

+ 39 - 3
src/jy/util/util.go

@@ -1,16 +1,18 @@
 package util
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/cron"
+	"github.com/nats-io/nats.go"
 	"go.mongodb.org/mongo-driver/bson/primitive"
+	. "gopkg.in/mgo.v2/bson"
 	. "jy/mongodbutil"
 	qu "qfw/util"
 	"regexp"
 	"strconv"
 	"strings"
-
-	. "gopkg.in/mgo.v2/bson"
+	"time"
 )
 
 // 敏感词
@@ -48,6 +50,17 @@ var Site_Mgo, Qyxy_Mgo *MongodbSim
 var IsUpdateRule bool
 var DefaultRegions, AdjustmentRegions = []string{}, []string{}
 
+// MsgInfo is the message exchanged between flow steps.
+type MsgInfo struct {
+	Id       string                 // unique message id
+	CurrSetp string                 // current step
+	NextSetp string                 // next step; added for special flows
+	IsEnd    int                    // finish after the current step; 1 = finish
+	Data     map[string]interface{} // payload
+	Err      string                 // error message; an error raises an alert and stops the flow
+	Stime    int64                  // SendRequest stores the send time here (Unix milliseconds)
+	Etime    int64                  // SendRequest stores the receive time here (Unix milliseconds)
+}
+
 func init() {
 	syncint = make(chan bool, 1)
 }
@@ -359,7 +372,7 @@ func RemoveDuplicates(input []string) []string {
 	return output
 }
 
-//RemoveDuplicatesAndKeepLonger 去除字符串数组中的重复数据,同时只保留更全的字符串
+// RemoveDuplicatesAndKeepLonger 去除字符串数组中的重复数据,同时只保留更全的字符串
 func RemoveDuplicatesAndKeepLonger(arr []string) []string {
 	result := make([]string, 0)
 	uniqueStrings := make(map[string]struct{})
@@ -388,3 +401,26 @@ func RemoveDuplicatesAndKeepLonger(arr []string) []string {
 
 	return result
 }
+
+// SendRequest publishes requestData as JSON on "<subject>.<step>" and waits up
+// to timeout for the reply, which is decoded from JSON into a new MsgInfo.
+//
+// requestData.CurrSetp is set to step before sending. On the returned message
+// Stime and Etime hold the local send and receive times in Unix milliseconds;
+// they are assigned after decoding so that same-named fields in the reply
+// cannot overwrite the measurement. Marshal, request and decode errors are
+// returned unchanged together with a nil message; a nil requestData is
+// reported as an error.
+func SendRequest(nc *nats.Conn, subject, step string, requestData *MsgInfo, timeout time.Duration) (*MsgInfo, error) {
+	if requestData == nil {
+		return nil, fmt.Errorf("send request %s.%s: nil request data", subject, step)
+	}
+	requestData.CurrSetp = step
+	stime := time.Now().UnixMilli()
+	// Send the request and wait for the response.
+	bs, err := json.Marshal(requestData)
+	if err != nil {
+		return nil, err
+	}
+	rep, err := nc.Request(subject+"."+step, bs, timeout)
+	if err != nil {
+		return nil, err
+	}
+	etime := time.Now().UnixMilli()
+	// Decode the response, then stamp the timing on top of it.
+	msgInfo := &MsgInfo{}
+	err = json.Unmarshal(rep.Data, msgInfo)
+	if err != nil {
+		return nil, err
+	}
+	msgInfo.Stime = stime
+	msgInfo.Etime = etime
+	return msgInfo, nil
+}

+ 42 - 4
src/main.go

@@ -2,6 +2,8 @@ package main
 
 import (
 	log "github.com/donnie4w/go-logger/logger"
+	"github.com/nats-io/nats.go"
+	"go.mongodb.org/mongo-driver/bson"
 	_ "jy/admin"
 	_ "jy/admin/audit"
 	_ "jy/admin/distribution"
@@ -10,9 +12,11 @@ import (
 	_ "jy/front"
 	. "jy/router"
 	u "jy/util"
+	"jynats/jnats"
 	"net/http"
 	_ "net/http/pprof"
 	qu "qfw/util"
+	"sync"
 )
 
 func init() {
@@ -26,6 +30,14 @@ func init() {
 	//初始化util
 	u.UtilInit()
 }
+
+// mainT is the flow (streaming) entry point: it starts RunFlowSystem in the
+// background and then blocks forever.
+func mainT() {
+	go RunFlowSystem()
+	select {}
+}
+
 func main() {
 	extract.ExtractUdpUpdateMachine() //节点上传~构建
 	extract.ExtractUdp()              //udp通知抽取
@@ -38,13 +50,39 @@ func main() {
 	<-lock
 }
 
+// RunFlowSystem connects to the address configured under "flowaddr",
+// initialises the flow extraction task and subscribes to
+// "dataprocess.extract".
+//
+// Each message is decoded from BSON into a MsgInfo. A decode failure is
+// answered at once with Err set. Otherwise a slot is taken from
+// ExtFlow.TaskInfo.ProcessPool and a goroutine runs
+// ExtractByExtFlow on the payload, stores the result under Data["ext"] and
+// responds with the BSON-encoded message.
+func RunFlowSystem() {
+	addr := qu.ObjToString(u.Config["flowaddr"])
+	jn := jnats.NewJnats(addr)
+	extract.InitExtractFlowTask()
+	wg_mgo := &sync.WaitGroup{}
+	// reply encodes info and answers msg; failures are logged instead of dropped.
+	reply := func(msg *nats.Msg, info *u.MsgInfo) {
+		bs, err := bson.Marshal(info)
+		if err != nil {
+			log.Debug("dataprocess.extract marshal reply err", err, info.Id)
+			// Answer with an error-only message so the requester is not left
+			// waiting for its timeout. This fallback holds only strings, so its
+			// marshal error is not expected and is deliberately ignored.
+			bs, _ = bson.Marshal(&u.MsgInfo{
+				Id:       info.Id,
+				CurrSetp: info.CurrSetp,
+				NextSetp: info.NextSetp,
+				Err:      err.Error(),
+			})
+		}
+		if err := msg.Respond(bs); err != nil {
+			log.Debug("dataprocess.extract respond err", err, info.Id)
+		}
+	}
+	jn.SubZip("dataprocess.extract", func(msg *nats.Msg) {
+		msgInfo := &u.MsgInfo{}
+		if err := bson.Unmarshal(msg.Data, msgInfo); err != nil {
+			msgInfo.Err = err.Error()
+			reply(msg, msgInfo)
+			return
+		}
+		if msgInfo.Data == nil {
+			// Writing "ext" into a nil map would panic inside the worker
+			// goroutine below and take the whole process down.
+			msgInfo.Data = map[string]interface{}{}
+		}
+		extract.ExtFlow.TaskInfo.ProcessPool <- true // blocks while the pool is full
+		wg_mgo.Add(1)
+		go func(msgInfo *u.MsgInfo, msg *nats.Msg) {
+			defer func() {
+				<-extract.ExtFlow.TaskInfo.ProcessPool
+				wg_mgo.Done()
+			}()
+			res := extract.ExtractByExtFlow(msgInfo.Data)
+			msgInfo.Data["ext"] = res
+			reply(msg, msgInfo)
+		}(msgInfo, msg)
+	})
+}
+
 // testMain is a scratch function for checking rules; it logs the one-byte slice con[1:2] of a fixed string.
 func testMain() {
+	// Sample request to the entity test service (the text parameter is sample body text):
+	//http://extcity.spdata.jianyu360.com/service/entity/test?text=我是正文开滦(集团)有限责任公司
 	con := `2134576`
 	text := con[1:2]
 	log.Debug(text)
 }
-
-func testPostDfa() {
-	//http://extcity.spdata.jianyu360.com/service/entity/test?text=我是正文开滦(集团)有限责任公司
-}

+ 1 - 1
src/mark

@@ -22,7 +22,7 @@
     "pricenumber":true,
     "udpitaskid": "6275d34223119b206c86182e",
     "udpport": "1177",
-    "nextNode": [  ],
+    "nextNode": [],
     "esconfig": {
         "available": true,
         "AccessID": "LTAI4G5x9aoZx8dDamQ7vfZi",

+ 3 - 1
udps/main.go

@@ -39,7 +39,9 @@ func main() {
 	//	id2 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(end))
 	log.Println(id1, id2, tmptime)
 	//}
-	if ip != "" && p > 0 && ((id1 != "" && id2 != "") || (q != "" || tmptime > 0)) {
+	//if ip != "" && p > 0 && ((id1 != "" && id2 != "") || (q != "" || tmptime > 0)) {
+
+	if ip != "" && p > 0 {
 		toadd := &net.UDPAddr{
 			IP:   net.ParseIP(ip),
 			Port: p,