|
@@ -1,20 +1,25 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"fmt"
|
|
|
"github.com/wcc4869/common_utils/log"
|
|
|
+ "go.mongodb.org/mongo-driver/bson"
|
|
|
+ "go.mongodb.org/mongo-driver/mongo/options"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
Mgo *mongodb.MongodbSim
|
|
|
+ MgoT *mongodb.MongodbSim //测试环境链接
|
|
|
+ MgoR *mongodb.MongodbSim
|
|
|
saveSize = 50
|
|
|
Es *elastic.Elastic
|
|
|
EsNew *elastic.Elastic
|
|
|
+ EsT *elastic.Elastic
|
|
|
|
|
|
// 更新mongo
|
|
|
updatePool = make(chan []map[string]interface{}, 5000)
|
|
@@ -27,16 +32,48 @@ var (
|
|
|
|
|
|
func main() {
|
|
|
//mongodb
|
|
|
- Mgo = &mongodb.MongodbSim{
|
|
|
- MongodbAddr: "172.17.189.140:27080",
|
|
|
- //MongodbAddr: "127.0.0.1:27083",
|
|
|
- DbName: "qfw",
|
|
|
- Size: 10,
|
|
|
- UserName: "SJZY_RWbid_ES",
|
|
|
- Password: "SJZY@B4i4D5e6S",
|
|
|
- //Direct: true,
|
|
|
+ //Mgo = &mongodb.MongodbSim{
|
|
|
+ // //MongodbAddr: "172.17.189.140:27080",
|
|
|
+ // MongodbAddr: "127.0.0.1:27083",
|
|
|
+ // DbName: "qfw",
|
|
|
+ // Size: 10,
|
|
|
+ // UserName: "SJZY_RWbid_ES",
|
|
|
+ // Password: "SJZY@B4i4D5e6S",
|
|
|
+ // Direct: true,
|
|
|
+ //}
|
|
|
+ //Mgo.InitPool()
|
|
|
+
|
|
|
+ //85
|
|
|
+ MgoR = &mongodb.MongodbSim{
|
|
|
+ //MongodbAddr: "127.0.0.1:27080",
|
|
|
+ MongodbAddr: "172.17.4.85:27080",
|
|
|
+ DbName: "qfw",
|
|
|
+ Size: 10,
|
|
|
+ //Direct: true,
|
|
|
}
|
|
|
- Mgo.InitPool()
|
|
|
+ MgoR.InitPool()
|
|
|
+
|
|
|
+ ////测试环境MongoDB
|
|
|
+ //MgoT = &mongodb.MongodbSim{
|
|
|
+ // //MongodbAddr: "172.17.189.140:27080",
|
|
|
+ // MongodbAddr: "192.168.3.206:27002",
|
|
|
+ // DbName: "qfw_data",
|
|
|
+ // Size: 10,
|
|
|
+ // UserName: "root",
|
|
|
+ // Password: "root",
|
|
|
+ // //Direct: true,
|
|
|
+ //}
|
|
|
+ //MgoT.InitPool()
|
|
|
+
|
|
|
+ // 测试环境es
|
|
|
+ //Es = &elastic.Elastic{
|
|
|
+ // S_esurl: "http://192.168.3.149:9201",
|
|
|
+ // //S_esurl: "http://172.17.4.184:19805",
|
|
|
+ // I_size: 5,
|
|
|
+ // Username: "",
|
|
|
+ // Password: "",
|
|
|
+ //}
|
|
|
+ //Es.InitElasticSize()
|
|
|
|
|
|
//es
|
|
|
Es = &elastic.Elastic{
|
|
@@ -48,21 +85,21 @@ func main() {
|
|
|
}
|
|
|
Es.InitElasticSize()
|
|
|
|
|
|
- //es 新集群
|
|
|
- EsNew = &elastic.Elastic{
|
|
|
- //S_esurl: "http://127.0.0.1:19905",
|
|
|
- S_esurl: "http://172.17.4.184:19905",
|
|
|
- I_size: 5,
|
|
|
- Username: "jybid",
|
|
|
- Password: "Top2023_JEB01i@31",
|
|
|
- }
|
|
|
- EsNew.InitElasticSize()
|
|
|
+ // es 新集群
|
|
|
+ //EsNew = &elastic.Elastic{
|
|
|
+ // S_esurl: "http://127.0.0.1:19905",
|
|
|
+ // //S_esurl: "http://172.17.4.184:19905",
|
|
|
+ // I_size: 5,
|
|
|
+ // Username: "jybid",
|
|
|
+ // Password: "Top2023_JEB01i@31",
|
|
|
+ //}
|
|
|
+ //EsNew.InitElasticSize()
|
|
|
|
|
|
- go updateMethod() //更新mongodb
|
|
|
+ //go updateMethod() //更新mongodb
|
|
|
go updateEsMethod() //更新es
|
|
|
|
|
|
taskRun()
|
|
|
-
|
|
|
+ //dealDataTest()// 测试环境数据处理
|
|
|
fmt.Println(111)
|
|
|
c := make(chan bool, 1)
|
|
|
<-c
|
|
@@ -71,100 +108,411 @@ func main() {
|
|
|
// taskRun 更新es 省市区三个字段
|
|
|
func taskRun() {
|
|
|
defer util.Catch()
|
|
|
- sess := Mgo.GetMgoConn()
|
|
|
- defer Mgo.DestoryMongoConn(sess)
|
|
|
+ sess := MgoR.GetMgoConn()
|
|
|
+ defer MgoR.DestoryMongoConn(sess)
|
|
|
|
|
|
- pool := make(chan bool, 10) //处理协程
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
+ //pool := make(chan bool, 2) //处理协程
|
|
|
+ //wg := &sync.WaitGroup{}
|
|
|
|
|
|
//查询条件
|
|
|
q := map[string]interface{}{
|
|
|
- //"_id": map[string]interface{}{
|
|
|
- // "$gt": mongodb.StringTOBsonId("652423800000000000000000"),
|
|
|
- // "$lte": mongodb.StringTOBsonId("6543c7800000000000000000"),
|
|
|
- //},
|
|
|
- "comeintime": map[string]interface{}{
|
|
|
- "$gt": 1669824000,
|
|
|
- //"$lte": 1669864950,
|
|
|
- "$lte": 1702265941,
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
|
|
|
+ //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
|
|
|
+
|
|
|
+ //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
|
|
|
+ "$lte": mongodb.StringTOBsonId("624949c64f7bde5444ed7c6c"),
|
|
|
},
|
|
|
- "site": "国家能源e购",
|
|
|
+ //"comeintime": map[string]interface{}{
|
|
|
+ // "$gt": 1669824000,
|
|
|
+ // //"$lte": 1669864950,
|
|
|
+ // "$lte": 1702265941,
|
|
|
+ //},
|
|
|
+ //"site": "国家能源e购",
|
|
|
}
|
|
|
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
|
|
|
- it := sess.DB("qfw").C("bidding").Find(&q).Select(&selected).Iter()
|
|
|
|
|
|
- fmt.Println("开始")
|
|
|
+ //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
|
|
|
+ it := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
|
|
|
+
|
|
|
+ fmt.Println("taskRun 开始")
|
|
|
count := 0
|
|
|
- realNum := 0
|
|
|
+ //realNum := 0
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
if count%10000 == 0 {
|
|
|
log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
|
}
|
|
|
|
|
|
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
|
|
|
- tmp = make(map[string]interface{})
|
|
|
+ esUpdate := make(map[string]interface{})
|
|
|
+ if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
|
|
|
+ esUpdate["subtitle_projectname"] = subtitle_projectname
|
|
|
+ }
|
|
|
+
|
|
|
+ //if len(esUpdate) > 0 {
|
|
|
+ // id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ // err := Es.UpdateDocument("projectset", id, esUpdate)
|
|
|
+ // if err != nil {
|
|
|
+ // if strings.Contains(err.Error(), "Document not updated:") {
|
|
|
+ // continue
|
|
|
+ // } else {
|
|
|
+ // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+ if len(esUpdate) > 0 {
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ esUpdate,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+ //// 针对存量数据,重复数据不进索引
|
|
|
+ //if util.IntAll(tmp["extracttype"]) == -1 {
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ ////针对产权数据,暂时不入es 索引库
|
|
|
+ //if util.IntAll(tmp["infoformat"]) == 3 {
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+ //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
|
|
|
+ //channel := util.ObjToString(tmp["channel"])
|
|
|
+ //if channel != "紧急直接零星采购公告" {
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+
|
|
|
+ //realNum++
|
|
|
+ //pool <- true
|
|
|
+ //wg.Add(1)
|
|
|
+ //go func(tmp map[string]interface{}) {
|
|
|
+ // defer func() {
|
|
|
+ // <-pool
|
|
|
+ // wg.Done()
|
|
|
+ // }()
|
|
|
+
|
|
|
+ ////1.更新MongoDB
|
|
|
+ //update := map[string]interface{}{
|
|
|
+ // "tag_set": tmp["tag_set"],
|
|
|
+ // "tag_topinformation": tmp["tag_topinformation"],
|
|
|
+ // "tag_subinformation": tmp["tag_subinformation"],
|
|
|
+ //}
|
|
|
+ //if len(update) > 0 {
|
|
|
+ // //更新MongoDB
|
|
|
+ // updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": tmp["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+ //2.es 更新字段
|
|
|
+ //esUpdate := make(map[string]interface{})
|
|
|
+ //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
|
|
|
+ // esUpdate["autoid"] = autoid
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if len(esUpdate) > 0 {
|
|
|
+ // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
|
|
|
+ // if err != nil {
|
|
|
+ // if strings.Contains(err.Error(), "Document not updated:") {
|
|
|
+ // return
|
|
|
+ // } else {
|
|
|
+ // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+ //if err != nil {
|
|
|
+ // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
+ //}
|
|
|
+ //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
|
|
|
+ // esUpdate["tag_set"] = tag_set
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
+ // esUpdate["tag_topinformation"] = tag_topinformation
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
|
|
|
+ // esUpdate["tag_subinformation"] = tag_subinformation
|
|
|
+ //}
|
|
|
+
|
|
|
+ //if len(esUpdate) > 0 {
|
|
|
+ // // 更新es
|
|
|
+ // updateEsPool <- []map[string]interface{}{
|
|
|
+ // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ // esUpdate,
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+ //}(tmp)
|
|
|
+ //tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ //wg.Wait()
|
|
|
+
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count))
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// dealData 正式环境,同步合同期限
|
|
|
+func dealData() {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ //where := map[string]interface{}{
|
|
|
+ // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
|
|
|
+ //}
|
|
|
+ selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
|
|
|
+ it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
|
|
|
+ count := 0
|
|
|
+ realNum := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ idStr := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ update := make(map[string]interface{})
|
|
|
+ if tmp["signaturedate"] != nil {
|
|
|
+ update["signaturedate"] = tmp["signaturedate"]
|
|
|
+ }
|
|
|
+ if tmp["contractperiod"] != nil {
|
|
|
+ update["contractperiod"] = tmp["contractperiod"]
|
|
|
+ }
|
|
|
+ if tmp["expiredate"] != nil {
|
|
|
+ update["expiredate"] = tmp["expiredate"]
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(update) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
+
|
|
|
+ //bidding 表
|
|
|
+ if idStr > "5a862e7040d2d9bbe88e3b1f" {
|
|
|
+ bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
+ data := *bidding
|
|
|
+ Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
+
|
|
|
+ // 针对存量数据,重复数据不进索引
|
|
|
+ if util.IntAll(data["extracttype"]) == -1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ //bidding_back
|
|
|
+ bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
+ data := *bidding
|
|
|
+ Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
|
|
|
+ // 针对存量数据,重复数据不进索引
|
|
|
+ if util.IntAll(data["extracttype"]) == -1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ realNum++
|
|
|
+
|
|
|
+ //2.es 更新字段
|
|
|
+ esUpdate := update
|
|
|
+ esUpdate["id"] = idStr
|
|
|
+ if len(esUpdate) > 0 {
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ esUpdate,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
|
|
|
+func dealResult() {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoR.GetMgoConn()
|
|
|
+ defer MgoR.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
|
|
|
+ "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
|
|
|
+ },
|
|
|
+ "subtype": "合同",
|
|
|
+ }
|
|
|
+ selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
|
|
|
+ it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ idStr := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ update := make(map[string]interface{})
|
|
|
+ if tmp["signaturedate"] != nil {
|
|
|
+ update["signaturedate"] = tmp["signaturedate"]
|
|
|
+ }
|
|
|
+ if tmp["contractperiod"] != nil {
|
|
|
+ update["contractperiod"] = tmp["contractperiod"]
|
|
|
+ }
|
|
|
+ if tmp["expiredate"] != nil {
|
|
|
+ update["expiredate"] = tmp["expiredate"]
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(update) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
+ data := *bidding
|
|
|
+ Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
+
|
|
|
// 针对存量数据,重复数据不进索引
|
|
|
- if util.IntAll(tmp["extracttype"]) == -1 {
|
|
|
+ if util.IntAll(data["extracttype"]) == -1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- //针对产权数据,暂时不入es 索引库
|
|
|
- if util.IntAll(tmp["infoformat"]) == 3 {
|
|
|
+ //2.es 更新字段
|
|
|
+ esUpdate := update
|
|
|
+ esUpdate["id"] = idStr
|
|
|
+ if len(esUpdate) > 0 {
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ esUpdate,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count))
|
|
|
+}
|
|
|
+
|
|
|
+// dealDataTest 处理测试环境数据
|
|
|
+func dealDataTest() {
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoT.GetMgoConn()
|
|
|
+ defer MgoT.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
|
|
|
+ "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ //where := map[string]interface{}{
|
|
|
+ // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
|
|
|
+ //}
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ coll := sess.M.C.Database("qfw_data").Collection("bidding")
|
|
|
+ find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
|
|
|
+ cur, err := coll.Find(ctx, where, find)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ ///////
|
|
|
+ selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
|
|
|
+ //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
|
|
|
+ count := 0
|
|
|
+ realNum := 0
|
|
|
+ for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
|
|
|
+ if cur != nil {
|
|
|
+ cur.Decode(&tmp)
|
|
|
+ }
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
|
+ }
|
|
|
+ idStr := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+
|
|
|
+ data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
|
|
|
+
|
|
|
+ if len(*data) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
- //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
|
|
|
- channel := util.ObjToString(tmp["channel"])
|
|
|
- if channel != "紧急直接零星采购公告" {
|
|
|
+ update := make(map[string]interface{})
|
|
|
+ if (*data)["signaturedate"] != nil {
|
|
|
+ update["signaturedate"] = (*data)["signaturedate"]
|
|
|
+ }
|
|
|
+ if (*data)["contractperiod"] != nil {
|
|
|
+ update["contractperiod"] = (*data)["contractperiod"]
|
|
|
+ }
|
|
|
+ if (*data)["expiredate"] != nil {
|
|
|
+ update["expiredate"] = (*data)["expiredate"]
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(update) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ fmt.Println(idStr)
|
|
|
+ MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
+ //bidding 表
|
|
|
+ //if idStr > "5a862e7040d2d9bbe88e3b1f" {
|
|
|
+ // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
+ // data := *bidding
|
|
|
+ // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
+ //
|
|
|
+ // // 针对存量数据,重复数据不进索引
|
|
|
+ // if util.IntAll(data["extracttype"]) == -1 {
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // continue
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ //} else {
|
|
|
+ // //bidding_back
|
|
|
+ // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
+ // data := *bidding
|
|
|
+ // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
|
|
|
+ // // 针对存量数据,重复数据不进索引
|
|
|
+ // if util.IntAll(data["extracttype"]) == -1 {
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // continue
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
realNum++
|
|
|
- fmt.Println(mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
- pool <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-pool
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
-
|
|
|
- //1.更新MongoDB
|
|
|
- update := map[string]interface{}{
|
|
|
- "toptype": "结果",
|
|
|
- "subtype": "成交",
|
|
|
- }
|
|
|
- if len(update) > 0 {
|
|
|
- //更新MongoDB
|
|
|
- updatePool <- []map[string]interface{}{
|
|
|
- {"_id": tmp["_id"]},
|
|
|
- {"$set": update},
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- //2.es 更新字段
|
|
|
- esUpdate := map[string]interface{}{
|
|
|
- "toptype": "结果",
|
|
|
- "subtype": "成交",
|
|
|
- "id": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
- }
|
|
|
- if len(esUpdate) > 0 {
|
|
|
- // 更新es
|
|
|
- updateEsPool <- []map[string]interface{}{
|
|
|
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
- esUpdate,
|
|
|
- }
|
|
|
+ //2.es 更新字段
|
|
|
+ esUpdate := update
|
|
|
+ esUpdate["id"] = idStr
|
|
|
+ if len(esUpdate) > 0 {
|
|
|
+ fmt.Println(idStr)
|
|
|
+ // 更新es
|
|
|
+ updateEsPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ esUpdate,
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ //err := Es.UpdateDocument("bidding", idStr, update)
|
|
|
+ //if err != nil {
|
|
|
+ // log.Error("es update", err)
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //err = EsNew.UpdateDocument("bidding", idStr, update)
|
|
|
+ //if err != nil {
|
|
|
+ // log.Error("esNew update", err)
|
|
|
+ //}
|
|
|
|
|
|
- }(tmp)
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- wg.Wait()
|
|
|
|
|
|
- log.Info("Run Over...Count1:", log.Int("count", count), log.Int("realNum", realNum))
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
|
|
|
|
|
|
- fmt.Println("结束")
|
|
|
}
|
|
|
|
|
|
// updateMethod 更新MongoDB
|
|
@@ -218,8 +566,8 @@ func updateEsMethod() {
|
|
|
defer func() {
|
|
|
<-updateEsSp
|
|
|
}()
|
|
|
- Es.UpdateBulk("bidding", arru...)
|
|
|
- EsNew.UpdateBulk("bidding", arru...)
|
|
|
+ Es.UpdateBulk("projectset", arru...)
|
|
|
+ //EsNew.UpdateBulk("bidding", arru...)
|
|
|
}(arru)
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|
|
@@ -231,8 +579,8 @@ func updateEsMethod() {
|
|
|
defer func() {
|
|
|
<-updateEsSp
|
|
|
}()
|
|
|
- Es.UpdateBulk("bidding", arru...)
|
|
|
- EsNew.UpdateBulk("bidding", arru...)
|
|
|
+ Es.UpdateBulk("projectset", arru...)
|
|
|
+ //EsNew.UpdateBulk("bidding", arru...)
|
|
|
}(arru[:indexu])
|
|
|
arru = make([][]map[string]interface{}, 200)
|
|
|
indexu = 0
|