main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. hisRedis "github.com/go-redis/redis"
  7. "go.mongodb.org/mongo-driver/bson"
  8. es "gopkg.in/olivere/elastic.v1"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/common/src/qfw/util/elastic"
  13. "qfw/util"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. var (
  20. Config = make(map[string]string)
  21. Fields,BuyerFields,AgencyFields []string
  22. SourceClient, FClient *MongodbSim
  23. RedisPool redis.Pool
  24. HisRedisPool *hisRedis.Client
  25. Addrs = make(map[string]interface{}, 0) //省市县
  26. udpclient mu.UdpClient //udp对象
  27. ElasticClientIndex, ElasticClientType string
  28. Reg_xing = regexp.MustCompile(`\*{1,}`)
  29. Reg_person = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
  30. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  31. EsConn *es.Client
  32. Updport int
  33. )
  34. /**
  35. 新增
  36. 初始化
  37. */
  38. func init() {
  39. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  40. util.ReadConfig(&Config)
  41. log.Println(Config)
  42. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  43. "capital", "establish_date", "legal_person", "company_type",
  44. "district", "city", "province", "area_code", "credit_no",
  45. "company_name", "history_name", "topscopeclass", "wechat_accounts",
  46. "alias", "website", "report_websites"}
  47. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  48. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  49. "history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
  50. AgencyFields = []string{"_id", "contact", "type", "ranks",
  51. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  52. "history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
  53. var err error
  54. pool_size, _ := strconv.Atoi(Config["pool_size"])
  55. //mongo init
  56. SourceClient = new(MongodbSim)
  57. SourceClient.MongodbAddr = Config["mgoinit"]
  58. SourceClient.Size = pool_size
  59. //mongodbSim.DbName = "qfw"
  60. SourceClient.InitPool()
  61. SourceClientmgoConn := SourceClient.GetMgoConn()
  62. defer SourceClient.DestoryMongoConn(SourceClientmgoConn)
  63. FClient = new(MongodbSim)
  64. FClient.MongodbAddr = Config["mgourl"]
  65. FClient.Size = pool_size
  66. FClient.DbName = Config["mgodb_extract_kf"]
  67. //mongodbSim.DbName = "qfw"
  68. FClient.InitPool()
  69. FClientmgoConn := FClient.GetMgoConn()
  70. defer FClient.DestoryMongoConn(FClientmgoConn)
  71. //加载省市县代码
  72. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  73. //defer FClient.Connect(cc)
  74. if cursor2 == nil {
  75. log.Fatalln(cursor2)
  76. }
  77. tmp := make(map[string]interface{})
  78. for cursor2.Next(&tmp) {
  79. code := tmp["code"]
  80. if code != nil && strings.TrimSpace(code.(string)) != "" {
  81. Addrs[fmt.Sprint(code)] = tmp
  82. }
  83. }
  84. log.Println(len(Addrs))
  85. //es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  86. //es init
  87. elastic.InitElasticSize(Config["elasticsearch"], 50)
  88. EsConn = elastic.GetEsConn()
  89. defer elastic.DestoryEsConn(EsConn)
  90. //redis
  91. RedisPool = redis.Pool{
  92. MaxIdle: 50,
  93. IdleTimeout: 10 * time.Second,
  94. Dial: func() (redis.Conn, error) {
  95. conn, e := redis.Dial("tcp", Config["redis"])
  96. if e != nil {
  97. return conn, e
  98. }
  99. _, err = conn.Do("SELECT", "1")
  100. if err != nil {
  101. return nil, err
  102. }
  103. return conn, nil
  104. }}
  105. c := RedisPool.Get()
  106. if _, err := c.Do("PING"); err != nil {
  107. log.Fatalln("redis err:", err)
  108. }
  109. c.Close()
  110. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  111. Addr: "127.0.0.1:6380",
  112. DB: 1,
  113. DialTimeout: 10 * time.Second,
  114. ReadTimeout: 30 * time.Second,
  115. WriteTimeout: 30 * time.Second,
  116. PoolSize: 30,
  117. MinIdleConns: 20,
  118. PoolTimeout: 30 * time.Second,
  119. })
  120. }
  121. func main() {
  122. //udp
  123. updport := Config["udpport"]
  124. Updport, _ = strconv.Atoi(Config["port"])
  125. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  126. udpclient.Listen(processUdpMsg)
  127. log.Println("Udp服务监听", updport)
  128. log.Println("发送端口port:", Updport)
  129. go TimedTaskWinner() //定时任务
  130. c := make(chan int, 1)
  131. <-c
  132. }
  133. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  134. log.Println(act, string(data), ra)
  135. switch act {
  136. case mu.OP_TYPE_DATA: //上个节点的数据
  137. //从表中开始处理生成企业数据
  138. tmp := new(map[string]interface{})
  139. err := json.Unmarshal(data, &tmp)
  140. if err != nil {
  141. log.Println("err:", err)
  142. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  143. return
  144. } else if tmp != nil {
  145. if key, ok := (*tmp)["key"].(string); ok {
  146. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  147. } else {
  148. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  149. }
  150. //data_type:winner data_type:buyer data_type:agency
  151. //data_info:save//存量 data_info:add //增量
  152. if key, ok := (*tmp)["data_type"].(string); ok {
  153. if key == "winner" {
  154. go TaskWinner(tmp)
  155. } else if key == "buyer" {
  156. } else if key == "agency" {
  157. }
  158. }
  159. }
  160. case mu.OP_NOOP: //下个节点回应
  161. log.Println("发送成功", string(data))
  162. }
  163. }