123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package main
- import (
- "github.com/olivere/elastic/v7"
- "go.uber.org/zap"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "sync"
- )
- // dealProposedIncrement 处理增量数据
- func dealProposedIncrement() {
- // 1. 初始化 ES 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(GF.Es.URL),
- elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
- }
- // 2. 初始化 MongoDB 连接
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- coll := sess.DB("qfw").C("projectset_proposed")
- query := map[string]interface{}{
- //"firsttime": map[string]interface{}{
- // "$gte": 1735660800,
- // "$lte": 1748102400,
- //},
- }
- iter := coll.Find(query).Select(nil).Iter()
- // 3. 并发控制
- const maxWorkers = 1
- taskCh := make(chan map[string]interface{}, 2000)
- var wg sync.WaitGroup
- // 4. 启动 worker 处理任务
- for i := 0; i < maxWorkers; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for doc := range taskCh {
- if len(doc) == 0 {
- log.Info("aaa", zap.Any("client", client))
- }
- processOneProposed(doc, client)
- }
- }()
- }
- // 5. 逐条读取数据并派发任务
- log.Info("111111", zap.String("222222", "开始处理数据"))
- count := 0
- for doc := make(map[string]interface{}); iter.Next(doc); {
- count++
- if count%1000 == 0 {
- log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]), zap.Any("_id", doc["_id"]))
- }
- //if util.ObjToString(doc["area"]) == "甘肃" {
- // continue
- //}
- taskCh <- cloneMap(doc) // 防止 map 重用
- }
- close(taskCh)
- wg.Wait()
- }
|