package main import ( "encoding/json" "fmt" "log" mu "mfw/util" "net" qu "qfw/util" "strconv" "sync" "time" ) var ( Config map[string]interface{} //配置文件 nextNode []map[string]interface{} //下节点数组 extractNode []map[string]interface{} //抽取节点数组 udpclient mu.UdpClient //udp对象 extractLevel map[string]interface{} //抽取节点状态 udplock sync.Mutex //锁 ) func init() { qu.ReadConfig(&Config) nextNode = qu.ObjArrToMapArr(Config["nextNode"].([]interface{})) extractNode = qu.ObjArrToMapArr(Config["extractNode"].([]interface{})) resetExtractLevel() } //重置抽取状态 func resetExtractLevel() { extractLevel = make(map[string]interface{},0) for _,v:=range extractNode{ key := fmt.Sprintf("%s",qu.ObjToString(v["stype"])) extractLevel[key] = 0 } } func main() { go checkMailJob() updport := Config["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } //udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) if sid == "" || eid == "" { log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid) } else { udpinfo, _ := mapInfo["key"].(string) if udpinfo == "" { udpinfo = "udpok" } go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra) log.Println("") log.Println("接收当前段落,udp通知抽取-需拆分",len(extractNode),"组", sid, "~~", eid) udplock.Lock() resetExtractLevel() //重置状态 extractLevel["sid"]=sid extractLevel["eid"]=eid udplock.Unlock() //拆分段落方法 splitArr:=splitIdMethod(sid,eid) if len(splitArr)!=len(extractNode){//直接发送整段 log.Println("段落划分异常...请检查程序...") } key:=fmt.Sprintf("%s~%s",sid,eid) node := &udpNode{time.Now().Unix()} udptaskmap.Store(key, node) sendExtractNode(splitArr) //通知抽取 } } case mu.OP_NOOP: //下个节点回应 //抽取多节点 udplock.Lock() str := string(data) if extractLevel[str] != nil { extractLevel[str] = 1 log.Println("抽取节点回应:",str) f := validExtractFinish() //验证段落是否均抽取完毕 if f {//发送下节点整体udp,补城市,敏感词等 sid := qu.ObjToString(extractLevel["sid"]) eid := qu.ObjToString(extractLevel["eid"]) if sid != ""&&eid != "" { key:=fmt.Sprintf("%s~%s",sid,eid) udptaskmap.Delete(key) sendNextNode(sid,eid) } } }else { log.Println("其他节点回应:",str) } udplock.Unlock() } } //验证抽取是否完毕 不验证-sid eid key func validExtractFinish() bool { for k,v :=range extractLevel{ if k=="sid" || k=="eid" { continue } if qu.Int64All(v)==0 { return false } } return true } //拆分ID段方法 func splitIdMethod(sid string,eid string)([]map[string]interface{}) { dataArr := make([]map[string]interface{},0) if len(extractNode)==1 { dataArr = append(dataArr, map[string]interface{}{ "sid":sid, "eid":eid, }) }else { interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8])) num := interval/int64(len(extractNode)) tmp_time := hex2Dec(string(sid[:8]))+num for i:=0;i