123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764 |
- package main
- import (
- "context"
- "fmt"
- "github.com/wcc4869/common_utils/log"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo/options"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- MgoT *mongodb.MongodbSim //测试环境链接
- MgoR *mongodb.MongodbSim
- saveSize = 50
- Es *elastic.Elastic
- EsNew *elastic.Elastic
- EsT *elastic.Elastic
- // 更新mongo
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- //更新es
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 5) //保存协程
- )
- func main() {
- //mongodb 163
- //Mgo = &mongodb.MongodbSim{
- // MongodbAddr: "172.17.189.140:27080",
- // //MongodbAddr: "127.0.0.1:27083",
- // DbName: "qfw",
- // Size: 10,
- // UserName: "SJZY_RWbid_ES",
- // Password: "SJZY@B4i4D5e6S",
- // //Direct: true,
- //}
- //Mgo.InitPool()
- //85
- //MgoR = &mongodb.MongodbSim{
- // //MongodbAddr: "127.0.0.1:27080",
- // MongodbAddr: "172.17.4.85:27080",
- // DbName: "qfw",
- // Size: 10,
- // //Direct: true,
- //}
- //MgoR.InitPool()
- ////测试环境MongoDB
- //MgoT = &mongodb.MongodbSim{
- // //MongodbAddr: "172.17.189.140:27080",
- // MongodbAddr: "192.168.3.206:27002",
- // DbName: "qfw_data",
- // Size: 10,
- // UserName: "root",
- // Password: "root",
- // //Direct: true,
- //}
- //MgoT.InitPool()
- // 测试环境es
- //Es = &elastic.Elastic{
- // S_esurl: "http://192.168.3.149:9201",
- // //S_esurl: "http://172.17.4.184:19805",
- // I_size: 5,
- // Username: "",
- // Password: "",
- //}
- //Es.InitElasticSize()
- //es
- Es = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19908",
- S_esurl: "http://172.17.4.184:19908",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- Es.InitElasticSize()
- // es 新集群
- //EsNew = &elastic.Elastic{
- // //S_esurl: "http://127.0.0.1:19905",
- // S_esurl: "http://172.17.4.184:19905",
- // I_size: 5,
- // Username: "jybid",
- // Password: "Top2023_JEB01i@31",
- //}
- //EsNew.InitElasticSize()
- //go updateMethod() //更新mongodb
- //go updateEsMethod() //更新es
- go updateProjectEsMethod()
- //taskRunProject()
- //taskRunBidding()
- //dealDataTest()// 测试环境数据处理
- updateProject()
- fmt.Println("over")
- c := make(chan bool, 1)
- <-c
- }
- // taskRun 更新es 省市区三个字段
- func taskRunBidding() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //pool := make(chan bool, 2) //处理协程
- //wg := &sync.WaitGroup{}
- //查询条件
- //q := map[string]interface{}{
- // //"_id": map[string]interface{}{
- // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
- // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
- // //
- // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
- // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
- // //},
- // //"comeintime": map[string]interface{}{
- // // "$gt": 1669824000,
- // // //"$lte": 1669864950,
- // // "$lte": 1702265941,
- // //},
- // //"site": "国家能源e购",
- // "toptype": map[string]interface{}{"$exists": 0},
- //}
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
- it := sess.DB("qfw").C("zktest_0520_id").Find(nil).Select(selected).Sort("_id").Iter()
- fmt.Println("taskRun 开始")
- count := 0
- //realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- //1.更新MongoDB
- update := map[string]interface{}{}
- //if area, ok := tmp["area"]; ok && area != nil {
- // update["area"] = area
- //} else {
- // update["area"] = ""
- //}
- //
- //if city, ok := tmp["city"]; ok && city != nil {
- // update["city"] = city
- //} else {
- // update["city"] = ""
- //}
- //
- //if district, ok := tmp["district"]; ok && district != nil {
- // update["district"] = district
- //} else {
- // update["district"] = ""
- //}
- //========//
- //toptype := ""
- //if toptype, ok := tmp["toptype"]; ok && toptype != nil {
- // update["toptype"] = toptype
- //} else {
- // update["toptype"] = ""
- //}
- ////
- //if subtype, ok := tmp["subtype"]; ok && subtype != nil {
- // if util.ObjToString(tmp["toptype"]) == "结果" && util.ObjToString(tmp["subtype"]) == "招标" {
- // update["subtype"] = ""
- // }
- //} else {
- // update["subtype"] = ""
- //}
- //update["toptype"] = "其它"
- //update["subtype"] = "其它"
- //if len(update) > 0 {
- // //更新MongoDB
- // updatePool <- []map[string]interface{}{
- // //{"_id": tmp["id"]},
- // {"_id": tmp["_id"]},
- // {"$set": update},
- // }
- // //====//
- // //biddingID := util.ObjToString(tmp["id"])
- // //biddingID := mongodb.BsonIdToSId(tmp["_id"])
- // //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- // ////2.es 项目 更新字段
- // //Es.UpdateDocument("bidding", biddingID, update)
- // //EsNew.UpdateDocument("bidding", biddingID, update)
- // //if err != nil {
- // // log.Info("bidding es update err", err, biddingID)
- // //}
- //}
- //2.es 更新字段
- //esUpdate := make(map[string]interface{})
- //if subtitle_projectname, ok := tmp["subtitle_projectname"]; ok && subtitle_projectname != nil {
- // esUpdate["subtitle_projectname"] = subtitle_projectname
- //}
- biddingID := util.ObjToString(tmp["id"])
- bidamount := util.Float64All(tmp["bidamount"])
- _, ok := tmp["bidamount"]
- if ok && bidamount > 0 {
- if biddingID != "" {
- update["bidamount"] = bidamount
- }
- }
- if len(update) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- //{"_id": mongodb.BsonIdToSId(tmp["_id"])},
- update,
- }
- }
- //if len(update) > 0 {
- // id := mongodb.BsonIdToSId(tmp["_id"])
- // //id := mongodb.BsonIdToSId(tmp["_id"])
- // err := Es.UpdateDocument("projectset", id, esUpdate)
- // if err != nil {
- // if strings.Contains(err.Error(), "Document not updated:") {
- // continue
- // } else {
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
- // }
- // }
- //}
- //if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- // tmp = make(map[string]interface{})
- // continue
- //}
- //// 针对存量数据,重复数据不进索引
- //if util.IntAll(tmp["extracttype"]) == -1 {
- // continue
- //}
- //
- ////针对产权数据,暂时不入es 索引库
- //if util.IntAll(tmp["infoformat"]) == 3 {
- // continue
- //}
- //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
- //channel := util.ObjToString(tmp["channel"])
- //if channel != "紧急直接零星采购公告" {
- // continue
- //}
- //realNum++
- //pool <- true
- //wg.Add(1)
- //go func(tmp map[string]interface{}) {
- // defer func() {
- // <-pool
- // wg.Done()
- // }()
- ////2.es 更新字段
- //esUpdate := make(map[string]interface{})
- //if autoid, ok := tmp["autoid"]; ok && autoid != nil {
- // esUpdate["autoid"] = autoid
- //}
- //
- //if len(esUpdate) > 0 {
- // err := Es.UpdateDocument("bidding", mongodb.BsonIdToSId(tmp["_id"]), esUpdate)
- // if err != nil {
- // if strings.Contains(err.Error(), "Document not updated:") {
- // return
- // } else {
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
- // }
- // }
- //}
- //if err != nil {
- // fmt.Println(err, mongodb.BsonIdToSId(tmp["_id"]))
- //}
- //if tag_set, ok := tmp["tag_set"]; ok && tag_set != nil {
- // esUpdate["tag_set"] = tag_set
- //}
- //
- //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
- // esUpdate["tag_topinformation"] = tag_topinformation
- //}
- //
- //if tag_subinformation, ok := tmp["tag_subinformation"]; ok && tag_subinformation != nil {
- // esUpdate["tag_subinformation"] = tag_subinformation
- //}
- //if len(esUpdate) > 0 {
- // // 更新es
- // updateEsPool <- []map[string]interface{}{
- // {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- // esUpdate,
- // }
- //}
- //}(tmp)
- //tmp = make(map[string]interface{})
- }
- //wg.Wait()
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // taskRunProject 更新项目表 省市区
- func taskRunProject() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- // 项目数据
- MgoP := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.85:27080",
- //MongodbAddr: "127.0.0.1:27080",
- Size: 10,
- DbName: "qfw",
- //Direct: true,
- }
- MgoP.InitPool()
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
- it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- where := map[string]interface{}{
- "ids": biddingID,
- }
- // 找到对应项目数据
- p, _ := MgoP.FindOne("projectset_20230904", where)
- projectId := mongodb.BsonIdToSId((*p)["_id"])
- //1.更新MongoDB
- update := map[string]interface{}{}
- if area, ok := tmp["area"]; ok && area != nil {
- update["area"] = area
- } else {
- update["area"] = ""
- }
- if city, ok := tmp["city"]; ok && city != nil {
- update["city"] = city
- } else {
- update["city"] = ""
- }
- if district, ok := tmp["district"]; ok && district != nil {
- update["district"] = district
- } else {
- update["district"] = ""
- }
- if len(update) > 0 {
- MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
- //2.es 项目 更新字段
- err := Es.UpdateDocument("projectset", projectId, update)
- if err != nil {
- log.Info("es update err", err, projectId)
- }
- }
- //2.es 项目 更新字段
- //if len(update) > 0 {
- // // 更新es
- // //updateEsPool <- []map[string]interface{}{
- // // {"_id": projectId},
- // // update,
- // //}
- //}
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // dealData 正式环境,同步合同期限
- func dealData() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //where := map[string]interface{}{
- // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
- //}
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
- it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
- count := 0
- realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- idStr := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if tmp["signaturedate"] != nil {
- update["signaturedate"] = tmp["signaturedate"]
- }
- if tmp["contractperiod"] != nil {
- update["contractperiod"] = tmp["contractperiod"]
- }
- if tmp["expiredate"] != nil {
- update["expiredate"] = tmp["expiredate"]
- }
- if len(update) == 0 {
- continue
- }
- //bidding 表
- if idStr > "5a862e7040d2d9bbe88e3b1f" {
- bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- } else {
- //bidding_back
- bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- }
- realNum++
- //2.es 更新字段
- esUpdate := update
- esUpdate["id"] = idStr
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
- }
- // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
- func dealResult() {
- defer util.Catch()
- sess := MgoR.GetMgoConn()
- defer MgoR.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
- "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
- },
- "subtype": "合同",
- }
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
- it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- idStr := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if tmp["signaturedate"] != nil {
- update["signaturedate"] = tmp["signaturedate"]
- }
- if tmp["contractperiod"] != nil {
- update["contractperiod"] = tmp["contractperiod"]
- }
- if tmp["expiredate"] != nil {
- update["expiredate"] = tmp["expiredate"]
- }
- if len(update) == 0 {
- continue
- }
- bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- //2.es 更新字段
- esUpdate := update
- esUpdate["id"] = idStr
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // dealDataTest 处理测试环境数据
- func dealDataTest() {
- defer util.Catch()
- sess := MgoT.GetMgoConn()
- defer MgoT.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
- "$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
- },
- }
- //where := map[string]interface{}{
- // "_id": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
- //}
- ctx := context.Background()
- coll := sess.M.C.Database("qfw_data").Collection("bidding")
- find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"_id", -1}}).SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
- cur, err := coll.Find(ctx, where, find)
- if err != nil {
- fmt.Println(err)
- }
- ///////
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
- //it := sess.DB("qfw_data").C("bidding").Find(where).Select(nil).Iter()
- count := 0
- realNum := 0
- for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
- if cur != nil {
- cur.Decode(&tmp)
- }
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- idStr := mongodb.BsonIdToSId(tmp["_id"])
- data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
- if len(*data) == 0 {
- continue
- }
- update := make(map[string]interface{})
- if (*data)["signaturedate"] != nil {
- update["signaturedate"] = (*data)["signaturedate"]
- }
- if (*data)["contractperiod"] != nil {
- update["contractperiod"] = (*data)["contractperiod"]
- }
- if (*data)["expiredate"] != nil {
- update["expiredate"] = (*data)["expiredate"]
- }
- if len(update) == 0 {
- continue
- }
- fmt.Println(idStr)
- MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- //bidding 表
- //if idStr > "5a862e7040d2d9bbe88e3b1f" {
- // bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
- // data := *bidding
- // Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- //
- // // 针对存量数据,重复数据不进索引
- // if util.IntAll(data["extracttype"]) == -1 {
- // tmp = make(map[string]interface{})
- // continue
- // }
- //
- //} else {
- // //bidding_back
- // bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
- // data := *bidding
- // Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
- // // 针对存量数据,重复数据不进索引
- // if util.IntAll(data["extracttype"]) == -1 {
- // tmp = make(map[string]interface{})
- // continue
- // }
- //}
- realNum++
- //2.es 更新字段
- esUpdate := update
- esUpdate["id"] = idStr
- if len(esUpdate) > 0 {
- fmt.Println(idStr)
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- //err := Es.UpdateDocument("bidding", idStr, update)
- //if err != nil {
- // log.Error("es update", err)
- //}
- //
- //err = EsNew.UpdateDocument("bidding", idStr, update)
- //if err != nil {
- // log.Error("esNew update", err)
- //}
- tmp = make(map[string]interface{})
- }
- log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- // updateProjectEsMethod 更新项目索引
- func updateProjectEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|