package luatask import ( "bytes" "encoding/json" "fmt" "github.com/donnie4w/go-logger/logger" "github.com/imroc/req" "github.com/tealeg/xlsx" "net/http" qu "qfw/util" "strconv" "sync" "time" "util" ) type SiteInfo struct { Site string `json:"site"` //站点 Num int `json:"averagenum"` //每日网站发布平均量 Modifyuser string `json:"modifyuser"` //维护人 State string `json:"state"` //网站状态 Domain string `json:"domain"` //域名 Stype string `json:"stype"` //网站类型 Platform string `json:"platform"` //所属平台 Coverage string `json:"coverage"` //覆盖率 ListAllNum int `json:"listallnum"` //href去重,当天采集数据量 ListSuccessNum int `json:"listsuccessnum"` //href去重,当天采集成功数据量 PTimeSuccessNum int `json:"ptimesuccessnum"` //href去重,当天发布采集成功数据量 PTimeSuccessDbNum int `json:"ptimesuccessdbnum"` //href去重,data_bak当天发布采集成功数据量 ThreeDaysAgoNum int `json:"threedaysagonum"` //三天前当天的数据量再次统计(有些站点发布延迟导致当天数据量不准确,再次统计) BeforeThreeDaysAgoNum int `json:"beforethreedaysagonum"` //三天前当天的数据量历史统计 Comeintime int64 `json:"comeintime"` //href去重,当天发布采集成功数据量 } var SiteInfoModel = `{ "msgtype": "file", "file": { "media_id": "%s" } }` var LuaListDownloadAllNum int64 var LuaListDownloadSuccessAllNum int64 var LuaBiddingDownloadAllNum int64 var PythonListDownloadAllNum int64 var PythonListDownloadSuccessAllNum int64 var PythonBiddingDownloadAllNum int64 var LuaPythonNumModel = `{ "msgtype": "text", "text": { "content": "%s" } }` var MarkdownModel = `{ "msgtype": "markdown", "markdown": { "content": "%s" } }` var NumContentModel = ` >平台:%s >列表页采集量:%d >列表页采集成功量:%d\n >Bidding成功量:%d\n ` //var AllHref map[string]string //重点网站每日采集量统计 func SendInfoToWxWork_SiteDataCount() { defer qu.Catch() //AllHref = map[string]string{} //初始化 //一、统计 //1、查站点的基础信息 //siteInfoMap := map[string]*SiteInfo{} //siteInfoMap_Back := map[string]*SiteInfo{} allSpiderMap := map[string]*SiteInfo{} list, _ := util.MgoE.Find("site_baseinfo", nil, nil, nil, false, -1, -1) for _, l := range *list { site := qu.ObjToString(l["site"]) vByte, _ := json.Marshal(l) //siteInfo1 := &SiteInfo{} //siteInfo2 := &SiteInfo{} siteInfo3 := &SiteInfo{} //json.Unmarshal(vByte, siteInfo1) //json.Unmarshal(vByte, siteInfo2) json.Unmarshal(vByte, siteInfo3) //siteInfoMap[site] = siteInfo1 //siteInfoMap_Back[site] = siteInfo2 siteInfo3.Comeintime = time.Now().Unix() allSpiderMap[site] = siteInfo3 } //stime := util.GetTime(-1) //etime := util.GetTime(0) //ptime := qu.FormatDateByInt64(&stime, qu.Date_Short_Layout) //qu.Debug(stime, etime, ptime) //2、统计在spider_highlistdata中站点数据量 //GetHighListDataNum(stime, etime, ptime, siteInfoMap) //3、统计spider_listdata站点数据 //GetListDataNum(stime, etime, ptime, siteInfoMap) //4、统计data_bak //GetDataBakNum(stime, etime, siteInfoMap) //5、单独统计data_bak //GetDataBakNum_Back(stime, etime, siteInfoMap_Back) //6、聚合spidercode统计结果 GetAllSpidercodeNum(allSpiderMap) //6、汇总excel //GetSiteInfoExcel(siteInfoMap, siteInfoMap_Back, allSpiderMap) day := GetThreeDaysAgoNum(allSpiderMap) GetSiteInfoExcel(allSpiderMap, day) } func GetAllSpidercodeNum(siteInfoMap map[string]*SiteInfo) { defer qu.Catch() logger.Info("统计采集量luacodeinfo开始...") sess := util.MgoE.GetMgoConn() defer util.MgoE.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": util.GetTime(0), }, } fields := map[string]interface{}{ "repeatptimesuccessnum": 1, "repeatptimesuccessdbnum": 1, "repeatdownloadallnum": 1, "repeatdownloadsuccessnum": 1, "site": 1, "platform": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoE.DbName).C("luacodeinfo").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() site := qu.ObjToString(tmp["site"]) platform := qu.ObjToString(tmp["platform"]) repeatdownloadallnum := qu.IntAll(tmp["repeatdownloadallnum"]) repeatdownloadsuccessnum := qu.IntAll(tmp["repeatdownloadsuccessnum"]) repeatptimesuccessnum := qu.IntAll(tmp["repeatptimesuccessnum"]) repeatptimesuccessdbnum := qu.IntAll(tmp["repeatptimesuccessdbnum"]) if platform == "python" { site = site + "(python)" } lock.Lock() if info := siteInfoMap[site]; info != nil { //匹配要统计的重点网站 info.ListAllNum += repeatdownloadallnum info.ListSuccessNum += repeatdownloadsuccessnum info.PTimeSuccessNum += repeatptimesuccessnum info.PTimeSuccessDbNum += repeatptimesuccessdbnum } lock.Unlock() }(tmp) if n%100 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("统计采集量luacodeinfo完成...") } func GetThreeDaysAgoNum(siteInfoMap map[string]*SiteInfo) (strStime string) { defer qu.Catch() //1、获取三个工作日之前的日期 baseDay := 3 for i := 1; i <= baseDay; i++ { //除去三天内周六周日 beforDay := time.Now().AddDate(0, 0, -i) if weekDay := beforDay.Weekday().String(); weekDay == "Saturday" || weekDay == "Sunday" { baseDay++ } } logger.Info("baseday:", baseDay) stime := util.GetTime(-baseDay) //起始时间戳(三个工作日前) strStime = qu.FormatDateByInt64(&stime, qu.Date_Short_Layout) //起始日期 logger.Info("查询天:", stime, strStime) //3、统计数据量 GetSpiderHighListDataNum(stime, strStime, siteInfoMap) //spider_highlistdata GetSpiderListDataNum(stime, strStime, siteInfoMap) //spider_listdata GetPythonDataNum(stime, strStime, siteInfoMap) GetNumByLastTime(stime, baseDay, siteInfoMap) return } func GetSpiderHighListDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) HrefRepeatMap := map[string]string{} lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": stime, }, "publishtime": map[string]interface{}{ "$regex": strStime, }, } fieles := map[string]interface{}{ "href": 1, "site": 1, } it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() site := qu.ObjToString(tmp["site"]) lock.Lock() if sInfo := siteInfoMap[site]; sInfo != nil { //要统计的重点站点 href := qu.ObjToString(tmp["href"]) if tmpSite := HrefRepeatMap[href]; tmpSite != site { //同站点去重 sInfo.ThreeDaysAgoNum++ HrefRepeatMap[href] = site } } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() HrefRepeatMap = map[string]string{} logger.Debug("三天前发布spider_highlistdata统计完毕...") } func GetSpiderListDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": stime, }, "publishtime": map[string]interface{}{ "$regex": strStime, }, } fieles := map[string]interface{}{ "site": 1, "event": 1, } it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if qu.IntAll(tmp["event"]) == 7000 { //排除7000节点 return } site := qu.ObjToString(tmp["site"]) lock.Lock() if sInfo := siteInfoMap[site]; sInfo != nil { //要统计的重点站点 sInfo.ThreeDaysAgoNum++ } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("三天前发布spider_listdata统计完毕...") } func GetPythonDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) { defer qu.Catch() sess := util.MgoPy.GetMgoConn() defer util.MgoPy.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": stime, }, "publishtime": map[string]interface{}{ "$regex": strStime, }, } fieles := map[string]interface{}{ "site": 1, } qu.Debug(query) it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() site := qu.ObjToString(tmp["site"]) + "(python)" lock.Lock() if sInfo := siteInfoMap[site]; sInfo != nil { //要统计的重点站点 sInfo.ThreeDaysAgoNum++ } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("三天前发布python统计完毕...") } func GetNumByLastTime(stime int64, baseDay int, siteInfoMap map[string]*SiteInfo) { defer qu.Catch() stimeWeekDay := time.Now().AddDate(0, 0, -baseDay).Weekday().String() start := stime + 86400 end := stime + 86400*2 if stimeWeekDay == "Friday" { //每周五的数据是每周一统计 start = stime + 86400*3 end = stime + 86400*4 } query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": start, "$lt": end, }, } logger.Info("历史站点统计", query) list, _ := util.MgoEB.Find("site_datacount", query, nil, map[string]interface{}{"site": 1, "ptimesuccessnum": 1}, false, -1, -1) for _, l := range *list { site := qu.ObjToString(l["site"]) pNum := qu.IntAll(l["ptimesuccessnum"]) if sInfo := siteInfoMap[site]; sInfo != nil { sInfo.BeforeThreeDaysAgoNum = pNum } } } func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo, day string) { defer qu.Catch() file, err := xlsx.OpenFile("res/sitecount.xlsx") if err != nil { qu.Debug("open file err:", err) return } sheet := file.Sheets[0] arr := []map[string]interface{}{} for site, info := range allSpiderInfo { style := xlsx.NewStyle() style.ApplyFill = true style.ApplyFont = true font := *xlsx.NewFont(10, "Verdana") style.Font = font title1 := day + "新统计(publishtime)" title2 := day + "历史统计(publishtime)" sheet.Rows[0].Cells[6].SetValue(title1) sheet.Rows[0].Cells[7].SetValue(title2) row := sheet.AddRow() row.AddCell().SetValue(site) row.AddCell().SetValue(info.Num) row.AddCell().SetValue(info.ListAllNum) row.AddCell().SetValue(info.ListSuccessNum) row.AddCell().SetValue(info.PTimeSuccessNum) row.AddCell().SetValue(info.PTimeSuccessDbNum) row.AddCell().SetValue(info.ThreeDaysAgoNum) row.AddCell().SetValue(info.BeforeThreeDaysAgoNum) coverage := float64(info.PTimeSuccessNum) / float64(info.Num) fill := &xlsx.Fill{ PatternType: "solid", } if coverage < 0.6 { // fill.FgColor = "00FF0000" fill.BgColor = "FF000000" } else if coverage >= 0.6 && coverage < 0.8 { fill.FgColor = "00FFCC00" fill.BgColor = "FF000000" } else if coverage >= 0.8 && coverage < 1.0 { fill.FgColor = "00FFFF00" fill.BgColor = "FF000000" } else { fill.FgColor = "0066FF33" fill.BgColor = "FF000000" } style.Fill = *fill value, _ := strconv.ParseFloat(fmt.Sprintf("%.2f", coverage*100), 64) result := fmt.Sprint(value) + "%" info.Coverage = result cell := row.AddCell() cell.SetValue(result) cell.SetStyle(style) row.AddCell().SetValue(info.Modifyuser) row.AddCell().SetValue(info.State) row.AddCell().SetValue(info.Domain) row.AddCell().SetValue(info.Stype) row.AddCell().SetValue(info.Platform) tmp := map[string]interface{}{} infoByte, err := json.Marshal(*info) if err == nil { if json.Unmarshal(infoByte, &tmp) == nil { arr = append(arr, tmp) } else { logger.Info("json unmarshal:", err) } } else { logger.Info("json marshal:", err) } } util.MgoE.SaveBulk("site_datacount", arr...) arr = []map[string]interface{}{} //file.Save("res/tmp.xlsx") SendSiteInfoToWxWork(file) } func SendSiteInfoToWxWork(file *xlsx.File) { defer qu.Catch() //file, err := xlsx.OpenFile("res/sitecount.xlsx") //if err != nil { // qu.Debug("open file err:", err) // return //} mw := &util.MyWrite{ Byte: &bytes.Buffer{}, } file.Write(mw) bt := mw.Byte.Bytes() client := req.C() filename := Publishtime + "重点网站数据量统计.xlsx" resp, _ := client.R(). SetHeader("Content-Type", "multipart/form-data"). //SetFiles(map[string]string{"file": string(resp4.Bytes())}). SetFileReader("file", filename, bytes.NewReader(bt)). Post("https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=97850772-88d0-4544-a2c3-6201aeddff9e&type=file") result := map[string]interface{}{} mediaId := "" if json.Unmarshal(resp.Bytes(), &result) == nil { mediaId = qu.ObjToString(result["media_id"]) } else { qu.Debug("unmarshal result err") } msg := fmt.Sprintf(SiteInfoModel, mediaId) resp1, err := http.Post( "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97850772-88d0-4544-a2c3-6201aeddff9e", "application/json", bytes.NewBuffer([]byte(msg)), ) if err != nil { fmt.Println("request error:", err) return } defer resp1.Body.Close() } func SendLuaPythonAllNum() { defer qu.Catch() luaContent := fmt.Sprintf(NumContentModel, "Lua", LuaListDownloadAllNum, LuaListDownloadSuccessAllNum, LuaBiddingDownloadAllNum) pythonContent := fmt.Sprintf(NumContentModel, "python", PythonListDownloadAllNum, PythonListDownloadSuccessAllNum, PythonBiddingDownloadAllNum) resultContent := fmt.Sprintf(MarkdownModel, Publishtime+",Lua、Python各维度采集量统计结果如下:\n"+luaContent+pythonContent) qu.Debug(resultContent) //保存记录 util.MgoS.Save("spider_luapythoncount", map[string]interface{}{ "lualistnum": LuaListDownloadAllNum, "lualistsuccessnum": LuaListDownloadSuccessAllNum, "luabiddingnum": LuaBiddingDownloadAllNum, "pythonlistnum": PythonListDownloadAllNum, "pythonlistsuccessnum": PythonListDownloadSuccessAllNum, "pythonbiddingnum": PythonBiddingDownloadAllNum, "comeintime": time.Now().Unix(), "date": Publishtime, }) //重置 LuaListDownloadAllNum = 0 LuaListDownloadSuccessAllNum = 0 LuaBiddingDownloadAllNum = 0 PythonListDownloadAllNum = 0 PythonListDownloadSuccessAllNum = 0 PythonBiddingDownloadAllNum = 0 //发送统计 resp, err := http.Post( "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97850772-88d0-4544-a2c3-6201aeddff9e", "application/json", bytes.NewBuffer([]byte(resultContent)), ) if err != nil { fmt.Println("request error:", err) return } defer resp.Body.Close() } //func GetHighListDataNum(ctime, etime int64, ptime string, siteInfoMap map[string]*SiteInfo) { // defer qu.Catch() // sess := util.MgoS.GetMgoConn() // defer util.MgoS.DestoryMongoConn(sess) // query := map[string]interface{}{ // "comeintime": map[string]interface{}{ // "$gte": ctime, // "$lt": etime, // }, // //"publishtime": map[string]interface{}{ // // "$gte": ctime, // // "$lt": 1654617600, // //}, // "publishtime": map[string]interface{}{ // "$regex": ptime, // }, // } // fields := map[string]interface{}{ // "state": 1, // "href": 1, // "site": 1, // } // lock := &sync.Mutex{} // wg := &sync.WaitGroup{} // ch := make(chan bool, 3) // qu.Debug("query:", query) // it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter() // n := 0 // for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { // wg.Add(1) // ch <- true // go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() // href := qu.ObjToString(tmp["href"]) // state := qu.IntAll(tmp["state"]) // site := qu.ObjToString(tmp["site"]) // lock.Lock() // if info := siteInfoMap[site]; info != nil && AllHref[href] != site { //匹配要统计的重点网站 // info.ListAllNum++ // if state == 1 { // info.ListSuccessNum++ // } // AllHref[href] = site // } // lock.Unlock() // }(tmp) // if n%1000 == 0 { // logger.Debug(n) // } // tmp = map[string]interface{}{} // } // wg.Wait() //} // //func GetListDataNum(ctime, etime int64, ptime string, siteInfoMap map[string]*SiteInfo) { // defer qu.Catch() // sess := util.MgoS.GetMgoConn() // defer util.MgoS.DestoryMongoConn(sess) // query := map[string]interface{}{ // "comeintime": map[string]interface{}{ // "$gte": ctime, // "$lt": etime, // }, // "publishtime": map[string]interface{}{ // "$regex": ptime, // }, // } // fields := map[string]interface{}{ // "state": 1, // "href": 1, // "site": 1, // } // lock := &sync.Mutex{} // wg := &sync.WaitGroup{} // ch := make(chan bool, 3) // it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter() // n := 0 // for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { // wg.Add(1) // ch <- true // go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() // href := qu.ObjToString(tmp["href"]) // state := qu.IntAll(tmp["state"]) // site := qu.ObjToString(tmp["site"]) // lock.Lock() // if info := siteInfoMap[site]; info != nil && AllHref[href] != site { //匹配要统计的重点网站 // info.ListAllNum++ // if state == 1 { // info.ListSuccessNum++ // } // AllHref[href] = site // } // lock.Unlock() // }(tmp) // if n%1000 == 0 { // logger.Debug(n) // } // tmp = map[string]interface{}{} // } // wg.Wait() //} // //func GetDataBakNum(stime, etime int64, siteInfoMap map[string]*SiteInfo) { // defer qu.Catch() // //spider_highlistdata和spider_listdata列表页统计不出的站点,借由data_bak统计 // //存在问题 // //1、列表页无发布时间的,未计入统计 // //2、若果按列表统计后,所有站点再由data_bak统计一遍,列表页、详情页变链接者会多统计 // for site, info := range siteInfoMap { // if info.ListAllNum == 0 { // query := map[string]interface{}{ // "comeintime": map[string]interface{}{ // "$gte": stime, // "$lt": etime, // }, // "publishtime": map[string]interface{}{ // "$gte": stime, // "$lt": etime, // }, // "site": site, // } // count := util.MgoS.Count("data_bak", query) // info.ListAllNum = count // info.ListSuccessNum = count // } // } // AllHref = map[string]string{} //} // //func GetDataBakNum_Back(stime, etime int64, siteInfoMap map[string]*SiteInfo) { // defer qu.Catch() // logger.Info("单独统计data_bak开始...") // sess := util.MgoS.GetMgoConn() // defer util.MgoS.DestoryMongoConn(sess) // query := map[string]interface{}{ // "comeintime": map[string]interface{}{ // "$gte": stime, // "$lt": etime, // }, // "publishtime": map[string]interface{}{ // "$gte": stime, // "$lt": etime, // }, // } // fields := map[string]interface{}{ // "href": 1, // "site": 1, // } // lock := &sync.Mutex{} // wg := &sync.WaitGroup{} // ch := make(chan bool, 3) // it := sess.DB(util.MgoS.DbName).C("data_bak").Find(&query).Select(&fields).Iter() // n := 0 // for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { // wg.Add(1) // ch <- true // go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() // href := qu.ObjToString(tmp["href"]) // site := qu.ObjToString(tmp["site"]) // lock.Lock() // if info := siteInfoMap[site]; info != nil && AllHref[href] != site { //匹配要统计的重点网站 // info.ListAllNum++ // info.ListSuccessNum++ // AllHref[href] = site // } // lock.Unlock() // }(tmp) // if n%1000 == 0 { // logger.Debug(n) // } // tmp = map[string]interface{}{} // } // wg.Wait() // logger.Info("单独统计data_bak完成...") //} //func GetSiteInfoExcel(listInfo, backInfo, allSpiderInfo map[string]*SiteInfo) { // defer qu.Catch() // file, err := xlsx.OpenFile("res/sitecount.xlsx") // if err != nil { // qu.Debug("open file err:", err) // return // } // infoArr := []map[string]*SiteInfo{listInfo, backInfo, allSpiderInfo} // for i, sheet := range []*xlsx.Sheet{file.Sheets[0], file.Sheets[1], file.Sheets[2]} { // for site, info := range infoArr[i] { // row := sheet.AddRow() // row.AddCell().SetValue(site) // row.AddCell().SetValue(info.Num) // row.AddCell().SetValue(info.ListAllNum) // row.AddCell().SetValue(info.ListSuccessNum) // if i == 2 { // row.AddCell().SetValue(info.PTimeSuccessNum) // } // row.AddCell().SetValue(0) // row.AddCell().SetValue(info.Modifyuser) // row.AddCell().SetValue(info.State) // row.AddCell().SetValue(info.Domain) // row.AddCell().SetValue(info.Stype) // row.AddCell().SetValue(info.Platform) // } // } // //SendSiteInfoToWxWork(file) // file.Save("res/tmp.xlsx") //}