123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package main
- import (
- "encoding/json"
- "esindex/config"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "sync"
- )
- //attachmentBiddingTask 附件补采入库es
- func attachmentBiddingTask(mapInfo map[string]interface{}, other config.OthersData) {
- defer util.Catch()
- var MgoOther *mongodb.MongodbSim
- //初始化MongoDB
- MgoOther = &mongodb.MongodbSim{
- MongodbAddr: other.MgoAddr,
- DbName: other.MgoDB,
- Size: 10,
- UserName: other.MgoUsername,
- Password: other.MgoPassword,
- }
- MgoOther.InitPool()
- log.Info("attachmentBiddingTask", zap.Any("MgoOther", MgoOther))
- stype := util.ObjToString(mapInfo["stype"])
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- //针对gte/lte,单独转换
- q = convertToMongoID(q)
- }
- ch := make(chan bool, 10)
- wg := &sync.WaitGroup{}
- //bidding库
- biddingConn := MgoOther.GetMgoConn()
- count, _ := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Count()
- log.Info(other.MgoColl, zap.Int64("同步总数:", count))
- it := biddingConn.DB(MgoOther.DbName).C(other.MgoColl).Find(&q).Select(map[string]interface{}{
- "contenthtml": 0,
- }).Iter()
- c1, index := 0, 0
- var indexLock sync.Mutex
- for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
- if c1%1000 == 0 {
- log.Info("attachmentBiddingTask", zap.Int("current:", c1))
- log.Info("attachmentBiddingTask", zap.Any("current:_id =>", tmp["_id"]))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- //只针对增量数据处理;全量数据 需要用extracttype字段判断
- //7: 重复数据
- //8: 不重复
- if util.IntAll(tmp["dataprocess"]) != 8 {
- return
- }
- //// 增量数据使用上面判断;全量数据使用下面配置
- //-1:重复 ,1:不重复 ,0:入库 9:分类
- //if util.IntAll(tmp["extracttype"]) != 1 {
- // return
- //}
- //针对产权数据,暂时不入es 索引库
- if util.IntAll(tmp["infoformat"]) == 3 {
- return
- }
- /**
- 数据抽取时,有的数据的发布时间是之前的,属于增量历史数据,在判重和同步到bidding表是,会添加history_updatetime
- 字段,所以下面判断才会处理
- */
- if stype == "bidding_history" && tmp["history_updatetime"] == nil {
- return
- }
- indexLock.Lock()
- index++
- indexLock.Unlock()
- newTmp, update := GetEsField(tmp, stype)
- newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
- //针对中国政府采购网,单独处理
- if util.ObjToString(tmp["site"]) == "中国政府采购网" {
- objectType := MatchService(tmp)
- if objectType != "" {
- newTmp["object_type"] = objectType
- }
- }
- if len(update) > 0 {
- updateBiddingPool <- []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": update},
- }
- }
- // 这里直接放入存 es库 bidding的通道
- saveEsPool <- newTmp
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("attachmentBiddingTask over", zap.Int("count", c1), zap.Int("index", index))
- if other.NextAddr != "" {
- //发送udp,附件补采 才需要
- data := map[string]interface{}{
- //"stype": "update",
- "gtid": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "lteid": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- }
- //udp 传递的信息
- for k, v := range other.Data {
- data[k] = v
- }
- //下个udp 地址信息
- target := &net.UDPAddr{
- Port: other.NextPort,
- IP: net.ParseIP(other.NextAddr),
- }
- bytes, _ := json.Marshal(data)
- err := UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- if err != nil {
- log.Info("attachmentBiddingTask ", zap.Any("WriteUdp err", err), zap.Any("target", target))
- }
- log.Info("attachmentBiddingTask ", zap.Any("target", target), zap.Any("data", data))
- }
- }
|