|
@@ -10,7 +10,6 @@ import (
|
|
|
"net"
|
|
|
"qfw/util"
|
|
|
"sort"
|
|
|
- "strconv"
|
|
|
"strings"
|
|
|
"time"
|
|
|
"unicode/utf8"
|
|
@@ -45,7 +44,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
"$lte": LtId,
|
|
|
},
|
|
|
}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1,
|
|
|
- "buyeraddr": 1,"buyerclass":1}).Iter()
|
|
|
+ "buyeraddr": 1, "buyerclass": 1}).Iter()
|
|
|
if cursor.Err() != nil {
|
|
|
SourceClient.DestoryMongoConn(SourceClientcc)
|
|
|
log.Println(cursor.Err())
|
|
@@ -57,14 +56,13 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
conn := HisRedisPool.Conn()
|
|
|
defer conn.Close()
|
|
|
//选择redis db
|
|
|
- redis_buyer_db, _ := strconv.Atoi(Config["redis_buyer_db"])
|
|
|
conn.Select(redis_buyer_db)
|
|
|
//遍历bidding表保存到redis
|
|
|
//key:企业名 value:json结构体{"buyer": 1, "buyertel": 1, "buyerperson": 1,"topscopeclass": 1, "buyeraddr": 1,"_id":1}
|
|
|
tmp := make(map[string]interface{})
|
|
|
for cursor.Next(&tmp) {
|
|
|
buyer, ok := tmp["buyer"].(string)
|
|
|
- if !ok || utf8.RuneCountInString(buyer)<4{
|
|
|
+ if !ok || utf8.RuneCountInString(buyer) < 4 {
|
|
|
continue
|
|
|
}
|
|
|
//判断redis key是否存在
|
|
@@ -103,14 +101,14 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
json.Unmarshal(redisvalueBytes, &rValuesMaps)
|
|
|
//redis查询是否存在
|
|
|
rdb := RedisPool.Get()
|
|
|
- rdb.Do("SELECT", Config["redis_buyer_db"])
|
|
|
+ rdb.Do("SELECT", redis_buyer_db)
|
|
|
if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
|
|
|
//redis不存在,存到临时表,定时任务处理
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
for _, vmap := range rValuesMaps {
|
|
|
vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
|
|
|
if err = FClient.SaveForOld("buyer_new", vmap); err != nil {
|
|
|
- log.Println("存量 FClient.Save err", err,vmap)
|
|
|
+ log.Println("存量 FClient.Save err", err, vmap)
|
|
|
}
|
|
|
}
|
|
|
//log.Println("get redis id err:定时任务处理", err, tmp)
|
|
@@ -126,8 +124,8 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
//拿到合并后的qyk
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, nil)
|
|
|
- if !b || oldTmp == nil {
|
|
|
- log.Println(redisCName, "存量 redis id 不存在", reply)
|
|
|
+ if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
|
|
|
+ log.Println(redisCName, "存量 redis id 不存在", reply,"数据:",oldTmp)
|
|
|
continue
|
|
|
}
|
|
|
tmpTopscopeclass := []string{}
|
|
@@ -148,20 +146,19 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
sort.Strings(tmpTopscopeclass)
|
|
|
esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
|
|
|
|
|
|
-
|
|
|
//更新buyerclass合并
|
|
|
if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
|
|
|
//无值,不更新
|
|
|
- }else {
|
|
|
- var buyerclass_new,buyerclass_old string
|
|
|
+ } else {
|
|
|
+ var buyerclass_new, buyerclass_old string
|
|
|
buyerclass_new = tmp["buyerclass"].(string)
|
|
|
buyerclass_old = (*oldTmp)["buyerclass"].(string)
|
|
|
- if buyerclass_old=="" {
|
|
|
+ if buyerclass_old == "" {
|
|
|
(*oldTmp)["buyerclass"] = buyerclass_new
|
|
|
- }else {
|
|
|
- if buyerclass_new!=buyerclass_old {
|
|
|
+ } else {
|
|
|
+ if buyerclass_new != buyerclass_old {
|
|
|
if !strings.Contains(buyerclass_old, buyerclass_new) {
|
|
|
- (*oldTmp)["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
|
|
|
+ (*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -233,17 +230,17 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
overid = tmp["_id"].(bson.ObjectId).Hex()
|
|
|
//log.Println(tmp["_id"])
|
|
|
buyer, ok := tmp["buyer"].(string)
|
|
|
- if !ok || utf8.RuneCountInString(buyer)<4{
|
|
|
+ if !ok || utf8.RuneCountInString(buyer) < 4 {
|
|
|
continue
|
|
|
}
|
|
|
//redis查询是否存在
|
|
|
rdb := RedisPool.Get()
|
|
|
- rdb.Do("SELECT", Config["redis_buyer_db"])
|
|
|
+ rdb.Do("SELECT", redis_buyer_db)
|
|
|
if reply, err := redis.String(rdb.Do("GET", buyer)); err != nil {
|
|
|
//redis不存在存到临时表,定时任务处理
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
- if err := FClient.SaveForOld("buyer_new", tmp); err!=nil {
|
|
|
- log.Println("FClient.Save err", err,tmp)
|
|
|
+ if err := FClient.SaveForOld("buyer_new", tmp); err != nil {
|
|
|
+ log.Println("FClient.Save err", err, tmp)
|
|
|
}
|
|
|
//log.Println("get redis id err:定时任务处理", err, tmp)
|
|
|
if err := rdb.Close(); err != nil {
|
|
@@ -257,7 +254,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
//拿到合并后的qyk
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, bson.M{})
|
|
|
- if !b || oldTmp == nil {
|
|
|
+ if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
|
|
|
log.Println("redis id 不存在")
|
|
|
continue
|
|
|
}
|
|
@@ -281,20 +278,19 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
|
|
|
sort.Strings(tmpTopscopeclass)
|
|
|
esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
|
|
|
|
|
|
-
|
|
|
//更新buyerclass合并
|
|
|
if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
|
|
|
//无值,不更新
|
|
|
- }else {
|
|
|
- var buyerclass_new,buyerclass_old string
|
|
|
+ } else {
|
|
|
+ var buyerclass_new, buyerclass_old string
|
|
|
buyerclass_new = tmp["buyerclass"].(string)
|
|
|
buyerclass_old = (*oldTmp)["buyerclass"].(string)
|
|
|
- if buyerclass_old=="" {
|
|
|
+ if buyerclass_old == "" {
|
|
|
(*oldTmp)["buyerclass"] = buyerclass_new
|
|
|
- }else {
|
|
|
- if buyerclass_new!=buyerclass_old {
|
|
|
+ } else {
|
|
|
+ if buyerclass_new != buyerclass_old {
|
|
|
if !strings.Contains(buyerclass_old, buyerclass_new) {
|
|
|
- (*oldTmp)["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
|
|
|
+ (*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -379,7 +375,7 @@ func TimedTaskBuyer() {
|
|
|
if !iter.Next(&tmpLast) {
|
|
|
//临时表无数据
|
|
|
log.Println("临时表无数据:")
|
|
|
- t2.Reset(time.Second * 15)
|
|
|
+ t2.Reset(time.Minute * 5)
|
|
|
FClient.DestoryMongoConn(Fcconn)
|
|
|
continue
|
|
|
} else {
|
|
@@ -392,7 +388,7 @@ func TimedTaskBuyer() {
|
|
|
}).Sort("_id").Iter()
|
|
|
if cursor == nil {
|
|
|
log.Println("查询失败")
|
|
|
- t2.Reset(time.Second * 15)
|
|
|
+ t2.Reset(time.Second * 5)
|
|
|
FClient.DestoryMongoConn(fconn)
|
|
|
continue
|
|
|
}
|
|
@@ -400,10 +396,14 @@ func TimedTaskBuyer() {
|
|
|
tmp := make(map[string]interface{})
|
|
|
for cursor.Next(&tmp) {
|
|
|
tmpId := tmp["_id"].(bson.ObjectId).Hex()
|
|
|
+ errbuyer, ok := tmp["buyer"].(string)
|
|
|
+ if !ok || errbuyer == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
//再重新查找redis,存在发udp处理,不存在走新增合并
|
|
|
rdb := RedisPool.Get()
|
|
|
- rdb.Do("SELECT", Config["redis_buyer_db"])
|
|
|
- if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
|
|
|
+ rdb.Do("SELECT", redis_buyer_db)
|
|
|
+ if _, err := redis.String(rdb.Do("GET", errbuyer)); err == nil {
|
|
|
//redis存在发送udp进行处理
|
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
|
"gtid": tmpId,
|
|
@@ -435,9 +435,21 @@ func TimedTaskBuyer() {
|
|
|
//查询redis不存在新增
|
|
|
FClient.DbName = Config["mgodb_enterprise"]
|
|
|
|
|
|
- resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
|
|
|
+ resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": errbuyer})
|
|
|
if !b || (*resulttmp)["_id"] == nil {
|
|
|
//log.Println(r)
|
|
|
+ //人工审核
|
|
|
+ var isok bool
|
|
|
+ for _, v := range BuerRegOk {
|
|
|
+ isok = v.MatchString(errbuyer)
|
|
|
+ if isok {
|
|
|
+ tmp["buyer_ok"] = 1
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if tmp["buyer_ok"] == nil {
|
|
|
+ tmp["buyer_err"] = 1
|
|
|
+ }
|
|
|
//匹配不到原始库,存入异常表删除临时表
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
if err := FClient.SaveForOld("buyer_err", tmp); err != nil {
|
|
@@ -449,7 +461,7 @@ func TimedTaskBuyer() {
|
|
|
continue
|
|
|
} else {
|
|
|
//log.Println(123)
|
|
|
- //匹配到原始库,新增 resulttmp
|
|
|
+ //匹配到原始库,新增 resulttmp buyer
|
|
|
if (*resulttmp)["credit_no"] != nil {
|
|
|
if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
|
|
|
len(strings.TrimSpace(credit_no)) > 8 {
|
|
@@ -581,10 +593,18 @@ func TimedTaskBuyer() {
|
|
|
savetmp[sk] = (*resulttmp)["company_address"]
|
|
|
}
|
|
|
continue
|
|
|
+ }else if sk == "buyerclass" {
|
|
|
+ if tmp["buyerclass"]==nil {
|
|
|
+ savetmp[sk] = ""
|
|
|
+ }else {
|
|
|
+ savetmp[sk] = tmp["buyerclass"]
|
|
|
+ }
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
+
|
|
|
if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
|
|
|
- sk != "buyer_name" && sk != "address" &&
|
|
|
+ sk != "buyer_name" && sk != "address" && sk != "buyerclass"&&
|
|
|
sk != "contact" && sk != "report_websites" {
|
|
|
savetmp[sk] = ""
|
|
|
} else {
|
|
@@ -600,7 +620,7 @@ func TimedTaskBuyer() {
|
|
|
if saveid != "" {
|
|
|
//保存redis
|
|
|
rc := RedisPool.Get()
|
|
|
- rc.Do("SELECT", Config["redis_buyer_db"])
|
|
|
+ rc.Do("SELECT", redis_buyer_db)
|
|
|
//var _id string
|
|
|
//if v, ok := saveid.(primitive.ObjectID); ok {
|
|
|
// _id = v.Hex()
|
|
@@ -636,9 +656,10 @@ func TimedTaskBuyer() {
|
|
|
|
|
|
}
|
|
|
FClient.DestoryMongoConn(fconn)
|
|
|
+ log.Println("buyer_new,遍历完成")
|
|
|
}
|
|
|
}
|
|
|
FClient.DestoryMongoConn(Fcconn)
|
|
|
t2.Reset(time.Minute)
|
|
|
}
|
|
|
-}
|
|
|
+}
|