package timetask
import (
"bytes"
"fmt"
"github.com/donnie4w/go-logger/logger"
"net/http"
qu "qfw/util"
"sync"
"sync/atomic"
"time"
"util"
)
var (
// CodePlatformMap maps a spider code to its platform name. It is reset by
// CountLuaPythonNumEveryDay, filled by GetCodePlatform from editor "luaconfig",
// and read by GetBiddingCount.
CodePlatformMap map[string]string
// LuaListDownloadAllNum is the number of documents across the four Lua list
// collections whose comeintime falls in [GetTime(-1), GetTime(0)) (set by GetLuaListDownloadNum).
LuaListDownloadAllNum int64
// LuaListDownloadSuccessAllNum is the subset of LuaListDownloadAllNum with state == 1.
LuaListDownloadSuccessAllNum int64
// LuaBiddingDownloadAllNum counts that day's "bidding" documents whose spider
// platform is "golua平台" or "chrome" (set by GetBiddingCount).
LuaBiddingDownloadAllNum int64
// PythonListDownloadAllNum is the sum of rel_count over python "list"
// documents whose runtime equals Publishtime (set by GetPythonListDownloadNum).
PythonListDownloadAllNum int64
// PythonListDownloadSuccessAllNum is the count of python "data_bak" documents
// whose comeintime falls in [GetTime(-1), GetTime(0)).
PythonListDownloadSuccessAllNum int64
// PythonBiddingDownloadAllNum counts that day's "bidding" documents whose
// spider platform is anything other than "golua平台"/"chrome" — "python" as
// well as codes missing from CodePlatformMap.
PythonBiddingDownloadAllNum int64
// Publishtime is util.GetTime(-1) formatted with qu.Date_Short_Layout. It is
// the report date and the "runtime" filter for the python list query.
Publishtime string
)
// LuaPythonNumModel appears to be a WeChat Work webhook "text" message body;
// %s is substituted without any JSON escaping. It is not referenced in the
// code shown here (it may be used elsewhere).
var LuaPythonNumModel = `{
"msgtype": "text",
"text": {
"content": "%s"
}
}`
// MarkdownModel is the "markdown" message body that SendLuaPythonAllNum posts
// to the webhook. The %s sits inside a JSON string literal and is substituted
// verbatim by fmt.Sprintf, so the inserted text must already be JSON-escaped
// (no raw quotes, backslashes or control characters such as newlines).
var MarkdownModel = `{
"msgtype": "markdown",
"markdown": {
"content": "%s"
}
}`
// NumContentModel formats one platform's section of the daily report. The
// verbs are, in order: platform name, list-page download count, list-page
// success count, bidding count.
// NOTE: this is a raw string literal. Each `\n` at the end of the last two
// lines is therefore a literal backslash-n pair, which only becomes a line
// break when the JSON payload it is embedded in is decoded. The other line
// breaks are real newline characters.
var NumContentModel = `
>平台:%s
>列表页采集量:%d
>列表页采集成功量:%d\n
>Bidding成功量:%d\n
`
// CountLuaPythonNumEveryDay runs the daily Lua/Python collection-volume report.
// It discards the previous run's platform map and counters, and sets
// Publishtime from util.GetTime(-1). It then runs the collection steps and
// finally saves and sends the summary.
func CountLuaPythonNumEveryDay() {
	// Start from a clean slate for this run.
	CodePlatformMap = map[string]string{}
	dayStart := util.GetTime(-1)
	Publishtime = qu.FormatDateByInt64(&dayStart, qu.Date_Short_Layout)
	LuaListDownloadAllNum, LuaListDownloadSuccessAllNum, LuaBiddingDownloadAllNum = 0, 0, 0
	PythonListDownloadAllNum, PythonListDownloadSuccessAllNum, PythonBiddingDownloadAllNum = 0, 0, 0

	// Order matters: GetBiddingCount reads CodePlatformMap built by
	// GetCodePlatform, and SendLuaPythonAllNum reports the counters the
	// earlier steps fill in.
	steps := []func(){
		GetCodePlatform,
		GetBiddingCount,
		GetPythonListDownloadNum,
		GetLuaListDownloadNum,
		SendLuaPythonAllNum,
	}
	for _, step := range steps {
		step()
	}
}
// GetCodePlatform loads the platform of every spider in the editor "luaconfig"
// collection into CodePlatformMap (spider code -> platform).
//
// Each document costs a single map write, so documents are processed
// sequentially with no goroutines or locking. Iterator errors are logged
// rather than silently dropped. Panics are recovered by qu.Catch.
func GetCodePlatform() {
	defer qu.Catch()
	if CodePlatformMap == nil {
		// CountLuaPythonNumEveryDay normally resets the map first; guard
		// against a nil-map write when called on its own.
		CodePlatformMap = map[string]string{}
	}
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)
	query := map[string]interface{}{}
	fields := map[string]interface{}{
		"platform": 1,
		"code":     1,
	}
	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		CodePlatformMap[qu.ObjToString(tmp["code"])] = qu.ObjToString(tmp["platform"])
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	// Next returns false both at the end and on error; Close reports which.
	if err := it.Close(); err != nil {
		logger.Error("luaconfig iteration failed:", err)
	}
	logger.Debug("爬虫所属平台信息准备完成...", len(CodePlatformMap))
}
func GetBiddingCount() {
defer qu.Catch()
sess := util.MgoB.GetMgoConn()
defer util.MgoB.DestoryMongoConn(sess)
//lock := &sync.Mutex{}
wg := &sync.WaitGroup{}
ch := make(chan bool, 5)
query := map[string]interface{}{
"comeintime": map[string]interface{}{
"$gte": util.GetTime(-1),
"$lt": util.GetTime(0),
},
}
fieles := map[string]interface{}{
"spidercode": 1,
}
count := util.MgoB.Count("bidding", query)
logger.Debug("bidding采集数据量:", count)
it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fieles).Iter()
n := 0
for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
wg.Add(1)
ch <- true
go func(tmp map[string]interface{}) {
defer func() {
<-ch
wg.Done()
}()
code := qu.ObjToString(tmp["spidercode"])
platform := CodePlatformMap[code]
if platform == "golua平台" || platform == "chrome" {
atomic.AddInt64(&LuaBiddingDownloadAllNum, 1)
} else if platform == "python" {
atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
} else {
atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
qu.Debug(code)
}
}(tmp)
if n%10000 == 0 {
logger.Debug(n)
}
tmp = map[string]interface{}{}
}
wg.Wait()
logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum)
}
//python统计列表页采集量
func GetPythonListDownloadNum() {
defer qu.Catch()
logger.Debug("python列表页数据下载量统计开始...")
sess := util.MgoPy.GetMgoConn()
defer util.MgoPy.DestoryMongoConn(sess)
query := map[string]interface{}{
"runtime": Publishtime,
"rel_count": map[string]interface{}{
"$gt": 0,
},
}
fields := map[string]interface{}{
"rel_count": 1,
}
wg := &sync.WaitGroup{}
ch := make(chan bool, 5)
it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter()
n := 0
for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
wg.Add(1)
ch <- true
go func(tmp map[string]interface{}) {
defer func() {
<-ch
wg.Done()
}()
count := qu.IntAll(tmp["rel_count"])
atomic.AddInt64(&PythonListDownloadAllNum, int64(count))
}(tmp)
if n%1000 == 0 {
logger.Debug(n)
}
tmp = map[string]interface{}{}
}
wg.Wait()
queryAll := map[string]interface{}{
"comeintime": map[string]interface{}{
"$gte": util.GetTime(-1),
"$lt": util.GetTime(0),
},
}
count := util.MgoPy.Count("data_bak", queryAll)
PythonListDownloadSuccessAllNum = int64(count)
qu.Debug("python列表页采集量:", PythonListDownloadAllNum, "采集成功量:", PythonListDownloadSuccessAllNum)
}
//lua统计列表页采集量
func GetLuaListDownloadNum() {
queryAll := map[string]interface{}{
"comeintime": map[string]interface{}{
"$gte": util.GetTime(-1),
"$lt": util.GetTime(0),
},
}
querySuccess := map[string]interface{}{
"comeintime": map[string]interface{}{
"$gte": util.GetTime(-1),
"$lt": util.GetTime(0),
},
"state": 1,
}
//spider_highlistdata
allNum1 := util.MgoS.Count("spider_highlistdata", queryAll)
successNum1 := util.MgoS.Count("spider_highlistdata", querySuccess)
qu.Debug("spider_highlistdata", allNum1, successNum1)
//spider_listdata
allNum2 := util.MgoS.Count("spider_listdata", queryAll)
successNum2 := util.MgoS.Count("spider_listdata", querySuccess)
qu.Debug("spider_listdata", allNum2, successNum2)
//spider_historydata
allNum3 := util.MgoS.Count("spider_historydata", queryAll)
successNum3 := util.MgoS.Count("spider_historydata", querySuccess)
qu.Debug("spider_historydata", allNum3, successNum3)
//spider_historydata_back
allNum4 := util.MgoS.Count("spider_historydata_back", queryAll)
successNum4 := util.MgoS.Count("spider_historydata_back", querySuccess)
qu.Debug("spider_historydata_back", allNum4, successNum4)
LuaListDownloadAllNum = int64(allNum1) + int64(allNum2) + int64(allNum3) + int64(allNum4)
LuaListDownloadSuccessAllNum = int64(successNum1) + int64(successNum2) + int64(successNum3) + int64(successNum4)
qu.Debug("lua列表页采集量:", LuaListDownloadAllNum, "采集成功量:", LuaListDownloadSuccessAllNum)
}
func SendLuaPythonAllNum() {
defer qu.Catch()
luaContent := fmt.Sprintf(NumContentModel, "Lua", LuaListDownloadAllNum, LuaListDownloadSuccessAllNum, LuaBiddingDownloadAllNum)
pythonContent := fmt.Sprintf(NumContentModel, "python", PythonListDownloadAllNum, PythonListDownloadSuccessAllNum, PythonBiddingDownloadAllNum)
resultContent := fmt.Sprintf(MarkdownModel, Publishtime+",Lua、Python各维度采集量统计结果如下:\n"+luaContent+pythonContent)
qu.Debug(resultContent)
//保存记录
util.MgoS.Save("spider_luapythoncount", map[string]interface{}{
"lualistnum": LuaListDownloadAllNum,
"lualistsuccessnum": LuaListDownloadSuccessAllNum,
"luabiddingnum": LuaBiddingDownloadAllNum,
"pythonlistnum": PythonListDownloadAllNum,
"pythonlistsuccessnum": PythonListDownloadSuccessAllNum,
"pythonbiddingnum": PythonBiddingDownloadAllNum,
"comeintime": time.Now().Unix(),
"date": Publishtime,
})
//发送统计
resp, err := http.Post(
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97850772-88d0-4544-a2c3-6201aeddff9e",
"application/json",
bytes.NewBuffer([]byte(resultContent)),
)
if err != nil {
fmt.Println("request error:", err)
return
}
defer resp.Body.Close()
}
// SummaryCode builds one summary record per listed spider for the day starting
// at util.GetTime(-1). It bulk-saves the records into the MgoS
// "spider_summaryinfo" collection.
//
// It works in three passes:
//  1. spider.spider_highlistdata, where publishtime matches the day's date
//     string: documents per spidercode, and how many of them have state == -1.
//  2. spider.data_bak, where l_np_publishtime is in [GetTime(-1), GetTime(0)]:
//     documents per spidercode.
//  3. editor.luaconfig spiders with state 5, or with state 0/1/2 and
//     event != 7000. For each one, the counts above are combined with a
//     spider_sitecheck status code (200 when none is recorded), per-level
//     spider_warn field counts, and site/channel/url/area metadata.
//
// Passes 1 and 2 run inline, so their maps are complete before they are logged
// or read. Pass 3 runs on up to 5 goroutines, and each one recovers its own
// panics. The deferred qu.Catch on SummaryCode only covers the calling
// goroutine, so a panic in a worker would otherwise terminate the process.
func SummaryCode() {
	defer qu.Catch()
	qu.Debug("上架爬虫信息汇总开始...")
	stime, etime := util.GetTime(-1), util.GetTime(0)
	timestr := qu.FormatDateByInt64(&stime, qu.Date_Short_Layout)
	sess_s := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess_s)

	// Pass 1: spider_highlistdata download and failure counts per spider.
	qu.Debug("开始统计spider_highlisthdata信息...")
	codeHlistDnumMap := map[string]int{} // documents downloaded per spider
	codeErrDnumMap := map[string]int{}   // of those, documents with state == -1
	query := map[string]interface{}{
		"publishtime": map[string]interface{}{
			"$regex": timestr,
		},
	}
	fs := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
	}
	count, _ := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Count() // only logged; error ignored
	qu.Debug(timestr, "spider_highlisthdata共采集数据:", count)
	itHigh := sess_s.DB("spider").C("spider_highlistdata").Find(&query).Select(&fs).Iter()
	for tmp := make(map[string]interface{}); itHigh.Next(&tmp); tmp = map[string]interface{}{} {
		code := qu.ObjToString(tmp["spidercode"])
		if qu.IntAll(tmp["state"]) == -1 {
			codeErrDnumMap[code]++
		}
		codeHlistDnumMap[code]++
	}
	if err := itHigh.Close(); err != nil {
		qu.Debug("spider_highlistdata iteration error:", err)
	}
	qu.Debug("spider_highlistdata采集信息的爬虫总量:", len(codeHlistDnumMap), " 下载失败爬虫的总量:", len(codeErrDnumMap))

	// Pass 2: data_bak download counts per spider.
	qu.Debug("开始统计data_bak信息...")
	codeDbakDnumMap := map[string]int{} // documents downloaded per spider
	query = map[string]interface{}{
		"l_np_publishtime": map[string]interface{}{
			"$gte": stime,
			"$lte": etime,
		},
	}
	fs = map[string]interface{}{
		"spidercode": 1,
	}
	count, _ = sess_s.DB("spider").C("data_bak").Find(&query).Count() // only logged; error ignored
	qu.Debug(timestr, "data_bak共采集数据:", count)
	itBak := sess_s.DB("spider").C("data_bak").Find(&query).Select(&fs).Iter()
	for tmp := make(map[string]interface{}); itBak.Next(&tmp); tmp = map[string]interface{}{} {
		codeDbakDnumMap[qu.ObjToString(tmp["spidercode"])]++
	}
	if err := itBak.Close(); err != nil {
		qu.Debug("data_bak iteration error:", err)
	}
	qu.Debug("data_bak采集信息的爬虫总量:", len(codeDbakDnumMap))

	// Pass 3: one summary record per selected luaconfig spider.
	query = map[string]interface{}{
		"$or": []interface{}{
			map[string]interface{}{"state": 5},
			map[string]interface{}{
				"state": map[string]interface{}{
					"$in": []int{0, 1, 2},
				},
				"event": map[string]interface{}{
					"$ne": 7000,
				},
			},
		},
	}
	fe := map[string]interface{}{
		"code":         1,
		"event":        1,
		"param_common": 1,
		"model":        1,
		"platform":     1,
		"createuser":   1,
		"createuserid": 1,
	}
	sess_e := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess_e)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	arr := []map[string]interface{}{}
	itLua := sess_e.DB("editor").C("luaconfig").Find(&query).Select(&fe).Iter()
	n := 0
	for tmp := make(map[string]interface{}); itLua.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// Recover per worker; see the function comment.
			defer qu.Catch()
			result := map[string]interface{}{}
			code := qu.ObjToString(tmp["code"])
			result["code"] = code
			result["modify"] = tmp["createuser"]
			result["modifyid"] = tmp["createuserid"]
			result["event"] = tmp["event"]
			result["platform"] = tmp["platform"]
			result["comeintime"] = time.Now().Unix()
			// 1. data_bak download count
			result["download"] = codeDbakDnumMap[code]
			// 2. spider_highlistdata download and failure counts
			result["hl_download"] = codeHlistDnumMap[code]
			result["hl_downloaderr"] = codeErrDnumMap[code]
			// 3. URL status code from spider_sitecheck, which only records
			//    spiders with an error status code; 200 when absent.
			q := map[string]interface{}{
				"code": code,
				"comeintime": map[string]interface{}{
					"$gte": stime,
					"$lte": etime,
				},
			}
			data, _ := util.MgoS.FindOne("spider_sitecheck", q)
			if data != nil && len(*data) > 0 {
				result["statuscode"] = qu.Int64All((*data)["statuscode"])
			} else {
				result["statuscode"] = 200
			}
			// 4. spider_warn download errors, counted per field and level.
			errinfo := map[string]interface{}{}
			fnMap_lev1 := map[string]int{}
			fnMap_lev2 := map[string]int{}
			warnDatas, _ := util.MgoS.Find("spider_warn", q, nil, `{"field":1,"level":1}`, false, -1, -1)
			if warnDatas != nil {
				for _, d := range *warnDatas {
					field := qu.ObjToString(d["field"])
					if qu.IntAll(d["level"]) == 1 {
						fnMap_lev1[field] += 1
					} else {
						fnMap_lev2[field] += 1
					}
				}
			}
			if len(fnMap_lev1) > 0 {
				errinfo["1"] = fnMap_lev1
			}
			if len(fnMap_lev2) > 0 {
				errinfo["2"] = fnMap_lev2
			}
			result["errinfo"] = errinfo
			// Site metadata. A missing or non-array param_common yields a nil
			// slice instead of panicking.
			pc, _ := tmp["param_common"].([]interface{})
			if len(pc) > 2 {
				result["site"] = pc[1]
				result["channel"] = pc[2]
			}
			if len(pc) > 11 {
				result["url"] = pc[11]
			}
			if model, ok := tmp["model"].(map[string]interface{}); ok && model != nil {
				result["area"] = qu.ObjToString(model["area"])
				result["city"] = qu.ObjToString(model["city"])
				result["district"] = qu.ObjToString(model["district"])
			}
			// Deferred unlock: a panic in SaveBulk must not leave the lock held
			// and deadlock the remaining workers.
			lock.Lock()
			defer lock.Unlock()
			arr = append(arr, result)
			if len(arr) > 500 {
				util.MgoS.SaveBulk("spider_summaryinfo", arr...)
				arr = []map[string]interface{}{}
			}
		}(tmp)
		if n%500 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	if err := itLua.Close(); err != nil {
		qu.Debug("luaconfig iteration error:", err)
	}
	if len(arr) > 0 {
		util.MgoS.SaveBulk("spider_summaryinfo", arr...)
	}
	qu.Debug("上架爬虫信息汇总结束...")
}