浏览代码

代理机构

apple 5 年之前
父节点
当前提交
6c757f70e5
共有 5 个文件被更改,包括 265 次插入39 次删除
  1. 21 16
      standardata/src/config.json
  2. 10 1
      standardata/src/main.go
  3. 207 0
      standardata/src/standaragency.go
  4. 26 21
      standardata/src/standarwinner.go
  5. 1 1
      util/src/dbutil/mongo/mgo.go

+ 21 - 16
standardata/src/config.json

@@ -1,26 +1,31 @@
 {
-  "port": "1234",
-  "mgofrom": "127.0.0.1:27017",
+  "port": "1235",
+  "mgofrom": "172.17.4.187:27083",
   "mgofromsize":5,
-  "mgofromdb":"demo",
-  "mgoto": "127.0.0.1:27017",
+  "mgofromdb":"qfw",
+  "mgoto": "172.17.145.163:27082",
   "mgotosize":5,
-  "mgotodb":"demo",
-  "mgoent": "127.0.0.1:27017",
+  "mgotodb":"extract_v3",
+  "mgoent": "172.17.145.163:27082",
   "mgoentsize":5,
-  "mgoentdb":"demo",
-  "extractcoll":"result_20200117",
+  "mgoentdb":"enterprise",
+  "extractcoll":"result_20200116",
   "standardata":{
 	"winner":{
-		"standarent":"winner_zjkent",
-		"standarerr":"winner_zjkerr",
+		"standarent":"winner_ent",
+		"standarerr":"winner_err",
 		"redisdb":1
 	},
-	"buyer":{
-		"standarent":"buyer_zjkent",
-		"standarerr":"buyer_zjkerr",
-		"redisdb":2
-	}
+    "buyer":{
+      "standarent":"buyer_ent",
+      "standarerr":"buyer_err",
+      "redisdb":2
+    },
+    "agency":{
+      "standarent":"agency_data_ent",
+      "standarerr":"agency_data_err",
+      "redisdb":3
+    }
   },
-  "redis": "winner=127.0.0.1:6379,buyer=127.0.0.1:6379"
+  "redis": "winner=172.17.148.44:2679,buyer=172.17.148.44:2679,agency=172.17.148.44:2679"
 }

+ 10 - 1
standardata/src/main.go

