|
@@ -1,15 +1,10 @@
|
|
|
package spider
|
|
|
|
|
|
import (
|
|
|
- "encoding/json"
|
|
|
"fmt"
|
|
|
- "github.com/cron"
|
|
|
"github.com/donnie4w/go-logger/logger"
|
|
|
- "github.com/yuin/gopher-lua"
|
|
|
mu "mfw/util"
|
|
|
"qfw/util"
|
|
|
-
|
|
|
- //"qfw/util/redis"
|
|
|
lu "spiderutil"
|
|
|
"strings"
|
|
|
"time"
|
|
@@ -35,7 +30,7 @@ type LogMap struct {
|
|
|
|
|
|
//数据存储批量
|
|
|
func StoreBlak(mode, event int, c, coverAttr string, data []map[string]interface{}) {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
for _, v := range data {
|
|
|
if t, err := time.ParseInLocation(util.Date_Full_Layout, util.ObjToString(v["publishtime"]), time.Local); err == nil {
|
|
|
v["publishtime"] = t.Unix()
|
|
@@ -56,7 +51,7 @@ func StoreBlak(mode, event int, c, coverAttr string, data []map[string]interface
|
|
|
|
|
|
//数据存储
|
|
|
func Store(mode, event int, c, coverAttr string, data map[string]interface{}, flag bool) {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
if t, err := time.ParseInLocation(util.Date_Full_Layout, util.ObjToString(data["publishtime"]), time.Local); err == nil {
|
|
|
data["publishtime"] = t.Unix()
|
|
|
}
|
|
@@ -74,11 +69,11 @@ func Store(mode, event int, c, coverAttr string, data map[string]interface{}, fl
|
|
|
logger.Warn(c, mode, "保存失败", data)
|
|
|
}
|
|
|
}
|
|
|
- href := fmt.Sprint(data["href"])
|
|
|
- if len(href) > 5 && flag { //有效数据
|
|
|
- hashHref := lu.HexText(href)
|
|
|
- lu.RedisClusterSet(hashHref, "", -1)
|
|
|
- }
|
|
|
+ //href := fmt.Sprint(data["href"])
|
|
|
+ //if len(href) > 5 && flag { //有效数据
|
|
|
+ // hashHref := lu.HexText(href)
|
|
|
+ // lu.RedisClusterSet(hashHref, "", -1)
|
|
|
+ //}
|
|
|
} else if mode == 2 {
|
|
|
data["T"] = c
|
|
|
SaveObj(event, coverAttr, data, flag)
|
|
@@ -91,7 +86,7 @@ func Store(mode, event int, c, coverAttr string, data map[string]interface{}, fl
|
|
|
|
|
|
//保存验证错误日志
|
|
|
func saveVerificationLog(code, name, url, content string) {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
data := map[string]interface{}{}
|
|
|
data["code"] = code
|
|
|
data["name"] = name
|
|
@@ -104,7 +99,7 @@ func saveVerificationLog(code, name, url, content string) {
|
|
|
|
|
|
//查找信息是否存在
|
|
|
func findHasExit(c, q string) bool {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
ret, _ := MgoS.FindOne(c, q)
|
|
|
if ret != nil && len(*ret) > 0 {
|
|
|
return true
|
|
@@ -117,7 +112,7 @@ func findHasExit(c, q string) bool {
|
|
|
var spider_ldtime = map[string]map[string]interface{}{}
|
|
|
|
|
|
func GetLastPubtime(code string) int64 {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
if len(spider_ldtime) < 1 {
|
|
|
list, _ := MgoS.Find("spider_ldtime", nil, nil, nil, false, -1, -1)
|
|
|
for _, v := range *list {
|
|
@@ -134,7 +129,7 @@ func GetLastPubtime(code string) int64 {
|
|
|
|
|
|
//获取最后执行时间
|
|
|
func GetLastExectime(code string) int64 {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
if len(spider_ldtime) < 1 {
|
|
|
list, _ := MgoS.Find("spider_ldtime", nil, nil, nil, false, -1, -1)
|
|
|
for _, v := range *list {
|
|
@@ -153,7 +148,7 @@ func GetLastExectime(code string) int64 {
|
|
|
var spider_downlog = map[string]map[string]interface{}{}
|
|
|
|
|
|
func GetDownloadLast(code, date string) map[string]interface{} {
|
|
|
- defer mu.Catch()
|
|
|
+ defer util.Catch()
|
|
|
if len(spider_downlog) < 1 {
|
|
|
list, _ := MgoS.Find("spider_downlog", map[string]interface{}{"date": date}, nil, nil, false, -1, -1)
|
|
|
for _, v := range *list {
|
|
@@ -193,51 +188,8 @@ func GcCount() {
|
|
|
lu.TimeAfterFunc(30*time.Minute, GcCount, TimeChan)
|
|
|
}
|
|
|
|
|
|
-//保存错误数据信息,重新下载
|
|
|
-func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- if href := util.ObjToString(pd["href"]); href != "" {
|
|
|
- delete(pd, "_id")
|
|
|
- pd["state"] = 0
|
|
|
- pd["from"] = "lua"
|
|
|
- pd["comeintime"] = time.Now().Unix()
|
|
|
- pd["modifyuser"] = modifyuser
|
|
|
- if luaErr, ok := err.(*lua.ApiError); ok && luaErr != nil {
|
|
|
- pd["error"] = luaErr.Object.String()
|
|
|
- }
|
|
|
- if publishtime, ok := pd["publishtime"].(string); ok {
|
|
|
- pd["publishtime"] = lu.ParseDate2Int64(publishtime)
|
|
|
- }
|
|
|
- if jsondata := util.ObjToString(pd["jsondata"]); jsondata != "" && jsondata != "null" {
|
|
|
- tmp := map[string]interface{}{}
|
|
|
- json.Unmarshal([]byte(jsondata), &tmp)
|
|
|
- pd["jsondata"] = tmp
|
|
|
- }
|
|
|
- coll := "spider_highlistdata"
|
|
|
- if lu.Config.Modal == 0 {
|
|
|
- coll = "spider_listdata"
|
|
|
- } else if lu.Config.IsHistoryEvent {
|
|
|
- coll = "spider_historydata"
|
|
|
- }
|
|
|
- pd["coll"] = coll
|
|
|
- //mgu.Save("regatherdata", "spider", "spider", pd)
|
|
|
- query := map[string]interface{}{
|
|
|
- "href": href,
|
|
|
- }
|
|
|
- set := map[string]interface{}{
|
|
|
- "$set": pd,
|
|
|
- }
|
|
|
- update := []map[string]interface{}{}
|
|
|
- update = append(update, query)
|
|
|
- update = append(update, set)
|
|
|
- UpdataMgoCache <- update
|
|
|
- //Mgo.Update("regatherdata", "spider", "spider", query, set, true, false)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
//保存modal=1模式采集的列表页信息
|
|
|
func SaveHighListPageData(tmp map[string]interface{}, hashHref string, num *int) {
|
|
|
- lu.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
|
|
|
tmp["state"] = 0
|
|
|
tmp["event"] = lu.Config.Uploadevent
|
|
|
tmp["comeintime"] = time.Now().Unix()
|
|
@@ -246,6 +198,7 @@ func SaveHighListPageData(tmp map[string]interface{}, hashHref string, num *int)
|
|
|
} else {
|
|
|
MgoS.Save("spider_highlistdata", tmp)
|
|
|
}
|
|
|
+ lu.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
|
|
|
}
|
|
|
|
|
|
//保存7410、7500、7510、7520、7700采集的列表页信息
|
|
@@ -292,43 +245,6 @@ func UpdateHighListDataByCode(code string) {
|
|
|
MgoS.Update("spider_highlistdata", query, set, false, true)
|
|
|
}
|
|
|
|
|
|
-//批量更新错误数据
|
|
|
-func UpdateErrDataMgo() {
|
|
|
- fmt.Println("Update Error Data...")
|
|
|
- arru := make([][]map[string]interface{}, 50)
|
|
|
- indexu := 0
|
|
|
- for {
|
|
|
- select {
|
|
|
- case v := <-UpdataMgoCache:
|
|
|
- arru[indexu] = v
|
|
|
- indexu++
|
|
|
- if indexu == 50 {
|
|
|
- SP <- true
|
|
|
- go func(arru [][]map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-SP
|
|
|
- }()
|
|
|
- MgoS.UpSertBulk("regatherdata", arru...)
|
|
|
- }(arru)
|
|
|
- arru = make([][]map[string]interface{}, 50)
|
|
|
- indexu = 0
|
|
|
- }
|
|
|
- case <-time.After(1 * time.Minute):
|
|
|
- if indexu > 0 {
|
|
|
- SP <- true
|
|
|
- go func(arru [][]map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-SP
|
|
|
- }()
|
|
|
- MgoS.UpSertBulk("regatherdata", arru...)
|
|
|
- }(arru[:indexu])
|
|
|
- arru = make([][]map[string]interface{}, 50)
|
|
|
- indexu = 0
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
//批量更新心跳信息
|
|
|
func UpdateHeartInfo() {
|
|
|
fmt.Println("Update Heart Info...")
|
|
@@ -341,11 +257,11 @@ func UpdateHeartInfo() {
|
|
|
indexh++
|
|
|
if indexh == 200 {
|
|
|
SPH <- true
|
|
|
- go func(heartarr [][]map[string]interface{}) {
|
|
|
+ go func(tmp [][]map[string]interface{}) {
|
|
|
defer func() {
|
|
|
<-SPH
|
|
|
}()
|
|
|
- MgoS.UpSertBulk("spider_heart", heartarr...)
|
|
|
+ MgoS.UpSertBulk("spider_heart", tmp...)
|
|
|
}(heartarr)
|
|
|
heartarr = make([][]map[string]interface{}, 200)
|
|
|
indexh = 0
|
|
@@ -353,11 +269,11 @@ func UpdateHeartInfo() {
|
|
|
case <-time.After(1 * time.Minute):
|
|
|
if indexh > 0 {
|
|
|
SPH <- true
|
|
|
- go func(heartarr [][]map[string]interface{}) {
|
|
|
+ go func(tmp [][]map[string]interface{}) {
|
|
|
defer func() {
|
|
|
<-SPH
|
|
|
}()
|
|
|
- MgoS.UpSertBulk("spider_heart", heartarr...)
|
|
|
+ MgoS.UpSertBulk("spider_heart", tmp...)
|
|
|
}(heartarr[:indexh])
|
|
|
heartarr = make([][]map[string]interface{}, 200)
|
|
|
indexh = 0
|
|
@@ -366,81 +282,196 @@ func UpdateHeartInfo() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//保存爬虫采集非本站点数据
|
|
|
-func SaveOtherSiteData() {
|
|
|
- fmt.Println("Save Other Site Data...")
|
|
|
+//批量保存data_bak
|
|
|
+func SaveDataBak() {
|
|
|
+ fmt.Println("Save DataBak...")
|
|
|
savearr := make([]map[string]interface{}, 200)
|
|
|
- indexh := 0
|
|
|
+ indexdb := 0
|
|
|
for {
|
|
|
select {
|
|
|
- case v := <-SaveMgoCache:
|
|
|
- savearr[indexh] = v
|
|
|
- indexh++
|
|
|
- if indexh == 200 {
|
|
|
- SPS <- true
|
|
|
- go func(savearr []map[string]interface{}) {
|
|
|
+ case v := <-DataBakSaveCache:
|
|
|
+ savearr[indexdb] = v
|
|
|
+ indexdb++
|
|
|
+ if indexdb == 200 {
|
|
|
+ DB_CH <- true
|
|
|
+ go func(tmp []map[string]interface{}) {
|
|
|
defer func() {
|
|
|
- <-SPS
|
|
|
+ <-DB_CH
|
|
|
}()
|
|
|
- MgoS.SaveBulk("spider_othersite", savearr...)
|
|
|
+ MgoS.SaveBulk("data_bak", tmp...)
|
|
|
}(savearr)
|
|
|
savearr = make([]map[string]interface{}, 200)
|
|
|
- indexh = 0
|
|
|
+ indexdb = 0
|
|
|
}
|
|
|
- case <-time.After(1 * time.Minute):
|
|
|
- if indexh > 0 {
|
|
|
- SPS <- true
|
|
|
- go func(savearr []map[string]interface{}) {
|
|
|
+ case <-time.After(30 * time.Second):
|
|
|
+ if indexdb > 0 {
|
|
|
+ DB_CH <- true
|
|
|
+ go func(tmp []map[string]interface{}) {
|
|
|
defer func() {
|
|
|
- <-SPS
|
|
|
+ <-DB_CH
|
|
|
}()
|
|
|
- MgoS.SaveBulk("spider_othersite", savearr...)
|
|
|
- }(savearr[:indexh])
|
|
|
+ MgoS.SaveBulk("data_bak", tmp...)
|
|
|
+ }(savearr[:indexdb])
|
|
|
savearr = make([]map[string]interface{}, 200)
|
|
|
- indexh = 0
|
|
|
+ indexdb = 0
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//批量更新错误数据
|
|
|
+//func UpdateErrDataMgo() {
|
|
|
+// fmt.Println("Update Error Data...")
|
|
|
+// arru := make([][]map[string]interface{}, 50)
|
|
|
+// indexu := 0
|
|
|
+// for {
|
|
|
+// select {
|
|
|
+// case v := <-UpdataMgoCache:
|
|
|
+// arru[indexu] = v
|
|
|
+// indexu++
|
|
|
+// if indexu == 50 {
|
|
|
+// SP <- true
|
|
|
+// go func(arru [][]map[string]interface{}) {
|
|
|
+// defer func() {
|
|
|
+// <-SP
|
|
|
+// }()
|
|
|
+// MgoS.UpSertBulk("regatherdata", arru...)
|
|
|
+// }(arru)
|
|
|
+// arru = make([][]map[string]interface{}, 50)
|
|
|
+// indexu = 0
|
|
|
+// }
|
|
|
+// case <-time.After(1 * time.Minute):
|
|
|
+// if indexu > 0 {
|
|
|
+// SP <- true
|
|
|
+// go func(arru [][]map[string]interface{}) {
|
|
|
+// defer func() {
|
|
|
+// <-SP
|
|
|
+// }()
|
|
|
+// MgoS.UpSertBulk("regatherdata", arru...)
|
|
|
+// }(arru[:indexu])
|
|
|
+// arru = make([][]map[string]interface{}, 50)
|
|
|
+// indexu = 0
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|
|
|
+//保存错误数据信息,重新下载
|
|
|
+//func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}) {
|
|
|
+// defer util.Catch()
|
|
|
+// if href := util.ObjToString(pd["href"]); href != "" {
|
|
|
+// delete(pd, "_id")
|
|
|
+// pd["state"] = 0
|
|
|
+// pd["from"] = "lua"
|
|
|
+// pd["comeintime"] = time.Now().Unix()
|
|
|
+// pd["modifyuser"] = modifyuser
|
|
|
+// if luaErr, ok := err.(*lua.ApiError); ok && luaErr != nil {
|
|
|
+// pd["error"] = luaErr.Object.String()
|
|
|
+// }
|
|
|
+// if publishtime, ok := pd["publishtime"].(string); ok {
|
|
|
+// pd["publishtime"] = lu.ParseDate2Int64(publishtime)
|
|
|
+// }
|
|
|
+// if jsondata := util.ObjToString(pd["jsondata"]); jsondata != "" && jsondata != "null" {
|
|
|
+// tmp := map[string]interface{}{}
|
|
|
+// json.Unmarshal([]byte(jsondata), &tmp)
|
|
|
+// pd["jsondata"] = tmp
|
|
|
+// }
|
|
|
+// coll := "spider_highlistdata"
|
|
|
+// if lu.Config.Modal == 0 {
|
|
|
+// coll = "spider_listdata"
|
|
|
+// } else if lu.Config.IsHistoryEvent {
|
|
|
+// coll = "spider_historydata"
|
|
|
+// }
|
|
|
+// pd["coll"] = coll
|
|
|
+// //mgu.Save("regatherdata", "spider", "spider", pd)
|
|
|
+// query := map[string]interface{}{
|
|
|
+// "href": href,
|
|
|
+// }
|
|
|
+// set := map[string]interface{}{
|
|
|
+// "$set": pd,
|
|
|
+// }
|
|
|
+// update := []map[string]interface{}{}
|
|
|
+// update = append(update, query)
|
|
|
+// update = append(update, set)
|
|
|
+// UpdataMgoCache <- update
|
|
|
+// //Mgo.Update("regatherdata", "spider", "spider", query, set, true, false)
|
|
|
+// }
|
|
|
+//}
|
|
|
+
|
|
|
+//保存爬虫采集非本站点数据
|
|
|
+//func SaveOtherSiteData() {
|
|
|
+// fmt.Println("Save Other Site Data...")
|
|
|
+// savearr := make([]map[string]interface{}, 200)
|
|
|
+// indexh := 0
|
|
|
+// for {
|
|
|
+// select {
|
|
|
+// case v := <-SaveMgoCache:
|
|
|
+// savearr[indexh] = v
|
|
|
+// indexh++
|
|
|
+// if indexh == 200 {
|
|
|
+// SPS <- true
|
|
|
+// go func(savearr []map[string]interface{}) {
|
|
|
+// defer func() {
|
|
|
+// <-SPS
|
|
|
+// }()
|
|
|
+// MgoS.SaveBulk("spider_othersite", savearr...)
|
|
|
+// }(savearr)
|
|
|
+// savearr = make([]map[string]interface{}, 200)
|
|
|
+// indexh = 0
|
|
|
+// }
|
|
|
+// case <-time.After(1 * time.Minute):
|
|
|
+// if indexh > 0 {
|
|
|
+// SPS <- true
|
|
|
+// go func(savearr []map[string]interface{}) {
|
|
|
+// defer func() {
|
|
|
+// <-SPS
|
|
|
+// }()
|
|
|
+// MgoS.SaveBulk("spider_othersite", savearr...)
|
|
|
+// }(savearr[:indexh])
|
|
|
+// savearr = make([]map[string]interface{}, 200)
|
|
|
+// indexh = 0
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|
|
|
+
|
|
|
//定时任务
|
|
|
-func TimeTask() {
|
|
|
- now := time.Now()
|
|
|
- Today = util.FormatDate(&now, util.Date_Short_Layout) //初始化日期
|
|
|
- cr := cron.New()
|
|
|
- cr.Start()
|
|
|
- cr.AddFunc("0 30 0 * * ?", UpdateSpiderFlow) //每天零时提交统计
|
|
|
-}
|
|
|
+//func TimeTask() {
|
|
|
+// now := time.Now()
|
|
|
+// Today = util.FormatDate(&now, util.Date_Short_Layout) //初始化日期
|
|
|
+// cr := cron.New()
|
|
|
+// cr.Start()
|
|
|
+// cr.AddFunc("0 30 0 * * ?", UpdateSpiderFlow) //每天零时提交统计
|
|
|
+//}
|
|
|
|
|
|
//更新流量信息
|
|
|
-func UpdateSpiderFlow() {
|
|
|
- defer util.Catch()
|
|
|
- logger.Info("统计流量信息开始...", Today)
|
|
|
- arr := []map[string]interface{}{}
|
|
|
- SpiderFlowMap.Range(func(key, temp interface{}) bool {
|
|
|
- date := strings.Split(key.(string), "+")
|
|
|
- if len(date) == 2 && date[0] == Today { //统计非当天的
|
|
|
- if sfMap, ok := temp.(*SpiderFlow); ok {
|
|
|
- arr = append(arr, map[string]interface{}{
|
|
|
- "spidercode": date[1],
|
|
|
- "date": date[0],
|
|
|
- "flow": sfMap.Flow,
|
|
|
- "site": sfMap.Site,
|
|
|
- "channel": sfMap.Channel,
|
|
|
- "modifyuser": sfMap.ModifyUser,
|
|
|
- "comeintime": time.Now().Unix(),
|
|
|
- "event": lu.Config.Uploadevent,
|
|
|
- })
|
|
|
- SpiderFlowMap.Delete(key) //统计完成后删除非当天数据
|
|
|
- }
|
|
|
- }
|
|
|
- return true
|
|
|
- })
|
|
|
- if len(arr) > 0 {
|
|
|
- MgoS.SaveBulk("spider_flow", arr...)
|
|
|
- arr = []map[string]interface{}{}
|
|
|
- }
|
|
|
- now := time.Now()
|
|
|
- Today = util.FormatDate(&now, util.Date_Short_Layout)
|
|
|
- logger.Info("统计流量信息完成...", Today)
|
|
|
-}
|
|
|
+//func UpdateSpiderFlow() {
|
|
|
+// defer util.Catch()
|
|
|
+// logger.Info("统计流量信息开始...", Today)
|
|
|
+// arr := []map[string]interface{}{}
|
|
|
+// SpiderFlowMap.Range(func(key, temp interface{}) bool {
|
|
|
+// date := strings.Split(key.(string), "+")
|
|
|
+// if len(date) == 2 && date[0] == Today { //统计非当天的
|
|
|
+// if sfMap, ok := temp.(*SpiderFlow); ok {
|
|
|
+// arr = append(arr, map[string]interface{}{
|
|
|
+// "spidercode": date[1],
|
|
|
+// "date": date[0],
|
|
|
+// "flow": sfMap.Flow,
|
|
|
+// "site": sfMap.Site,
|
|
|
+// "channel": sfMap.Channel,
|
|
|
+// "modifyuser": sfMap.ModifyUser,
|
|
|
+// "comeintime": time.Now().Unix(),
|
|
|
+// "event": lu.Config.Uploadevent,
|
|
|
+// })
|
|
|
+// SpiderFlowMap.Delete(key) //统计完成后删除非当天数据
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return true
|
|
|
+// })
|
|
|
+// if len(arr) > 0 {
|
|
|
+// MgoS.SaveBulk("spider_flow", arr...)
|
|
|
+// arr = []map[string]interface{}{}
|
|
|
+// }
|
|
|
+// now := time.Now()
|
|
|
+// Today = util.FormatDate(&now, util.Date_Short_Layout)
|
|
|
+// logger.Info("统计流量信息完成...", Today)
|
|
|
+//}
|