package main

import (
	"log"

	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// newQfwMongo builds the MongoDB client for the "qfw" database, initializes
// its connection pool and returns it. Both maintenance jobs in this file use
// the same connection settings, so they are defined once here.
//
// NOTE(review): the address and credentials are hard-coded in source; consider
// loading them from configuration or the environment.
func newQfwMongo() *mongodb.MongodbSim {
	mgo := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "qfw",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	mgo.InitPool()
	return mgo
}

// updateBidding walks every document of the "bidding" collection and fixes the
// classification of documents produced by two known sets of spider codes:
//
//   - documents from the "拟建" spiders get toptype/subtype set to "拟建" in
//     MongoDB and in both Elasticsearch clusters;
//   - documents from the "产权" spiders get toptype/subtype set to "产权" in
//     MongoDB and are deleted from both Elasticsearch clusters.
//
// Failures on individual Elasticsearch calls are logged and the scan continues.
func updateBidding() {
	mgo := newQfwMongo()

	// Elasticsearch, existing cluster.
	// NOTE(review): credentials are hard-coded in source.
	es := &elastic.Elastic{
		//S_esurl: "http://127.0.0.1:19805",
		S_esurl:  "http://172.17.4.184:19805",
		I_size:   5,
		Username: "es_all",
		Password: "TopJkO2E_d1x",
	}
	es.InitElasticSize()

	// Elasticsearch, new cluster.
	// NOTE(review): credentials are hard-coded in source.
	esNew := &elastic.Elastic{
		//S_esurl: "http://127.0.0.1:19905",
		S_esurl:  "http://172.17.4.184:19905",
		I_size:   5,
		Username: "jybid",
		Password: "Top2023_JEB01i@31",
	}
	esNew.InitElasticSize()

	// NOTE(review): util.Catch presumably recovers and logs a panic raised
	// below this point — confirm against common_utils.
	defer util.Catch()

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)

	// Exclude the two large body fields; only _id and spidercode are read.
	selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
	it := sess.DB("qfw").C("bidding").Find(nil).Select(&selected).Iter()

	log.Println("开始")

	// Spider codes whose documents are "拟建" data.
	njCodes := []string{
		"a_qgjsxmhjxxgspt_ysbggs",
		"a_qgjsxmhjxxgspt_hpspqgs",
		"a_qgjsxmhjxxgspt_hpbggs",
	}
	// Spider codes whose documents are "产权" data.
	cqCodes := []string{
		"sh_shsggzyjyzx_nyys",
		"sh_shsggzyjyzx_sfpm",
		"gd_dgsncjtzcglw_djyxx_lbfbzc",
		"gd_dgsncjtzcglw_djyxx_djyzc",
		"sc_scsggzyjyfwpt_jygk_qt",
		"qg_zhrmghgzrzyb_tdsccrgg",
		"ln_lnsggzyjypt_gycq",
		"a_gjggzyjypt_qt",
	}

	count := 0
	for doc := make(map[string]interface{}); it.Next(&doc); count++ {
		id := mongodb.BsonIdToSId(doc["_id"])
		spidercode := util.ObjToString(doc["spidercode"])

		// Progress marker every 10000 documents.
		if count%10000 == 0 {
			log.Println("current:", count, id)
		}

		switch {
		case IsInStringArray(spidercode, njCodes):
			// "拟建" data: rewrite the classification everywhere.
			update := map[string]interface{}{
				"toptype": "拟建",
				"subtype": "拟建",
			}
			mgo.UpdateById("bidding", id, map[string]interface{}{"$set": update})

			if err := es.UpdateDocument("bidding", id, update); err != nil {
				log.Println("es 更新失败", id, err)
			}
			if err := esNew.UpdateDocument("bidding", id, update); err != nil {
				log.Println("EsNew 更新失败", id, err)
			}

		case IsInStringArray(spidercode, cqCodes):
			// "产权" data: reclassify in MongoDB, remove from both indexes.
			update := map[string]interface{}{
				"toptype": "产权",
				"subtype": "产权",
			}
			mgo.UpdateById("bidding", id, map[string]interface{}{"$set": update})

			if err := es.DeleteByID("bidding", id); err != nil {
				log.Println("es delete ", id, err)
			}
			if err := esNew.DeleteByID("bidding", id); err != nil {
				log.Println("EsNew delete ", id, err)
			}
		}
	}

	// Next returns false both at the end of the data and on a cursor error;
	// Close releases the cursor and reports which of the two happened.
	if err := it.Close(); err != nil {
		log.Println("bidding iterator error after", count, "documents:", err)
	}

	log.Println("结束 ---------- over ")
}

// updateTop copies toptype, subtype and infoformat from each "bidding"
// document onto the document with the same _id in
// "wcc_bidding_20240103_subtype_exists".
//
// Documents whose "bidding" counterpart cannot be loaded are logged and
// skipped, so the lookup result is never dereferenced when nil and no fields
// are overwritten with nil values.
func updateTop() {
	const target = "wcc_bidding_20240103_subtype_exists"

	mgo := newQfwMongo()

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)

	query := sess.DB("qfw").C(target).Find(nil).Select(nil).Iter()

	count := 0
	// A fresh map is handed to Next on every iteration, as in the original
	// loop; the reset lives in the post statement so `continue` cannot skip it.
	for doc := make(map[string]interface{}); query.Next(doc); count, doc = count+1, make(map[string]interface{}) {
		// Progress marker every 10000 documents.
		if count%10000 == 0 {
			log.Println("current:", count)
		}

		id := mongodb.BsonIdToSId(doc["_id"])

		// The second result is deliberately ignored: its exact meaning is not
		// visible from this file, so the returned document itself is checked.
		bidding, _ := mgo.FindById("bidding", id, nil)
		if bidding == nil || len(*bidding) == 0 {
			log.Println("bidding not found, skipped:", id)
			continue
		}

		update := map[string]interface{}{
			"toptype":    (*bidding)["toptype"],
			"subtype":    (*bidding)["subtype"],
			"infoformat": (*bidding)["infoformat"],
		}
		mgo.UpdateById(target, id, map[string]interface{}{"$set": update})
	}

	// Distinguish a finished scan from one cut short by a cursor error.
	if err := query.Close(); err != nil {
		log.Println(target, "iterator error after", count, "documents:", err)
	}

	log.Println("over")
}