main.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package main
  2. import (
  3. "context"
  4. "github.com/robfig/cron/v3"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.mongodb.org/mongo-driver/mongo/options"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "sync"
  12. "time"
  13. )
  14. var (
  15. MgoB *mongodb.MongodbSim
  16. MgoP *mongodb.MongodbSim
  17. columns = make([]map[string]interface{}, 0) //存储配置 栏目
  18. // 更新mongo
  19. updatePool = make(chan []map[string]interface{}, 5000)
  20. )
  21. func main() {
  22. go updateMethod()
  23. local, _ := time.LoadLocation("Asia/Shanghai")
  24. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  25. eid, err := c.AddFunc(GF.Cron.Spec, dealData)
  26. if err != nil {
  27. log.Info("main", zap.Any("AddFunc err", err))
  28. }
  29. log.Info("main", zap.Any("eid", eid))
  30. c.Start()
  31. defer c.Stop()
  32. //
  33. select {}
  34. }
  35. func dealData() {
  36. go dealBidding()
  37. go dealProject()
  38. }
  39. //dealBidding 处理标讯数据
  40. func dealBidding() {
  41. sess := MgoB.GetMgoConn()
  42. defer MgoB.DestoryMongoConn(sess)
  43. var q interface{}
  44. var startTime = GF.Cron.Start
  45. now := time.Now()
  46. ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
  47. coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
  48. find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1})
  49. if startTime != 0 && GF.Cron.End != 0 {
  50. q = map[string]interface{}{
  51. "comeintime": map[string]interface{}{
  52. "$gt": GF.Cron.Start,
  53. "$lte": GF.Cron.End,
  54. },
  55. }
  56. } else if startTime != 0 {
  57. q = map[string]interface{}{
  58. "comeintime": map[string]interface{}{
  59. "$gt": startTime,
  60. },
  61. }
  62. } else if startTime == 0 && GF.Cron.End == 0 {
  63. //默认 取大于 昨天的数据
  64. q = map[string]interface{}{
  65. "comeintime": map[string]interface{}{
  66. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  67. },
  68. }
  69. }
  70. cur, err := coll.Find(ctx, q, find)
  71. if err != nil {
  72. log.Error("dealBidding,coll.Find ", zap.Error(err))
  73. }
  74. log.Info("dealBidding", zap.Any("q", q))
  75. //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  76. // "contenthtml": 0}).Iter()
  77. count := 0
  78. ch := make(chan bool, 15)
  79. wg := &sync.WaitGroup{}
  80. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  81. for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
  82. if cur != nil {
  83. cur.Decode(&tmp)
  84. }
  85. startTime = util.IntAll(tmp["comeintime"])
  86. if count%1000 == 0 {
  87. log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
  88. }
  89. ch <- true
  90. wg.Add(1)
  91. go func(tmp map[string]interface{}) {
  92. defer func() {
  93. <-ch
  94. wg.Done()
  95. }()
  96. rea := TagBidding(tmp)
  97. if len(rea) > 0 {
  98. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  99. //update := map[string]interface{}{
  100. // "nav_column": reb,
  101. //}
  102. //where := map[string]interface{}{
  103. // "_id": tmp["_id"],
  104. //}
  105. updatePool <- []map[string]interface{}{
  106. {"_id": tmp["_id"]},
  107. {"$set": bson.M{
  108. "nav_column": reb,
  109. }},
  110. }
  111. //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
  112. }
  113. }(tmp)
  114. tmp = map[string]interface{}{}
  115. }
  116. wg.Wait()
  117. log.Info("dealBidding", zap.Int("over ", count))
  118. //没有数据时,发送邮件
  119. if count == 0 {
  120. SendMail("每日数据监控", "查询数据为空,请处理")
  121. }
  122. }
  123. //dealProject 处理拟建项目数据标签
  124. func dealProject() {
  125. sess := MgoP.GetMgoConn()
  126. defer MgoP.DestoryMongoConn(sess)
  127. // 指定对应的时间格式
  128. //layout := "2006-01-02 15:04:05"
  129. // 获取当前时间
  130. now := time.Now()
  131. var q interface{}
  132. var startTime = GF.Cron.Start
  133. if startTime != 0 && GF.Cron.End != 0 {
  134. q = map[string]interface{}{
  135. "pici": map[string]interface{}{
  136. "$gt": GF.Cron.Start,
  137. "$lte": GF.Cron.End,
  138. },
  139. }
  140. } else if startTime != 0 {
  141. q = map[string]interface{}{
  142. "pici": map[string]interface{}{
  143. "$gt": startTime,
  144. },
  145. }
  146. } else if startTime == 0 && GF.Cron.End == 0 {
  147. //默认 取大于 昨天的数据
  148. q = map[string]interface{}{
  149. "pici": map[string]interface{}{
  150. "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
  151. },
  152. }
  153. }
  154. log.Info("dealProject", zap.Any("q", q))
  155. query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{
  156. "ids": 0, "list": 0}).Iter()
  157. count := 0
  158. ch := make(chan bool, 15)
  159. wg := &sync.WaitGroup{}
  160. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  161. if count%1000 == 0 {
  162. log.Info("dealProject", zap.Int("current", count))
  163. }
  164. ch <- true
  165. wg.Add(1)
  166. go func(tmp map[string]interface{}) {
  167. defer func() {
  168. <-ch
  169. wg.Done()
  170. }()
  171. rea := TagProject(tmp)
  172. if len(rea) > 0 {
  173. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  174. update := map[string]interface{}{
  175. "nav_column": reb,
  176. }
  177. where := map[string]interface{}{
  178. "_id": tmp["_id"],
  179. }
  180. MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
  181. }
  182. }(tmp)
  183. tmp = map[string]interface{}{}
  184. }
  185. wg.Wait()
  186. log.Info("dealProject", zap.Int("over ", count))
  187. //没有数据时,发送邮件
  188. if count == 0 {
  189. SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
  190. }
  191. }
  192. // updateMethod 更新MongoDB
  193. func updateMethod() {
  194. updateSp := make(chan bool, 8)
  195. arru := make([][]map[string]interface{}, 200)
  196. indexu := 0
  197. for {
  198. select {
  199. case v := <-updatePool:
  200. arru[indexu] = v
  201. indexu++
  202. if indexu == 200 {
  203. updateSp <- true
  204. go func(arru [][]map[string]interface{}) {
  205. defer func() {
  206. <-updateSp
  207. }()
  208. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  209. }(arru)
  210. arru = make([][]map[string]interface{}, 200)
  211. indexu = 0
  212. }
  213. case <-time.After(1000 * time.Millisecond):
  214. if indexu > 0 {
  215. updateSp <- true
  216. go func(arru [][]map[string]interface{}) {
  217. defer func() {
  218. <-updateSp
  219. }()
  220. MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
  221. }(arru[:indexu])
  222. arru = make([][]map[string]interface{}, 200)
  223. indexu = 0
  224. }
  225. }
  226. }
  227. }