123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/garyburd/redigo/redis"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo/options"
- "gopkg.in/mgo.v2/bson"
- "log"
- "net"
- "sort"
- "strings"
- "time"
- mu "mfw/util"
- )
- //定时任务
- //1.存异常表
- //2.合并原始库新增
- func TimedTask() {
- t2 := time.NewTimer(time.Second * 5)
- for range t2.C {
- tmpLast := map[string]interface{}{}
- if err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").FindOne(context.TODO(), bson.M{}, options.FindOne().SetSort(bson.M{"_id": -1})).Decode(&tmpLast); err != nil {
- //临时表无数据
- log.Println("临时表无数据:", err)
- t2.Reset(time.Minute * 5)
- continue
- } else {
- //临时表有数据
- log.Println("临时表有数据:", tmpLast)
- cursor, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").Find(context.TODO(), bson.M{
- "_id": bson.M{
- "$lte": tmpLast["_id"],
- },
- }, options.Find().SetSort(bson.M{"_id": 1}))
- if err != nil {
- log.Println(err)
- t2.Reset(time.Second * 5)
- continue
- }
- //遍历临时表数据,匹配不到原始库存入异常表
- for cursor.Next(context.TODO()) {
- if err := cursor.Err(); err != nil {
- log.Println("cursor.Err();", err)
- }
- tmp := make(map[string]interface{})
- if err := cursor.Decode(&tmp); err == nil {
- //再重新查找redis,存在发udp处理,不存在走新增合并
- rdb := RedisPool.Get()
- if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
- //{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
- //redis存在发送udp进行处理
- go func(reply string) {
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": reply,
- "lteid": reply,
- "stype": "",
- })
- if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP("127.0.0.1"),
- Port: Updport,
- });e != nil{
- log.Println(e)
- }
- }(reply)
- //存在的话删除tmp mongo表
- if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp);err != nil{
- log.Println("删除临时表err:",err)
- }
- if err := rdb.Close(); err != nil {
- log.Println(err)
- }
- continue
- } else {
- if err = rdb.Close(); err != nil {
- log.Println(err)
- }
- }
- //查询redis不存在新增
- resulttmp := make(map[string]interface{})
- r := FClient.Database(Config["mgodb_enterprise"]).Collection(Config["mgodb_enterprise_c"]).FindOne(context.TODO(), bson.M{"company_name": tmp["winner"]}).Decode(&resulttmp)
- if r != nil {
- //log.Println(r)
- //匹配不到原始库,存入异常表删除临时表
- if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_err").InsertOne(context.TODO(), tmp); err != nil {
- log.Println(err)
- }
- if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
- log.Println(err)
- }
- continue
- } else {
- //log.Println(123)
- //匹配到原始库,新增 resulttmp
- if resulttmp["credit_no"] != nil {
- if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
- len(strings.TrimSpace(credit_no)) > 8 {
- dataNo := strings.TrimSpace(credit_no)[2:8]
- if Addrs[dataNo] != nil {
- if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
- if resulttmp["province"] == nil || resulttmp["province"] == "" {
- resulttmp["province"] = v["province"]
- }
- resulttmp["city"] = v["city"]
- resulttmp["district"] = v["district"]
- }
- }
- }
- }
- contacts := make([]map[string]interface{}, 0)
- contact := make(map[string]interface{}, 0)
- if resulttmp["legal_person"] != nil {
- contact["contact_person"] = resulttmp["legal_person"] //联系人
- } else {
- contact["contact_person"] = "" //联系人
- }
- contact["contact_type"] = "法定代表人" //法定代表人
- //log.Println(1)
- if resulttmp["annual_reports"] != nil {
- bytes, err := json.Marshal(resulttmp["annual_reports"])
- if err != nil {
- log.Println("annual_reports err:", err)
- }
- phonetmp := make([]map[string]interface{}, 0)
- err = json.Unmarshal(bytes, &phonetmp)
- if err != nil {
- log.Println("Unmarshal err:", err)
- }
- for _, vv := range phonetmp {
- if vv["company_phone"] != nil {
- if vv["company_phone"] == "" {
- continue
- } else {
- contact["phone"] = vv["company_phone"] //联系电话
- break
- }
- } else {
- contact["phone"] = "" //联系电话
- }
- }
- }
- //log.Println(k, contact["phone"], resulttmp["_id"])
- //time.Sleep(10 * time.Second)
- if contact["phone"] == nil {
- contact["phone"] = "" //联系电话
- }
- contact["topscopeclass"] = "企业公示" //项目类型
- contact["updatetime"] = time.Now().Unix() //更新时间
- contacts = append(contacts, contact)
- resulttmp["contact"] = contacts
- savetmp := make(map[string]interface{}, 0)
- for _, sk := range Fields {
- if sk == "establish_date" {
- if resulttmp[sk] != nil {
- savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
- continue
- }
- } else if sk == "capital" {
- //log.Println(sk, resulttmp[sk])
- savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
- continue
- } else if sk == "partners" {
- //log.Println(sk, resulttmp[sk], )
- //fmt.Println(reflect.TypeOf(resulttmp[sk]))
- if resulttmp[sk] != nil {
- if ppms, ok := resulttmp[sk].(primitive.A); ok {
- for i, _ := range ppms {
- if ppms[i].(map[string]interface{})["stock_type"] != nil {
- ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
- }
- delete(ppms[i].(map[string]interface{}), "identify_type")
- }
- savetmp[sk] = ppms
- }
- } else {
- savetmp[sk] = []interface{}{}
- }
- continue
- } else if sk == "_id" {
- savetmp["tmp"+sk] = resulttmp[sk]
- continue
- } else if sk == "area_code" {
- //行政区划代码
- savetmp[sk] = fmt.Sprint(resulttmp[sk])
- continue
- } else if sk == "report_websites" {
- //网址
- if resulttmp["report_websites"] == nil {
- savetmp["website"] = ""
- } else {
- report_websitesArr := []string{}
- if ppms, ok := resulttmp[sk].(primitive.A); ok {
- for _, v := range ppms {
- if vvv, ok := v.(map[string]interface{}); ok {
- if rv, ok := vvv["website_url"].(string); ok {
- report_websitesArr = append(report_websitesArr, rv)
- }
- }
- }
- }
- sort.Strings(report_websitesArr)
- savetmp["website"] = strings.Join(report_websitesArr, ";")
- }
- continue
- } else if sk == "wechat_accounts" {
- savetmp[sk] = []interface{}{}
- continue
- }
- if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
- savetmp[sk] = ""
- } else {
- savetmp[sk] = resulttmp[sk]
- }
- }
- //tmps = append(tmps, savetmp)
- savetmp["updatatime"] = time.Now().Unix()
- //保存mongo
- result, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
- InsertOne(context.TODO(), savetmp)
- if err == nil {
- //保存redis
- rc := RedisPool.Get()
- var _id string
- if v, ok := result.InsertedID.(primitive.ObjectID); ok {
- _id = v.Hex()
- }
- if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
- log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
- if err := rc.Close(); err != nil {
- log.Println(err)
- }
- } else {
- //保存es
- delete(savetmp, "_id")
- if err := rc.Close(); err != nil {
- log.Println(err)
- }
- //esConn := elastic.GetEsConn()
- //defer elastic.DestoryEsConn(esConn)
- if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
- log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
- } else {
- //删除临时表
- if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
- log.Println(err)
- }
- }
- }
- } else {
- log.Println("save mongo err:", err, tmp["_id"])
- }
- }
- }
- }
- defer cursor.Close(context.TODO())
- }
- t2.Reset(time.Minute)
- }
- }
|