package udptask import ( "encoding/json" "go.mongodb.org/mongo-driver/bson" "log" mu "mfw/util" "mongodb" "net" qu "qfw/util" "util" ) var ( Udpclient mu.UdpClient //udp对象 IsTaskOK bool Timeout = 3 ) func InitUdp() { updport := util.Sysconfig["udpport"].(string) Udpclient = mu.UdpClient{Local: updport, BufSize: 1024} log.Println("Udp服务监听", updport) Udpclient.Listen(processUdpMsg) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //测试接收 var rep map[string]interface{} err := json.Unmarshal(data, &rep) log.Println("测试接收:", rep) if err != nil { //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点 } else { //by, _ := json.Marshal(map[string]interface{}{ // "taskid": rep["taskid"], // "stype": rep["stype"], //}) //go Udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点 } case mu.OP_NOOP: //下个节点回应 qu.Debug("接收回应:", string(data)) var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { //空数据 // } else { //正确 } } } //bidding索引udp func BiddingIndexUdp(id, coll string) { indexNode := util.Sysconfig["indexNode"].(map[string]interface{}) addr := &net.UDPAddr{ IP: net.ParseIP(indexNode["addr"].(string)), Port: qu.IntAll(indexNode["port"]), } by, _ := json.Marshal(map[string]interface{}{ "query": map[string]interface{}{ "_id": bson.M{ "$gte": mongodb.StringTOBsonId(id), "$lte": mongodb.StringTOBsonId(id), }}, "stype": qu.ObjToString(indexNode["stype"]), "coll": coll, }) Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } //项目合并udp func ProjectSetUdp(id, coll string) { nextNode := util.Sysconfig["jy_pro_node"].(map[string]interface{}) addr := &net.UDPAddr{ IP: net.ParseIP(nextNode["addr"].(string)), Port: qu.IntAll(nextNode["port"]), } by, _ := json.Marshal(map[string]interface{}{ "infoid": id, "stype": "updateInfo", "coll": coll, }) Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) }