distributed.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /**
  2. 分布式抽取
  3. **/
  4. package cluster
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "jy/extract"
  9. db "jy/mongodbutil"
  10. ju "jy/util"
  11. "log"
  12. mu "mfw/util"
  13. "net"
  14. qu "qfw/util"
  15. "time"
  16. "gopkg.in/mgo.v2/bson"
  17. )
  18. var EscIds map[string][]string // ID ranges; reset to an empty map by IdsRange (not populated in the visible code)
  19. //根据esc数量实例数量id划段
  20. func IdsRange(table, endate string) int {
  21. start := time.Date(2015, 11, 3, 0, 0, 0, 0, time.Local)
  22. end, _ := time.ParseInLocation(qu.Date_Short_Layout, endate, time.Local)
  23. EscIds = map[string][]string{}
  24. list, _ := db.Mgo.Find("ecs", `{"Status":"Running"}`, nil, nil, false, -1, -1)
  25. ids := RangeIdsByDate(len(*list), start, end)
  26. for k, v := range *list {
  27. db.Mgo.UpdateById("ecs", qu.BsonIdToSId(v["_id"]), map[string]interface{}{
  28. "$set": map[string]interface{}{
  29. "extask": []string{
  30. ids[fmt.Sprint(k)][0],
  31. ids[fmt.Sprint(k)][1],
  32. ids[fmt.Sprint(k)][2],
  33. qu.ObjToString(v["InstanceId"]),
  34. },
  35. },
  36. })
  37. }
  38. return len(*list)
  39. }
  40. //启动任务
  41. func RunEcsTask() int {
  42. list, _ := db.Mgo.Find("ecs", `{"extstatus":"deploy"}`, nil, nil, false, -1, -1)
  43. num := 0
  44. for _, v := range *list {
  45. if extask, ok := v["extask"].([]interface{}); ok {
  46. ip := qu.ObjToString(v["ip_nw"])
  47. by, _ := json.Marshal(map[string]interface{}{
  48. "ip": ip,
  49. "gtid": extask[0],
  50. "lteid": extask[1],
  51. "InstanceId": extask[3],
  52. "stype": "distributed",
  53. })
  54. err := extract.Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  55. IP: net.ParseIP(ip),
  56. Port: qu.IntAll(ju.Config["udpport"]),
  57. })
  58. if err != nil {
  59. log.Println(err)
  60. } else {
  61. num++
  62. time.Sleep(2 * time.Second)
  63. log.Println("分发任务", string(by))
  64. }
  65. }
  66. }
  67. return num
  68. }
  69. //id分段
  70. func RangeIdsByDate(escnum int, start, edate time.Time) map[string][]string {
  71. ids := map[string][]string{}
  72. task, _ := db.Mgo.FindById("task", qu.ObjToString(ju.Config["udptaskid"]), nil)
  73. log.Println(qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
  74. DB := db.MgoFactory(2, 3, 120, qu.ObjToString((*task)["s_mgoaddr"]), qu.ObjToString((*task)["s_mgodb"]))
  75. total := DB.Count("bidding", `{"comeintime":{"$lt":`+fmt.Sprint(edate.Unix())+`}}`)
  76. total_back := DB.Count("bidding_back", `{}`)
  77. total += total_back
  78. pagesize := (total + escnum - 1) / escnum
  79. log.Printf("total:%d total_back:%d pagesize:%d escnum:%d", total, total_back, pagesize, escnum)
  80. nums := 0
  81. for i := 0; i < escnum; i++ {
  82. log.Println("escnum", i)
  83. sid := bson.NewObjectIdWithTime(start)
  84. var eid bson.ObjectId
  85. var idsnum = 0
  86. table := "bidding_back"
  87. for {
  88. tmpsid := bson.NewObjectIdWithTime(start)
  89. end := start.Add(4 * time.Hour)
  90. if end.Unix() > edate.Unix() {
  91. eid = bson.NewObjectIdWithTime(edate)
  92. } else {
  93. eid = bson.NewObjectIdWithTime(end)
  94. }
  95. start = end
  96. query := bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": eid}}
  97. count := DB.Count(table, query)
  98. log.Println(count, table, query)
  99. if count < 1 { //校验是否切换table
  100. tmpnum := DB.Count(table, bson.M{"_id": bson.M{"$gte": tmpsid, "$lt": bson.NewObjectIdWithTime(end.Add(24 * 10 * time.Hour) /*连续10天无数据*/)}})
  101. if tmpnum < 1 && table != "bidding" {
  102. table = "bidding"
  103. start = start.Add(-4 * time.Hour)
  104. continue
  105. }
  106. } else {
  107. idsnum += count
  108. }
  109. log.Printf("i:%d count:%d,date:%s", i, idsnum, end.Format(qu.Date_Full_Layout))
  110. if idsnum >= pagesize || start.Unix() > time.Now().Unix() || count > 5000000 { //测试数据count > 5000000
  111. break
  112. }
  113. }
  114. nums += idsnum
  115. ids[fmt.Sprint(i)] = []string{qu.BsonIdToSId(sid), qu.BsonIdToSId(eid), fmt.Sprint(idsnum)}
  116. log.Println("nums", nums)
  117. }
  118. return ids
  119. }