package timetask

import (
	"encoding/json"
	qu "qfw/util"
	"strings"
	"sync"
	"time"
	"util"
)

// WarnInfo aggregates every warning reported for a single href.
type WarnInfo struct {
	Fields   map[string]bool
	MaxLevel int
	Data     interface{}
	Site     interface{}
	Channel  interface{}
	Title    interface{}
	Infos    map[string]bool
	Code     interface{}
	Href     interface{}
	Repeat   bool
}

// StypeArr lists the warning types that are pushed for repair.
var StypeArr = []string{
	"Field Value Is Null",
	"Field Value Contains Random Code",
	"Field Value Not Contains Chinese",
	"Detail File Err",
}

// PushSpiderWarnErrData collects field warnings and detail-file failures into spider_warn_err.
func PushSpiderWarnErrData() {
	GetSpiderWarnData()
	GetHighlistDetailFilErrData()
}

// GetHighlistDetailFilErrData pushes week-old list records whose detail download failed.
func GetHighlistDetailFilErrData() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	stime := util.GetTime(-7)
	etime := util.GetTime(-6)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": stime,
			"$lt":  etime,
		},
		"detailfilerr": true,
		"state":        -1,
	}
	fields := map[string]interface{}{
		"site":        1,
		"channel":     1,
		"spidercode":  1,
		"area":        1,
		"city":        1,
		"district":    1,
		"jsondata":    1,
		"publishtime": 1,
		"comeintime":  1,
		"href":        1,
		"title":       1,
		"dataging":    1,
		"_id":         0,
	}
	ch := make(chan bool, 2)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	arr := []map[string]interface{}{}
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			result := map[string]interface{}{}
			result["from"] = "list"
			result["level"] = 2
			result["info"] = "Detail File Err"
			result["ok"] = false
			result["field"] = "detail"
			result["site"] = tmp["site"]
			result["channel"] = tmp["channel"]
			result["title"] = tmp["title"]
			result["href"] = tmp["href"]
			result["spidercode"] = tmp["spidercode"]
			result["comeintime"] = time.Now().Unix()
			// publishtime: parse the string form into a unix timestamp.
			publishtime_str := qu.ObjToString(tmp["publishtime"])
			publishtime_int := int64(0)
			if publishtime_str != "0" {
				if t, err := time.ParseInLocation(qu.Date_Full_Layout, publishtime_str, time.Local); err == nil {
					publishtime_int = t.Unix()
				}
			}
			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime_int)
			// jsondata: unmarshal if present, drop it if malformed.
			if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
				jsondataMap := map[string]interface{}{}
				if json.Unmarshal([]byte(jsondata), &jsondataMap) == nil {
					tmp["jsondata"] = jsondataMap
				} else {
					delete(tmp, "jsondata")
				}
			}
			iscompete := false
			coll := "bidding"
			lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": tmp["spidercode"]})
			if lua != nil && len(*lua) > 0 {
				iscompete, _ = (*lua)["spidercompete"].(bool)
				param_common, _ := (*lua)["param_common"].([]interface{})
				if len(param_common) >= 8 {
					coll = qu.ObjToString(param_common[7])
				}
			}
			tmp["iscompete"] = iscompete
			tmp["publishtime"] = publishtime_int
			tmp["_d"] = "comeintime"
			tmp["T"] = coll
			result["data"] = tmp
			lock.Lock()
			arr = append(arr, result)
			if len(arr) > 500 {
				util.MgoS.SaveBulk("spider_warn_err", arr...)
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	if len(arr) > 0 {
		util.MgoS.SaveBulk("spider_warn_err", arr...)
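		// The workers above flush in batches of 500; this final SaveBulk has
		// just written the sub-500 remainder.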
		arr = []map[string]interface{}{}
	}
}

// GetSpiderWarnData aggregates yesterday's level-2 warnings per href and saves them to spider_warn_err.
func GetSpiderWarnData() {
	defer qu.Catch()
	qu.Debug("preparing spider_warn_err data")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	stime := util.GetTime(-1)
	etime := util.GetTime(0)
	// On Monday, extend the window back to Friday to cover the weekend.
	if time.Now().Weekday() == time.Monday {
		stime = util.GetTime(-3)
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": stime,
			"$lt":  etime,
		},
		// This condition can be dropped once the save service is updated (2022-11-28).
		"info": map[string]interface{}{
			"$in": StypeArr,
		},
		"level": 2,
	}
	ch := make(chan bool, 2)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	result := map[string]*WarnInfo{}
	it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			href := qu.ObjToString(tmp["href"])
			level := qu.IntAll(tmp["level"])
			field := qu.ObjToString(tmp["field"])
			info := qu.ObjToString(tmp["info"])
			title := qu.ObjToString(tmp["title"])
			publishtime := int64(0)
			data, ok := tmp["data"].(map[string]interface{})
			if ok {
				if ptime := data["publishtime"]; ptime != nil {
					publishtime = qu.Int64All(ptime)
				}
			}
			// Dedup check: a record with the same title and a similar publishtime
			// is treated as the same data and needs no repair.
			repeat := RepeatData(title, publishtime)
			lock.Lock()
			if warnInfo := result[href]; warnInfo == nil {
				warnInfo = &WarnInfo{
					Fields:   map[string]bool{field: true},
					MaxLevel: level,
					Data:     data,
					Site:     tmp["site"],
					Channel:  tmp["channel"],
					Title:    title,
					Infos:    map[string]bool{info: true},
					Code:     tmp["code"],
					Href:     href,
					Repeat:   repeat,
				}
				result[href] = warnInfo
			} else {
				warnInfo.Fields[field] = true
				warnInfo.Infos[info] = true
				if warnInfo.MaxLevel < level {
					warnInfo.MaxLevel = level
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	saveArr := []map[string]interface{}{}
	for _, wi := range result {
		ch <- true
		wg.Add(1)
		go func(w *WarnInfo) {
			defer func() {
				<-ch
				wg.Done()
			}()
			fields := []string{}
			for f := range w.Fields {
				fields = append(fields, f)
			}
			infos := []string{}
			for t := range w.Infos {
				infos = append(infos, t)
			}
			lock.Lock()
			saveArr = append(saveArr, map[string]interface{}{
				"field":      strings.Join(fields, ","),
				"level":      w.MaxLevel,
				"site":       w.Site,
				"channel":    w.Channel,
				"title":      w.Title,
				"repeat":     w.Repeat,
				"comeintime": time.Now().Unix(),
				"info":       strings.Join(infos, ","),
				"spidercode": w.Code,
				"href":       w.Href,
				"data":       w.Data,
				"ok":         false,
				"from":       "warn",
			})
			if len(saveArr) > 500 {
				util.MgoS.SaveBulk("spider_warn_err", saveArr...)
				saveArr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(wi)
	}
	wg.Wait()
	if len(saveArr) > 0 {
		util.MgoS.SaveBulk("spider_warn_err", saveArr...)
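		// Tail-end flush done; clearing saveArr keeps the buffer handling
		// consistent with the batched writes inside the goroutines.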
		saveArr = []map[string]interface{}{}
	}
}

// RepeatData reports whether the bidding collection already holds a record with the
// same title published within three days of publishtime; such data is treated as a
// duplicate that needs no repair.
func RepeatData(title string, publishtime int64) bool {
	return util.MgoB.Count("bidding", map[string]interface{}{
		"title": title,
		"publishtime": map[string]interface{}{
			"$gte": publishtime - 86400*3,
			"$lte": publishtime + 86400*3,
		},
	}) > 0
}

/*
	Push data containing garbled text on a daily schedule
*/
// var (
// 	RandomDataPushCron string
// 	Gmail              *gm.GmailAuth
// 	To                 string
// )

// type FileWrite struct {
// 	Byte *bytes.Buffer
// }

// func (fw *FileWrite) Write(p []byte) (n int, err error) {
// 	n, err = fw.Byte.Write(p)
// 	return
// }

// PushRandomData pushes data containing garbled text by mail.
// func PushRandomData() {
// 	defer qu.Catch()
// 	query := map[string]interface{}{
// 		//"comeintime": map[string]interface{}{
// 		//	"$gte": GetTime(-1),
// 		//	"$lt":  GetTime(0),
// 		//},
// 		"info": map[string]interface{}{
// 			"$in": []string{"Field Value Not Contains Chinese"},
// 		},
// 	}
// 	list, _ := MgoS.Find("spider_warn", query, nil, nil, false, -1, -1)
// 	if len(*list) > 0 {
// 		file := xlsx.NewFile()
// 		sheet, _ := file.AddSheet("Garbled Data")
// 		row := sheet.AddRow()
// 		row.AddCell().SetValue("Site")
// 		row.AddCell().SetValue("Channel")
// 		row.AddCell().SetValue("Spider")
// 		row.AddCell().SetValue("Field")
// 		row.AddCell().SetValue("Level")
// 		row.AddCell().SetValue("Title")
// 		row.AddCell().SetValue("Link")
// 		for _, l := range *list {
// 			textRow := sheet.AddRow()
// 			textRow.AddCell().SetValue(qu.ObjToString(l["site"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["channel"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["code"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["field"]))
// 			level := qu.IntAll(l["level"])
// 			if level == 1 {
// 				textRow.AddCell().SetValue("Warning")
// 			} else if level == 2 {
// 				textRow.AddCell().SetValue("Error")
// 			}
// 			textRow.AddCell().SetValue(qu.ObjToString(l["title"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["href"]))
// 		}
// 		fw := &FileWrite{
// 			Byte: &bytes.Buffer{},
// 		}
// 		file.Write(fw)
// 		bt := fw.Byte.Bytes()
// 		gm.GSendMail_Bq("jy@jianyu360.cn", To, "", "", "Garbled Data Statistics", "", "statistics_report.xlsx", bt, Gmail)
// 	}
// }
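
// Hypothetical wiring sketch (the function name below is illustrative, not part
// of this package): PushSpiderWarnErrData is meant to run once per day, pulling
// yesterday's warnings and week-old detail-file failures. Assuming a plain
// ticker is an acceptable scheduler, a caller could drive it like this:
//
// func StartWarnErrPushLoop() {
// 	for range time.Tick(24 * time.Hour) {
// 		PushSpiderWarnErrData()
// 	}
// }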