package main import ( "encoding/json" "field_sync/config" "field_sync/oss" "fmt" "io/ioutil" "net" "net/http" "strings" "sync" "time" "log" elastic "app.yhyue.com/moapp/jybase/es" "go.uber.org/zap" "gopkg.in/mgo.v2/bson" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" ) var ( MgoB *mongodb.MongodbSim MgoE *mongodb.MongodbSim MgoQ *mongodb.MongodbSim // 企业 MgoP *mongodb.MongodbSim // 凭安企业 Es elastic.Es UdpClient udp.UdpClient UdpTaskMap = &sync.Map{} Mcmer *gonsq.Consumer MgoBulkSize = 200 // mgo批量保存大小 updateBidPool = make(chan []map[string]interface{}, 5000) updateBidSp = make(chan bool, 5) updateExtPool = make(chan []map[string]interface{}, 5000) updateExtSp = make(chan bool, 5) ) func init() { config.Init("./common.toml") oss.InitOss() InitFileInfo() // InitLog() InitMgo() InitEs() inits() redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex) log.Println("init success") } func main() { go checkMapJob() go nsqMethod() go UpdateBidding() go UpdateExtract() UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Println("Udp服务监听 port:", config.Conf.Udp.LocPort) info, _ := MgoB.Find("bidding_processing_ids", `{"dataprocess": 6}`, bson.M{"_id": 1}, nil, false, -1, -1) log.Println(len(*info)) log.Println("size", len(*info)) if len(*info) > 0 { for _, m := range *info { mapInfo := make(map[string]interface{}) mapInfo["gtid"] = util.ObjToString(m["gtid"]) mapInfo["lteid"] = util.ObjToString(m["lteid"]) mapInfo["stype"] = "bidding" mapInfo["key"] = fmt.Sprintf("%s-%s-bidding", util.ObjToString(m["gtid"]), util.ObjToString(m["lteid"])) log.Println("--", mapInfo) biddingTask(nil, mapInfo) } } ch := make(chan bool, 1) <-ch } var pool = make(chan bool, 20) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer util.Catch() switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("processUdpMsg mapInfo:", mapInfo) if err != nil { UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) tasktype, _ := mapInfo["stype"].(string) switch tasktype { case "bidding": pool <- true go func() { defer func() { <-pool }() biddingTask(data, mapInfo) }() case "bidding_history": //增量id段历史数据 pool <- true go func() { defer func() { <-pool }() biddingTask(data, mapInfo) }() case "bidding_all": //id段存量数据 pool <- true go func() { defer func() { <-pool }() biddingAllTask(data, mapInfo) }() case "monitor": // default: pool <- true go func() { defer func() { <-pool }() log.Println("err mapinfo ", mapInfo) }() } } case udp.OP_NOOP: ok := string(data) if ok != "" { log.Println("udp re data:", ok) UdpTaskMap.Delete(ok) } } } type UdpNode struct { data []byte addr *net.UDPAddr timestamp int64 retry int } func NextNode(mapInfo map[string]interface{}, stype string) { var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } mapInfo["stype"] = stype key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), stype) mapInfo["key"] = key log.Println("udp es node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(key, node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodePro(mapInfo map[string]interface{}, stype string) { var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Project.Addr), Port: util.IntAll(config.Conf.Udp.Project.Port), } if stype == "bidding_history" { mapInfo["stype"] = "project_history" } else { mapInfo["stype"] = "project" } key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) mapInfo["key"] = key log.Println("udp project node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(key, node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeBidData(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } mapInfo["stype"] = "biddingdata" mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp es node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeTidbQyxy(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb.Addr), Port: util.IntAll(config.Conf.Udp.Tidb.Port), } mapInfo["stype"] = config.Conf.Udp.Tidb.Stype mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp tidb-qyxy node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeTidb(mapInfo map[string]interface{}, stype string) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb1.Addr), Port: util.IntAll(config.Conf.Udp.Tidb1.Port), } mapInfo["stype"] = stype mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp tidb-bidding node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } // NextNodeHn @Description 郑坤 海南数据处理 // @Author J 2022/10/28 09:26 func NextNodeHn(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb2.Addr), Port: util.IntAll(config.Conf.Udp.Tidb2.Port), } mapInfo["stype"] = "hainan" mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("NextNodeTidb mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func UpdateBidding() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateBidPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateBidSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBidSp }() MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateBidSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateBidSp }() MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func UpdateExtract() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateExtPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateExtSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtSp }() MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateExtSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtSp }() MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func checkMapJob() { if config.Conf.Mail.Send { log.Println("checkMapJob to:", config.Conf.Mail.To) for { UdpTaskMap.Range(func(k, v interface{}) bool { now := time.Now().Unix() node, _ := v.(*UdpNode) if now-node.timestamp > 120 { node.retry++ if node.retry > 5 { UdpTaskMap.Delete(k) res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", k.(string))) if err == nil { defer res.Body.Close() read, err := ioutil.ReadAll(res.Body) log.Println("send mail ... r:", string(read), "err:", err) } } else { log.Println("udp重发", zap.Any("k:", k)) //UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr) } } else if now-node.timestamp > 10 { log.Println("udp任务超时中.. k:", k) } return true }) time.Sleep(60 * time.Second) } } } // @Description nsq处理id不变,内容替换的竞品数据 // @Author J 2022/8/10 11:40 func nsqMethod() { var err error Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: config.Conf.Nsq.Addr, ConnectType: 0, //默认连接nsqd Topic: config.Conf.Nsq.Topic, Channel: config.Conf.Nsq.Channel, Concurrent: config.Conf.Nsq.Concurrent, //并发数 }) if err != nil { log.Println("nsqMethod err", err) } for { select { case obj := <-Mcmer.Ch: //从通道读取即可 id := strings.Split(util.ObjToString(obj), "=") if bson.IsObjectIdHex(id[1]) { taskinfo(id[1]) } else { log.Println("jy nsq id err id", id[1]) } } } }