package main import ( "github.com/robfig/cron/v3" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "time" ) var ( MgoP *mongodb.MongodbSim MgoB *mongodb.MongodbSim Es *elastic.Elastic EsNew *elastic.Elastic logProject = "projectset_amount_logs" //记录项目表更新记录 logBidding = "bidding_amount_logs" // 记录bidding表更新记录 ) func Init() { //mongodb MgoB = &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", //MongodbAddr: "127.0.0.1:27083", DbName: "qfw", Size: 10, UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() // //85 MgoP = &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "qfw", Size: 10, //Direct: true, } MgoP.InitPool() // Es = &elastic.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() // es 新集群 EsNew = &elastic.Elastic{ //S_esurl: "http://127.0.0.1:19905", S_esurl: "http://172.17.4.184:19905", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } EsNew.InitElasticSize() } func main() { Init() spec := "0 00 01 * * *" // 每天01点执行;秒分时日月星期 local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) eid, err := c.AddFunc(spec, dealIncData) if err != nil { log.Println("AddFunc err", err, eid) } c.Start() defer c.Stop() // select {} } // dealIncData 处理增量数据 func dealIncData() { log.Println("开始处理增量数据") sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) now := time.Now() yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) q := map[string]interface{}{ "pici": map[string]interface{}{ "$gte": yesterday.Unix(), "$lt": today.Unix(), }, //"ids": "6653e39c66cf0db42a619be7", } log.Println("q", q) query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["projectname"]) } budget := util.Float64All(tmp["budget"]) //预算 bidamount := util.Float64All(tmp["bidamount"]) //中标金额 bidstatus := util.ObjToString(tmp["bidstatus"]) if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" { projectId := mongodb.BsonIdToSId(tmp["_id"]) oldTmp := tmp //备份原来的数据 if list, ok := tmp["list"].([]interface{}); ok { if budget == 0 { //通过list 字段,获取其中预算金额 budget = getBudget(list) } newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount) if len(newList) > 0 && minBid > 0 { //1.备份数据 oldTmp["logs"] = logs MgoP.SaveByOriID(logProject, oldTmp) //2.更新项目MongoDB数据 update := make(map[string]interface{}) if _, ok := tmp["bidamount"]; ok { update["bidamount"] = minBid } if _, ok := tmp["sortprice"]; ok { update["sortprice"] = minBid } update["list"] = newList MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update}) //3.更新项目es Es.UpdateDocument("projectset", projectId, update) //4.更新对应的bidding数据 if len(logs) > 0 { for _, log := range logs { if log.InfoID != "" { //对应bidding ID updateB := map[string]interface{}{ "bidamount": log.UpdatedBid, } MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB}) Es.UpdateDocument("bidding", log.InfoID, updateB) EsNew.UpdateDocument("bidding", log.InfoID, updateB) } } } } } } } log.Println("理增量数据处理完毕") } // dealAllData 处理存量数据 func dealAllData() { log.Println("开始处理存量数据") sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) now := time.Now() yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) q := map[string]interface{}{ "pici": map[string]interface{}{ "$lt": yesterday.Unix(), }, } log.Println("q", q) query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["projectname"]) } budget := util.Float64All(tmp["budget"]) //预算 bidamount := util.Float64All(tmp["bidamount"]) //中标金额 bidstatus := util.ObjToString(tmp["bidstatus"]) if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" { projectId := mongodb.BsonIdToSId(tmp["_id"]) oldTmp := tmp //备份原来的数据 if list, ok := tmp["list"].([]interface{}); ok { if budget == 0 { //通过list 字段,获取其中预算金额 budget = getBudget(list) } newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount) if len(newList) > 0 && minBid > 0 { //1.备份数据 oldTmp["logs"] = logs MgoP.SaveByOriID(logProject, oldTmp) //2.更新项目MongoDB数据 update := make(map[string]interface{}) if _, ok := tmp["bidamount"]; ok { update["bidamount"] = minBid } if _, ok := tmp["sortprice"]; ok { update["sortprice"] = minBid } update["list"] = newList MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update}) //3.更新项目es Es.UpdateDocument("projectset", projectId, update) //4.更新对应的bidding数据 if len(logs) > 0 { for _, log := range logs { if log.InfoID != "" { //对应bidding ID updateB := map[string]interface{}{ "bidamount": log.UpdatedBid, } MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB}) Es.UpdateDocument("bidding", log.InfoID, updateB) EsNew.UpdateDocument("bidding", log.InfoID, updateB) } } } } } } } log.Println("理增量数据处理完毕") }