// Package timetask contains scheduled jobs that aggregate crawler (Lua and
// Python) collection statistics from MongoDB and publish the results.
package timetask

import (
	"bytes"
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	qu "qfw/util"
	"util"
)

// Package-level state shared by the daily counting job.
//
// CodePlatformMap maps a crawler code to its platform and is rebuilt by
// GetCodePlatform. The *Num counters are reset at the start of
// CountLuaPythonNumEveryDay and filled in by the Get* functions. Publishtime
// is the formatted date of the period being counted.
var (
	CodePlatformMap                 map[string]string
	LuaListDownloadAllNum           int64
	LuaListDownloadSuccessAllNum    int64
	LuaBiddingDownloadAllNum        int64
	PythonListDownloadAllNum        int64
	PythonListDownloadSuccessAllNum int64
	PythonBiddingDownloadAllNum     int64
	Publishtime                     string
)

// LuaPythonNumModel is the JSON template for a plain-text webhook message.
var LuaPythonNumModel = `{ "msgtype": "text", "text": { "content": "%s" } }`

// MarkdownModel is the JSON template for a markdown webhook message.
var MarkdownModel = `{ "msgtype": "markdown", "markdown": { "content": "%s" } }`

// NumContentModel is the per-platform section of the report: platform name,
// list-page count, list-page success count and bidding count.
var NumContentModel = ` >平台:%s >列表页采集量:%d >列表页采集成功量:%d\n >Bidding成功量:%d\n `

// CountLuaPythonNumEveryDay runs the daily Lua/Python collection statistics:
// it resets the package-level counters, gathers the numbers from MongoDB and
// finally stores and sends the report.
func CountLuaPythonNumEveryDay() {
	CodePlatformMap = map[string]string{}
	// NOTE(review): util.GetTime(-1) presumably returns the start of the
	// previous day as a Unix timestamp - confirm against util.
	startTime := util.GetTime(-1)
	Publishtime = qu.FormatDateByInt64(&startTime, qu.Date_Short_Layout)

	// Reset the counters left over from the previous run.
	LuaListDownloadAllNum = 0
	LuaListDownloadSuccessAllNum = 0
	LuaBiddingDownloadAllNum = 0
	PythonListDownloadAllNum = 0
	PythonListDownloadSuccessAllNum = 0
	PythonBiddingDownloadAllNum = 0

	GetCodePlatform() // platform of every crawler
	GetBiddingCount() // per-platform counts from the bidding collection
	GetPythonListDownloadNum()
	GetLuaListDownloadNum()
	SendLuaPythonAllNum()
}

// GetCodePlatform loads the code -> platform mapping of every document in the
// luaconfig collection into CodePlatformMap.
func GetCodePlatform() {
	defer qu.Catch()
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)

	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // limits the number of in-flight workers
	query := map[string]interface{}{}
	fields := map[string]interface{}{
		"platform": 1,
		"code":     1,
	}
	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			platform := qu.ObjToString(doc["platform"])
			code := qu.ObjToString(doc["code"])
			lock.Lock()
			CodePlatformMap[code] = platform
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		// Hand a fresh map to the iterator; the worker owns the previous one.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("爬虫所属平台信息准备完成...", len(CodePlatformMap))
}

// GetBiddingCount counts the documents of the bidding collection whose
// comeintime lies in [util.GetTime(-1), util.GetTime(0)) and attributes each
// one to Lua or Python according to CodePlatformMap.
func GetBiddingCount() {
	defer qu.Catch()
	sess := util.MgoB.GetMgoConn()
	defer util.MgoB.DestoryMongoConn(sess)

	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // limits the number of in-flight workers
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(-1),
			"$lt":  util.GetTime(0),
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
	}
	count := util.MgoB.Count("bidding", query)
	logger.Debug("bidding采集数据量:", count)
	it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(doc["spidercode"])
			// CodePlatformMap is only read here; it was fully built before
			// this function is called from CountLuaPythonNumEveryDay.
			switch CodePlatformMap[code] {
			case "golua平台", "chrome":
				atomic.AddInt64(&LuaBiddingDownloadAllNum, 1)
			case "python":
				atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
			default:
				// Unknown platform: counted as Python and the code is logged.
				atomic.AddInt64(&PythonBiddingDownloadAllNum, 1)
				qu.Debug(code)
			}
		}(tmp)
		if n%10000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum)
}

