|
@@ -1,219 +0,0 @@
|
|
|
-package main
|
|
|
-
|
|
|
-import (
|
|
|
- "encoding/json"
|
|
|
- "fmt"
|
|
|
- "gopkg.in/mgo.v2/bson"
|
|
|
- "log"
|
|
|
- mu "mfw/util"
|
|
|
- mgo "mongodbutil"
|
|
|
- "net"
|
|
|
- qu "qfw/util"
|
|
|
- "strings"
|
|
|
-)
|
|
|
-
|
|
|
-var Udpclient mu.UdpClient //udp对象
|
|
|
-var nextNodes []map[string]interface{}
|
|
|
-var Config map[string]interface{}
|
|
|
-var PageSize = 5000 //查询分页
|
|
|
-var biddingFields = `{"buyer":1,"modifyinfo":1,"area":1,"province":1,"city":1,"district":1}`
|
|
|
-var qyxyFields = `{"company_code":1,"province":1,"city":1,"district":1}`
|
|
|
-var findDb string
|
|
|
-var cc chan bool = make(chan bool, 5)
|
|
|
-
|
|
|
-func init() {
|
|
|
- qu.ReadConfig(&Config)
|
|
|
- if len(Config) == 0 {
|
|
|
- log.Fatal("读取配置文件失败", Config)
|
|
|
- }
|
|
|
- findDb = qu.ObjToString(Config["findDb"])
|
|
|
- initCap := qu.IntAll(Config["dbsize"])
|
|
|
- addr := qu.ObjToString(Config["mgodb"])
|
|
|
- dbname := qu.ObjToString(Config["dbname"])
|
|
|
- cc = make(chan bool, 3)
|
|
|
- mgo.Mgo = mgo.MgoFactory(initCap, initCap*3, 120, addr, dbname)
|
|
|
- mgo.Mgo_Bidding = mgo.MgoFactory(initCap, initCap*3, 120, qu.ObjToString(Config["mgodb_bidding"]), qu.ObjToString(Config["dbname_bidding"]))
|
|
|
- nextNodes = qu.ObjArrToMapArr(Config["nextNode"].([]interface{}))
|
|
|
- Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(Config["udpport"]), BufSize: 1024}
|
|
|
- log.Println("udp run ", Config["udpport"])
|
|
|
- Udpclient.Listen(processUdpMsg)
|
|
|
-}
|
|
|
-
|
|
|
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
- switch act {
|
|
|
- case mu.OP_TYPE_DATA:
|
|
|
- var rep map[string]interface{}
|
|
|
- err := json.Unmarshal(data, &rep)
|
|
|
- if err != nil {
|
|
|
- log.Println(err)
|
|
|
- } else {
|
|
|
- sid, _ := rep["gtid"].(string)
|
|
|
- eid, _ := rep["lteid"].(string)
|
|
|
- if sid == "" || eid == "" {
|
|
|
- log.Println("err", "sid=", sid, ",eid=", eid)
|
|
|
- return
|
|
|
- } else {
|
|
|
- go Udpclient.WriteUdp(data, mu.OP_NOOP, ra)
|
|
|
- log.Println("udp通知抽取id段", sid, " ", eid)
|
|
|
-
|
|
|
- getCity(sid, eid, qu.ObjToString(rep["stype"]))
|
|
|
- log.Println("udp通知抽取完成,eid", eid)
|
|
|
- for _, m := range nextNodes {
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
- "stype": qu.ObjToString(m["stype"]),
|
|
|
- })
|
|
|
- err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(m["addr"].(string)),
|
|
|
- Port: qu.IntAll(m["port"]),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- log.Println(err)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- case mu.OP_NOOP: //下个节点回应
|
|
|
- log.Println(string(data))
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func getCity(sid, eid, rep string) {
|
|
|
- index := 0
|
|
|
- var unum int64
|
|
|
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
- count := mgo.Mgo_Bidding.Count(findDb, query)
|
|
|
- log.Println("查询条件为:", query, "查询条数:", count)
|
|
|
- pageNum := (count + PageSize - 1) / PageSize
|
|
|
- limit := PageSize
|
|
|
- if count < PageSize {
|
|
|
- limit = count
|
|
|
- }
|
|
|
- table := findDb
|
|
|
- for i := 0; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
- log.Printf("page=%d,query=%v,db=%v\n", i+1, query, table)
|
|
|
- list, _ := mgo.Mgo_Bidding.Find(table, query, map[string]interface{}{
|
|
|
- "_id": 1,
|
|
|
- }, biddingFields, false, 0, limit)
|
|
|
- for _, v := range *list {
|
|
|
- if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" && qu.ObjToString(v["area"]) != "全国" {
|
|
|
- index++
|
|
|
- continue
|
|
|
- }
|
|
|
- if qu.ObjToString(v["buyer"]) == "" {
|
|
|
- index++
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- _id := qu.BsonIdToSId(v["_id"])
|
|
|
- cc <- true
|
|
|
- go func(v map[string]interface{}) {
|
|
|
- rdata := cityMarshal(v)
|
|
|
- if len(rdata) > 0 {
|
|
|
- umap := make(map[string]interface{})
|
|
|
- if v["modifyinfo"] == nil {
|
|
|
- umap["modifyinfo"] = make(map[string]interface{})
|
|
|
- } else {
|
|
|
- umap["modifyinfo"] = v["modifyinfo"]
|
|
|
- }
|
|
|
- for rk, rv := range rdata {
|
|
|
- umap[rk] = rv
|
|
|
- umap["modifyinfo"].(map[string]interface{})[rk] = "企业信息"
|
|
|
- }
|
|
|
- unum++
|
|
|
- log.Println(unum, ",id:", _id, umap)
|
|
|
- mgo.Mgo_Bidding.UpdateById(table, v["_id"], map[string]interface{}{
|
|
|
- "$set": umap,
|
|
|
- })
|
|
|
- }
|
|
|
- <-cc
|
|
|
- }(v)
|
|
|
- index++
|
|
|
- if index%1000 == 0 {
|
|
|
- log.Println("index:", index, ",页码:", i+1, ",_id:", _id)
|
|
|
- }
|
|
|
- sid = _id
|
|
|
- if sid >= eid {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func cityMarshal(data map[string]interface{}) map[string]string {
|
|
|
- buyer := qu.ObjToString(data["buyer"])
|
|
|
- bidarea := qu.ObjToString(data["area"])
|
|
|
- bidcity := qu.ObjToString(data["city"])
|
|
|
- biddistrict := qu.ObjToString(data["district"])
|
|
|
- rdata := make(map[string]string)
|
|
|
- tmp, _ := mgo.Mgo.FindOneByField("qyxy", `{"company_name":"`+buyer+`"}`, qyxyFields)
|
|
|
- if tmp == nil || (*tmp) == nil {
|
|
|
- return rdata
|
|
|
- }
|
|
|
- company_code := fmt.Sprint((*tmp)["company_code"])
|
|
|
- if len(company_code) > 5 {
|
|
|
- province_city_district, _ := mgo.Mgo.FindOne("address", `{"code":"`+company_code[:6]+`"}`)
|
|
|
- remarks := fmt.Sprint((*province_city_district)["Remarks"])
|
|
|
- if remarks == "" || remarks == "废除" || remarks == "已作废" {
|
|
|
- } else if province_city_district != nil && (*province_city_district) != nil {
|
|
|
- codeprovince := qu.ObjToString((*province_city_district)["province"])
|
|
|
- codecity := qu.ObjToString((*province_city_district)["city"])
|
|
|
- codedistrict := qu.ObjToString((*province_city_district)["district"])
|
|
|
- if bidarea == "" || bidarea == "全国" {
|
|
|
- if codeprovince != "" {
|
|
|
- rdata["area"] = codeprovince
|
|
|
- if codecity != "" && codecity != codeprovince {
|
|
|
- rdata["city"] = codecity
|
|
|
- if codedistrict != "" && codedistrict != codecity {
|
|
|
- rdata["district"] = codedistrict
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else if bidcity == "" && codecity != "" && bidarea == codeprovince {
|
|
|
- if codecity != bidarea {
|
|
|
- rdata["city"] = codecity
|
|
|
- if codedistrict != "" && codecity != codedistrict {
|
|
|
- rdata["district"] = codedistrict
|
|
|
- }
|
|
|
- }
|
|
|
- } else if biddistrict == "" && codedistrict != "" && bidarea == codeprovince && codecity == bidcity {
|
|
|
- rdata["district"] = codedistrict
|
|
|
- }
|
|
|
- return rdata
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- entprovince := qu.ObjToString((*tmp)["province"])
|
|
|
- entprovince = strings.TrimRight(entprovince, "省")
|
|
|
- entprovince = strings.TrimRight(entprovince, "市")
|
|
|
- entcity := qu.ObjToString((*tmp)["city"])
|
|
|
- entdistrict := qu.ObjToString((*tmp)["district"])
|
|
|
-
|
|
|
- //新增特殊处理-港澳台数据
|
|
|
- if bidarea == "" || bidarea=="香港" || bidarea=="澳门" || bidarea=="台湾" || bidarea == "全国" {
|
|
|
- if entprovince != "" {
|
|
|
- rdata["area"] = entprovince
|
|
|
- if entcity != "" && entcity != entprovince {
|
|
|
- rdata["city"] = entcity
|
|
|
- if entdistrict != "" && entdistrict != entcity {
|
|
|
- rdata["district"] = entdistrict
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else if bidcity == "" && entcity != "" && entprovince == bidarea {
|
|
|
- rdata["city"] = entcity
|
|
|
- if entdistrict != "" && entcity != entdistrict {
|
|
|
- rdata["district"] = entdistrict
|
|
|
- }
|
|
|
- } else if biddistrict == "" && entdistrict != "" && entprovince == bidarea && bidcity == entcity {
|
|
|
- rdata["district"] = entdistrict
|
|
|
- }
|
|
|
-
|
|
|
- return rdata
|
|
|
-}
|
|
|
-func main() {
|
|
|
- c := make(chan bool)
|
|
|
- <-c
|
|
|
-}
|