Browse Source

项目预测

Jianghan 4 years ago
parent
commit
34e8450c2c
3 changed files with 286 additions and 0 deletions
  1. 28 0
      projectforecast/config.json
  2. 74 0
      projectforecast/main.go
  3. 184 0
      projectforecast/task.go

+ 28 - 0
projectforecast/config.json

@@ -0,0 +1,28 @@
+{
+  "mongodbServers": "192.168.3.166:27082",
+  "mongodbPoolSize": 10,
+  "mongodbName": "bqData",
+  "mongoColl_pro": "projectinfo_c",
+  "mongoColl_tag": "project_biaoqian",
+  "mongoColl_save": "project_forecast",
+  "udpport": ":1182",
+  "spiderCodes": ["jzgc_nxjzscjgfwxt_gcyj", "jzgc_fjsjshyxxgkpt_gcxmcx", "jzgc_ahszfhcxjshyggxxcxzx_qycx", "jzgc_ahszfhcxjshyggxxcxzx_xmcx", "jzgc_bjsjzscxxgkpt_qyxx", "jzgc_bjsjzscxxgkpt_xmxx_htbaxx", "jzgc_bjsjzscxxgkpt_xmxx_jgbaxx",
+    "jzgc_bjsjzscxxgkpt_xmxx_lwfblyxx", "jzgc_bjsjzscxxgkpt_xmxx_sgxkxx", "jzgc_bjsjzscxxgkpt_xmxx_yjxx", "jzgc_cqszfhcxjswyh_qyxx", "jzgc_cqszfhcxjswyh_xmxx", "jzgc_fjsjshyxxgkpt_qyxxcx", "jzgc_gdsjshysjkfpt_qyxx", "jzgc_gdsjshysjkfpt_xmxx",
+    "jzgc_gssjzscjgggfwpt_sgxkxm", "jzgc_gxjzscjgycxxxythptxxfb_gcxm", "jzgc_gxjzscjgycxxxythptxxfb_qyxx", "jzgc_hbszfhcxjst_gcxx", "jzgc_hbszfhcxjst_qyzz", "jzgc_hnsjzscjgggfwpt_jsgcqy", "jzgc_hnsjzschgczlaqjgythpt_gcxx", "jzgc_hnsjzschgczlaqjgythpt_qyxx",
+    "jzgc_hnsjzscjgggfwpt_jsxm", "jzgc_jlsjzscjgggfwpt_gcxmxx", "jzgc_jlsjzscjgggfwpt_qyxx", "jzgc_jssjzscjgycxxxythpt_qyxx", "jzgc_lnsjzscggfwpt_gcxmxx", "jzgc_lnsjzscggfwpt_qyxx", "jzgc_nmgzzqjzscjgycxxxglpt_gcxmxx", "jzgc_nmgzzqjzscjgycxxxglpt_qyxx",
+    "jzgc_nxjzscjgfwxt_qysj", "jzgc_nxjzscjgfwxt_gcsj", "jzgc_qhsgcjsjghxyglpt_gcxm", "jzgc_qhsgcjsjghxyglpt_qyxx", "jzgc_scszfcxjshysjgxpt_qyxx", "jzgc_sdszfcxjsfwjgyxyxxzhpt_qycx", "jzgc_sdszfcxjsfwjgyxyxxzhpt_xmcx", "jzgc_sxsjzscjgggfwpt_qyxx",
+    "jzgc_sxsjzscjgggfwpt_xmxx", "jzgc_tjszfhcxjswyh_gcxmxx", "jzgc_sxsjzscjgycxxxfbpt_gcxmxxcx", "jzgc_sxsjzscjgycxxxfbpt_qycx", "jzgc_xjgcjsy_gcxm", "jzgc_xjgcjsy_qyxx", "jzgc_xzzzqjzscjgycxxxpt_gcxx", "jzgc_xzzzqjzscjgycxxxpt_qyxx", "jzgc_scszfcxjshysjgxpt_xmxx"
+  ],
+  "forecast": {
+    "规划科研": ["立项环评", "勘察设计", "建设准备", "前期准备"],
+    "立项环评": ["勘察设计", "建设准备", "前期施工"],
+    "勘察设计": ["建设准备", "前期施工"],
+    "建设准备": ["前期施工"],
+    "前期施工": ["后期施工"],
+    "后期施工": ["竣工验收", "运行维护", "物品采购"],
+    "竣工验收": ["运行维护", "物品采购"],
+    "运行维护": ["物品采购"]},
+  "category": ["道路", "学校", "医院"],
+  "nature": ["新建", "扩建", "拆建"],
+  "rate": "60%"
+}

