123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package main
- import (
- "fmt"
- "github.com/wcc4869/common_utils/log"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "sync"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- saveSize = 50
- Es *elastic.Elastic
- EsNew *elastic.Elastic
- // 更新mongo
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- //更新es
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 2) //保存协程
- )
- func main() {
- //mongodb
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- //es
- Es = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19805",
- S_esurl: "http://172.17.4.184:19805",
- I_size: 5,
- Username: "es_all",
- Password: "TopJkO2E_d1x",
- }
- Es.InitElasticSize()
- //es 新集群
- EsNew = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19905",
- S_esurl: "http://172.17.4.184:19905",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- EsNew.InitElasticSize()
- go updateMethod() //更新mongodb
- go updateEsMethod() //更新es
- taskRun()
- fmt.Println(111)
- c := make(chan bool, 1)
- <-c
- }
- // taskRun 更新es 省市区三个字段
- func taskRun() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 10) //处理协程
- wg := &sync.WaitGroup{}
- //查询条件
- q := map[string]interface{}{
- //"_id": map[string]interface{}{
- // "$gt": mongodb.StringTOBsonId("652423800000000000000000"),
- // "$lte": mongodb.StringTOBsonId("6543c7800000000000000000"),
- //},
- "comeintime": map[string]interface{}{
- "$gt": 1669824000,
- //"$lte": 1669864950,
- "$lte": 1702265941,
- },
- "site": "国家能源e购",
- }
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
- it := sess.DB("qfw").C("bidding").Find(&q).Select(&selected).Iter()
- fmt.Println("开始")
- count := 0
- realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- continue
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- //针对产权数据,暂时不入es 索引库
- if util.IntAll(tmp["infoformat"]) == 3 {
- continue
- }
- //只有 紧急直接零星采购公告 栏目的数据,需要改成 结果-成交
- channel := util.ObjToString(tmp["channel"])
- if channel != "紧急直接零星采购公告" {
- continue
- }
- realNum++
- fmt.Println(mongodb.BsonIdToSId(tmp["_id"]))
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //1.更新MongoDB
- update := map[string]interface{}{
- "toptype": "结果",
- "subtype": "成交",
- }
- if len(update) > 0 {
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- }
- //2.es 更新字段
- esUpdate := map[string]interface{}{
- "toptype": "结果",
- "subtype": "成交",
- "id": mongodb.BsonIdToSId(tmp["_id"]),
- }
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info("Run Over...Count1:", log.Int("count", count), log.Int("realNum", realNum))
- fmt.Println("结束")
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|