main.go 7.1 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. hisRedis "github.com/go-redis/redis"
  7. "gopkg.in/mgo.v2/bson"
  8. es "gopkg.in/olivere/elastic.v1"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/common/src/qfw/util/elastic"
  13. "qfw/util"
  14. "qfw/util/mongodb"
  15. "regexp"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. var (
  21. Config = make(map[string]string)
  22. Fields, BuyerFields, AgencyFields []string
  23. SourceClient, FClient *mongodb.MongodbSim
  24. RedisPool redis.Pool
  25. HisRedisPool *hisRedis.Client
  26. Addrs = make(map[string]interface{}, 0) //省市县
  27. udpclient mu.UdpClient //udp对象
  28. ElasticClientIndex, ElasticClientType string
  29. Reg_xing = regexp.MustCompile(`\*{1,}`)
  30. Reg_person = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
  31. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  32. EsConn *es.Client
  33. Updport int
  34. CPool chan bool
  35. //his_redis db
  36. redis_winner_db, redis_buyer_db, redis_agency_db int
  37. //异常表正则匹配处理
  38. WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuerRegOk, BuyerRegErr []regexp.Regexp
  39. )
  40. /**
  41. 新增
  42. 初始化
  43. */
  44. func init() {
  45. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  46. util.ReadConfig(&Config)
  47. log.Println(Config)
  48. var err error
  49. cpnum, err := strconv.Atoi(Config["chan_pool_num"])
  50. if err != nil {
  51. log.Fatalln(err)
  52. }
  53. CPool = make(chan bool, cpnum)
  54. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  55. "capital", "establish_date", "legal_person", "company_type",
  56. "district", "city", "province", "area_code", "credit_no",
  57. "company_name", "history_name", "wechat_accounts",
  58. "alias", "website", "report_websites", "industry"}
  59. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  60. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  61. "history_name", "wechat_accounts", "website", "report_websites"}
  62. AgencyFields = []string{"_id", "contact", "type", "ranks",
  63. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  64. "history_name", "wechat_accounts", "website", "report_websites"}
  65. pool_size, _ := strconv.Atoi(Config["pool_size"])
  66. //mongo init
  67. SourceClient = new(mongodb.MongodbSim)
  68. SourceClient.MongodbAddr = Config["mgoinit"]
  69. SourceClient.Size = pool_size
  70. SourceClient.DbName = Config["mgodb_bidding"]
  71. //mongodbSim.DbName = "qfw"
  72. SourceClient.InitPool()
  73. FClient = new(mongodb.MongodbSim)
  74. FClient.MongodbAddr = Config["mgourl"]
  75. FClient.Size = pool_size
  76. FClient.DbName = Config["mgodb_extract_kf"]
  77. //mongodbSim.DbName = "qfw"
  78. FClient.InitPool()
  79. FClientmgoConn := FClient.GetMgoConn()
  80. defer FClient.DestoryMongoConn(FClientmgoConn)
  81. //加载省市县代码
  82. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  83. //defer FClient.Connect(cc)
  84. if cursor2 == nil {
  85. log.Fatalln(cursor2)
  86. }
  87. tmp := make(map[string]interface{})
  88. for cursor2.Next(&tmp) {
  89. code := tmp["code"]
  90. if code != nil && strings.TrimSpace(code.(string)) != "" {
  91. Addrs[fmt.Sprint(code)] = tmp
  92. }
  93. }
  94. log.Println(len(Addrs))
  95. //es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  96. //es init
  97. elastic.InitElasticSize(Config["elasticsearch"], 50)
  98. EsConn = elastic.GetEsConn()
  99. defer elastic.DestoryEsConn(EsConn)
  100. //redis
  101. RedisPool = redis.Pool{
  102. MaxIdle: 50,
  103. IdleTimeout: 10 * time.Second,
  104. Dial: func() (redis.Conn, error) {
  105. conn, e := redis.Dial("tcp", Config["redis"])
  106. if e != nil {
  107. return conn, e
  108. }
  109. _, err = conn.Do("SELECT", "1")
  110. if err != nil {
  111. return nil, err
  112. }
  113. return conn, nil
  114. }}
  115. c := RedisPool.Get()
  116. if _, err := c.Do("PING"); err != nil {
  117. log.Fatalln("redis err:", err)
  118. }
  119. c.Close()
  120. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  121. Addr: Config["his_redis"],
  122. DB: 1,
  123. DialTimeout: 10 * time.Second,
  124. ReadTimeout: 30 * time.Second,
  125. WriteTimeout: 30 * time.Second,
  126. PoolSize: 30,
  127. MinIdleConns: 20,
  128. PoolTimeout: 30 * time.Second,
  129. })
  130. redis_winner_db, _ = strconv.Atoi(Config["redis_winner_db"])
  131. redis_buyer_db, _ = strconv.Atoi(Config["redis_buyer_db"])
  132. redis_agency_db, _ = strconv.Atoi(Config["redis_agency_db"])
  133. iniReg()
  134. }
  135. func iniReg() {
  136. FClientmgoConnReg := FClient.GetMgoConn()
  137. defer FClient.DestoryMongoConn(FClientmgoConnReg)
  138. findReg, b := FClient.Find(Config["mgo_qyk_reg"], bson.M{"delete": false, "isuse": true}, bson.M{"_id": 1}, nil, false, -1, 0)
  139. if !b {
  140. log.Fatalln("查询正则失败")
  141. }
  142. for _, v := range (*findReg) {
  143. s_field, ok := v["s_field"].(string) //字段
  144. s_rule, ok2 := v["s_rule"].(string) //正则
  145. s_type, ok3 := v["s_type"].(string) //ok or err
  146. if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
  147. continue
  148. }
  149. regtmp := regexp.MustCompile(s_rule)
  150. if s_field == "winner" {
  151. if s_type=="ok"{
  152. WinnerRegOk = append(WinnerRegOk, *regtmp)
  153. }else if s_type=="err"{
  154. WinnerRegErr = append(WinnerRegErr, *regtmp)
  155. }
  156. } else if s_field == "buyer" {
  157. if s_type=="ok"{
  158. BuerRegOk = append(BuerRegOk, *regtmp)
  159. }else if s_type=="err"{
  160. BuyerRegErr = append(BuyerRegErr, *regtmp)
  161. }
  162. } else if s_field == "agency" {
  163. if s_type=="ok"{
  164. AgencyRegOk = append(AgencyRegOk, *regtmp)
  165. }else if s_type=="err"{
  166. AgencyRegErr = append(AgencyRegErr, *regtmp)
  167. }
  168. }
  169. }
  170. log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
  171. }
  172. func main() {
  173. //udp
  174. updport := Config["udpport"]
  175. Updport, _ = strconv.Atoi(Config["port"])
  176. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  177. udpclient.Listen(processUdpMsg)
  178. log.Println("Udp服务监听", updport)
  179. log.Println("发送端口port:", Updport)
  180. go TimedTaskWinner() //定时任务
  181. go TimedTaskBuyer() //定时任务
  182. go TimedTaskAgency() //定时任务
  183. c := make(chan int, 1)
  184. <-c
  185. }
  186. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  187. log.Println(act, string(data), ra)
  188. switch act {
  189. case mu.OP_TYPE_DATA: //上个节点的数据
  190. //从表中开始处理生成企业数据
  191. tmp := new(map[string]interface{})
  192. err := json.Unmarshal(data, &tmp)
  193. if err != nil {
  194. log.Println("err:", err)
  195. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  196. return
  197. } else if tmp != nil {
  198. if key, ok := (*tmp)["key"].(string); ok {
  199. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  200. } else {
  201. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  202. }
  203. //data_type:winner data_type:buyer data_type:agency
  204. //data_info:save//存量 data_info:add //增量
  205. if key, ok := (*tmp)["data_type"].(string); ok {
  206. //阻塞
  207. CPool <- true
  208. if key == "winner" {
  209. go TaskWinner(tmp)
  210. } else if key == "buyer" {
  211. go TaskBuyer(tmp)
  212. } else if key == "agency" {
  213. go TaskAgency(tmp)
  214. }
  215. }
  216. }
  217. case mu.OP_NOOP: //下个节点回应
  218. log.Println("发送成功", string(data))
  219. }
  220. }