package main import ( "encoding/json" "field_sync/config" "field_sync/oss" "fmt" "io/ioutil" "net" "net/http" "strings" "sync" "time" "log" elastic "app.yhyue.com/moapp/jybase/es" "go.uber.org/zap" "gopkg.in/mgo.v2/bson" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" ) var ( MgoB, MgoBP *mongodb.MongodbSim MgoE *mongodb.MongodbSim MgoQ *mongodb.MongodbSim // 企业 MgoP *mongodb.MongodbSim // 凭安企业 Es elastic.Es UdpClient udp.UdpClient UdpTaskMap = &sync.Map{} Mcmer *gonsq.Consumer MgoBulkSize = 200 // mgo批量保存大小 updateBidPool = make(chan []map[string]interface{}, 5000) updateBidSp = make(chan bool, 5) updateExtPool = make(chan []map[string]interface{}, 5000) updateExtSp = make(chan bool, 5) ) func init() { config.Init("./common.toml") oss.InitOss() InitFileInfo() // InitLog() InitMgo() // InitEs() inits() redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex) log.Println("init success") } func main() { // go checkMapJob() // go nsqMethod() // go UpdateBidding() // go UpdateExtract() // UdpClient = udp.UdpClient{Local: ":11876", BufSize: 1024} // UdpClient.Listen(processUdpMsg) // log.Println("Udp服务监听 port:", ":11876") // info, _ := MgoBP.Find("bidding_processing_ids", `{"dataprocess_ai": 5}`, bson.M{"_id": 1}, nil, false, -1, -1) // log.Println(len(*info)) // log.Println("size", len(*info)) // if len(*info) > 0 { // for _, m := range *info { // mapInfo := make(map[string]interface{}) // mapInfo["gtid"] = util.ObjToString(m["gtid"]) // mapInfo["lteid"] = util.ObjToString(m["lteid"]) // mapInfo["stype"] = "bidding" // mapInfo["key"] = fmt.Sprintf("%s-%s-bidding", util.ObjToString(m["gtid"]), util.ObjToString(m["lteid"])) // log.Println("--", mapInfo) biddingAllTask() // } // } // ch := make(chan bool, 1) // <-ch } var pool = make(chan bool, 20) type UdpNode struct { data []byte addr *net.UDPAddr timestamp int64 retry int } func NextNode(mapInfo map[string]interface{}, stype string) { var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } mapInfo["stype"] = stype key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), stype) mapInfo["key"] = key log.Println("udp es node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(key, node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodePro(mapInfo map[string]interface{}, stype string) { var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Project.Addr), Port: util.IntAll(config.Conf.Udp.Project.Port), } if stype == "bidding_history" { mapInfo["stype"] = "project_history" } else { mapInfo["stype"] = "project" } key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) mapInfo["key"] = key log.Println("udp project node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(key, node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeBidData(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } mapInfo["stype"] = "biddingdata" mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp es node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeTidbQyxy(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb.Addr), Port: util.IntAll(config.Conf.Udp.Tidb.Port), } mapInfo["stype"] = config.Conf.Udp.Tidb.Stype mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp tidb-qyxy node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func NextNodeTidb(mapInfo map[string]interface{}, stype string) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb1.Addr), Port: util.IntAll(config.Conf.Udp.Tidb1.Port), } mapInfo["stype"] = stype mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("udp tidb-bidding node mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } // NextNodeHn @Description 郑坤 海南数据处理 // @Author J 2022/10/28 09:26 func NextNodeHn(mapInfo map[string]interface{}) { next := &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Tidb2.Addr), Port: util.IntAll(config.Conf.Udp.Tidb2.Port), } mapInfo["stype"] = "hainan" mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"])) log.Println("NextNodeTidb mapinfo:", mapInfo) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(mapInfo["key"], node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func UpdateBidding() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateBidPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { // updateBidSp <- true // go func(arru [][]map[string]interface{}) { // defer func() { // }() MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...) // <-updateBidSp // }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { // updateBidSp <- true // go func(arru [][]map[string]interface{}) { // defer func() { // <-updateBidSp // }() MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru[:indexu]...) // }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func UpdateExtract() { arru := make([][]map[string]interface{}, MgoBulkSize) indexu := 0 for { select { case v := <-updateExtPool: arru[indexu] = v indexu++ if indexu == MgoBulkSize { updateExtSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtSp }() MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateExtSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateExtSp }() MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, MgoBulkSize) indexu = 0 } } } } func checkMapJob() { if config.Conf.Mail.Send { log.Println("checkMapJob to:", config.Conf.Mail.To) for { UdpTaskMap.Range(func(k, v interface{}) bool { now := time.Now().Unix() node, _ := v.(*UdpNode) if now-node.timestamp > 120 { node.retry++ if node.retry > 5 { UdpTaskMap.Delete(k) res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", k.(string))) if err == nil { defer res.Body.Close() read, err := ioutil.ReadAll(res.Body) log.Println("send mail ... r:", string(read), "err:", err) } } else { log.Println("udp重发", zap.Any("k:", k)) //UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr) } } else if now-node.timestamp > 10 { log.Println("udp任务超时中.. k:", k) } return true }) time.Sleep(60 * time.Second) } } } // @Description nsq处理id不变,内容替换的竞品数据 // @Author J 2022/8/10 11:40 func nsqMethod() { var err error Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: config.Conf.Nsq.Addr, ConnectType: 0, //默认连接nsqd Topic: config.Conf.Nsq.Topic, Channel: config.Conf.Nsq.Channel, Concurrent: config.Conf.Nsq.Concurrent, //并发数 }) if err != nil { log.Println("nsqMethod err", err) } for { select { case obj := <-Mcmer.Ch: //从通道读取即可 objstr := util.ObjToString(obj) log.Println("obj ", obj, objstr) id := strings.Split(objstr, "=") if len(id) > 1 { if bson.IsObjectIdHex(id[1]) { taskinfo(id[1]) } else { log.Println("jy nsq id err id", objstr) } } else { log.Println("jy nsq id err id", objstr) } } } }