main.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. hisRedis "github.com/go-redis/redis"
  7. "gopkg.in/mgo.v2/bson"
  8. es "gopkg.in/olivere/elastic.v1"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/common/src/qfw/util/elastic"
  13. "qfw/util"
  14. "qfw/util/mongodb"
  15. "regexp"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. var (
  21. Config = make(map[string]string)
  22. Fields, BuyerFields, AgencyFields []string
  23. SourceClient, FClient *mongodb.MongodbSim
  24. RedisPool redis.Pool
  25. HisRedisPool *hisRedis.Client
  26. Addrs = make(map[string]interface{}, 0) //省市县
  27. udpclient mu.UdpClient //udp对象
  28. ElasticClientIndex, ElasticClientType string
  29. Reg_xing = regexp.MustCompile(`\*{1,}`)
  30. Reg_person = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
  31. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  32. EsConn *es.Client
  33. Updport int
  34. CPool chan bool
  35. )
  36. /**
  37. 新增
  38. 初始化
  39. */
  40. func init() {
  41. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  42. util.ReadConfig(&Config)
  43. log.Println(Config)
  44. var err error
  45. cpnum, err := strconv.Atoi(Config["chan_pool_num"])
  46. if err != nil {
  47. log.Fatalln(err)
  48. }
  49. CPool = make(chan bool, cpnum)
  50. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  51. "capital", "establish_date", "legal_person", "company_type",
  52. "district", "city", "province", "area_code", "credit_no",
  53. "company_name", "history_name", "wechat_accounts",
  54. "alias", "website", "report_websites","industry"}
  55. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  56. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  57. "history_name", "wechat_accounts", "website", "report_websites"}
  58. AgencyFields = []string{"_id", "contact", "type", "ranks",
  59. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  60. "history_name", "wechat_accounts", "website", "report_websites"}
  61. pool_size, _ := strconv.Atoi(Config["pool_size"])
  62. //mongo init
  63. SourceClient = new(mongodb.MongodbSim)
  64. SourceClient.MongodbAddr = Config["mgoinit"]
  65. SourceClient.Size = pool_size
  66. SourceClient.DbName = Config["mgodb_bidding"]
  67. //mongodbSim.DbName = "qfw"
  68. SourceClient.InitPool()
  69. FClient = new(mongodb.MongodbSim)
  70. FClient.MongodbAddr = Config["mgourl"]
  71. FClient.Size = pool_size
  72. FClient.DbName = Config["mgodb_extract_kf"]
  73. //mongodbSim.DbName = "qfw"
  74. FClient.InitPool()
  75. FClientmgoConn := FClient.GetMgoConn(86400)
  76. defer FClient.DestoryMongoConn(FClientmgoConn)
  77. //加载省市县代码
  78. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  79. //defer FClient.Connect(cc)
  80. if cursor2 == nil {
  81. log.Fatalln(cursor2)
  82. }
  83. tmp := make(map[string]interface{})
  84. for cursor2.Next(&tmp) {
  85. code := tmp["code"]
  86. if code != nil && strings.TrimSpace(code.(string)) != "" {
  87. Addrs[fmt.Sprint(code)] = tmp
  88. }
  89. }
  90. log.Println(len(Addrs))
  91. //es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  92. //es init
  93. elastic.InitElasticSize(Config["elasticsearch"], 50)
  94. EsConn = elastic.GetEsConn()
  95. defer elastic.DestoryEsConn(EsConn)
  96. //redis
  97. RedisPool = redis.Pool{
  98. MaxIdle: 50,
  99. IdleTimeout: 10 * time.Second,
  100. Dial: func() (redis.Conn, error) {
  101. conn, e := redis.Dial("tcp", Config["redis"])
  102. if e != nil {
  103. return conn, e
  104. }
  105. _, err = conn.Do("SELECT", "1")
  106. if err != nil {
  107. return nil, err
  108. }
  109. return conn, nil
  110. }}
  111. c := RedisPool.Get()
  112. if _, err := c.Do("PING"); err != nil {
  113. log.Fatalln("redis err:", err)
  114. }
  115. c.Close()
  116. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  117. Addr: "127.0.0.1:6380",
  118. DB: 1,
  119. DialTimeout: 10 * time.Second,
  120. ReadTimeout: 30 * time.Second,
  121. WriteTimeout: 30 * time.Second,
  122. PoolSize: 30,
  123. MinIdleConns: 20,
  124. PoolTimeout: 30 * time.Second,
  125. })
  126. }
  127. func main() {
  128. //udp
  129. updport := Config["udpport"]
  130. Updport, _ = strconv.Atoi(Config["port"])
  131. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  132. udpclient.Listen(processUdpMsg)
  133. log.Println("Udp服务监听", updport)
  134. log.Println("发送端口port:", Updport)
  135. go TimedTaskWinner() //定时任务
  136. //go TimedTaskBuyer() //定时任务
  137. //go TimedTaskAgency() //定时任务
  138. c := make(chan int, 1)
  139. <-c
  140. }
  141. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  142. log.Println(act, string(data), ra)
  143. switch act {
  144. case mu.OP_TYPE_DATA: //上个节点的数据
  145. //从表中开始处理生成企业数据
  146. tmp := new(map[string]interface{})
  147. err := json.Unmarshal(data, &tmp)
  148. if err != nil {
  149. log.Println("err:", err)
  150. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  151. return
  152. } else if tmp != nil {
  153. if key, ok := (*tmp)["key"].(string); ok {
  154. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  155. } else {
  156. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  157. }
  158. //data_type:winner data_type:buyer data_type:agency
  159. //data_info:save//存量 data_info:add //增量
  160. if key, ok := (*tmp)["data_type"].(string); ok {
  161. //阻塞
  162. CPool <- true
  163. if key == "winner" {
  164. go TaskWinner(tmp)
  165. } else if key == "buyer" {
  166. //go TaskBuyer(tmp)
  167. } else if key == "agency" {
  168. //go TaskAgency(tmp)
  169. }
  170. }
  171. }
  172. case mu.OP_NOOP: //下个节点回应
  173. log.Println("发送成功", string(data))
  174. }
  175. }