main.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. "qfw/util"
  9. "qfw/common/src/qfw/util/mongodb"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/garyburd/redigo/redis"
  15. hisRedis "github.com/go-redis/redis"
  16. "gopkg.in/mgo.v2/bson"
  17. )
  18. var (
  19. Config = make(map[string]string)
  20. Sysconfig map[string]interface{}
  21. Fields, BuyerFields, AgencyFields []string
  22. SourceClient, FClient *mongodb.MongodbSim
  23. RedisPool redis.Pool
  24. HisRedisPool *hisRedis.Client
  25. Addrs = make(map[string]interface{}, 0) //省市县
  26. udpclient mu.UdpClient //udp对象
  27. Reg_person = regexp.MustCompile("[\u4e00-\u9fa5]+")
  28. Reg_xing = regexp.MustCompile(`\*{1,}`)
  29. Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
  30. Updport int
  31. CPoolWinner, CPoolBuery, CPoolAgency chan bool
  32. //his_redis db
  33. redis_winner_db, redis_buyer_db, redis_agency_db int
  34. //异常表正则匹配处理
  35. WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuyerRegOk, BuyerRegErr []regexp.Regexp
  36. )
  37. /**
  38. 新增
  39. 初始化
  40. */
  41. func init() {
  42. log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
  43. util.ReadConfig(&Config)
  44. util.ReadConfig(&Sysconfig)
  45. log.Println(Sysconfig)
  46. var err error
  47. cpnum, err := strconv.Atoi(Config["chan_pool_num"])
  48. if err != nil {
  49. log.Fatalln(err)
  50. }
  51. CPoolWinner = make(chan bool, cpnum)
  52. CPoolBuery = make(chan bool, cpnum)
  53. CPoolAgency = make(chan bool, cpnum)
  54. Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
  55. "capital", "establish_date", "legal_person", "company_type",
  56. "district", "city", "province", "area_code", "credit_no",
  57. "company_name", "history_name", "wechat_accounts",
  58. "alias", "website", "report_websites", "industry"}
  59. BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
  60. "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
  61. "history_name", "wechat_accounts", "website", "report_websites"}
  62. AgencyFields = []string{"_id", "contact", "type", "ranks",
  63. "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
  64. "history_name", "wechat_accounts", "website", "report_websites"}
  65. initRdis()
  66. initMongo()
  67. initReg()
  68. Updport, err = strconv.Atoi(Config["port"])
  69. if err != nil{
  70. log.Fatalln(err)
  71. }
  72. }
  73. func main() {
  74. //udp
  75. updport := Config["udpport"]
  76. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  77. udpclient.Listen(processUdpMsg)
  78. log.Println("Udp服务监听", updport)
  79. go TimedTaskWinner() //定时任务
  80. go TimedTaskBuyer() //定时任务
  81. go TimedTaskAgency() //定时任务
  82. go checkMapJob()
  83. c := make(chan int, 1)
  84. <-c
  85. }
  86. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  87. log.Println(act, string(data), ra)
  88. switch act {
  89. case mu.OP_TYPE_DATA: //上个节点的数据
  90. //从表中开始处理生成企业数据
  91. tmp := new(map[string]interface{})
  92. err := json.Unmarshal(data, &tmp)
  93. if err != nil {
  94. log.Println("err:", err)
  95. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  96. return
  97. } else if tmp != nil {
  98. if key, ok := (*tmp)["key"].(string); ok {
  99. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  100. } else {
  101. udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  102. }
  103. //data_info:save//存量 data_info:add //增量
  104. //阻塞
  105. tmpstype,ok := (*tmp)["stype"].(string)
  106. if ok&& tmpstype ==""{
  107. CPoolWinner <- true
  108. go func(mapinfo *map[string]interface{}) {
  109. defer func() { <-CPoolWinner }()
  110. TaskWinner(mapinfo)
  111. }(tmp)
  112. CPoolBuery <- true
  113. go func(mapinfo *map[string]interface{}) {
  114. defer func() { <-CPoolBuery }()
  115. TaskBuyer(mapinfo)
  116. }(tmp)
  117. CPoolAgency <- true
  118. go func(mapinfo *map[string]interface{}) {
  119. defer func() { <-CPoolAgency }()
  120. TaskAgency(mapinfo)
  121. }(tmp)
  122. }else if tmpstype =="winner" {
  123. CPoolWinner <- true
  124. go func(mapinfo *map[string]interface{}) {
  125. defer func() { <-CPoolWinner }()
  126. TaskWinner(mapinfo)
  127. }(tmp)
  128. }else if tmpstype=="buyer"{
  129. CPoolBuery <- true
  130. go func(mapinfo *map[string]interface{}) {
  131. defer func() { <-CPoolBuery }()
  132. TaskBuyer(mapinfo)
  133. }(tmp)
  134. }else if tmpstype=="agency"{
  135. CPoolAgency <- true
  136. go func(mapinfo *map[string]interface{}) {
  137. defer func() { <-CPoolAgency }()
  138. TaskAgency(mapinfo)
  139. }(tmp)
  140. }
  141. }
  142. case mu.OP_NOOP: //下个节点回应
  143. log.Println("发送成功", string(data))
  144. }
  145. }
  146. func initRdis() {
  147. var err error
  148. //redis
  149. RedisPool = redis.Pool{
  150. MaxIdle: 50,
  151. IdleTimeout: 10 * time.Second,
  152. Dial: func() (redis.Conn, error) {
  153. conn, e := redis.Dial("tcp", Config["redis"])
  154. if e != nil {
  155. return conn, e
  156. }
  157. _, err = conn.Do("SELECT", "1")
  158. if err != nil {
  159. return nil, err
  160. }
  161. return conn, nil
  162. }}
  163. c := RedisPool.Get()
  164. if _, err := c.Do("PING"); err != nil {
  165. log.Fatalln("redis err:", err)
  166. }
  167. c.Close()
  168. HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
  169. Addr: Config["his_redis"],
  170. DB: 1,
  171. DialTimeout: 10 * time.Second,
  172. ReadTimeout: 30 * time.Second,
  173. WriteTimeout: 30 * time.Second,
  174. PoolSize: 30,
  175. MinIdleConns: 20,
  176. PoolTimeout: 30 * time.Second,
  177. })
  178. redis_winner_db, _ = strconv.Atoi(Config["redis_winner_db"])
  179. redis_buyer_db, _ = strconv.Atoi(Config["redis_buyer_db"])
  180. redis_agency_db, _ = strconv.Atoi(Config["redis_agency_db"])
  181. }
  182. func initMongo() {
  183. //mongo init
  184. pool_size, _ := strconv.Atoi(Config["pool_size"])
  185. SourceClient = new(mongodb.MongodbSim)
  186. SourceClient.MongodbAddr = Config["mgoinit"]
  187. SourceClient.Size = pool_size
  188. SourceClient.DbName = Config["mgodb_bidding"]
  189. SourceClient.InitPool()
  190. FClient = new(mongodb.MongodbSim)
  191. FClient.MongodbAddr = Config["mgourl"]
  192. FClient.Size = pool_size
  193. FClient.DbName = Config["mgodb_extract_kf"]
  194. FClient.InitPool()
  195. FClientmgoConn := FClient.GetMgoConn()
  196. defer FClient.DestoryMongoConn(FClientmgoConn)
  197. //加载省市县代码
  198. cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
  199. //defer FClient.Connect(cc)
  200. if cursor2 == nil {
  201. log.Fatalln(cursor2)
  202. }
  203. tmp := make(map[string]interface{})
  204. for cursor2.Next(&tmp) {
  205. code := tmp["code"]
  206. if code != nil && strings.TrimSpace(code.(string)) != "" {
  207. Addrs[fmt.Sprint(code)] = tmp
  208. }
  209. }
  210. log.Println(len(Addrs))
  211. }
  212. func initReg() {
  213. FClientmgoConnReg := FClient.GetMgoConn()
  214. defer FClient.DestoryMongoConn(FClientmgoConnReg)
  215. findReg, b := FClient.Find(Config["mgo_qyk_reg"], bson.M{"delete": false, "isuse": true}, bson.M{"_id": 1}, nil, false, -1, 0)
  216. if !b {
  217. log.Fatalln("查询正则失败")
  218. }
  219. for _, v := range *findReg {
  220. s_field, ok := v["s_field"].(string) //字段
  221. s_rule, ok2 := v["s_rule"].(string) //正则
  222. s_type, ok3 := v["s_type"].(string) //ok or err
  223. if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
  224. continue
  225. }
  226. var pattern string
  227. if strings.Contains(s_rule, "\\u") {
  228. s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
  229. s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
  230. pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
  231. } else {
  232. pattern = s_rule
  233. }
  234. regtmp := regexp.MustCompile(pattern)
  235. if s_field == "winner" {
  236. if s_type == "ok" {
  237. WinnerRegOk = append(WinnerRegOk, *regtmp)
  238. } else if s_type == "err" {
  239. WinnerRegErr = append(WinnerRegErr, *regtmp)
  240. }
  241. } else if s_field == "buyer" {
  242. if s_type == "ok" {
  243. BuyerRegOk = append(BuyerRegOk, *regtmp)
  244. } else if s_type == "err" {
  245. BuyerRegErr = append(BuyerRegErr, *regtmp)
  246. }
  247. } else if s_field == "agency" {
  248. if s_type == "ok" {
  249. AgencyRegOk = append(AgencyRegOk, *regtmp)
  250. } else if s_type == "err" {
  251. AgencyRegErr = append(AgencyRegErr, *regtmp)
  252. }
  253. }
  254. }
  255. log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuyerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
  256. }
  257. //通知下个节点nextNode
  258. func nextNode(stype string, updatatime int64) {
  259. to, _ := Sysconfig["nextNode"].(map[string]interface{})
  260. log.Println(stype, to)
  261. key := stype + "-" + fmt.Sprint(updatatime)
  262. by, _ := json.Marshal(map[string]interface{}{
  263. "query": map[string]interface{}{
  264. "updatatime": map[string]interface{}{
  265. "$gte": updatatime,
  266. },
  267. },
  268. "stype": stype,
  269. "key": key,
  270. })
  271. addr := &net.UDPAddr{
  272. IP: net.ParseIP(to["addr"].(string)),
  273. Port: util.IntAll(to["port"]),
  274. }
  275. node := &udpNode{by, addr, time.Now().Unix(), 0}
  276. udptaskmap.Store(key, node)
  277. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  278. }