+ 74 - 0
projectforecast/main.go

@@ -0,0 +1,74 @@
+package main
+
+import (
+	"encoding/json"
+	mu "mfw/util"
+	"mongodb"
+	"net"
+	"qfw/util"
+)
+
+var (
+	Sysconfig   map[string]interface{}
+	MongoTool   *mongodb.MongodbSim
+	Dbname      string
+	CollPro     string //项目表
+	CollTag     string //标签表
+	CollSave    string
+	Rate        string
+	Forecast    map[string]interface{}
+	Category    []interface{}
+	Nature      []interface{}
+	SpiderCodes []interface{}
+	udpclient   mu.UdpClient
+)
+
+var MgoSaveCache = make(chan map[string]interface{}, 2000)
+var SP = make(chan bool, 5)
+
+func init() {
+	util.ReadConfig(&Sysconfig)
+
+	Dbname = Sysconfig["mongodbName"].(string)
+	MongoTool = &mongodb.MongodbSim{
+		MongodbAddr: Sysconfig["mongodbServers"].(string),
+		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
+		DbName:      Dbname,
+	}
+	MongoTool.InitPool()
+
+	CollPro = Sysconfig["mongoColl_pro"].(string)
+	CollTag = Sysconfig["mongoColl_tag"].(string)
+	CollSave = Sysconfig["mongoColl_save"].(string)
+	Rate = util.ObjToString(Sysconfig["rate"])
+	Forecast = Sysconfig["forecast"].(map[string]interface{})
+	Category = Sysconfig["category"].([]interface{})
+	Nature = Sysconfig["nature"].([]interface{})
+	SpiderCodes = Sysconfig["spiderCodes"].([]interface{})
+
+	udpport, _ := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: udpport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	util.Debug("Udp服务监听", udpport)
+}
+
+func main() {
+	go SaveMgo()
+	//GetProjectData("1597386920")
+	ch := make(chan bool, 1)
+	<-ch
+}
+
+
+//udp调用信号
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	util.Debug(act)
+	var mapInfo map[string]interface{}
+	err := json.Unmarshal(data, &mapInfo)
+	util.Debug("err:", err, "mapInfo:", mapInfo)
+	if err != nil {
+		_ = udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+	} else if mapInfo != nil {
+		GetProjectData(util.ObjToString(mapInfo["updatetime"]))
+	}
+}

+ 184 - 0
projectforecast/task.go

