|
@@ -43,45 +43,45 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
//extract库
|
|
|
- extractConn := MgoE.GetMgoConn()
|
|
|
- defer MgoE.DestoryMongoConn(extractConn)
|
|
|
- extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
|
|
|
- "field_source": 0,
|
|
|
- "kvtext": 0,
|
|
|
- }).Sort("_id").Iter()
|
|
|
- eMap := map[string]map[string]interface{}{}
|
|
|
- extCount, repeatCount := 0, 0
|
|
|
- for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
|
|
|
- if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
- repeatCount++
|
|
|
- }
|
|
|
- tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
- eMap[tid] = tmp
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
- log.Println("抽取表 数据量", extCount, "重复数据量", repeatCount)
|
|
|
+ // extractConn := MgoE.GetMgoConn()
|
|
|
+ // defer MgoE.DestoryMongoConn(extractConn)
|
|
|
+ // extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
|
|
|
+ // "field_source": 0,
|
|
|
+ // "kvtext": 0,
|
|
|
+ // }).Sort("_id").Iter()
|
|
|
+ // eMap := map[string]map[string]interface{}{}
|
|
|
+ // extCount, repeatCount := 0, 0
|
|
|
+ // for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
|
|
|
+ // if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
+ // repeatCount++
|
|
|
+ // }
|
|
|
+ // tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ // eMap[tid] = tmp
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // }
|
|
|
+ // log.Println("抽取表 数据量", extCount, "重复数据量", repeatCount)
|
|
|
|
|
|
//bidding库
|
|
|
biddingConn := MgoB.GetMgoConn()
|
|
|
count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
|
|
|
log.Println("bidding表 同步总数:", count)
|
|
|
c := 0
|
|
|
- if count < 500000 {
|
|
|
- var res []map[string]interface{}
|
|
|
- result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
|
|
|
- "contenthtml": 0,
|
|
|
- }).Iter()
|
|
|
- for tmp := make(map[string]interface{}); result.Next(tmp); {
|
|
|
- res = append(res, tmp)
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
- MgoB.DestoryMongoConn(biddingConn)
|
|
|
- log.Println("查询结果 bidding", count, "抽取:", extCount)
|
|
|
- c = doIndex(res, eMap, bkey, stype)
|
|
|
- } else {
|
|
|
- log.Println("查询结果 数据量太大,放弃", count)
|
|
|
- MgoB.DestoryMongoConn(biddingConn)
|
|
|
+ // if count < 500000 {
|
|
|
+ var res []map[string]interface{}
|
|
|
+ result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
|
|
|
+ "contenthtml": 0,
|
|
|
+ }).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); result.Next(tmp); {
|
|
|
+ res = append(res, tmp)
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
}
|
|
|
+ MgoB.DestoryMongoConn(biddingConn)
|
|
|
+ // log.Println("查询结果 bidding", count, "抽取:", extCount)
|
|
|
+ c = doIndex(res, bkey, stype)
|
|
|
+ // } else {
|
|
|
+ // log.Println("查询结果 数据量太大,放弃", count)
|
|
|
+ // MgoB.DestoryMongoConn(biddingConn)
|
|
|
+ // }
|
|
|
log.Println("bidding sync...over all", count, "extract sync ", c)
|
|
|
NextNode(mapInfo, stype)
|
|
|
// NextNodePro(mapInfo, stype)
|
|
@@ -255,7 +255,7 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("biddingAll sync...over all", count)
|
|
|
}
|
|
|
|
|
|
-func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
|
|
|
+func doIndex(infos []map[string]interface{}, bkey, stype string) int {
|
|
|
syncNo := 0 //抽取表数据同步数量
|
|
|
//对比两张表数据,减少查询次数
|
|
|
var compare map[string]interface{}
|
|
@@ -267,9 +267,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
update := map[string]interface{}{} //要更新的mongo数据
|
|
|
del := map[string]interface{}{}
|
|
|
+ edata, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, tid, nil)
|
|
|
//对比方法----------------
|
|
|
- if eMap[tid] != nil {
|
|
|
- compare = eMap[tid]
|
|
|
+ if edata != nil && len(*edata) > 0 {
|
|
|
+ compare = *edata
|
|
|
if stype == "bidding" {
|
|
|
// 增量id段 正常数据
|
|
|
if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
|
|
@@ -277,7 +278,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
compare = nil
|
|
|
continue
|
|
|
}
|
|
|
- delete(eMap, tid)
|
|
|
+ // delete(eMap, tid)
|
|
|
}
|
|
|
if stype == "bidding_history" {
|
|
|
//增量id段 历史数据
|
|
@@ -286,7 +287,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
compare = nil
|
|
|
continue
|
|
|
}
|
|
|
- delete(eMap, tid)
|
|
|
+ // delete(eMap, tid)
|
|
|
}
|
|
|
syncNo++
|
|
|
log.Println("抽取区域 省", compare["area"], " 市 ", compare["city"], " 区 ", compare["district"], " id ", tid)
|
|
@@ -397,7 +398,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
// 2024-02-21 徐志恒 情报标签字段
|
|
|
toptype := util.ObjToString(tmp["toptype"])
|
|
|
subtype := util.ObjToString(tmp["subtype"])
|
|
|
- buyerclass := util.ObjToString(tmp["buyerclass"])
|
|
|
+ buyerclass := util.ObjToString(compare["buyerclass"])
|
|
|
if buyerclass != "" {
|
|
|
update["buyer_type"] = getStr(buyerclass)
|
|
|
}
|