// Command main is a one-off maintenance job. It scans the MongoDB "bidding"
// collection for documents of one site within a fixed comeintime window and,
// for those in one specific channel, rewrites their toptype/subtype both in
// MongoDB and in two Elasticsearch clusters.
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/wcc4869/common_utils/log"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

const (
	// esBatchSize is the maximum number of Elasticsearch updates handed to a
	// single UpdateBulk call.
	esBatchSize = 200

	// flushIdle is how long a batcher waits without receiving a new item
	// before it flushes a partially filled batch.
	flushIdle = 1000 * time.Millisecond
)

var (
	// Mgo is the MongoDB connection pool, initialised in main.
	Mgo *mongodb.MongodbSim

	// saveSize is the maximum number of MongoDB updates handed to a single
	// UpdateBulk call.
	saveSize = 50

	// Es and EsNew are the two Elasticsearch clusters that receive every
	// update; both are initialised in main.
	Es    *elastic.Elastic
	EsNew *elastic.Elastic

	// MongoDB updates: updatePool queues pending [filter, update] pairs and
	// updateSp bounds the number of bulk-update goroutines running at once.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp   = make(chan bool, 5)

	// Elasticsearch updates: updateEsPool queues pending [id, fields] pairs
	// and updateEsSp bounds the number of bulk-update goroutines running at
	// once.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp   = make(chan bool, 2)
)

// main wires up the MongoDB and Elasticsearch clients, starts the two
// background batchers, runs the scan, and then blocks forever so that the
// batchers can flush whatever is still queued.
//
// NOTE(review): addresses and credentials are hard-coded below. They should
// be moved to configuration or the environment, and the secrets rotated.
func main() {
	// MongoDB
	Mgo = &mongodb.MongodbSim{
		//MongodbAddr: "127.0.0.1:27083",
		MongodbAddr: "172.17.189.140:27080",
		DbName:      "qfw",
		Size:        10,
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	Mgo.InitPool()

	// Elasticsearch
	Es = &elastic.Elastic{
		//S_esurl: "http://127.0.0.1:19805",
		S_esurl:  "http://172.17.4.184:19805",
		I_size:   5,
		Username: "es_all",
		Password: "TopJkO2E_d1x",
	}
	Es.InitElasticSize()

	// Elasticsearch, new cluster
	EsNew = &elastic.Elastic{
		//S_esurl: "http://127.0.0.1:19905",
		S_esurl:  "http://172.17.4.184:19905",
		I_size:   5,
		Username: "jybid",
		Password: "Top2023_JEB01i@31",
	}
	EsNew.InitElasticSize()

	go updateMethod()   // MongoDB batcher
	go updateEsMethod() // Elasticsearch batcher

	taskRun()
	fmt.Println(111)

	// Block forever: the batchers run on background goroutines and flush the
	// final partial batch only after flushIdle of inactivity, so returning
	// here would drop queued updates. The process has to be stopped manually.
	select {}
}

// taskRun iterates over the matching "bidding" documents and queues a
// toptype/subtype rewrite for each one that passes the filters below. Every
// accepted document is queued twice: once for MongoDB (updatePool) and once
// for Elasticsearch (updateEsPool). It returns after all accepted documents
// have been queued; the actual writes happen in the batcher goroutines.
//
// NOTE(review): errors from the cursor are never inspected, so an aborted
// iteration is indistinguishable from a completed one.
func taskRun() {
	defer util.Catch()

	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	pool := make(chan bool, 10) // bounds the number of in-flight worker goroutines
	wg := &sync.WaitGroup{}

	// Query: one site, comeintime in the half-open range (gt, lte].
	q := map[string]interface{}{
		//"_id": map[string]interface{}{
		//	"$gt":  mongodb.StringTOBsonId("652423800000000000000000"),
		//	"$lte": mongodb.StringTOBsonId("6543c7800000000000000000"),
		//},
		"comeintime": map[string]interface{}{
			"$gt": 1669824000,
			//"$lte": 1669864950,
			"$lte": 1702265941,
		},
		"site": "国家能源e购",
	}
	// Projection: leave out the two large body fields.
	selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
	it := sess.DB("qfw").C("bidding").Find(&q).Select(&selected).Iter()

	fmt.Println("开始")
	count := 0   // documents returned by the cursor
	realNum := 0 // documents actually queued for update
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		// Documents flagged with the test marker in "sensitive" are not indexed.
		if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
			tmp = make(map[string]interface{})
			continue
		}
		// Per the original author's note: extracttype -1 marks duplicate
		// records in the existing data, which are kept out of the index.
		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}
		// Per the original author's note: infoformat 3 is property-rights
		// data, which is not indexed into Elasticsearch for now.
		if util.IntAll(tmp["infoformat"]) == 3 {
			continue
		}
		// Only documents in this one channel are relabelled.
		channel := util.ObjToString(tmp["channel"])
		if channel != "紧急直接零星采购公告" {
			continue
		}

		realNum++
		fmt.Println(mongodb.BsonIdToSId(tmp["_id"]))

		pool <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()

			id := mongodb.BsonIdToSId(doc["_id"])

			// 1. MongoDB: [filter, update] pair.
			updatePool <- []map[string]interface{}{
				{"_id": doc["_id"]},
				{"$set": map[string]interface{}{
					"toptype": "结果",
					"subtype": "成交",
				}},
			}

			// 2. Elasticsearch: [id, fields] pair.
			updateEsPool <- []map[string]interface{}{
				{"_id": id},
				{
					"toptype": "结果",
					"subtype": "成交",
					"id":      id,
				},
			}
		}(tmp)

		// The worker goroutine now owns the map it was given, so the next
		// document must be decoded into a fresh one.
		tmp = make(map[string]interface{})
	}

	wg.Wait()
	log.Info("Run Over...Count1:", log.Int("count", count), log.Int("realNum", realNum))
	fmt.Println("结束")
}

