|
@@ -0,0 +1,516 @@
|
|
|
+package luatask
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "github.com/donnie4w/go-logger/logger"
|
|
|
+ qu "qfw/util"
|
|
|
+ "sync"
|
|
|
+ "util"
|
|
|
+)
|
|
|
+
|
|
|
// NewCodeInfoMap maps spider code -> aggregated spider record; it is filled
// by getCodeBaseInfo and then enriched by the python/lua summary collectors.
var NewCodeInfoMap = map[string]*NewSpider{}
|
|
|
+
|
|
|
// NewSpider aggregates one spider's base configuration (from luaconfig)
// together with the daily statistics gathered from the lua and python
// monitoring collections.
type NewSpider struct {
	// Basic spider configuration.
	Code         string                 `json:"code"`
	Site         string                 `json:"site"`
	Channel      string                 `json:"channel"`
	Platform     string                 `json:"platform"`
	Event        int                    `json:"event"`
	PendState    int                    `json:"pendstate"`
	ModifyUser   string                 `json:"modifyuser"`
	ModifyId     string                 `json:"modifyuserid"`
	ModifyTime   int64                  `json:"modifytime"`
	Model        int                    `json:"model"`
	Working      int                    `json:"working"`
	AuditTime    int64                  `json:"l_uploadtime"`
	ListIsFilter bool                   `json:"listisfilter"`
	TaskTags     map[string]interface{} `json:"tasktags"`
	// Daily statistics.
	Detail_DownloadNum        int               `json:"detail_downloadnum"`
	Detail_DownloadSuccessNum int               `json:"detail_downloadsuccessnum"`
	Detail_DownloadFailNum    int               `json:"detail_downloadfailnum"`
	List_IsGetData            bool              `json:"list_isgetdata"`
	List_RunTimes             int               `json:"list_runtimes"`
	List_NoDataTimes          int               `json:"list_nodatatimes"`
	List_AllInTimes           int               `json:"list_allintimes"`
	WarnInfoMap               map[int]*WarnInfo `json:"warninfo"` // keyed by warning infotype
	// Python-platform identifiers.
	Py_TaskId   string `json:"py_taskid"`
	Py_NodeName string `json:"py_nodename"`
	// Supplementary info (record creation time).
	Comeintime int64 `json:"comeintime"`
}
|
|
|
+
|
|
|
// WarnInfo accumulates one warning type for a spider: its description, the
// number of occurrences, and a per-field occurrence count.
type WarnInfo struct {
	Info   string         `json:"info"`
	Num    int            `json:"num"`
	Fields map[string]int `json:"fields"`
}
|
|
|
+
|
|
|
// NewStartTask runs the daily spider statistics pipeline in order: it
// initializes the time window, loads spider base configs, merges python and
// lua summary data and warning counts, then persists one record per spider.
// NOTE(review): InitInfo/StartTime/EndTime/Publishtime are defined elsewhere
// in this package — presumably InitInfo sets the day's window; confirm.
func NewStartTask() {
	InitInfo() //initialize the statistic time window
	logger.Info(StartTime, EndTime, Publishtime)
	getCodeBaseInfo()      //load spider base info into NewCodeInfoMap
	getPythonSummaryInfo() //merge python-platform summary data
	getLuaSummaryInfo()    //merge lua-platform summary data
	getWarnInfo()          //aggregate warning records
	saveCodeInfo()         //bulk-save the results
}
|
|
|
+
|
|
|
+func getCodeBaseInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoEB.GetMgoConn()
|
|
|
+ defer util.MgoEB.DestoryMongoConn(sess)
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "$or": []interface{}{
|
|
|
+ //lua、python上线爬虫
|
|
|
+ map[string]interface{}{
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$in": []int{5, 11}, //上架、上线爬虫
|
|
|
+ },
|
|
|
+ },
|
|
|
+ //lua正在被维护的爬虫和上架爬虫
|
|
|
+ map[string]interface{}{
|
|
|
+ "platform": "golua平台",
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$in": []int{0, 1, 2}, //待完成、待审核、未通过
|
|
|
+ },
|
|
|
+ "event": map[string]interface{}{
|
|
|
+ "$ne": 7000,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "code": 1,
|
|
|
+ "site": 1,
|
|
|
+ "channel": 1,
|
|
|
+ "platform": 1,
|
|
|
+ "event": 1,
|
|
|
+ "pendstate": 1,
|
|
|
+ "modifyuser": 1,
|
|
|
+ "modifyuserid": 1,
|
|
|
+ "modifytime": 1,
|
|
|
+ "l_uploadtime": 1,
|
|
|
+ "listisfilter": 1,
|
|
|
+ "tasktags": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ sp := &NewSpider{
|
|
|
+ WarnInfoMap: map[int]*WarnInfo{},
|
|
|
+ }
|
|
|
+ luaByte, _ := json.Marshal(tmp)
|
|
|
+ if json.Unmarshal(luaByte, &sp) != nil {
|
|
|
+ qu.Info("初始化爬虫失败:", tmp["_id"])
|
|
|
+ return
|
|
|
+ }
|
|
|
+ sp.Working = util.CodeEventWorking[sp.Working]
|
|
|
+ sp.Model = util.CodeEventModel[sp.Event]
|
|
|
+ lock.Lock()
|
|
|
+ NewCodeInfoMap[sp.Code] = sp
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Info(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("爬虫基本信息准备完成...", len(CodeInfoMap))
|
|
|
+}
|
|
|
+
|
|
|
+func getPythonSummaryInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoPy.GetMgoConn()
|
|
|
+ defer util.MgoPy.DestoryMongoConn(sess)
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": util.GetTime(0),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ it := sess.DB(util.MgoPy.DbName).C("spider_monitor").Find(&query).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["code"])
|
|
|
+ if is_valid, _ := tmp["is_valid"].(bool); !is_valid { //无效监控爬虫
|
|
|
+ lock.Lock()
|
|
|
+ delete(NewCodeInfoMap, code)
|
|
|
+ lock.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ py_taskid := qu.ObjToString(tmp["py_taskid"])
|
|
|
+ py_nodename := qu.ObjToString(tmp["py_nodename"])
|
|
|
+ list_isgetdata, _ := tmp["list_isgetdata"].(bool)
|
|
|
+ list_allintimes := qu.IntAll(tmp["list_allintimes"])
|
|
|
+ list_nodatatimes := qu.IntAll(tmp["list_nodatatimes"])
|
|
|
+ list_runtimes := qu.IntAll(tmp["list_runtimes"])
|
|
|
+ detail_downloadnum := qu.IntAll(tmp["detail_downloadnum"])
|
|
|
+ detail_downloadsuccessnum := qu.IntAll(tmp["detail_downloadsuccessnum"])
|
|
|
+ detail_downloadfailnum := qu.IntAll(tmp["detail_downloadfailnum"])
|
|
|
+ lock.Lock()
|
|
|
+ if sp := NewCodeInfoMap[code]; sp != nil {
|
|
|
+ sp.Py_TaskId = py_taskid
|
|
|
+ sp.Py_NodeName = py_nodename
|
|
|
+ sp.List_IsGetData = list_isgetdata
|
|
|
+ sp.List_AllInTimes = list_allintimes
|
|
|
+ sp.List_NoDataTimes = list_nodatatimes
|
|
|
+ sp.List_RunTimes = list_runtimes
|
|
|
+ sp.Detail_DownloadNum = detail_downloadnum
|
|
|
+ sp.Detail_DownloadSuccessNum = detail_downloadsuccessnum
|
|
|
+ sp.Detail_DownloadFailNum = detail_downloadfailnum
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Info(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("python汇总信息完成...")
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
// getLuaSummaryInfo gathers all lua-side statistics for the day by chaining
// the individual collectors below.
func getLuaSummaryInfo() {
	getCodeHeart()                 //heartbeat info (sets List_IsGetData)
	getSpiderHighListDownloadNum() //download counts from spider_highlistdata
	getSpiderListDownloadNum()     //download counts from spider_listdata
	getSpiderDownloadRateDataNew() //run / no-data / full-download statistics
}
|
|
|
+
|
|
|
+func getWarnInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoS.DestoryMongoConn(sess)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "data": 0,
|
|
|
+ }
|
|
|
+ it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ infotype := qu.IntAll(tmp["infotype"])
|
|
|
+ level := qu.IntAll(tmp["level"])
|
|
|
+ field := qu.ObjToString(tmp["field"])
|
|
|
+ if infotype == 3 || infotype == 7 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if (infotype == 5 || infotype == 6) && level == 1 {
|
|
|
+ return
|
|
|
+ } else if infotype == 8 && field == "projectinfo" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ code := qu.ObjToString(tmp["code"])
|
|
|
+ info := qu.ObjToString(tmp["info"])
|
|
|
+ lock.Lock()
|
|
|
+ if sp := NewCodeInfoMap[code]; sp != nil {
|
|
|
+ if wf := sp.WarnInfoMap[infotype]; wf != nil {
|
|
|
+ wf.Fields[field] += 1
|
|
|
+ } else {
|
|
|
+ sp.WarnInfoMap[infotype] = &WarnInfo{
|
|
|
+ Info: info,
|
|
|
+ Num: 1,
|
|
|
+ Fields: map[string]int{field: 1},
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Info(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("错误信息数据统计完成...")
|
|
|
+}
|
|
|
+
|
|
|
+func getCodeHeart() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoS.DestoryMongoConn(sess)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "del": false,
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "code": 1,
|
|
|
+ "findlist": 1,
|
|
|
+ }
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["code"])
|
|
|
+ findListHeart := qu.Int64All(tmp["findlist"])
|
|
|
+ lock.Lock()
|
|
|
+ if sp := NewCodeInfoMap[code]; sp != nil {
|
|
|
+ limitDayNum := 0
|
|
|
+ if sp.Event == 7520 { //由于7520节点爬虫循环一轮的时间较长,心跳有可能仍是前一天的
|
|
|
+ limitDayNum = -1
|
|
|
+ }
|
|
|
+ sp.List_IsGetData = findListHeart > util.GetTime(limitDayNum)
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ logger.Info(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("lua统计心跳信息完成...")
|
|
|
+}
|
|
|
+
|
|
|
+func getSpiderHighListDownloadNum() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoS.DestoryMongoConn(sess)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "spidercode": 1,
|
|
|
+ "state": 1,
|
|
|
+ }
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ //1、统计spider_highlistdata
|
|
|
+ it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["spidercode"])
|
|
|
+ state := qu.IntAll(tmp["state"])
|
|
|
+ lock.Lock()
|
|
|
+ if sp := NewCodeInfoMap[code]; sp != nil {
|
|
|
+ if state == 1 {
|
|
|
+ sp.Detail_DownloadSuccessNum++
|
|
|
+ } else {
|
|
|
+ sp.Detail_DownloadFailNum++
|
|
|
+ }
|
|
|
+ sp.Detail_DownloadNum++
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Info(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("lua统计采集量spider_highlistdata完成...")
|
|
|
+}
|
|
|
+
|
|
|
// getSpiderListDownloadNum adds today's spider_listdata rows to each spider's
// detail download counters, deduplicating by href: the same URL may appear in
// several records, and a later success overrides an earlier failure.
//
// repeatHrefMap state per href: 0 = unseen, 1 = counted as success,
// -1 = counted as failure (may still be upgraded to success).
// NOTE(review): the Sort("_id") plus the 5-goroutine pool means records are
// not strictly processed in insertion order; the success-overrides-failure
// logic makes the final counters order-independent, but only per href.
func getSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
		"href":       1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5) // limits concurrent workers to 5
	repeatHrefMap := map[string]int{}
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Sort("_id").Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			state := qu.IntAll(tmp["state"])
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			lock.Lock()
			defer lock.Unlock()
			tmpState := repeatHrefMap[href]
			if tmpState == 1 { //href already counted as a success; ignore further records
				return
			} else if tmpState == 0 { //first time this href is seen
				if sp := NewCodeInfoMap[code]; sp != nil {
					if state == 1 {
						sp.Detail_DownloadSuccessNum++
					} else {
						// normalize any non-success state to -1 so the map
						// records "counted as failure"
						state = -1
						sp.Detail_DownloadFailNum++
					}
					sp.Detail_DownloadNum++
					repeatHrefMap[href] = state
				}
			} else if tmpState == -1 && state == 1 { //previously failed, now succeeded: flip the counters
				if sp := NewCodeInfoMap[code]; sp != nil {
					sp.Detail_DownloadSuccessNum++
					sp.Detail_DownloadFailNum--
					repeatHrefMap[href] = state
				}
			}
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	repeatHrefMap = map[string]int{} // release the dedup map
	logger.Info("lua统计spider_listdata采集量完成...")
}
|
|
|
+
|
|
|
+func getSpiderDownloadRateDataNew() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoS.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "date": date,
|
|
|
+ "event": map[string]interface{}{
|
|
|
+ "$ne": 7000,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "spidercode": 1,
|
|
|
+ "alltimes": 1,
|
|
|
+ "zero": 1,
|
|
|
+ "oh_percent": 1,
|
|
|
+ }
|
|
|
+ it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["spidercode"])
|
|
|
+ alltimes := qu.IntAll(tmp["alltimes"])
|
|
|
+ zero := qu.IntAll(tmp["zero"])
|
|
|
+ oh_percent := qu.IntAll(tmp["oh_percent"])
|
|
|
+ lock.Lock()
|
|
|
+ if sp := NewCodeInfoMap[code]; sp != nil {
|
|
|
+ sp.List_NoDataTimes = zero
|
|
|
+ sp.List_AllInTimes = alltimes
|
|
|
+ sp.List_AllInTimes = oh_percent
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Info("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Info("lua爬虫采集详情统计完成...")
|
|
|
+}
|
|
|
+
|
|
|
+func saveCodeInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ arr := []map[string]interface{}{}
|
|
|
+ for _, spider := range NewCodeInfoMap {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(sp *NewSpider) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ spByte, err := json.Marshal(sp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Info("Json Marshal Error", sp.Code)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ if json.Unmarshal(spByte, &tmp) == nil {
|
|
|
+ lock.Lock()
|
|
|
+ arr = append(arr, tmp)
|
|
|
+ if len(arr) > 500 {
|
|
|
+ util.MgoS.SaveBulk("spider_info", arr...)
|
|
|
+ arr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ }(spider)
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ if len(arr) > 0 {
|
|
|
+ util.MgoS.SaveBulk("spider_info", arr...)
|
|
|
+ arr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ logger.Info("爬虫统计完成...")
|
|
|
+}
|