Bladeren bron

抽取~备份~清洗~企业校验等

zhengkun 3 jaren geleden
bovenliggende
commit
1898108e9f

+ 8 - 2
src/config.json

@@ -3,7 +3,7 @@
     "mgodb": "127.0.0.1:27017",
     "dbsize": 3,
     "dbname": "extract_local",
-    "dbname_addrs": "extract_local",
+    "dbname_addrs": "mixdata",
     "dbname_addrs_c": "address_new_2020",
     "redis": "qyk_redis=192.168.3.207:6379",
     "elasticsearch": "http://127.0.0.1:9800",
@@ -34,7 +34,13 @@
     "pricenumber":true,
     "udptaskid": "60b493c2e138234cb4adb640",
     "nextNode": [],
-    "udpport": "1784",
+    "udpport": "6601",
+    "udpmachine": {
+        "addr" : "127.0.0.1",
+        "port" : 6601,
+        "stype" : "extract_1",
+        "isuse" : 1
+    },
     "esconfig": {
         "available": false,
         "AccessID": "",

+ 38 - 40
src/jy/extract/extract.go

@@ -1903,6 +1903,8 @@ type FieldValue struct {
 
 var clearWinnerReg = regexp.MustCompile("名称|施工|拟定供应商名称|:|:")
 var unPackageWinnerReg = regexp.MustCompile("(重新招标)")
+//包含字母的实体单位
+var letter_entity = regexp.MustCompile("^[\u4E00-\u9FA5]{1,10}[A-Za-z]{1,5}[\u4E00-\u9FA5]{1,10}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|市|局|站|城|处|行|部|队|联合[会|体])$")
 
 
 //特殊金额-处理判断-倍率关系
@@ -2065,8 +2067,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				}
 			}
 
-			for _, v := range val { //取第一个非负数,项目名称除外
-				//存0是否有效
+			for _, v := range val { //取第一个非负数,项目名称除外//存0是否有效
 				if (v.Field == "bidamount" || v.Field == "budget") && v.IsTrue && v.Score > -1 {
 					tmp[v.Field] = v.Value
 					fieldSource[v.Field] = map[string]interface{}{
@@ -2081,6 +2082,17 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 						"ext_type":v.Type,
 						"ext_from":v.ExtFrom,
 					}
+					//中标单位~含字母判断~对比企业库
+					if (v.Field =="winner"||v.Field=="buyer") && letter_entity.MatchString(qu.ObjToString(v.SourceValue)){
+						qyxy_data := make([]map[string]interface{}, 0)
+						ju.QyxySess.Find(map[string]interface{}{
+							"company_name": qu.ObjToString(v.SourceValue),
+						}).All(&qyxy_data)
+						if qyxy_data!=nil && len(qyxy_data)>0 {
+							tmp[v.Field] = v.SourceValue
+						}
+					}
+
 					break
 				}
 			}
@@ -2148,25 +2160,6 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 					}
 					tmp["bidamount"] = tmpBidamount
 				}
-
-				//if qu.Float64All(tmp["bidamount"]) > 0 && qu.Float64All(tmp["budget"]) > 0 && (qu.Float64All(tmp["bidamount"])/10 > qu.Float64All(tmp["budget"])) {
-				//	fieldSource["bidamount"] = map[string]interface{}{
-				//		"ext_type":"",
-				//		"ext_from":"package",
-				//	}
-				//	tmp["bidamount"] = tmpBidamount
-				//} else if qu.Float64All(tmp["bidamount"]) < tmpBidamount {
-				//	if tmpBidamount == qu.Float64All(tmp["bidamount"])*float64(10000) &&
-				//		tmpBidamount>=1000000000 && qu.Float64All(tmp["bidamount"])>0{
-				//		tmp["is_dif_ratioMoney"] = true
-				//	}else {
-				//		fieldSource["bidamount"] = map[string]interface{}{
-				//			"ext_type":"",
-				//			"ext_from":"package",
-				//		}
-				//		tmp["bidamount"] = tmpBidamount
-				//	}
-				//}
 			} else {
 				//包数等于1,tmp没有值取包里的值
 				if tmp["budget"] == nil || tmp["budget"] == 0 {
@@ -2234,17 +2227,6 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 						"ext_from":"package",
 					}
 				}
-
-				//savewinner = RemoveReplicaSliceString(savewinner)
-				//tmp["s_winner"] = strings.Join(savewinner, ",")
-				//if len(savewinner)==1 {
-				//	fieldSource["s_winner"] = fieldSource["winner"]
-				//}else if len(savewinner)>1{
-				//	fieldSource["s_winner"] = map[string]interface{}{
-				//		"ext_type":"",
-				//		"ext_from":"package",
-				//	}
-				//}
 			}
 		} else if tmp["winner"] != nil && tmp["winner"] != "" {
 			//没有分包取winner
@@ -2289,6 +2271,17 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 									"ext_type":v.Type,
 									"ext_from":"ff",
 								}
+								//中标单位~含字母判断~对比企业库
+								if (v.Field =="winner" || v.Field=="buyer") && letter_entity.MatchString(qu.ObjToString(v.SourceValue)){
+									qyxy_data := make([]map[string]interface{}, 0)
+									ju.QyxySess.Find(map[string]interface{}{
+										"company_name": qu.ObjToString(v.SourceValue),
+									}).All(&qyxy_data)
+									if qyxy_data!=nil && len(qyxy_data)>0 {
+										tmp[v.Field] = v.SourceValue
+									}
+								}
+
 								break
 							}
 						}
@@ -2304,8 +2297,8 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			}
 		}
 
-		//添加字段来源 ~~ 临时注释
-		//tmp["field_source"] = fieldSource
+		//添加字段来源
+		tmp["field_source"] = fieldSource
 		//是否为不规则表格字段
 		if j.IsUnRulesTab {
 			tmp["is_UnRules_Tab"]= j.IsUnRulesTab
@@ -2426,9 +2419,6 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		tmp = checkFields(tmp,*j.Data)
 
 
-
-
-
 		if tmp["projectname"] == nil || tmp["projectname"] == "" {
 			tmp["projectname"] = j.Title
 		}
@@ -2438,6 +2428,14 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				tmp["ffield"] = ffield
 			}
 		}
