package main

import (
	"sync"

	"github.com/olivere/elastic/v7"
	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// dealProposedIncrement streams every document of the qfw.projectset_proposed
// MongoDB collection through processOneProposed, handing each worker a private
// copy of the document together with a shared Elasticsearch client.
//
// The function blocks until the collection has been fully read and all
// dispatched documents have been processed. Failure to create the
// Elasticsearch client is fatal; a MongoDB cursor error is logged and ends the
// scan early.
func dealProposedIncrement() {
	// 1. Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(GF.Es.URL),
		elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
	}
	// Stop the client's background goroutines (health checker) once all
	// workers have finished; without this they leak on every call.
	defer client.Stop()

	// 2. Acquire a MongoDB session and build the query.
	sess := MgoP.GetMgoConn()
	defer MgoP.DestoryMongoConn(sess)

	coll := sess.DB("qfw").C("projectset_proposed")
	// An empty filter selects the whole collection. The commented-out range
	// below is kept as a template for restricting the run by firsttime.
	query := map[string]interface{}{
		//"firsttime": map[string]interface{}{
		//	"$gte": 1735660800,
		//	"$lte": 1748102400,
		//},
	}
	iter := coll.Find(query).Select(nil).Iter()

	// 3. Concurrency control: a fixed pool of workers fed by a buffered channel.
	const maxWorkers = 1
	taskCh := make(chan map[string]interface{}, 2000)
	var wg sync.WaitGroup

	// 4. Start the workers.
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for doc := range taskCh {
				if len(doc) == 0 {
					log.Info("aaa", zap.Any("client", client))
				}
				processOneProposed(doc, client)
			}
		}()
	}

	// 5. Read documents one by one and dispatch them.
	log.Info("111111", zap.String("222222", "开始处理数据"))
	count := 0
	// The same map is handed to iter.Next on every iteration, so each task
	// must receive its own copy (see cloneMap below).
	for doc := make(map[string]interface{}); iter.Next(doc); {
		count++
		if count%1000 == 0 {
			log.Info("dealProposed",
				zap.Int("current", count),
				zap.Any("projectname", doc["projectname"]),
				zap.Any("_id", doc["_id"]))
		}
		//if util.ObjToString(doc["area"]) == "甘肃" {
		//	continue
		//}
		taskCh <- cloneMap(doc) // copy: the loop reuses doc for the next record
	}

	// iter.Next returns false both on exhaustion and on cursor failure; Close
	// releases the cursor and reports which one it was. Logged with log.Info
	// because it is the only non-fatal level of the project logger known to
	// exist from this file.
	if err := iter.Close(); err != nil {
		log.Info("dealProposedIncrement: mongo iterator closed with error",
			zap.Int("processed", count),
			zap.Error(err))
	}

	close(taskCh)
	wg.Wait()
}