main.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. qu "qfw/util"
  9. "time"
  10. "gopkg.in/mgo.v2/bson"
  11. )
  12. var udpclient mu.UdpClient //udp对象
  13. var nextNodes []map[string]interface{}
  14. var startDate, endDate, ip, port, stype, sid, eid string
  15. func main() {
  16. //2015-11-03,2017-04-01
  17. //2017-04-01,2017-06-01
  18. //2017-06-01,2018-06-01
  19. //2018-06-01,2019-02-20
  20. /*
  21. 5da3f2c5a5cb26b9b79847fc
  22. 5db2735ba5cb26b9b7c99c6f 76万
  23. */
  24. /*
  25. 9W
  26. 5d767728a5cb26b9b7748868
  27. ObjectId("5d77c881a5cb26b9b7de209d")
  28. ObjectId("5da3f2c5a5cb26b9b79847fc")
  29. ObjectId("5db2735ba5cb26b9b7c99c6f")
  30. //历史中间一段数据
  31. ObjectId("5d771e90a5cb26b9b7be7976")
  32. ObjectId("5d775be4a5cb26b9b759b5eb")
  33. ObjectId("5dfc98f5e9d1f601e46f047c")
  34. ObjectId("5a4ad8f240d2d9bbe8adfbda")
  35. ObjectId("5e0bf92b0cf41612e063cc28")
  36. */
  37. flag.StringVar(&sid, "sid", "", "开始id")
  38. flag.StringVar(&eid, "eid", "", "结束id")
  39. flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
  40. flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
  41. flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
  42. flag.StringVar(&port, "port", "1485", "dup端口")
  43. flag.StringVar(&stype, "stype", "", "stype")
  44. flag.Parse()
  45. var startid, endid bson.ObjectId
  46. if sid != "" && eid != "" {
  47. startid = qu.StringTOBsonId(sid)
  48. endid = qu.StringTOBsonId(eid)
  49. } else {
  50. start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
  51. end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
  52. startid = bson.NewObjectIdWithTime(start)
  53. endid = bson.NewObjectIdWithTime(end)
  54. }
  55. log.Println(startid, endid, ip, port, stype)
  56. udpclient = mu.UdpClient{Local: ":1470", BufSize: 1024}
  57. udpclient.Listen(processUdpMsg)
  58. by, _ := json.Marshal(map[string]interface{}{
  59. "gtid": startid,
  60. "lteid": endid,
  61. "stype": stype,
  62. })
  63. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  64. IP: net.ParseIP(ip),
  65. Port: qu.IntAll(port),
  66. })
  67. b := make(chan bool, 1)
  68. <-b
  69. }
  70. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  71. switch act {
  72. case mu.OP_TYPE_DATA:
  73. var mapInfo map[string]interface{}
  74. err := json.Unmarshal(data, &mapInfo)
  75. if err != nil {
  76. log.Println(err)
  77. } else {
  78. log.Println(mapInfo)
  79. }
  80. case mu.OP_NOOP: //下个节点回应
  81. log.Println("发送成功", string(data))
  82. }
  83. }