main.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. hisRedis "github.com/go-redis/redis"
  7. "go.mongodb.org/mongo-driver/bson"
  8. es "gopkg.in/olivere/elastic.v1"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/common/src/qfw/util/elastic"
  13. "qfw/util"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. var (
  20. Config = make(map[string]string)
  21. Fields, BuyerFields, AgencyFields []string
  22. SourceClient, FClient *MongodbSim
  23. RedisPool redis.Pool
  24. HisRedisPool *hisRedis.Client
  25. Addrs = make(map[string]interface{}, 0) //省市县
  26. udpclient mu.UdpClient //udp对象
  27. ElasticClientIndex, ElasticClientType string
  28. Reg_xing = regexp.MustCompile(`\*{1,}`)
  29. Reg_person = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
  30. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  31. EsConn *es.Client
  32. Updport int
  33. CPool chan bool
  34. )
  35. /**
  36. 新增
  37. 初始化
  38. */
  39. func init() {
  40. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  41. util.ReadConfig(&Config)
  42. log.Println(Config)
  43. var err error
  44. cpnum, err := strconv.Atoi(Config["chan_pool_num"])
  45. if err != nil{
  46. log.Fatalln(err)
  47. }
  48. CPool = make(chan bool,cpnum)
  49. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  50. "capital", "establish_date", "legal_person", "company_type",
  51. "district", "city", "province", "area_code", "credit_no",
  52. "company_name", "history_name", "topscopeclass", "wechat_accounts",
  53. "alias", "website", "report_websites"}
  54. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  55. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  56. "history_name", "wechat_accounts", "website", "report_websites"}
  57. AgencyFields = []string{"_id", "contact", "type", "ranks",
  58. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  59. "history_name", "wechat_accounts", "website", "report_websites"}
  60. pool_size, _ := strconv.Atoi(Config["pool_size"])
  61. //mongo init
  62. SourceClient = new(MongodbSim)
  63. SourceClient.MongodbAddr = Config["mgoinit"]
  64. SourceClient.Size = pool_size
  65. //mongodbSim.DbName = "qfw"
  66. SourceClient.InitPool()
  67. SourceClientmgoConn := SourceClient.GetMgoConn()
  68. defer SourceClient.DestoryMongoConn(SourceClientmgoConn)
  69. FClient = new(MongodbSim)
  70. FClient.MongodbAddr = Config["mgourl"]
  71. FClient.Size = pool_size
  72. FClient.DbName = Config["mgodb_extract_kf"]
  73. //mongodbSim.DbName = "qfw"
  74. FClient.InitPool()
  75. FClientmgoConn := FClient.GetMgoConn()
  76. defer FClient.DestoryMongoConn(FClientmgoConn)
  77. //加载省市县代码
  78. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  79. //defer FClient.Connect(cc)
  80. if cursor2 == nil {
  81. log.Fatalln(cursor2)
  82. }
  83. tmp := make(map[string]interface{})
  84. for cursor2.Next(&tmp) {
  85. code := tmp["code"]
  86. if code != nil && strings.TrimSpace(code.(string)) != "" {
  87. Addrs[fmt.Sprint(code)] = tmp
  88. }
  89. }
  90. log.Println(len(Addrs))
  91. //es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  92. //es init
  93. elastic.InitElasticSize(Config["elasticsearch"], 50)
  94. EsConn = elastic.GetEsConn()
  95. defer elastic.DestoryEsConn(EsConn)
  96. //redis
  97. RedisPool = redis.Pool{
  98. MaxIdle: 50,
  99. IdleTimeout: 10 * time.Second,
  100. Dial: func() (redis.Conn, error) {
  101. conn, e := redis.Dial("tcp", Config["redis"])
  102. if e != nil {
  103. return conn, e
  104. }
  105. _, err = conn.Do("SELECT", "1")
  106. if err != nil {
  107. return nil, err
  108. }
  109. return conn, nil
  110. }}
  111. c := RedisPool.Get()
  112. if _, err := c.Do("PING"); err != nil {
  113. log.Fatalln("redis err:", err)
  114. }
  115. c.Close()
  116. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  117. Addr: "127.0.0.1:6380",
  118. DB: 1,
  119. DialTimeout: 10 * time.Second,
  120. ReadTimeout: 30 * time.Second,
  121. WriteTimeout: 30 * time.Second,
  122. PoolSize: 30,
  123. MinIdleConns: 20,
  124. PoolTimeout: 30 * time.Second,
  125. })
  126. }
  127. func main() {
  128. //udp
  129. updport := Config["udpport"]
  130. Updport, _ = strconv.Atoi(Config["port"])
  131. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  132. udpclient.Listen(processUdpMsg)
  133. log.Println("Udp服务监听", updport)
  134. log.Println("发送端口port:", Updport)
  135. go TimedTaskWinner() //定时任务
  136. go TimedTaskBuyer() //定时任务
  137. go TimedTaskAgency() //定时任务
  138. c := make(chan int, 1)
  139. <-c
  140. }
  141. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  142. log.Println(act, string(data), ra)
  143. switch act {
  144. case mu.OP_TYPE_DATA: //上个节点的数据
  145. //从表中开始处理生成企业数据
  146. tmp := new(map[string]interface{})
  147. err := json.Unmarshal(data, &tmp)
  148. if err != nil {
  149. log.Println("err:", err)
  150. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  151. return
  152. } else if tmp != nil {
  153. if key, ok := (*tmp)["key"].(string); ok {
  154. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  155. } else {
  156. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  157. }
  158. //data_type:winner data_type:buyer data_type:agency
  159. //data_info:save//存量 data_info:add //增量
  160. if key, ok := (*tmp)["data_type"].(string); ok {
  161. //阻塞
  162. CPool <- true
  163. if key == "winner" {
  164. go TaskWinner(tmp)
  165. } else if key == "buyer" {
  166. go TaskBuyer(tmp)
  167. } else if key == "agency" {
  168. go TaskAgency(tmp)
  169. }
  170. }
  171. }
  172. case mu.OP_NOOP: //下个节点回应
  173. log.Println("发送成功", string(data))
  174. }
  175. }