@@ -19,10 +19,12 @@ var (
 	extractcoll          string
 	winnerent, winnererr string
 	buyerent, buyererr   string
-	winnerbd, buyerbd    int
+	agencyent, agencyerr   string
+	winnerbd, buyerbd, agencybd    int
 	Addrs                = make(map[string]interface{}, 0) //省市县
 	winchanbool          = make(chan bool, 3)
 	buyerchanbool        = make(chan bool, 3)
+	agencychanbool        = make(chan bool, 3)
 	gochan               = make(chan bool, 3)
 	udpclient            mu.UdpClient
 	//异常表正则匹配处理
@@ -41,6 +43,10 @@ func init() {
 		buyerent = qu.ObjToString(buyer["standarent"])
 		buyererr = qu.ObjToString(buyer["standarerr"])
 		buyerbd = qu.IntAll(buyer["redisdb"])
+		agency, _ := standardata["agency"].(map[string]interface{})
+		agencyent = qu.ObjToString(agency["standarent"])
+		agencyerr = qu.ObjToString(agency["standarerr"])
+		agencybd = qu.IntAll(agency["redisdb"])
 	} else {
 		os.Exit(0)
 	}
@@ -123,8 +129,11 @@ func initReg() {
 func main() {
 	//go historywinner(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
 	//go historybuyer(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
+	go historyagency(qu.ObjToString(sysconfig["mgofromdb"]), extractcoll)
 	//go winStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
 	//go buyerStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
+
+
 	//go task_standarData()
 	c := make(chan int, 1)
 	<-c

+ 207 - 0
standardata/src/standaragency.go

@@ -0,0 +1,207 @@
+// standaragency
+package main
+
+import (
+	"dbutil/mongo"
+	"dbutil/redis"
+	"encoding/json"
+	"log"
+	qu "qfw/util"
+	"sync"
+	"time"
+	"unicode/utf8"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//增量处理
+func agencyStandarData(db string, query map[string]interface{}) {
+	defer qu.Catch()
+	sess := MongoFrom.GetMgoConn()
+	defer MongoFrom.Close()
+	it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1,"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
+		"agencyaddr": 1}).Sort("_id").Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if qu.IntAll(tmp["repeat"]) > 0  { //重复数据跳过
+			continue
+		}
+		agency := qu.ObjToString(tmp["agency"])
+		if utf8.RuneCountInString(agency) < 5 {
+			continue
+		}
+		infoid := mongo.BsonTOStringId(tmp["_id"])
+		topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
+		entid, _ := redis.GetRedisStr("agency", agencybd, agency)
+		ps := []map[string]interface{}{}
+		agencyperson := qu.ObjToString(tmp["agencyperson"])
+		agencytel := qu.ObjToString(tmp["agencytel"])
+		if entid == "" {//redis 未存
+			savetoerr := true
+			if agencyperson != "" || agencytel != "" {
+				v := map[string]interface{}{
+					"contact_person": agencyperson,
+					"phone":          agencytel,
+					"topscopeclass":  comRepTopscopeclass(topscopeclass),
+					"infoid":         infoid,
+				}
+				ps = append(ps, v)
+				data := comHisMegerNewData(agency, "agency", ps)
+				if data != nil {
+					_id := MongoTo.Save(agencyent, data)
+					redis.PutRedis("agency", agencybd, agency, _id.(primitive.ObjectID).Hex(), -1)
+					savetoerr = false
+				}
+			}
+			if savetoerr {
+				t := MongoTo.FindOne(agencyerr, map[string]interface{}{"name": agency})
+				if len(t) < 1 {
+					MongoTo.Save(agencyerr, map[string]interface{}{
+						"name":       agency,
+						"check":      comMarkdata(agency, "agency"),
+						"updatetime": time.Now().Unix(),
+					})
+				}
+			}
+		} else {
+			if agencyperson != "" || agencytel != "" {
+				v := map[string]interface{}{
+					"contact_person": agencyperson,
+					"phone":          agencytel,
+					"topscopeclass":  comRepTopscopeclass(topscopeclass),
+					"infoid":         infoid,
+				}
+				MongoTo.UpdateById(agencyent, entid,
+					map[string]interface{}{
+						"$set":  v,
+						"$push": map[string]interface{}{"contact": v},
+					},
+				)
+			}
+		}
+		tmp = map[string]interface{}{}
+		if index%100 == 0 {
+			log.Println("agency index", index)
+		}
+	}
+	log.Println("agency ok index", index)
+}
+
+//历史数据处理
+func historyagency(db, fromcoll string) {
+	defer qu.Catch()
+	log.Println("history  start")
+	sess := MongoFrom.GetMgoConn()
+	defer MongoFrom.Close()
+	it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1,"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
+		"agencyaddr": 1}).Sort("_id").Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
+			continue
+		}
+		_id := mongo.BsonTOStringId(tmp["_id"])
+		agencychanbool <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-agencychanbool
+			}()
+			agency := qu.ObjToString(tmp["agency"])
+			topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
+			if agency != "" && utf8.RuneCountInString(agency) > 4 {
+				agencyperson := qu.ObjToString(tmp["agencyperson"])
+				agencytel := qu.ObjToString(tmp["agencytel"])
+				b, _ := redis.ExistRedis("agency", agencybd, agency)
+				if b {//redis 存在
+					if agencyperson != "" || agencytel != "" {
+						strs, _ := redis.GetRedisStr("agency", agencybd, agency)
+						ps := []interface{}{}
+						err := json.Unmarshal([]byte(strs), &ps)
+						if err == nil {
+							v := map[string]interface{}{
+								"contact_person": agencyperson,
+								"phone":          agencytel,
+								"topscopeclass":  comRepTopscopeclass(topscopeclass),
+								"infoid":         _id,
+							}
+							ps = append(ps, v)
+							bs, _ := json.Marshal(ps)
+							redis.PutRedis("agency", agencybd, agency, bs, -1)
+						} else {
+							log.Println("jsonErr", err)
+						}
+					}
+				} else {
+					val := []map[string]interface{}{}
+					if agencyperson != "" || agencytel != "" {
+						tmp := map[string]interface{}{
+							"contact_person": agencyperson,
+							"phone":          agencytel,
+							"topscopeclass":  comRepTopscopeclass(topscopeclass),
+							"infoid":         _id,
+						}
+						val = append(val, tmp)
+					}
+					bs, _ := json.Marshal(val)
+					redis.PutRedis("agency", agencybd, agency, bs, -1)
+					MongoTo.Save(agencyerr, map[string]interface{}{
+						"name":       agency,
+						"updatetime": time.Now().Unix(),
+					})
+				}
+			}
+		}(tmp)
+		tmp = map[string]interface{}{}
+		if index%10000 == 0 {
+			log.Println("index", index, _id)
+		}
+	}
+	log.Println("history  ok  index", index)
+	agencyStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
+}
+
+//查询agencyerr标准化历史数据
+func agencyStandarHistory(db string) {
+	defer qu.Catch()
+	log.Println("开始标准化数据--agency", db)
+	sessto := MongoTo.GetMgoConn()
+	defer MongoTo.Close()
+	it := sessto.DB(db).C(agencyerr).Find(map[string]interface{}{}).Iter()
+	index := 0
+	entnum := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		err_id := mongo.BsonTOStringId(tmp["_id"])
+		name := qu.ObjToString(tmp["name"])
+		agencychanbool <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-agencychanbool
+			}()
+			strs, err := redis.GetRedisStr("agency", agencybd, name)
+			if err != nil {
+				return
+			}
+			ps := []map[string]interface{}{}
+			err = json.Unmarshal([]byte(strs), &ps)
+			if err == nil {
+				data := comHisMegerNewData(name, "agency", ps)
+				if data != nil {
+					MongoTo.Save(agencyent, data)
+					MongoTo.DeleteById(agencyerr, err_id)
+					entnum++
+				} else { //未查询到企业,打标记并存表
+					num := comMarkdata(name, "agency")
+					tmp["check"] = num
+					MongoTo.UpdateById(agencyerr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}})
+				}
+			} else {
+				log.Println("jsonErr", name, err)
+			}
+		}(tmp)
+		if index%1000 == 0 {
+			log.Println("标准化历史数据--agency", index, err_id, entnum)
+		}
+		tmp = map[string]interface{}{}
+	}
+	log.Println("标准化数据完成--agency", index, entnum)
+}

