|
@@ -1,11 +1,8 @@
|
|
package main
|
|
package main
|
|
|
|
|
|
import (
|
|
import (
|
|
- "context"
|
|
|
|
"fmt"
|
|
"fmt"
|
|
"github.com/wcc4869/common_utils/log"
|
|
"github.com/wcc4869/common_utils/log"
|
|
- "go.mongodb.org/mongo-driver/bson"
|
|
|
|
- "go.mongodb.org/mongo-driver/mongo/options"
|
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
@@ -33,13 +30,13 @@ var (
|
|
func main() {
|
|
func main() {
|
|
//mongodb 163
|
|
//mongodb 163
|
|
//Mgo = &mongodb.MongodbSim{
|
|
//Mgo = &mongodb.MongodbSim{
|
|
- // MongodbAddr: "172.17.189.140:27080",
|
|
|
|
- // //MongodbAddr: "127.0.0.1:27083",
|
|
|
|
- // DbName: "qfw",
|
|
|
|
- // Size: 10,
|
|
|
|
- // UserName: "SJZY_RWbid_ES",
|
|
|
|
- // Password: "SJZY@B4i4D5e6S",
|
|
|
|
- // //Direct: true,
|
|
|
|
|
|
+ // //MongodbAddr: "172.17.189.140:27080",
|
|
|
|
+ // MongodbAddr: "127.0.0.1:27083",
|
|
|
|
+ // DbName: "qfw",
|
|
|
|
+ // Size: 10,
|
|
|
|
+ // UserName: "SJZY_RWbid_ES",
|
|
|
|
+ // Password: "SJZY@B4i4D5e6S",
|
|
|
|
+ // Direct: true,
|
|
//}
|
|
//}
|
|
//Mgo.InitPool()
|
|
//Mgo.InitPool()
|
|
|
|
|
|
@@ -54,41 +51,41 @@ func main() {
|
|
//MgoR.InitPool()
|
|
//MgoR.InitPool()
|
|
|
|
|
|
////测试环境MongoDB
|
|
////测试环境MongoDB
|
|
- //MgoT = &mongodb.MongodbSim{
|
|
|
|
- // //MongodbAddr: "172.17.189.140:27080",
|
|
|
|
- // MongodbAddr: "192.168.3.206:27002",
|
|
|
|
- // DbName: "qfw_data",
|
|
|
|
- // Size: 10,
|
|
|
|
- // UserName: "root",
|
|
|
|
- // Password: "root",
|
|
|
|
- // //Direct: true,
|
|
|
|
- //}
|
|
|
|
- //MgoT.InitPool()
|
|
|
|
|
|
+ MgoT = &mongodb.MongodbSim{
|
|
|
|
+ //MongodbAddr: "172.17.189.140:27080",
|
|
|
|
+ MongodbAddr: "192.168.3.206:27002",
|
|
|
|
+ DbName: "qfw_data",
|
|
|
|
+ Size: 10,
|
|
|
|
+ UserName: "root",
|
|
|
|
+ Password: "root",
|
|
|
|
+ //Direct: true,
|
|
|
|
+ }
|
|
|
|
+ MgoT.InitPool()
|
|
|
|
|
|
// 测试环境es
|
|
// 测试环境es
|
|
- //Es = &elastic.Elastic{
|
|
|
|
- // S_esurl: "http://192.168.3.149:9201",
|
|
|
|
- // //S_esurl: "http://172.17.4.184:19805",
|
|
|
|
- // I_size: 5,
|
|
|
|
- // Username: "",
|
|
|
|
- // Password: "",
|
|
|
|
- //}
|
|
|
|
- //Es.InitElasticSize()
|
|
|
|
-
|
|
|
|
- //es
|
|
|
|
Es = &elastic.Elastic{
|
|
Es = &elastic.Elastic{
|
|
- //S_esurl: "http://127.0.0.1:19908",
|
|
|
|
- S_esurl: "http://172.17.4.184:19908",
|
|
|
|
|
|
+ S_esurl: "http://192.168.3.149:9201",
|
|
|
|
+ //S_esurl: "http://172.17.4.184:19805",
|
|
I_size: 5,
|
|
I_size: 5,
|
|
- Username: "jybid",
|
|
|
|
- Password: "Top2023_JEB01i@31",
|
|
|
|
|
|
+ Username: "",
|
|
|
|
+ Password: "",
|
|
}
|
|
}
|
|
Es.InitElasticSize()
|
|
Es.InitElasticSize()
|
|
|
|
|
|
|
|
+ //es
|
|
|
|
+ //Es = &elastic.Elastic{
|
|
|
|
+ // S_esurl: "http://127.0.0.1:19908",
|
|
|
|
+ // //S_esurl: "http://172.17.4.184:19908",
|
|
|
|
+ // I_size: 5,
|
|
|
|
+ // Username: "jybid",
|
|
|
|
+ // Password: "Top2023_JEB01i@31",
|
|
|
|
+ //}
|
|
|
|
+ //Es.InitElasticSize()
|
|
|
|
+
|
|
// es 新集群
|
|
// es 新集群
|
|
//EsNew = &elastic.Elastic{
|
|
//EsNew = &elastic.Elastic{
|
|
- // //S_esurl: "http://127.0.0.1:19905",
|
|
|
|
- // S_esurl: "http://172.17.4.184:19905",
|
|
|
|
|
|
+ // S_esurl: "http://127.0.0.1:19905",
|
|
|
|
+ // //S_esurl: "http://172.17.4.184:19905",
|
|
// I_size: 5,
|
|
// I_size: 5,
|
|
// Username: "jybid",
|
|
// Username: "jybid",
|
|
// Password: "Top2023_JEB01i@31",
|
|
// Password: "Top2023_JEB01i@31",
|
|
@@ -96,13 +93,13 @@ func main() {
|
|
//EsNew.InitElasticSize()
|
|
//EsNew.InitElasticSize()
|
|
|
|
|
|
//go updateMethod() //更新mongodb
|
|
//go updateMethod() //更新mongodb
|
|
- //go updateEsMethod() //更新es
|
|
|
|
- go updateProjectEsMethod()
|
|
|
|
|
|
+ go updateEsMethod() //更新es
|
|
|
|
+ //go updateProjectEsMethod()
|
|
//taskRunProject()
|
|
//taskRunProject()
|
|
//taskRunBidding()
|
|
//taskRunBidding()
|
|
- //dealDataTest()// 测试环境数据处理
|
|
|
|
|
|
+ dealBiddingTest() // 测试环境数据处理
|
|
|
|
|
|
- updateProject()
|
|
|
|
|
|
+ //updateProject()
|
|
fmt.Println("over")
|
|
fmt.Println("over")
|
|
c := make(chan bool, 1)
|
|
c := make(chan bool, 1)
|
|
<-c
|
|
<-c
|
|
@@ -114,9 +111,6 @@ func taskRunBidding() {
|
|
sess := Mgo.GetMgoConn()
|
|
sess := Mgo.GetMgoConn()
|
|
defer Mgo.DestoryMongoConn(sess)
|
|
defer Mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
- //pool := make(chan bool, 2) //处理协程
|
|
|
|
- //wg := &sync.WaitGroup{}
|
|
|
|
-
|
|
|
|
//查询条件
|
|
//查询条件
|
|
//q := map[string]interface{}{
|
|
//q := map[string]interface{}{
|
|
// //"_id": map[string]interface{}{
|
|
// //"_id": map[string]interface{}{
|
|
@@ -135,8 +129,8 @@ func taskRunBidding() {
|
|
// "toptype": map[string]interface{}{"$exists": 0},
|
|
// "toptype": map[string]interface{}{"$exists": 0},
|
|
//}
|
|
//}
|
|
|
|
|
|
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
|
|
|
|
- it := sess.DB("qfw").C("zktest_0520_id").Find(nil).Select(selected).Sort("_id").Iter()
|
|
|
|
|
|
+ //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
|
|
|
|
+ it := sess.DB("qfw").C("zktest_bidding_0619_compare").Find(nil).Select(nil).Sort("_id").Iter()
|
|
|
|
|
|
fmt.Println("taskRun 开始")
|
|
fmt.Println("taskRun 开始")
|
|
count := 0
|
|
count := 0
|
|
@@ -146,9 +140,9 @@ func taskRunBidding() {
|
|
log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
}
|
|
}
|
|
|
|
|
|
- //1.更新MongoDB
|
|
|
|
update := map[string]interface{}{}
|
|
update := map[string]interface{}{}
|
|
|
|
|
|
|
|
+ // 1.更新省市区
|
|
//if area, ok := tmp["area"]; ok && area != nil {
|
|
//if area, ok := tmp["area"]; ok && area != nil {
|
|
// update["area"] = area
|
|
// update["area"] = area
|
|
//} else {
|
|
//} else {
|
|
@@ -167,151 +161,37 @@ func taskRunBidding() {
|
|
// update["district"] = ""
|
|
// update["district"] = ""
|
|
//}
|
|
//}
|
|
|
|
|
|
- //========//
|
|
|
|
- //toptype := ""
|
|
|
|
- //if toptype, ok := tmp["toptype"]; ok && toptype != nil {
|
|
|
|
- // update["toptype"] = toptype
|
|
|
|
- //} else {
|
|
|
|
- // update["toptype"] = ""
|
|
|
|
- //}
|
|
|
|
- ////
|
|
|
|
- //if subtype, ok := tmp["subtype"]; ok && subtype != nil {
|
|
|
|
- // if util.ObjToString(tmp["toptype"]) == "结果" && util.ObjToString(tmp["subtype"]) == "招标" {
|
|
|
|
- // update["subtype"] = ""
|
|
|
|
- // }
|
|
|
|
- //} else {
|
|
|
|
- // update["subtype"] = ""
|
|
|
|
- //}
|
|
|
|
- //update["toptype"] = "其它"
|
|
|
|
- //update["subtype"] = "其它"
|
|
|
|
- //if len(update) > 0 {
|
|
|
|
- // //更新MongoDB
|
|
|
|
- // updatePool <- []map[string]interface{}{
|
|
|
|
- // //{"_id": tmp["id"]},
|
|
|
|
- // {"_id": tmp["_id"]},
|
|
|
|
- // {"$set": update},
|
|
|
|
- // }
|
|
|
|
- // //====//
|
|
|
|
- // //biddingID := util.ObjToString(tmp["id"])
|
|
|
|
- // //biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
- // //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
|
- // ////2.es 项目 更新字段
|
|
|
|
- // //Es.UpdateDocument("bidding", biddingID, update)
|
|
|
|
- // //EsNew.UpdateDocument("bidding", biddingID, update)
|
|
|
|
- // //if err != nil {
|
|
|
|
- // // log.Info("bidding es update err", err, biddingID)
|
|
|
|
- // //}
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //2.es 更新字段
|
|
|
|
- //esUpdate := make(map[string]interface{})
|
|
|
|
- //if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
|
|
|
|
- // esUpdate["subtitle_projectname"] = subtitle_projectname
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- biddingID := util.ObjToString(tmp["id"])
|
|
|
|
- bidamount := util.Float64All(tmp["bidamount"])
|
|
|
|
- _, ok := tmp["bidamount"]
|
|
|
|
- if ok && bidamount > 0 {
|
|
|
|
- if biddingID != "" {
|
|
|
|
- update["bidamount"] = bidamount
|
|
|
|
- }
|
|
|
|
|
|
+ // 2.更新中标单位,中标金额
|
|
|
|
+ if winner, ok := tmp["winner"]; ok && winner != nil {
|
|
|
|
+ update["winner"] = winner
|
|
|
|
+ } else {
|
|
|
|
+ update["winner"] = ""
|
|
|
|
+ }
|
|
|
|
+ if s_winner, ok := tmp["s_winner"]; ok && s_winner != nil {
|
|
|
|
+ update["s_winner"] = s_winner
|
|
|
|
+ } else {
|
|
|
|
+ update["s_winner"] = ""
|
|
}
|
|
}
|
|
|
|
+ if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
|
|
|
|
+ update["bidamount"] = bidamount
|
|
|
|
+ } else {
|
|
|
|
+ update["bidamount"] = nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //biddingID := util.ObjToString(tmp["id"])
|
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
|
|
if len(update) > 0 {
|
|
if len(update) > 0 {
|
|
- // 更新es
|
|
|
|
- updateEsPool <- []map[string]interface{}{
|
|
|
|
- {"_id": biddingID},
|
|
|
|
- //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
|
- update,
|
|
|
|
|
|
+ //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
|
+ //2.es 项目 更新字段
|
|
|
|
+ err := Es.UpdateDocument("bidding", biddingID, update)
|
|
|
|
+ err = EsNew.UpdateDocument("bidding", biddingID, update)
|
|
|
|
+ if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
|
+ log.Info("bidding es update err", err, biddingID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- //if len(update) > 0 {
|
|
|
|
- // id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
- // //id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
- // err := Es.UpdateDocument("projectset", id, esUpdate)
|
|
|
|
- // if err != nil {
|
|
|
|
- // if strings.Contains(err.Error(), "Document not updated:") {
|
|
|
|
- // continue
|
|
|
|
- // } else {
|
|
|
|
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
|
- // }
|
|
|
|
- // }
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
|
|
|
|
- // tmp = make(map[string]interface{})
|
|
|
|
- // continue
|
|
|
|
- //}
|
|
|
|
- //// 针对存量数据,重复数据不进索引
|
|
|
|
- //if util.IntAll(tmp["extracttype"]) == -1 {
|
|
|
|
- // continue
|
|
|
|
- //}
|
|
|
|
- //
|
|
|
|
- ////针对产权数据,暂时不入es 索引库
|
|
|
|
- //if util.IntAll(tmp["infoformat"]) == 3 {
|
|
|
|
- // continue
|
|
|
|
- //}
|
|
|
|
- //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
|
|
|
|
- //channel := util.ObjToString(tmp["channel"])
|
|
|
|
- //if channel != "紧急直接零星采购公告" {
|
|
|
|
- // continue
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //realNum++
|
|
|
|
- //pool <- true
|
|
|
|
- //wg.Add(1)
|
|
|
|
- //go func(tmp map[string]interface{}) {
|
|
|
|
- // defer func() {
|
|
|
|
- // <-pool
|
|
|
|
- // wg.Done()
|
|
|
|
- // }()
|
|
|
|
-
|
|
|
|
- ////2.es 更新字段
|
|
|
|
- //esUpdate := make(map[string]interface{})
|
|
|
|
- //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
|
|
|
|
- // esUpdate["autoid"] = autoid
|
|
|
|
- //}
|
|
|
|
- //
|
|
|
|
- //if len(esUpdate) > 0 {
|
|
|
|
- // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
|
|
|
|
- // if err != nil {
|
|
|
|
- // if strings.Contains(err.Error(), "Document not updated:") {
|
|
|
|
- // return
|
|
|
|
- // } else {
|
|
|
|
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
|
- // }
|
|
|
|
- // }
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //if err != nil {
|
|
|
|
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
|
|
|
|
- //}
|
|
|
|
- //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
|
|
|
|
- // esUpdate["tag_set"] = tag_set
|
|
|
|
- //}
|
|
|
|
- //
|
|
|
|
- //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
|
- // esUpdate["tag_topinformation"] = tag_topinformation
|
|
|
|
- //}
|
|
|
|
- //
|
|
|
|
- //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
|
|
|
|
- // esUpdate["tag_subinformation"] = tag_subinformation
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //if len(esUpdate) > 0 {
|
|
|
|
- // // 更新es
|
|
|
|
- // updateEsPool <- []map[string]interface{}{
|
|
|
|
- // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
|
- // esUpdate,
|
|
|
|
- // }
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- //}(tmp)
|
|
|
|
- //tmp = make(map[string]interface{})
|
|
|
|
}
|
|
}
|
|
- //wg.Wait()
|
|
|
|
|
|
|
|
log.Info("Run Over...Count:", log.Int("count", count))
|
|
log.Info("Run Over...Count:", log.Int("count", count))
|
|
|
|
|
|
@@ -537,120 +417,49 @@ func dealResult() {
|
|
log.Info("Run Over...Count:", log.Int("count", count))
|
|
log.Info("Run Over...Count:", log.Int("count", count))
|
|
}
|
|
}
|
|
|
|
|
|
-// dealDataTest 处理测试环境数据
|
|
|
|
-func dealDataTest() {
|
|
|
|
|
|
+// dealBiddingTest 处理测试环境数据
|
|
|
|
+func dealBiddingTest() {
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
sess := MgoT.GetMgoConn()
|
|
sess := MgoT.GetMgoConn()
|
|
defer MgoT.DestoryMongoConn(sess)
|
|
defer MgoT.DestoryMongoConn(sess)
|
|
|
|
|
|
- where := map[string]interface{}{
|
|
|
|
- "_id": map[string]interface{}{
|
|
|
|
- "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
|
|
|
|
- "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //where := map[string]interface{}{
|
|
|
|
- // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
|
|
|
|
- //}
|
|
|
|
|
|
+ it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
|
|
|
|
|
|
- ctx := context.Background()
|
|
|
|
- coll := sess.M.C.Database("qfw_data").Collection("bidding")
|
|
|
|
- find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
|
|
|
|
- cur, err := coll.Find(ctx, where, find)
|
|
|
|
- if err != nil {
|
|
|
|
- fmt.Println(err)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ///////
|
|
|
|
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
|
|
|
|
- //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
|
|
|
|
|
|
+ fmt.Println("taskRun 开始")
|
|
count := 0
|
|
count := 0
|
|
- realNum := 0
|
|
|
|
- for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
|
|
|
|
- if cur != nil {
|
|
|
|
- cur.Decode(&tmp)
|
|
|
|
- }
|
|
|
|
- if count%1000 == 0 {
|
|
|
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
|
+ if count%10000 == 0 {
|
|
log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
|
|
}
|
|
}
|
|
- idStr := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
|
|
|
|
- data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
|
|
|
|
-
|
|
|
|
- if len(*data) == 0 {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- update := make(map[string]interface{})
|
|
|
|
- if (*data)["signaturedate"] != nil {
|
|
|
|
- update["signaturedate"] = (*data)["signaturedate"]
|
|
|
|
- }
|
|
|
|
- if (*data)["contractperiod"] != nil {
|
|
|
|
- update["contractperiod"] = (*data)["contractperiod"]
|
|
|
|
- }
|
|
|
|
- if (*data)["expiredate"] != nil {
|
|
|
|
- update["expiredate"] = (*data)["expiredate"]
|
|
|
|
|
|
+ update := map[string]interface{}{}
|
|
|
|
+ // 2.更新中标单位,中标金额
|
|
|
|
+ if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
|
|
|
|
+ update["tag_topinformation"] = tag_topinformation
|
|
}
|
|
}
|
|
|
|
|
|
- if len(update) == 0 {
|
|
|
|
- continue
|
|
|
|
|
|
+ if property_form, ok := tmp["property_form"]; ok && property_form != nil {
|
|
|
|
+ update["property_form"] = property_form
|
|
}
|
|
}
|
|
|
|
|
|
- fmt.Println(idStr)
|
|
|
|
- MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
|
- //bidding 表
|
|
|
|
- //if idStr > "5a862e7040d2d9bbe88e3b1f" {
|
|
|
|
- // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
|
- // data := *bidding
|
|
|
|
- // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
|
|
|
|
- //
|
|
|
|
- // // 针对存量数据,重复数据不进索引
|
|
|
|
- // if util.IntAll(data["extracttype"]) == -1 {
|
|
|
|
- // tmp = make(map[string]interface{})
|
|
|
|
- // continue
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- //} else {
|
|
|
|
- // //bidding_back
|
|
|
|
- // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
|
|
|
|
- // data := *bidding
|
|
|
|
- // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
|
|
|
|
- // // 针对存量数据,重复数据不进索引
|
|
|
|
- // if util.IntAll(data["extracttype"]) == -1 {
|
|
|
|
- // tmp = make(map[string]interface{})
|
|
|
|
- // continue
|
|
|
|
- // }
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- realNum++
|
|
|
|
|
|
+ biddingID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
|
|
- //2.es 更新字段
|
|
|
|
- esUpdate := update
|
|
|
|
- esUpdate["id"] = idStr
|
|
|
|
- if len(esUpdate) > 0 {
|
|
|
|
- fmt.Println(idStr)
|
|
|
|
|
|
+ if len(update) > 0 {
|
|
|
|
+ //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
|
|
|
|
+ //2.es 项目 更新字段
|
|
|
|
+ //err := Es.UpdateDocument("bidding", biddingID, update)
|
|
|
|
+ //if err != nil && err.Error() != "Document not updated: noop" {
|
|
|
|
+ // log.Info("bidding es update err", err, biddingID)
|
|
|
|
+ //}
|
|
// 更新es
|
|
// 更新es
|
|
updateEsPool <- []map[string]interface{}{
|
|
updateEsPool <- []map[string]interface{}{
|
|
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
|
- esUpdate,
|
|
|
|
|
|
+ {"_id": biddingID},
|
|
|
|
+ update,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- //err := Es.UpdateDocument("bidding", idStr, update)
|
|
|
|
- //if err != nil {
|
|
|
|
- // log.Error("es update", err)
|
|
|
|
- //}
|
|
|
|
- //
|
|
|
|
- //err = EsNew.UpdateDocument("bidding", idStr, update)
|
|
|
|
- //if err != nil {
|
|
|
|
- // log.Error("esNew update", err)
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- tmp = make(map[string]interface{})
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
|
|
|
|
-
|
|
|
|
|
|
+ log.Info("Run Over...Count:", log.Int("count", count))
|
|
}
|
|
}
|
|
|
|
|
|
// updateMethod 更新MongoDB
|
|
// updateMethod 更新MongoDB
|
|
@@ -705,7 +514,7 @@ func updateEsMethod() {
|
|
<-updateEsSp
|
|
<-updateEsSp
|
|
}()
|
|
}()
|
|
Es.UpdateBulk("bidding", arru...)
|
|
Es.UpdateBulk("bidding", arru...)
|
|
- EsNew.UpdateBulk("bidding", arru...)
|
|
|
|
|
|
+ //EsNew.UpdateBulk("bidding", arru...)
|
|
}(arru)
|
|
}(arru)
|
|
arru = make([][]map[string]interface{}, 200)
|
|
arru = make([][]map[string]interface{}, 200)
|
|
indexu = 0
|
|
indexu = 0
|
|
@@ -718,7 +527,7 @@ func updateEsMethod() {
|
|
<-updateEsSp
|
|
<-updateEsSp
|
|
}()
|
|
}()
|
|
Es.UpdateBulk("bidding", arru...)
|
|
Es.UpdateBulk("bidding", arru...)
|
|
- EsNew.UpdateBulk("bidding", arru...)
|
|
|
|
|
|
+ //EsNew.UpdateBulk("bidding", arru...)
|
|
}(arru[:indexu])
|
|
}(arru[:indexu])
|
|
arru = make([][]map[string]interface{}, 200)
|
|
arru = make([][]map[string]interface{}, 200)
|
|
indexu = 0
|
|
indexu = 0
|