luamove.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package timetask
  2. import (
  3. "encoding/json"
  4. "github.com/donnie4w/go-logger/logger"
  5. qu "qfw/util"
  6. "sort"
  7. "sync"
  8. "time"
  9. "util"
  10. )
  11. type Spider struct {
  12. Code string `json:"code"`
  13. Site string `json:"site"`
  14. Channel string `json:"channel"`
  15. Href string `json:"href"`
  16. MaxPage int `json:"maxpage"`
  17. CycleTime int `json:"cycletime"`
  18. FromEvent int `json:"fromevent"` //现节点
  19. ToEvent int `json:"toevent"` //目标节点
  20. DataNum int `json:"datanum"` //采集量
  21. PtimeDataNum int `json:"ptimedatanum"` //按发布时间统计的采集量
  22. //Average int `json:"average"` //平均值
  23. IsMove bool `json:"ismove"` //是否转移节点
  24. State int `json:"state"`
  25. Comeintime int64 `json:"comeintime"`
  26. }
// CodeMap indexes the spiders of the current run by spider code. It is
// re-created by LuaMoveEvent, filled by GetLuaInfo, updated by GetDataNum and
// read by GetMoveLua.
var CodeMap map[string]*Spider
  28. type EventNum struct {
  29. Event int //节点
  30. Num int //节点爬虫数量
  31. }
  32. //节点上的爬虫个数
  33. var EventMapType1 = map[int]int{} //7100、7110、7400、7410
  34. var EventMapType2 = map[int]int{} //7200、7210、7300、7310
  35. var EventMapType3 = map[int]int{} //7500、7510、7700
  36. var EventArrType1 []*EventNum //7100、7110、7400、7410
  37. var EventArrType2 []*EventNum //7200、7210、7300、7310
  38. var EventArrType3 []*EventNum //7500、7510、7700
  39. func LuaMoveEvent() {
  40. defer qu.Catch()
  41. CodeMap = map[string]*Spider{}
  42. GetLuaInfo() //获取爬虫信息
  43. GetDataNum() //统计爬虫采集量
  44. GetMoveLua() //计算哪些爬虫需要转节点
  45. }
  46. func GetLuaInfo() {
  47. defer qu.Catch()
  48. sess := util.MgoEB.GetMgoConn()
  49. defer util.MgoEB.DestoryMongoConn(sess)
  50. lock := &sync.Mutex{}
  51. wg := &sync.WaitGroup{}
  52. ch := make(chan bool, 5)
  53. query := map[string]interface{}{
  54. "platform": "golua平台",
  55. "state": map[string]interface{}{
  56. "$in": []int{0, 1, 2, 5}, //待完成、待审核、未通过、已上架
  57. },
  58. "event": map[string]interface{}{
  59. "$ne": 7000,
  60. },
  61. }
  62. fields := map[string]interface{}{
  63. "event": 1,
  64. "code": 1,
  65. "site": 1,
  66. "channel": 1,
  67. "param_common": 1,
  68. }
  69. count := util.MgoEB.Count("luaconfig", query)
  70. logger.Debug("共加载线上爬虫个数:", count)
  71. it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
  72. n := 0
  73. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  74. ch <- true
  75. wg.Add(1)
  76. go func(tmp map[string]interface{}) {
  77. defer func() {
  78. <-ch
  79. wg.Done()
  80. }()
  81. code := qu.ObjToString(tmp["code"])
  82. site := qu.ObjToString(tmp["site"])
  83. channel := qu.ObjToString(tmp["channel"])
  84. event := qu.IntAll(tmp["event"])
  85. href := ""
  86. maxPage, cycletime := 0, 0
  87. if param_common := tmp["param_common"].([]interface{}); len(param_common) >= 12 {
  88. href = qu.ObjToString(param_common[11])
  89. maxPage = qu.IntAll(param_common[5])
  90. cycletime = qu.IntAll(param_common[6])
  91. }
  92. lock.Lock()
  93. if event != 7410 && event != 7700 {
  94. if util.CodeEventType[event] == 1 {
  95. EventMapType1[event]++
  96. } else if util.CodeEventType[event] == 2 {
  97. EventMapType2[event]++
  98. } else if util.CodeEventType[event] == 3 {
  99. EventMapType3[event]++
  100. }
  101. }
  102. CodeMap[code] = &Spider{
  103. Code: code,
  104. Site: site,
  105. Channel: channel,
  106. Href: href,
  107. MaxPage: maxPage,
  108. CycleTime: cycletime,
  109. FromEvent: event,
  110. Comeintime: time.Now().Unix(),
  111. }
  112. lock.Unlock()
  113. }(tmp)
  114. if n%1000 == 0 {
  115. logger.Debug(n)
  116. }
  117. tmp = map[string]interface{}{}
  118. }
  119. wg.Wait()
  120. //排序,选出数量最少的节点
  121. for event, num := range EventMapType1 {
  122. EventArrType1 = append(EventArrType1, &EventNum{
  123. Event: event,
  124. Num: num,
  125. })
  126. }
  127. sort.Slice(EventArrType1, func(i, j int) bool {
  128. return EventArrType1[i].Num < EventArrType1[j].Num // 升序
  129. })
  130. for event, num := range EventMapType2 {
  131. EventArrType2 = append(EventArrType2, &EventNum{
  132. Event: event,
  133. Num: num,
  134. })
  135. }
  136. sort.Slice(EventArrType2, func(i, j int) bool {
  137. return EventArrType2[i].Num < EventArrType2[j].Num // 升序
  138. })
  139. for event, num := range EventMapType3 {
  140. EventArrType3 = append(EventArrType3, &EventNum{
  141. Event: event,
  142. Num: num,
  143. })
  144. }
  145. sort.Slice(EventArrType3, func(i, j int) bool {
  146. return EventArrType3[i].Num < EventArrType3[j].Num // 升序
  147. })
  148. logger.Debug("爬虫基本信息准备完成...", EventArrType1[0].Event, EventArrType2[0].Event, EventArrType3[0].Event)
  149. }
  150. func GetDataNum() {
  151. defer qu.Catch()
  152. sess := util.MgoEB.GetMgoConn()
  153. defer util.MgoEB.DestoryMongoConn(sess)
  154. lock := &sync.Mutex{}
  155. wg := &sync.WaitGroup{}
  156. ch := make(chan bool, 5)
  157. query := map[string]interface{}{
  158. "comeintime": map[string]interface{}{
  159. "$gte": util.GetTime(-30),
  160. "$lt": util.GetTime(0),
  161. },
  162. }
  163. fieles := map[string]interface{}{
  164. "spidercode": 1,
  165. "ptimeallnum": 1, //按发布时间统计的每天的采集量
  166. "downloadallnum": 1, //每天的采集量
  167. }
  168. it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fieles).Iter()
  169. n := 0
  170. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  171. wg.Add(1)
  172. ch <- true
  173. go func(tmp map[string]interface{}) {
  174. defer func() {
  175. <-ch
  176. wg.Done()
  177. }()
  178. spidercode := qu.ObjToString(tmp["spidercode"])
  179. ptimeallnum := qu.IntAll(tmp["ptimeallnum"])
  180. downloadallnum := qu.IntAll(tmp["downloadallnum"])
  181. lock.Lock()
  182. if sp := CodeMap[spidercode]; sp != nil {
  183. sp.DataNum += downloadallnum
  184. sp.PtimeDataNum += ptimeallnum
  185. }
  186. lock.Unlock()
  187. }(tmp)
  188. if n%1000 == 0 {
  189. logger.Debug(n)
  190. }
  191. tmp = map[string]interface{}{}
  192. }
  193. wg.Wait()
  194. logger.Debug("爬虫一个月数据量统计完毕...")
  195. }
  196. func GetMoveLua() {
  197. defer qu.Catch()
  198. /*
  199. 1、按入库时间采集的数据量统计
  200. 2、转移节点规则
  201. 转移至高频率模式(7100、7110、7400、7410):30天采集总量超过500条
  202. 转移至低频率队列模式(7200、7210、7300、7310):30天采集总量50-500条
  203. 转移至极低频率模式(7500、7510、7700):30天采集总量0-50条
  204. 特殊节点(7520):0条
  205. */
  206. save := []map[string]interface{}{}
  207. for _, sp := range CodeMap {
  208. if sp.DataNum >= 500 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 { //本身不是高性能节点超过500条的
  209. sp.IsMove = true
  210. if sp.FromEvent == 7700 { //7700、7410节点特殊性
  211. sp.ToEvent = 7410
  212. } else {
  213. sp.ToEvent = EventArrType1[0].Event
  214. }
  215. } else if sp.DataNum >= 10 && sp.DataNum < 500 && sp.FromEvent != 7700 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 2 {
  216. sp.IsMove = true
  217. sp.ToEvent = EventArrType2[0].Event
  218. } else if sp.DataNum > 0 && sp.DataNum < 10 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 3 {
  219. sp.IsMove = true
  220. if sp.FromEvent == 7410 { //7700、7410节点特殊性
  221. sp.ToEvent = 7700
  222. } else {
  223. sp.ToEvent = EventArrType3[0].Event
  224. }
  225. } else if sp.DataNum == 0 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 4 {
  226. sp.IsMove = true
  227. if sp.FromEvent == 7410 { //7700、7410节点特殊性
  228. sp.ToEvent = 7700
  229. } else {
  230. sp.ToEvent = 7520
  231. }
  232. }
  233. //存储爬虫统计信息
  234. byteText, err := json.Marshal(sp)
  235. if err != nil {
  236. logger.Debug("Json Marshal Error", sp.Code)
  237. continue
  238. }
  239. tmp := map[string]interface{}{}
  240. if json.Unmarshal(byteText, &tmp) == nil {
  241. save = append(save, tmp)
  242. if len(save) >= 1000 {
  243. util.MgoEB.SaveBulk("luamovevent", save...)
  244. save = []map[string]interface{}{}
  245. }
  246. }
  247. }
  248. if len(save) > 0 {
  249. util.MgoEB.SaveBulk("luamovevent", save...)
  250. save = []map[string]interface{}{}
  251. }
  252. }