package main import ( "encoding/json" "field_sync/config" "field_sync/oss" "fmt" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "reflect" "regexp" "sort" "strings" "time" ) var ( regLetter = regexp.MustCompile("[a-z]*") cityEndReg = regexp.MustCompile("(区|县|市)$") ) func biddingTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) if stype == "bidding" { uq := bson.M{"gtid": util.ObjToString(mapInfo["gtid"]), "lteid": util.ObjToString(mapInfo["lteid"])} MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true) } //领域标签处理的数据 id段 if stype == "bidding_history" { MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0}) } q, _ := mapInfo["query"].(map[string]interface{}) bkey, _ := mapInfo["bkey"].(string) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //extract库 extractConn := MgoE.GetMgoConn() defer MgoE.DestoryMongoConn(extractConn) extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{ "field_source": 0, "kvtext": 0, }).Sort("_id").Iter() eMap := map[string]map[string]interface{}{} extCount, repeatCount := 0, 0 for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ { if util.IntAll(tmp["repeat"]) == 1 { repeatCount++ } tid := mongodb.BsonIdToSId(tmp["_id"]) eMap[tid] = tmp tmp = make(map[string]interface{}) } log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount)) //bidding库 biddingConn := MgoB.GetMgoConn() count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count() log.Info("bidding表", zap.Int64("同步总数:", count)) c := 0 if count < 500000 { var res []map[string]interface{} result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{ "contenthtml": 0, }).Iter() for tmp := make(map[string]interface{}); result.Next(tmp); { res = append(res, tmp) tmp = make(map[string]interface{}) } MgoB.DestoryMongoConn(biddingConn) log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount)) c = doIndex(res, eMap, bkey, stype) } else { log.Info("查询结果", zap.Int64("数据量太大,放弃", count)) MgoB.DestoryMongoConn(biddingConn) } log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c)) NextNode(mapInfo, stype) NextNodePro(mapInfo, stype) NextNodeTidb(mapInfo, stype) if stype == "bidding_history" { NextNodeBidData(mapInfo) // bidding-data数据 NextNodeTidbQyxy(mapInfo) // tidb-企业数据 NextNodeHn(mapInfo) } } func biddingAllTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //extract库 extractConn := MgoE.GetMgoConn() defer MgoE.DestoryMongoConn(extractConn) extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{ "field_source": 0, "kvtext": 0, }).Sort("-_id").Iter() //bidding库 biddingConn := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(biddingConn) count := 0 var compare map[string]interface{} result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{ "contenthtml": 0, "field_source": 0, }).Sort("-_id").Iter() for tmp := make(map[string]interface{}); result.Next(tmp); count++ { update := map[string]interface{}{} del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段 //对比方法---------------- for { if compare == nil { compare = make(map[string]interface{}) if !extractResult.Next(compare) { break } } if compare != nil { cid := mongodb.BsonIdToSId(compare["_id"]) tid := mongodb.BsonIdToSId(tmp["_id"]) if cid == tid { //更新bidding表;bidding表modifyinfo中的字段不更新 modifyinfo := make(map[string]bool) if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil { for k := range tmpmodifyinfo { modifyinfo[k] = true } } for _, k := range config.Conf.Serve.FieldS { v1 := compare[k] //extract v2 := tmp[k] //bidding if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil && !modifyinfo[k] { update[k] = v1 } else if v2 != nil && v1 == nil && !modifyinfo[k] { if k == "s_subscopeclass" && del["subscopeclass"] == nil { continue } else if k == "s_topscopeclass" && del["topscopeclass"] == nil { continue } del[k] = 1 //util.Debug("抽取结果没有值,bidding有值:field--", k, del) } } if util.IntAll(compare["repeat"]) == 1 { update["extracttype"] = -1 update["dataprocess"] = 7 update["repeat_id"] = compare["repeat_id"] } else { update["extracttype"] = 1 update["dataprocess"] = 8 } break } else { if cid < tid { compare = nil continue } else { break } } } else { break } } //------------------对比结束 //处理分类 if compare != nil { //extract fieldFun(compare, update) compare = nil } // entidlist extractMap := make(map[string]interface{}) if update["s_winner"] != "" { cid := companyFun(update) if len(cid) > 0 { update["entidlist"] = cid extractMap["entidlist"] = cid } } if len(extractMap) > 0 { updateExtPool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": extractMap}, } } // 附件有效字段 if i := validFile(tmp); i != 0 { if i == -1 { update["isValidFile"] = false } else { update["isValidFile"] = true } } if len(update) > 0 { if len(del) > 0 { //删除的字段 updateBidPool <- []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update, "$unset": del}, } } else { updateBidPool <- []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, } } } if count%50000 == 0 { log.Info("biddingTask", zap.Int("current", count)) } tmp = make(map[string]interface{}) } log.Info("biddingAll sync...over", zap.Int("all", count)) } func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int { syncNo := 0 //抽取表数据同步数量 //对比两张表数据,减少查询次数 var compare map[string]interface{} var bidUpdate [][]map[string]interface{} var extUpdate [][]map[string]interface{} //SaveEsLock := &sync.Mutex{} log.Info("start ...") for n, tmp := range infos { tid := mongodb.BsonIdToSId(tmp["_id"]) update := map[string]interface{}{} //要更新的mongo数据 del := map[string]interface{}{} //对比方法---------------- if eMap[tid] != nil { compare = eMap[tid] if stype == "bidding" { // 增量id段 正常数据 if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过 tmp = make(map[string]interface{}) compare = nil continue } delete(eMap, tid) } if stype == "bidding_history" { //增量id段 历史数据 if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过 tmp = make(map[string]interface{}) compare = nil continue } delete(eMap, tid) } syncNo++ for _, k := range config.Conf.Serve.FieldS { v1 := compare[k] //extract v2 := tmp[k] //bidding if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 == nil { if k == "s_subscopeclass" && del["subscopeclass"] == nil { continue } else if k == "s_topscopeclass" && del["topscopeclass"] == nil { continue } else if k == "city" || k == "district" { update[k] = "" } else { del[k] = 1 } } } // 附件重采,数据同步时不更新判重标识 if util.IntAll(compare["repeat"]) == 1 { update["extracttype"] = -1 update["dataprocess"] = 7 update["repeat_id"] = compare["repeat_id"] } else { update["extracttype"] = 1 update["dataprocess"] = 8 } } else { compare = nil if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging update["dataging"] = 0 } update["dataprocess"] = 8 } //下面可以多线程跑的---> //处理分类 if compare != nil { //extract fieldFun(compare, update) // publishtime 20230523 if util.IntAll(tmp["publishtime"]) == -1 { if pb := methodPb(compare); pb > 0 { update["publishtime"] = pb } } compare = nil } //------------------对比结束 //处理key descript //if bkey == "" { // DealInfo(&tmp, &update) //} // entidlist extractMap := make(map[string]interface{}) if update["s_winner"] != "" { cid := companyFun(update) if len(cid) > 0 { tmp["entidlist"] = cid update["entidlist"] = cid extractMap["entidlist"] = cid } } // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表 typeFunc(tmp, update, extractMap) if len(extractMap) > 0 { if extractMap["toptype"] != nil && extractMap["subtype"] == nil { extUpdate = append(extUpdate, []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}}, }) } else { extUpdate = append(extUpdate, []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": extractMap}, }) } if len(extUpdate) >= MgoBulkSize { tmps := extUpdate MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...) extUpdate = [][]map[string]interface{}{} } } // 附件有效字段 if i := validFile(tmp); i != 0 { if i == -1 { tmp["isValidFile"] = false update["isValidFile"] = false } else { tmp["isValidFile"] = true update["isValidFile"] = true } } if len(update) > 0 { if len(del) > 0 { bidUpdate = append(bidUpdate, []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update, "$unset": del}, }) } else { bidUpdate = append(bidUpdate, []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, }) } if len(bidUpdate) >= MgoBulkSize { tmps := bidUpdate MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...) bidUpdate = [][]map[string]interface{}{} } } if n%500 == 0 { log.Info("biddingTask", zap.Int("current", n)) } tmp = make(map[string]interface{}) } //SaveEsLock.Lock() if len(bidUpdate) > 0 { tmps := bidUpdate MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...) bidUpdate = [][]map[string]interface{}{} } if len(extUpdate) > 0 { tmps := extUpdate MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...) extUpdate = [][]map[string]interface{}{} } //SaveEsLock.Unlock() return syncNo } // @Description subscopeclass、topscopeclass、package // 20230523 多包处理 subpackage = 1 // @Author J 2022/6/7 5:54 PM func fieldFun(compare, update map[string]interface{}) { subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass if subscopeclass != nil { m1 := map[string]bool{} newclass := []string{} for _, sc := range subscopeclass { sclass, _ := sc.(string) if !m1[sclass] { m1[sclass] = true newclass = append(newclass, sclass) } } update["s_subscopeclass"] = strings.Join(newclass, ",") update["subscopeclass"] = newclass } topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass if topscopeclass != nil { m2 := map[string]bool{} newclass := []string{} for _, tc := range topscopeclass { tclass, _ := tc.(string) tclass = regLetter.ReplaceAllString(tclass, "") // 去除字母 if !m2[tclass] { m2[tclass] = true newclass = append(newclass, tclass) } } update["topscopeclass"] = topscopeclass update["s_topscopeclass"] = strings.Join(newclass, ",") } if package1 := compare["package"]; package1 != nil { packageM, _ := package1.(map[string]interface{}) update["package"] = packageM for _, p := range packageM { pm, _ := p.(map[string]interface{}) if util.ObjToString(pm["winner"]) != "" || util.Float64All(pm["budget"]) > 0 || util.Float64All(pm["bidamount"]) > 0 { update["multipackage"] = 1 break } } } else { update["multipackage"] = 0 } // subpackage if compare["package"] != nil && compare["s_winner"] != nil && compare["bidamount"] != nil { pg := compare["package"].(map[string]interface{}) if len(pg) > 1 { var bmt []float64 var swn []string for _, p := range pg { p1 := p.(map[string]interface{}) if p1["bidamount"] != nil { bmt = append(bmt, util.Float64All(p1["bidamount"])) } if w := util.ObjToString(p1["winner"]); w != "" { swn = append(swn) } } if len(bmt) > 1 && len(swn) > 1 { sn := strings.Split(util.ObjToString(compare["s_winner"]), ",") sort.Strings(sn) sort.Strings(swn) swn1 := util.ObjArrToStringArr(Duplicate(swn)) // 去重 if strings.Join(swn1, ",") == strings.Join(sn, ",") { bidamount := 0.0 for _, f := range bmt { bidamount += f } if bidamount == util.Float64All(compare["bidamount"]) { update["subpackage"] = 1 } } } } } } // @Description entidlist // @Author J 2022/6/7 2:36 PM func companyFun(tmp map[string]interface{}) (cid []string) { sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",") for _, w := range sWinnerarr { if w != "" { id := redis.GetStr("qyxy_id", w) if id == "" { ents, _ := MgoQ.Find(config.Conf.DB.MongoQ.Coll, map[string]interface{}{"company_name": w}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{"company_name": 1}, false, -1, -1) if len(*ents) > 0 { id = util.ObjToString((*ents)[0]["_id"]) redis.PutCKV("qyxy_id", w, id) } else { ent, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, map[string]interface{}{"history_name": w}) if len(*ent) > 0 { id = util.ObjToString((*ent)["company_id"]) redis.PutCKV("qyxy_id", w, id) } } } if id == "" { id = "-" } cid = append(cid, id) } } return cid } // @Description update 修改bidding表,extractM修改抽取表 // @Author J 2022/6/10 10:29 AM func typeFunc(tmp, update, extractM map[string]interface{}) { if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok { if t := util.ObjToString(jyData["type"]); t != "" { switch t { //case "采购信息": case "招标公告": if util.ObjToString(tmp["toptype"]) != "招标" { update["toptype"] = "招标" extractM["toptype"] = "招标" delete(update, "subtype") } case "采购意向": if util.ObjToString(tmp["toptype"]) != "采购意向" { update["toptype"] = "采购意向" update["subtype"] = "采购意向" extractM["toptype"] = "采购意向" extractM["subtype"] = "采购意向" } case "招标预告": if util.ObjToString(tmp["toptype"]) != "预告" { update["toptype"] = "预告" extractM["toptype"] = "预告" delete(update, "subtype") } case "招标结果": if util.ObjToString(tmp["toptype"]) != "结果" { update["toptype"] = "结果" extractM["toptype"] = "结果" delete(update, "subtype") } } } } } // @Description 附件有效字段(isValidFile) // @Author J 2022/7/8 14:41 func validFile(tmp map[string]interface{}) int { isContinue := false if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o { if atts, o1 := pinfo["attachments"].(map[string]interface{}); o1 { for _, att := range atts { if att == nil { continue } if reflect.TypeOf(att).String() == "string" { continue } att1 := att.(map[string]interface{}) if fid := util.ObjToString(att1["fid"]); fid != "" { isContinue = true break } } if isContinue { if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o { if len(attachTxt) > 0 { for _, at := range attachTxt { at1 := at.(map[string]interface{}) if len(at1) > 0 { for k, _ := range at1 { if reflect.TypeOf(at1[k]).String() == "string" { continue } at2 := at1[k].(map[string]interface{}) s := strings.ToLower(util.ObjToString(at2["file_name"])) if !strings.Contains(s, "jpg") || !strings.Contains(s, "jpeg") != strings.Contains(s, "png") || strings.Contains(s, "pdf") { if strings.Contains(s, "swf") || strings.Contains(s, "html") { return -1 } else if AnalysisFile(oss.OssGetObject(util.ObjToString(at2["attach_url"]))) { return 1 } } } break } else { break } } } } flag := false for _, att := range atts { if att == nil { continue } if reflect.TypeOf(att).String() == "string" { continue } att1 := att.(map[string]interface{}) if fid := util.ObjToString(att1["fid"]); fid != "" { ftype := strings.ToLower(util.ObjToString(tmp["ftype"])) if ftype != "swf" && ftype != "html" && oss.OssObjExists("jy-datafile", fid) { return 1 } else { flag = true } } } if flag { return -1 } } } } return 0 } // @Description id不变,内容变化 重新索引数据 // @Author J 2022/8/10 13:29 func taskinfo(id string) { tmp, _ := MgoB.FindById("bidding", id, nil) if tmp == nil || len(*tmp) == 0 { log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id)) return } extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil) if extractM == nil || len(*extractM) == 0 { extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil) if extractM == nil || len(*extractM) == 0 { log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id)) return } } update := map[string]interface{}{} //要更新的mongo数据 //更新bidding表字段 for _, k := range config.Conf.Serve.FieldS { v1 := (*extractM)[k] //extract v2 := (*tmp)[k] //bidding if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 == nil { if k == "city" || k == "district" { update[k] = "" } } } if util.IntAll((*extractM)["repeat"]) == 1 { update["extracttype"] = -1 update["dataprocess"] = 7 update["repeat_id"] = (*extractM)["repeat_id"] } else { update["extracttype"] = 1 update["dataprocess"] = 8 } //处理分类 fieldFun(*extractM, update) extractMap := make(map[string]interface{}) if util.ObjToString((*tmp)["s_winner"]) != "" { cid := companyFun(*tmp) if len(cid) > 0 { update["entidlist"] = cid extractMap["entidlist"] = cid } MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap}) } // 附件有效字段 if i := validFile(*tmp); i != 0 { if i == -1 { update["isValidFile"] = false } else { update["isValidFile"] = true } } if len(update) > 0 { MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update}) } mapinfo := map[string]interface{}{ "infoid": id, "stype": "index-by-id", } datas, _ := json.Marshal(mapinfo) var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas))) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } var DateTimeSelect = []string{"bidopentime", "bidendtime", "signaturedate", "comeintime"} // @Description 发布时间处理 // @Author J 2023/5/23 14:32 func methodPb(tmp map[string]interface{}) int64 { if tmp["ext_publishtime"] != nil { if newPb := util.Int64All(tmp["ext_publishtime"]); newPb < time.Now().Unix() && newPb > 1420041600 { return newPb } } for _, d := range DateTimeSelect { if tmp[d] != nil && util.Int64All(tmp[d]) < time.Now().Unix() { return util.Int64All(tmp[d]) } } return 0 } // Duplicate // @Description 去重 // @Author J 2023/5/24 09:53 func Duplicate(a interface{}) (ret []interface{}) { va := reflect.ValueOf(a) for i := 0; i < va.Len(); i++ { if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) { continue } ret = append(ret, va.Index(i).Interface()) } return ret }