12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726 |
- package main
- import (
- "fmt"
- "math"
- qu "qfw/util"
- "sort"
- "sync"
- "time"
- )
- var (
- YearMinCodeMap map[string]bool //luayearmincode中,爬虫代码:循环周期
- SendFirstMap map[string]*Lua //
- YearMinDownloadNum int //一年下载最低值
- IntervalMaxNum int //区间最大值
- PublishtimeInterval = []float64{1.0, 3.0, 10.0, 20.0, 31.0, 93.0} //[0,1),[1,3),[3,10),[10,20),[20,31),[31,31*3),[31*3,···)
- IntervalMap = map[int]string{
- 1: "[0,1)",
- 2: "[1,3)",
- 3: "[3,10)",
- 4: "[10,20)",
- 5: "[20,31)",
- 6: "[31,93)",
- 7: "[93,···)",
- }
- IntervalRotateTime = map[string]int{ //区间爬虫一轮次时间(月)
- "[0,1)": 3,
- "[1,3)": 3,
- "[3,10)": 6,
- "[10,20)": 6,
- "[20,31)": 6,
- "[31,93)": 12,
- "[93,···)": 12,
- }
- )
// Lua is the in-memory description of one spider script as used by the
// task-creation code in this file.
type Lua struct {
	Site, Channel    string // site and channel the spider collects from
	Modify, Modifyid string // maintainer name and id (filled from createuser / createuserid)
	Code             string // spider code
	Event            int    // value of the spider's "event" field
	Count            int    // recorded download count
}
- func LuaYearMinCodeCreateTask() {
- defer qu.Catch()
- GetAllLuaYearMinCode() //获取luayearmincode所有爬虫
- CreateTask() //
- }
- func GetAllLuaYearMinCode() {
- defer qu.Catch()
- YearMinCodeMap = map[string]bool{}
- SendFirstMap = map[string]*Lua{}
- list, _ := MgoE.Find("luayearmincode", nil, nil, `{"publishtime":0}`, false, -1, -1)
- for _, l := range *list {
- code := qu.ObjToString(l["code"])
- YearMinCodeMap[code] = true
- sf, _ := l["sendfirst"].(bool)
- sd, _ := l["send"].(bool)
- if sf && !sd {
- lua := &Lua{
- Site: qu.ObjToString(l["site"]),
- Channel: qu.ObjToString(l["channel"]),
- Modify: qu.ObjToString(l["modify"]),
- Modifyid: qu.ObjToString(l["modifyid"]),
- Code: code,
- Count: qu.IntAll(l["count"]),
- Event: qu.IntAll(l["event"]),
- }
- SendFirstMap[code] = lua
- }
- }
- }
- func CreateTask() {
- defer qu.Catch()
- //1.sendfirst建任务(只建一次该任务)
- CreateFirstCodeTask()
- //2.根据区间轮循建任务
- list, _ := MgoE.Find("luayearmincodeinterval", nil, nil, nil, false, -1, -1)
- for _, l := range *list {
- CreateTaskByInterval(l)
- }
- }
- //根据区间建任务
- func CreateTaskByInterval(l map[string]interface{}) {
- defer qu.Catch()
- interval := qu.ObjToString(l["interval"])
- qu.Debug(interval, "区间开始创建任务...")
- timesnum := qu.IntAll(l["timesnum"])
- cycletime := qu.IntAll(l["cycletime"])
- ct_wg := &sync.WaitGroup{}
- ct_lock := &sync.Mutex{}
- ct_ch := make(chan bool, 3)
- savetaskArr := []map[string]interface{}{}
- updateArr := [][]map[string]interface{}{}
- list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`","send":false}`, ``, `{"publishtime":0}`, false, 0, timesnum)
- for _, l := range *list {
- ct_wg.Add(1)
- ct_ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ct_ch
- ct_wg.Done()
- }()
- update := []map[string]interface{}{ //更新
- map[string]interface{}{"_id": tmp["_id"]},
- map[string]interface{}{
- "$set": map[string]interface{}{
- "send": true,
- },
- },
- }
- code := qu.ObjToString(tmp["code"])
- description := ""
- state := 0 //任务状态
- /*
- 统计是否有已下几种情况,时间定为一周内数据:
- 1、统计spider_highlistdata是否有下载异常数据
- 2、统计spider_warn异常数据(发布时间异常、乱码)
- 3、统计spider_sitecheck 站点异常爬虫(404)
- */
- stime, etime := GetTime(-cycletime), GetTime(0)
- //统计周期内下载量
- query := map[string]interface{}{
- "spidercode": code,
- "l_np_publishtime": map[string]interface{}{
- "$gte": stime,
- "$lte": etime,
- },
- }
- downloadnum := MgoS.Count("data_bak", query)
- //1、下载异常
- query = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": stime,
- "$lte": etime,
- },
- "state": -1,
- "spidercode": code,
- }
- data_downloaderr, _ := MgoS.Find("spider_highlistdata", query, `{"_id":-1}`, `{"href":1}`, false, 0, 10)
- if data_downloaderr != nil && len(*data_downloaderr) > 0 {
- if len(*data_downloaderr) == 10 {
- state = 1
- }
- description += "下载异常:\n"
- for _, derr := range *data_downloaderr {
- description += qu.ObjToString(derr["href"]) + "\n"
- }
- }
- //2、发布时间异常、乱码
- query = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": stime,
- "$lte": etime,
- },
- "level": 2, //2:error数据 1:warn数据
- "code": code,
- }
- data_warn, _ := MgoS.Find("spider_warn", query, `{"_id":-1}`, `{"href":1,"field":1}`, false, 0, 10)
- if data_warn != nil && len(*data_warn) > 0 {
- destmp_publishtime := "发布时间异常:\n"
- destmp_code := "正文标题异常:\n"
- for _, dw := range *data_warn {
- field := qu.ObjToString(dw["field"])
- if field == "publishtime" {
- state = 1
- destmp_publishtime += qu.ObjToString(dw["href"]) + "\n"
- } else {
- destmp_code += qu.ObjToString(dw["href"]) + "\n"
- }
- }
- description += destmp_code
- description += destmp_publishtime
- }
- //3、404
- query = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": stime,
- "$lte": etime,
- },
- "statuscode": 404,
- "code": code,
- }
- data_404, _ := MgoS.FindOne("spider_sitecheck", query)
- if data_404 != nil && len(*data_404) > 0 {
- if downloadnum == 0 { //有采集数据,不认为是404
- state = 1
- description += "网站监测:404\n" + qu.ObjToString((*data_404)["url"]) + "\n"
- }
- }
- result := map[string]interface{}{}
- result["s_code"] = code
- result["s_site"] = tmp["site"]
- result["s_channel"] = tmp["channel"]
- result["s_descript"] = description
- result["l_comeintime"] = time.Now().Unix()
- result["l_complete"] = time.Now().AddDate(0, 0, cycletime).Unix()
- result["s_modifyid"] = tmp["modifyid"]
- result["s_modify"] = tmp["modify"]
- result["i_event"] = tmp["event"]
- result["s_source"] = "程序"
- result["i_num"] = downloadnum
- result["i_min"] = 0
- result["i_state"] = state
- result["s_type"] = "7"
- result["s_urgency"] = "1"
- result["i_times"] = 0
- result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
- ct_lock.Lock()
- savetaskArr = append(savetaskArr, result)
- updateArr = append(updateArr, update)
- ct_lock.Unlock()
- }(l)
- }
- ct_wg.Wait()
- ct_lock.Lock()
- if len(savetaskArr) > 0 {
- MgoE.SaveBulk("task", savetaskArr...)
- savetaskArr = []map[string]interface{}{}
- }
- if len(updateArr) > 0 {
- MgoE.UpdateBulk("luayearmincode", updateArr...)
- updateArr = [][]map[string]interface{}{}
- }
- ct_lock.Unlock()
- //time.AfterFunc(time.Duration(cycletime)*time.Second, func() { CreateTaskByInterval(l) })
- time.AfterFunc(time.Duration(cycletime*24)*time.Hour, func() { CreateTaskByInterval(l) })
- }
- //历史数据采集为0的建任务
- func CreateFirstCodeTask() {
- defer qu.Catch()
- qu.Debug("开始创建sendfirst任务...")
- stime := time.Now().AddDate(-1, 0, 0).Unix()
- etime := GetTime(0)
- cl_wg := &sync.WaitGroup{}
- cl_lock := &sync.Mutex{}
- cl_ch := make(chan bool, 3)
- savetaskArr := []map[string]interface{}{}
- updateArr := [][]map[string]interface{}{}
- for _, lua := range SendFirstMap {
- cl_wg.Add(1)
- cl_ch <- true
- go func(l *Lua) {
- defer func() {
- <-cl_ch
- cl_wg.Done()
- }()
- update := []map[string]interface{}{ //更新
- map[string]interface{}{"code": l.Code},
- map[string]interface{}{
- "$set": map[string]interface{}{
- "send": true,
- },
- },
- }
- result := map[string]interface{}{}
- result["s_code"] = l.Code
- result["s_site"] = l.Site
- result["s_channel"] = l.Channel
- result["s_descript"] = "下载量异常:\n一年内数据下载量:" + fmt.Sprint(l.Count)
- result["l_comeintime"] = time.Now().Unix()
- result["l_complete"] = time.Now().AddDate(1, 0, 0).Unix()
- result["s_modifyid"] = l.Modifyid
- result["s_modify"] = l.Modify
- result["i_event"] = l.Event
- result["s_source"] = "程序"
- result["i_num"] = l.Count
- result["i_min"] = 0
- result["i_state"] = 0
- result["s_type"] = "10"
- result["s_urgency"] = "1"
- result["i_times"] = 0
- result["s_downloadtime"] = qu.FormatDateByInt64(&stime, qu.Date_Full_Layout) + "/" + qu.FormatDateByInt64(&etime, qu.Date_Full_Layout)
- cl_lock.Lock()
- savetaskArr = append(savetaskArr, result)
- updateArr = append(updateArr, update)
- if len(savetaskArr) > 500 {
- MgoE.SaveBulk("task", savetaskArr...)
- savetaskArr = []map[string]interface{}{}
- }
- if len(updateArr) > 500 {
- MgoE.UpdateBulk("luayearmincode", updateArr...)
- updateArr = [][]map[string]interface{}{}
- }
- cl_lock.Unlock()
- }(lua)
- }
- cl_wg.Wait()
- cl_lock.Lock()
- if len(savetaskArr) > 0 {
- MgoE.SaveBulk("task", savetaskArr...)
- savetaskArr = []map[string]interface{}{}
- }
- if len(updateArr) > 0 {
- MgoE.UpdateBulk("luayearmincode", updateArr...)
- updateArr = [][]map[string]interface{}{}
- }
- cl_lock.Unlock()
- SendFirstMap = map[string]*Lua{}
- qu.Debug("sendfirst任务创建完毕...")
- }
- //计算循环周期和每轮新建任务爬虫的个数
- func CycleTime() {
- defer qu.Catch()
- for k, interval := range IntervalMap {
- cycletime := -1
- if k == 1 { //区间在[0,1),循环周期设置为10天
- cycletime = 10
- } else if k == 2 || k == 3 { //confinval最大值都在x以下,可设置为x天
- list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":-1}`, `{"confinval":1}`, false, 0, 1)
- if list != nil && len(*list) == 1 {
- cycletime = qu.IntAll((*list)[0]["confinval"])
- }
- } else if k == 4 || k == 5 || k == 6 { //最大值90%都在x以下,可设置为x天
- percent := 0.9
- if k == 6 {
- percent = 0.5
- }
- count := MgoE.Count("luayearmincode", `{"interval":"`+interval+`"}`)
- index := int(math.Floor(float64(count) * percent))
- list, _ := MgoE.Find("luayearmincode", `{"interval":"`+interval+`"}`, `{"confinval":1}`, `{"confinval":1}`, false, 0, index+1)
- if list != nil && len(*list) == index+1 {
- cycletime = qu.IntAll((*list)[index]["confinval"])
- }
- } else if k == 7 {
- cycletime = 180
- }
- updata := map[string]interface{}{
- "$set": map[string]interface{}{
- "cycletime": cycletime,
- "send": false,
- },
- }
- MgoE.Update("luayearmincode", `{"interval":"`+interval+`"}`, updata, false, true)
- q := map[string]interface{}{
- "interval": interval,
- "sendfirst": map[string]interface{}{
- "$exists": false,
- },
- }
- count := MgoE.Count("luayearmincode", q)
- t := float64((count * cycletime)) / float64((30 * IntervalRotateTime[interval]))
- rotateNum := math.Ceil(t)
- text := interval + ",总数:" + fmt.Sprint(count) + "," + fmt.Sprint(30*IntervalRotateTime[interval]) + "天发送完毕。每" + fmt.Sprint(cycletime) + "天轮循一次,一次发送" + fmt.Sprint(rotateNum) + "条"
- qu.Debug(text)
- MgoE.Save("luayearmincodeinterval", map[string]interface{}{"interval": interval, "timesnum": int(rotateNum), "cycletime": cycletime, "text": text})
- }
- }
- //标记数据
- func TagCode() {
- defer qu.Catch()
- sess := MgoE.GetMgoConn()
- defer MgoE.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- arr := [][]map[string]interface{}{}
- it := sess.DB("editor").C("luayearmincode").Find(nil).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- set := map[string]interface{}{}
- //code := qu.ObjToString(tmp["code"])
- count := qu.IntAll(tmp["count"])
- if count == 1 || count == 0 { //爬虫下载量为1,放入第7区间
- set["interval"] = IntervalMap[7]
- if count == 0 {
- set["sendfirst"] = true
- }
- } else {
- var tmpArr Int64Slice
- for _, tp := range tmp["publishtime"].([]interface{}) {
- tmpArr = append(tmpArr, tp.(int64))
- }
- sort.Sort(tmpArr) //发布时间排序
- //
- intervalNumArr := map[int][]float64{} //记录每个区间发布时间间隔信息
- for i, p := range tmpArr {
- if i == 0 {
- continue
- }
- dval := float64(p-tmpArr[i-1]) / 86400
- //计算区间
- intervalNum := -1 //区间
- for j, pi := range PublishtimeInterval { //1.0, 3.0, 10.0, 20.0, 31.0, 93.0
- if dval == pi {
- intervalNum = j + 2
- break
- } else if dval < pi {
- intervalNum = j + 1
- break
- }
- }
- if intervalNum == -1 { //如果为初始值,证明dval大于93
- intervalNum = 7
- }
- intervalNumArr[intervalNum] = append(intervalNumArr[intervalNum], dval)
- }
- //
- maxIn := 0 //记录最大区间
- maxInLen := 0 //记录最大区间长度
- flag := true //记录是否只有第一区间有值
- for in := 1; in <= 7; in++ {
- lens := len(intervalNumArr[in])
- if (in == 1 && lens == 0) || (in != 1 && lens > 0) {
- flag = false
- }
- if in != 1 && lens >= maxInLen {
- maxInLen = lens
- maxIn = in
- }
- }
- //qu.Debug(flag, "最大区间:", maxIn, "最大区间长度:", maxInLen)
- if flag { //只有第一区间有值
- if count < IntervalMaxNum { //划分到第七区间,直接新建任务
- set["sendfirst"] = true
- set["interval"] = IntervalMap[7]
- } else {
- set["interval"] = IntervalMap[1]
- }
- } else if maxIn != 0 && maxInLen != 0 {
- sumInval := float64(0)
- for _, inval := range intervalNumArr[maxIn] {
- sumInval += inval
- }
- mean := sumInval / float64(maxInLen)
- se := mean / math.Pow(float64(maxInLen), 0.5)
- confInval := math.Ceil(mean + se*2.32)
- set["confinval"] = int(confInval) //置信区间
- set["interval"] = IntervalMap[maxIn]
- } else {
- qu.Debug("错误数据id:", tmp["_id"])
- }
- }
- if len(set) > 0 {
- update = append(update, map[string]interface{}{"$set": set})
- }
- lock.Lock()
- if len(update) == 2 {
- arr = append(arr, update)
- }
- if len(arr) >= 500 {
- tmps := arr
- MgoE.UpdateBulk("luayearmincode", tmps...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%1000 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoE.UpdateBulk("luayearmincode", arr...)
- arr = [][]map[string]interface{}{}
- }
- qu.Debug("标记完成")
- }
- //统计爬虫下载量
- func GetSpidercode() {
- defer qu.Catch()
- query := map[string]interface{}{
- "$or": []interface{}{
- map[string]interface{}{"state": 5},
- map[string]interface{}{
- "state": map[string]interface{}{
- "$in": []int{0, 1, 2},
- },
- "event": map[string]interface{}{
- "$ne": 7000,
- },
- },
- },
- }
- codeMap := map[string]*Lua{}
- luas, _ := MgoE.Find("luaconfig", query, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
- for _, l := range *luas {
- pc := l["param_common"].([]interface{})
- lua := &Lua{
- Modify: qu.ObjToString(l["createuser"]),
- Modifyid: qu.ObjToString(l["createuserid"]),
- Event: qu.IntAll(l["event"]),
- }
- if len(pc) > 2 {
- lua.Site = qu.ObjToString(pc[1])
- lua.Channel = qu.ObjToString(pc[2])
- }
- code := qu.ObjToString(l["code"])
- codeMap[code] = lua
- }
- qu.Debug("开始统计...", len(codeMap))
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gte": time.Now().AddDate(-1, 0, 0).Unix(),
- "$lte": time.Now().Unix(),
- },
- }
- f := map[string]interface{}{
- "spidercode": 1,
- "publishtime": 1,
- }
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- codeNum := map[string]int{}
- codePublishtime := map[string][]int64{}
- i := 0
- it1 := sess.DB("spider").C("data_bak").Find(&q).Select(&f).Iter()
- for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- publishtime := qu.Int64All(tmp["publishtime"])
- if publishtime > 0 {
- spidercode := qu.ObjToString(tmp["spidercode"])
- if codeMap[spidercode] != nil {
- lock.Lock()
- codeNum[spidercode] += 1
- if codeNum[spidercode] > YearMinDownloadNum {
- lock.Unlock()
- return
- }
- codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
- lock.Unlock()
- }
- }
- }(tmp)
- if i%1000 == 0 {
- qu.Debug(i)
- }
- tmp = map[string]interface{}{}
- }
- qu.Debug("data_bak查询完毕", len(codeNum))
- i = 0
- it2 := sess.DB("spider").C("data_bak_202011030854").Find(&q).Select(&f).Iter()
- for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- publishtime := qu.Int64All(tmp["publishtime"])
- if publishtime > 0 {
- spidercode := qu.ObjToString(tmp["spidercode"])
- if codeMap[spidercode] != nil {
- lock.Lock()
- codeNum[spidercode] += 1
- if codeNum[spidercode] > YearMinDownloadNum {
- lock.Unlock()
- return
- }
- codePublishtime[spidercode] = append(codePublishtime[spidercode], publishtime)
- lock.Unlock()
- }
- }
- }(tmp)
- if i%1000 == 0 {
- qu.Debug(i)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- qu.Debug("data_bak_202011030854查询完毕", len(codeNum))
- for code, num := range codeNum {
- lua := codeMap[code]
- delete(codeMap, code)
- if num <= YearMinDownloadNum {
- parr := codePublishtime[code]
- //sort.Sort(parr)
- MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": num, "publishtime": parr, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid})
- }
- }
- for code, lua := range codeMap { //下载量为0
- MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": 0, "event": lua.Event, "site": lua.Site, "channel": lua.Channel, "modify": lua.Modify, "modifyid": lua.Modifyid, "publishtime": []int64{}})
- }
- qu.Debug("统计完毕...")
- }
- //补充信息
- func getlua() {
- luas, _ := MgoE.Find("luaconfig", nil, nil, `{"code":1,"event":1,"param_common":1,"createuser":1,"createuserid":1}`, false, -1, -1)
- for i, l := range *luas {
- qu.Debug(i)
- pc := l["param_common"].([]interface{})
- Site := ""
- Channel := ""
- if len(pc) > 2 {
- Site = qu.ObjToString(pc[1])
- Channel = qu.ObjToString(pc[2])
- }
- Modify := qu.ObjToString(l["createuser"])
- Modifyid := qu.ObjToString(l["createuserid"])
- code := qu.ObjToString(l["code"])
- MgoE.Update("luayearmincode", `{"code":"`+code+`"}`, map[string]interface{}{"$set": map[string]interface{}{"site": Site, "channel": Channel, "modify": Modify, "modifyid": Modifyid}}, false, false)
- }
- }
- //分组查询
- func GetSpidercode_back() {
- defer qu.Catch()
- qu.Debug("开始统计...")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gte": time.Now().AddDate(-1, 0, 0).Unix(),
- "$lte": time.Now().Unix(),
- },
- }
- g := map[string]interface{}{
- "_id": "$spidercode",
- "count": map[string]interface{}{"$sum": 1},
- }
- pro := map[string]interface{}{
- "spidercode": 1,
- }
- s := map[string]interface{}{
- "count": 1,
- }
- p := []map[string]interface{}{
- map[string]interface{}{"$match": q},
- map[string]interface{}{"$project": pro},
- map[string]interface{}{"$group": g},
- map[string]interface{}{"$sort": s},
- }
- it1 := sess.DB("spider").C("data_bak").Pipe(p).Iter()
- codeCount := map[string]int{}
- i := 0
- for tmp := make(map[string]interface{}); it1.Next(&tmp); i++ {
- code := qu.ObjToString(tmp["_id"])
- count := qu.IntAll(tmp["count"])
- qu.Debug(code, count)
- if count <= YearMinDownloadNum {
- codeCount[code] = count
- } else {
- break
- }
- if i%50 == 0 {
- qu.Debug(i)
- }
- }
- i = 0
- it2 := sess.DB("spider").C("data_bak_202011030854").Pipe(p).Iter()
- for tmp := make(map[string]interface{}); it2.Next(&tmp); i++ {
- code := qu.ObjToString(tmp["_id"])
- count := qu.IntAll(tmp["count"])
- qu.Debug(code, count)
- if count <= YearMinDownloadNum {
- codeCount[code] += count
- } else {
- break
- }
- if i%50 == 0 {
- qu.Debug(i)
- }
- }
- for code, count := range codeCount {
- if count <= 100 {
- MgoE.Save("luayearmincode", map[string]interface{}{"code": code, "count": count})
- }
- }
- qu.Debug("统计数量完毕...")
- list, _ := MgoE.Find("luayearmincode", nil, nil, nil, false, -1, -1)
- for _, l := range *list {
- code := qu.ObjToString(l["code"])
- count := qu.IntAll(l["count"])
- if count > YearMinDownloadNum {
- continue
- }
- d1s, _ := MgoS.Find("data_bak", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
- d2s, _ := MgoS.Find("data_bak_202011030854", `{"spidercode":"`+code+`"}`, nil, `{"publishtime":1}`, false, -1, -1)
- var publishtimeArr Int64Slice
- for _, d1 := range *d1s {
- publishtime := qu.Int64All(d1["publishtime"])
- if publishtime > 0 {
- publishtimeArr = append(publishtimeArr, publishtime)
- }
- }
- for _, d2 := range *d2s {
- publishtime := qu.Int64All(d2["publishtime"])
- if publishtime > 0 {
- publishtimeArr = append(publishtimeArr, publishtime)
- }
- }
- sort.Sort(publishtimeArr)
- MgoE.Update("luayearmincode", map[string]interface{}{"_id": l["_id"]}, map[string]interface{}{"$set": map[string]interface{}{"publishtime": publishtimeArr}}, false, false)
- }
- qu.Debug("统计完毕...")
- }
// Int64Slice attaches the methods of sort.Interface to []int64, sorting in
// increasing order.
type Int64Slice []int64

// Len reports the number of elements.
func (s Int64Slice) Len() int {
	return len(s)
}

// Less reports whether element a is smaller than element b.
func (s Int64Slice) Less(a, b int) bool {
	return s[a] < s[b]
}

// Swap exchanges elements a and b.
func (s Int64Slice) Swap(a, b int) {
	s[b], s[a] = s[a], s[b]
}
|