package main

import (
	"encoding/json"
	"net"
	"sync"

	"esindex/config"

	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

// attachmentBiddingTask pushes bidding documents into the ES save pool after
// an attachment re-crawl.
//
// It opens a MongoDB pool described by other and selects documents from
// other.MgoColl. The filter is mapInfo["query"] (passed through
// convertToMongoID) when present; otherwise it is the _id range
// (gtid, lteid] built from mapInfo["gtid"] and mapInfo["lteid"]. Documents are
// handled by at most 10 concurrent workers: each document that passes the
// filters is converted with GetEsField and sent to saveEsPool, and any field
// updates GetEsField reports are sent to updateBiddingPool. When
// other.NextAddr is non-empty, the id range merged with other.Data is then
// forwarded as JSON to that UDP address.
//
// Missing or non-string gtid/lteid values are logged and end the task instead
// of panicking.
func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersData) {
	defer util.Catch()

	// Initialise the MongoDB pool for the source collection.
	MgoOther := &mongodb.MongodbSim{
		MongodbAddr: other.MgoAddr,
		DbName:      other.MgoDB,
		Size:        10,
		UserName:    other.MgoUsername,
		Password:    other.MgoPassword,
	}
	MgoOther.InitPool()
	// Log only the connection target: dumping the whole struct would write the
	// MongoDB username and password into the log.
	log.Info("attachmentBiddingTask",
		zap.Any("MgoAddr", other.MgoAddr),
		zap.Any("MgoDB", other.MgoDB),
		zap.Any("MgoColl", other.MgoColl))

	stype := util.ObjToString(mapInfo["stype"])

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		gtid, lteid, ok := attachmentTaskIDRange(mapInfo)
		if !ok {
			log.Info("attachmentBiddingTask",
				zap.Any("missing or non-string gtid/lteid", mapInfo))
			return
		}
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  mongodb.StringTOBsonId(gtid),
				"$lte": mongodb.StringTOBsonId(lteid),
			},
		}
	} else {
		// Convert the gte/lte id values of a caller-supplied query separately.
		q = convertToMongoID(q)
	}

	ch := make(chan bool, 10) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}

	// Source (bidding) collection.
	biddingConn := MgoOther.GetMgoConn()
	count, err := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Count()
	if err != nil {
		// The count is informational only, so a failure does not stop the task.
		log.Info("attachmentBiddingTask", zap.Any("count err", err))
	}
	log.Info(other.MgoColl, zap.Int64("同步总数:", count))

	it := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Select(map[string]interface{}{
		"contenthtml": 0,
	}).Iter()

	c1, index := 0, 0
	var indexLock sync.Mutex
	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
		if c1%1000 == 0 {
			log.Info("attachmentBiddingTask", zap.Int("current:", c1))
			log.Info("attachmentBiddingTask", zap.Any("current:_id =>", tmp["_id"]))
		}
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if skipAttachmentDoc(tmp, stype) {
				return
			}

			indexLock.Lock()
			index++
			indexLock.Unlock()

			newTmp, update := GetEsField(tmp, stype)
			newTmp["dataweight"] = 0 // index field added for jy pinning (top placement)

			// Documents from 中国政府采购网 get an extra object_type when
			// MatchService returns one.
			if util.ObjToString(tmp["site"]) == "中国政府采购网" {
				if objectType := MatchService(tmp); objectType != "" {
					newTmp["object_type"] = objectType
				}
			}

			if len(update) > 0 {
				updateBiddingPool <- []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": update},
				}
			}
			saveEsPool <- newTmp
		}(tmp)
		// Hand the worker its own map; decode the next document into a fresh one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	log.Info("attachmentBiddingTask over", zap.Int("count", c1), zap.Int("index", index))

	if other.NextAddr == "" {
		return
	}

	// Forward the processed id range over UDP; only the attachment re-crawl
	// flow needs this.
	gtid, lteid, ok := attachmentTaskIDRange(mapInfo)
	if !ok {
		log.Info("attachmentBiddingTask ",
			zap.Any("missing or non-string gtid/lteid, udp not sent", mapInfo))
		return
	}
	data := map[string]interface{}{
		//"stype": "update",
		"gtid":  mongodb.StringTOBsonId(gtid),
		"lteid": mongodb.StringTOBsonId(lteid),
	}
	// Extra payload configured for the next node.
	for k, v := range other.Data {
		data[k] = v
	}
	// Address of the next UDP node.
	target := &net.UDPAddr{
		Port: other.NextPort,
		IP:   net.ParseIP(other.NextAddr),
	}
	payload, err := json.Marshal(data)
	if err != nil {
		log.Info("attachmentBiddingTask ", zap.Any("Marshal err", err), zap.Any("data", data))
		return
	}
	if err := UdpClient.WriteUdp(payload, udp.OP_TYPE_DATA, target); err != nil {
		log.Info("attachmentBiddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target))
	}
	log.Info("attachmentBiddingTask ", zap.Any("target", target), zap.Any("data", data))
}

// attachmentTaskIDRange reads the "gtid" and "lteid" strings from mapInfo and
// reports ok=false when either is missing or not a string.
func attachmentTaskIDRange(mapInfo map[string]interface{}) (gtid, lteid string, ok bool) {
	gtid, okGt := mapInfo["gtid"].(string)
	lteid, okLte := mapInfo["lteid"].(string)
	return gtid, lteid, okGt && okLte
}

// skipAttachmentDoc reports whether a bidding document must not be indexed.
func skipAttachmentDoc(tmp map[string]interface{}, stype string) bool {
	// Documents flagged with sensitive == "测试" are not indexed.
	if util.ObjToString(tmp["sensitive"]) == "测试" {
		return true
	}
	// Incremental data only: dataprocess 7 = duplicate, 8 = not duplicate.
	// A full run would have to check extracttype instead
	// (-1 duplicate, 1 not duplicate, 0 stored, 9 classified):
	//if util.IntAll(tmp["extracttype"]) != 1 {
	//	return true
	//}
	if util.IntAll(tmp["dataprocess"]) != 8 {
		return true
	}
	// Property-rights data (infoformat 3) is not indexed into ES for now.
	if util.IntAll(tmp["infoformat"]) == 3 {
		return true
	}
	// Some extracted documents carry an older publish time (incremental
	// history data); dedup/sync into the bidding collection adds
	// history_updatetime to them, so the history flow only handles documents
	// that have that field.
	if stype == "bidding_history" && tmp["history_updatetime"] == nil {
		return true
	}
	return false
}