package timetask import ( "encoding/json" "github.com/donnie4w/go-logger/logger" qu "qfw/util" "sort" "sync" "time" "util" ) type Spider struct { Code string `json:"code"` Site string `json:"site"` Channel string `json:"channel"` FromEvent int `json:"fromevent"` //现节点 ToEvent int `json:"toevent"` //目标节点 DataNum int `json:"datanum"` //采集量 PtimeDataNum int `json:"ptimedatanum"` //按发布时间统计的采集量 //Average int `json:"average"` //平均值 IsMove bool `json:"ismove"` //是否转移节点 State int `json:"state"` Comeintime int64 `json:"comeintime"` } var CodeMap map[string]*Spider type EventNum struct { Event int //节点 Num int //节点爬虫数量 } //节点上的爬虫个数 var EventMapType1 = map[int]int{} //7100、7110、7400、7410 var EventMapType2 = map[int]int{} //7200、7210、7300、7310 var EventMapType3 = map[int]int{} //7500、7510、7700 var EventArrType1 []*EventNum //7100、7110、7400、7410 var EventArrType2 []*EventNum //7200、7210、7300、7310 var EventArrType3 []*EventNum //7500、7510、7700 func LuaMoveEvent() { defer qu.Catch() CodeMap = map[string]*Spider{} GetLuaInfo() //获取爬虫信息 GetDataNum() //统计爬虫采集量 GetMoveLua() //计算哪些爬虫需要转节点 } func GetLuaInfo() { defer qu.Catch() sess := util.MgoEB.GetMgoConn() defer util.MgoEB.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "platform": "golua平台", "state": map[string]interface{}{ "$in": []int{0, 1, 2, 5}, //待完成、待审核、未通过、已上架 }, "event": map[string]interface{}{ "$ne": 7000, }, } fields := map[string]interface{}{ "event": 1, "code": 1, "site": 1, "channel": 1, } count := util.MgoEB.Count("luaconfig", query) logger.Debug("共加载线上爬虫个数:", count) it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["code"]) site := qu.ObjToString(tmp["site"]) channel := qu.ObjToString(tmp["channel"]) event := qu.IntAll(tmp["event"]) lock.Lock() if event != 7410 && event != 7700 { if util.CodeEventType[event] == 1 { EventMapType1[event]++ } else if util.CodeEventType[event] == 2 { EventMapType2[event]++ } else if util.CodeEventType[event] == 3 { EventMapType3[event]++ } } CodeMap[code] = &Spider{ Code: code, Site: site, Channel: channel, FromEvent: event, Comeintime: time.Now().Unix(), } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() //排序,选出数量最少的节点 for event, num := range EventMapType1 { EventArrType1 = append(EventArrType1, &EventNum{ Event: event, Num: num, }) } sort.Slice(EventArrType1, func(i, j int) bool { return EventArrType1[i].Num < EventArrType1[j].Num // 升序 }) for event, num := range EventMapType2 { EventArrType2 = append(EventArrType2, &EventNum{ Event: event, Num: num, }) } sort.Slice(EventArrType2, func(i, j int) bool { return EventArrType2[i].Num < EventArrType2[j].Num // 升序 }) for event, num := range EventMapType3 { EventArrType3 = append(EventArrType3, &EventNum{ Event: event, Num: num, }) } sort.Slice(EventArrType3, func(i, j int) bool { return EventArrType3[i].Num < EventArrType3[j].Num // 升序 }) logger.Debug("爬虫基本信息准备完成...", EventArrType1[0].Event, EventArrType2[0].Event, EventArrType3[0].Event) } func GetDataNum() { defer qu.Catch() sess := util.MgoEB.GetMgoConn() defer util.MgoEB.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": util.GetTime(-30), "$lt": util.GetTime(0), }, } fieles := map[string]interface{}{ "spidercode": 1, "ptimeallnum": 1, //按发布时间统计的每天的采集量 "downloadallnum": 1, //每天的采集量 } it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() spidercode := qu.ObjToString(tmp["spidercode"]) ptimeallnum := qu.IntAll(tmp["ptimeallnum"]) downloadallnum := qu.IntAll(tmp["downloadallnum"]) lock.Lock() if sp := CodeMap[spidercode]; sp != nil { sp.DataNum += downloadallnum sp.PtimeDataNum += ptimeallnum } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("爬虫一个月数据量统计完毕...") } func GetMoveLua() { defer qu.Catch() /* 1、按入库时间采集的数据量统计 2、转移节点规则 转移至高频率模式(7100、7110、7400、7410):30天采集总量超过200条 转移至队低频率列模式(7200、7210、7300、7310):30天采集总量50-200条 转移至极低频率模式(7500、7510、7700):30天采集总量0-50条 特殊节点(7520):0条 */ save := []map[string]interface{}{} for _, sp := range CodeMap { if sp.DataNum >= 200 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 { //本身不是高性能节点超过200条的 sp.IsMove = true if sp.FromEvent == 7700 { //7700、7410节点特殊性 sp.ToEvent = 7410 } else { sp.ToEvent = EventArrType1[0].Event } } else if sp.DataNum >= 10 && sp.DataNum < 200 && sp.FromEvent != 7700 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 2 { sp.IsMove = true sp.ToEvent = EventArrType2[0].Event } else if sp.DataNum > 0 && sp.DataNum < 10 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 3 { sp.IsMove = true if sp.FromEvent == 7410 { //7700、7410节点特殊性 sp.ToEvent = 7700 } else { sp.ToEvent = EventArrType3[0].Event } } else if sp.DataNum == 0 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 4 { sp.IsMove = true if sp.FromEvent == 7410 { //7700、7410节点特殊性 sp.ToEvent = 7700 } else { sp.ToEvent = 7520 } } //存储爬虫统计信息 byteText, err := json.Marshal(sp) if err != nil { logger.Debug("Json Marshal Error", sp.Code) continue } tmp := map[string]interface{}{} if json.Unmarshal(byteText, &tmp) == nil { save = append(save, tmp) if len(save) >= 1000 { util.MgoEB.SaveBulk("luamovevent", save...) save = []map[string]interface{}{} } } } if len(save) > 0 { util.MgoEB.SaveBulk("luamovevent", save...) save = []map[string]interface{}{} } }