package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/redis" "app.yhyue.com/data_processing/common_utils/udp" "encoding/json" "field_sync/config" "field_sync/oss" "fmt" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "net" "reflect" "regexp" "strings" "time" ) var ( regLetter = regexp.MustCompile("[a-z]*") ) func biddingTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() stype := util.ObjToString(mapInfo["stype"]) if stype == "bidding" { uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])}, "lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}} MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true) } // 领域标签处理的数据 id段 if stype == "bidding_history" { MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0}) } q, _ := mapInfo["query"].(map[string]interface{}) bkey, _ := mapInfo["bkey"].(string) if q == nil { q = map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //extract库 extractConn := MgoE.GetMgoConn() defer MgoE.DestoryMongoConn(extractConn) extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{ "field_source": 0, "kvtext": 0, }).Sort("_id").Iter() eMap := map[string]map[string]interface{}{} extCount, repeatCount := 0, 0 for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ { if util.IntAll(tmp["repeat"]) == 1 { repeatCount++ } tid := mongodb.BsonIdToSId(tmp["_id"]) eMap[tid] = tmp tmp = make(map[string]interface{}) } log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount)) //bidding库 biddingConn := MgoB.GetMgoConn() count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count() log.Info("bidding表", zap.Int64("同步总数:", count)) c := 0 if count < 500000 { var res []map[string]interface{} result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{ "contenthtml": 0, }).Iter() for tmp := make(map[string]interface{}); result.Next(tmp); { res = append(res, tmp) tmp = make(map[string]interface{}) } MgoB.DestoryMongoConn(biddingConn) log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount)) c = doIndex(res, eMap, bkey, stype) } else { log.Info("查询结果", zap.Int64("数据量太大,放弃", count)) MgoB.DestoryMongoConn(biddingConn) } log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c)) NextNode(mapInfo, stype) NextNodePro(mapInfo, stype) NextNodeTidb(mapInfo, stype) if stype == "bidding_history" { NextNodeBidData(mapInfo) // bidding-data数据 NextNodeTidbQyxy(mapInfo) // tidb-企业数据 NextNodeHn(mapInfo) } } func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int { syncNo := 0 //抽取表数据同步数量 //对比两张表数据,减少查询次数 var compare map[string]interface{} var bidUpdate [][]map[string]interface{} var extUpdate [][]map[string]interface{} //SaveEsLock := &sync.Mutex{} log.Info("start ...") for n, tmp := range infos { tid := mongodb.BsonIdToSId(tmp["_id"]) update := map[string]interface{}{} //要更新的mongo数据 //对比方法---------------- if eMap[tid] != nil { compare = eMap[tid] if stype == "bidding" { // 增量id段 正常数据 if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过 tmp = make(map[string]interface{}) compare = nil continue } delete(eMap, tid) } if stype == "bidding_history" { //增量id段 历史数据 if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过 tmp = make(map[string]interface{}) compare = nil continue } delete(eMap, tid) } syncNo++ for _, k := range config.Conf.Serve.FieldS { v1 := compare[k] //extract v2 := tmp[k] //bidding if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 == nil { if k == "city" || k == "district" { update[k] = "" } } } if util.IntAll(compare["repeat"]) == 1 { update["extracttype"] = -1 update["dataprocess"] = 7 } else { update["extracttype"] = 1 update["dataprocess"] = 8 } } else { compare = nil if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging update["dataging"] = 0 } update["dataprocess"] = 8 } //下面可以多线程跑的---> //处理分类 if compare != nil { //extract fieldFun(compare, update) compare = nil } //------------------对比结束 //处理key descript if bkey == "" { DealInfo(&tmp, &update) } // entidlist extractMap := make(map[string]interface{}) if update["s_winner"] != "" { cid := companyFun(update) if len(cid) > 0 { tmp["entidlist"] = cid update["entidlist"] = cid extractMap["entidlist"] = cid } } // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表 typeFunc(tmp, update, extractMap) if len(extractMap) > 0 { if extractMap["toptype"] != nil && extractMap["subtype"] == nil { //updateExtPool <- []map[string]interface{}{ // {"_id": tmp["_id"]}, // {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}}, //} extUpdate = append(extUpdate, []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}}, }) } else { //updateExtPool <- []map[string]interface{}{ // {"_id": tmp["_id"]}, // {"$set": extractMap}, //} extUpdate = append(extUpdate, []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": extractMap}, }) } if len(extUpdate) >= MgoBulkSize { tmps := extUpdate MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...) extUpdate = [][]map[string]interface{}{} } } // 附件有效字段 if i := validFile(tmp); i != 0 { if i == -1 { tmp["isValidFile"] = false update["isValidFile"] = false } else { tmp["isValidFile"] = true update["isValidFile"] = true } } if len(update) > 0 { //SaveEsLock.Lock() bidUpdate = append(bidUpdate, []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": update}, }) if len(bidUpdate) >= MgoBulkSize { tmps := bidUpdate MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...) bidUpdate = [][]map[string]interface{}{} } //SaveEsLock.Unlock() //updateBidPool <- []map[string]interface{}{{ // "_id": tmp["_id"], //}, // {"$set": update}, //} } if n%500 == 0 { log.Info("biddingTask", zap.Int("current", n)) } tmp = make(map[string]interface{}) } //SaveEsLock.Lock() if len(bidUpdate) > 0 { tmps := bidUpdate MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...) bidUpdate = [][]map[string]interface{}{} } if len(extUpdate) > 0 { tmps := extUpdate MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...) extUpdate = [][]map[string]interface{}{} } //SaveEsLock.Unlock() return syncNo } // @Description subscopeclass、topscopeclass、package // @Author J 2022/6/7 5:54 PM func fieldFun(compare, update map[string]interface{}) { subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass if subscopeclass != nil { m1 := map[string]bool{} newclass := []string{} for _, sc := range subscopeclass { sclass, _ := sc.(string) if !m1[sclass] { m1[sclass] = true newclass = append(newclass, sclass) } } update["s_subscopeclass"] = strings.Join(newclass, ",") update["subscopeclass"] = newclass } topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass if topscopeclass != nil { m2 := map[string]bool{} newclass := []string{} for _, tc := range topscopeclass { tclass, _ := tc.(string) tclass = regLetter.ReplaceAllString(tclass, "") // 去除字母 if !m2[tclass] { m2[tclass] = true newclass = append(newclass, tclass) } } update["topscopeclass"] = topscopeclass update["s_topscopeclass"] = strings.Join(newclass, ",") } if package1 := compare["package"]; package1 != nil { packageM, _ := package1.(map[string]interface{}) update["package"] = packageM for _, p := range packageM { pm, _ := p.(map[string]interface{}) if util.ObjToString(pm["winner"]) != "" || util.Float64All(pm["budget"]) > 0 || util.Float64All(pm["bidamount"]) > 0 { update["multipackage"] = 1 break } } } else { update["multipackage"] = 0 } } // @Description entidlist // @Author J 2022/6/7 2:36 PM func companyFun(tmp map[string]interface{}) (cid []string) { sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",") for _, w := range sWinnerarr { if w != "" { id := redis.GetStr("qyxy_id", w) if id == "" { ents, _ := MgoQ.Find(config.Conf.DB.MongoQ.Coll, map[string]interface{}{"company_name": w}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{"company_name": 1}, false, -1, -1) if len(*ents) > 0 { id = util.ObjToString((*ents)[0]["_id"]) redis.PutCKV("qyxy_id", w, id) } else { ent, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, map[string]interface{}{"history_name": w}) if len(*ent) > 0 { id = util.ObjToString((*ent)["company_id"]) redis.PutCKV("qyxy_id", w, id) } } } if id == "" { id = "-" } cid = append(cid, id) } } return cid } // @Description update 修改bidding表,extractM修改抽取表 // @Author J 2022/6/10 10:29 AM func typeFunc(tmp, update, extractM map[string]interface{}) { if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok { if t := util.ObjToString(jyData["type"]); t != "" { switch t { //case "采购信息": case "招标公告": if util.ObjToString(tmp["toptype"]) != "招标" { update["toptype"] = "招标" extractM["toptype"] = "招标" delete(update, "subtype") } case "采购意向": if util.ObjToString(tmp["toptype"]) != "采购意向" { update["toptype"] = "采购意向" update["subtype"] = "采购意向" extractM["toptype"] = "采购意向" extractM["subtype"] = "采购意向" } case "招标预告": if util.ObjToString(tmp["toptype"]) != "预告" { update["toptype"] = "预告" extractM["toptype"] = "预告" delete(update, "subtype") } case "招标结果": if util.ObjToString(tmp["toptype"]) != "结果" { update["toptype"] = "结果" extractM["toptype"] = "结果" delete(update, "subtype") } } } } } // @Description 附件有效字段(isValidFile) // @Author J 2022/7/8 14:41 func validFile(tmp map[string]interface{}) int { isContinue := false if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o { if atts, o1 := pinfo["attachments"].(map[string]interface{}); o1 { for _, att := range atts { if att == nil { continue } if reflect.TypeOf(att).String() == "string" { continue } att1 := att.(map[string]interface{}) if fid := util.ObjToString(att1["fid"]); fid != "" { isContinue = true break } } if isContinue { if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o { if len(attachTxt) > 0 { for _, at := range attachTxt { at1 := at.(map[string]interface{}) if len(at1) > 0 { for k, _ := range at1 { if reflect.TypeOf(at1[k]).String() == "string" { continue } at2 := at1[k].(map[string]interface{}) s := strings.ToLower(util.ObjToString(at2["file_name"])) if !strings.Contains(s, "jpg") || !strings.Contains(s, "jpeg") != strings.Contains(s, "png") || strings.Contains(s, "pdf") { if strings.Contains(s, "swf") || strings.Contains(s, "html") { return -1 } else if AnalysisFile(oss.OssGetObject(util.ObjToString(at2["attach_url"]))) { return 1 } } } break } else { break } } } } flag := false for _, att := range atts { if att == nil { continue } if reflect.TypeOf(att).String() == "string" { continue } att1 := att.(map[string]interface{}) if fid := util.ObjToString(att1["fid"]); fid != "" { ftype := strings.ToLower(util.ObjToString(tmp["ftype"])) if ftype != "swf" && ftype != "html" && oss.OssObjExists("jy-datafile", fid) { return 1 } else { flag = true } } } if flag { return -1 } } } } return 0 } // @Description id不变,内容变化 重新索引数据 // @Author J 2022/8/10 13:29 func taskinfo(id string) { tmp, _ := MgoB.FindById("bidding", id, nil) if tmp == nil || len(*tmp) == 0 { log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id)) return } extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil) if extractM == nil || len(*extractM) == 0 { extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil) if extractM == nil || len(*extractM) == 0 { log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id)) return } } update := map[string]interface{}{} //要更新的mongo数据 //更新bidding表字段 for _, k := range config.Conf.Serve.FieldS { v1 := (*extractM)[k] //extract v2 := (*tmp)[k] //bidding if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 == nil { if k == "city" || k == "district" { update[k] = "" } } } if util.IntAll((*extractM)["repeat"]) == 1 { update["extracttype"] = -1 update["dataprocess"] = 7 } else { update["extracttype"] = 1 update["dataprocess"] = 8 } //处理分类 fieldFun(*extractM, update) extractMap := make(map[string]interface{}) if util.ObjToString((*tmp)["s_winner"]) != "" { cid := companyFun(*tmp) if len(cid) > 0 { update["entidlist"] = cid extractMap["entidlist"] = cid } MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap}) } // 附件有效字段 if i := validFile(*tmp); i != 0 { if i == -1 { update["isValidFile"] = false } else { update["isValidFile"] = true } } if len(update) > 0 { MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update}) } mapinfo := map[string]interface{}{ "infoid": id, "stype": "index-by-id", } datas, _ := json.Marshal(mapinfo) var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas))) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) }