luamove.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package timetask
  2. import (
  3. "encoding/json"
  4. "github.com/donnie4w/go-logger/logger"
  5. qu "qfw/util"
  6. "sort"
  7. "sync"
  8. "time"
  9. "util"
  10. )
  // Spider is one golua spider's node-assignment record together with the
// data volume counted for it. GetMoveLua turns it into a document (keyed by
// the json tags below) and saves it to the "luamovevent" collection.
type Spider struct {
	Code         string `json:"code"`
	Site         string `json:"site"`
	Channel      string `json:"channel"`
	FromEvent    int    `json:"fromevent"`    // node the spider currently runs on
	ToEvent      int    `json:"toevent"`      // node it should move to (zero value when not moving)
	DataNum      int    `json:"datanum"`      // collected data volume
	PtimeDataNum int    `json:"ptimedatanum"` // collected data volume counted by publish time
	// Average int `json:"average"` // average value (disabled)
	IsMove     bool  `json:"ismove"` // whether the spider should change node
	State      int   `json:"state"`
	Comeintime int64 `json:"comeintime"` // Unix seconds when the record was built
}

// EventNum pairs a node (event) with the number of spiders running on it.
type EventNum struct {
	Event int // node
	Num   int // number of spiders on the node
}

// Package-level state shared by one rebalancing pass.
// Node counts are grouped by tier (the value util.CodeEventType gives a node).
var (
	// CodeMap indexes the spiders of the current pass by spider code.
	CodeMap map[string]*Spider

	// Spider count per node, one map per tier.
	EventMapType1 = map[int]int{} // tier 1: 7100, 7110, 7400, 7410
	EventMapType2 = map[int]int{} // tier 2: 7200, 7210, 7300, 7310
	EventMapType3 = map[int]int{} // tier 3: 7500, 7510, 7700

	// The same counts as slices sorted by ascending spider count.
	EventArrType1 []*EventNum // tier 1: 7100, 7110, 7400, 7410
	EventArrType2 []*EventNum // tier 2: 7200, 7210, 7300, 7310
	EventArrType3 []*EventNum // tier 3: 7500, 7510, 7700
)
  36. func LuaMoveEvent() {
  37. defer qu.Catch()
  38. CodeMap = map[string]*Spider{}
  39. GetLuaInfo() //获取爬虫信息
  40. GetDataNum() //统计爬虫采集量
  41. GetMoveLua() //计算哪些爬虫需要转节点
  42. }
  43. func GetLuaInfo() {
  44. defer qu.Catch()
  45. sess := util.MgoEB.GetMgoConn()
  46. defer util.MgoEB.DestoryMongoConn(sess)
  47. lock := &sync.Mutex{}
  48. wg := &sync.WaitGroup{}
  49. ch := make(chan bool, 5)
  50. query := map[string]interface{}{
  51. "platform": "golua平台",
  52. "state": map[string]interface{}{
  53. "$in": []int{0, 1, 2, 5}, //待完成、待审核、未通过、已上架
  54. },
  55. "event": map[string]interface{}{
  56. "$ne": 7000,
  57. },
  58. }
  59. fields := map[string]interface{}{
  60. "event": 1,
  61. "code": 1,
  62. "site": 1,
  63. "channel": 1,
  64. }
  65. count := util.MgoEB.Count("luaconfig", query)
  66. logger.Debug("共加载线上爬虫个数:", count)
  67. it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
  68. n := 0
  69. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  70. ch <- true
  71. wg.Add(1)
  72. go func(tmp map[string]interface{}) {
  73. defer func() {
  74. <-ch
  75. wg.Done()
  76. }()
  77. code := qu.ObjToString(tmp["code"])
  78. site := qu.ObjToString(tmp["site"])
  79. channel := qu.ObjToString(tmp["channel"])
  80. event := qu.IntAll(tmp["event"])
  81. lock.Lock()
  82. if event != 7410 && event != 7700 {
  83. if util.CodeEventType[event] == 1 {
  84. EventMapType1[event]++
  85. } else if util.CodeEventType[event] == 2 {
  86. EventMapType2[event]++
  87. } else if util.CodeEventType[event] == 3 {
  88. EventMapType3[event]++
  89. }
  90. }
  91. CodeMap[code] = &Spider{
  92. Code: code,
  93. Site: site,
  94. Channel: channel,
  95. FromEvent: event,
  96. Comeintime: time.Now().Unix(),
  97. }
  98. lock.Unlock()
  99. }(tmp)
  100. if n%1000 == 0 {
  101. logger.Debug(n)
  102. }
  103. tmp = map[string]interface{}{}
  104. }
  105. wg.Wait()
  106. //排序,选出数量最少的节点
  107. for event, num := range EventMapType1 {
  108. EventArrType1 = append(EventArrType1, &EventNum{
  109. Event: event,
  110. Num: num,
  111. })
  112. }
  113. sort.Slice(EventArrType1, func(i, j int) bool {
  114. return EventArrType1[i].Num < EventArrType1[j].Num // 升序
  115. })
  116. for event, num := range EventMapType2 {
  117. EventArrType2 = append(EventArrType2, &EventNum{
  118. Event: event,
  119. Num: num,
  120. })
  121. }
  122. sort.Slice(EventArrType2, func(i, j int) bool {
  123. return EventArrType2[i].Num < EventArrType2[j].Num // 升序
  124. })
  125. for event, num := range EventMapType3 {
  126. EventArrType3 = append(EventArrType3, &EventNum{
  127. Event: event,
  128. Num: num,
  129. })
  130. }
  131. sort.Slice(EventArrType3, func(i, j int) bool {
  132. return EventArrType3[i].Num < EventArrType3[j].Num // 升序
  133. })
  134. logger.Debug("爬虫基本信息准备完成...", EventArrType1[0].Event, EventArrType2[0].Event, EventArrType3[0].Event)
  135. }
  136. func GetDataNum() {
  137. defer qu.Catch()
  138. sess := util.MgoEB.GetMgoConn()
  139. defer util.MgoEB.DestoryMongoConn(sess)
  140. lock := &sync.Mutex{}
  141. wg := &sync.WaitGroup{}
  142. ch := make(chan bool, 5)
  143. query := map[string]interface{}{
  144. "comeintime": map[string]interface{}{
  145. "$gte": util.GetTime(-30),
  146. "$lt": util.GetTime(0),
  147. },
  148. }
  149. fieles := map[string]interface{}{
  150. "spidercode": 1,
  151. "ptimeallnum": 1, //按发布时间统计的每天的采集量
  152. "downloadallnum": 1, //每天的采集量
  153. }
  154. it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fieles).Iter()
  155. n := 0
  156. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  157. wg.Add(1)
  158. ch <- true
  159. go func(tmp map[string]interface{}) {
  160. defer func() {
  161. <-ch
  162. wg.Done()
  163. }()
  164. spidercode := qu.ObjToString(tmp["spidercode"])
  165. ptimeallnum := qu.IntAll(tmp["ptimeallnum"])
  166. downloadallnum := qu.IntAll(tmp["downloadallnum"])
  167. lock.Lock()
  168. if sp := CodeMap[spidercode]; sp != nil {
  169. sp.DataNum += downloadallnum
  170. sp.PtimeDataNum += ptimeallnum
  171. }
  172. lock.Unlock()
  173. }(tmp)
  174. if n%1000 == 0 {
  175. logger.Debug(n)
  176. }
  177. tmp = map[string]interface{}{}
  178. }
  179. wg.Wait()
  180. logger.Debug("爬虫一个月数据量统计完毕...")
  181. }
  182. func GetMoveLua() {
  183. defer qu.Catch()
  184. /*
  185. 1、按入库时间采集的数据量统计
  186. 2、转移节点规则
  187. 转移至高频率模式(7100、7110、7400、7410):30天采集总量超过200条
  188. 转移至队低频率列模式(7200、7210、7300、7310):30天采集总量50-200条
  189. 转移至极低频率模式(7500、7510、7700):30天采集总量0-50条
  190. 特殊节点(7520):0条
  191. */
  192. save := []map[string]interface{}{}
  193. for _, sp := range CodeMap {
  194. if sp.DataNum >= 200 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 { //本身不是高性能节点超过200条的
  195. sp.IsMove = true
  196. if sp.FromEvent == 7700 { //7700、7410节点特殊性
  197. sp.ToEvent = 7410
  198. } else {
  199. sp.ToEvent = EventArrType1[0].Event
  200. }
  201. } else if sp.DataNum >= 10 && sp.DataNum < 200 && sp.FromEvent != 7700 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 2 {
  202. sp.IsMove = true
  203. sp.ToEvent = EventArrType2[0].Event
  204. } else if sp.DataNum > 0 && sp.DataNum < 10 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 3 {
  205. sp.IsMove = true
  206. if sp.FromEvent == 7410 { //7700、7410节点特殊性
  207. sp.ToEvent = 7700
  208. } else {
  209. sp.ToEvent = EventArrType3[0].Event
  210. }
  211. } else if sp.DataNum == 0 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 4 {
  212. sp.IsMove = true
  213. if sp.FromEvent == 7410 { //7700、7410节点特殊性
  214. sp.ToEvent = 7700
  215. } else {
  216. sp.ToEvent = 7520
  217. }
  218. }
  219. //存储爬虫统计信息
  220. byteText, err := json.Marshal(sp)
  221. if err != nil {
  222. logger.Debug("Json Marshal Error", sp.Code)
  223. continue
  224. }
  225. tmp := map[string]interface{}{}
  226. if json.Unmarshal(byteText, &tmp) == nil {
  227. save = append(save, tmp)
  228. if len(save) >= 1000 {
  229. util.MgoEB.SaveBulk("luamovevent", save...)
  230. save = []map[string]interface{}{}
  231. }
  232. }
  233. }
  234. if len(save) > 0 {
  235. util.MgoEB.SaveBulk("luamovevent", save...)
  236. save = []map[string]interface{}{}
  237. }
  238. }