|
- package main
- import (
- "github.com/robfig/cron/v3"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "time"
- )
- var (
- MgoP *mongodb.MongodbSim
- MgoB *mongodb.MongodbSim
- Es *elastic.Elastic
- EsNew *elastic.Elastic
- logProject = "projectset_amount_logs" //记录项目表更新记录
- logBidding = "bidding_amount_logs" // 记录bidding表更新记录
- )
- func Init() {
- //mongodb
- MgoB = &mongodb.MongodbSim{
- //MongodbAddr: "172.17.189.140:27080",//老的集群
- MongodbAddr: "172.20.45.128:27080", // 迁移后的新地址
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- //
- //85
- MgoP = &mongodb.MongodbSim{
- //MongodbAddr: "127.0.0.1:27080",
- MongodbAddr: "172.17.4.85:27080",
- DbName: "qfw",
- Size: 10,
- //Direct: true,
- }
- MgoP.InitPool()
- //
- Es = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19908",
- S_esurl: "http://172.17.4.184:19908",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- Es.InitElasticSize()
- // es 新集群
- EsNew = &elastic.Elastic{
- //S_esurl: "http://127.0.0.1:19905",
- S_esurl: "http://172.17.4.184:19905",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- EsNew.InitElasticSize()
- }
- func main() {
- Init()
- spec := "0 00 01 * * *" // 每天01点执行;秒分时日月星期
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- eid, err := c.AddFunc(spec, dealIncData)
- if err != nil {
- log.Println("AddFunc err", err, eid)
- }
- c.Start()
- defer c.Stop()
- //
- select {}
- }
- // dealIncData 处理增量数据
- func dealIncData() {
- log.Println("开始处理增量数据")
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- now := time.Now()
- yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
- today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- q := map[string]interface{}{
- "pici": map[string]interface{}{
- "$gte": yesterday.Unix(),
- "$lt": today.Unix(),
- },
- //"ids": "6653e39c66cf0db42a619be7",
- }
- log.Println("q", q)
- query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Println("current:", count, tmp["projectname"])
- }
- budget := util.Float64All(tmp["budget"]) //预算
- bidamount := util.Float64All(tmp["bidamount"]) //中标金额
- bidstatus := util.ObjToString(tmp["bidstatus"])
- if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
- projectId := mongodb.BsonIdToSId(tmp["_id"])
- oldTmp := tmp //备份原来的数据
- if list, ok := tmp["list"].([]interface{}); ok {
- if budget == 0 {
- //通过list 字段,获取其中预算金额
- budget = getBudget(list)
- }
- newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
- if len(newList) > 0 && minBid > 0 {
- //1.备份数据
- oldTmp["logs"] = logs
- MgoP.SaveByOriID(logProject, oldTmp)
- //2.更新项目MongoDB数据
- update := make(map[string]interface{})
- if _, ok := tmp["bidamount"]; ok {
- update["bidamount"] = minBid
- }
- if _, ok := tmp["sortprice"]; ok {
- update["sortprice"] = minBid
- }
- update["list"] = newList
- MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
- //3.更新项目es
- Es.UpdateDocument("projectset", projectId, update)
- //4.更新对应的bidding数据
- if len(logs) > 0 {
- for _, log := range logs {
- if log.InfoID != "" { //对应bidding ID
- updateB := map[string]interface{}{
- "bidamount": log.UpdatedBid,
- }
- MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
- Es.UpdateDocument("bidding", log.InfoID, updateB)
- EsNew.UpdateDocument("bidding", log.InfoID, updateB)
- }
- }
- }
- }
- }
- }
- }
- log.Println("理增量数据处理完毕")
- }
- // dealAllData 处理存量数据
- func dealAllData() {
- log.Println("开始处理存量数据")
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- now := time.Now()
- yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
- q := map[string]interface{}{
- "pici": map[string]interface{}{
- "$lt": yesterday.Unix(),
- },
- }
- log.Println("q", q)
- query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Println("current:", count, tmp["projectname"])
- }
- budget := util.Float64All(tmp["budget"]) //预算
- bidamount := util.Float64All(tmp["bidamount"]) //中标金额
- bidstatus := util.ObjToString(tmp["bidstatus"])
- if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
- projectId := mongodb.BsonIdToSId(tmp["_id"])
- oldTmp := tmp //备份原来的数据
- if list, ok := tmp["list"].([]interface{}); ok {
- if budget == 0 {
- //通过list 字段,获取其中预算金额
- budget = getBudget(list)
- }
- newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
- if len(newList) > 0 && minBid > 0 {
- //1.备份数据
- oldTmp["logs"] = logs
- MgoP.SaveByOriID(logProject, oldTmp)
- //2.更新项目MongoDB数据
- update := make(map[string]interface{})
- if _, ok := tmp["bidamount"]; ok {
- update["bidamount"] = minBid
- }
- if _, ok := tmp["sortprice"]; ok {
- update["sortprice"] = minBid
- }
- update["list"] = newList
- MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
- //3.更新项目es
- Es.UpdateDocument("projectset", projectId, update)
- //4.更新对应的bidding数据
- if len(logs) > 0 {
- for _, log := range logs {
- if log.InfoID != "" { //对应bidding ID
- updateB := map[string]interface{}{
- "bidamount": log.UpdatedBid,
- }
- MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
- Es.UpdateDocument("bidding", log.InfoID, updateB)
- EsNew.UpdateDocument("bidding", log.InfoID, updateB)
- }
- }
- }
- }
- }
- }
- }
- log.Println("理增量数据处理完毕")
- }
|