main.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. "go.mongodb.org/mongo-driver/bson"
  7. es "gopkg.in/olivere/elastic.v1"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "qfw/common/src/qfw/util/elastic"
  12. "qfw/util"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. var (
  19. Config = make(map[string]string)
  20. Fields,BuyerFields,AgencyFields []string
  21. SourceClient, FClient *MongodbSim
  22. RedisPool redis.Pool
  23. Addrs = make(map[string]interface{}, 0) //省市县
  24. udpclient mu.UdpClient //udp对象
  25. ElasticClientIndex, ElasticClientType string
  26. Reg_xing = regexp.MustCompile(`\*{1,}`)
  27. Reg_person = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
  28. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  29. EsConn *es.Client
  30. Updport int
  31. )
  32. /**
  33. 新增
  34. 初始化
  35. */
  36. func init() {
  37. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  38. util.ReadConfig(&Config)
  39. log.Println(Config)
  40. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  41. "capital", "establish_date", "legal_person", "company_type",
  42. "district", "city", "province", "area_code", "credit_no",
  43. "company_name", "history_name", "topscopeclass", "wechat_accounts",
  44. "alias", "website", "report_websites"}
  45. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  46. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  47. "history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
  48. AgencyFields = []string{"_id", "contact", "type", "ranks",
  49. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  50. "history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
  51. var err error
  52. pool_size, _ := strconv.Atoi(Config["pool_size"])
  53. //mongo init
  54. SourceClient = new(MongodbSim)
  55. SourceClient.MongodbAddr = Config["mgoinit"]
  56. SourceClient.Size = pool_size
  57. //mongodbSim.DbName = "qfw"
  58. SourceClient.InitPool()
  59. SourceClientmgoConn := SourceClient.GetMgoConn()
  60. defer SourceClient.DestoryMongoConn(SourceClientmgoConn)
  61. FClient = new(MongodbSim)
  62. FClient.MongodbAddr = Config["mgourl"]
  63. FClient.Size = pool_size
  64. FClient.DbName = Config["mgodb_extract_kf"]
  65. //mongodbSim.DbName = "qfw"
  66. FClient.InitPool()
  67. FClientmgoConn := FClient.GetMgoConn()
  68. defer FClient.DestoryMongoConn(FClientmgoConn)
  69. //加载省市县代码
  70. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  71. //defer FClient.Connect(cc)
  72. if cursor2 == nil {
  73. log.Fatalln(cursor2)
  74. }
  75. tmp := make(map[string]interface{})
  76. for cursor2.Next(&tmp) {
  77. code := tmp["code"]
  78. if code != nil && strings.TrimSpace(code.(string)) != "" {
  79. Addrs[fmt.Sprint(code)] = tmp
  80. }
  81. }
  82. log.Println(len(Addrs))
  83. //es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  84. //es init
  85. elastic.InitElasticSize(Config["elasticsearch"], 50)
  86. EsConn = elastic.GetEsConn()
  87. defer elastic.DestoryEsConn(EsConn)
  88. //redis
  89. RedisPool = redis.Pool{
  90. MaxIdle: 50,
  91. IdleTimeout: 10 * time.Second,
  92. Dial: func() (redis.Conn, error) {
  93. conn, e := redis.Dial("tcp", Config["redis"])
  94. if e != nil {
  95. return conn, e
  96. }
  97. _, err = conn.Do("SELECT", "1")
  98. if err != nil {
  99. return nil, err
  100. }
  101. return conn, nil
  102. }}
  103. c := RedisPool.Get()
  104. if _, err := c.Do("PING"); err != nil {
  105. log.Fatalln("redis err:", err)
  106. }
  107. c.Close()
  108. }
  109. func main() {
  110. //udp
  111. updport := Config["udpport"]
  112. Updport, _ = strconv.Atoi(Config["port"])
  113. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  114. udpclient.Listen(processUdpMsg)
  115. log.Println("Udp服务监听", updport)
  116. log.Println("发送端口port:", Updport)
  117. go TimedTaskWinner() //定时任务
  118. c := make(chan int, 1)
  119. <-c
  120. }
  121. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  122. log.Println(act, string(data), ra)
  123. switch act {
  124. case mu.OP_TYPE_DATA: //上个节点的数据
  125. //从表中开始处理生成企业数据
  126. tmp := new(map[string]interface{})
  127. err := json.Unmarshal(data, &tmp)
  128. if err != nil {
  129. log.Println("err:", err)
  130. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  131. return
  132. } else if tmp != nil {
  133. if key, ok := (*tmp)["key"].(string); ok {
  134. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  135. } else {
  136. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  137. }
  138. //data_type:winner data_type:buyer data_type:agency
  139. //data_info:save//存量 data_info:add //增量
  140. if key, ok := (*tmp)["data_type"].(string); ok {
  141. if key == "winner" {
  142. go TaskWinner(tmp)
  143. } else if key == "buyer" {
  144. } else if key == "agency" {
  145. }
  146. }
  147. }
  148. case mu.OP_NOOP: //下个节点回应
  149. log.Println("发送成功", string(data))
  150. }
  151. }