فهرست منبع

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 4 سال پیش
والد
کامیت
182a482a13
7فایلهای تغییر یافته به همراه379 افزوده شده و 21 حذف شده
  1. 28 0
      projectforecast/config.json
  2. 74 0
      projectforecast/main.go
  3. 184 0
      projectforecast/task.go
  4. 46 0
      udpfilterdup/src/README.md
  5. 6 0
      udpfilterdup/src/config.json
  6. 11 11
      udpfilterdup/src/datamap.go
  7. 30 10
      udpfilterdup/src/main.go

+ 28 - 0
projectforecast/config.json

@@ -0,0 +1,28 @@
+{
+  "mongodbServers": "192.168.3.166:27082",
+  "mongodbPoolSize": 10,
+  "mongodbName": "bqData",
+  "mongoColl_pro": "projectinfo_c",
+  "mongoColl_tag": "project_biaoqian",
+  "mongoColl_save": "project_forecast",
+  "udpport": ":1182",
+  "spiderCodes": ["jzgc_nxjzscjgfwxt_gcyj", "jzgc_fjsjshyxxgkpt_gcxmcx", "jzgc_ahszfhcxjshyggxxcxzx_qycx", "jzgc_ahszfhcxjshyggxxcxzx_xmcx", "jzgc_bjsjzscxxgkpt_qyxx", "jzgc_bjsjzscxxgkpt_xmxx_htbaxx", "jzgc_bjsjzscxxgkpt_xmxx_jgbaxx",
+    "jzgc_bjsjzscxxgkpt_xmxx_lwfblyxx", "jzgc_bjsjzscxxgkpt_xmxx_sgxkxx", "jzgc_bjsjzscxxgkpt_xmxx_yjxx", "jzgc_cqszfhcxjswyh_qyxx", "jzgc_cqszfhcxjswyh_xmxx", "jzgc_fjsjshyxxgkpt_qyxxcx", "jzgc_gdsjshysjkfpt_qyxx", "jzgc_gdsjshysjkfpt_xmxx",
+    "jzgc_gssjzscjgggfwpt_sgxkxm", "jzgc_gxjzscjgycxxxythptxxfb_gcxm", "jzgc_gxjzscjgycxxxythptxxfb_qyxx", "jzgc_hbszfhcxjst_gcxx", "jzgc_hbszfhcxjst_qyzz", "jzgc_hnsjzscjgggfwpt_jsgcqy", "jzgc_hnsjzschgczlaqjgythpt_gcxx", "jzgc_hnsjzschgczlaqjgythpt_qyxx",
+    "jzgc_hnsjzscjgggfwpt_jsxm", "jzgc_jlsjzscjgggfwpt_gcxmxx", "jzgc_jlsjzscjgggfwpt_qyxx", "jzgc_jssjzscjgycxxxythpt_qyxx", "jzgc_lnsjzscggfwpt_gcxmxx", "jzgc_lnsjzscggfwpt_qyxx", "jzgc_nmgzzqjzscjgycxxxglpt_gcxmxx", "jzgc_nmgzzqjzscjgycxxxglpt_qyxx",
+    "jzgc_nxjzscjgfwxt_qysj", "jzgc_nxjzscjgfwxt_gcsj", "jzgc_qhsgcjsjghxyglpt_gcxm", "jzgc_qhsgcjsjghxyglpt_qyxx", "jzgc_scszfcxjshysjgxpt_qyxx", "jzgc_sdszfcxjsfwjgyxyxxzhpt_qycx", "jzgc_sdszfcxjsfwjgyxyxxzhpt_xmcx", "jzgc_sxsjzscjgggfwpt_qyxx",
+    "jzgc_sxsjzscjgggfwpt_xmxx", "jzgc_tjszfhcxjswyh_gcxmxx", "jzgc_sxsjzscjgycxxxfbpt_gcxmxxcx", "jzgc_sxsjzscjgycxxxfbpt_qycx", "jzgc_xjgcjsy_gcxm", "jzgc_xjgcjsy_qyxx", "jzgc_xzzzqjzscjgycxxxpt_gcxx", "jzgc_xzzzqjzscjgycxxxpt_qyxx", "jzgc_scszfcxjshysjgxpt_xmxx"
+  ],
+  "forecast": {
+    "规划科研": ["立项环评", "勘察设计", "建设准备", "前期准备"],
+    "立项环评": ["勘察设计", "建设准备", "前期施工"],
+    "勘察设计": ["建设准备", "前期施工"],
+    "建设准备": ["前期施工"],
+    "前期施工": ["后期施工"],
+    "后期施工": ["竣工验收", "运行维护", "物品采购"],
+    "竣工验收": ["运行维护", "物品采购"],
+    "运行维护": ["物品采购"]},
+  "category": ["道路", "学校", "医院"],
+  "nature": ["新建", "扩建", "拆建"],
+  "rate": "60%"
+}

