Forráskód Böngészése

Merge branch 'master' of https://jygit.jydev.jianyu360.cn/data_processing/timed_tasks

wcc 8 hónapja
szülő
commit
170bdb35ac
6 módosított fájl, 203 hozzáadás és 80 törlés
  1. 4 13
      yg/config.go
  2. 15 6
      yg/config.json
  3. 1 0
      yg/go.mod
  4. 2 0
      yg/go.sum
  5. 181 61
      yg/main.go
  6. BIN
      yg/yg

+ 4 - 13
yg/config.go

@@ -9,7 +9,6 @@ type (
 			DbSize   int    `json:"dbSize"`
 			UserName string `json:"username"`
 			Password string `json:"password"`
-			ReplSet  string `json:"replSet"`
 		} `json:"mgo"`
 		Bidding struct {
 			Address  string `json:"address"`
@@ -17,26 +16,18 @@ type (
 			DbSize   int    `json:"dbSize"`
 			UserName string `json:"username"`
 			Password string `json:"password"`
-			ReplSet  string `json:"replSet"`
 		} `json:"bidding"`
 		Es struct {
 			Address  string `json:"address"`
 			DbSize   int    `json:"dbSize"`
-			Index    string `json:"index"`
-			IType    string `json:"iType"`
 			Version  string `json:"version"`
 			UserName string `json:"userName"`
 			Password string `json:"password"`
 		} `json:"es"`
-		Es2 struct {
-			Address  string `json:"address"`
-			DbSize   int    `json:"dbSize"`
-			Index    string `json:"index"`
-			IType    string `json:"iType"`
-			Version  string `json:"version"`
-			UserName string `json:"userName"`
-			Password string `json:"password"`
-		} `json:"es2"`
+		Udp struct {
+			JyAddr string `json:"jyAddr"`
+			JyPort int    `json:"jyPort"`
+		} `json:"udp"`
 		LastId string `json:"lastId"`
 	}
 )

+ 15 - 6
yg/config.json

@@ -4,17 +4,26 @@
 		"address": "172.17.4.86:27080",
 		"dbName": "jyqyfw",
 		"dbSize": 5,
-		"colName": "usermail_xhs_project",
-		"temporaryColName": "usermail_xhs_project_tmp"
+		"userName": "",
+  		"password": ""
+	},
+	"bidding": {
+		"address": "172.17.4.86:27080",
+		"dbName": "jyqyfw",
+		"dbSize": 5,
+		"userName": "",
+  		"password": ""
 	},
 	"es": {
 		"address": "http://172.17.4.184:19800",
 		"dbSize": 10,
-		"index": "projectset",
-		"iType": "projectset",
 		"version": "v7",
 		"userName": "",
-  		"password":
+  		"password": ""
+	},
+	"udp": {
+		"jyAddr": "",
+		"jyPort": 1799
 	},
-	"lastId": "",
+	"lastId": ""
 }

+ 1 - 0
yg/go.mod

