// main.go (2.0 KB)
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/robfig/cron/v3"
  5. "go.uber.org/zap"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. MgoB *mongodb.MongodbSim
  13. MgoP *mongodb.MongodbSim
  14. )
  15. func main() {
  16. local, _ := time.LoadLocation("Asia/Shanghai")
  17. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  18. eid, err := c.AddFunc(GF.Cron.Spec, test)
  19. if err != nil {
  20. log.Info("main", zap.Any("AddFunc err", err))
  21. }
  22. log.Info("main", zap.Any("eid", eid))
  23. c.Start()
  24. defer c.Stop()
  25. select {}
  26. }
  // test is the job main registers with the scheduler. It writes "aaa" followed
  // by a newline to standard output.
  func test() {
  	const msg = "aaa"
  	fmt.Println(msg)
  }
  30. //dealBidding 处理标讯数据
  31. func dealBidding() {
  32. sess := MgoB.GetMgoConn()
  33. defer MgoB.DestoryMongoConn(sess)
  34. // 指定对应的时间格式
  35. //layout := "2006-01-02 15:04:05"
  36. // 获取当前时间
  37. now := time.Now()
  38. //targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 04, 20, 0, 0, now.Location())
  39. targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
  40. todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
  41. q := map[string]interface{}{
  42. "comeintime": map[string]interface{}{
  43. "$gt": targetTime.Unix(),
  44. "$lte": todayTime.Unix(),
  45. },
  46. }
  47. log.Info("dealBidding", zap.Any("q", q))
  48. query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
  49. "contenthtml": 0}).Iter()
  50. count := 0
  51. ch := make(chan bool, 10)
  52. wg := &sync.WaitGroup{}
  53. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  54. if count%1000 == 0 {
  55. log.Info("dealBidding", zap.Int("current", count))
  56. }
  57. ch <- true
  58. wg.Add(1)
  59. go func(tmp map[string]interface{}) {
  60. defer func() {
  61. <-ch
  62. wg.Done()
  63. }()
  64. //saveBidding(tmp)
  65. }(tmp)
  66. tmp = map[string]interface{}{}
  67. }
  68. wg.Wait()
  69. log.Info("dealBidding", zap.Int("over ", count))
  70. //没有数据时,发送邮件
  71. if count == 0 {
  72. SendMail("每日数据监控", "查询数据为空,请处理")
  73. }
  74. }