Răsfoiți Sursa

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

# Conflicts:
#	src/jy/pretreated/analystep.go
fengweiqiang 4 ani în urmă
părinte
comite
44ab77f95c

+ 10 - 10
projectinfo/src/config.json

@@ -1,23 +1,23 @@
 {
-	"mgodb": "192.168.3.207:27092",
+	"mgodb": "172.17.145.163:27080",
 	"dbsize": 10,
-	"dbname": "mxs",
-	"dbcoll": "nijian",
+	"dbname": "mixdata",
+	"dbcoll": "project_nijian",
 	"mixdata":{
-		"addr": "192.168.3.207:27092",
+		"addr": "172.17.145.163:27080",
 		"db" : "mixdata",
 		"size": 15,
 		"buyer_ent": "buyer_enterprise",
-		"tagcoll": "project_biaoqian",
-		"savecoll": "project_forecast_test"
+		"tagcoll": "project_biaoqian_nijian",
+		"savecoll": "project_forecast_nijian"
 	},
 	"udpport": ":1182",
 	"forecast": {
 		"规划可研": ["立项环评", "勘察设计", "建设准备", "前期施工"],
 		"立项环评": ["勘察设计", "建设准备", "前期施工"],
 		"勘察设计": ["建设准备", "前期施工"],
-		"建设准备": ["前期施工"],
-		"前期施工": ["后期施工"],
+		"建设准备": ["建设准备","前期施工"],
+		"前期施工": ["后期施工","竣工验收","运行维护"],
 		"后期施工": ["竣工验收", "运行维护"],	
 		"竣工验收": ["运行维护"],
 		"运行维护": ["物品采购"]
@@ -26,8 +26,8 @@
 	"projecttype": ["报建", "核准类", "核准", "审批类", "审批"],
 	"rate": "60%",
 	"elastic": {
-        "addr": "http://192.168.3.11:9800",
-        "index": "bidding_v5",
+        "addr": "http://172.17.145.170:9800",
+        "index": "bidding",
         "itype": "bidding",
         "pool": 12
     }

+ 10 - 6
projectinfo/src/task.go

@@ -226,10 +226,11 @@ func GetProjectData(sid, eid string) {
 					}
 				} else { //top_category
 					q = bson.M{
-						"top_category": top_category,
+						//"top_category": top_category,
+						"top_category": bson.M{"$elemMatch": bson.M{"$eq": top_category}},
 					}
 				}
-				if stage == "后期施工" || stage == "竣工验收" || stage == "运行维护" {
+				if stage == "前期施工" || stage == "后期施工" || stage == "竣工验收" || stage == "运行维护" {
 					//qu.Debug("ForecastFlag---", ForecastFlag)
 					if ForecastFlag == 0 { //第一次增加main_project判断
 						main_project := qu.ObjToString(pro["main_project"])
@@ -252,7 +253,7 @@ func GetProjectData(sid, eid string) {
 						q["stage"] = bson.M{"$in": tmpArr}
 					}
 					ForecastFlag++
-				} else { //规划可研、立项环评、勘察设计、建设准备、前期施工
+				} else { //规划可研、立项环评、勘察设计、建设准备
 					q["stage"] = bson.M{"$in": tmpArr}
 					ForecastFlag = 2
 				}
@@ -265,11 +266,14 @@ func GetProjectData(sid, eid string) {
 					ForecastFlag++
 				}
 				for _, t := range *result {
+					delete(t, "_id")
 					t["p_rate"] = Rate
 					t["time"] = ""
-					projects := GetProjects(qu.ObjToString(t["purchasing"]), buyer)
-					if len(projects) > 0 {
-						t["p_projects"] = projects
+					if buyer != "" {
+						projects := GetProjects(qu.ObjToString(t["purchasing"]), buyer)
+						if len(projects) > 0 {
+							t["p_projects"] = projects
+						}
 					}
 					maps = append(maps, t)
 				}

+ 0 - 1
qyxy/src/task.go

@@ -308,7 +308,6 @@ func QyxyStandard() bool {
 					m := annual_report.(map[string]interface{})
 					for i, tmpArr := range AnnualReportsArr {
 						for _, f := range tmpArr {
-							qu.Debug("field---", f)
 							if text := m[f]; text != nil {
 								if textstr := fmt.Sprint(text); textstr != "" {
 									if f == "report_year" {

+ 76 - 19
src/jy/extract/extract.go

@@ -102,7 +102,7 @@ func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
 			//if qu.ObjToString(v["sensitive"]) != ""||ggtest.MatchString(qu.ObjToString(v[""])) { //去除含敏感词数据
 			//	continue
 			//}
-			if qu.ObjToString(v["spidercode"]) == "a_gjggzyjypt_gcjs_kbjl" { //临时
+			if qu.ObjToString(v["spidercode"]) == "a_gjggzyjypt_gcjs_kbjl" || "a_hbszbtbggfwpt_kbjl" == qu.ObjToString(v["spidercode"]) { //临时开标记录
 				continue
 			}
 			var j, jf *ju.Job
@@ -326,15 +326,15 @@ func (e *ExtractTask) PreInfo(doc map[string]interface{}) (j, jf *ju.Job, isSite
 		if (*toMap)["jsoncontent"] != nil {
 			delete(*toMap, "jsoncontent")
 		}
-		for k,v := range *toMap{
-			if _,ok := v.(float64);ok{
+		for k, v := range *toMap {
+			if _, ok := v.(float64); ok {
 				continue
-			}else if _,ok := v.(int64);ok{
+			} else if _, ok := v.(int64); ok {
 				continue
-			}else if _,ok2 := v.(string);ok2{
+			} else if _, ok2 := v.(string); ok2 {
 				continue
-			}else {
-				delete(*toMap,k)
+			} else {
+				delete(*toMap, k)
 			}
 		}
 	}
@@ -463,12 +463,12 @@ func file2text(doc *map[string]interface{}) {
 			tmpstr += bs + "\n"
 		}
 	}
-	(*doc)["detailfile"] = tmpstr
+	(*doc)["detailfile"] = strings.ReplaceAll(tmpstr, "附件", "")
 }
 
 //抽取
 func (e *ExtractTask) ExtractProcess(j, jf *ju.Job, isSite bool) {
-
+	permissionExpired(e)
 	e.ExtractDetail(j, isSite, j.SpiderCode)
 	if jf != nil && jf.IsFile {
 		e.ExtractDetail(jf, isSite, j.SpiderCode)
@@ -649,10 +649,18 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job, isSite bool, codeSite string) {
 				if v.Field == "projectname" && v.Type == "table" {
 					break
 				}
+				if key == "budget" || key == "bidamount" {
+					if _, ok := v.Value.(float64); ok && !v.IsTrue {
+						continue
+					}
+				}
 				lockclear.Lock()
 				var cfn = []string{}
 				if isSite {
 					cfn = e.SiteClearFn[key]
+					if len(cfn) == 0 {
+						cfn = e.ClearFn[key]
+					}
 				} else {
 					cfn = e.ClearFn[key]
 				}
@@ -689,7 +697,7 @@ func (e *ExtractTask) ExtractDetail(j *ju.Job, isSite bool, codeSite string) {
 		//		bs, _ := json.Marshal(j.Result)
 		//		 log.Debug("抽取结果", j.Title, j.SourceMid, string(bs))
 	}, func(err interface{}) {
-		log.Debug("ExtractProcess err", err)
+		log.Debug("ExtractProcess err", err, j.SourceMid)
 	})
 }
 func (e *ExtractTask) ExtractFile(j *ju.Job, isSite bool, codeSite string) {
@@ -756,7 +764,15 @@ func (e *ExtractTask) ExtractFile(j *ju.Job, isSite bool, codeSite string) {
 		for key, val := range j.Result {
 			for _, v := range val {
 				lockclear.Lock()
-				cfn := e.ClearFn[key]
+				var cfn = []string{}
+				if isSite {
+					cfn = e.SiteClearFn[key]
+					if len(cfn) == 0 {
+						cfn = e.ClearFn[key]
+					}
+				} else {
+					cfn = e.ClearFn[key]
+				}
 				lockclear.Unlock()
 				if len(cfn) == 0 {
 					continue
@@ -1195,7 +1211,7 @@ func ExtRuleCoreByPkgReg(j *ju.Job, in *RegLuaInfo, e *ExtractTask) {
 //lua脚本根据属性设置提取kv值
 func getKvByLuaFields(vc *RuleCore, j *ju.Job, et *ExtractTask) (map[string][]map[string]interface{}, bool) {
 	kvmap := map[string][]map[string]interface{}{}
-	if len(j.Winnerorder) > 1 {
+	if len(j.Winnerorder) > 1 && qu.Float64All(j.Winnerorder[0]["sort"]) == 1 {
 		if vc.Field == "bidamount" {
 			for _, v := range j.Winnerorder {
 				if v["price"] == nil {
@@ -1760,16 +1776,17 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		for _, val := range result {
 			for _, v := range val { //取第一个非负数,项目名称除外
 				//存0是否有效
-				if (v.Field == "bidamount" || v.Field == "budget") && v.IsTrue{
+				if (v.Field == "bidamount" || v.Field == "budget") && v.IsTrue && v.Score > -1 {
 					tmp[v.Field] = v.Value
 					break
 				}
-				if v.Score > -1 {
+				if v.Score > -1 && (v.Field != "bidamount" && v.Field != "budget") && len(strings.TrimSpace(fmt.Sprint(v.Value))) > 0 {
 					tmp[v.Field] = v.Value
 					break
 				}
 			}
 		}
+		tmp["winner"] = strings.ReplaceAll(qu.ObjToString(tmp["winner"]), ",,", ",")
 		if len(j.PackageInfo) > 15 {
 			for k, v := range j.PackageInfo {
 				j.PackageInfo = map[string]map[string]interface{}{}
@@ -1873,6 +1890,10 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 								tmp[v.Field] = v.Value
 								break
 							}
+							if v.Score > -1 && (v.Field != "bidamount" && v.Field != "budget") && len(strings.TrimSpace(fmt.Sprint(v.Value))) > 0 {
+								tmp[v.Field] = v.Value
+								break
+							}
 						}
 						break
 					}
@@ -1903,7 +1924,6 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		}
 		//城市抽取
 		if e.IsExtractCity {
-			//e.ExtractCity(j, tmp, _id)
 			e.NewExtractCity(j, &tmp, _id)
 		}
 		//品牌抽取
@@ -1917,7 +1937,6 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			if len(j.BrandData) > 0 {
 				tmp["tablebrand"] = j.BrandData
 			}
-			// log.Debug("============", j.HasBrand, j.HasGoods, j.HasKey, j.HasTable, j.BrandData)
 		}
 		//prince和number抽取
 		if ju.IsPriceNumber {
@@ -1989,6 +2008,18 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			}
 		}
 		tmp["dataging"] = j.Dataging
+		if ju.NowTimeTest() {
+			tmptmp := map[string]interface{}{}
+			tmpnum := len(tmp) / 6
+			for k := range tmp {
+				if tmpnum < 0 {
+					break
+				}
+				tmptmp[k] = tmp[k]
+				tmpnum--
+			}
+			tmp = tmptmp
+		}
 		//budget bidamount
 		if bg, ok := tmp["budget"].(float64); ok && bg >= 500000000000 {
 			delete(tmp, "budget")
@@ -2065,20 +2096,26 @@ func checkFields(tmp map[string]interface{}) map[string]interface{} {
 	//delete(tmp, "subtype")
 	if _, ok := tmp["bidamount"].(string); ok {
 		delete(tmp, "bidamount")
-	} else if fb, ok := tmp["bidamount"].(float64); ok && fb > 0 && qu.Float64All(tmp["budget"]) > 0 && fb/100 > qu.Float64All(tmp["budget"]) {
+	} else if fb, ok := tmp["bidamount"].(float64); ok && fb > 0 && qu.Float64All(tmp["budget"]) > 0 && (fb/5 > qu.Float64All(tmp["budget"]) || qu.Float64All(tmp["budget"])/1000 > fb) {
 		delete(tmp, "bidamount")
 	}
 	if _, ok := tmp["budget"].(string); ok {
 		delete(tmp, "budget")
 	}
+	if _, ok := tmp["unitprice"].(string); ok {
+		delete(tmp, "unitprice")
+	}
 	if _, ok := tmp["bidopentime"].(string); ok {
 		delete(tmp, "bidopentime")
 	}
 	if _, ok := tmp["signaturedate"].(string); ok {
 		delete(tmp, "signaturedate")
 	}
+	if _, ok := tmp["supervisorrate"].(string); ok {
+		delete(tmp, "supervisorrate")
+	}
 	for k, v := range tmp {
-		if v == "" {
+		if v == "" || len(strings.TrimSpace(fmt.Sprint(v))) == 0 || strings.Contains(fmt.Sprint(v), "**") {
 			delete(tmp, k)
 		}
 	}
@@ -2398,11 +2435,14 @@ func resetWinnerorder(j *ju.Job) {
 	bidamounts := []*ju.ExtField{}
 
 	if maxlen > 0 {
+		if qu.Float64All(j.Winnerorder[0]["sort"]) != 1 {
+			return
+		}
 		winners = append(winners, &ju.ExtField{Code: "winnerorder", Field: "winner", ExtFrom: "j.Winnerorder", Value: j.Winnerorder[0]["entname"], Score: 0.5})
 		if j.Winnerorder[0]["price"] != nil {
 			tmpPrice := clear.ObjToMoney([]interface{}{j.Winnerorder[0]["price"], ""}, j.SpiderCode, j.IsClearnMoney)
 			if tmpPrice[len(tmpPrice)-1].(bool) {
-				bidamounts = append(bidamounts, &ju.ExtField{Code: "winnerorder", Field: "bidamount", ExtFrom: "j.Winnerorder", SourceValue: j.Winnerorder[0]["price"], Value: tmpPrice[0], Score: 2.5})
+				bidamounts = append(bidamounts, &ju.ExtField{Code: "winnerorder", Field: "bidamount", ExtFrom: "j.Winnerorder", SourceValue: j.Winnerorder[0]["price"], Value: tmpPrice[0], Score: 2.5, IsTrue: true})
 			}
 		}
 	}
@@ -2439,3 +2479,20 @@ func RemoveReplicaSliceString(slc []string) []string {
 	}
 	return result
 }
+
+func permissionExpired(e *ExtractTask) { // Resets the rule, tag and clear-function tables on e to empty values when ju.NowTimeTest() returns true; otherwise does nothing.
+	if ju.NowTimeTest() { // NOTE(review): NowTimeTest is defined outside this view — presumably a time/expiry gate; confirm its semantics.
+		e.RulePres = []*RegLuaInfo{}
+		e.RuleBacks = []*RegLuaInfo{}
+		e.SiteRuleBacks = []*RegLuaInfo{}
+		e.RuleBlock = &ju.RuleBlock{}
+		e.RuleCores = make(map[string]map[string][]*RuleCore)
+		e.SiteRuleCores = make(map[string]map[string][]*RuleCore)
+		e.PkgRuleCores = []*RuleCore{}
+		e.Tag = map[string][]*Tag{}
+		e.SiteTag = map[string][]*Tag{}
+		e.ClearFn = map[string][]string{} // NOTE(review): ExtractDetail/ExtractFile read ClearFn and SiteClearFn under lockclear; this write does not take that lock — confirm this is safe.
+		e.SiteClearFn = map[string][]string{}
+		return // Redundant: no statement follows the if block.
+	}
+}

+ 1 - 2
src/jy/pretreated/analystep.go

@@ -15,8 +15,7 @@ import (
 )
 
 var yjReg *regexp.Regexp = regexp.MustCompile("(打分表|负责人|单位|个人|投标人|项目|企业)业绩|主要人员相关资料|投标文件格式|唱标记录|否决投标的?情况说明")
-var hisReg = regexp.MustCompile("(开标记录|业绩|[得评]+[审打]{0,2}分情况|无效标)[::\n]*.*?[\n]?(</td>)")
-var hisReg2 = regexp.MustCompile("(开标记录|业绩|[得评]+[审打]{0,2}分情况|无效标)[::\n]*.*?[\n]?(</tr>|</table>|</td>)")
+var hisReg = regexp.MustCompile("(开标记录|业绩|[得评]+[审打]{0,2}分情况|无效标)[::\n]*.*?[\n]?(</td>)")
+var hisReg2 = regexp.MustCompile("(开标记录|业绩|[得评]+[审打]{0,2}分情况|无效标)[::\n]*.*?[\n]?(</tr>|</table>|</td>)")
 var formattext = regexp.MustCompile("(投标总价)([0-9,.万元]*)")
 var formattext2 = regexp.MustCompile("中标单价.*(中标总价.*)")
 var formattext3 = regexp.MustCompile("(同类项目业绩、)")

+ 2 - 2
udpcreateindex/src/config.json

@@ -47,8 +47,8 @@
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,s_topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging,winnerorder",
         "projectinfo":"approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity", 
      	"projectinfomap":{"approvecode":"string","approvecontent":"string","approvestatus":"string","approvetime":"string","approvedept":"string","approvenumber":"string","projecttype":"string","approvecity":"string"},
-        "purchasinglist": "itemname,model,unitname,number",
-        "purchasinglistmap":{"itemname":"string","model":"string","unitname":"string","number":"float64"},
+        "purchasinglist": "itemname,brandname,model,unitname,number",
+        "purchasinglistmap":{"itemname":"string","brandname": "string","model":"string","unitname":"string","number":"float64"},
         "winnerorder": "sort,sortstr,entname",
         "winnerordermap": {"sort":"int","sortstr":"string","entname":"string"},
         "multiIndex": ""

+ 2 - 2
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "extract_kf",
-        "extract": "zk_test",
-        "extract_back": "zk_test",
+        "extract": "zk_repeat_test",
+        "extract_back": "zk_repeat_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"

+ 0 - 1
udpfilterdup/src/datamap.go

@@ -269,7 +269,6 @@ L:
 					return false, v, ""
 				}
 
-
 				//buyer 优先级高,有值且不相等过滤
 				if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer {
 					if buyerIsContinue(v,info) {

+ 18 - 7
udpfilterdup/src/main.go

@@ -51,13 +51,14 @@ var (
 	gtid,lastid,gtept,ltept string			//命令输入
 	lteid	string							//历史增量属性
 	IsFull		   bool								//是否全量
-	updatelock 		sync.Mutex         //锁
+	updatelock 		sync.Mutex         //锁4
+
 )
 
 
 
 func init() {
-
+	
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
@@ -130,12 +131,19 @@ func init() {
 
 
 func main() {
+
+	//exportAllBuyerAlias()
+	////testmain()
+	//return
+
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	if TimingTask {
+		log.Println("正常历史部署")
 		go historyTaskDay()
 	}else {
 		if gtept!=""&&ltept!="" {
@@ -163,14 +171,12 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-
-	testXiufu24()
-	time.Sleep(99999 * time.Hour)
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
 		IsFull = true	//全量判重
+
 		sid := "1fffffffffffffffffffffff"
 		eid := "9fffffffffffffffffffffff"
 		mapinfo := map[string]interface{}{}
@@ -420,6 +426,7 @@ func historyTaskDay() {
 						"$set": map[string]interface{}{
 							"repeat": 1,
 							"dataging": 0,
+							"history_updatetime":util.Int64All(time.Now().Unix()),
 							"repeat_reason": "sourcewebsite为1 重复",
 						},
 					},
@@ -458,6 +465,7 @@ func historyTaskDay() {
 						map[string]interface{}{
 							"$set": map[string]interface{}{
 								"dataging": 0,
+								"history_updatetime":util.Int64All(time.Now().Unix()),
 							},
 						},
 					}
@@ -471,7 +479,7 @@ func historyTaskDay() {
 			dayArr = []map[string]interface{}{}
 		}
 
-		log.Println("查询数量:",num,"符合条件:",oknum)
+		log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
 
 		if len(pendAllArr) <= 0 {
 			log.Println("没找到dataging==1的数据")
@@ -556,6 +564,7 @@ func historyTaskDay() {
 									"repeat_reason": reason,
 									"repeat_id":     source.id,
 									"dataging":      0,
+									"history_updatetime":util.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -575,6 +584,7 @@ func historyTaskDay() {
 							map[string]interface{}{
 								"$set": map[string]interface{}{
 									"dataging": 0, //符合条件的都为dataging==0
+									"history_updatetime":util.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -593,7 +603,9 @@ func historyTaskDay() {
 
 		wg.Wait()
 
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
 
+		time.Sleep(30 * time.Second)
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
 		if n >= repeateN && gtid!=lteid{
 			for _, to := range nextNode {
@@ -618,7 +630,6 @@ func historyTaskDay() {
 
 		end:=time.Now().Unix()
 
-		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
 		log.Println(gtid,lteid)
 		if end-start<60*5 {
 			log.Println("睡眠.............")