// GetPythonListDownloadNum computes the Python list-page numbers:
// PythonListDownloadAllNum is the sum of rel_count over the "list" documents
// whose runtime equals Publishtime, and PythonListDownloadSuccessAllNum is the
// number of data_bak documents in the counted time window.
func GetPythonListDownloadNum() {
	defer qu.Catch()
	logger.Debug("python列表页数据下载量统计开始...")
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)

	query := map[string]interface{}{
		"runtime": Publishtime,
		"rel_count": map[string]interface{}{
			"$gt": 0,
		},
	}
	fields := map[string]interface{}{
		"rel_count": 1,
	}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // limits the number of in-flight workers
	it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			count := qu.IntAll(doc["rel_count"])
			atomic.AddInt64(&PythonListDownloadAllNum, int64(count))
		}(tmp)
		if n%1000 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()

	queryAll := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(-1),
			"$lt":  util.GetTime(0),
		},
	}
	count := util.MgoPy.Count("data_bak", queryAll)
	PythonListDownloadSuccessAllNum = int64(count)
	qu.Debug("python列表页采集量:", PythonListDownloadAllNum, "采集成功量:", PythonListDownloadSuccessAllNum)
}

// GetLuaListDownloadNum computes the Lua list-page numbers by summing, over
// the four spider list collections, the documents in the counted time window
// (all) and those among them with state 1 (success).
func GetLuaListDownloadNum() {
	start, end := util.GetTime(-1), util.GetTime(0)
	queryAll := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": start,
			"$lt":  end,
		},
	}
	querySuccess := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": start,
			"$lt":  end,
		},
		"state": 1,
	}

	collections := []string{
		"spider_highlistdata",
		"spider_listdata",
		"spider_historydata",
		"spider_historydata_back",
	}
	var allTotal, successTotal int64
	for _, name := range collections {
		allNum := util.MgoS.Count(name, queryAll)
		successNum := util.MgoS.Count(name, querySuccess)
		qu.Debug(name, allNum, successNum)
		allTotal += int64(allNum)
		successTotal += int64(successNum)
	}
	LuaListDownloadAllNum = allTotal
	LuaListDownloadSuccessAllNum = successTotal
	qu.Debug("lua列表页采集量:", LuaListDownloadAllNum, "采集成功量:", LuaListDownloadSuccessAllNum)
}

// SendLuaPythonAllNum formats the collected counters into a markdown message,
// saves a record to the spider_luapythoncount collection and posts the message
// to the webhook.
func SendLuaPythonAllNum() {
	defer qu.Catch()
	luaContent := fmt.Sprintf(NumContentModel, "Lua", LuaListDownloadAllNum, LuaListDownloadSuccessAllNum, LuaBiddingDownloadAllNum)
	pythonContent := fmt.Sprintf(NumContentModel, "python", PythonListDownloadAllNum, PythonListDownloadSuccessAllNum, PythonBiddingDownloadAllNum)
	resultContent := fmt.Sprintf(MarkdownModel, Publishtime+",Lua、Python各维度采集量统计结果如下:\n"+luaContent+pythonContent)
	qu.Debug(resultContent)

	// Persist the record.
	util.MgoS.Save("spider_luapythoncount", map[string]interface{}{
		"lualistnum":           LuaListDownloadAllNum,
		"lualistsuccessnum":    LuaListDownloadSuccessAllNum,
		"luabiddingnum":        LuaBiddingDownloadAllNum,
		"pythonlistnum":        PythonListDownloadAllNum,
		"pythonlistsuccessnum": PythonListDownloadSuccessAllNum,
		"pythonbiddingnum":     PythonBiddingDownloadAllNum,
		"comeintime":           time.Now().Unix(),
		"date":                 Publishtime,
	})

	// Send the report. A dedicated client with a timeout is used because the
	// default client never times out and could block this job indefinitely.
	// NOTE(review): the webhook key is hard-coded; consider moving it to
	// configuration.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(
		"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97850772-88d0-4544-a2c3-6201aeddff9e",
		"application/json",
		bytes.NewBufferString(resultContent),
	)
	if err != nil {
		fmt.Println("request error:", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		fmt.Println("request error: unexpected status", resp.Status)
	}
}

