incre.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package main
  2. import (
  3. "github.com/olivere/elastic/v7"
  4. "go.uber.org/zap"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  6. "sync"
  7. )
  8. // dealProposedIncrement 处理增量数据
  9. func dealProposedIncrement() {
  10. // 1. 初始化 ES 客户端
  11. client, err := elastic.NewClient(
  12. elastic.SetURL(GF.Es.URL),
  13. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  14. elastic.SetSniff(false),
  15. )
  16. if err != nil {
  17. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  18. }
  19. // 2. 初始化 MongoDB 连接
  20. sess := MgoP.GetMgoConn()
  21. defer MgoP.DestoryMongoConn(sess)
  22. coll := sess.DB("qfw").C("projectset_proposed")
  23. query := map[string]interface{}{
  24. //"firsttime": map[string]interface{}{
  25. // "$gte": 1735660800,
  26. // "$lte": 1748102400,
  27. //},
  28. }
  29. iter := coll.Find(query).Select(nil).Iter()
  30. // 3. 并发控制
  31. const maxWorkers = 1
  32. taskCh := make(chan map[string]interface{}, 2000)
  33. var wg sync.WaitGroup
  34. // 4. 启动 worker 处理任务
  35. for i := 0; i < maxWorkers; i++ {
  36. wg.Add(1)
  37. go func() {
  38. defer wg.Done()
  39. for doc := range taskCh {
  40. if len(doc) == 0 {
  41. log.Info("aaa", zap.Any("client", client))
  42. }
  43. processOneProposed(doc, client)
  44. }
  45. }()
  46. }
  47. // 5. 逐条读取数据并派发任务
  48. log.Info("111111", zap.String("222222", "开始处理数据"))
  49. count := 0
  50. for doc := make(map[string]interface{}); iter.Next(doc); {
  51. count++
  52. if count%1000 == 0 {
  53. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]), zap.Any("_id", doc["_id"]))
  54. }
  55. //if util.ObjToString(doc["area"]) == "甘肃" {
  56. // continue
  57. //}
  58. taskCh <- cloneMap(doc) // 防止 map 重用
  59. }
  60. close(taskCh)
  61. wg.Wait()
  62. }