package main

import (
	"encoding/json"
	"field_sync/config"
	"field_sync/oss"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	elastic "app.yhyue.com/moapp/jybase/es"
	"go.uber.org/zap"
	"gopkg.in/mgo.v2/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

var (
	MgoB *mongodb.MongodbSim
	MgoE *mongodb.MongodbSim
	MgoQ *mongodb.MongodbSim // enterprise
	MgoP *mongodb.MongodbSim // Ping'an enterprise

	Es         elastic.Es
	UdpClient  udp.UdpClient
	UdpTaskMap = &sync.Map{} // task key -> *UdpNode awaiting an OP_NOOP acknowledgement
	Mcmer      *gonsq.Consumer

	MgoBulkSize = 200 // number of pooled entries sent per bulk update

	updateBidPool = make(chan []map[string]interface{}, 5000)
	updateBidSp   = make(chan bool, 5) // caps concurrent bulk writes through MgoB
	updateExtPool = make(chan []map[string]interface{}, 5000)
	updateExtSp   = make(chan bool, 5) // caps concurrent bulk writes through MgoE
)

// pool caps the number of UDP-triggered tasks running at the same time.
var pool = make(chan bool, 20)

// mailHTTPClient is used for failure notifications. It carries a timeout so a
// hanging mail endpoint cannot stall the checkMapJob loop indefinitely.
var mailHTTPClient = &http.Client{Timeout: 30 * time.Second}

// init loads the configuration and initialises every backing service in the
// original order. The Init* helpers and inits are defined in other files.
func init() {
	config.Init("./common.toml")
	oss.InitOss()
	InitFileInfo()
	InitLog()
	InitMgo()
	InitEs()
	inits()
	redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
	log.Info("init success")
}

// main starts the background workers and the UDP listener, replays the
// unfinished bidding tasks recorded in MongoDB, then blocks forever.
func main() {
	go checkMapJob()
	go nsqMethod()
	go UpdateBidding()
	go UpdateExtract()

	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
	// NOTE(review): Listen is presumably non-blocking, since the original code
	// continues after it — confirm against the udp package.
	UdpClient.Listen(processUdpMsg)
	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))

	resumePendingBiddingTasks()

	// Nothing ever sends on this channel: keep the process alive.
	ch := make(chan bool, 1)
	<-ch
}

// resumePendingBiddingTasks looks up records in "bidding_processing_ids" with
// dataprocess == 6 and runs biddingTask for each of them synchronously.
func resumePendingBiddingTasks() {
	info, _ := MgoB.Find("bidding_processing_ids", `{"dataprocess": 6}`, bson.M{"_id": 1}, nil, false, -1, -1)

	// Find's second result is discarded, so a failed query may yield a nil
	// pointer; guard the dereference instead of panicking at start-up.
	size := 0
	if info != nil {
		size = len(*info)
	}
	util.Debug(size)
	log.Info("", zap.Int("size", size))
	if size == 0 {
		return
	}

	for i, m := range *info {
		gtid := util.ObjToString(m["gtid"])
		lteid := util.ObjToString(m["lteid"])
		mapInfo := map[string]interface{}{
			"gtid":  gtid,
			"lteid": lteid,
			"stype": "bidding",
			"key":   fmt.Sprintf("%s-%s-bidding", gtid, lteid),
		}
		log.Info(fmt.Sprint(i), zap.Any("--", mapInfo))
		biddingTask(nil, mapInfo)
	}
}

// runPooledTask runs fn in a new goroutine once a slot in pool is free. The
// send on pool blocks the caller while 20 tasks are already running.
func runPooledTask(fn func()) {
	pool <- true
	go func() {
		defer func() { <-pool }()
		// recover only works in the goroutine that panicked, so the
		// handler-level util.Catch does not cover this goroutine.
		defer util.Catch()
		fn()
	}()
}

// processUdpMsg is the UDP listener callback.
//
// OP_TYPE_DATA carries a JSON task from the previous node: it is acknowledged
// with its "key" (or "udpok" when the key is empty) and dispatched by "stype".
// OP_NOOP is an acknowledgement from a downstream node: the matching entry is
// removed from UdpTaskMap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer util.Catch()

	switch act {
	case udp.OP_TYPE_DATA: // data from the previous node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
		if err != nil {
			UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			return
		}

		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)

		tasktype, _ := mapInfo["stype"].(string)
		switch tasktype {
		case "bidding", "bidding_history": // incremental data / historical data for an incremental ID range
			runPooledTask(func() { biddingTask(data, mapInfo) })
		case "bidding_all": // existing (stock) data for an ID range
			runPooledTask(func() { biddingAllTask(data, mapInfo) })
		case "monitor":
			// Acknowledged above; no further work.
		default:
			runPooledTask(func() { log.Error("err", zap.Any("mapinfo", mapInfo)) })
		}

	case udp.OP_NOOP:
		ok := string(data)
		if ok != "" {
			log.Info("udp re", zap.String("data:", ok))
			UdpTaskMap.Delete(ok)
		}
	}
}

