123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- package timetask
- import (
- "encoding/json"
- "github.com/donnie4w/go-logger/logger"
- qu "qfw/util"
- "sort"
- "sync"
- "time"
- "util"
- )
// Spider is the per-spider record assembled by this task. It holds the
// configuration read from "luaconfig", the collection volume summed from
// "luacodeinfo" and the node-move decision. GetMoveLua saves it to the
// "luamovevent" collection using its json tags as document keys.
type Spider struct {
	Code         string `json:"code"`         // spider code; key of CodeMap
	Site         string `json:"site"`         // site name
	Channel      string `json:"channel"`      // channel (column) name
	Href         string `json:"href"`         // from param_common[11]
	MaxPage      int    `json:"maxpage"`      // from param_common[5]
	CycleTime    int    `json:"cycletime"`    // from param_common[6]
	FromEvent    int    `json:"fromevent"`    // current node
	ToEvent      int    `json:"toevent"`      // target node (0 when not moving)
	DataNum      int    `json:"datanum"`      // collected volume (sum of downloadallnum)
	PtimeDataNum int    `json:"ptimedatanum"` // collected volume counted by publish time (sum of ptimeallnum)
	// Average int `json:"average"` // average (disabled)
	IsMove     bool  `json:"ismove"`     // whether the spider should change node
	State      int   `json:"state"`      // not populated in this file
	Comeintime int64 `json:"comeintime"` // Unix seconds when the record was built
}

// CodeMap holds every loaded spider keyed by its code. LuaMoveEvent
// re-creates it at the start of each run.
var CodeMap map[string]*Spider

// EventNum pairs a node (event) with the number of spiders on that node.
type EventNum struct {
	Event int // node
	Num   int // number of spiders on the node
}

