123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- package main
- import (
- "encoding/json"
- "fmt"
- log "github.com/donnie4w/go-logger/logger"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "strings"
- "sync"
- "time"
- )
- var (
- nextNode []map[string]interface{}
- udpclient mu.UdpClient
- udplock sync.Mutex
- nextlock sync.Mutex
- extractAction map[string]map[string]interface{}
- heartAction map[string]interface{}
- isAction bool
- isGetask bool
- using_ext_node, standby_ext_node, invalid_ext_node []map[string]interface{}
- )
- //udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- stype := qu.ObjToString(mapInfo["stype"])
- if stype == "monitor" {
- log.Debug("收到监测......")
- key := qu.ObjToString(mapInfo["key"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- return
- }
- if sid == "" || eid == "" {
- log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
- } else {
- lastNodeResponse = time.Now().Unix()
- key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
- go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //插入任务
- udplock.Lock()
- taskList = append(taskList, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- })
- log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
- udplock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- //抽取多节点
- nextlock.Lock()
- str := string(data)
- if isAction {
- if strings.Contains(str, "heart_extract") {
- dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
- } else {
- dealWithCallBackUdpData(str)
- }
- } else {
- log.Debug("其他节点回应:", str)
- udptaskmap.Delete(str)
- }
- nextlock.Unlock()
- }
- }
- //处理~新接收抽取段~
- func dealWithExtUdpData(sid, eid string) {
- //获取最新-抽取节点状态
- initExtractNode()
- log.Debug("处理当前段落~~~需拆分", len(using_ext_node), "组", sid, "~~", eid)
- if len(using_ext_node) > 0 {
- //拆分段落方法~并附加抽取状态标记~有效期等
- splitArr, lifeArr := splitIdMethod(sid, eid)
- log.Debug("最终分", len(splitArr), "段")
- extractAction = map[string]map[string]interface{}{}
- heartAction = map[string]interface{}{}
- for k, v := range using_ext_node {
- skey := fmt.Sprintf("%s:%d:%s", v["addr"], qu.IntAll(v["port"]), v["stype"])
- extractAction[skey] = map[string]interface{}{
- "life": lifeArr[k],
- "action": 0,
- "uid": BsonTOStringId(v["_id"]),
- }
- heartAction[skey] = 0
- }
- extractAction["extract_ids"] = map[string]interface{}{
- "sid": sid,
- "eid": eid,
- }
- sendRunExtractNode(splitArr) //通知抽取
- } else {
- log.Debug("无有效机器抽取...程序停止于此...")
- }
- }
- //处理回调udp~相关数据
- func dealWithCallBackUdpData(str string) {
- if extractAction[str] != nil {
- extractAction[str]["action"] = 1
- log.Debug("抽取节点回应:", str)
- f := validExtractFinish()
- if f {
- sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
- eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
- isAction = false
- lastNodeResponse = time.Now().Unix()
- sendNextNode(sid, eid)
- }
- } else {
- log.Debug("其他节点回应:", str)
- udptaskmap.Delete(str)
- }
- }
- //处理-心跳回调
- func dealWithHeartBackUdpData(str string) {
- if heartAction[str] != nil {
- heartAction[str] = 0
- }
- }
- //通知所有节点~进行抽取~
- func sendRunExtractNode(splitArr []map[string]interface{}) {
- for index, node := range using_ext_node {
- tmp := splitArr[index]
- skey := fmt.Sprintf("%s:%d:%s", node["addr"], qu.IntAll(node["port"]), node["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": qu.ObjToString(tmp["sid"]),
- "lteid": qu.ObjToString(tmp["eid"]),
- "stype": skey,
- })
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(node["addr"].(string)),
- Port: qu.IntAll(node["port"]),
- })
- }
- isAction = true
- log.Debug("通知抽取udp...等待抽取...回应...")
- }
- //通知所有抽取节点~结束抽取
- func sendStopExtractNode(splitArr []map[string]interface{}) {
- for _, node := range using_ext_node {
- by, _ := json.Marshal(map[string]interface{}{
- "stype": "stop_extract",
- })
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(node["addr"].(string)),
- Port: qu.IntAll(node["port"]),
- })
- }
- }
- //发送下阶段节点~
- func sendNextNode(sid string, eid string) {
- //更新记录状态
- updateProcessUdpIdsInfo(sid, eid)
- for _, node := range nextNode {
- key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(node["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(node["addr"].(string)),
- Port: qu.IntAll(node["port"]),
- }
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
- //只监控清洗流程
- if qu.IntAll(node["port"]) == 1799 {
- new_node := &udpNode{by, addr, time.Now().Unix()}
- udptaskmap.Store(key, new_node)
- }
- }
- log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
- //此段落彻底完毕~继续获取任务
- isGetask = false
- }
- //发送单节点~
- func sendSingleOtherNode(by []byte, addr string, port string) {
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(addr),
- Port: qu.IntAll(port),
- })
- }
- //更新流程记录id段落
- func updateProcessUdpIdsInfo(sid string, eid string) {
- query := map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- }
- log.Debug("开始更新流程段落记录~~", query)
- data := source_mgo.FindOne("bidding_processing_ids", query)
- if len(data) > 0 {
- up_id := BsonTOStringId(data["_id"])
- if up_id != "" {
- update := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 3,
- "updatetime": time.Now().Unix(),
- },
- }
- source_mgo.UpdateById("bidding_processing_ids", up_id, update)
- log.Debug("流程段落记录~~更新完毕~", update)
- }
- } else {
- log.Debug("未查询到记录id段落~", query)
- }
- }
|