// UdpNode records a task sent to a downstream node that has not been
// acknowledged yet.
type UdpNode struct {
	data      []byte       // JSON payload that was sent
	addr      *net.UDPAddr // destination node
	timestamp int64        // Unix seconds at send time
	retry     int          // number of overdue checks counted by checkMapJob
}

// udpTarget builds a UDP address from a configured host and port.
func udpTarget(addr string, port interface{}) *net.UDPAddr {
	return &net.UDPAddr{
		IP:   net.ParseIP(addr),
		Port: util.IntAll(port),
	}
}

// sendUdpTask stamps mapInfo with "stype" and a "gtid-lteid-stype" key, logs
// it under logMsg, and sends it to next as an OP_TYPE_DATA packet. When track
// is true the task is stored in UdpTaskMap until its acknowledgement arrives.
// Note that mapInfo is modified in place.
func sendUdpTask(logMsg string, next *net.UDPAddr, mapInfo map[string]interface{}, stype interface{}, track bool) {
	mapInfo["stype"] = stype
	key := fmt.Sprintf("%s-%s-%s",
		util.ObjToString(mapInfo["gtid"]),
		util.ObjToString(mapInfo["lteid"]),
		util.ObjToString(mapInfo["stype"]))
	mapInfo["key"] = key
	log.Info(logMsg, zap.Any("mapinfo:", mapInfo))

	datas, err := json.Marshal(mapInfo)
	if err != nil {
		// Sending an empty payload would be meaningless to the receiver.
		log.Error("udp task marshal err", zap.String("key", key), zap.Error(err))
		return
	}
	if track {
		UdpTaskMap.Store(key, &UdpNode{data: datas, addr: next, timestamp: time.Now().Unix()})
	}
	// Best effort: tracked tasks that never get acknowledged are reported by
	// checkMapJob.
	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
}

// NextNode forwards the task to the configured "Next" node with the given
// stype and tracks it until acknowledged.
func NextNode(mapInfo map[string]interface{}, stype string) {
	next := udpTarget(config.Conf.Udp.Next.Addr, config.Conf.Udp.Next.Port)
	sendUdpTask("udp es node", next, mapInfo, stype, true)
}

// NextNodePro forwards the task to the configured "Project" node. The stype
// sent is "project_history" when stype is "bidding_history", otherwise
// "project".
func NextNodePro(mapInfo map[string]interface{}, stype string) {
	next := udpTarget(config.Conf.Udp.Project.Addr, config.Conf.Udp.Project.Port)
	target := "project"
	if stype == "bidding_history" {
		target = "project_history"
	}
	sendUdpTask("udp project node", next, mapInfo, target, true)
}

// NextNodeBidData forwards the task to the configured "Next" node with stype
// "biddingdata". Unlike the other NextNode* functions it does not track the
// task in UdpTaskMap.
func NextNodeBidData(mapInfo map[string]interface{}) {
	next := udpTarget(config.Conf.Udp.Next.Addr, config.Conf.Udp.Next.Port)
	sendUdpTask("udp es node", next, mapInfo, "biddingdata", false)
}

// NextNodeTidbQyxy forwards the task to the configured "Tidb" node using the
// stype from configuration, and tracks it until acknowledged.
func NextNodeTidbQyxy(mapInfo map[string]interface{}) {
	next := udpTarget(config.Conf.Udp.Tidb.Addr, config.Conf.Udp.Tidb.Port)
	sendUdpTask("udp tidb-qyxy node", next, mapInfo, config.Conf.Udp.Tidb.Stype, true)
}

// NextNodeTidb forwards the task to the configured "Tidb1" node with the
// given stype and tracks it until acknowledged.
func NextNodeTidb(mapInfo map[string]interface{}, stype string) {
	next := udpTarget(config.Conf.Udp.Tidb1.Addr, config.Conf.Udp.Tidb1.Port)
	sendUdpTask("udp tidb-bidding node", next, mapInfo, stype, true)
}

// NextNodeHn forwards the task to the configured "Tidb2" node with stype
// "hainan" (Hainan data processing, owner: Zheng Kun) and tracks it until
// acknowledged.
// @Author J 2022/10/28 09:26
func NextNodeHn(mapInfo map[string]interface{}) {
	next := udpTarget(config.Conf.Udp.Tidb2.Addr, config.Conf.Udp.Tidb2.Port)
	// The log label below is kept as in the original, although it names NextNodeTidb.
	sendUdpTask("NextNodeTidb", next, mapInfo, "hainan", true)
}