// Per-node spider counts grouped by node frequency type
// (util.CodeEventType 1, 2, 3), plus the same counts as slices that
// GetLuaInfo sorts ascending by Num.
var (
	EventMapType1 = map[int]int{} // type 1: 7100, 7110, 7400, 7410
	EventMapType2 = map[int]int{} // type 2: 7200, 7210, 7300, 7310
	EventMapType3 = map[int]int{} // type 3: 7500, 7510, 7700

	EventArrType1 []*EventNum // type 1: 7100, 7110, 7400, 7410
	EventArrType2 []*EventNum // type 2: 7200, 7210, 7300, 7310
	EventArrType3 []*EventNum // type 3: 7500, 7510, 7700
)
- func LuaMoveEvent() {
- defer qu.Catch()
- CodeMap = map[string]*Spider{}
- GetLuaInfo() //获取爬虫信息
- GetDataNum() //统计爬虫采集量
- GetMoveLua() //计算哪些爬虫需要转节点
- }
- func GetLuaInfo() {
- defer qu.Catch()
- sess := util.MgoEB.GetMgoConn()
- defer util.MgoEB.DestoryMongoConn(sess)
- lock := &sync.Mutex{}
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 5)
- query := map[string]interface{}{
- "platform": "golua平台",
- "state": map[string]interface{}{
- "$in": []int{0, 1, 2, 5}, //待完成、待审核、未通过、已上架
- },
- "event": map[string]interface{}{
- "$ne": 7000,
- },
- }
- fields := map[string]interface{}{
- "event": 1,
- "code": 1,
- "site": 1,
- "channel": 1,
- "param_common": 1,
- }
- count := util.MgoEB.Count("luaconfig", query)
- logger.Debug("共加载线上爬虫个数:", count)
- it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["code"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- event := qu.IntAll(tmp["event"])
- href := ""
- maxPage, cycletime := 0, 0
- if param_common := tmp["param_common"].([]interface{}); len(param_common) >= 12 {
- href = qu.ObjToString(param_common[11])
- maxPage = qu.IntAll(param_common[5])
- cycletime = qu.IntAll(param_common[6])
- }
- lock.Lock()
- if event != 7410 && event != 7700 {
- if util.CodeEventType[event] == 1 {
- EventMapType1[event]++
- } else if util.CodeEventType[event] == 2 {
- EventMapType2[event]++
- } else if util.CodeEventType[event] == 3 {
- EventMapType3[event]++
- }
- }
- CodeMap[code] = &Spider{
- Code: code,
- Site: site,
- Channel: channel,
- Href: href,
- MaxPage: maxPage,
- CycleTime: cycletime,
- FromEvent: event,
- Comeintime: time.Now().Unix(),
- }
- lock.Unlock()
- }(tmp)
- if n%1000 == 0 {
- logger.Debug(n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- //排序,选出数量最少的节点
- for event, num := range EventMapType1 {
- EventArrType1 = append(EventArrType1, &EventNum{
- Event: event,
- Num: num,
- })
- }
- sort.Slice(EventArrType1, func(i, j int) bool {
- return EventArrType1[i].Num < EventArrType1[j].Num // 升序
- })
- for event, num := range EventMapType2 {
- EventArrType2 = append(EventArrType2, &EventNum{
- Event: event,
- Num: num,
- })
- }
- sort.Slice(EventArrType2, func(i, j int) bool {
- return EventArrType2[i].Num < EventArrType2[j].Num // 升序
- })
- for event, num := range EventMapType3 {
- EventArrType3 = append(EventArrType3, &EventNum{
- Event: event,
- Num: num,
- })
- }
- sort.Slice(EventArrType3, func(i, j int) bool {
- return EventArrType3[i].Num < EventArrType3[j].Num // 升序
- })
- logger.Debug("爬虫基本信息准备完成...", EventArrType1[0].Event, EventArrType2[0].Event, EventArrType3[0].Event)
- }
- func GetDataNum() {
- defer qu.Catch()
- sess := util.MgoEB.GetMgoConn()
- defer util.MgoEB.DestoryMongoConn(sess)
- lock := &sync.Mutex{}
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 5)
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": util.GetTime(-30),
- "$lt": util.GetTime(0),
- },
- }
- fieles := map[string]interface{}{
- "spidercode": 1,
- "ptimeallnum": 1, //按发布时间统计的每天的采集量
- "downloadallnum": 1, //每天的采集量
- }
- it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fieles).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- spidercode := qu.ObjToString(tmp["spidercode"])
- ptimeallnum := qu.IntAll(tmp["ptimeallnum"])
- downloadallnum := qu.IntAll(tmp["downloadallnum"])
- lock.Lock()
- if sp := CodeMap[spidercode]; sp != nil {
- sp.DataNum += downloadallnum
- sp.PtimeDataNum += ptimeallnum
- }
- lock.Unlock()
- }(tmp)
- if n%1000 == 0 {
- logger.Debug(n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("爬虫一个月数据量统计完毕...")
- }
- func GetMoveLua() {
- defer qu.Catch()
- /*
- 1、按入库时间采集的数据量统计
- 2、转移节点规则
- 转移至高频率模式(7100、7110、7400、7410):30天采集总量超过500条
- 转移至低频率队列模式(7200、7210、7300、7310):30天采集总量50-500条
- 转移至极低频率模式(7500、7510、7700):30天采集总量0-50条
- 特殊节点(7520):0条
- */
- save := []map[string]interface{}{}
- for _, sp := range CodeMap {
- if sp.DataNum >= 500 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 { //本身不是高性能节点超过500条的
- sp.IsMove = true
- if sp.FromEvent == 7700 { //7700、7410节点特殊性
- sp.ToEvent = 7410
- } else {
- sp.ToEvent = EventArrType1[0].Event
- }
- } else if sp.DataNum >= 10 && sp.DataNum < 500 && sp.FromEvent != 7700 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 2 {
- sp.IsMove = true
- sp.ToEvent = EventArrType2[0].Event
- } else if sp.DataNum > 0 && sp.DataNum < 10 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 3 {
- sp.IsMove = true
- if sp.FromEvent == 7410 { //7700、7410节点特殊性
- sp.ToEvent = 7700
- } else {
- sp.ToEvent = EventArrType3[0].Event
- }
- } else if sp.DataNum == 0 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 4 {
- sp.IsMove = true
- if sp.FromEvent == 7410 { //7700、7410节点特殊性
- sp.ToEvent = 7700
- } else {
- sp.ToEvent = 7520
- }
- }
- //存储爬虫统计信息
- byteText, err := json.Marshal(sp)
- if err != nil {
- logger.Debug("Json Marshal Error", sp.Code)
- continue
- }
- tmp := map[string]interface{}{}
- if json.Unmarshal(byteText, &tmp) == nil {
- save = append(save, tmp)
- if len(save) >= 1000 {
- util.MgoEB.SaveBulk("luamovevent", save...)
- save = []map[string]interface{}{}
- }
- }
- }
- if len(save) > 0 {
- util.MgoEB.SaveBulk("luamovevent", save...)
- save = []map[string]interface{}{}
- }
- }
|