// batchLoop receives items from in forever and groups them into batches of
// at most size items. A batch is handed to flush, on its own goroutine, as
// soon as it is full, or once no new item has arrived for flushIdle while
// the batch is non-empty. sem bounds the number of flush goroutines running
// at once; the loop blocks while sem is full. batchLoop never returns.
func batchLoop(in <-chan []map[string]interface{}, size int, sem chan bool, flush func(batch [][]map[string]interface{})) {
	batch := make([][]map[string]interface{}, 0, size)

	// dispatch hands the current batch to a flush goroutine and starts a new
	// one; the old backing array is never written to again by this loop.
	dispatch := func() {
		sem <- true
		go func(b [][]map[string]interface{}) {
			defer func() { <-sem }()
			flush(b)
		}(batch)
		batch = make([][]map[string]interface{}, 0, size)
	}

	for {
		select {
		case v := <-in:
			batch = append(batch, v)
			if len(batch) >= size {
				dispatch()
			}
		case <-time.After(flushIdle):
			if len(batch) > 0 {
				dispatch()
			}
		}
	}
}

// updateMethod drains updatePool and applies the queued updates to the
// MongoDB "bidding" collection in bulks of up to saveSize. It never returns
// and is meant to be run on its own goroutine.
func updateMethod() {
	batchLoop(updatePool, saveSize, updateSp, func(batch [][]map[string]interface{}) {
		Mgo.UpdateBulk("bidding", batch...)
	})
}

// updateEsMethod drains updateEsPool and applies the queued updates to the
// "bidding" index of both Elasticsearch clusters (Es first, then EsNew) in
// bulks of up to esBatchSize. It never returns and is meant to be run on its
// own goroutine.
func updateEsMethod() {
	batchLoop(updateEsPool, esBatchSize, updateEsSp, func(batch [][]map[string]interface{}) {
		Es.UpdateBulk("bidding", batch...)
		EsNew.UpdateBulk("bidding", batch...)
	})
}