@@ -0,0 +1,184 @@
+package main
+
+import (
+	"go.mongodb.org/mongo-driver/bson"
+	"log"
+	"mongodb"
+	qu "qfw/util"
+	"strconv"
+	"time"
+)
+
+var queryClose = make(chan bool)
+var queryCloseOver = make(chan bool)
+
+func SaveMgo() {
+	log.Println("Mgo Save...")
+	arru := make([]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-MgoSaveCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MongoTool.SaveBulk(CollSave, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					MongoTool.SaveBulk(CollSave, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}
+
+//项目数据
+func GetProjectData(t string) {
+	defer qu.Catch()
+	count, taskcount := 0, 0
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+	dataPool := make(chan map[string]interface{}, 2000)
+	over := make(chan bool)
+	pool := make(chan bool, 4)
+	go func() {
+	L:
+		for {
+			select {
+			case tmp := <-dataPool:
+				pool <- true
+				taskcount++
+				go func(tmp map[string]interface{}) {
+					defer func() {
+						<-pool
+					}()
+					ForecastMethod(tmp)
+				}(tmp)
+			case <-over:
+				break L
+			}
+		}
+	}()
+	uptime, err := strconv.ParseInt(t, 10, 64)
+	if err == nil {
+		qu.Debug(err)
+	}
+	query := bson.M{
+		"updatetime": bson.M{"$gt": uptime},
+		"o_projectinfo.nature":     bson.M{"$in": Nature},
+		"spidercode": bson.M{"$in": SpiderCodes},
+		"$or": []bson.M{
+			{"category_buyer": bson.M{"$in": Category}},
+			{"category_purpose": bson.M{"$in": Category}},
+		},
+	}
+	qu.Debug("query-----", CollPro, query["updatetime"])
+	filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1}
+	it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
+	var lastid interface{}
+L:
+	for {
+		select {
+		case <-queryClose:
+			log.Println("receive interrupt sign")
+			log.Println("close iter..", lastid, it.Cursor.Close(nil))
+			queryCloseOver <- true
+			break L
+		default:
+			tmp := make(map[string]interface{})
+			if it.Next(&tmp) {
+				lastid = tmp["_id"]
+				if count%1000 == 0 {
+					log.Println("current", count, lastid)
+				}
+				dataPool <- tmp
+				count++
+			} else {
+				break L
+			}
+		}
+	}
+	time.Sleep(5 * time.Second)
+	over <- true
+	//阻塞
+	for n := 0; n < 4; n++ {
+		pool <- true
+	}
+}
+
+func ForecastMethod(pro map[string]interface{}) {
+	pro["infoid"] = mongodb.BsonIdToSId(pro["_id"])
+	pro["yucetime"] = time.Now().Unix()
+	pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
+	delete(pro, "_id")
+	delete(pro, "o_projectinfo")
+	category := GetCategory(pro)
+	stage := qu.ObjToString(pro["stage"])
+	q := bson.M{
+		"category": category,
+		"stage":    bson.M{"$in": Forecast[stage]},
+	}
+	var maps []map[string]interface{}
+	if pro["results"] != nil {
+		maps = qu.ObjArrToMapArr(pro["results"].([]interface{}))
+	} else {
+		maps = []map[string]interface{}{}
+	}
+	result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1)
+	for _, t := range *result {
+		if len(t) == 0 {
+			continue
+		}
+		tmp := make(map[string]interface{})
+		tmp["stage"] = t["stage"]
+		tmp["purchase_classify"] = t["purchase_classify"]
+		tmp["purchasing"] = t["purchasing"]
+		tmp["p_rate"] = Rate
+		tmp["time"] = ""
+		//tmp["p_projects"] = ""		暂无该字段
+		maps = append(maps, tmp)
+	}
+	if len(maps) > 0 {
+		pro["results"] = maps
+	}
+	//update := map[string]interface{}{}
+	//update["$set"] = pro
+	MgoSaveCache <- pro
+}
+
+func GetCategory(tmp map[string]interface{}) string {
+	categoryBuyerIndex := -1
+	categoryPurposeIndex := -1
+	for k, v := range Category {
+		if tmp["category_buyer"] != nil {
+			if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) {
+				categoryBuyerIndex = k
+			}
+		}
+		if tmp["category_purpose"] != nil {
+			categoryPurposeIndex = k
+		}
+	}
+
+	if categoryBuyerIndex >= categoryPurposeIndex {
+		return qu.ObjToString(Category[categoryBuyerIndex])
+	} else {
+		return qu.ObjToString(Category[categoryPurposeIndex])
+	}
+}