package timetask

import (
	"encoding/json"
	"strings"
	"sync"
	"time"

	qu "qfw/util"
	"util"
)
// WarnInfo aggregates all warnings recorded for a single href.
type WarnInfo struct {
	Fields   map[string]bool // fields that triggered a warning
	MaxLevel int             // highest warning level seen for this href
	Data     interface{}
	Site     interface{}
	Channel  interface{}
	Title    interface{}
	Infos    map[string]bool // distinct warning types (see StypeArr)
	Code     interface{}
	Href     interface{}
	Repeat   bool // an equivalent record already exists in bidding
}
// StypeArr lists the warning types that are pushed for repair.
var StypeArr = []string{
	"Field Value Is Null",
	"Field Value Contains Random Code",
	"Field Value Not Contains Chinese",
	"Detail File Err",
}
// PushSpiderWarnErrData is the timetask entry point: it merges field warnings
// and detail-download failures into the spider_warn_err collection.
func PushSpiderWarnErrData() {
	GetSpiderWarnData()
	GetHighlistDetailFilErrData()
}
// GetHighlistDetailFilErrData collects list records whose detail pages failed
// to download (detailfilerr true, state -1) seven days ago and pushes them
// into spider_warn_err.
func GetHighlistDetailFilErrData() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	stime := util.GetTime(-7)
	etime := util.GetTime(-6)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": stime,
			"$lt":  etime,
		},
		"detailfilerr": true,
		"state":        -1,
	}
	fields := map[string]interface{}{
		"site":        1,
		"channel":     1,
		"spidercode":  1,
		"area":        1,
		"city":        1,
		"district":    1,
		"jsondata":    1,
		"publishtime": 1,
		"comeintime":  1,
		"href":        1,
		"title":       1,
		"dataging":    1,
		"_id":         0,
	}
	ch := make(chan bool, 2) // semaphore: at most two records processed concurrently
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	arr := []map[string]interface{}{}
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			result := map[string]interface{}{}
			result["from"] = "list"
			result["level"] = 2
			result["info"] = "Detail File Err"
			result["ok"] = false
			result["field"] = "detail"
			result["site"] = tmp["site"]
			result["channel"] = tmp["channel"]
			result["title"] = tmp["title"]
			result["href"] = tmp["href"]
			result["spidercode"] = tmp["spidercode"]
			result["comeintime"] = time.Now().Unix()
			// publishtime is stored as a formatted string; parse it to a unix timestamp.
			publishtime_str := qu.ObjToString(tmp["publishtime"])
			publishtime_int := int64(0)
			if publishtime_str != "0" {
				if t, err := time.ParseInLocation(qu.Date_Full_Layout, publishtime_str, time.Local); err == nil {
					publishtime_int = t.Unix()
				}
			}
			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime_int)
			// jsondata is stored as a JSON string; unmarshal it, or drop it if malformed.
			if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
				jsondataMap := map[string]interface{}{}
				if json.Unmarshal([]byte(jsondata), &jsondataMap) == nil {
					tmp["jsondata"] = jsondataMap
				} else {
					delete(tmp, "jsondata")
				}
			}
			// Look up the spider config for the compete flag and target collection;
			// guard the pointer and the type assertion so a missing or malformed
			// config cannot panic inside the goroutine.
			iscompete := false
			coll := "bidding"
			lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": tmp["spidercode"]})
			if lua != nil && len(*lua) > 0 {
				iscompete, _ = (*lua)["spidercompete"].(bool)
				if param_common, ok := (*lua)["param_common"].([]interface{}); ok && len(param_common) >= 8 {
					coll = qu.ObjToString(param_common[7])
				}
			}
			tmp["iscompete"] = iscompete
			tmp["publishtime"] = publishtime_int
			tmp["_d"] = "comeintime"
			tmp["T"] = coll
			result["data"] = tmp
			lock.Lock()
			arr = append(arr, result)
			if len(arr) > 500 { // flush in batches to bound memory
				util.MgoS.SaveBulk("spider_warn_err", arr...)
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{} // fresh map: the previous one is owned by the goroutine
	}
	wg.Wait()
	if len(arr) > 0 {
		util.MgoS.SaveBulk("spider_warn_err", arr...)
		arr = []map[string]interface{}{}
	}
}
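
// Both collectors in this file throttle work the same way: a buffered channel
// serves as a counting semaphore that caps in-flight goroutines, and a
// WaitGroup blocks until every worker has drained. A minimal sketch of the
// pattern in isolation (boundedRun is a hypothetical helper, not part of this
// package's API):
func boundedRun(tasks []func(), limit int) {
	sem := make(chan bool, limit) // acquire on send, release on receive
	wg := &sync.WaitGroup{}
	for _, task := range tasks {
		sem <- true // blocks while limit workers are already running
		wg.Add(1)
		go func(task func()) {
			defer func() {
				<-sem // free the slot for the next task
				wg.Done()
			}()
			task()
		}(task)
	}
	wg.Wait() // all tasks submitted; wait for in-flight workers to finish
}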
// GetSpiderWarnData aggregates the previous day's spider_warn records by href
// and pushes the merged results into spider_warn_err.
func GetSpiderWarnData() {
	defer qu.Catch()
	qu.Debug("preparing spider_warn_err data")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	stime := util.GetTime(-1)
	etime := util.GetTime(0)
	if time.Now().Weekday() == time.Monday { // on Monday, cover the whole weekend
		stime = util.GetTime(-3)
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": stime,
			"$lt":  etime,
		},
		"info": map[string]interface{}{ // this condition can be removed once the save service is updated (2022-11-28)
			"$in": StypeArr,
		},
		"level": 2,
	}
	ch := make(chan bool, 2)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	result := map[string]*WarnInfo{}
	it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			href := qu.ObjToString(tmp["href"])
			level := qu.IntAll(tmp["level"])
			field := qu.ObjToString(tmp["field"])
			info := qu.ObjToString(tmp["info"])
			title := qu.ObjToString(tmp["title"])
			publishtime := int64(0)
			data, ok := tmp["data"].(map[string]interface{})
			if ok {
				if ptime := data["publishtime"]; ptime != nil {
					publishtime = qu.Int64All(ptime)
				}
			}
			// Dedup check: a record with the same title and a similar publishtime
			// is treated as the same data and needs no repair.
			repeat := RepeatData(title, publishtime)
			lock.Lock()
			if warnInfo := result[href]; warnInfo == nil {
				result[href] = &WarnInfo{
					Fields:   map[string]bool{field: true},
					MaxLevel: level,
					Data:     data,
					Site:     tmp["site"],
					Channel:  tmp["channel"],
					Title:    title,
					Infos:    map[string]bool{info: true},
					Code:     tmp["code"],
					Href:     href,
					Repeat:   repeat,
				}
			} else { // merge into the existing entry for this href
				warnInfo.Fields[field] = true
				warnInfo.Infos[info] = true
				if warnInfo.MaxLevel < level {
					warnInfo.MaxLevel = level
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	saveArr := []map[string]interface{}{}
	for _, wi := range result {
		ch <- true
		wg.Add(1)
		go func(w *WarnInfo) {
			defer func() {
				<-ch
				wg.Done()
			}()
			fields := []string{}
			for f := range w.Fields {
				fields = append(fields, f)
			}
			infos := []string{}
			for t := range w.Infos {
				infos = append(infos, t)
			}
			lock.Lock()
			saveArr = append(saveArr, map[string]interface{}{
				"field":      strings.Join(fields, ","),
				"level":      w.MaxLevel,
				"site":       w.Site,
				"channel":    w.Channel,
				"title":      w.Title,
				"repeat":     w.Repeat,
				"comeintime": time.Now().Unix(),
				"info":       strings.Join(infos, ","),
				"spidercode": w.Code,
				"href":       w.Href,
				"data":       w.Data,
				"ok":         false,
				"from":       "warn",
			})
			if len(saveArr) > 500 {
				util.MgoS.SaveBulk("spider_warn_err", saveArr...)
				saveArr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(wi)
	}
	wg.Wait()
	if len(saveArr) > 0 {
		util.MgoS.SaveBulk("spider_warn_err", saveArr...)
		saveArr = []map[string]interface{}{}
	}
}
// RepeatData reports whether bidding already holds a record with the same
// title and a publishtime within three days of the given one; such records
// count as duplicates and need no repair.
func RepeatData(title string, publishtime int64) bool {
	return util.MgoB.Count("bidding",
		map[string]interface{}{
			"title": title,
			"publishtime": map[string]interface{}{
				"$gte": publishtime - 86400*3,
				"$lte": publishtime + 86400*3,
			},
		}) > 0
}
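
// For example (hypothetical title and timestamp), a warn record published at
// unix time 1669593600 is skipped when bidding already holds the same title
// published between 1669334400 and 1669852800:
//
//	if RepeatData("某项目中标公告", 1669593600) {
//		// an equivalent record already exists; no repair push needed
//	}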
/*
Push data containing garbled characters on a daily schedule.
*/
// var (
// 	RandomDataPushCron string
// 	Gmail              *gm.GmailAuth
// 	To                 string
// )

// type FileWrite struct {
// 	Byte *bytes.Buffer
// }

// func (fw *FileWrite) Write(p []byte) (n int, err error) {
// 	n, err = fw.Byte.Write(p)
// 	return
// }

// PushRandomData pushes data containing garbled characters.
// func PushRandomData() {
// 	defer qu.Catch()
// 	query := map[string]interface{}{
// 		//"comeintime": map[string]interface{}{
// 		//	"$gte": GetTime(-1),
// 		//	"$lt":  GetTime(0),
// 		//},
// 		"info": map[string]interface{}{
// 			"$in": []string{"Field Value Not Contains Chinese"},
// 		},
// 	}
// 	list, _ := MgoS.Find("spider_warn", query, nil, nil, false, -1, -1)
// 	if len(*list) > 0 {
// 		file := xlsx.NewFile()
// 		sheet, _ := file.AddSheet("garbled data")
// 		row := sheet.AddRow()
// 		row.AddCell().SetValue("site")
// 		row.AddCell().SetValue("channel")
// 		row.AddCell().SetValue("spider")
// 		row.AddCell().SetValue("field")
// 		row.AddCell().SetValue("warn level")
// 		row.AddCell().SetValue("title")
// 		row.AddCell().SetValue("href")
// 		for _, l := range *list {
// 			textRow := sheet.AddRow()
// 			textRow.AddCell().SetValue(qu.ObjToString(l["site"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["channel"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["code"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["field"]))
// 			level := qu.IntAll(l["level"])
// 			if level == 1 {
// 				textRow.AddCell().SetValue("warning")
// 			} else if level == 2 {
// 				textRow.AddCell().SetValue("error")
// 			}
// 			textRow.AddCell().SetValue(qu.ObjToString(l["title"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["href"]))
// 		}
// 		fw := &FileWrite{
// 			Byte: &bytes.Buffer{},
// 		}
// 		file.Write(fw)
// 		bt := fw.Byte.Bytes()
// 		gm.GSendMail_Bq("jy@jianyu360.cn", To, "", "", "garbled data statistics", "", "statistics report.xlsx", bt, Gmail)
// 	}
// }