123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- package main
- import (
- "encoding/json"
- "fmt"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- "qfw/common/src/qfw/util/mongodb"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/garyburd/redigo/redis"
- hisRedis "github.com/go-redis/redis"
- "gopkg.in/mgo.v2/bson"
- )
- var (
- Config = make(map[string]string)
- Sysconfig map[string]interface{}
- Fields, BuyerFields, AgencyFields []string
- SourceClient, FClient *mongodb.MongodbSim
- RedisPool redis.Pool
- HisRedisPool *hisRedis.Client
- Addrs = make(map[string]interface{}, 0) //省市县
- udpclient mu.UdpClient //udp对象
- Reg_person = regexp.MustCompile("[\u4e00-\u9fa5]+")
- Reg_xing = regexp.MustCompile(`\*{1,}`)
- Reg_tel = regexp.MustCompile(`^[0-9\-\s]*$`)
- Updport int
- CPoolWinner, CPoolBuery, CPoolAgency chan bool
- //his_redis db
- redis_winner_db, redis_buyer_db, redis_agency_db int
- //异常表正则匹配处理
- WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuyerRegOk, BuyerRegErr []regexp.Regexp
- )
- /**
- 新增
- 初始化
- */
- func init() {
- log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
- util.ReadConfig(&Config)
- util.ReadConfig(&Sysconfig)
- log.Println(Sysconfig)
- var err error
- cpnum, err := strconv.Atoi(Config["chan_pool_num"])
- if err != nil {
- log.Fatalln(err)
- }
- CPoolWinner = make(chan bool, cpnum)
- CPoolBuery = make(chan bool, cpnum)
- CPoolAgency = make(chan bool, cpnum)
- Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
- "capital", "establish_date", "legal_person", "company_type",
- "district", "city", "province", "area_code", "credit_no",
- "company_name", "history_name", "wechat_accounts",
- "alias", "website", "report_websites", "industry"}
- BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
- "address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
- "history_name", "wechat_accounts", "website", "report_websites"}
- AgencyFields = []string{"_id", "contact", "type", "ranks",
- "address", "district", "city", "province", "area_code", "credit_no", "agency_name",
- "history_name", "wechat_accounts", "website", "report_websites"}
- initRdis()
- initMongo()
- initReg()
- Updport, err = strconv.Atoi(Config["port"])
- if err != nil{
- log.Fatalln(err)
- }
- }
- func main() {
- //udp
- updport := Config["udpport"]
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- go TimedTaskWinner() //定时任务
- go TimedTaskBuyer() //定时任务
- go TimedTaskAgency() //定时任务
- go checkMapJob()
- c := make(chan int, 1)
- <-c
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- log.Println(act, string(data), ra)
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- tmp := new(map[string]interface{})
- err := json.Unmarshal(data, &tmp)
- if err != nil {
- log.Println("err:", err)
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- return
- } else if tmp != nil {
- if key, ok := (*tmp)["key"].(string); ok {
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- } else {
- udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
- }
- //data_info:save//存量 data_info:add //增量
- //阻塞
- tmpstype,ok := (*tmp)["stype"].(string)
- if ok&& tmpstype ==""{
- CPoolWinner <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolWinner }()
- TaskWinner(mapinfo)
- }(tmp)
- CPoolBuery <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolBuery }()
- TaskBuyer(mapinfo)
- }(tmp)
- CPoolAgency <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolAgency }()
- TaskAgency(mapinfo)
- }(tmp)
- }else if tmpstype =="winner" {
- CPoolWinner <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolWinner }()
- TaskWinner(mapinfo)
- }(tmp)
- }else if tmpstype=="buyer"{
- CPoolBuery <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolBuery }()
- TaskBuyer(mapinfo)
- }(tmp)
- }else if tmpstype=="agency"{
- CPoolAgency <- true
- go func(mapinfo *map[string]interface{}) {
- defer func() { <-CPoolAgency }()
- TaskAgency(mapinfo)
- }(tmp)
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("发送成功", string(data))
- }
- }
- func initRdis() {
- var err error
- //redis
- RedisPool = redis.Pool{
- MaxIdle: 50,
- IdleTimeout: 10 * time.Second,
- Dial: func() (redis.Conn, error) {
- conn, e := redis.Dial("tcp", Config["redis"])
- if e != nil {
- return conn, e
- }
- _, err = conn.Do("SELECT", "1")
- if err != nil {
- return nil, err
- }
- return conn, nil
- }}
- c := RedisPool.Get()
- if _, err := c.Do("PING"); err != nil {
- log.Fatalln("redis err:", err)
- }
- c.Close()
- HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
- Addr: Config["his_redis"],
- DB: 1,
- DialTimeout: 10 * time.Second,
- ReadTimeout: 30 * time.Second,
- WriteTimeout: 30 * time.Second,
- PoolSize: 30,
- MinIdleConns: 20,
- PoolTimeout: 30 * time.Second,
- })
- redis_winner_db, _ = strconv.Atoi(Config["redis_winner_db"])
- redis_buyer_db, _ = strconv.Atoi(Config["redis_buyer_db"])
- redis_agency_db, _ = strconv.Atoi(Config["redis_agency_db"])
- }
- func initMongo() {
- //mongo init
- pool_size, _ := strconv.Atoi(Config["pool_size"])
- SourceClient = new(mongodb.MongodbSim)
- SourceClient.MongodbAddr = Config["mgoinit"]
- SourceClient.Size = pool_size
- SourceClient.DbName = Config["mgodb_bidding"]
- SourceClient.InitPool()
- FClient = new(mongodb.MongodbSim)
- FClient.MongodbAddr = Config["mgourl"]
- FClient.Size = pool_size
- FClient.DbName = Config["mgodb_extract_kf"]
- FClient.InitPool()
- FClientmgoConn := FClient.GetMgoConn()
- defer FClient.DestoryMongoConn(FClientmgoConn)
- //加载省市县代码
- cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
- //defer FClient.Connect(cc)
- if cursor2 == nil {
- log.Fatalln(cursor2)
- }
- tmp := make(map[string]interface{})
- for cursor2.Next(&tmp) {
- code := tmp["code"]
- if code != nil && strings.TrimSpace(code.(string)) != "" {
- Addrs[fmt.Sprint(code)] = tmp
- }
- }
- log.Println(len(Addrs))
- }
- func initReg() {
- FClientmgoConnReg := FClient.GetMgoConn()
- defer FClient.DestoryMongoConn(FClientmgoConnReg)
- findReg, b := FClient.Find(Config["mgo_qyk_reg"], bson.M{"delete": false, "isuse": true}, bson.M{"_id": 1}, nil, false, -1, 0)
- if !b {
- log.Fatalln("查询正则失败")
- }
- for _, v := range *findReg {
- s_field, ok := v["s_field"].(string) //字段
- s_rule, ok2 := v["s_rule"].(string) //正则
- s_type, ok3 := v["s_type"].(string) //ok or err
- if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
- continue
- }
- var pattern string
- if strings.Contains(s_rule, "\\u") {
- s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
- s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
- pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
- } else {
- pattern = s_rule
- }
- regtmp := regexp.MustCompile(pattern)
- if s_field == "winner" {
- if s_type == "ok" {
- WinnerRegOk = append(WinnerRegOk, *regtmp)
- } else if s_type == "err" {
- WinnerRegErr = append(WinnerRegErr, *regtmp)
- }
- } else if s_field == "buyer" {
- if s_type == "ok" {
- BuyerRegOk = append(BuyerRegOk, *regtmp)
- } else if s_type == "err" {
- BuyerRegErr = append(BuyerRegErr, *regtmp)
- }
- } else if s_field == "agency" {
- if s_type == "ok" {
- AgencyRegOk = append(AgencyRegOk, *regtmp)
- } else if s_type == "err" {
- AgencyRegErr = append(AgencyRegErr, *regtmp)
- }
- }
- }
- log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuyerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
- }
- //通知下个节点nextNode
- func nextNode(stype string, updatatime int64) {
- to, _ := Sysconfig["nextNode"].(map[string]interface{})
- log.Println(stype, to)
- key := stype + "-" + fmt.Sprint(updatatime)
- by, _ := json.Marshal(map[string]interface{}{
- "query": map[string]interface{}{
- "updatatime": map[string]interface{}{
- "$gte": updatatime,
- },
- },
- "stype": stype,
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
|