|
@@ -13,10 +13,10 @@ import (
|
|
|
"strings"
|
|
|
"time"
|
|
|
|
|
|
+ "log"
|
|
|
+
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
- "go.uber.org/zap"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
|
|
@@ -31,14 +31,6 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
defer util.Catch()
|
|
|
|
|
|
stype := util.ObjToString(mapInfo["stype"])
|
|
|
- if stype == "bidding" {
|
|
|
- uq := bson.M{"gtid": util.ObjToString(mapInfo["gtid"]), "lteid": util.ObjToString(mapInfo["lteid"])}
|
|
|
- MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
|
|
|
- }
|
|
|
- //领域标签处理的数据 id段
|
|
|
- if stype == "bidding_history" {
|
|
|
- MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
|
|
|
- }
|
|
|
|
|
|
q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
bkey, _ := mapInfo["bkey"].(string)
|
|
@@ -67,12 +59,12 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
eMap[tid] = tmp
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount))
|
|
|
+ log.Println("抽取表 数据量", extCount, "重复数据量", repeatCount)
|
|
|
|
|
|
//bidding库
|
|
|
biddingConn := MgoB.GetMgoConn()
|
|
|
count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
|
|
|
- log.Info("bidding表", zap.Int64("同步总数:", count))
|
|
|
+ log.Println("bidding表 同步总数:", count)
|
|
|
c := 0
|
|
|
if count < 500000 {
|
|
|
var res []map[string]interface{}
|
|
@@ -84,13 +76,13 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
MgoB.DestoryMongoConn(biddingConn)
|
|
|
- log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount))
|
|
|
+ log.Println("查询结果 bidding", count, "抽取:", extCount)
|
|
|
c = doIndex(res, eMap, bkey, stype)
|
|
|
} else {
|
|
|
- log.Info("查询结果", zap.Int64("数据量太大,放弃", count))
|
|
|
+ log.Println("查询结果 数据量太大,放弃", count)
|
|
|
MgoB.DestoryMongoConn(biddingConn)
|
|
|
}
|
|
|
- log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
|
|
|
+ log.Println("bidding sync...over all", count, "extract sync ", c)
|
|
|
NextNode(mapInfo, stype)
|
|
|
NextNodePro(mapInfo, stype)
|
|
|
NextNodeTidb(mapInfo, stype)
|
|
@@ -99,6 +91,14 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
NextNodeTidbQyxy(mapInfo) // tidb-企业数据
|
|
|
NextNodeHn(mapInfo)
|
|
|
}
|
|
|
+ if stype == "bidding" {
|
|
|
+ uq := bson.M{"gtid": util.ObjToString(mapInfo["gtid"]), "lteid": util.ObjToString(mapInfo["lteid"])}
|
|
|
+ MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
|
|
|
+ }
|
|
|
+ //领域标签处理的数据 id段
|
|
|
+ if stype == "bidding_history" {
|
|
|
+ MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
|
|
@@ -240,12 +240,12 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
if count%50000 == 0 {
|
|
|
- log.Info("biddingTask", zap.Int("current", count))
|
|
|
+ log.Println("biddingTask current", count)
|
|
|
}
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
|
|
|
- log.Info("biddingAll sync...over", zap.Int("all", count))
|
|
|
+ log.Println("biddingAll sync...over all", count)
|
|
|
}
|
|
|
|
|
|
func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
|
|
@@ -255,7 +255,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
var bidUpdate [][]map[string]interface{}
|
|
|
var extUpdate [][]map[string]interface{}
|
|
|
//SaveEsLock := &sync.Mutex{}
|
|
|
- log.Info("start ...")
|
|
|
+ log.Println("start ...")
|
|
|
for n, tmp := range infos {
|
|
|
tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
update := map[string]interface{}{} //要更新的mongo数据
|
|
@@ -282,7 +282,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
delete(eMap, tid)
|
|
|
}
|
|
|
syncNo++
|
|
|
- log.Info("抽取区域", zap.Any("省", compare["area"]), zap.Any("市", compare["city"]), zap.Any("区", compare["district"]), zap.Any("id", tid))
|
|
|
+ log.Println("抽取区域 省", compare["area"], " 市 ", compare["city"], " 区 ", compare["district"], " id ", tid)
|
|
|
for _, k := range config.Conf.Serve.FieldS {
|
|
|
v1 := compare[k] //extract
|
|
|
v2 := tmp[k] //bidding
|
|
@@ -382,12 +382,16 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
// 2024-02-21 徐志恒 情报标签字段
|
|
|
toptype := util.ObjToString(tmp["toptype"])
|
|
|
subtype := util.ObjToString(tmp["subtype"])
|
|
|
+ buyerclass := util.ObjToString(tmp["buyerclass"])
|
|
|
+ if buyerclass != "" {
|
|
|
+ update["buyer_type"] = getStr(buyerclass)
|
|
|
+ }
|
|
|
s_topscopeclass := util.ObjToString(update["s_topscopeclass"])
|
|
|
- if (tmp["tag_topinformation"] != nil && (subtype == "合同" || subtype == "中标" || subtype == "成交")) || (tmp["tag_topinformation"] == nil && toptype == "拟建" && strings.Contains(s_topscopeclass, "建筑工程")) {
|
|
|
+ if (tmp["tag_topinformation"] != nil && (subtype == "合同" || subtype == "中标" || subtype == "成交" || subtype == "采购意向" || toptype == "招标")) || (tmp["tag_topinformation"] == nil && toptype == "拟建" && strings.Contains(s_topscopeclass, "建筑工程")) {
|
|
|
update["tag_set"] = getTagSet(tmp, compare)
|
|
|
}
|
|
|
if len(update) > 0 {
|
|
|
- log.Info("保存bidding区域", zap.Any("省", update["area"]), zap.Any("市", update["city"]), zap.Any("区", update["district"]), zap.Any("id", tid))
|
|
|
+ log.Println("保存bidding区域 省", update["area"], " 市 ", update["city"], " 区 ", update["district"], " id ", tid)
|
|
|
if len(del) > 0 {
|
|
|
bidUpdate = append(bidUpdate, []map[string]interface{}{{
|
|
|
"_id": tmp["_id"],
|
|
@@ -408,7 +412,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
}
|
|
|
}
|
|
|
if n%500 == 0 {
|
|
|
- log.Info("biddingTask", zap.Int("current", n))
|
|
|
+ log.Println("biddingTask current ", n)
|
|
|
}
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
@@ -656,14 +660,14 @@ func validFile(tmp map[string]interface{}) int {
|
|
|
func taskinfo(id string) {
|
|
|
tmp, _ := MgoB.FindById("bidding", id, nil)
|
|
|
if tmp == nil || len(*tmp) == 0 {
|
|
|
- log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
|
|
|
+ log.Println(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
|
|
|
return
|
|
|
}
|
|
|
extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil)
|
|
|
if extractM == nil || len(*extractM) == 0 {
|
|
|
extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil)
|
|
|
if extractM == nil || len(*extractM) == 0 {
|
|
|
- log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
|
|
|
+ log.Println(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -726,7 +730,7 @@ func taskinfo(id string) {
|
|
|
IP: net.ParseIP(config.Conf.Udp.Next.Addr),
|
|
|
Port: util.IntAll(config.Conf.Udp.Next.Port),
|
|
|
}
|
|
|
- log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
|
|
|
+ log.Println("nsq data over es ", next, " mapinfo ", string(datas))
|
|
|
_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
|
|
|
}
|
|
|
|
|
@@ -770,6 +774,7 @@ func getTagSet(tmp, compare map[string]interface{}) map[string]map[string]interf
|
|
|
buyer := util.ObjToString(compare["buyer"])
|
|
|
publishtime := util.Int64All(tmp["publishtime"])
|
|
|
bidamount := util.Float64All(compare["bidamount"])
|
|
|
+ toptype := util.ObjToString(tmp["toptype"])
|
|
|
subtype := util.ObjToString(tmp["subtype"])
|
|
|
if subtype == "合同" {
|
|
|
wuye["isfirsthand"] = 62
|
|
@@ -834,6 +839,8 @@ func getTagSet(tmp, compare map[string]interface{}) map[string]map[string]interf
|
|
|
}
|
|
|
}
|
|
|
wuye["period"] = getperiod(compare)
|
|
|
+ } else if toptype == "招标" || toptype == "采购意向" {
|
|
|
+ bidamount = util.Float64All(compare["budget"])
|
|
|
}
|
|
|
if tmp["projectinfo"] != nil {
|
|
|
projectInfo := util.ObjToMap(tmp["projectinfo"])
|
|
@@ -941,3 +948,28 @@ func calculateYearDifference(startTime int64, endTime int64) float64 {
|
|
|
years := duration.Hours() / 24 / 365
|
|
|
return years
|
|
|
}
|
|
|
+
|
|
|
+func getStr(b string) string {
|
|
|
+ if b == "" {
|
|
|
+ return "其它"
|
|
|
+ }
|
|
|
+ a1 := "(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)"
|
|
|
+ a2 := "(卫健委|医疗)"
|
|
|
+ a3 := "(教育|学校)"
|
|
|
+ a4 := "(人行|金融业)"
|
|
|
+ a5 := "(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)"
|
|
|
+ if strings.Contains(a1, b) {
|
|
|
+ return "政府机构"
|
|
|
+ } else if strings.Contains(a2, b) {
|
|
|
+ return "医疗单位"
|
|
|
+ } else if strings.Contains(a3, b) {
|
|
|
+ return "教育单位"
|
|
|
+ } else if strings.Contains(a4, b) {
|
|
|
+ return "金融企业"
|
|
|
+ } else if strings.Contains(a5, b) {
|
|
|
+ return "商业公司"
|
|
|
+ } else {
|
|
|
+ return "其它"
|
|
|
+ }
|
|
|
+ return "其它"
|
|
|
+}
|