12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- es7 "github.com/olivere/elastic/v7"
- "github.com/wcc4869/common_utils/log"
- "go.uber.org/zap"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "reflect"
- "strings"
- "time"
- )
- var (
- Mgo *mongodb.MongodbSim
- MgoB *mongodb.MongodbSim
- MgoBAi *mongodb.MongodbSim
- MgoT *mongodb.MongodbSim //测试环境链接
- MgoR *mongodb.MongodbSim
- saveSize = 50
- Es *elastic.Elastic // 19908
- EsNew *elastic.Elastic //19905
- //EsT *elastic.Elastic
- // 更新mongo
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- //更新es
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 5) //保存协程
- updateProjectEsPool = make(chan []map[string]interface{}, 5000)
- updateProjectEsSp = make(chan bool, 5) //保存协程
- BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
- BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
- )
- func Init() {
- //MgoB = &mongodb.MongodbSim{
- // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- // //MongodbAddr: "127.0.0.1:27083",
- // DbName: "qfw",
- // Size: 10,
- // UserName: "SJZY_RWbid_ES",
- // Password: "SJZY@B4i4D5e6S",
- // //Direct: true,
- //}
- //MgoB.InitPool()
- //MgoBAi = &mongodb.MongodbSim{
- // //MongodbAddr: "172.17.189.140:27080",
- // MongodbAddr: "127.0.0.1:27083",
- // DbName: "qfw_ai",
- // Size: 10,
- // UserName: "SJZY_RWbid_ES",
- // Password: "SJZY@B4i4D5e6S",
- // Direct: true,
- //}
- //MgoBAi.InitPool()
- //mongodb 163
- //Mgo = &mongodb.MongodbSim{
- // //MongodbAddr: "172.17.189.140:27080",
- // MongodbAddr: "127.0.0.1:27083",
- // DbName: "qfw",
- // Size: 10,
- // UserName: "SJZY_RWbid_ES",
- // Password: "SJZY@B4i4D5e6S",
- // Direct: true,
- //}
- //Mgo.InitPool()
- //85
- MgoR = &mongodb.MongodbSim{
- //MongodbAddr: "127.0.0.1:27080",
- MongodbAddr: "172.17.4.85:27080",
- DbName: "qfw",
- Size: 10,
- //Direct: true,
- }
- MgoR.InitPool()
- //测试环境MongoDB
- //MgoT = &mongodb.MongodbSim{
- // //MongodbAddr: "172.17.189.140:27080",
- // MongodbAddr: "192.168.3.206:27002",
- // DbName: "qfw_data",
- // Size: 10,
- // UserName: "root",
- // Password: "root",
- // //Direct: true,
- //}
- //MgoT.InitPool()
- ////测试环境es
- //Es = &elastic.Elastic{
- // S_esurl: "http://192.168.3.149:9201",
- // //S_esurl: "http://172.17.4.184:19805",
- // I_size: 5,
- // Username: "",
- // Password: "",
- //}
- //Es.InitElasticSize()
- //es
- Es = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19908",
- S_esurl: "http://172.17.4.184:19908",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- Es.InitElasticSize()
- //es 新集群
- EsNew = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19905",
- S_esurl: "http://172.17.4.184:19905",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- EsNew.InitElasticSize()
- }
- func main() {
- Init()
- //InitEsBiddingField()
- //go updateMethod() //更新mongodb
- //go updateEsMethod() //更新es
- //go updateEsHrefMethod() //更新es href 字段
- //go updateProjectEsMethod()
- //taskRunProject()
- //taskRunBidding()
- //dealBidding() //正式环境bidding数据处理
- //dealBiddingAi() //正式环境bidding数据处理
- //dealBiddingTest() // 测试环境数据处理
- //dealBiddingEsHref() // 根据临时表,更新es href 字段
- //dealBiddingNiJian() //更新拟建数据中buyer = owner
- //updateBiddingBidamount()
- //updateProject()
- //-------------------------------//
- fixBiddingEs()
- log.Info("over")
- c := make(chan bool, 1)
- <-c
- }
- // fixBiddingEs 修复bidding 索引数据,
- func fixBiddingEs() {
- defer util.Catch()
- sess := MgoR.GetMgoConn()
- defer MgoR.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
- "$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
- },
- }
- it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- repeat := util.IntAll(tmp["repeat"])
- repeat_reason := util.ObjToString(tmp["repeat_reason"])
- if repeat == 1 && strings.Contains(repeat_reason, "采集源重复") {
- Es.DeleteByID("bidding", biddingID)
- log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
- EsNew.DeleteByID("bidding", biddingID)
- }
- }
- }
- func InitEsBiddingField() {
- now := time.Now()
- info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
- if len(*info) > 0 {
- for _, m := range *info {
- if util.IntAll(m["level"]) == 1 {
- BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- } else if util.IntAll(m["level"]) == 2 {
- pfield := util.ObjToString(m["pfield"])
- pfieldMap := BiddingLevelField[pfield]
- if pfieldMap == nil {
- pfieldMap = make(map[string]string, 0)
- }
- pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- BiddingLevelField[pfield] = pfieldMap
- }
- }
- }
- log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
- log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
- log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
- }
- // taskRun 更新es 省市区三个字段
- func taskRunBidding() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //查询条件
- //q := map[string]interface{}{
- // //"_id": map[string]interface{}{
- // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"),
- // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
- // //
- // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"),
- // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"),
- // //},
- // //"comeintime": map[string]interface{}{
- // // "$gt": 1669824000,
- // // //"$lte": 1669864950,
- // // "$lte": 1702265941,
- // //},
- // //"site": "国家能源e购",
- // "toptype": map[string]interface{}{"$exists": 0},
- //}
- //selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
- it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- //realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%100 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- update := map[string]interface{}{}
- // 1.更新省市区
- //if area, ok := tmp["area"]; ok && area != nil {
- // update["area"] = area
- //} else {
- // update["area"] = ""
- //}
- //
- //if city, ok := tmp["city"]; ok && city != nil {
- // update["city"] = city
- //} else {
- // update["city"] = ""
- //}
- //
- //if district, ok := tmp["district"]; ok && district != nil {
- // if district == "乌拉盖管委会" {
- // update["district"] = "乌拉盖管理区管委会"
- // } else if district == "错那县" {
- // update["district"] = "错那市"
- // } else if district == "河南周口经济开发区" {
- // update["district"] = "周口临港开发区"
- // } else if district == "米林县" {
- // update["district"] = "米林市"
- // }
- //
- //}
- //-------------------------------------------//
- // 2.更新中标单位、采购单位、代理机构
- biddingID := util.ObjToString(tmp["id"])
- //biddingID := mongodb.BsonIdToSId(tmp["_id"])
- if _, ok := tmp["buyer"]; ok {
- update["buyer"] = tmp["buyer"]
- }
- if _, ok := tmp["agency"]; ok {
- update["agency"] = tmp["agency"]
- }
- if _, ok := tmp["s_winner"]; ok {
- update["s_winner"] = tmp["s_winner"]
- }
- if _, ok := tmp["winner"]; ok {
- update["winner"] = tmp["winner"]
- }
- //-------------------------------------------//
- //3. 更新中标金额
- //biddingID := util.ObjToString(tmp["id"])
- //if _, ok := tmp["nb"]; !ok {
- // continue
- //} else {
- // update["bidamount"] = tmp["nb"]
- //}
- //update["bidamount"] = tmp["bidamount"]
- //// 更新 MongoDB + ES
- if len(update) > 0 {
- MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //2.es 项目 更新字段
- err := Es.UpdateDocument("bidding", biddingID, update)
- err = EsNew.UpdateDocument("bidding", biddingID, update)
- if err != nil && err.Error() != "Document not updated: noop" {
- log.Info("bidding es update err", err, biddingID)
- }
- }
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // taskRunProject 更新项目表 省市区
- func taskRunProject() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- // 项目数据
- MgoP := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.85:27080",
- //MongodbAddr: "127.0.0.1:27080",
- Size: 10,
- DbName: "qfw",
- //Direct: true,
- }
- MgoP.InitPool()
- selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
- it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- where := map[string]interface{}{
- "ids": biddingID,
- }
- // 找到对应项目数据
- p, _ := MgoP.FindOne("projectset_20230904", where)
- projectId := mongodb.BsonIdToSId((*p)["_id"])
- //1.更新MongoDB
- update := map[string]interface{}{}
- if area, ok := tmp["area"]; ok && area != nil {
- update["area"] = area
- } else {
- update["area"] = ""
- }
- if city, ok := tmp["city"]; ok && city != nil {
- update["city"] = city
- } else {
- update["city"] = ""
- }
- if district, ok := tmp["district"]; ok && district != nil {
- update["district"] = district
- } else {
- update["district"] = ""
- }
- if len(update) > 0 {
- MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
- //2.es 项目 更新字段
- err := Es.UpdateDocument("projectset", projectId, update)
- if err != nil {
- log.Info("es update err", err, projectId)
- }
- }
- //2.es 项目 更新字段
- //if len(update) > 0 {
- // // 更新es
- // //updateEsPool <- []map[string]interface{}{
- // // {"_id": projectId},
- // // update,
- // //}
- //}
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // dealData 正式环境,同步合同期限
- func dealData() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //where := map[string]interface{}{
- // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"),
- //}
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
- it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()
- count := 0
- realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- idStr := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if tmp["signaturedate"] != nil {
- update["signaturedate"] = tmp["signaturedate"]
- }
- if tmp["contractperiod"] != nil {
- update["contractperiod"] = tmp["contractperiod"]
- }
- if tmp["expiredate"] != nil {
- update["expiredate"] = tmp["expiredate"]
- }
- if len(update) == 0 {
- continue
- }
- //bidding 表
- if idStr > "5a862e7040d2d9bbe88e3b1f" {
- bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- } else {
- //bidding_back
- bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- }
- realNum++
- //2.es 更新字段
- esUpdate := update
- esUpdate["id"] = idStr
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
- }
- // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据
- func dealResult() {
- defer util.Catch()
- sess := MgoR.GetMgoConn()
- defer MgoR.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
- "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
- },
- "subtype": "合同",
- }
- selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
- it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- idStr := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if tmp["signaturedate"] != nil {
- update["signaturedate"] = tmp["signaturedate"]
- }
- if tmp["contractperiod"] != nil {
- update["contractperiod"] = tmp["contractperiod"]
- }
- if tmp["expiredate"] != nil {
- update["expiredate"] = tmp["expiredate"]
- }
- if len(update) == 0 {
- continue
- }
- bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
- data := *bidding
- Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
- // 针对存量数据,重复数据不进索引
- if util.IntAll(data["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- //2.es 更新字段
- esUpdate := update
- esUpdate["id"] = idStr
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": mongodb.BsonIdToSId(tmp["_id"])},
- esUpdate,
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // dealBidding 处理bidding数据
- func dealBidding() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- //where := map[string]interface{}{
- // "comeintime": map[string]interface{}{
- // "$lt": 1722009600,
- // //"$lt": 1718812802,
- // "$gte": 1718899200,
- // },
- //}
- //where := map[string]interface{}{
- // "_id": map[string]interface{}{
- // "$gte": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea71e"),
- // "$lt": mongodb.StringTOBsonId("66aa067e66cf0db42a8ea720"),
- // },
- //}
- it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- //update := map[string]interface{}{}
- esUpdate := map[string]interface{}{}
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- if util.ObjToString(tmp["purchasing"]) == "" {
- continue
- }
- esUpdate["purchasing"] = tmp["purchasing"]
- // 2.更新中标单位,中标金额
- //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
- // update["tag_topinformation"] = tag_topinformation
- //}
- //
- //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
- // update["property_form"] = property_form
- //}
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- //fmt.Println(biddingID)
- /**
- "s_subscopeclass" : "其它",
- "s_topscopeclass" : "其它",
- "subscopeclass" : [
- "其它"
- ],
- "topscopeclass" : [
- "其它"
- ],
- */
- //// 行业分类默认值
- //resultSubs := make([]string, 0)
- //resultTobs := make([]string, 0)
- //if topscopeclass, ok := tmp["topscopeclass"]; ok && topscopeclass != nil {
- // if topps, ok2 := topscopeclass.([]interface{}); ok2 {
- // for _, v := range topps {
- // top := util.ObjToString(v)
- // if top != "" {
- // resultTobs = append(resultTobs, top)
- // }
- // }
- // }
- // //1.一级分类是空数组或者 是 其它
- // if len(resultTobs) == 0 || resultTobs[0] == "其它" {
- // update["topscopeclass"] = []string{"其它"}
- // update["subscopeclass"] = []string{"其它"}
- // update["s_topscopeclass"] = "其它"
- // update["s_subscopeclass"] = "其它"
- // esUpdate["s_topscopeclass"] = "其它"
- // esUpdate["s_subscopeclass"] = "其它"
- // esUpdate["topscopeclass"] = []string{"其它"}
- // } else {
- // if subs, ok3 := tmp["subscopeclass"]; ok3 {
- // if subbs, ok4 := subs.([]interface{}); ok4 {
- // for _, v := range subbs {
- // sub := util.ObjToString(v)
- // if sub != "" && sub != "其它" {
- // resultSubs = append(resultSubs, sub)
- // }
- // }
- // }
- // }
- // newTops, newSubs, cleanedTops := ProcessTopscopeclass(resultTobs, resultSubs)
- // update["topscopeclass"] = newTops
- // update["subscopeclass"] = newSubs
- // update["s_topscopeclass"] = strings.Join(cleanedTops, ",")
- // update["s_subscopeclass"] = strings.Join(newSubs, ",")
- // esUpdate["s_topscopeclass"] = strings.Join(cleanedTops, ",")
- // esUpdate["s_subscopeclass"] = strings.Join(newSubs, ",")
- // esUpdate["topscopeclass"] = newTops
- // }
- //
- //} else {
- // update["topscopeclass"] = []string{"其它"}
- // update["subscopeclass"] = []string{"其它"}
- // update["s_topscopeclass"] = "其它"
- // update["s_subscopeclass"] = "其它"
- // esUpdate["s_topscopeclass"] = "其它"
- // esUpdate["s_subscopeclass"] = "其它"
- // esUpdate["topscopeclass"] = []string{"其它"}
- //}
- //
- ////procurementlist 处理预计采购时间
- ////if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
- //// field := "procurementlist"
- //// if tmp[field] != nil {
- //// if field == "procurementlist" {
- //// if tmp["procurementlist"] != nil {
- //// var arr []interface{}
- //// plist := tmp["procurementlist"].([]interface{})
- //// for _, p := range plist {
- //// p1 := p.(map[string]interface{})
- //// p2 := make(map[string]interface{})
- //// for k, v := range BiddingLevelField[field] {
- //// if k == "projectname" && util.ObjToString(p1[k]) == "" {
- //// p2[k] = util.ObjToString(tmp["projectname"])
- //// } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
- //// p2[k] = util.ObjToString(tmp["buyer"])
- //// } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
- //// res := getMethod(util.ObjToString(p1[k]))
- //// if res != 0 {
- //// p2[k] = res
- //// }
- //// } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
- //// p2[k] = p1[k]
- //// }
- ////
- //// }
- //// arr = append(arr, p2)
- //// }
- //// if len(arr) > 0 {
- //// esUpdate[field] = arr
- //// }
- //// }
- //// }
- //// }
- ////}
- //
- //if len(update) > 0 {
- // //fmt.Println("aaaaa", biddingID)
- // //更新mongo
- // //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- // //更新MongoDB
- // updatePool <- []map[string]interface{}{
- // {"_id": tmp["_id"]},
- // {"$set": update},
- // }
- //
- // //2.es 项目 更新字段
- // //err := Es.UpdateDocument("bidding", biddingID, update)
- // //if err != nil && err.Error() != "Document not updated: noop" {
- // // log.Info("bidding es update err", err, biddingID)
- // //}
- // //// 更新es
- // //updateEsPool <- []map[string]interface{}{
- // // {"_id": biddingID},
- // // update,
- // //}
- //}
- // 更新Es 数据
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- esUpdate,
- }
- }
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // dealBiddingAi 处理qfw_ai 数据库bidding 数据
- func dealBiddingAi() {
- defer util.Catch()
- sess := MgoBAi.GetMgoConn()
- defer MgoBAi.DestoryMongoConn(sess)
- it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- fmt.Println("current:", count)
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- update := map[string]interface{}{}
- //if budget, ok := tmp["budget"]; ok && budget != nil {
- // update["budget"] = budget
- //}
- if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil {
- update["bidamount"] = bidamount
- } else {
- update["bidamount"] = 0.0
- }
- //if projectcode, ok := tmp["projectcode"]; ok && projectcode != nil {
- // update["projectcode"] = projectcode
- //}
- if len(update) > 0 {
- MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //2.es 项目 更新字段
- err := Es.UpdateDocument("bidding_ai", biddingID, update)
- if err != nil && err.Error() != "Document not updated: noop" {
- log.Info("bidding es update err", err, biddingID)
- }
- }
- }
- fmt.Println("over ----------- over ")
- }
- func dealBiddingByEs() {
- //url := "http://172.17.4.184:19908"
- url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "bidding" //索引名称
- //index := "projectset" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := es7.NewClient(
- es7.SetURL(url),
- es7.SetBasicAuth(username, password),
- es7.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- query := es7.NewBoolQuery()
- query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
- query.MustNot(es7.NewExistsQuery("s_topscopeclass"))
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := es7.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- fmt.Printf("解析文档失败:%s", err)
- continue
- }
- //delete(doc, "filetext")
- //delete(doc, "detail")
- //
- ////存入新表
- //err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
- //if err != nil {
- // fmt.Println("error", doc["id"])
- //}
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- fmt.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- fmt.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- fmt.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
- // dealBiddingTest 处理测试环境数据
- func dealBiddingTest() {
- defer util.Catch()
- sess := MgoT.GetMgoConn()
- defer MgoT.DestoryMongoConn(sess)
- it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- update := map[string]interface{}{}
- // 2.更新中标单位,中标金额
- //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil {
- // update["tag_topinformation"] = tag_topinformation
- //}
- //
- //if property_form, ok := tmp["property_form"]; ok && property_form != nil {
- // update["property_form"] = property_form
- //}
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- /**
- "s_subscopeclass" : "其它",
- "s_topscopeclass" : "其它",
- "subscopeclass" : [
- "其它"
- ],
- "topscopeclass" : [
- "其它"
- ],
- */
- // 行业分类默认值
- if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil {
- update["topscopeclass"] = []string{"其它"}
- update["s_topscopeclass"] = "其它"
- }
- if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil {
- update["subscopeclass"] = []string{"其它"}
- update["s_subscopeclass"] = "其它"
- }
- if util.ObjToString(tmp["s_topscopeclass"]) == "其它" {
- update["topscopeclass"] = []string{"其它"}
- update["s_topscopeclass"] = "其它"
- }
- if util.ObjToString(tmp["s_subscopeclass"]) == "其它" {
- update["subscopeclass"] = []string{"其它"}
- update["s_subscopeclass"] = "其它"
- }
- //procurementlist 处理预计采购时间
- if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil {
- for field, _ := range BiddingField {
- if tmp[field] != nil {
- if field == "procurementlist" {
- if tmp["procurementlist"] != nil {
- var arr []interface{}
- plist := tmp["procurementlist"].([]interface{})
- for _, p := range plist {
- p1 := p.(map[string]interface{})
- p2 := make(map[string]interface{})
- for k, v := range BiddingLevelField[field] {
- if k == "projectname" && util.ObjToString(p1[k]) == "" {
- p2[k] = util.ObjToString(tmp["projectname"])
- } else if k == "buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" {
- p2[k] = util.ObjToString(tmp["buyer"])
- } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" {
- res := getMethod(util.ObjToString(p1[k]))
- if res != 0 {
- p2[k] = res
- }
- } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v {
- p2[k] = p1[k]
- }
- }
- arr = append(arr, p2)
- }
- if len(arr) > 0 {
- update[field] = arr
- }
- }
- }
- }
- }
- }
- if len(update) > 0 {
- fmt.Println("aaaaa", biddingID)
- //更新mongo
- //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //更新MongoDB
- //updatePool <- []map[string]interface{}{
- // {"_id": tmp["_id"]},
- // {"$set": update},
- //}
- //2.es 项目 更新字段
- //err := Es.UpdateDocument("bidding", biddingID, update)
- //if err != nil && err.Error() != "Document not updated: noop" {
- // log.Info("bidding es update err", err, biddingID)
- //}
- // 更新es
- //updateEsPool <- []map[string]interface{}{
- // {"_id": biddingID},
- // update,
- //}
- }
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- // updateEsMethod 更新es href 字段
- func updateEsHrefMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- Es.UpdateBulk("bidding_ai", arru...)
- Es.UpdateBulk("bidding_temporary", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding_customer", arru...)
- EsNew.UpdateBulk("bidding_free", arru...)
- EsNew.UpdateBulk("bidding_year", arru...)
- EsNew.UpdateBulk("bidding_all", arru...)
- EsNew.UpdateBulk("bidding_temporary", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- Es.UpdateBulk("bidding_ai", arru...)
- Es.UpdateBulk("bidding_temporary", arru...)
- EsNew.UpdateBulk("bidding", arru...)
- EsNew.UpdateBulk("bidding_customer", arru...)
- EsNew.UpdateBulk("bidding_free", arru...)
- EsNew.UpdateBulk("bidding_year", arru...)
- EsNew.UpdateBulk("bidding_all", arru...)
- EsNew.UpdateBulk("bidding_temporary", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- // updateProjectEsMethod 更新项目索引
- func updateProjectEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateProjectEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateProjectEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateProjectEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateProjectEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateProjectEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|