// runBulkUpdater drains in forever, grouping entries into batches. A batch is
// flushed when it holds MgoBulkSize entries, or when no entry arrives for one
// second and the batch is non-empty. Each flush runs in its own goroutine;
// sem bounds how many run at once and blocks the loop while it is full.
func runBulkUpdater(in <-chan []map[string]interface{}, sem chan bool, flush func(batch [][]map[string]interface{})) {
	batch := make([][]map[string]interface{}, 0, MgoBulkSize)

	// dispatch hands the current batch to a goroutine and starts a fresh one,
	// so the flushed slice is never written to again.
	dispatch := func() {
		sem <- true
		go func(b [][]map[string]interface{}) {
			defer func() { <-sem }()
			flush(b)
		}(batch)
		batch = make([][]map[string]interface{}, 0, MgoBulkSize)
	}

	for {
		select {
		case v := <-in:
			// append (rather than indexing a fixed-size slice) cannot go out
			// of range even if MgoBulkSize is changed to 0.
			batch = append(batch, v)
			if len(batch) >= MgoBulkSize {
				dispatch()
			}
		case <-time.After(1000 * time.Millisecond):
			if len(batch) > 0 {
				dispatch()
			}
		}
	}
}

// UpdateBidding batches entries from updateBidPool and writes them through
// MgoB.UpdateBulk to the configured MongoB collection. It never returns.
func UpdateBidding() {
	runBulkUpdater(updateBidPool, updateBidSp, func(batch [][]map[string]interface{}) {
		MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, batch...)
	})
}

// UpdateExtract batches entries from updateExtPool and writes them through
// MgoE.UpdateBulk to the configured MongoE collection. It never returns.
func UpdateExtract() {
	runBulkUpdater(updateExtPool, updateExtSp, func(batch [][]map[string]interface{}) {
		MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, batch...)
	})
}

// checkMapJob scans UdpTaskMap every 60 seconds for unacknowledged tasks. It
// does nothing when mail sending is disabled in the configuration.
func checkMapJob() {
	if !config.Conf.Mail.Send {
		return
	}
	log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
	for {
		UdpTaskMap.Range(checkUdpNode)
		time.Sleep(60 * time.Second)
	}
}

// checkUdpNode inspects one pending task. Tasks older than 120 seconds get
// their retry counter incremented; once the counter exceeds 5 the task is
// dropped and a failure mail is sent. Tasks older than 10 seconds are only
// logged. It always returns true so that Range visits every entry.
func checkUdpNode(k, v interface{}) bool {
	node, ok := v.(*UdpNode)
	if !ok || node == nil {
		// Not a value this file stores; skip it rather than dereference nil.
		return true
	}

	age := time.Now().Unix() - node.timestamp
	switch {
	case age > 120:
		node.retry++
		if node.retry > 5 {
			UdpTaskMap.Delete(k)
			sendFailMail(fmt.Sprint(k))
		} else {
			log.Info("udp重发", zap.Any("k:", k))
			// Resending is currently disabled:
			//UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
		}
	case age > 10:
		log.Info("udp任务超时中..", zap.Any("k:", k))
	}
	return true
}

// sendFailMail calls the configured mail API to report that the task
// identified by key was never acknowledged, and logs the response.
func sendFailMail(key string) {
	res, err := mailHTTPClient.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", key))
	if err != nil {
		log.Error("send mail err", zap.String("k", key), zap.Error(err))
		return
	}
	defer res.Body.Close()

	read, err := ioutil.ReadAll(res.Body)
	log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
}

// nsqMethod consumes NSQ messages for competitor data whose id is unchanged
// but whose content was replaced. The segment after the first "=" of each
// message is treated as an ObjectId hex string and passed to taskinfo.
// @Author J 2022/8/10 11:40
func nsqMethod() {
	var err error
	Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
		IsJsonEncode: true, // matches the producer config; when true the first byte is used for type detection
		Addr:         config.Conf.Nsq.Addr,
		ConnectType:  0, // connects to nsqd by default
		Topic:        config.Conf.Nsq.Topic,
		Channel:      config.Conf.Nsq.Channel,
		Concurrent:   config.Conf.Nsq.Concurrent, // concurrency
	})
	if err != nil {
		log.Error("nsqMethod err", zap.Error(err))
		// Without a consumer there is no channel to read from; reading
		// Mcmer.Ch here would dereference a nil consumer.
		return
	}

	// Messages are simply read from the consumer channel.
	for obj := range Mcmer.Ch {
		msg := util.ObjToString(obj)
		id := strings.Split(msg, "=")
		if len(id) < 2 {
			// No "=" separator: id[1] does not exist.
			log.Info("jy nsq id err", zap.String("id", msg))
			continue
		}
		if bson.IsObjectIdHex(id[1]) {
			taskinfo(id[1])
		} else {
			log.Info("jy nsq id err", zap.String("id", id[1]))
		}
	}
}