main.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. qu "qfw/util"
  9. "strconv"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. Config map[string]interface{} //配置文件
  15. nextNode []map[string]interface{} //下节点数组
  16. extractNode []map[string]interface{} //抽取节点数组
  17. udpclient mu.UdpClient //udp对象
  18. extractLevel map[string]interface{} //抽取节点状态
  19. udplock sync.Mutex //锁
  20. )
  21. func init() {
  22. qu.ReadConfig(&Config)
  23. nextNode = qu.ObjArrToMapArr(Config["nextNode"].([]interface{}))
  24. extractNode = qu.ObjArrToMapArr(Config["extractNode"].([]interface{}))
  25. resetExtractLevel()
  26. }
  27. //重置抽取状态
  28. func resetExtractLevel() {
  29. extractLevel = make(map[string]interface{},0)
  30. for _,v:=range extractNode{
  31. key := fmt.Sprintf("%s",qu.ObjToString(v["stype"]))
  32. extractLevel[key] = 0
  33. }
  34. }
  35. func main() {
  36. go checkMailJob()
  37. updport := Config["udpport"].(string)
  38. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  39. udpclient.Listen(processUdpMsg)
  40. log.Println("Udp服务监听", updport)
  41. time.Sleep(99999 * time.Hour)
  42. }
  43. //udp接收
  44. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  45. switch act {
  46. case mu.OP_TYPE_DATA:
  47. var mapInfo map[string]interface{}
  48. err := json.Unmarshal(data, &mapInfo)
  49. if err != nil {
  50. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  51. } else if mapInfo != nil {
  52. sid, _ := mapInfo["gtid"].(string)
  53. eid, _ := mapInfo["lteid"].(string)
  54. if sid == "" || eid == "" {
  55. log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  56. } else {
  57. udpinfo, _ := mapInfo["key"].(string)
  58. if udpinfo == "" {
  59. udpinfo = "udpok"
  60. }
  61. go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  62. log.Println("")
  63. log.Println("接收当前段落,udp通知抽取-需拆分",len(extractNode),"组", sid, "~~", eid)
  64. udplock.Lock()
  65. resetExtractLevel() //重置状态
  66. extractLevel["sid"]=sid
  67. extractLevel["eid"]=eid
  68. udplock.Unlock()
  69. //拆分段落方法
  70. splitArr:=splitIdMethod(sid,eid)
  71. if len(splitArr)!=len(extractNode){//直接发送整段
  72. log.Println("段落划分异常...请检查程序...")
  73. }
  74. key:=fmt.Sprintf("%s~%s",sid,eid)
  75. node := &udpNode{time.Now().Unix()}
  76. udptaskmap.Store(key, node)
  77. sendExtractNode(splitArr) //通知抽取
  78. }
  79. }
  80. case mu.OP_NOOP: //下个节点回应
  81. //抽取多节点
  82. udplock.Lock()
  83. str := string(data)
  84. if extractLevel[str] != nil {
  85. extractLevel[str] = 1
  86. log.Println("抽取节点回应:",str)
  87. f := validExtractFinish() //验证段落是否均抽取完毕
  88. if f {//发送下节点整体udp,补城市,敏感词等
  89. sid := qu.ObjToString(extractLevel["sid"])
  90. eid := qu.ObjToString(extractLevel["eid"])
  91. if sid != ""&&eid != "" {
  92. key:=fmt.Sprintf("%s~%s",sid,eid)
  93. udptaskmap.Delete(key)
  94. sendNextNode(sid,eid)
  95. }
  96. }
  97. }else {
  98. log.Println("其他节点回应:",str)
  99. }
  100. udplock.Unlock()
  101. }
  102. }
  103. //验证抽取是否完毕 不验证-sid eid key
  104. func validExtractFinish() bool {
  105. for k,v :=range extractLevel{
  106. if k=="sid" || k=="eid" {
  107. continue
  108. }
  109. if qu.Int64All(v)==0 {
  110. return false
  111. }
  112. }
  113. return true
  114. }
  115. //拆分ID段方法
  116. func splitIdMethod(sid string,eid string)([]map[string]interface{}) {
  117. dataArr := make([]map[string]interface{},0)
  118. if len(extractNode)==1 {
  119. dataArr = append(dataArr, map[string]interface{}{
  120. "sid":sid,
  121. "eid":eid,
  122. })
  123. }else {
  124. interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
  125. num := interval/int64(len(extractNode))
  126. tmp_time := hex2Dec(string(sid[:8]))+num
  127. for i:=0;i<len(extractNode);i++ {
  128. if i==0 {
  129. tmp_eid := fmt.Sprintf("%x",tmp_time)
  130. dataArr = append(dataArr, map[string]interface{}{
  131. "sid":sid,
  132. "eid":tmp_eid+"0000000000000000",
  133. })
  134. }else if i==len(extractNode)-1 {
  135. tmp_sid := fmt.Sprintf("%x",tmp_time)
  136. dataArr = append(dataArr, map[string]interface{}{
  137. "sid":tmp_sid+"0000000000000000",
  138. "eid":eid,
  139. })
  140. }else {
  141. tmp_sid := fmt.Sprintf("%x",tmp_time)
  142. tmp_time = tmp_time+num
  143. tmp_eid := fmt.Sprintf("%x",tmp_time)
  144. dataArr = append(dataArr, map[string]interface{}{
  145. "sid":tmp_sid+"0000000000000000",
  146. "eid":tmp_eid+"0000000000000000",
  147. })
  148. }
  149. }
  150. }
  151. return dataArr
  152. }
  153. func hex2Dec(val string)int64{
  154. n,_ := strconv.ParseInt(val,16,32)
  155. return n
  156. }
  157. //发送抽取
  158. func sendExtractNode(splitArr []map[string]interface{}) {
  159. for index, node := range extractNode {
  160. tmp:=splitArr[index]
  161. by, _ := json.Marshal(map[string]interface{}{
  162. "gtid": qu.ObjToString(tmp["sid"]),
  163. "lteid": qu.ObjToString(tmp["eid"]),
  164. "stype": qu.ObjToString(node["stype"]),
  165. })
  166. err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  167. IP: net.ParseIP(node["addr"].(string)),
  168. Port: qu.IntAll(node["port"]),
  169. })
  170. if err != nil {
  171. log.Println("发送段落异常:",node,tmp,err)
  172. }
  173. }
  174. log.Println("通知抽取udp...等待抽取...回应...")
  175. }
  176. //发送其他
  177. func sendNextNode(sid string,eid string) {
  178. for _, node := range nextNode {
  179. by, _ := json.Marshal(map[string]interface{}{
  180. "gtid": sid,
  181. "lteid": eid,
  182. "stype": qu.ObjToString(node["stype"]),
  183. })
  184. err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  185. IP: net.ParseIP(node["addr"].(string)),
  186. Port: qu.IntAll(node["port"]),
  187. })
  188. if err != nil {
  189. log.Println(err)
  190. }
  191. }
  192. log.Println("udp通知抽取完成...通知下阶段udp-敏感词,补城市",sid,"~",eid)
  193. }