main.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. hisRedis "github.com/go-redis/redis"
  7. "gopkg.in/mgo.v2/bson"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "qfw/util"
  12. "qfw/util/mongodb"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. var (
  19. Config = make(map[string]string)
  20. Fields, BuyerFields, AgencyFields []string
  21. SourceClient, FClient *mongodb.MongodbSim
  22. RedisPool redis.Pool
  23. HisRedisPool *hisRedis.Client
  24. Addrs = make(map[string]interface{}, 0) //省市县
  25. udpclient mu.UdpClient //udp对象
  26. Reg_person = regexp.MustCompile("[\u4e00-\u9fa5]+")
  27. Reg_xing = regexp.MustCompile(`\*{1,}`)
  28. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  29. Updport int
  30. CPoolWinner, CPoolBuery, CPoolAgency chan bool
  31. //his_redis db
  32. redis_winner_db, redis_buyer_db, redis_agency_db int
  33. //异常表正则匹配处理
  34. WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuerRegOk, BuyerRegErr []regexp.Regexp
  35. )
  36. /**
  37. 新增
  38. 初始化
  39. */
  40. func init() {
  41. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  42. util.ReadConfig(&Config)
  43. log.Println(Config)
  44. var err error
  45. cpnum, err := strconv.Atoi(Config["chan_pool_num"])
  46. if err != nil {
  47. log.Fatalln(err)
  48. }
  49. CPoolWinner = make(chan bool, cpnum)
  50. CPoolBuery = make(chan bool, cpnum)
  51. CPoolAgency = make(chan bool, cpnum)
  52. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  53. "capital", "establish_date", "legal_person", "company_type",
  54. "district", "city", "province", "area_code", "credit_no",
  55. "company_name", "history_name", "wechat_accounts",
  56. "alias", "website", "report_websites", "industry"}
  57. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  58. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  59. "history_name", "wechat_accounts", "website", "report_websites"}
  60. AgencyFields = []string{"_id", "contact", "type", "ranks",
  61. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  62. "history_name", "wechat_accounts", "website", "report_websites"}
  63. initRdis()
  64. initMongo()
  65. initReg()
  66. }
  67. func main() {
  68. //udp
  69. updport := Config["udpport"]
  70. Updport, _ = strconv.Atoi(Config["port"])
  71. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  72. udpclient.Listen(processUdpMsg)
  73. log.Println("Udp服务监听", updport)
  74. log.Println("发送端口port:", Updport)
  75. go TimedTaskWinner() //定时任务
  76. //go TimedTaskBuyer() //定时任务
  77. //go TimedTaskAgency() //定时任务
  78. c := make(chan int, 1)
  79. <-c
  80. }
  81. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  82. log.Println(act, string(data), ra)
  83. switch act {
  84. case mu.OP_TYPE_DATA: //上个节点的数据
  85. //从表中开始处理生成企业数据
  86. tmp := new(map[string]interface{})
  87. err := json.Unmarshal(data, &tmp)
  88. if err != nil {
  89. log.Println("err:", err)
  90. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  91. return
  92. } else if tmp != nil {
  93. if key, ok := (*tmp)["key"].(string); ok {
  94. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  95. } else {
  96. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  97. }
  98. //data_info:save//存量 data_info:add //增量
  99. //阻塞
  100. CPoolWinner <- true
  101. go func(mapinfo *map[string]interface{}) {
  102. defer func() { <-CPoolWinner }()
  103. TaskWinner(mapinfo)
  104. }(tmp)
  105. CPoolBuery <- true
  106. go func(mapinfo *map[string]interface{}) {
  107. defer func() { <-CPoolBuery }()
  108. TaskBuyer(mapinfo)
  109. }(tmp)
  110. CPoolAgency <- true
  111. go func(mapinfo *map[string]interface{}) {
  112. defer func() { <-CPoolAgency }()
  113. TaskAgency(mapinfo)
  114. }(tmp)
  115. }
  116. case mu.OP_NOOP: //下个节点回应
  117. log.Println("发送成功", string(data))
  118. }
  119. }
  120. func initRdis() {
  121. var err error
  122. //redis
  123. RedisPool = redis.Pool{
  124. MaxIdle: 50,
  125. IdleTimeout: 10 * time.Second,
  126. Dial: func() (redis.Conn, error) {
  127. conn, e := redis.Dial("tcp", Config["redis"])
  128. if e != nil {
  129. return conn, e
  130. }
  131. _, err = conn.Do("SELECT", "1")
  132. if err != nil {
  133. return nil, err
  134. }
  135. return conn, nil
  136. }}
  137. c := RedisPool.Get()
  138. if _, err := c.Do("PING"); err != nil {
  139. log.Fatalln("redis err:", err)
  140. }
  141. c.Close()
  142. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  143. Addr: Config["his_redis"],
  144. DB: 1,
  145. DialTimeout: 10 * time.Second,
  146. ReadTimeout: 30 * time.Second,
  147. WriteTimeout: 30 * time.Second,
  148. PoolSize: 30,
  149. MinIdleConns: 20,
  150. PoolTimeout: 30 * time.Second,
  151. })
  152. redis_winner_db, _ = strconv.Atoi(Config["redis_winner_db"])
  153. redis_buyer_db, _ = strconv.Atoi(Config["redis_buyer_db"])
  154. redis_agency_db, _ = strconv.Atoi(Config["redis_agency_db"])
  155. }
  156. func initMongo() {
  157. //mongo init
  158. pool_size, _ := strconv.Atoi(Config["pool_size"])
  159. SourceClient = new(mongodb.MongodbSim)
  160. SourceClient.MongodbAddr = Config["mgoinit"]
  161. SourceClient.Size = pool_size
  162. SourceClient.DbName = Config["mgodb_bidding"]
  163. SourceClient.InitPool()
  164. FClient = new(mongodb.MongodbSim)
  165. FClient.MongodbAddr = Config["mgourl"]
  166. FClient.Size = pool_size
  167. FClient.DbName = Config["mgodb_extract_kf"]
  168. FClient.InitPool()
  169. FClientmgoConn := FClient.GetMgoConn()
  170. defer FClient.DestoryMongoConn(FClientmgoConn)
  171. //加载省市县代码
  172. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  173. //defer FClient.Connect(cc)
  174. if cursor2 == nil {
  175. log.Fatalln(cursor2)
  176. }
  177. tmp := make(map[string]interface{})
  178. for cursor2.Next(&tmp) {
  179. code := tmp["code"]
  180. if code != nil && strings.TrimSpace(code.(string)) != "" {
  181. Addrs[fmt.Sprint(code)] = tmp
  182. }
  183. }
  184. log.Println(len(Addrs))
  185. }
  186. func initReg() {
  187. FClientmgoConnReg := FClient.GetMgoConn()
  188. defer FClient.DestoryMongoConn(FClientmgoConnReg)
  189. findReg, b := FClient.Find(Config["mgo_qyk_reg"], bson.M{"delete": false, "isuse": true}, bson.M{"_id": 1}, nil, false, -1, 0)
  190. if !b {
  191. log.Fatalln("查询正则失败")
  192. }
  193. for _, v := range (*findReg) {
  194. s_field, ok := v["s_field"].(string) //字段
  195. s_rule, ok2 := v["s_rule"].(string) //正则
  196. s_type, ok3 := v["s_type"].(string) //ok or err
  197. if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
  198. continue
  199. }
  200. var pattern string
  201. if strings.Contains(s_rule, "\\u") {
  202. s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
  203. s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
  204. pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
  205. } else {
  206. pattern = s_rule
  207. }
  208. regtmp := regexp.MustCompile(pattern)
  209. if s_field == "winner" {
  210. if s_type == "ok" {
  211. WinnerRegOk = append(WinnerRegOk, *regtmp)
  212. } else if s_type == "err" {
  213. WinnerRegErr = append(WinnerRegErr, *regtmp)
  214. }
  215. } else if s_field == "buyer" {
  216. if s_type == "ok" {
  217. BuerRegOk = append(BuerRegOk, *regtmp)
  218. } else if s_type == "err" {
  219. BuyerRegErr = append(BuyerRegErr, *regtmp)
  220. }
  221. } else if s_field == "agency" {
  222. if s_type == "ok" {
  223. AgencyRegOk = append(AgencyRegOk, *regtmp)
  224. } else if s_type == "err" {
  225. AgencyRegErr = append(AgencyRegErr, *regtmp)
  226. }
  227. }
  228. }
  229. log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
  230. }