+ 74 - 0
projectforecast/main.go

@@ -0,0 +1,74 @@
+package main
+
+import (
+	"encoding/json"
+	mu "mfw/util"
+	"mongodb"
+	"net"
+	"qfw/util"
+)
+
+var (
+	Sysconfig   map[string]interface{}
+	MongoTool   *mongodb.MongodbSim
+	Dbname      string
+	CollPro     string //项目表
+	CollTag     string //标签表
+	CollSave    string
+	Rate        string
+	Forecast    map[string]interface{}
+	Category    []interface{}
+	Nature      []interface{}
+	SpiderCodes []interface{}
+	udpclient   mu.UdpClient
+)
+
+var MgoSaveCache = make(chan map[string]interface{}, 2000)
+var SP = make(chan bool, 5)
+
+func init() {
+	util.ReadConfig(&Sysconfig)
+
+	Dbname = Sysconfig["mongodbName"].(string)
+	MongoTool = &mongodb.MongodbSim{
+		MongodbAddr: Sysconfig["mongodbServers"].(string),
+		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
+		DbName:      Dbname,
+	}
+	MongoTool.InitPool()
+
+	CollPro = Sysconfig["mongoColl_pro"].(string)
+	CollTag = Sysconfig["mongoColl_tag"].(string)
+	CollSave = Sysconfig["mongoColl_save"].(string)
+	Rate = util.ObjToString(Sysconfig["rate"])
+	Forecast = Sysconfig["forecast"].(map[string]interface{})
+	Category = Sysconfig["category"].([]interface{})
+	Nature = Sysconfig["nature"].([]interface{})
+	SpiderCodes = Sysconfig["spiderCodes"].([]interface{})
+
+	udpport, _ := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: udpport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	util.Debug("Udp服务监听", udpport)
+}
+
+func main() {
+	go SaveMgo()
+	//GetProjectData("1597386920")
+	ch := make(chan bool, 1)
+	<-ch
+}
+
+
+//udp调用信号
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	util.Debug(act)
+	var mapInfo map[string]interface{}
+	err := json.Unmarshal(data, &mapInfo)
+	util.Debug("err:", err, "mapInfo:", mapInfo)
+	if err != nil {
+		_ = udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+	} else if mapInfo != nil {
+		GetProjectData(util.ObjToString(mapInfo["updatetime"]))
+	}
+}

+ 184 - 0
projectforecast/task.go

