main.go 6.5 KB


  1. package main
  2. import (
  3. "github.com/robfig/cron/v3"
  4. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "time"
  9. )
  10. var (
  11. MgoP *mongodb.MongodbSim
  12. MgoB *mongodb.MongodbSim
  13. Es *elastic.Elastic
  14. EsNew *elastic.Elastic
  15. logProject = "projectset_amount_logs" //记录项目表更新记录
  16. logBidding = "bidding_amount_logs" // 记录bidding表更新记录
  17. )
  18. func Init() {
  19. //mongodb
  20. MgoB = &mongodb.MongodbSim{
  21. MongodbAddr: "172.17.189.140:27080",
  22. //MongodbAddr: "127.0.0.1:27083",
  23. DbName: "qfw",
  24. Size: 10,
  25. UserName: "SJZY_RWbid_ES",
  26. Password: "SJZY@B4i4D5e6S",
  27. //Direct: true,
  28. }
  29. MgoB.InitPool()
  30. //
  31. //85
  32. MgoP = &mongodb.MongodbSim{
  33. //MongodbAddr: "127.0.0.1:27080",
  34. MongodbAddr: "172.17.4.85:27080",
  35. DbName: "qfw",
  36. Size: 10,
  37. //Direct: true,
  38. }
  39. MgoP.InitPool()
  40. //
  41. Es = &elastic.Elastic{
  42. //S_esurl: "http://127.0.0.1:19908",
  43. S_esurl: "http://172.17.4.184:19908",
  44. I_size: 5,
  45. Username: "jybid",
  46. Password: "Top2023_JEB01i@31",
  47. }
  48. Es.InitElasticSize()
  49. // es 新集群
  50. EsNew = &elastic.Elastic{
  51. //S_esurl: "http://127.0.0.1:19905",
  52. S_esurl: "http://172.17.4.184:19905",
  53. I_size: 5,
  54. Username: "jybid",
  55. Password: "Top2023_JEB01i@31",
  56. }
  57. EsNew.InitElasticSize()
  58. }
  59. func main() {
  60. Init()
  61. spec := "0 00 01 * * *" // 每天01点执行;秒分时日月星期
  62. local, _ := time.LoadLocation("Asia/Shanghai")
  63. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  64. eid, err := c.AddFunc(spec, dealIncData)
  65. if err != nil {
  66. log.Println("AddFunc err", err, eid)
  67. }
  68. c.Start()
  69. defer c.Stop()
  70. //
  71. select {}
  72. }
  73. // dealIncData 处理增量数据
  74. func dealIncData() {
  75. log.Println("开始处理增量数据")
  76. sess := MgoP.GetMgoConn()
  77. defer MgoP.DestoryMongoConn(sess)
  78. now := time.Now()
  79. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  80. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  81. q := map[string]interface{}{
  82. "pici": map[string]interface{}{
  83. "$gte": yesterday.Unix(),
  84. "$lt": today.Unix(),
  85. },
  86. //"ids": "6653e39c66cf0db42a619be7",
  87. }
  88. log.Println("q", q)
  89. query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
  90. count := 0
  91. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  92. if count%1000 == 0 {
  93. log.Println("current:", count, tmp["projectname"])
  94. }
  95. budget := util.Float64All(tmp["budget"]) //预算
  96. bidamount := util.Float64All(tmp["bidamount"]) //中标金额
  97. bidstatus := util.ObjToString(tmp["bidstatus"])
  98. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
  99. projectId := mongodb.BsonIdToSId(tmp["_id"])
  100. oldTmp := tmp //备份原来的数据
  101. if list, ok := tmp["list"].([]interface{}); ok {
  102. if budget == 0 {
  103. //通过list 字段,获取其中预算金额
  104. budget = getBudget(list)
  105. }
  106. newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
  107. if len(newList) > 0 && minBid > 0 {
  108. //1.备份数据
  109. oldTmp["logs"] = logs
  110. MgoP.SaveByOriID(logProject, oldTmp)
  111. //2.更新项目MongoDB数据
  112. update := make(map[string]interface{})
  113. if _, ok := tmp["bidamount"]; ok {
  114. update["bidamount"] = minBid
  115. }
  116. if _, ok := tmp["sortprice"]; ok {
  117. update["sortprice"] = minBid
  118. }
  119. update["list"] = newList
  120. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  121. //3.更新项目es
  122. Es.UpdateDocument("projectset", projectId, update)
  123. //4.更新对应的bidding数据
  124. if len(logs) > 0 {
  125. for _, log := range logs {
  126. if log.InfoID != "" { //对应bidding ID
  127. updateB := map[string]interface{}{
  128. "bidamount": log.UpdatedBid,
  129. }
  130. MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
  131. Es.UpdateDocument("bidding", log.InfoID, updateB)
  132. EsNew.UpdateDocument("bidding", log.InfoID, updateB)
  133. }
  134. }
  135. }
  136. }
  137. }
  138. }
  139. }
  140. log.Println("理增量数据处理完毕")
  141. }
  142. // dealAllData 处理存量数据
  143. func dealAllData() {
  144. log.Println("开始处理存量数据")
  145. sess := MgoP.GetMgoConn()
  146. defer MgoP.DestoryMongoConn(sess)
  147. now := time.Now()
  148. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  149. q := map[string]interface{}{
  150. "pici": map[string]interface{}{
  151. "$lt": yesterday.Unix(),
  152. },
  153. }
  154. log.Println("q", q)
  155. query := sess.DB("qfw").C("projectset_20230904").Find(q).Select(nil).Iter()
  156. count := 0
  157. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  158. if count%1000 == 0 {
  159. log.Println("current:", count, tmp["projectname"])
  160. }
  161. budget := util.Float64All(tmp["budget"]) //预算
  162. bidamount := util.Float64All(tmp["bidamount"]) //中标金额
  163. bidstatus := util.ObjToString(tmp["bidstatus"])
  164. if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "单一" {
  165. projectId := mongodb.BsonIdToSId(tmp["_id"])
  166. oldTmp := tmp //备份原来的数据
  167. if list, ok := tmp["list"].([]interface{}); ok {
  168. if budget == 0 {
  169. //通过list 字段,获取其中预算金额
  170. budget = getBudget(list)
  171. }
  172. newList, _, _, minBid, _, logs := ProcessBids(list, projectId, budget, bidamount)
  173. if len(newList) > 0 && minBid > 0 {
  174. //1.备份数据
  175. oldTmp["logs"] = logs
  176. MgoP.SaveByOriID(logProject, oldTmp)
  177. //2.更新项目MongoDB数据
  178. update := make(map[string]interface{})
  179. if _, ok := tmp["bidamount"]; ok {
  180. update["bidamount"] = minBid
  181. }
  182. if _, ok := tmp["sortprice"]; ok {
  183. update["sortprice"] = minBid
  184. }
  185. update["list"] = newList
  186. MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
  187. //3.更新项目es
  188. Es.UpdateDocument("projectset", projectId, update)
  189. //4.更新对应的bidding数据
  190. if len(logs) > 0 {
  191. for _, log := range logs {
  192. if log.InfoID != "" { //对应bidding ID
  193. updateB := map[string]interface{}{
  194. "bidamount": log.UpdatedBid,
  195. }
  196. MgoB.UpdateById("bidding", log.InfoID, map[string]interface{}{"$set": updateB})
  197. Es.UpdateDocument("bidding", log.InfoID, updateB)
  198. EsNew.UpdateDocument("bidding", log.InfoID, updateB)
  199. }
  200. }
  201. }
  202. }
  203. }
  204. }
  205. }
  206. log.Println("理增量数据处理完毕")
  207. }