|
@@ -4,10 +4,9 @@ import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/garyburd/redigo/redis"
|
|
|
+ "gopkg.in/mgo.v2"
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
"log"
|
|
|
- mu "mfw/util"
|
|
|
- "net"
|
|
|
"qfw/util"
|
|
|
"sort"
|
|
|
"strings"
|
|
@@ -18,8 +17,6 @@ import (
|
|
|
//之前main方法,只更新
|
|
|
func TaskWinner(mapinfo *map[string]interface{}) {
|
|
|
defer util.Catch()
|
|
|
- //释放
|
|
|
- defer func() { <-CPool }()
|
|
|
gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
|
|
|
if gtid == "" || lteid == "" {
|
|
|
log.Println(gtid, lteid, "参数错误")
|
|
@@ -43,7 +40,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
|
|
|
"$gte": GId,
|
|
|
"$lte": LtId,
|
|
|
},
|
|
|
- }).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1, "winneraddr": 1}).Iter()
|
|
|
+ }).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1, "package": 1}).Iter()
|
|
|
|
|
|
if cursor.Err() != nil {
|
|
|
SourceClient.DestoryMongoConn(SourceClientcc)
|
|
@@ -125,7 +122,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
|
|
|
//拿到合并后的qyk
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, nil)
|
|
|
- if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
|
|
|
+ if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
|
|
|
log.Println(redisCName, "存量 redis id 不存在", reply)
|
|
|
continue
|
|
|
}
|
|
@@ -215,133 +212,141 @@ func TaskWinner(mapinfo *map[string]interface{}) {
|
|
|
log.Println("存量历史合并执行完成 ok", gtid, lteid)
|
|
|
|
|
|
} else {
|
|
|
- //增量处理
|
|
|
- overid := gtid
|
|
|
- tmp := map[string]interface{}{}
|
|
|
- for cursor.Next(&tmp) {
|
|
|
- overid = tmp["_id"].(bson.ObjectId).Hex()
|
|
|
- //log.Println(tmp["_id"])
|
|
|
- winner, ok := tmp["winner"].(string)
|
|
|
- if !ok || utf8.RuneCountInString(winner) < 4 {
|
|
|
- continue
|
|
|
+ //增量
|
|
|
+ overid := addfunc(gtid, cursor)
|
|
|
+ SourceClient.DestoryMongoConn(SourceClientcc)
|
|
|
+ log.Println("增量合并执行完成 ok", gtid, lteid, overid)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+//增量
|
|
|
+func addfunc(gtid string, cursor *mgo.Iter) string {
|
|
|
+ //增量处理
|
|
|
+ overid := gtid
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ for cursor.Next(&tmp) {
|
|
|
+ overid = tmp["_id"].(bson.ObjectId).Hex()
|
|
|
+ //log.Println(tmp["_id"])
|
|
|
+ winner, ok := tmp["winner"].(string)
|
|
|
+ if !ok || utf8.RuneCountInString(winner) < 4 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //redis查询是否存在
|
|
|
+ rdb := RedisPool.Get()
|
|
|
+ rdb.Do("SELECT", redis_winner_db)
|
|
|
+ if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
|
|
|
+ //redis不存在存到临时表,定时任务处理
|
|
|
+ FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
+ if err := FClient.SaveForOld("winner_new", tmp); err != nil {
|
|
|
+ log.Println("FClient.Save err", err, tmp)
|
|
|
}
|
|
|
- //redis查询是否存在
|
|
|
- rdb := RedisPool.Get()
|
|
|
- rdb.Do("SELECT", redis_winner_db)
|
|
|
- if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
|
|
|
- //redis不存在存到临时表,定时任务处理
|
|
|
- FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
- if err := FClient.SaveForOld("winner_new", tmp); err != nil {
|
|
|
- log.Println("FClient.Save err", err, tmp)
|
|
|
- }
|
|
|
- //log.Println("get redis id err:定时任务处理", err, tmp)
|
|
|
- if err := rdb.Close(); err != nil {
|
|
|
- log.Println(err)
|
|
|
- }
|
|
|
+ //log.Println("get redis id err:定时任务处理", err, tmp)
|
|
|
+ if err := rdb.Close(); err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ if err := rdb.Close(); err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+ //拿到合并后的qyk
|
|
|
+ FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
+ oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
|
|
|
+ if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
|
|
|
+ log.Println("redis id 不存在")
|
|
|
continue
|
|
|
- } else {
|
|
|
- if err := rdb.Close(); err != nil {
|
|
|
- log.Println(err)
|
|
|
- }
|
|
|
- //拿到合并后的qyk
|
|
|
- FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
- oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
|
|
|
- if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
|
|
|
- log.Println("redis id 不存在")
|
|
|
- continue
|
|
|
- }
|
|
|
- //比较合并
|
|
|
- //行业类型
|
|
|
- tmpTopscopeclass := []string{}
|
|
|
- tmpConTopscopeclass := []string{}
|
|
|
- tmpTopscopeclassMap := make(map[string]bool)
|
|
|
+ }
|
|
|
+ //比较合并 行业类型
|
|
|
+ tmpTopscopeclass := []string{}
|
|
|
+ tmpConTopscopeclass := []string{}
|
|
|
+ tmpTopscopeclassMap := make(map[string]bool)
|
|
|
|
|
|
- if (*oldTmp)["industry"] != nil {
|
|
|
- if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
|
|
|
- for _, vv := range v {
|
|
|
- if vvv, ok := vv.(string); ok {
|
|
|
- tmpTopscopeclassMap[vvv] = true
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if v, ok := tmp["topscopeclass"].([]interface{}); ok {
|
|
|
+ if (*oldTmp)["industry"] != nil {
|
|
|
+ if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
|
|
|
for _, vv := range v {
|
|
|
- if vvv, ok := vv.(string); ok && len(vvv) > 1 {
|
|
|
- tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
|
|
|
- tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
|
|
|
+ if vvv, ok := vv.(string); ok {
|
|
|
+ tmpTopscopeclassMap[vvv] = true
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- for k := range tmpTopscopeclassMap {
|
|
|
- tmpTopscopeclass = append(tmpTopscopeclass, k)
|
|
|
- }
|
|
|
- sort.Strings(tmpTopscopeclass)
|
|
|
- (*oldTmp)["industry"] = tmpTopscopeclass
|
|
|
-
|
|
|
- esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
|
|
|
- //更新行业类型
|
|
|
- if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
|
|
|
- (*oldTmp)["updatatime"] = time.Now().Unix()
|
|
|
- //mongo更新
|
|
|
- FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
- if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
|
|
|
- log.Println("mongo更新err", esId)
|
|
|
- }
|
|
|
-
|
|
|
- //es更新
|
|
|
- delete((*oldTmp), "_id")
|
|
|
- if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
|
|
|
- log.Println("update es err:", err)
|
|
|
- }
|
|
|
- continue
|
|
|
- }
|
|
|
- //联系方式合并
|
|
|
- var tmpperson, winnertel string
|
|
|
- if tmppersona, ok := tmp["winnerperson"].(string); ok {
|
|
|
- tmpperson = tmppersona
|
|
|
- }
|
|
|
- if winnerteltmp, ok := tmp["winnertel"].(string); ok {
|
|
|
- winnertel = winnerteltmp
|
|
|
- }
|
|
|
- if Reg_xing.MatchString(winnertel) || !Reg_tel.MatchString(winnertel) {
|
|
|
- winnertel = ""
|
|
|
- } else {
|
|
|
- winnertel = winnertel
|
|
|
- }
|
|
|
- contactMaps := make([]interface{}, 0)
|
|
|
- if (*oldTmp)["contact"] != nil {
|
|
|
- //直接添加联系人,不再判断
|
|
|
- if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
|
|
|
- contactMaps = append(contactMaps, v...)
|
|
|
+ }
|
|
|
+ if v, ok := tmp["topscopeclass"].([]interface{}); ok {
|
|
|
+ for _, vv := range v {
|
|
|
+ if vvv, ok := vv.(string); ok && len(vvv) > 1 {
|
|
|
+ tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
|
|
|
+ tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
|
|
|
}
|
|
|
}
|
|
|
- vvv := make(map[string]interface{})
|
|
|
- vvv["infoid"] = overid
|
|
|
- vvv["contact_person"] = tmpperson
|
|
|
- vvv["contact_type"] = "项目联系人"
|
|
|
- vvv["phone"] = winnertel
|
|
|
- vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
|
|
|
- vvv["updatetime"] = time.Now().Unix()
|
|
|
- contactMaps = append(contactMaps, vvv)
|
|
|
- (*oldTmp)["contact"] = contactMaps
|
|
|
- //mongo更新
|
|
|
+ }
|
|
|
+ for k := range tmpTopscopeclassMap {
|
|
|
+ tmpTopscopeclass = append(tmpTopscopeclass, k)
|
|
|
+ }
|
|
|
+ sort.Strings(tmpTopscopeclass)
|
|
|
+ (*oldTmp)["industry"] = tmpTopscopeclass
|
|
|
+
|
|
|
+ esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
|
|
|
+ //更新行业类型
|
|
|
+ if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
|
|
|
(*oldTmp)["updatatime"] = time.Now().Unix()
|
|
|
+ //mongo更新
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
|
|
|
- log.Println("mongo更新 err", esId, oldTmp)
|
|
|
+ log.Println("mongo更新err", esId)
|
|
|
}
|
|
|
+
|
|
|
//es更新
|
|
|
delete((*oldTmp), "_id")
|
|
|
if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
|
|
|
- log.Println("EsConn err :", err)
|
|
|
+ log.Println("update es err:", err)
|
|
|
}
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //联系方式合并
|
|
|
+ var tmpperson, winnertel string
|
|
|
+ if tmppersona, ok := tmp["winnerperson"].(string); ok {
|
|
|
+ tmpperson = tmppersona
|
|
|
+ }
|
|
|
+ if winnerteltmp, ok := tmp["winnertel"].(string); ok {
|
|
|
+ winnertel = winnerteltmp
|
|
|
+ }
|
|
|
+ if Reg_xing.MatchString(winnertel) || !Reg_tel.MatchString(winnertel) {
|
|
|
+ winnertel = ""
|
|
|
+ } else {
|
|
|
+ winnertel = winnertel
|
|
|
+ }
|
|
|
+ contactMaps := make([]interface{}, 0)
|
|
|
+ if (*oldTmp)["contact"] != nil {
|
|
|
+ //直接添加联系人,不再判断
|
|
|
+ if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
|
|
|
+ contactMaps = append(contactMaps, v...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ vvv := make(map[string]interface{})
|
|
|
+ vvv["infoid"] = overid
|
|
|
+ vvv["contact_person"] = tmpperson
|
|
|
+ vvv["contact_type"] = "项目联系人"
|
|
|
+ vvv["phone"] = winnertel
|
|
|
+ vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
|
|
|
+ vvv["updatetime"] = time.Now().Unix()
|
|
|
+ contactMaps = append(contactMaps, vvv)
|
|
|
+ //分包处理
|
|
|
+ PackageDealWith(&contactMaps, tmp)
|
|
|
+ (*oldTmp)["contact"] = contactMaps
|
|
|
+ //mongo更新
|
|
|
+ (*oldTmp)["updatatime"] = time.Now().Unix()
|
|
|
+ FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
+ if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
|
|
|
+ log.Println("mongo更新 err", esId, oldTmp)
|
|
|
+ }
|
|
|
+ //es更新
|
|
|
+ delete((*oldTmp), "_id")
|
|
|
+ if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
|
|
|
+ log.Println("EsConn err :", err)
|
|
|
}
|
|
|
}
|
|
|
- SourceClient.DestoryMongoConn(SourceClientcc)
|
|
|
- log.Println("增量合并执行完成 ok", gtid, lteid, overid)
|
|
|
}
|
|
|
-
|
|
|
+ return overid
|
|
|
}
|
|
|
|
|
|
//定时任务 新增
|
|
@@ -387,19 +392,10 @@ func TimedTaskWinner() {
|
|
|
rdb.Do("SELECT", redis_winner_db)
|
|
|
if _, err := redis.String(rdb.Do("GET", errwinner)); err == nil {
|
|
|
//redis存在发送udp进行处理
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": tmpId,
|
|
|
- "lteid": tmpId,
|
|
|
- "stype": "",
|
|
|
- "data_type": "winner",
|
|
|
- "data_info": "add",
|
|
|
+ TaskWinner(&map[string]interface{}{
|
|
|
+ "gtid": tmpId,
|
|
|
+ "lteid": tmpId,
|
|
|
})
|
|
|
- if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP("127.0.0.1"),
|
|
|
- Port: Updport,
|
|
|
- }); e != nil {
|
|
|
- log.Println(e)
|
|
|
- }
|
|
|
//存在的话删除tmp mongo表
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
if DeletedCount := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
|
|
@@ -422,17 +418,30 @@ func TimedTaskWinner() {
|
|
|
//log.Println(r)
|
|
|
//人工审核正则
|
|
|
var isok bool
|
|
|
+ //先遍历ok
|
|
|
for _, v := range WinnerRegOk {
|
|
|
isok = v.MatchString(errwinner)
|
|
|
if isok {
|
|
|
- tmp["winner_ok"] = 1
|
|
|
- break
|
|
|
+ //匹配ok完,匹配err
|
|
|
+ for _, vRegErr := range WinnerRegErr {
|
|
|
+ isok = vRegErr.MatchString(errwinner)
|
|
|
+ //匹配到ok 也匹配到err 按err算
|
|
|
+ if isok {
|
|
|
+ tmp["winner_err"] = 1
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //匹配ok,没匹配err 按ok算
|
|
|
+ if tmp["winner_err"] == nil {
|
|
|
+ tmp["winner_ok"] = 1
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- if tmp["winner_ok"] == nil {
|
|
|
+ //都没匹配
|
|
|
+ if tmp["winner_ok"] == nil && tmp["winner_err"] == nil{
|
|
|
tmp["winner_err"] = 1
|
|
|
}
|
|
|
-
|
|
|
//匹配不到原始库,存入异常表删除临时表
|
|
|
FClient.DbName = Config["mgodb_extract_kf"]
|
|
|
if err := FClient.SaveForOld("winner_err", tmp); err != nil {
|
|
@@ -658,3 +667,8 @@ func TimedTaskWinner() {
|
|
|
t2.Reset(time.Minute)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+//分包处理
|
|
|
+func PackageDealWith(contactMaps *[]interface{}, tmp map[string]interface{}) {
|
|
|
+
|
|
|
+}
|