main.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "time"
  8. )
  9. var (
  10. udpclient mu.UdpClient //udp对象
  11. SingleThread = make(chan bool, 1)
  12. )
  // main parks the main goroutine by sleeping for 99999 hours. No other work
  // is started from here in the visible code.
  //
  // UDP commands noted by the original author (trailing token is the command):
  //   - run incremental data, id range: zl
  //   - run full data: ql
  //   - run historical data, info id1,id2 or an id range: ls
  //   - force merge, info id1,id2,id3 [project id], created when missing: qzhb
  //   - force split, project id,info id1,id2: qzcf
  //   - re-merge, info id1,id2,id3: cxhb
  func main() {
  	//loadData("project_0809", 1, true)

  	// Effectively "forever": keeps the process from exiting.
  	const keepAlive = 99999 * time.Hour
  	time.Sleep(keepAlive)
  }
  23. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  24. switch act {
  25. case mu.OP_TYPE_DATA: //上个节点的数据
  26. var mapInfo map[string]interface{}
  27. err := json.Unmarshal(data, &mapInfo)
  28. log.Println("err:", err, "mapInfo:", mapInfo)
  29. if err != nil {
  30. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  31. } else if mapInfo != nil {
  32. key, _ := mapInfo["key"].(string)
  33. if key == "" {
  34. key = "udpok"
  35. }
  36. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  37. SingleThread <- true
  38. tasktype, _ := mapInfo["stype"].(string)
  39. log.Println("tasktype:", tasktype)
  40. switch tasktype {
  41. case "ql":
  42. go func() {
  43. defer func() {
  44. <-SingleThread
  45. }()
  46. taskQl(mapInfo)
  47. }()
  48. case "ids":
  49. go func() {
  50. defer func() {
  51. <-SingleThread
  52. }()
  53. Ids(mapInfo)
  54. }()
  55. }
  56. }
  57. case mu.OP_NOOP: //下个节点回应
  58. ok := string(data)
  59. if ok != "" {
  60. log.Println("ok:", ok)
  61. }
  62. }
  63. }