main.go 6.5 KB


  1. package main
  2. import (
  3. "github.com/robfig/cron/v3"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "time"
  9. )
  10. var (
  11. MgoP *mongodb.MongodbSim
  12. MgoB *mongodb.MongodbSim
  13. Es *elastic.Elastic
  14. EsNew *elastic.Elastic
  15. logProject = "projectset_amount_logs" //记录项目表更新记录
  16. logBidding = "bidding_amount_logs" // 记录bidding表更新记录
  17. )
  18. func Init() {
  19. //mongodb
  20. MgoB = &mongodb.MongodbSim{
  21. //MongodbAddr: "172.17.189.140:27080",//老的集群
  22. MongodbAddr: "172.20.45.128:27080", // 迁移后的新地址
  23. //MongodbAddr: "127.0.0.1:27083",
  24. DbName: "qfw",
  25. Size: 10,
  26. UserName: "SJZY_RWbid_ES",
  27. Password: "SJZY@B4i4D5e6S",
  28. //Direct: true,
  29. }
  30. MgoB.InitPool()
  31. //
  32. //85
  33. MgoP = &mongodb.MongodbSim{
  34. //MongodbAddr: "127.0.0.1:27080",
  35. MongodbAddr: "172.17.4.85:27080",
  36. DbName: "qfw",
  37. Size: 10,
  38. //Direct: true,
  39. }
  40. MgoP.InitPool()
  41. //
  42. Es = &elastic.Elastic{
  43. //S_esurl: "http://127.0.0.1:19908",
  44. S_esurl: "http://172.17.4.184:19908",
  45. I_size: 5,
  46. Username: "jybid",
  47. Password: "Top2023_JEB01i@31",
  48. }
  49. Es.InitElasticSize()
  50. // es 新集群
  51. EsNew = &elastic.Elastic{
  52. //S_esurl: "http://127.0.0.1:19905",
  53. S_esurl: "http://172.17.4.184:19905",
  54. I_size: 5,
  55. Username: "jybid",
  56. Password: "Top2023_JEB01i@31",
  57. }
  58. EsNew.InitElasticSize()
  59. }
  60. func main() {
  61. Init()
  62. spec := "0 00 01 * * *" // 每天01点执行;秒分时日月星期
  63. local, _ := time.LoadLocation("Asia/Shanghai")
  64. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  65. eid, err := c.AddFunc(spec, dealIncData)
  66. if err != nil {
  67. log.Println("AddFunc err", err, eid)
  68. }
  69. c.Start()
  70. defer c.Stop()
  71. //
  72. select {}
  73. }
  74. // dealIncData 处理增量数据
  75. func dealIncData() {
  76. log.Println("开始处理增量数据")
  77. sess := MgoP.GetMgoConn()
  78. defer MgoP.DestoryMongoConn(sess)
  79. now := time.Now()
  80. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  81. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  82. q := map[string]interface{}{
  83. "pici": map[string]interface{}{
  84. "$gte": yesterday.Unix(),
  85. "$lt": today.Unix(),
  86. },
  87. //"ids": "6653e39c66cf0db42a619be7",
  88. }
  89. log.Println("q", q)
  90. query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
  91. count := 0
  92. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  93. if count%1000 == 0 {
  94. log.Println("current:", count, tmp["projectname"])
  95. }
  96. budget := util.Float64All(tmp["budget"]) //预算
  97. bidamount := util.Float64All(tmp["bidamount"]) //中标金额
  98. bidstatus := util.ObjToString(tmp["bidstatus"])
  99. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
  100. projectId := mongodb.BsonIdToSId(tmp["_id"])
  101. oldTmp := tmp //备份原来的数据
  102. if list, ok := tmp["list"].([]interface{}); ok {
  103. if budget == 0 {
  104. //通过list 字段,获取其中预算金额
  105. budget = getBudget(list)
  106. }
  107. newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
  108. if len(newList) > 0 && minBid > 0 {
  109. //1.备份数据
  110. oldTmp["logs"] = logs
  111. MgoP.SaveByOriID(logProject, oldTmp)
  112. //2.更新项目MongoDB数据
  113. update := make(map[string]interface{})
  114. if _, ok := tmp["bidamount"]; ok {
  115. update["bidamount"] = minBid
  116. }
  117. if _, ok := tmp["sortprice"]; ok {
  118. update["sortprice"] = minBid
  119. }
  120. update["list"] = newList
  121. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  122. //3.更新项目es
  123. Es.UpdateDocument("projectset", projectId, update)
  124. //4.更新对应的bidding数据
  125. if len(logs) > 0 {
  126. for _, log := range logs {
  127. if log.InfoID != "" { //对应bidding ID
  128. updateB := map[string]interface{}{
  129. "bidamount": log.UpdatedBid,
  130. }
  131. MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
  132. Es.UpdateDocument("bidding", log.InfoID, updateB)
  133. EsNew.UpdateDocument("bidding", log.InfoID, updateB)
  134. }
  135. }
  136. }
  137. }
  138. }
  139. }
  140. }
  141. log.Println("理增量数据处理完毕")
  142. }
  143. // dealAllData 处理存量数据
  144. func dealAllData() {
  145. log.Println("开始处理存量数据")
  146. sess := MgoP.GetMgoConn()
  147. defer MgoP.DestoryMongoConn(sess)
  148. now := time.Now()
  149. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  150. q := map[string]interface{}{
  151. "pici": map[string]interface{}{
  152. "$lt": yesterday.Unix(),
  153. },
  154. }
  155. log.Println("q", q)
  156. query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
  157. count := 0
  158. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  159. if count%1000 == 0 {
  160. log.Println("current:", count, tmp["projectname"])
  161. }
  162. budget := util.Float64All(tmp["budget"]) //预算
  163. bidamount := util.Float64All(tmp["bidamount"]) //中标金额
  164. bidstatus := util.ObjToString(tmp["bidstatus"])
  165. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
  166. projectId := mongodb.BsonIdToSId(tmp["_id"])
  167. oldTmp := tmp //备份原来的数据
  168. if list, ok := tmp["list"].([]interface{}); ok {
  169. if budget == 0 {
  170. //通过list 字段,获取其中预算金额
  171. budget = getBudget(list)
  172. }
  173. newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
  174. if len(newList) > 0 && minBid > 0 {
  175. //1.备份数据
  176. oldTmp["logs"] = logs
  177. MgoP.SaveByOriID(logProject, oldTmp)
  178. //2.更新项目MongoDB数据
  179. update := make(map[string]interface{})
  180. if _, ok := tmp["bidamount"]; ok {
  181. update["bidamount"] = minBid
  182. }
  183. if _, ok := tmp["sortprice"]; ok {
  184. update["sortprice"] = minBid
  185. }
  186. update["list"] = newList
  187. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  188. //3.更新项目es
  189. Es.UpdateDocument("projectset", projectId, update)
  190. //4.更新对应的bidding数据
  191. if len(logs) > 0 {
  192. for _, log := range logs {
  193. if log.InfoID != "" { //对应bidding ID
  194. updateB := map[string]interface{}{
  195. "bidamount": log.UpdatedBid,
  196. }
  197. MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
  198. Es.UpdateDocument("bidding", log.InfoID, updateB)
  199. EsNew.UpdateDocument("bidding", log.InfoID, updateB)
  200. }
  201. }
  202. }
  203. }
  204. }
  205. }
  206. }
  207. log.Println("理增量数据处理完毕")
  208. }