@@ -0,0 +1,184 @@
+package main
+
+import (
+	"go.mongodb.org/mongo-driver/bson"
+	"log"
+	"mongodb"
+	qu "qfw/util"
+	"strconv"
+	"time"
+)
+
+var queryClose = make(chan bool)
+var queryCloseOver = make(chan bool)
+
+func SaveMgo() {
+	log.Println("Mgo Save...")
+	arru := make([]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-MgoSaveCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MongoTool.SaveBulk(CollSave, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MongoTool.SaveBulk(CollSave, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//项目数据
+func GetProjectData(t string) {
+	defer qu.Catch()
+	count, taskcount := 0, 0
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+	dataPool := make(chan map[string]interface{}, 2000)
+	over := make(chan bool)
+	pool := make(chan bool, 4)
+	go func() {
+	L:
+		for {
+			select {
+			case tmp := <-dataPool:
+				pool <- true
+				taskcount++
+				go func(tmp map[string]interface{}) {
+					defer func() {
+						<-pool
+					}()
+					ForecastMethod(tmp)
+				}(tmp)
+			case <-over:
+				break L
+			}
+		}
+	}()
+	uptime, err := strconv.ParseInt(t, 10, 64)
+	if err == nil {
+		qu.Debug(err)
+	}
+	query := bson.M{
+		"updatetime": bson.M{"$gt": uptime},
+		"o_projectinfo.nature":     bson.M{"$in": Nature},
+		"spidercode": bson.M{"$in": SpiderCodes},
+		"$or": []bson.M{
+			{"category_buyer": bson.M{"$in": Category}},
+			{"category_purpose": bson.M{"$in": Category}},
+		},
+	}
+	qu.Debug("query-----", CollPro, query["updatetime"])
+	filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1}
+	it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
+	var lastid interface{}
+L:
+	for {
+		select {
+		case <-queryClose:
+			log.Println("receive interrupt sign")
+			log.Println("close iter..", lastid, it.Cursor.Close(nil))
+			queryCloseOver <- true
+			break L
+		default:
+			tmp := make(map[string]interface{})
+			if it.Next(&tmp) {
+				lastid = tmp["_id"]
+				if count%1000 == 0 {
+					log.Println("current", count, lastid)
+				}
+				dataPool <- tmp
+				count++
+			} else {
+				break L
+			}
+		}
+	}
+	time.Sleep(5 * time.Second)
+	over <- true
+	//阻塞
+	for n := 0; n < 4; n++ {
+		pool <- true
+	}
+}
+
+func ForecastMethod(pro map[string]interface{}) {
+	pro["infoid"] = mongodb.BsonIdToSId(pro["_id"])
+	pro["yucetime"] = time.Now().Unix()
+	pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
+	delete(pro, "_id")
+	delete(pro, "o_projectinfo")
+	category := GetCategory(pro)
+	stage := qu.ObjToString(pro["stage"])
+	q := bson.M{
+		"category": category,
+		"stage":    bson.M{"$in": Forecast[stage]},
+	}
+	var maps []map[string]interface{}
+	if pro["results"] != nil {
+		maps = qu.ObjArrToMapArr(pro["results"].([]interface{}))
+	} else {
+		maps = []map[string]interface{}{}
+	}
+	result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1)
+	for _, t := range *result {
+		if len(t) == 0 {
+			continue
+		}
+		tmp := make(map[string]interface{})
+		tmp["stage"] = t["stage"]
+		tmp["purchase_classify"] = t["purchase_classify"]
+		tmp["purchasing"] = t["purchasing"]
+		tmp["p_rate"] = Rate
+		tmp["time"] = ""
+		//tmp["p_projects"] = ""		暂无该字段
+		maps = append(maps, tmp)
+	}
+	if len(maps) > 0 {
+		pro["results"] = maps
+	}
+	//update := map[string]interface{}{}
+	//update["$set"] = pro
+	MgoSaveCache <- pro
+}
+
+func GetCategory(tmp map[string]interface{}) string {
+	categoryBuyerIndex := -1
+	categoryPurposeIndex := -1
+	for k, v := range Category {
+		if tmp["category_buyer"] != nil {
+			if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) {
+				categoryBuyerIndex = k
+			}
+		}
+		if tmp["category_purpose"] != nil {
+			categoryPurposeIndex = k
+		}
+	}
+
+	if categoryBuyerIndex >= categoryPurposeIndex {
+		return qu.ObjToString(Category[categoryBuyerIndex])
+	} else {
+		return qu.ObjToString(Category[categoryPurposeIndex])
+	}
+}

+ 46 - 0
udpfilterdup/src/README.md

@@ -13,6 +13,13 @@
             "coll": "site"
         }
     },