// SummaryCode builds one summary record per selected luaconfig crawler and
// bulk-saves the records into spider_summaryinfo. Each record combines the
// crawler's configuration with its download counts (data_bak and
// spider_highlistdata), the status code from spider_sitecheck and the warning
// statistics from spider_warn for the counted time window.
func SummaryCode() {
	defer qu.Catch()
	qu.Debug("上架爬虫信息汇总开始...")
	qu.Debug("开始统计spider_highlisthdata信息...")

	// Step 1: per-crawler counts from spider_highlistdata.
	codeHlistDnumMap := map[string]int{} // documents per crawler
	codeErrDnumMap := map[string]int{}   // documents with state -1 per crawler
	statCh := make(chan bool, 5)
	statWg := &sync.WaitGroup{}
	statLock := &sync.Mutex{}
	startTime, endTime := util.GetTime(-1), util.GetTime(0)
	sessS := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sessS)
	timestr := qu.FormatDateByInt64(&startTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"publishtime": map[string]interface{}{
			"$regex": timestr,
		},
	}
	fs := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
	}
	count, _ := sessS.DB("spider").C("spider_highlistdata").Find(&query).Count()
	qu.Debug(timestr, "spider_highlisthdata共采集数据:", count)
	itHigh := sessS.DB("spider").C("spider_highlistdata").Find(&query).Select(&fs).Iter()
	for tmp := make(map[string]interface{}); itHigh.Next(&tmp); {
		statWg.Add(1)
		statCh <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-statCh
				statWg.Done()
			}()
			state := qu.IntAll(doc["state"])
			code := qu.ObjToString(doc["spidercode"])
			statLock.Lock()
			if state == -1 {
				codeErrDnumMap[code]++
			}
			codeHlistDnumMap[code]++
			statLock.Unlock()
		}(tmp)
		tmp = map[string]interface{}{}
	}
	// Wait for the outstanding workers before reading the maps; otherwise the
	// len() calls below race with the writers and may under-report.
	statWg.Wait()
	qu.Debug("spider_highlistdata采集信息的爬虫总量:", len(codeHlistDnumMap), " 下载失败爬虫的总量:", len(codeErrDnumMap))

	// Step 2: per-crawler counts from data_bak.
	qu.Debug("开始统计data_bak信息...")
	codeDbakDnumMap := map[string]int{} // documents per crawler
	query = map[string]interface{}{
		"l_np_publishtime": map[string]interface{}{
			"$gte": startTime,
			"$lte": endTime,
		},
	}
	fs = map[string]interface{}{
		"spidercode": 1,
	}
	count, _ = sessS.DB("spider").C("data_bak").Find(&query).Count()
	qu.Debug(timestr, "data_bak共采集数据:", count)
	itBak := sessS.DB("spider").C("data_bak").Find(&query).Select(&fs).Iter()
	for tmp := make(map[string]interface{}); itBak.Next(&tmp); {
		statWg.Add(1)
		statCh <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-statCh
				statWg.Done()
			}()
			code := qu.ObjToString(doc["spidercode"])
			statLock.Lock()
			codeDbakDnumMap[code]++
			statLock.Unlock()
		}(tmp)
		tmp = map[string]interface{}{}
	}
	statWg.Wait()
	qu.Debug("data_bak采集信息的爬虫总量:", len(codeDbakDnumMap))

	// Step 3: build a summary record for each selected crawler.
	query = map[string]interface{}{
		"$or": []interface{}{
			map[string]interface{}{"state": 5},
			map[string]interface{}{
				"state": map[string]interface{}{
					"$in": []int{0, 1, 2},
				},
				"event": map[string]interface{}{
					"$ne": 7000,
				},
			},
		},
	}
	sumCh := make(chan bool, 5)
	sumWg := &sync.WaitGroup{}
	sumLock := &sync.Mutex{}
	arr := []map[string]interface{}{}
	sessE := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sessE)
	fe := map[string]interface{}{
		"code":         1,
		"event":        1,
		"param_common": 1,
		"model":        1,
		"platform":     1,
		"createuser":   1,
		"createuserid": 1,
	}
	itConf := sessE.DB("editor").C("luaconfig").Find(&query).Select(&fe).Iter()
	n := 0
	for tmp := make(map[string]interface{}); itConf.Next(&tmp); n++ {
		sumWg.Add(1)
		sumCh <- true
		go func(doc map[string]interface{}) {
			defer func() {
				<-sumCh
				sumWg.Done()
			}()
			result := map[string]interface{}{}
			code := qu.ObjToString(doc["code"])
			result["code"] = code
			result["modify"] = doc["createuser"]
			result["modifyid"] = doc["createuserid"]
			result["event"] = doc["event"]
			result["platform"] = doc["platform"]
			result["comeintime"] = time.Now().Unix()
			// 1. data_bak download count. The count maps are no longer
			// written at this point, so unlocked reads are safe.
			result["download"] = codeDbakDnumMap[code]
			// 2. spider_highlistdata download and failure counts.
			result["hl_download"] = codeHlistDnumMap[code]
			result["hl_downloaderr"] = codeErrDnumMap[code]
			// 3. URL status code from spider_sitecheck, which only records
			// crawlers with an error status; default to 200 otherwise.
			q := map[string]interface{}{
				"code": code,
				"comeintime": map[string]interface{}{
					"$gte": startTime,
					"$lte": endTime,
				},
			}
			data, _ := util.MgoS.FindOne("spider_sitecheck", q)
			if data != nil && len(*data) > 0 {
				result["statuscode"] = qu.Int64All((*data)["statuscode"])
			} else {
				result["statuscode"] = 200
			}
			// 4. Warning statistics from spider_warn, grouped by level
			// (level 1 vs. everything else) and then by field.
			errinfo := map[string]interface{}{}
			fnMapLev1 := map[string]int{}
			fnMapLev2 := map[string]int{}
			warnDatas, _ := util.MgoS.Find("spider_warn", q, nil, `{"field":1,"level":1}`, false, -1, -1)
			// Guard against a nil result so a failed query cannot panic
			// inside this goroutine.
			if warnDatas != nil {
				for _, d := range *warnDatas {
					field := qu.ObjToString(d["field"])
					if qu.IntAll(d["level"]) == 1 {
						fnMapLev1[field]++
					} else {
						fnMapLev2[field]++
					}
				}
			}
			if len(fnMapLev1) > 0 {
				errinfo["1"] = fnMapLev1
			}
			if len(fnMapLev2) > 0 {
				errinfo["2"] = fnMapLev2
			}
			result["errinfo"] = errinfo
			// Site, channel and url come from param_common. The checked
			// assertion skips documents where the field is missing or has an
			// unexpected type instead of panicking in the goroutine.
			if pc, ok := doc["param_common"].([]interface{}); ok {
				if len(pc) > 2 {
					result["site"] = pc[1]
					result["channel"] = pc[2]
				}
				// NOTE(review): index 11 only needs len(pc) > 11; the
				// original threshold of 12 is kept - confirm the intent.
				if len(pc) > 12 {
					result["url"] = pc[11]
				}
			}
			if model, ok := doc["model"].(map[string]interface{}); ok && model != nil {
				result["area"] = qu.ObjToString(model["area"])
				result["city"] = qu.ObjToString(model["city"])
				result["district"] = qu.ObjToString(model["district"])
			}
			sumLock.Lock()
			arr = append(arr, result)
			if len(arr) > 500 {
				// Flush a full batch and start a new one.
				batch := arr
				util.MgoS.SaveBulk("spider_summaryinfo", batch...)
				arr = []map[string]interface{}{}
			}
			sumLock.Unlock()
		}(tmp)
		if n%500 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	sumWg.Wait()
	// Flush the remaining partial batch.
	if len(arr) > 0 {
		util.MgoS.SaveBulk("spider_summaryinfo", arr...)
	}
	qu.Debug("上架爬虫信息汇总结束...")
}