+ 26 - 21
standardata/src/standarwinner.go

@@ -305,6 +305,7 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 		"comeintime":      time.Now().Unix(),
 		"updatetime":      time.Now().Unix(),
 	}
+	//统一信用代码
 	credit_no := strings.TrimSpace(qu.ObjToString(tmp["credit_no"]))
 	if credit_no != "" {
 		data["credit_no"] = credit_no
@@ -321,6 +322,30 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 			}
 		}
 	}
+
+	//网址
+	annual_reports := tmp["annual_reports"]
+	if annual_reports != nil {
+		report_websitesArr := []string{}
+		if anreports, ok := annual_reports.([]interface{}); ok {
+			for _, report_websites := range anreports {
+				if websites, ok := report_websites.([]interface{}); ok {
+					for _, website := range websites {
+						if rv, ok := website.(map[string]interface{}); ok {
+							web := qu.ObjToString(rv["website_url"])
+							if web != "" {
+								report_websitesArr = append(report_websitesArr, web)
+							}
+						}
+					}
+				}
+			}
+		}
+		if len(report_websitesArr) > 0 {
+			data["website"] = strings.Join(report_websitesArr, ";")
+		}
+	}
+
 	if datatype == "winner" {
 		data["company_name"] = name
 		data["partners"] = tmp["partners"]
@@ -332,27 +357,7 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 		if capital != nil {
 			data["capital"] = ObjToMoney([]interface{}{capital, ""})[0]
 		}
-		annual_reports := tmp["annual_reports"]
-		if annual_reports != nil {
-			report_websitesArr := []string{}
-			if anreports, ok := annual_reports.([]interface{}); ok {
-				for _, report_websites := range anreports {
-					if websites, ok := report_websites.([]interface{}); ok {
-						for _, website := range websites {
-							if rv, ok := website.(map[string]interface{}); ok {
-								web := qu.ObjToString(rv["website_url"])
-								if web != "" {
-									report_websitesArr = append(report_websitesArr, web)
-								}
-							}
-						}
-					}
-				}
-			}
-			if len(report_websitesArr) > 0 {
-				data["website"] = strings.Join(report_websitesArr, ";")
-			}
-		}
+
 		industry := make([]string, 0)
 		tmpindustry := map[string]bool{}
 		for _, p := range ps {

+ 1 - 1
util/src/dbutil/mongo/mgo.go

@@ -146,7 +146,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint64(m.Size))
+	opts.SetMaxPoolSize(uint16(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)