package extract

import (
	"data_ai/ul"
	log "github.com/donnie4w/go-logger/logger"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"sync"
)

// getExistsInfo walks every document of the ul.Bid_Name collection in the
// source database (ordered by _id) and returns a map whose keys are the string
// form of each _id. The values are always the empty string; the map is used as
// a set.
//
// A cursor error is logged; the entries collected up to that point are still
// returned.
func getExistsInfo() map[string]interface{} {
	log.Debug("开始构建已存在数据···")
	sess := ul.SourceMgo.GetMgoConn()
	defer ul.SourceMgo.DestoryMongoConn(sess)

	dict := map[string]interface{}{}
	q, total := map[string]interface{}{}, 0
	it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%100000 == 0 {
			log.Debug("cur ai index ", total, tmp["_id"])
		}
		dict[ul.BsonTOStringId(tmp["_id"])] = ""
		tmp = make(map[string]interface{})
	}
	// Next returns false both at the end of the data and on failure; Close
	// reports which one it was, so an aborted scan is not mistaken for success.
	if err := it.Close(); err != nil {
		log.Error("exists info cursor error: ", err)
	}
	log.Debug("is exists ...", total, "~", len(dict))
	return dict
}

// ExtractFullInfo recognises structured fields for historical documents.
//
// It reads every document of ul.Bid_Name in the bid database whose _id is
// strictly less than eid (newest first), runs ResolveInfo on each one in a
// bounded set of goroutines (at most ul.Reading in flight), and writes the
// resulting fields to ul.Ext_Name through ul.SourceMgo.
//
// Side effect: sets the package-level ul.FlashModel to "glm-4-flash".
func ExtractFullInfo(eid string) {
	// Example eid: 6776b8000000000000000000 (January 3).
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$lt": ul.StringTOBsonId(eid),
		},
	}
	log.Debug("刷历史存量数据语句:", q)
	ul.FlashModel = "glm-4-flash"

	pool := make(chan bool, ul.Reading) // limits the number of concurrent workers
	wg := &sync.WaitGroup{}
	sess := ul.BidMgo.GetMgoConn()
	defer ul.BidMgo.DestoryMongoConn(sess)

	total := 0
	it := sess.DB(ul.BidMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("-_id").Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%5000 == 0 {
			log.Debug("cur ai index ", total, tmp["_id"])
		}
		if ul.BsonTOStringId(tmp["_id"]) == "" {
			tmp = make(map[string]interface{})
			continue
		}
		pool <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// Only infoformat 0 or 1 is treated as a normal document.
			if infoformat := qu.IntAll(doc["infoformat"]); infoformat != 0 && infoformat != 1 {
				return
			}
			id := ul.BsonTOStringId(doc["_id"])
			data := ResolveInfo(doc, id)
			if len(data) == 0 || id == "" {
				return
			}
			doc["ai_zhipu"] = data
			updateInfo := map[string]interface{}{}
			isUnset := ul.ChooseCheckDataAI(doc, &updateInfo)
			if updateInfo["com_package"] == nil {
				// Build the single-package info when the check did not supply one.
				updateInfo["com_package"] = ul.CreatSingleFieldInfo(doc, updateInfo)
			}
			updateInfo["ai_zhipu"] = data
			// updateInfo always holds at least "ai_zhipu" here, so the $set is
			// issued unconditionally.
			ul.SourceMgo.UpdateById(ul.Ext_Name, id, map[string]interface{}{
				"$set": updateInfo,
			})
			if isUnset {
				ul.SourceMgo.UpdateById(ul.Ext_Name, id, map[string]interface{}{
					"$unset": ul.Unset_Check,
				})
			}
		}(tmp)
		// Hand the worker its own map; the next it.Next call decodes into a
		// fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	// Surface cursor failures so a truncated run is visible in the log.
	if err := it.Close(); err != nil {
		log.Error("extract full info cursor error: ", err)
	}
	log.Debug("ai is over ...", total)
}

// MovingFullInfoCopy copies documents from ul.Bid_Name in the source database
// into the "bidding_copy" collection, dropping the "detail" and "contenthtml"
// fields from each document first. Documents with _id >= sid are read in
// ascending _id order and saved by at most ul.Reading concurrent goroutines.
//
// NOTE(review): eid is accepted but not used by the query, so there is no upper
// bound on _id — confirm whether a "$lt"/"$lte" eid filter was intended.
func MovingFullInfoCopy(sid string, eid string) {
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": ul.StringTOBsonId(sid),
		},
	}
	log.Debug("迁移语句:", q)

	pool := make(chan bool, ul.Reading) // limits the number of concurrent workers
	wg := &sync.WaitGroup{}
	sess := ul.SourceMgo.GetMgoConn()
	defer ul.SourceMgo.DestoryMongoConn(sess)

	total := 0
	it := sess.DB(ul.SourceMgo.DbName).C(ul.Bid_Name).Find(&q).Sort("_id").Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%1000 == 0 {
			log.Debug("cur move index ", total, tmp["_id"])
		}
		pool <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			delete(doc, "detail")
			delete(doc, "contenthtml")
			ul.SourceMgo.Save("bidding_copy", doc)
		}(tmp)
		// Hand the worker its own map; the next it.Next call decodes into a
		// fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	// Surface cursor failures so a truncated migration is visible in the log.
	if err := it.Close(); err != nil {
		log.Error("move full info cursor error: ", err)
	}
	log.Debug("move is over ...", total)
}