@@ -8,6 +8,7 @@ require (
 	github.com/gogf/gf/v2 v2.7.4
 	github.com/nsqio/go-nsq v1.1.0
 	github.com/olivere/elastic/v7 v7.0.32
+	github.com/robfig/cron v1.2.0
 	github.com/tealeg/xlsx v1.0.5
 	github.com/tealeg/xlsx/v3 v3.3.10
 	go.mongodb.org/mongo-driver v1.11.4

+ 2 - 0
yg/go.sum

@@ -210,6 +210,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=

+ 181 - 61
yg/main.go

@@ -1,7 +1,9 @@
 package main
 
 import (
+	"encoding/json"
 	"log"
+	"net"
 	"reflect"
 	"regexp"
 	"strconv"
@@ -14,14 +16,18 @@ import (
 	"github.com/gogf/gf/v2/util/gconv"
 	"github.com/robfig/cron"
 	common "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 )
 
 var (
 	Bidding, Mgo      *MongodbSim
 	Es                elastic.Es
 	Es2               elastic.Es
+	UdpClient         udp.UdpClient
+	JyUdpAddr         *net.UDPAddr
 	cfg               = new(Config)
 	ClearHtml         = regexp.MustCompile("<[^>]*>")
+	BiddingField      = make(map[string]string)
 	BiddingLevelField = make(map[string]map[string]string)
 	TimeV1            = regexp.MustCompile("^(\\d{4})[年.]?$")
 	TimeV2            = regexp.MustCompile("^(\\d{4})[年./-]?(\\d{1,2})[月./-]?$")
@@ -29,31 +35,48 @@ var (
 	TimeClear         = regexp.MustCompile("[年|月|日|/|.|-]")
 )
 
+type UdpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
 func init() {
 	common.ReadConfig(&cfg)
+	log.Println("配置文件 ", cfg)
 	Mgo = &MongodbSim{
-		// MongodbAddr: "172.17.4.86:27080",
-		MongodbAddr: "192.168.3.166:27082",
-		DbName:      "yantianlei",
-		Size:        20,
-		UserName:    "",
-		Password:    "",
+		MongodbAddr: cfg.Mgo.Address,
+		DbName:      cfg.Mgo.DbName,
+		Size:        cfg.Mgo.DbSize,
+		UserName:    cfg.Mgo.UserName,
+		Password:    cfg.Mgo.Password,
 	}
 	Mgo.InitPool()
 	Bidding = &MongodbSim{
-		// MongodbAddr: "172.17.4.86:27080",
-		MongodbAddr: "192.168.3.166:27082",
-		DbName:      "yantianlei",
-		Size:        20,
-		UserName:    "",
-		Password:    "",
+		MongodbAddr: cfg.Bidding.Address,
+		DbName:      cfg.Bidding.DbName,
+		Size:        cfg.Bidding.DbSize,
+		UserName:    cfg.Bidding.UserName,
+		Password:    cfg.Bidding.Password,
 	}
 	Bidding.InitPool()
-	// Es = elastic.NewEs("07", "http://127.0.0.1:9801", 20, "jybid", "Top2023_JEB01i@31")
-	Es2 = elastic.NewEs("07", "http://192.168.3.241:9205,http://192.168.3.149:9200", 20, "", "")
+	// Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
+	Es = elastic.NewEs("07", "http://172.17.4.184:19908", 50, "jybid", "Top2023_JEB01i@31")
+	log.Println("初始化完成")
+	// Es2 = elastic.NewEs("07", "http://192.168.3.241:9205,http://192.168.3.149:9200", 20, "", "")
+	JyUdpAddr = &net.UDPAddr{
+		IP:   net.ParseIP(cfg.Udp.JyAddr),
+		Port: cfg.Udp.JyPort,
+	}
+	InitEsBiddingField()
+	UdpClient = udp.UdpClient{Local: ":1799", BufSize: 1024}
+	UdpClient.Listen(processUdpMsg)
+	log.Println("Udp服务监听 port:", ":1799")
 }
 
 func main() {
+	run()
 	c := cron.New()
 	c.AddFunc("0 */10 * * * ?", run)
 	c.Start()
@@ -62,6 +85,7 @@ func main() {
 }
 
 func run() {
+	log.Println("开始执行阳光采购", cfg.LastId)
 	session := Mgo.GetMgoConn()
 	lastId := cfg.LastId
 	query := map[string]interface{}{}
@@ -102,12 +126,24 @@ func run() {
 					}
 				}
 			}
+			newData := GetEsField(data)
+			updateData := map[string]interface{}{
+				"domain_firsttype":  domain_firsttype,
+				"domain_secondtype": domain_secondtype,
+				"domain_thirdtype":  domain_thirdtype,
+			}
 			deliver_area, deliver_city, deliver_district := data["area"], data["city"], data["district"]
 			if source == "user" {
 				public_type = "用户发布"
 				deliver_area, deliver_city, deliver_district = data["deliver_area"], data["deliver_city"], data["deliver_district"]
+			} else {
+				updateData["deliver_area"] = deliver_area
+				updateData["deliver_city"] = deliver_city
+				updateData["deliver_district"] = deliver_district
+			}
+			if source == "is_yg_new" {
+				newData["is_yg_new"] = 1
 			}
-			newData := GetEsField(data)
 			newData["deliver_area"] = deliver_area
 			newData["deliver_city"] = deliver_city
 			newData["deliver_district"] = deliver_district
@@ -116,55 +152,105 @@ func run() {
 			newData["domain_thirdtype"] = domain_thirdtype
 			newData["public_type"] = public_type
 			newData["source_id"] = id
-			Bidding.UpdateById("bidding", id, map[string]interface{}{"$set": map[string]interface{}{
-				"deliver_area":      deliver_area,
-				"deliver_city":      deliver_city,
-				"deliver_district":  deliver_district,
-				"domain_firsttype":  domain_firsttype,
-				"domain_secondtype": domain_secondtype,
-				"domain_thirdtype":  domain_thirdtype,
-			}})
+			Bidding.UpdateById("bidding", id, map[string]interface{}{"$set": updateData})
 			if newData["purchasinglist"] != nil {
-				purchasinglist := common.ObjArrToMapArr(newData["purchasinglist"].(primitive.A))
-				if source == "user" && len(purchasinglist) > 1 {
-					newDatas := newData
-					itemMap := map[string]string{}
-					itemArr := []string{}
-					for _, v := range purchasinglist {
-						itemname := common.ObjToString(v["itemname"])
-						if itemname != "" {
-							itemMap[itemname] = "1"
+				if purchasinglists, ok := newData["purchasinglist"].(primitive.A); ok {
+					purchasinglist := common.ObjArrToMapArr(purchasinglists)
+					if source == "user" && len(purchasinglist) > 1 {
+						newDatas := newData
+						itemMap := map[string]string{}
+						itemArr := []string{}
+						for _, v := range purchasinglist {
+							itemname := common.ObjToString(v["itemname"])
+							if itemname != "" {
+								itemMap[itemname] = "1"
+							}
+						}
+						for k, _ := range itemMap {
+							itemArr = append(itemArr, k)
+						}
+						citys := gconv.String(deliver_city)
+						if citys == "" {
+							citys = gconv.String(deliver_area)
+						}
+						newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
+						mid := primitive.NewObjectID()
+						newDatas["_id"] = mid
+						Mgo.Save("bidding_yg_info", newDatas)
+						newDatas["_id"] = mid.Hex()
+						Es.Save("bidding_yg", "", newDatas)
+						log.Println("保存成功", mid)
+					} else {
+						for _, v := range purchasinglist {
+							newDatas := newData
+							itemname := common.ObjToString(v["itemname"])
+							if itemname != "" {
+								citys := gconv.String(deliver_city)
+								if citys == "" {
+									citys = gconv.String(deliver_area)
+								}
+								if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" {
+									newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
+								} else {
+									newDatas["title"] = itemname + "-" + citys
+								}
+							}
+							mid := primitive.NewObjectID()
+							newDatas["_id"] = mid
+							Mgo.Save("bidding_yg_info", newDatas)
+							newDatas["_id"] = mid.Hex()
+							Es.Save("bidding_yg", "", newDatas)
+							log.Println("保存成功", mid)
 						}
 					}
-					for k, _ := range itemMap {
-						itemArr = append(itemArr, k)
-					}
-					citys := gconv.String(deliver_city)
-					if citys == "" {
-						citys = gconv.String(deliver_area)
-					}
-					newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
-					mid := primitive.NewObjectID()
-					newDatas["_id"] = mid
-					Mgo.Save("bidding_yg_info", newDatas)
-					newDatas["_id"] = mid.Hex()
-					Es.Save("bidding_yg", "", newDatas)
-				} else {
-					for _, v := range purchasinglist {
+				} else if purchasinglists, ok := newData["purchasinglist"].([]map[string]interface{}); ok {
+					purchasinglist := purchasinglists
+					if source == "user" && len(purchasinglist) > 1 {
 						newDatas := newData
-						itemname := common.ObjToString(v["itemname"])
-						if itemname != "" {
-							citys := gconv.String(deliver_city)
-							if citys == "" {
-								citys = gconv.String(deliver_area)
+						itemMap := map[string]string{}
+						itemArr := []string{}
+						for _, v := range purchasinglist {
+							itemname := common.ObjToString(v["itemname"])
+							if itemname != "" {
+								itemMap[itemname] = "1"
 							}
-							newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
 						}
+						for k, _ := range itemMap {
+							itemArr = append(itemArr, k)
+						}
+						citys := gconv.String(deliver_city)
+						if citys == "" {
+							citys = gconv.String(deliver_area)
+						}
+						newDatas["title"] = strings.Join(itemArr, "/") + "-" + citys
 						mid := primitive.NewObjectID()
 						newDatas["_id"] = mid
 						Mgo.Save("bidding_yg_info", newDatas)
 						newDatas["_id"] = mid.Hex()
 						Es.Save("bidding_yg", "", newDatas)
+						log.Println("保存成功", mid)
+					} else {
+						for _, v := range purchasinglist {
+							newDatas := newData
+							itemname := common.ObjToString(v["itemname"])
+							if itemname != "" {
+								citys := gconv.String(deliver_city)
+								if citys == "" {
+									citys = gconv.String(deliver_area)
+								}
+								if gconv.String(v["unitname"]) != "" && gconv.String(v["number"]) != "" {
+									newDatas["title"] = itemname + "-" + gconv.String(v["number"]) + gconv.String(v["unitname"]) + "-" + citys
+								} else {
+									newDatas["title"] = itemname + "-" + citys
+								}
+							}
+							mid := primitive.NewObjectID()
+							newDatas["_id"] = mid
+							Mgo.Save("bidding_yg_info", newDatas)
+							newDatas["_id"] = mid.Hex()
+							Es.Save("bidding_yg", "", newDatas)
+							log.Println("保存成功", mid)
+						}
 					}
 				}
 			} else {
@@ -173,21 +259,32 @@ func run() {
 				Mgo.Save("bidding_yg_info", newData)
 				newData["_id"] = mid.Hex()
 				Es.Save("bidding_yg", "", newData)
+				log.Println("保存成功", mid)
+			}
+			if source == "user" {
+				mapinfo := map[string]interface{}{
+					"infoid": id,
+					"stype":  "jyfb_data_over",
+				}
+				datas, _ := json.Marshal(mapinfo)
+				log.Println("信息发布成功", JyUdpAddr, "mapinfo", string(datas))
+				_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr)
 			}
 		}
 		cfg.LastId = BsonTOStringId(thisData["_id"])
 		thisData = map[string]interface{}{}
 	}
 	common.WriteSysConfig(&cfg)
+	log.Println("阳光采购结束", cfg.LastId)
 }
 
 func GetEsField(tmp map[string]interface{}) map[string]interface{} {
-	newTmp := tmp
-	for field, _ := range tmp {
+	newTmp := make(map[string]interface{})
+	for field, ftype := range BiddingField {
 		if tmp[field] != nil { //
 			if field == "purchasinglist" { //标的物处理
 				purchasinglist_new := []map[string]interface{}{}
-				if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
+				if pcl, _ := tmp[field].(primitive.A); len(pcl) > 0 {
 					for _, ls := range pcl {
 						lsm_new := make(map[string]interface{})
 						lsm := ls.(map[string]interface{})
@@ -208,7 +305,7 @@ func GetEsField(tmp map[string]interface{}) map[string]interface{} {
 			} else if field == "procurementlist" {
 				if tmp["procurementlist"] != nil {
 					var arr []interface{}
-					plist := tmp["procurementlist"].([]interface{})
+					plist := tmp["procurementlist"].(primitive.A)
 					for _, p := range plist {
 						p1 := p.(map[string]interface{})
 						p2 := make(map[string]interface{})
@@ -238,7 +335,7 @@ func GetEsField(tmp map[string]interface{}) map[string]interface{} {
 				newTmp["projectscope"] = ps
 			} else if field == "winnerorder" { //中标候选
 				winnerorder_new := []map[string]interface{}{}
-				if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
+				if winnerorder, _ := tmp[field].(primitive.A); len(winnerorder) > 0 {
 					for _, win := range winnerorder {
 						winMap_new := make(map[string]interface{})
 						winMap := win.(map[string]interface{})
@@ -262,7 +359,7 @@ func GetEsField(tmp map[string]interface{}) map[string]interface{} {
 			} else if field == "qualifies" {
 				//项目资质
 				qs := []string{}
-				if q, _ := tmp[field].([]interface{}); len(q) > 0 {
+				if q, _ := tmp[field].(primitive.A); len(q) > 0 {
 					for _, v := range q {
 						v1 := v.(map[string]interface{})
 						qs = append(qs, common.ObjToString(v1["key"]))
@@ -301,6 +398,14 @@ func GetEsField(tmp map[string]interface{}) map[string]interface{} {
 				delete(newTmp, "package")
 			} else if field == "infoformat" {
 				newTmp[field] = tmp[field]
+			} else { //其它字段判断数据类型,不正确舍弃
+				if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype && ftype != "" {
+					continue
+				} else {
+					if fieldval != "" {
+						newTmp[field] = fieldval
+					}
+				}
 			}
 		}
 	}
@@ -312,7 +417,9 @@ func InitEsBiddingField() {
 	info, _ := Bidding.Find("bidding_processing_field", map[string]interface{}{"stype": "bidding"}, nil, nil)
 	if len(info) > 0 {
 		for _, m := range info {
-			if common.IntAll(m["level"]) == 2 {
+			if common.IntAll(m["level"]) == 1 {
+				BiddingField[common.ObjToString(m["field"])] = common.ObjToString(m["ftype"])
+			} else if common.IntAll(m["level"]) == 2 {
 				pfield := common.ObjToString(m["pfield"])
 				pfieldMap := BiddingLevelField[pfield]
 				if pfieldMap == nil {
@@ -323,6 +430,7 @@ func InitEsBiddingField() {
 			}
 		}
 	}
+	log.Println("BiddingField es 一级字段数量", len(BiddingField))
 	log.Println("BiddingLevelField es 二级字段数量", len(BiddingLevelField))
 }
 
@@ -372,6 +480,18 @@ func getMethod(str string) int64 {
 			}
 		}
 	}
-
 	return 0
 }
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer common.Catch()
+	switch act {
+	case udp.OP_TYPE_DATA: //上个节点的数据
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Println("processUdpMsg mapInfo:", mapInfo, err)
+	case udp.OP_NOOP:
+		ok := string(data)
+		log.Println("下节点回应:", ok)
+	}
+}