|
@@ -3,42 +3,57 @@ package main
|
|
|
import (
|
|
|
"fmt"
|
|
|
"github.com/wcc4869/common_utils/log"
|
|
|
+ "go.uber.org/zap"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
+ "reflect"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
Mgo *mongodb.MongodbSim
|
|
|
+ MgoB *mongodb.MongodbSim
|
|
|
MgoT *mongodb.MongodbSim //测试环境链接
|
|
|
MgoR *mongodb.MongodbSim
|
|
|
saveSize = 50
|
|
|
Es *elastic.Elastic
|
|
|
EsNew *elastic.Elastic
|
|
|
- EsT *elastic.Elastic
|
|
|
+ //EsT *elastic.Elastic
|
|
|
|
|
|
// 更新mongo
|
|
|
updatePool = make(chan []map[string]interface{}, 5000)
|
|
|
updateSp = make(chan bool, 5)
|
|
|
//更新es
|
|
|
- updateEsPool = make(chan []map[string]interface{}, 5000)
|
|
|
- updateEsSp = make(chan bool, 5) //保存协程
|
|
|
-
|
|
|
+ updateEsPool = make(chan []map[string]interface{}, 5000)
|
|
|
+ updateEsSp = make(chan bool, 5) //保存协程
|
|
|
+ BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
|
|
|
+ BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
|
|
|
)
|
|
|
|
|
|
-func main() {
|
|
|
- //mongodb 163
|
|
|
- Mgo = &mongodb.MongodbSim{
|
|
|
- //MongodbAddr: "172.17.189.140:27080",
|
|
|
- MongodbAddr: "127.0.0.1:27083",
|
|
|
- DbName: "qfw",
|
|
|
- Size: 10,
|
|
|
- UserName: "SJZY_RWbid_ES",
|
|
|
- Password: "SJZY@B4i4D5e6S",
|
|
|
- Direct: true,
|
|
|
+func Init() {
|
|
|
+ MgoB = &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: "172.17.189.140:27080",
|
|
|
+ //MongodbAddr: "127.0.0.1:27083",
|
|
|
+ DbName: "qfw",
|
|
|
+ Size: 10,
|
|
|
+ UserName: "SJZY_RWbid_ES",
|
|
|
+ Password: "SJZY@B4i4D5e6S",
|
|
|
+ //Direct: true,
|
|
|
}
|
|
|
- Mgo.InitPool()
|
|
|
+ MgoB.InitPool()
|
|
|
+
|
|
|
+ //mongodb 163
|
|
|
+ //Mgo = &mongodb.MongodbSim{
|
|
|
+ // //MongodbAddr: "172.17.189.140:27080",
|
|
|
+ // MongodbAddr: "127.0.0.1:27083",
|
|
|
+ // DbName: "qfw",
|
|
|
+ // Size: 10,
|
|
|
+ // UserName: "SJZY_RWbid_ES",
|
|
|
+ // Password: "SJZY@B4i4D5e6S",
|
|
|
+ // Direct: true,
|
|
|
+ //}
|
|
|
+ //Mgo.InitPool()
|
|
|
|
|
|
//85
|
|
|
//MgoR = &mongodb.MongodbSim{
|
|
@@ -50,7 +65,7 @@ func main() {
|
|
|
//}
|
|
|
//MgoR.InitPool()
|
|
|
|
|
|
- ////测试环境MongoDB
|
|
|
+ //测试环境MongoDB
|
|
|
//MgoT = &mongodb.MongodbSim{
|
|
|
// //MongodbAddr: "172.17.189.140:27080",
|
|
|
// MongodbAddr: "192.168.3.206:27002",
|
|
@@ -62,7 +77,7 @@ func main() {
|
|
|
//}
|
|
|
//MgoT.InitPool()
|
|
|
|
|
|
- // 测试环境es
|
|
|
+ ////测试环境es
|
|
|
//Es = &elastic.Elastic{
|
|
|
// S_esurl: "http://192.168.3.149:9201",
|
|
|
// //S_esurl: "http://172.17.4.184:19805",
|
|
@@ -74,8 +89,8 @@ func main() {
|
|
|
|
|
|
//es
|
|
|
Es = &elastic.Elastic{
|
|
|
- S_esurl: "http://127.0.0.1:19908",
|
|
|
- //S_esurl: "http://172.17.4.184:19908",
|
|
|
+ //S_esurl: "http://127.0.0.1:19908",
|
|
|
+ S_esurl: "http://172.17.4.184:19908",
|
|
|
I_size: 5,
|
|
|
Username: "jybid",
|
|
|
Password: "Top2023_JEB01i@31",
|
|
@@ -84,19 +99,24 @@ func main() {
|
|
|
|
|
|
//es 新集群
|
|
|
EsNew = &elastic.Elastic{
|
|
|
- S_esurl: "http://127.0.0.1:19905",
|
|
|
- //S_esurl: "http://172.17.4.184:19905",
|
|
|
+ //S_esurl: "http://127.0.0.1:19905",
|
|
|
+ S_esurl: "http://172.17.4.184:19905",
|
|
|
I_size: 5,
|
|
|
Username: "jybid",
|
|
|
Password: "Top2023_JEB01i@31",
|
|
|
}
|
|
|
EsNew.InitElasticSize()
|
|
|
+}
|
|
|
|
|
|
- //go updateMethod() //更新mongodb
|
|
|
- //go updateEsMethod() //更新es
|
|
|
+func main() {
|
|
|
+ Init()
|
|
|
+ InitEsBiddingField()
|
|
|
+ go updateMethod() //更新mongodb
|
|
|
+ go updateEsMethod() //更新es
|
|
|
//go updateProjectEsMethod()
|
|
|
//taskRunProject()
|
|
|
- taskRunBidding()
|
|
|
+ //taskRunBidding()
|
|
|
+ dealBidding() //正式环境bidding数据处理
|
|
|
//dealBiddingTest() // 测试环境数据处理
|
|
|
|
|
|
//updateProject()
|
|
@@ -105,6 +125,29 @@ func main() {
|
|
|
<-c
|
|
|
}
|
|
|
|
|
|
+func InitEsBiddingField() {
|
|
|
+ now := time.Now()
|
|
|
+ info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
|
|
|
+ if len(*info) > 0 {
|
|
|
+ for _, m := range *info {
|
|
|
+ if util.IntAll(m["level"]) == 1 {
|
|
|
+ BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
|
|
|
+ } else if util.IntAll(m["level"]) == 2 {
|
|
|
+ pfield := util.ObjToString(m["pfield"])
|
|
|
+ pfieldMap := BiddingLevelField[pfield]
|
|
|
+ if pfieldMap == nil {
|
|
|
+ pfieldMap = make(map[string]string, 0)
|
|
|
+ }
|
|
|
+ pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
|
|
|
+ BiddingLevelField[pfield] = pfieldMap
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
|
|
|
+ log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
|
|
|
+ log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
|
|
|
+}
|
|
|
+
|
|
|
// taskRun 更新es 省市区三个字段
|
|
|
func taskRunBidding() {
|
|
|
defer util.Catch()
|
|
@@ -404,6 +447,136 @@ func dealResult() {
|
|
|
log.Info("Run Over...Count:", log.Int("count", count))
|
|
|
}
|
|
|
|
|
|
+// dealBidding 处理bidding数据
|
|
|
+func dealBidding() {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ //where := map[string]interface{}{
|
|
|
+ // "title": "2020年12月采购意向项目-3",
|
|
|
+ //}
|
|
|
+
|
|
|
+ it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
|
|
|
+
|
|
|
+ fmt.Println("taskRun 开始")
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%10000 == 0 {
|
|
|
+ log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ update := map[string]interface{}{}
|
|
|
+ esUpdate := map[string]interface{}{}
|
|
|
+ // 2.更新中标单位,中标金额
|
|
|
+ //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
+ // update["tag_topinformation"] = tag_topinformation
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
|
|
|
+ // update["property_form"] = property_form
|
|
|
+ //}
|
|
|
+
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ //fmt.Println(biddingID)
|
|
|
+ /**
|
|
|
+ "s_subscopeclass" : "其它",
|
|
|
+ "s_topscopeclass" : "其它",
|
|
|
+ "subscopeclass" : [
|
|
|
+ "其它"
|
|
|
+ ],
|
|
|
+ "topscopeclass" : [
|
|
|
+ "其它"
|
|
|
+ ],
|
|
|
+ */
|
|
|
+ // 行业分类默认值
|
|
|
+ if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
|
|
|
+ update["topscopeclass"] = []string{"其它"}
|
|
|
+ update["s_topscopeclass"] = "其它"
|
|
|
+
|
|
|
+ esUpdate["topscopeclass"] = []string{"其它"}
|
|
|
+ esUpdate["s_topscopeclass"] = "其它"
|
|
|
+ }
|
|
|
+
|
|
|
+ if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
|
|
|
+ update["subscopeclass"] = []string{"其它"}
|
|
|
+ update["s_subscopeclass"] = "其它"
|
|
|
+
|
|
|
+ esUpdate["subscopeclass"] = []string{"其它"}
|
|
|
+ esUpdate["s_subscopeclass"] = "其它"
|
|
|
+ }
|
|
|
+
|
|
|
+ //procurementlist 处理预计采购时间
|
|
|
+ if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
|
|
|
+ field := "procurementlist"
|
|
|
+ if tmp[field] != nil {
|
|
|
+ if field == "procurementlist" {
|
|
|
+ if tmp["procurementlist"] != nil {
|
|
|
+ var arr []interface{}
|
|
|
+ plist := tmp["procurementlist"].([]interface{})
|
|
|
+ for _, p := range plist {
|
|
|
+ p1 := p.(map[string]interface{})
|
|
|
+ p2 := make(map[string]interface{})
|
|
|
+ for k, v := range BiddingLevelField[field] {
|
|
|
+ if k == "projectname" && util.ObjToString(p1[k]) == "" {
|
|
|
+ p2[k] = util.ObjToString(tmp["projectname"])
|
|
|
+ } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
|
|
|
+ p2[k] = util.ObjToString(tmp["buyer"])
|
|
|
+ } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
|
|
|
+ res := getMethod(util.ObjToString(p1[k]))
|
|
|
+ if res != 0 {
|
|
|
+ p2[k] = res
|
|
|
+ }
|
|
|
+ } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
|
|
|
+ p2[k] = p1[k]
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ arr = append(arr, p2)
|
|
|
+ }
|
|
|
+ if len(arr) > 0 {
|
|
|
+ esUpdate[field] = arr
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(update) > 0 {
|
|
|
+ //fmt.Println("aaaaa", biddingID)
|
|
|
+ //更新mongo
|
|
|
+ //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
+ //更新MongoDB
|
|
|
+ updatePool <- []map[string]interface{}{
|
|
|
+ {"_id": tmp["_id"]},
|
|
|
+ {"$set": update},
|
|
|
+ }
|
|
|
+
|
|
|
+ //2.es 项目 更新字段
|
|
|
+ //err := Es.UpdateDocument("bidding", biddingID, update)
|
|
|
+ //if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
+ // log.Info("bidding es update err", err, biddingID)
|
|
|
+ //}
|
|
|
+ //// 更新es
|
|
|
+ //updateEsPool <- []map[string]interface{}{
|
|
|
+ // {"_id": biddingID},
|
|
|
+ // update,
|
|
|
+ //}
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新Es 数据
|
|
|
+ if len(esUpdate) > 0 {
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": biddingID},
|
|
|
+ esUpdate,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count))
|
|
|
+}
|
|
|
+
|
|
|
// dealBiddingTest 处理测试环境数据
|
|
|
func dealBiddingTest() {
|
|
|
defer util.Catch()
|
|
@@ -421,28 +594,104 @@ func dealBiddingTest() {
|
|
|
|
|
|
update := map[string]interface{}{}
|
|
|
// 2.更新中标单位,中标金额
|
|
|
- if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
- update["tag_topinformation"] = tag_topinformation
|
|
|
+ //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
+ // update["tag_topinformation"] = tag_topinformation
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
|
|
|
+ // update["property_form"] = property_form
|
|
|
+ //}
|
|
|
+
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ /**
|
|
|
+ "s_subscopeclass" : "其它",
|
|
|
+ "s_topscopeclass" : "其它",
|
|
|
+ "subscopeclass" : [
|
|
|
+ "其它"
|
|
|
+ ],
|
|
|
+ "topscopeclass" : [
|
|
|
+ "其它"
|
|
|
+ ],
|
|
|
+ */
|
|
|
+ // 行业分类默认值
|
|
|
+ if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
|
|
|
+ update["topscopeclass"] = []string{"其它"}
|
|
|
+ update["s_topscopeclass"] = "其它"
|
|
|
}
|
|
|
|
|
|
- if property_form, ok := tmp["property_form"]; ok && property_form != nil {
|
|
|
- update["property_form"] = property_form
|
|
|
+ if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
|
|
|
+ update["subscopeclass"] = []string{"其它"}
|
|
|
+ update["s_subscopeclass"] = "其它"
|
|
|
}
|
|
|
|
|
|
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
|
|
|
+ update["topscopeclass"] = []string{"其它"}
|
|
|
+ update["s_topscopeclass"] = "其它"
|
|
|
+ }
|
|
|
+
|
|
|
+ if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
|
|
|
+ update["subscopeclass"] = []string{"其它"}
|
|
|
+ update["s_subscopeclass"] = "其它"
|
|
|
+ }
|
|
|
+
|
|
|
+ //procurementlist 处理预计采购时间
|
|
|
+ if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
|
|
|
+ for field, _ := range BiddingField {
|
|
|
+ if tmp[field] != nil {
|
|
|
+ if field == "procurementlist" {
|
|
|
+ if tmp["procurementlist"] != nil {
|
|
|
+ var arr []interface{}
|
|
|
+ plist := tmp["procurementlist"].([]interface{})
|
|
|
+ for _, p := range plist {
|
|
|
+ p1 := p.(map[string]interface{})
|
|
|
+ p2 := make(map[string]interface{})
|
|
|
+ for k, v := range BiddingLevelField[field] {
|
|
|
+ if k == "projectname" && util.ObjToString(p1[k]) == "" {
|
|
|
+ p2[k] = util.ObjToString(tmp["projectname"])
|
|
|
+ } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
|
|
|
+ p2[k] = util.ObjToString(tmp["buyer"])
|
|
|
+ } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
|
|
|
+ res := getMethod(util.ObjToString(p1[k]))
|
|
|
+ if res != 0 {
|
|
|
+ p2[k] = res
|
|
|
+ }
|
|
|
+ } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
|
|
|
+ p2[k] = p1[k]
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ arr = append(arr, p2)
|
|
|
+ }
|
|
|
+ if len(arr) > 0 {
|
|
|
+ update[field] = arr
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if len(update) > 0 {
|
|
|
- //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
+ fmt.Println("aaaaa", biddingID)
|
|
|
+ //更新mongo
|
|
|
+ //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
+
|
|
|
+ //更新MongoDB
|
|
|
+ //updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": tmp["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ //}
|
|
|
+
|
|
|
//2.es 项目 更新字段
|
|
|
//err := Es.UpdateDocument("bidding", biddingID, update)
|
|
|
//if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
// log.Info("bidding es update err", err, biddingID)
|
|
|
//}
|
|
|
// 更新es
|
|
|
- updateEsPool <- []map[string]interface{}{
|
|
|
- {"_id": biddingID},
|
|
|
- update,
|
|
|
- }
|
|
|
+ //updateEsPool <- []map[string]interface{}{
|
|
|
+ // {"_id": biddingID},
|
|
|
+ // update,
|
|
|
+ //}
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -464,7 +713,7 @@ func updateMethod() {
|
|
|
defer func() {
|
|
|
<-updateSp
|
|
|
}()
|
|
|
- Mgo.UpdateBulk("bidding", arru...)
|
|
|
+ MgoB.UpdateBulk("bidding", arru...)
|
|
|
}(arru)
|
|
|
arru = make([][]map[string]interface{}, saveSize)
|
|
|
indexu = 0
|
|
@@ -476,7 +725,7 @@ func updateMethod() {
|
|
|
defer func() {
|
|
|
<-updateSp
|
|
|
}()
|
|
|
- Mgo.UpdateBulk("bidding", arru...)
|
|
|
+ MgoB.UpdateBulk("bidding", arru...)
|
|
|
}(arru[:indexu])
|
|
|
arru = make([][]map[string]interface{}, saveSize)
|
|
|
indexu = 0
|
|
@@ -501,7 +750,7 @@ func updateEsMethod() {
|
|
|
<-updateEsSp
|
|
|
}()
|
|
|
Es.UpdateBulk("bidding", arru...)
|
|
|
- //EsNew.UpdateBulk("bidding", arru...)
|
|
|
+ EsNew.UpdateBulk("bidding", arru...)
|
|
|
}(arru)
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|
|
@@ -514,7 +763,7 @@ func updateEsMethod() {
|
|
|
<-updateEsSp
|
|
|
}()
|
|
|
Es.UpdateBulk("bidding", arru...)
|
|
|
- //EsNew.UpdateBulk("bidding", arru...)
|
|
|
+ EsNew.UpdateBulk("bidding", arru...)
|
|
|
}(arru[:indexu])
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|