+
+		//只要项目名称
+		//p_name := qu.ObjToString(tmp["projectname"])
+		//tmp = map[string]interface{}{}
+		//if p_name!="" {
+		//	tmp["projectname"] = p_name
+		//}
+
 		if e.TaskInfo.TestColl == "" {
 			if len(tmp) > 0 { //保存抽取结果
 				delete(tmp, "_id")
@@ -2467,7 +2465,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				e.ResultArr = append(e.ResultArr, tmparr)
 				e.RWMutex.Unlock()
 			}
-		} else { //测试结果
+		} else { //测试结果~结果追踪
 			delete(tmp, "_id")
 			delete(tmp, "fieldall")
 			if len(j.BlockPackage) > 0 { //分包详情
@@ -2678,14 +2676,14 @@ func dealWithDiscountBid(tmp map[string]interface{}) float64 {
 	if biddiscount>0.0 {
 		if biddiscount > 1.0 && biddiscount<=10.0  {
 			num1:=decimal.NewFromFloat(10.0)
-			num2:=decimal.NewFromFloat(biddiscount_up)
+			num2:=decimal.NewFromFloat(biddiscount)
 			decimalValue := num2.Div(num1)
 			res,_ := decimalValue.Float64()
 			//log.Debug("标准-①折扣系数:",res)
 			return res
 		}else if biddiscount>10.0 {
 			num1:=decimal.NewFromFloat(100.0)
-			num2:=decimal.NewFromFloat(biddiscount_up)
+			num2:=decimal.NewFromFloat(biddiscount)
 			decimalValue := num2.Div(num1)
 			res,_ := decimalValue.Float64()
 			//log.Debug("标准-⑩折扣系数:",res)

+ 22 - 12
src/jy/extract/extractInit.go

@@ -1042,19 +1042,15 @@ func (e *ExtractTask) InitXjbtCityInfo() {
 	e.XjbtCityArr = arr
 }
 
-
-func (e *ExtractTask) InitCityInfo() {
+//站点加载...
+func (e *ExtractTask) InitUpdateSite() {
 	defer qu.Catch()
-	e.InitVar() //初始化变量
-	//新疆兵团数据
-	e.InitXjbtCityInfo()
-
-	//site站点信息
+	e.SiteCityMap = make(map[string]*SiteCity)
 	for _, v := range InitSite() {
-		site, _ := v["site"].(string)
-		area, _ := v["area"].(string)
-		city, _ := v["city"].(string)
-		district, _ := v["district"].(string)
+		site:= qu.ObjToString(v["site"])
+		area:= qu.ObjToString(v["area"])
+		city:= qu.ObjToString(v["city"])
+		district:= qu.ObjToString(v["district"])
 		if area != "" && area != "全国" && site != "" {
 			s := &SiteCity{
 				P: area,
@@ -1064,6 +1060,20 @@ func (e *ExtractTask) InitCityInfo() {
 			e.SiteCityMap[site] = s
 		}
 	}
+	log.Debug("有效站点数量:",len(e.SiteCityMap))
+
+
+}
+
+
+
+func (e *ExtractTask) InitCityInfo() {
+	defer qu.Catch()
+	e.InitVar() //初始化变量
+	//新疆兵团数据
+	e.InitXjbtCityInfo()
+	//site站点信息
+	e.InitUpdateSite()
 	//初始化省信息
 	fn1 := InitProvince(e.TaskInfo.Version)
 	for k, v := range fn1 {
@@ -1532,7 +1542,7 @@ func getFieldAllAndBlocks(a [][]map[string]interface{}) (arr [][]map[string]inte
 			delete(tmp, "fieldallf")
 
 			v[1] = tmp //全部更新
-			//v[1]["$set"] = tmp //指定更新
+			//v[1]["$set"] = tmp //指定更新~针对指定projectname
 		}
 		arr = append(arr, v)
 	}

+ 39 - 3
src/jy/extract/extractudp.go

@@ -19,11 +19,19 @@ import (
 var Udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 var IsExtStop bool
+
+//新增机器节点
+func ExtractUdpUpdateMachine() {
+	machine := *qu.ObjToMap(ju.Config["udpmachine"])
+	skey := fmt.Sprintf("%s:%d:%s",machine["addr"],qu.IntAll(machine["port"]),machine["stype"])
+	machine["skey"] = skey
+	db.Mgo.Update("extract_control_center", map[string]interface{}{"skey":skey},machine,true,false)
+}
+
 //udp通知抽取
 func ExtractUdp() {
 	nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
-	//Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
-	Udpclient = mu.UdpClient{Local: ":6601", BufSize: 1024}
+	Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
 	Udpclient.Listen(processUdpMsg)
 }
 
@@ -152,7 +160,35 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 		ext.IsRun = true
 		ext.InitFile()
 	} else {
-		ext.BidTotal = 0
+		ext.BidTotal = 0 //是否更新站点数据~~~
+		if ju.IsUpdateSite && ext.IsExtractCity {
+			ext.InitUpdateSite()
+			ju.IsUpdateSite = false
+		}
+		//更新规则~标签~~
+		if ju.IsUpdateRuleTag {
+			ju.IsUpdateRuleTag = false
+			ext.InitRulePres()
+			ext.InitRuleBacks(false)
+			ext.InitRuleBacks(true)
+			ext.InitRuleCore(false)
+			ext.InitRuleCore(true)
+			ext.InitBlockRule()
+			ext.InitPkgCore()
+			ext.InitTag(false)
+			ext.InitTag(true)
+			ext.InitClearFn(false)
+			ext.InitClearFn(true)
+			ext.Lock()
+			if ext.IsExtractCity { //版本上控制是否开始城市抽取
+				//初始化城市DFA信息
+				//ext.InitCityDFA()
+				ext.InitCityInfo()
+				ext.InitAreaCode()
+				ext.InitPostCode()
+			}
+			ext.Unlock()
+		}
 	}
 	index := 0
 	if len(instanceId) > 0 { //分布式抽取进度

+ 5 - 2
src/jy/extract/newextractcity.go

@@ -185,8 +185,6 @@ func (e *ExtractTask) NewExtractCity(j *ju.Job, resulttmp *map[string]interface{
 		}
 	}
 
-
-
 	//如果-仅有省份-敏感词-校验核对方法
 	if arearesult!="全国" && cityresult=="" {
 		sensitive_city := e.SensitiveCityData(qu.ObjToString((*j.Data)["detail"]),arearesult)
@@ -195,6 +193,11 @@ func (e *ExtractTask) NewExtractCity(j *ju.Job, resulttmp *map[string]interface{
 			(*resulttmp)["is_sensitive"] = 1
 		}
 	}
+
+
+
+
+
 }
 
 //jsondata中抽取城市

+ 0 - 4
src/jy/extract/score.go

@@ -115,10 +115,6 @@ func ScoreFields(j *ju.Job, ftag map[string][]*Tag) map[string][]*ju.ExtField {
 		if field == "projectcode" {
 			tmps = projectWeightClear(tmps)
 		}
-		if field=="docendtime" {
-			//log.Println("调试调试")
-		}
-
 		if field == "budget" || field == "bidamount" {
 			for tmpsindex, tmpsvalue := range tmps {
 				if ((strings.Contains(tmpsvalue.RuleText, "总") && !strings.Contains(tmpsvalue.RuleText, "项目总投资"))||strings.Contains(tmpsvalue.Code, "总价")) && tmpsvalue.RuleText!="总价(元)" &&(tmpsvalue.Type == "colon"||tmpsvalue.Type == "table" ) {

+ 2 - 0
src/jy/pretreated/analystep.go

@@ -42,6 +42,7 @@ var formattext10  = regexp.MustCompile(".*包号\n.*\n.*供应商名称\n.*\n.*(
 	"<td.*>(.*)\n(<td>\n)?.*\n<td.*>[\n]?(.*公司)\n.*\n<td.*>([0-9.,,万元]+)\n")
 var formattext11  = regexp.MustCompile("(项目预算)\n(第[一1](包|标段)[::])([0-9.万元人民币]+)\n" +
 	"(第[二2](包|标段)[::])([0-9.万元人民币]+)\n")
+var formattext12  = regexp.MustCompile("((成交|中标)价格|本期预算)\n[((]万元[))]\n([0-9.万元人民币]+)\n")
 
 //特殊文本提取-计算
 var formattext50  = regexp.MustCompile("主要标的数量[::]([0-9.]+)\n主要标的单价[::]([0-9.]+)\n合同金额[::].*\n履约期限")
@@ -134,6 +135,7 @@ func AnalyStart(job *util.Job, isSite bool, codeSite string) {
 	//改变特殊结构
 	con = formattext10.ReplaceAllString(con,"\n分包$3\n中标单位:$5 中标金额:$6\n")
 	con = formattext11.ReplaceAllString(con,"${1}\n${2}\n预算金额:${4}\n${5}\n预算金额:${7}\n${8}\n")
+	con = formattext12.ReplaceAllString(con,"\n${1}:${3}万元\n")
 
 
 	//指定爬虫-特殊结构-计算抽取

+ 61 - 52
src/jy/pretreated/analytable.go

@@ -245,7 +245,6 @@ func CommonDataAnaly(k, tabletag, tabledesc string, v interface{}, isSite bool,
 var glRex *regexp.Regexp = regexp.MustCompile("(成交|中标|候选|排名|名次|供应商排序|中标候选人|名单及其排序|排序)")
 var djReg *regexp.Regexp = regexp.MustCompile("^单价")
 var hxrRex *regexp.Regexp = regexp.MustCompile("((成交|中标|中选)?候选人[弟|第][1-5一二三四五]名|[弟|第][1-5一二三四五][名]?(成交|中标|中选)?候选人)")
-var winnerRex *regexp.Regexp = regexp.MustCompile("(公司)$")
 
 //判断数组string 是否重复
 func isRepeatArrString(arr1,arr2 []string)bool{
@@ -341,7 +340,7 @@ func (table *Table) KVFilter(isSite bool, codeSite string) {
 		for _, k := range table.SortKV.Keys {
 			if hxrRex.MatchString(k) {
 				v := table.SortKV.Map[k]
-				if new_v, ok := v.(string); ok && winnerRex.MatchString(new_v) {
+				if new_v, ok := v.(string); ok && findCandidate2.MatchString(new_v) {
 					winsArr = append(winsArr,new_v)
 					sortsArr = append(sortsArr,k)
 				}
@@ -944,7 +943,7 @@ func (table *Table) Analy(contactFormat *u.ContactFormat, isSite bool, codeSite
 	//if ztb >= 9 {
 	//	return []*Table{}
 	//}
-	if ztb > 10 {
+	if ztb > 20 {
 		return []*Table{}
 	}
 	//遍历节点,初始化table 结构  TRs Sorts
@@ -1882,9 +1881,15 @@ func GetBidSort(str string, n int) int {
 }
 
 var cleardwReg *regexp.Regexp = regexp.MustCompile("[((]{1}\\d*[人元件个公斤户]/[人元件个公斤户][))]")
-var zbhxrReg *regexp.Regexp = regexp.MustCompile("(中标候选人|投标单位名称)")
-var zbhxrSortReg *regexp.Regexp = regexp.MustCompile("([第|弟][1-3一二三]名)")
-var zbhxrSortNameReg *regexp.Regexp = regexp.MustCompile("(中标候选人[第|弟][1-3一二三]名)|[第|弟][1-3一二三]中标候选人")
+var zbhxrReg *regexp.Regexp = regexp.MustCompile("(中标候选人|投标单位名称|候选人姓名)")
+var zbhxrSortReg_1 *regexp.Regexp = regexp.MustCompile("([第|弟][123一二三]名)")
+var zbhxrSortReg_2 *regexp.Regexp = regexp.MustCompile("^([123一二三])$")
+
+var zbhxrSortReg_3 *regexp.Regexp = regexp.MustCompile("^([123一二三])")
+
+
+
+var zbhxrSortNameReg *regexp.Regexp = regexp.MustCompile("(中标候选人[第|弟][123一二三]名)|[第|弟][123一二三]中标候选人")
 var zbhxrSecondReg *regexp.Regexp = regexp.MustCompile("(中标候选人[第|弟][2二]名)|[第|弟][2二]中标候选人")
 
 
@@ -1899,6 +1904,10 @@ func (table *Table) FindTdVal(td *TD, direct, vdirect int) (b bool) {
 		near.KeyDirect = vdirect
 		td.KVDirect = direct
 		key := repSpace.ReplaceAllString(near.Val, "")
+
+		//临时去掉换行-进行判断
+		tmp_tdVal := strings.ReplaceAll(td.Val,"\n","")
+
 		if key == "名称" && near.StartCol == 0 && near.Rowspan > 0 {
 			new_key := ""
 			tr := table.TRs[:td.TR.RowPos]
@@ -1927,50 +1936,52 @@ func (table *Table) FindTdVal(td *TD, direct, vdirect int) (b bool) {
 			}else {
 				key = new_key
 			}
-		} else if zbhxrReg.MatchString(key) && findCandidate2.MatchString(td.Val) {
+		} else if zbhxrReg.MatchString(key) && findCandidate2.MatchString(tmp_tdVal){
 			new_key := "中标单位"
 			tr_top := table.TRs[:td.TR.RowPos]
 			if len(tr_top)>len(tr_top)-1 && len(tr_top)>0{ //上临查询
 				tds := tr_top[len(tr_top)-1].TDs
-				if len(tds)>td.EndCol && tds !=nil{
-					td1 := tds[td.EndCol]
+				if len(tds)>td.ColPos && tds !=nil && len(tds)== len(td.TR.TDs){
+					td1 := tds[td.ColPos]
 					if zbhxrSortNameReg.MatchString(td1.Val) {
 						new_key = td1.Val
 					}else {
-						if zbhxrSortReg.MatchString(td1.Val) {
-							new_key = "中候选人"+td1.Val
+						if zbhxrSortReg_1.MatchString(td1.Val) {
+							new_key = "中候选人"+td1.Val
 						}
 					}
 				}
 			}
-			//if new_key=="中标单位" {
-			//	tr_left := table.TRs[:td.TR.RowPos+1] //左临查询
-			//	tds := tr_left[len(tr_left)-1].TDs
-			//	if td.EndCol-1 >=0 {
-			//		td1 := tds[td.EndCol-1]
-			//		if zbhxrSortNameReg.MatchString(td1.Val) {
-			//			new_key = td1.Val
-			//		}else {
-			//			if zbhxrSortReg.MatchString(td1.Val) {
-			//				new_key = "中选候选人"+td1.Val
-			//			}
-			//		}
-			//	}
-			//}
-
+			if new_key=="中标单位" { //最左临查询
+				tr_left := table.TRs[:td.TR.RowPos+1]
+				tds_left := tr_left[len(tr_left)-1].TDs
+				if td.ColPos>0 {
+					td1 := tds_left[0]
+					if zbhxrSortReg_2.MatchString(td1.Val) { //针对排名情况
+						new_key = "中标候选人第"+td1.Val+"名"
+					}
+				}
+			}
+			if new_key!="中标单位" {
+				td.Val = tmp_tdVal
+			}
 			key = new_key
-		} else if key == "单位名称" {//左临上临-拼接
+		} else if key == "投标人名称" ||  key == "单位名称" {//左临上临-拼接
 			tmpnewnear := table.FindNear(near, 1)
 			if tmpnewnear == nil {
 				tmpnewnear = table.FindNear(near, 2)
 			}
 			if tmpnewnear != nil {
 				if (table.Tag=="成交候选人" || table.Tag=="中标候选人") &&
-					zbhxrSortReg.MatchString(tmpnewnear.Val) {
+					zbhxrSortReg_1.MatchString(tmpnewnear.Val) {
 					key = "中选候选人"+tmpnewnear.Val
 				}else {
 					if tmpnewnear.MustBH || tmpnewnear.BH {
-						key = tmpnewnear.Val + near.Val
+						if tmpnewnear.Val == "中标候选人情况" && zbhxrSortReg_3.MatchString(td.Val){
+							key = "中标候选人第"+zbhxrSortReg_3.FindString(td.Val)+"名"
+						}else {
+							key = tmpnewnear.Val + near.Val
+						}
 					}
 				}
 			}
@@ -2077,13 +2088,8 @@ func (table *Table) FindTdVal(td *TD, direct, vdirect int) (b bool) {
 				tmapval := strings.TrimSpace(cleardwReg.ReplaceAllString(vals, ""))//已存在的kv
 				tmapvaltd := strings.TrimSpace(cleardwReg.ReplaceAllString(td.Val, ""))
 				if bvalfind {
-					//if tmapvaltd == "" {
-					//	val = td.Val //vals + "__" + td.Val
-					//} else {
-					//	val = tmapvaltd
-					//}
-					if key=="中标单位" {
-						//不能覆盖---
+					if key=="中标单位" {//不能覆盖---
+
 					}else {
 						if tmapvaltd == "" {
 							val = td.Val //vals + "__" + td.Val
@@ -2092,24 +2098,27 @@ func (table *Table) FindTdVal(td *TD, direct, vdirect int) (b bool) {
 						}
 					}
 				} else{
-					if key=="中标单位" {
-						//新增不能数组
+					if key=="中标单位" { //新增不能数组
+
 					}else {
-						tval := []string{}
-						if tmapval == "" {
-							tval = append(tval, vals)
-						} else {
-							tval = append(tval, tmapval)
-						}
-						if tmapvaltd == "" {
-							tval = append(tval, td.Val)
-						} else {
-							tval = append(tval, tmapvaltd)
+						if zbhxrSortNameReg.MatchString(key){
+							//特殊~不构建数组
+						}else {
+							tval := []string{}
+							if tmapval == "" {
+								tval = append(tval, vals)
+							} else {
+								tval = append(tval, tmapval)
+							}
+							if tmapvaltd == "" {
+								tval = append(tval, td.Val)
+							} else {
+								tval = append(tval, tmapvaltd)
+							}
+							val = tval
+							varrpos = 1
 						}
-						val = tval
-						varrpos = 1
 					}
-
 				}
 			}
 			barr = true
@@ -2172,7 +2181,7 @@ func (table *Table) FindNear(td *TD, direct int) *TD {
 			for _, td1 := range tds {
 				if td1.StartRow <= td.StartRow && td1.EndRow >= td.EndRow && td1.EndCol+1 == td.StartCol {
 					//找到左临节点
-					if td1.BH || (zbhxrSortReg.MatchString(td1.Val)&&(table.Tag=="成交候选人"||table.Tag=="中标候选人")) {
+					if td1.BH || (zbhxrSortReg_1.MatchString(td1.Val)&&(table.Tag=="成交候选人"||table.Tag=="中标候选人")) {
 						return td1
 					} else {
 						if td1.HeadTd != nil && td1.HeadTd.KVDirect == direct {

+ 0 - 1
src/jy/pretreated/tablev2.go

@@ -764,7 +764,6 @@ func CheckCommon(txt string, matchStr ...string) (res, must bool, stype, reg, re
 		//先正则、后子串查找
 	L1:
 		for _, v := range matchStr {
-			//u.Debug(v)
 			for n, vreg := range TKMaps[v].TReg {
 				if vreg.MatchString(txt) {
 					//u.Debug(txt, v, vreg.String())

+ 3 - 1
src/jy/pretreated/winnerorder.go

@@ -58,6 +58,7 @@ var (
     winnerReg_1 = regexp.MustCompile("(第[一二三1-3]候选人)[::]([\u4E00-\u9FA5()()]{4,25}公司)[((]([0-9.,,万元]+)[))]")
     winnerReg_2 = regexp.MustCompile("(中标候选人第[一二三1-3][\\s]?名)[::]([\u4E00-\u9FA5()()]{4,25}公司)[,,]其他类型投标报价[::][不]?含税报价[\n]?[((]元[))][::][\n]?([0-9.,,万元]+)[,,]质量")
     winnerReg_3 = regexp.MustCompile("([弟|第][1-9一二三四五]名(中标候选人)?|[弟|第][1-9一二三四五](中标|成交)候选人)[::\\s]+([\u4E00-\u9FA5]{4,20}公司)[,,;]?(报价|投标报价|投标含税总价|投标报含税总价)[为]?[::]?([0-9\\.\\s万元]+)")
+	winnerReg_4 = regexp.MustCompile("(第[一二三]中标候选人)(.{4,15}(公司|院)).*\n投标报价[((]元[))]([\\s]+)?([0-9.,,万元]+)")
 
 
 
@@ -113,7 +114,7 @@ var (
 	findCompanyReg = regexp.MustCompile("[^::]+公司")
 	colonSpaceReg  = regexp.MustCompile("[::]\\s+")
 	findCandidate  = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|市|局|站|城|处|行|部|队|联合[会|体])|工作室)")
-	findCandidate2 = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|局|站|城|处|行|部|队|联合[会|体]|工作室|有限司)$)")
+	findCandidate2 = regexp.MustCompile("(^.{5,}(公司|集团|单位|机构|企业|厂|场|院|所|店|中心|局|站|城|处|行|部|队|联合[会体]((成员|牵头人)[))]?)?|工作室|有限司)$)")
 	clearSpace1    = regexp.MustCompile("([((][\\d一二三四五六七八九十][))][\\s\u3000\u2003\u00a0\\t]*|<[^>].+?>)")
 	clearSpace2    = regexp.MustCompile("</?[^>]+>")
 	offerReg       = regexp.MustCompile("(中标|磋商|投标|报|单|成交)总?(价|金额)")
@@ -179,6 +180,7 @@ func (wo *WinnerOrderEntity) Find(text string, flag bool, from int, isSite bool,
 	text = winnerReg_1.ReplaceAllString(text,"\n${1}:${2} 中标金额:${3}\n")
 	text = winnerReg_2.ReplaceAllString(text,"\n${1}:${2} 中标金额:${3}\n")
 	text = winnerReg_3.ReplaceAllString(text,"\n${1}:${4} 中标金额:${6}\n")
+	text = winnerReg_4.ReplaceAllString(text,"\n${1}:${2} 中标金额:${5}\n")
 
 
 

+ 16 - 0
src/jy/util/util.go

@@ -2,6 +2,7 @@ package util
 
 import (
 	"fmt"
+	"github.com/cron"
 	"gopkg.in/mgo.v2"
 	. "jy/mongodbutil"
 	qu "qfw/util"
@@ -40,6 +41,10 @@ var IsBrandGoods bool //是否开启品牌抽取
 
 var SaveResult, FieldsFind, IsSaveTag, SaveBlock, QualityAudit, Ffield bool
 var AddrsSess *mgo.Collection
+var QyxySess  *mgo.Collection
+
+var IsUpdateSite	bool
+var IsUpdateRuleTag	bool
 
 func init() {
 	syncint = make(chan bool, 1)
@@ -51,6 +56,8 @@ func UtilInit() {
 	dbname := qu.ObjToString(Config["dbname"])
 	Mgo = MgoFactory(initCap, initCap*3, 120, addr, dbname)
 	AddrsSess = Mgo.Get().DB(qu.ObjToString(Config["dbname_addrs"])).C(qu.ObjToString(Config["dbname_addrs_c"]))
+	QyxySess  = Mgo.Get().DB(qu.ObjToString(Config["dbname_addrs"])).C("qyxy_std")
+
 	SaveResult, _ = Config["saveresult"].(bool)
 	FieldsFind, _ = Config["fieldsfind"].(bool)
 	IsSaveTag, _ = Config["iscltlog"].(bool)
@@ -61,6 +68,15 @@ func UtilInit() {
 	for k, v := range PriceNumberConfig {
 		PriceNumberReg[k] = regexp.MustCompile(v)
 	}
+
+	//定时更新站点信息
+	IsUpdateSite = false
+	c := cron.New()
+	c.AddFunc("0 0 1 ? * WED", func() {
+		IsUpdateSite = true
+	})
+	c.Start()
+
 }
 
 func GetSyncIndex(code string) string {

+ 13 - 13
src/main.go

@@ -19,9 +19,9 @@ import (
 )
 
 func init() {
-	//log.SetConsole(false)
-	//log.SetLevel(log.DEBUG)
-	//log.SetRollingDaily("./", "out.log")
+	log.SetConsole(false)
+	log.SetLevel(log.DEBUG)
+	log.SetRollingDaily("./", "out.log")
 
 	qu.ReadConfig(&u.Config)
 	//抽取price和number相关
@@ -57,13 +57,15 @@ func init() {
 }
 
 func main() {
-	extract.ExtractUdp() 		//udp通知抽取
-	//go extract.Export()			//导出任务
-	//extract.ClearUdp()   			//udp通知清理
-	//go heart.HeartMonitor()		//心跳监测
+
+	extract.ExtractUdpUpdateMachine()	//节点上传~构建
+	extract.ExtractUdp() 				//udp通知抽取
 
 	go Router.Run(":" + qu.ObjToString(u.Config["port"]))
 	go log.Debug("启动..", qu.ObjToString(u.Config["port"]))
+
+	//testMain()
+
 	go func() {
 		http.ListenAndServe("localhost:10000", nil)
 	}()
@@ -77,13 +79,10 @@ func main() {
 
 //验证规则
 func testMain()  {
-
 	text :=``
-	var formattext11  = regexp.MustCompile("(项目预算)\n(第[一1](包|标段)[::])([0-9.万元人民币]+)\n" +
-		"(第[二2](包|标段)[::])([0-9.万元人民币]+)\n")
-	if 	formattext11.MatchString(text) {
-		text = formattext11.ReplaceAllString(text,"${1}\n${2}\n预算金额:${4}\n${5}\n预算金额:${7}\n${8}\n")
-		log.Debug(text)
+	var letter_entity = regexp.MustCompile("")
+	if 	letter_entity.MatchString(text) {
+		log.Debug("匹配")
 	}else {
 		log.Debug("不匹配")
 	}
@@ -92,3 +91,4 @@ func testMain()  {
 
 
 
+

+ 8 - 1
src/mark

@@ -475,4 +475,11 @@ func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{}
     "api": "http://172.17.145.179:19281/_send/_mail",
     "deleteInstanceTimeHour": 1,
     "jsondata_extweight": 1
-}
+}
+
+
+
+
+	//go extract.Export()			//导出任务
+	//extract.ClearUdp()   			//udp通知清理
+	//go heart.HeartMonitor()		//心跳监测

+ 53 - 8
src/res/fieldscore.json

@@ -189,12 +189,12 @@
         "negativewords": [
             {
                 "describe": "包含负分",
-                "regstr": "(标人|附件|委托|认证|代理|咨询|管理顾问|招标失败|不足|公告|变更|废标|废止|流标|中标|评标|开标|供应商|金额|万元|元整|预算|报价|单价|第(\\d|一|二|三|四|五)(名|包)|排名|候选|确定|标段|(标|一|二|三|四|五)包|中选|成交|包号|(A|B|C|D|E|F|G)包|地址|详情|要求|推荐|名称|评审|得分|合同|平方米|公示期|结果|备注|说明|理由|评委|委托|工作日|营业(执|期)|通过|代码|电话|联系|条件|合理|费率|以上|以下|拟定|注:|\\d[\\s]{0,10}(\\.|元|包|米|平米|平方米|吨|辆|千克|克|毫克|毫升|公升|套|件|瓶|箱|只|台|年|月|日|天|号)|(:|:|;|;|?|¥|\\*|%)|^[a-zA-Z0-9-]{5,100}|^[a-zA-Z0-9-]{1,100}$|[a-zA-Z0-9-]{10,100})",
+                "regstr": "(标人|附件|乙方|委托|(答|质|怀)疑|(保|质疑|查询)函|认证|代理|咨询|管理顾问|招标失败|不足|公告|变更|废标|废止|流标|中标|评标|开标|供应商|金额|万元|元整|预算|报价|单价|第(\\d|一|二|三|四|五)(名|包)|排名|候选|确定|标段|(标|一|二|三|四|五)包|中选|成交|包号|(A|B|C|D|E|F|G)包|地址|详情|要求|推荐|名称|评审|得分|合同|平方米|公示期|结果|备注|说明|理由|评委|委托|工作日|营业(执|期)|通过|代码|电话|联系|条件|合理|费率|以上|以下|拟定|注:|\\d[\\s]{0,10}(\\.|元|包|米|平米|平方米|吨|辆|千克|克|毫克|毫升|公升|套|件|瓶|箱|只|台|年|月|日|天|号)|(:|:|;|;|?|¥|\\*|%)|^[a-zA-Z0-9-]{5,100}|^[a-zA-Z0-9-]{1,100}$|[a-zA-Z0-9-]{10,100})",
                 "score": -15
             },
             {
                 "describe": "包含负分",
-                "regstr": "(代表|招标|交易中心|顾问|单位|测试|采购)",
+                "regstr": "(代表|招标|同时|管理费|安全员|[天月]内|方法|但不限|索赔|年龄|进度|须在|交易中心|甲方|清单|顾问|单位|测试|采购|说明|姓名|资格预审|不接受|用户名)",
                 "score": -10
             },
             {
@@ -204,17 +204,17 @@
             },
             {
                 "describe": "包含特殊符号",
-                "regstr": "(-|—|,|!|,|!)",
+                "regstr": "(-|—|,|!|#、|,|!)",
                 "score": -50
             },
             {
                 "describe": "包含负分不再展示",
-                "regstr": "([^实]施工[^程]|项目$|详细请?见?正文)",
+                "regstr": "([^实]施工[^程]|项目$|详细请?见?正文|安装调试|验收(.{0,2})?合格|终止合同|成交(供应|候选)|随机抽取|指定[的]?位置|招标方式|不(提供|含|接受|合格|得|退还)|代表参与定标|成交供应商|项目编号|询价采购|法人资格|交纳人民币|此项目无效|竞争性谈判|时间范围|招标时间|生产厂家|经评审委员会评审|在此|入围公司|年[0-9])",
                 "score": -50
             },
             {
                 "describe": "黑名单",
-                "regstr": "(^.{0,4}$|T及分公司|大厦[0-9]+室|东侧路面拓宽|招标投标交易平台|截止之前|、技术研发中心|钢芯铝绞线)",
+                "regstr": "(^.{0,4}$|T及分公司|大厦[0-9]+室|东侧路面拓宽|项目基本情况|项目背景概况|的情况|招标投标交易平台|截止之前|、技术研发中心|钢芯铝绞线|供应商|^(项目概况|谈判人|申请采用|合同|其他|委托|认可的|研究决定|推荐|最终确定|确认|中标|并要求|本项目)|(服务费|有资质公司|项目类别|时间|年月日|管理费用|询价)$)",
                 "score": -50
             }
         ],
@@ -273,9 +273,34 @@
                 "regstr": ".*[^集团|公司|学校|中心|家具城|门诊|\\[大中小\\]学|部|院|局|厂|店|所|队|社|室|厅|段|会|场|行|处]$",
                 "score": -5
             },
+            {
+                "describe": "包含数字金额类型",
+                "regstr": "\\d{2,}.\\d{1,}[万元]+",
+                "score": -30
+            },
+            {
+                "describe": "包含特殊严重负分",
+                "regstr": "(详见.{0,4}(文件|附件|表|正文|名单|方式)|(授权|委托|招标人|法人|业主)代表|不(提供|含|接受|可以|合格|得|退还)|其他(类型|内容)|法定(授权|代表)人|追究.*法律责任|不可抗力.*不能履行)",
+                "score": -50
+            },
+            {
+                "describe": "包含特殊短词严重负分",
+                "regstr": "(谈判|放弃中标|严格按照|性别年龄|随机抽取|务费|填写|质疑|但该公司|但是|以及|磋商|年月日|学历|本项目|请输入|后填入|询价小组|单一来源|清单|名单|编号|纳税人识别号|点击去原网报名|结构序号|序号标项内容|型号规格标准)",
+                "score": -50
+            },
+            {
+                "describe": "前缀严重负分",
+                "regstr": "^((根据)?甲方|姓名|人民币|序号|的方式|本项目|(依|(将承担)?由|将|因|单击|以)此)",
+                "score": -50
+            },
+            {
+                "describe": "后缀严重负分",
+                "regstr": "(款[等]?|费[等用]?|甲级|大写|执行|程序|折扣率|序号)$",
+                "score": -50
+            },
             {
                 "describe": "黑名单",
-                "regstr": "(^.{0,4}$|[((]或印鉴[))]|(多家|没有)中标商|违法违规行|(中标人|供应商)名称|合同包合计|楼青年公寓装修工程|评标委员会|(投标人|供应商)被人民法院|符合初步评审标准|评标结果公示|供应商评审申报|与营业执照|保管员签字|评审小组名单|单位负责人|^[xX]+$|候选人数量|微型企业且所)",
+                "regstr": "(^.{0,4}$|[((]或印鉴[))]|(多家|没有)中标商|请在此处填写|资格性审查|序号标项内容|名单的顺序进行|竞争性谈判|违法违规行|(中标人|供应商)名称|合同包合计|响应情况公司|楼青年公寓装修工程|评标委员会|(投标人|供应商)被人民法院|符合初步评审标准|评标结果公示|供应商评审申报|与营业执照|保管员签字|评审小组名单|单位负责人|^[xX]+$|候选人数量|微型企业且所)",
                 "score": -50
             }
         ],
@@ -323,7 +348,7 @@
         "negativewords": [
             {
                 "describe": "包含负分",
-                "regstr": "(原因|未知|收费|标注|负责人|test|联系(人|电话)|邀请书|公开|本次|指定|定点|签订|文件|评标|诉讼|投诉|号|吨|成交|结果|浏览器|下载|进行|加强|详(见|情)|现将|签字|身份证|我中心|测试|终止|名称|证书|单元)",
+                "regstr": "(原因|未知|收费|标注|包件|招标人|负责人|test|联系(人|电话)|邀请书|公开|本次|指定|定点|签订|文件|评标|诉讼|投诉|号|吨|成交|结果|浏览器|下载|进行|加强|详(见|情)|现将|签字|身份证|我中心|测试|终止|名称|证书|单元)",
                 "score": -10
             },
             {
@@ -341,9 +366,29 @@
                 "regstr": "[±??¨êí¤ì×üàóμˉ÷°úéè《》-]",
                 "score": -20
             },
+            {
+                "describe": "包含特殊严重负分",
+                "regstr": "(合同[^诚]|(授权|委托|招标人|法人|业主)代表|不(提供|含|接受|合格|得|退还)|完整|业主|提供|理由|须在|邀请|宣传费|理费|现场|类型|供应商|(招标|采购)方式|中标(单位|人|候选人|结果)|利益|大写|资格|接收|取消|日期|什么|服务费|第一名|建筑面积|项目概况|[一二三四五六七八九]条)",
+                "score": -50
+            },
+            {
+                "describe": "包含特殊短词严重负分",
+                "regstr": "(时间|可能|学历|接受|是否|所在地|关键|用途|平方米|清单[^宝]|从业|人员|随机|高风险|执照|甲方|内容|说明|澄清|姓名|税率|万元|祝贺|感谢|期限|进度|答复|保养|乙方|性质|成交|性别|定标|以及|应及|方式|资格|资质|预审|标段|通过|入围|同时|金额|单价|备往|无效)",
+                "score": -50
+            },
+            {
+                "describe": "前缀严重负分",
+                "regstr": "^(采购项目[::]|币种|货币|部分|本项目)",
+                "score": -50
+            },
+            {
+                "describe": "后缀严重负分",
+                "regstr": "(项目|工程|标段)$",
+                "score": -50
+            },
             {
                 "describe": "黑名单",
-                "regstr": "(集中代理采购|招标投标交易平台|竞争性谈判|“组组通”工程|提交一份公司|公司组织的|[((]章[))]$|^.{0,4}$)",
+                "regstr": "(集中代理采购|详见.*文件|招标投标交易平台|年____月____日|受权签字人|竞争性(谈判|磋商)|“组组通”工程|提交一份公司|公司组织的|[((]章[))]$|^.{0,4}$)",
                 "score": -50
             }
         ],

+ 3 - 3
src/res/tablev1.json

@@ -1,6 +1,6 @@
 {
 	"normalhead":[
-		"^((.{2,6}(描述|名称|编号|代码|时间|类型|性质|行政区域|原因|意见|须知|程度))|标段(编号)?|招标金额|规模|统一社会信用代码|拟?中标供应商|质量|(质量)?承诺|地址|招标代理|序号|材料|结构|结构层数|评委|单位|数量|排名|标的|标项|开户银行|邮编|账号|电话|传真|网址|得分|名次|包件?号|职务|(建设|招标|采购|中标|成交|甲|乙)(单位|人|供应商|方|规模).{0,2}|.{0,5}(价格?|额|资金|[预概]算|投资|费用|报价|投标价)[(]?(万?元?([大小]写)?)[)]?)$__M",
+		"^((.{2,6}(描述|名称|编号|代码|时间|类型|性质|行政区域|原因|意见|须知|程度))|标段(编号)?|招标金额|规模|统一社会信用代码|拟?中标供应商|质量|(质量)?承诺|地址|招标代理|代理机构|序号|材料|结构|结构层数|评委|单位|数量|排名|标的|标项|开户银行|邮编|账号|电话|传真|网址|得分|名次|包件?号|职务|(建设|招标|采购|中标|成交|甲|乙)(单位|人|供应商|方|规模).{0,2}|.{0,5}(价格?|额|资金|[预概]算|投资|费用|报价|投标价)[(]?(万?元?([大小]写)?)[)]?)$__M",
 		"^.{0,7}(((单位)?名称|总监|经理|负责人|信息|率|费|期|人|号|码|(价格?|额|资金)(万?元?([大小]写)?)|员|品目|标包|代表|区域|方式|因素|合价|合计|小计|地点|条件|(资质|类别和)等级|类别|状态)|得分|注册专业|方法|家数|全称|简称|邮件|执业或职业资格|证书|部门|事项|来源|划分|长度|规模|保证金|目标)$__",
 		"(名单|证号|名称|要求|时间|日期|地点|单位|条款|机构|范围|情况|概况|品名|规格|参数|标准|指标|型号|限价|数量|方式|等级|依据|明细|概况|内容|次数|产品|性质|地区|地址|币种|主题|详情|说明|代理(公司|机构)|节支率|名单|结果|结果公示)$|^(职称|姓名|级别|职称专业)$__",
 		"^(包号|联系|评标|单位|公告|采购|商品|附件|质保|用途|公示|机构|评审|品名|规格|参数|指标|型号|数量|证书).{0,10}$__",
@@ -13,7 +13,7 @@
 	"jghead":[
 		"(报价|(元(/人|/间/天))|中转|航延|代理机构名称|地址和联系方式|联系电话|项目负责人及注册号|不含税预算金额\n(万元))__M",
 		"^.{0,2}[预拟]?(成交|中标|候选)(供应商|单位|企业|人|机构|价|金额).{0,2}$__M",
-		"^.{0,6}[打得评总](分)$__",
+		"^.{0,6}[打得评总](分)$__M",
 		"(中标|磋商|投标|报|成交)总?(价|金额)__",
 		"(投标|中标)(人|方|单位|供应商)(名称)?__",
 		"(成交|名次|候选|业绩|荣誉|排名|中标|供应商|详见附件及谈判、报价文件|折扣系数|合同期限|委托方|项目所在地|投标文件递交开始间)__"
@@ -28,7 +28,7 @@
 		".{2,20}元整|[\\d]+万?元__",
 		".{4,}采购(项目)?__",
 		"(首选|第[一二三四五1-5])(顺序|推荐)?(承包|中标|候选|成交)?(候选)?(人|单位|供应商)__M",
-		"(招单价|无供应商报价|全部内容|计量单位|符合国家及行业标准的合格工程|最终报价[0-9,.,。万元()]*|二级建造师|公示信息|[甲乙丙]级)__",
+		"(招单价|无供应商报价|全部内容|计量单位|符合国家及行业标准的合格工程|二级建造师|公示信息|[甲乙丙]级)__",
 		"^采购包[0-9]+$__"
 	],
 	"abandontable":[

+ 22 - 6
udpcontrol/src/config.json

@@ -1,16 +1,32 @@
 {
     "udpport": ":1784",
     "jkmail": {
-        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn",
         "api": "http://172.17.145.179:19281/_send/_mail"
     },
     "data_mgodb": {
-        "addr": "127.0.0.1:27017",
-        "db": "zhengkun",
-        "coll": "extract_control_center",
-        "s_coll": "zktest_city_field_test"
+        "addr": "172.17.145.163:27083,172.17.4.187:27082",
+        "db": "extract_2021",
+        "coll": "extract_control_center"
+    },
+    "using_machine": 3,
+    "source_mgodb": {
+        "addr": "172.17.145.163:27083,172.17.4.187:27082",
+        "db": "qfw",
+        "coll": "bidding"
     },
     "nextNode": [
-
+        {
+            "addr": "172.17.4.196",
+            "port": 1799,
+            "stype": "",
+            "memo": "清洗数据"
+        },
+        {
+            "addr": "172.17.4.196",
+            "port": 1762,
+            "stype": "",
+            "memo": "敏感词清理"
+        }
     ]
 }

+ 29 - 10
udpcontrol/src/initdata.go

@@ -3,28 +3,46 @@ package main
 import (
 	log "github.com/donnie4w/go-logger/logger"
 	qu "qfw/util"
+	"time"
 )
 
 var (
-	sysconfig    	map[string]interface{} 		//配置文件
-	data_mgo        *MongodbSim
-	data_c_name,data_s_name		string
+	sysconfig    			map[string]interface{} 		//配置文件
+	data_mgo,source_mgo     *MongodbSim
+	data_c_name,source_c_name	string
+	using_machine			int
+	lastNodeResponse		int64
 )
 func initMgo()  {
+	sourceconf := sysconfig["source_mgodb"].(map[string]interface{})
+	source_c_name = qu.ObjToString(sourceconf["coll"])//数据源bidding
+	source_mgo = &MongodbSim{
+		MongodbAddr: sourceconf["addr"].(string),
+		DbName:      sourceconf["db"].(string),
+		Size:        3,
+		UserName: "zhengkun",
+		Password: "zk@123123",
+	}
+	source_mgo.InitPool()
+
 	dataconf := sysconfig["data_mgodb"].(map[string]interface{})
 	data_c_name = qu.ObjToString(dataconf["coll"])  //机器源center
-	data_s_name = qu.ObjToString(dataconf["s_coll"])//数据源bidding
 	data_mgo = &MongodbSim{
 		MongodbAddr: dataconf["addr"].(string),
 		DbName:      dataconf["db"].(string),
-		Size:        5,
+		Size:        3,
+		UserName: "zhengkun",
+		Password: "zk@123123",
 	}
 	data_mgo.InitPool()
+
 }
 func initVarData()  {
 	qu.ReadConfig(&sysconfig)
 	initMgo()
+	using_machine = qu.IntAll(sysconfig["using_machine"])
 	nextNode = qu.ObjArrToMapArr(sysconfig["nextNode"].([]interface{}))
+	lastNodeResponse = time.Now().Unix()
 }
 
 //加载抽取
@@ -48,8 +66,8 @@ func initExtractNode()  {
 		tmp = make(map[string]interface{})
 	}
 	//根据实际情况~把备用节点~与正常节点综合一下
-	for {
-		if len(using_ext_node) < 3 {
+	for { //可用数量-可变
+		if len(using_ext_node) < using_machine {
 			if len(standby_ext_node)==0 {
 				break
 			}
@@ -61,12 +79,13 @@ func initExtractNode()  {
 		}
 	}
 	if len(using_ext_node)<=0 {
-		sendErrMailApi("抽取控制程序停止~严重错误","当前无可用机器......")
-	}else if len(using_ext_node)==1 {
-		//sendErrMailApi("抽取控制中心~警告","当前可用机器...仅有一个...请检查...")
+		sendErrMailApi("抽取控制中心~严重错误","当前无可用机器")
+	}else if len(using_ext_node)<using_machine { //不足预设-通知
+		sendErrMailApi("抽取控制中心~警告","当前可用机器不足预设~请检查")
 	}else {
 
 	}
+
 	log.Debug("综合后节点~有效~备用~无效",len(using_ext_node),len(standby_ext_node),len(invalid_ext_node))
 }
 

+ 4 - 2
udpcontrol/src/main.go

@@ -14,8 +14,10 @@ func init() {
 }
 
 func main()  {
-	go extractRunningMonitoring() //监控
-
+	//各种监控等
+	go extractRunningMonitoring()
+	go lastUdpMonitoring()
+	go nextUdpMonitoring()
 
 	lock := make(chan bool)
 	<-lock

+ 32 - 0
udpcontrol/src/mark

@@ -0,0 +1,32 @@
+{
+    "udpport": ":1784",
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn",
+        "api": "http://172.17.145.179:19281/_send/_mail"
+    },
+    "data_mgodb": {
+        "addr": "172.17.145.163:27083,172.17.4.187:27082",
+        "db": "extract_2021",
+        "coll": "extract_control_center"
+    },
+    "using_machine": 3,
+    "source_mgodb": {
+        "addr": "172.17.145.163:27083,172.17.4.187:27082",
+        "db": "qfw",
+        "coll": "bidding"
+    },
+    "nextNode": [
+        {
+            "addr": "172.17.4.196",
+            "port": 1799,
+            "stype": "",
+            "memo": "清洗数据"
+        },
+        {
+            "addr": "172.17.4.196",
+            "port": 1762,
+            "stype": "",
+            "memo": "敏感词清理"
+        }
+    ]
+}

+ 60 - 11
udpcontrol/src/method.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	log "github.com/donnie4w/go-logger/logger"
+	"net"
 	qu "qfw/util"
 	"strings"
 	"sync"
@@ -11,12 +12,21 @@ import (
 )
 
 var methodlock 		sync.Mutex
+var responselock 	sync.Mutex
+
+//邮件下节点响应
+var udptaskmap = &sync.Map{}
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+}
+
 
 //监控~当前抽取段~状态  生命周期
 func extractRunningMonitoring()  {
 	for  {
 		if isAction {
-			log.Debug("检测一次...")
 			time_now := time.Now().Unix()
 			isErr := false
 			methodlock.Lock()
@@ -57,13 +67,13 @@ func extractRunningMonitoring()  {
 				sid:= qu.ObjToString(extractAction["extract_ids"]["sid"])
 				eid:= qu.ObjToString(extractAction["extract_ids"]["eid"])
 				isAction = false
-				sendStopExtractNode(using_ext_node)
+				sendStopExtractNode(using_ext_node) //停止
 				if len(standby_ext_node)==0 {
-					sendErrMailApi("抽取控制告警~机器异常~无备用机器",fmt.Sprintf("此段落需要过滤~%s~%s",sid,eid))
+					sendErrMailApi("抽取控制中心~异常",fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s",sid,eid))
 					time.Sleep(15*time.Second)
 					sendNextNode(sid,eid)
 				}else {
-					sendErrMailApi("抽取控制告警~机器异常~有备用机器",fmt.Sprintf("启用备用机器~%s~%s",sid,eid))
+					sendErrMailApi("抽取控制中心~异常",fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s",sid,eid))
 					time.Sleep(15*time.Second)
 					dealWithExtUdpData(sid,eid)
 				}
@@ -73,6 +83,43 @@ func extractRunningMonitoring()  {
 	}
 }
 
+//监控~上节点~长时间未响应
+func lastUdpMonitoring()  {
+	for  {
+		responselock.Lock()
+		if !isAction && time.Now().Unix()-lastNodeResponse > 1800 {
+			sendErrMailApi("抽取控制中心~流程超时~告警",fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
+			lastNodeResponse = time.Now().Unix() //重置时间
+		}
+		responselock.Unlock()
+		time.Sleep(600*time.Second)
+	}
+}
+
+
+//监控~
+func nextUdpMonitoring() {
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			now := time.Now().Unix()
+			node, _ := v.(*udpNode)
+			if now-node.timestamp > 120 {
+				udptaskmap.Delete(k)
+				sendErrMailApi("抽取控制中心~下节点未响应~警告",fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
+			}
+			return true
+		})
+		time.Sleep(10 * time.Second)
+	}
+}
+
+
+
+
+
+
+
+
 
 //验证抽取是否完毕	不验证-extract_ids~key
 func validExtractFinish() bool  {
@@ -93,15 +140,16 @@ func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
 	if sid=="" || eid=="" || len(using_ext_node)==0 {
 		return dataArr,lifeArr
 	}
-	sess := data_mgo.GetMgoConn()
-	defer data_mgo.DestoryMongoConn(sess)
+	sess := source_mgo.GetMgoConn()
+	defer source_mgo.DestoryMongoConn(sess)
 	q ,total := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt":  StringTOBsonId(sid),
 			"$lte": StringTOBsonId(eid),
 		},
 	},int64(0)
-	count,_ := sess.DB(data_mgo.DbName).C(data_s_name).Find(&q).Count()
+	count,_ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count()
+	log.Debug("查询当前数量:",count)
 	if len(using_ext_node)==1 {
 		dataArr = append(dataArr, map[string]interface{}{
 			"sid":sid,
@@ -111,6 +159,7 @@ func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
 	} else {
 		node_num := int64(len(using_ext_node))
 		if count<node_num{ //采用一个节点-多余临时删除
+			log.Debug("数量过少~采用一个节点")
 			tmp_node := using_ext_node[0]
 			using_ext_node = []map[string]interface{}{}
 			using_ext_node = append(using_ext_node,tmp_node)
@@ -123,7 +172,7 @@ func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
 			limit := count/node_num
 			limit_lifetime := calculateLiftime(limit)
 			tmp_sid:=sid
-			it := sess.DB(data_mgo.DbName).C(data_s_name).Find(&q).Sort("_id").Select(map[string]interface{}{
+			it := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Sort("_id").Select(map[string]interface{}{
 				"_id":1,
 			}).Iter()
 			for tmp := make(map[string]interface{}); it.Next(&tmp);{
@@ -167,10 +216,10 @@ func splitIdMethod(sid string,eid string)([]map[string]interface{},[]int64) {
 }
 //计算生命周期
 func calculateLiftime(count int64) int64 {
-	time_one := 1000.0/1000.0//暂定~每千条用时1000秒
+	time_one := 1500.0/1000.0//暂定~每千条用时1000秒
 	life_time := int64(time_one*float64(count)*3.0)
-	if life_time<2000 {
-		life_time = 2000
+	if life_time<2400 {
+		life_time = 2400
 	}
 	return time.Now().Unix()+life_time
 }

+ 1 - 3
udpcontrol/src/sendmail.go

@@ -10,8 +10,6 @@ var tomail string
 var api string
 
 func sendErrMailApi(title,body string)  {
-	log.Println(title,body)
-	return
 	jkmail, _ := sysconfig["jkmail"].(map[string]interface{})
 	if jkmail != nil {
 		tomail, _ = jkmail["to"].(string)
@@ -26,4 +24,4 @@ func sendErrMailApi(title,body string)  {
 	}else {
 		log.Println("邮件发送失败:", err)
 	}
-}
+}

+ 26 - 8
udpcontrol/src/updprocess.go

@@ -9,6 +9,7 @@ import (
 	qu "qfw/util"
 	"strings"
 	"sync"
+	"time"
 )
 var (
 	nextNode     		[]map[string]interface{}
@@ -32,7 +33,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			if sid == "" || eid == "" {
 				log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
 			} else {
-				go udpclient.WriteUdp([]byte("ok"), mu.OP_NOOP, ra)
+				lastNodeResponse = time.Now().Unix()
+				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
+				go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 				udplock.Lock()
 				dealWithExtUdpData(sid,eid)
 				udplock.Unlock()
@@ -42,10 +45,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		//抽取多节点
 		udplock.Lock()
 		str := string(data)
-		if strings.Contains(str,"heart_extract") {
-			dealWithHeartBackUdpData(strings.ReplaceAll(str,"heart_extract",""))
+		if isAction {
+			if strings.Contains(str,"heart_extract") {
+				dealWithHeartBackUdpData(strings.ReplaceAll(str,"heart_extract",""))
+			}else {
+				dealWithCallBackUdpData(str)
+			}
 		}else {
-			dealWithCallBackUdpData(str)
+			log.Debug("其他节点回应:",str)
+			udptaskmap.Delete(str)
 		}
 		udplock.Unlock()
 	}
@@ -59,10 +67,11 @@ func dealWithExtUdpData(sid,eid string) {
 	if len(using_ext_node)>0 {
 		//拆分段落方法~并附加抽取状态标记~有效期等
 		splitArr,lifeArr:=splitIdMethod(sid,eid)
+		log.Debug("最终分",len(splitArr),"段")
 		extractAction = map[string]map[string]interface{}{}
 		heartAction = map[string]interface{}{}
 		for k,v:=range using_ext_node{
-			skey := fmt.Sprintf("%s:%d:%s",v["addr"],v["port"],v["stype"])
+			skey := fmt.Sprintf("%s:%d:%s",v["addr"],qu.IntAll(v["port"]),v["stype"])
 			extractAction[skey] = map[string]interface{}{
 				"life":lifeArr[k],
 				"action":0,
@@ -90,10 +99,12 @@ func dealWithCallBackUdpData(str string) {
 			sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
 			eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
 			isAction = false
+			lastNodeResponse = time.Now().Unix()
 			sendNextNode(sid,eid)
 		}
 	}else {
 		log.Debug("其他节点回应:",str)
+		udptaskmap.Delete(str)
 	}
 }
 //处理-心跳回调
@@ -108,7 +119,7 @@ func dealWithHeartBackUdpData(str string) {
 func sendRunExtractNode(splitArr []map[string]interface{})  {
 	for index, node := range using_ext_node {
 		tmp:=splitArr[index]
-		skey := fmt.Sprintf("%s:%d:%s",node["addr"],node["port"],node["stype"])
+		skey := fmt.Sprintf("%s:%d:%s",node["addr"],qu.IntAll(node["port"]),node["stype"])
 		by, _ := json.Marshal(map[string]interface{}{
 			"gtid":  qu.ObjToString(tmp["sid"]),
 			"lteid": qu.ObjToString(tmp["eid"]),
@@ -144,10 +155,17 @@ func sendNextNode(sid string,eid string)  {
 			"lteid": eid,
 			"stype": qu.ObjToString(node["stype"]),
 		})
-		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+		addr := &net.UDPAddr{
 			IP:   net.ParseIP(node["addr"].(string)),
 			Port: qu.IntAll(node["port"]),
-		})
+		}
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
+
+		//只监控清洗流程
+		if qu.IntAll(node["port"])==1799 {
+			node := &udpNode{by, addr, time.Now().Unix()}
+			udptaskmap.Store(string(by), node)
+		}
 	}
 	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市",sid,"~",eid)
 }

+ 1 - 1
udps/main.go

@@ -19,7 +19,7 @@ func main() {
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.IntVar(&p, "p", 1784, "端口")
+	flag.IntVar(&p, "p", 6601, "端口")
 	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")