package main import ( "encoding/json" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "time" ) var ( MgoB *mongodb.MongodbSim UdpClient udp.UdpClient nextAddr *net.UDPAddr Repeat = false //抽取是否回复 ) //processUdpMsg 处理udp func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: // //var mapInfo map[string]interface{} //err := json.Unmarshal(data, &mapInfo) //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo)) //if err != nil { // fmt.Println(err) //} //if mapInfo != nil { // key, _ := mapInfo["key"].(string) // if key == "" { // key = "udpok" // } // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) //} case udp.OP_NOOP: da := string(data) log.Info("收到回复数据", zap.String("data", da)) if da == "ok" { Repeat = true } default: fmt.Println("current_listen : processUdpMsg =====", act) } } func main() { f_sid := "" n_sid := "" f_lid := "" //file 最后一个分类结束ID n_lid := "" where := map[string]interface{}{ "extracttype": 9, "biddingid": map[string]interface{}{ "$exists": 1, }, } fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1) fileDaa := *fileDataa f_sid = BsonIdToSId(fileDaa[0]["_id"]) nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1) nomalDaa := *nomalDataa n_sid = BsonIdToSId(nomalDaa[0]["_id"]) for { fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1) fileDa := *fileData fid := BsonIdToSId(fileDa[0]["_id"]) if fid != "" { f_lid = fid } nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1) nomalDa := *nomalData nid := BsonIdToSId(nomalDa[0]["_id"]) if nid != "" { n_lid = nid } //起始ID 等于 结束ID if f_lid == f_sid && n_lid == n_sid { time.Sleep(time.Second * 5) } else if f_lid == "" && n_lid == "" { time.Sleep(time.Second * 5) } else { //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid)) data := map[string]interface{}{ "file": fmt.Sprintf("%s-%s", f_sid, f_lid), "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid), } SendUdpMsg(data, nextAddr) time.Sleep(time.Second * 3) if !Repeat { time.Sleep(time.Second * 10) SendUdpMsg(data, nextAddr) time.Sleep(time.Second * 10) SendUdpMsg(data, nextAddr) log.Error("没有收到回复", zap.Any("data", data)) } f_sid = f_lid n_sid = n_lid } } } func deletePreData() { } //BsonIdToSId 根据bsonID转string func BsonIdToSId(uid interface{}) string { if uid == nil { return "" } else if u, ok := uid.(string); ok { return u } else if u, ok := uid.(primitive.ObjectID); ok { return u.Hex() } else { return "" } } //SendUdpMsg 通知处理企业新增数据 func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) { bytes, _ := json.Marshal(data) UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target) log.Info("SendUdpMsg", zap.Any("data", data)) log.Info("SendUdpMsg", zap.Any("target", target)) }