+    "task_mongodb": {
+        "task_addrName": "172.17.4.187:27081",
+        "task_dbName": "qfw",
+        "task_collName": "ocr_flie_over",
+        "pool": 5
+        
+    },
     "jkmail": {
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"
@@ -95,6 +102,45 @@ func moveOnceTimeOut()  {
 
 
 
+{
+    "udpport": ":1785",
+    "dupdays": 7,
+    "mongodb": {
+        "addr": "172.17.4.85:27080",
+        "pool": 10,
+        "db": "qfw",
+        "extract": "result_20200715",
+        "extract_back": "result_20200714",
+        "site": {
+            "dbname": "qfw",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://172.17.145.179:19281/_send/_mail"
+    },
+    "nextNode": [
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+    "threads": 1,
+    "isMerger": false,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 5,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}
+
 
 
 

+ 6 - 0
udpfilterdup/src/config.json

@@ -12,6 +12,12 @@
             "coll": "site"
         }
     },
+    "task_mongodb": {
+        "task_addrName": "172.17.4.187:27081",
+        "task_dbName": "qfw",
+        "task_collName": "ocr_flie_over",
+        "pool": 5
+    },
     "jkmail": {
         "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"

+ 11 - 11
udpfilterdup/src/datamap.go

@@ -125,7 +125,6 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 	return dm
 }
 
-
 //增量
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400 * 2)
@@ -472,7 +471,8 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 
 	}else {
-		d.keymap = d.GetLatelyFiveDayDouble(t)
+		//d.keymap = d.GetLatelyFiveDayDouble(t)
+		d.keymap = d.GetLatelyFiveDay(t)
 	}
 	m := map[string]bool{}
 	for _, v := range d.keymap {
@@ -497,15 +497,15 @@ func (d *datamap) update(t int64) {
 	//log.Println("更新前后数据:", all, all1)
 }
 
-//func (d *datamap) GetLatelyFiveDay(t int64) []string  {
-//	array := make([]string, d.days)
-//	now := time.Unix(t, 0)
-//	for i := 0; i < d.days; i++ {
-//		array[i] = now.Format(qutil.Date_yyyyMMdd)
-//		now = now.AddDate(0, 0, -1)
-//	}
-//	return array
-//}
+func (d *datamap) GetLatelyFiveDay(t int64) []string  {
+	array := make([]string, d.days)
+	now := time.Unix(t, 0)
+	for i := 0; i < d.days; i++ {
+		array[i] = now.Format(qutil.Date_yyyyMMdd)
+		now = now.AddDate(0, 0, -1)
+	}
+	return array
+}
 
 func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
 	array := make([]string, d.days*2)

+ 30 - 10
udpfilterdup/src/main.go

@@ -26,11 +26,13 @@ var (
 	Sysconfig    map[string]interface{} //配置文件
 	mconf        map[string]interface{} //mongodb配置信息
 	mgo          *MongodbSim            //mongodb操作对象
+	task_mgo     *MongodbSim            //mongodb操作对象
+	task_collName	string
 	extract      string
 	extract_back string
 	udpclient    mu.UdpClient             //udp对象
 	nextNode     []map[string]interface{} //下节点数组
-	dupdays      = 5                      //初始化判重范围
+	dupdays      = 7                      //初始化判重范围
 	DM           *datamap                 //
 
 	//正则筛选相关
@@ -61,6 +63,17 @@ func init() {
 	flag.Parse()
 	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
+
+	task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
+	task_mgo = &MongodbSim{
+		MongodbAddr: task_mconf["task_addrName"].(string),
+		DbName:      task_mconf["task_dbName"].(string),
+		Size:        util.IntAllDef(task_mconf["task_pool"], 10),
+	}
+	task_mgo.InitPool()
+	task_collName = task_mconf["task_collName"].(string)
+
+
 	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
 	mgo = &MongodbSim{
@@ -130,8 +143,8 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid := "4f16936d52c1d9fbf843c60e"
-		eid := "6f16936d52c1d9fbf843c60e"
+		sid := "1f16936d52c1d9fbf843c60e"
+		eid := "9f16936d52c1d9fbf843c60e"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -355,7 +368,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 
-	time.Sleep(60 * time.Second)
+	time.Sleep(30 * time.Second)
 
 	//任务完成,开始发送广播通知下面节点
 	if n >= repeateN && mapInfo["stop"] == nil {
@@ -401,18 +414,25 @@ func historyTaskDay() {
 		}
 
 		//查询表最后一个id
-		sess := mgo.GetMgoConn()
-		defer mgo.DestoryMongoConn(sess)
-		q:=map[string]interface{}{}
+		task_sess := task_mgo.GetMgoConn()
+		defer task_mgo.DestoryMongoConn(task_sess)
+		q:=map[string]interface{}{
+			"isused":true,
+		}
 		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
-		it_last := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("-_id").Iter()
+		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
 		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-			lteid = BsonTOStringId(tmp["_id"])
+			lteid = util.ObjToString(tmp["gtid"])
+			log.Println("查询的最后一个任务Id:",lteid)
 			break
 		}
-
+		//
+		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
 		time.Sleep(5 * time.Minute)
 
+
+		sess := mgo.GetMgoConn()//连接器
+		defer mgo.DestoryMongoConn(sess)
 		//开始判重
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{