@@ -10,7 +10,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/donnie4w/go-logger/logger"
@@ -31,10 +30,11 @@ var (
 	IndexFields          []string //fields used to build the index
 	ProjectinfoFields    []string
 	PurchasinglistFields []string
-	SpiderCodeArr        []string                          //spidercodes of all the data to download
-	SpiderData           map[string][]*Data                //records all the re-crawl data under a given spider
-	SpiderDataAll        map[string]map[string]interface{} //stores all queried data, mainly used to complete data for udp spider re-crawls
-	ScriptDataArr        []*ScriptData
+	SpiderCodeArr    []string               //spidercodes of all the data to download
+	SpiderScriptData map[string]*ScriptData //spider objects, keyed by spidercode
+	//SpiderData map[string][]*Data //records all the re-crawl data under a given spider
+	//SpiderDataAll map[string]map[string]interface{} //stores all queried data, mainly used to complete data for udp spider re-crawls
+	//ScriptDataArr []*ScriptData
 	//ScriptSpider map[string]*Spider //all spider info, key: spidercode
 )
 
@@ -65,7 +65,7 @@ func InitSysconfig() {
 	//initialize mgo
 	MgoE = &mongo.MongodbSim{
 		MongodbAddr: qu.ObjToString(util.Config.Mongodb_editor),
-		Size:        2,
+		Size:        10,
 		DbName:      "editor",
 	}
 	MgoE.InitPool()
@@ -105,9 +105,10 @@ func InitSysconfig() {
 	// ProjectinfoFields = qu.ObjArrToStringArr(es["projectinfo"].([]interface{}))
 	// PurchasinglistFields = qu.ObjArrToStringArr(es["purchasinglist"].([]interface{}))
 
-	SpiderData = make(map[string][]*Data)
-	SpiderDataAll = make(map[string]map[string]interface{})
+	// SpiderData = make(map[string][]*Data)
+	// SpiderDataAll = make(map[string]map[string]interface{})
 	//ScriptSpider = make(map[string]*Spider)
+	SpiderScriptData = make(map[string]*ScriptData)
 }
 
 func TimeTask() {
@@ -154,44 +155,39 @@ func TimeRegatherTask() {
 func DataReDather(query map[string]interface{}) {
 	defer qu.Catch()
 	logger.Debug("re-crawling data...")
-	if InitData(query) && InitSpider() { //query data; load scripts
-		Start() //execute
+	if InitSpider_New(query) {
+		Start_New(query)
 	}
+	// if InitData(query) && InitSpider() { //query data; load scripts
+	// 	Start() //execute
+	// }
 	//
 	logger.Debug("start clearing memory...")
-	ScriptDataArr = []*ScriptData{}
-	SpiderData = map[string][]*Data{}
-	SpiderCodeArr = []string{}
-	SpiderDataAll = map[string]map[string]interface{}{}
+	// ScriptDataArr = []*ScriptData{}
+	// SpiderData = map[string][]*Data{}
+	// SpiderCodeArr = []string{}
+	// SpiderDataAll = map[string]map[string]interface{}{}
 }
 
-//query data
-func InitData(q map[string]interface{}) bool {
+//initialize spiders
+func InitSpider_New(q map[string]interface{}) bool {
 	defer qu.Catch()
-	logger.Debug("start querying...", q)
+	logger.Debug("start counting spiders...", q)
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
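+	//projection: the census phase only needs each record's spidercode field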
-	sf := map[string]interface{}{ //fields to drop
-		//"spidercode": 1, "href": 1, "title": 1, "publishtime": 1, "jsondata": 1,
-		"comeintime": 0,
-		"state":      0,
-		"error":      0,
-		"modifyuser": 0,
+	sf := map[string]interface{}{
+		"spidercode": 1,
 	}
-
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
 	c := make(chan bool, 3)
-	update := [][]map[string]interface{}{}
 	count, _ := sess.DB(util.Config.TmpDbName).C(util.Config.TmpCollName).Find(&q).Count()
 	if count == 0 {
 		logger.Debug("no data...")
 		return false
 	}
-	logger.Debug("total queried:", count)
 	it := sess.DB(util.Config.TmpDbName).C(util.Config.TmpCollName).Find(&q).Select(&sf).Iter()
 	index := 0
-	n := int64(0)
 	for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
 		wg.Add(1)
 		c <- true
@@ -200,74 +196,22 @@ func InitData(q map[string]interface{}) bool {
 			<-c
 			wg.Done()
 		}()
-		atomic.AddInt64(&n, 1)
-		href := qu.ObjToString(tmp["href"])
-		if len(href) <= 5 { //invalid data
-			return
-		}
-		id := mongo.BsonIdToSId(tmp["_id"])
-		if id == "" {
-			return
-		}
 		spidercode := qu.ObjToString(tmp["spidercode"])
 		arr := strings.Split(spidercode, "_")
 		s_text := arr[len(arr)-1]
 		if strings.HasPrefix(s_text, "qy") || strings.HasSuffix(s_text, "qy") { //filter out enterprise spiders
 			return
 		}
-		publishtime := qu.Int64All(tmp["publishtime"])
-		from := qu.ObjToString(tmp["from"])
-		jsondata := map[string]interface{}{}
-		if jd, ok := tmp["jsondata"].(map[string]interface{}); ok && jd != nil {
-			jsondata = jd
-		}
-		d := &Data{
-			Id:          id,
-			Title:       qu.ObjToString(tmp["title"]),
-			Href:        qu.ObjToString(tmp["href"]),
-			SpiderCode:  spidercode,
-			PublishTime: qu.FormatDateByInt64(&publishtime, qu.Date_Full_Layout),
-			JsonData:    jsondata,
-			From:        from,
-		}
-		//update
-		idAndSet := []map[string]interface{}{}
-		_id := map[string]interface{}{
-			"_id": tmp["_id"],
-		}
-		set := map[string]interface{}{
-			"$set": map[string]interface{}{
-				"state": 1,
-			},
-		}
-		idAndSet = append(idAndSet, _id) //first element is the query condition
-		idAndSet = append(idAndSet, set) //second element is the update payload
 		lock.Lock()
-		if from == "lua" {
-			// delete(tmp, "title")
-			// delete(tmp, "href")
-			// delete(tmp, "jsondata")
-			// delete(tmp, "spidercode")
-			// delete(tmp, "publishtime")
-			delete(tmp, "from")
-			delete(tmp, "_id")
-			SpiderDataAll[id] = tmp //record the remaining fields of this record (everything except those deleted above), to be sent to the save service
-		}
-		//assemble the data
-		if SpiderData[spidercode] == nil {
-			SpiderCodeArr = append(SpiderCodeArr, spidercode) //record every spider
-			tmpArr := []*Data{}
-			tmpArr = append(tmpArr, d)
-			SpiderData[spidercode] = tmpArr
-		} else {
-			sd := SpiderData[spidercode]
-			sd = append(sd, d)
-			SpiderData[spidercode] = sd
-		}
-		update = append(update, idAndSet)
-		if len(update) > 500 {
-			Mgo.UpdateBulk(util.Config.TmpCollName, update...) //update the state field
-			update = [][]map[string]interface{}{}              //clear the batch after updating
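+		//load and cache each spider's script once; later records with the same spidercode reuse the cached entry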
+		if SpiderScriptData[spidercode] == nil { //
+			lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": spidercode})
+			if lua != nil && len(*lua) > 0 {
+				script := GetScriptByTmp(*lua) //assemble the spider script
+				sd := &ScriptData{
+					SP: NewSpider(spidercode, script),
+				}
+				SpiderScriptData[spidercode] = sd
+			}
 		}
 		lock.Unlock()
 	}(tmp)
@@ -277,99 +221,63 @@
 		}
 	}
 	wg.Wait()
-	lock.Lock()
-	if len(update) > 0 {
-		Mgo.UpdateBulk(util.Config.TmpCollName, update...) //update the state field
-		update = [][]map[string]interface{}{}              //clear the batch after updating
-	}
-	lock.Unlock()
-	logger.Debug("query finished...", index, n, "spider count:", len(SpiderCodeArr), len(SpiderData))
+	logger.Debug("spider census finished... spider count:", len(SpiderScriptData))
 	return true
 }
 
-//load scripts
-func InitSpider() bool {
+//query data per spider
+func Start_New(q map[string]interface{}) {
 	defer qu.Catch()
-	logger.Debug("start loading scripts...")
-	// query := map[string]interface{}{
-	// 	"code": map[string]interface{}{
-	// 		"$in": SpiderCodeArr,
-	// 	},
-	// }
-	lock := &sync.Mutex{}
+	logger.Debug("start re-crawling...")
 	wg := &sync.WaitGroup{}
-	ch := make(chan bool, 3)
-	luaList := []map[string]interface{}{}
-	for _, code := range SpiderCodeArr {
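+	//spch is a semaphore channel that caps the number of concurrently crawling spiders at util.Config.SpiderChan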
+	spch := make(chan bool, util.Config.SpiderChan)
+	for code, sd := range SpiderScriptData {
 		wg.Add(1)
-		ch <- true
-		go func(code string) {
+		spch <- true
+		go func(spidercode string, sd *ScriptData) {
 			defer func() {
-				<-ch
+				<-spch
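+				//release this spider's Lua state once its goroutine finishes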
+				sd.SP.L.Close()
 				wg.Done()
 			}()
-			lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": code})
-			if lua != nil && len(*lua) > 0 {
-				lock.Lock()
-				luaList = append(luaList, (*lua))
-				lock.Unlock()
-			}
-		}(code)
-	}
-	wg.Wait()
-	//logger.Debug("lua query:", query)
-	//list, _ := MgoE.Find("luaconfig", query, nil, nil, false, -1, -1)
-	if len(luaList) == 0 || len(luaList) != len(SpiderCodeArr) {
-		logger.Error("failed to load scripts")
-		return false
-	}
-	for _, v := range luaList {
-		code := qu.ObjToString(v["code"])
-		script := GetScriptByTmp(v) //assemble the spider script
-		//sp := NewSpider(code, script)
-		for _, data := range SpiderData[code] {
-			sd := &ScriptData{
-				SP: NewSpider(code, script),
-				D:  data,
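+			//each goroutine opens its own mongo session for its per-spider query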
+			sess := Mgo.GetMgoConn()
+			defer Mgo.DestoryMongoConn(sess)
+			//copy the query before adding the spidercode: writing into the shared q map from many goroutines would be a data race
+			query := map[string]interface{}{}
+			for k, v := range q {
+				query[k] = v
+			}
+			query["spidercode"] = spidercode
+			sf := map[string]interface{}{
+				"comeintime":  0,
+				"state":       0,
+				"error":       0,
+				"modifyuser":  0,
+				"detail":      0,
+				"contenthtml": 0,
 			}
-			ScriptDataArr = append(ScriptDataArr, sd)
-		}
-		//ScriptSpider[code] = sp
-	}
-	logger.Debug("finished loading scripts...", len(ScriptDataArr))
-	return true
-}
-
-//start re-crawl
-func Start() {
-	defer qu.Catch()
-	logger.Debug("start executing...")
-	if len(ScriptDataArr) > 0 {
-		spiderchan := make(chan bool, util.Config.SpiderChan)
-		spiderwg := &sync.WaitGroup{}
-		for n, sd := range ScriptDataArr {
-			spiderwg.Add(1)
-			spiderchan <- true
-			go func(sd *ScriptData) {
-				defer func() {
-					<-spiderchan
-					spiderwg.Done()
-					sd.SP.L.Close()
-				}()
-				dataBytes, _ := json.Marshal(sd.D)
-				tmp := map[string]interface{}{}
-				if json.Unmarshal(dataBytes, &tmp) == nil {
-					sd.SP.DownloadDetailItem(tmp)
-				} else {
-					logger.Error("Json Data Error", sd.D.Id)
+			//query the re-crawl data for this spider
+			it := sess.DB(util.Config.TmpDbName).C(util.Config.TmpCollName).Find(&query).Select(&sf).Iter()
+			index := 0
+			for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
+				//update the record's state
+				Mgo.Update(util.Config.TmpCollName, map[string]interface{}{"_id": tmp["_id"]}, map[string]interface{}{"$set": map[string]interface{}{"state": 1}}, false, false)
+				//data
+				data := map[string]interface{}{}
+				data["id"] = mongo.BsonIdToSId(tmp["_id"]) //replace the id
+				publishtime := qu.Int64All(tmp["publishtime"])
+				data["publishtime"] = qu.FormatDateByInt64(&publishtime, qu.Date_Full_Layout)
+				if jd, ok := tmp["jsondata"].(map[string]interface{}); ok && jd != nil {
+					data["jsondata"] = jd
 				}
-			}(sd)
-			if n%500 == 0 {
-				logger.Debug("current:", n)
+				data["title"] = tmp["title"]
+				data["href"] = tmp["href"]
+				data["from"] = tmp["from"]
+				data["spidercode"] = spidercode
+				//tmp
+				delete(tmp, "_id")
+				delete(tmp, "from")
+				sd.SP.DownloadDetailItem(data, tmp) //collect the data
+				tmp = map[string]interface{}{}      //reset: it.Next does not clear the map, so fields would otherwise leak into the next record
 			}
-		}
-		spiderwg.Wait()
+		}(code, sd)
 	}
+	wg.Wait()
+	SpiderScriptData = map[string]*ScriptData{} //reset
 	logger.Debug("execution finished...")
 }
 
@@ -707,4 +615,217 @@ func GetFileText(tmp map[string]interface{}) (filetext string) {
 func FilterDetail(text string) string {
 	return filterReg.ReplaceAllString(text, "")
 }
+
+/*
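+The legacy implementation below is kept for reference; it has been replaced by InitSpider_New and Start_New.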
+//load scripts
+func InitSpider() bool {
+	defer qu.Catch()
+	logger.Debug("start loading scripts...")
+	// query := map[string]interface{}{
+	// 	"code": map[string]interface{}{
+	// 		"$in": SpiderCodeArr,
+	// 	},
+	// }
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 3)
+	luaList := []map[string]interface{}{}
+	for _, code := range SpiderCodeArr {
+		wg.Add(1)
+		ch <- true
+		go func(code string) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": code})
+			if lua != nil && len(*lua) > 0 {
+				lock.Lock()
+				luaList = append(luaList, (*lua))
+				lock.Unlock()
+			}
+		}(code)
+	}
+	wg.Wait()
+	//logger.Debug("lua query:", query)
+	//list, _ := MgoE.Find("luaconfig", query, nil, nil, false, -1, -1)
+	if len(luaList) == 0 || len(luaList) != len(SpiderCodeArr) {
+		logger.Error("failed to load scripts")
+		return false
+	}
+	for _, v := range luaList {
+		code := qu.ObjToString(v["code"])
+		script := GetScriptByTmp(v) //assemble the spider script
+		//sp := NewSpider(code, script)
+		for _, data := range SpiderData[code] {
+			sd := &ScriptData{
+				SP: NewSpider(code, script),
+				D:  data,
+			}
+			ScriptDataArr = append(ScriptDataArr, sd)
+		}
+		//ScriptSpider[code] = sp
+	}
+	logger.Debug("finished loading scripts...", len(ScriptDataArr))
+	return true
+}
+
+//query data
+func InitData(q map[string]interface{}) bool {
+	defer qu.Catch()
+	logger.Debug("start querying...", q)
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+	sf := map[string]interface{}{ //fields to drop
+		//"spidercode": 1, "href": 1, "title": 1, "publishtime": 1, "jsondata": 1,
+		"comeintime":  0,
+		"state":       0,
+		"error":       0,
+		"modifyuser":  0,
+		"detail":      0,
+		"contenthtml": 0,
+	}
+
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	c := make(chan bool, 3)
+	update := [][]map[string]interface{}{}
+	count, _ := sess.DB(util.Config.TmpDbName).C(util.Config.TmpCollName).Find(&q).Count()
+	if count == 0 {
+		logger.Debug("no data...")
+		return false
+	}
+	logger.Debug("total queried:", count)
+	it := sess.DB(util.Config.TmpDbName).C(util.Config.TmpCollName).Find(&q).Select(&sf).Iter()
+	index := 0
+	n := int64(0)
+	for tmp := map[string]interface{}{}; it.Next(&tmp); index++ {
+		wg.Add(1)
+		c <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-c
+				wg.Done()
+			}()
+			atomic.AddInt64(&n, 1)
+			href := qu.ObjToString(tmp["href"])
+			if len(href) <= 5 { //invalid data
+				return
+			}
+			id := mongo.BsonIdToSId(tmp["_id"])
+			if id == "" {
+				return
+			}
+			spidercode := qu.ObjToString(tmp["spidercode"])
+			arr := strings.Split(spidercode, "_")
+			s_text := arr[len(arr)-1]
+			if strings.HasPrefix(s_text, "qy") || strings.HasSuffix(s_text, "qy") { //filter out enterprise spiders
+				return
+			}
+			publishtime := qu.Int64All(tmp["publishtime"])
+			from := qu.ObjToString(tmp["from"])
+			jsondata := map[string]interface{}{}
+			if jd, ok := tmp["jsondata"].(map[string]interface{}); ok && jd != nil {
+				jsondata = jd
+			}
+			d := &Data{
+				Id:          id,
+				Title:       qu.ObjToString(tmp["title"]),
+				Href:        qu.ObjToString(tmp["href"]),
+				SpiderCode:  spidercode,
+				PublishTime: qu.FormatDateByInt64(&publishtime, qu.Date_Full_Layout),
+				JsonData:    jsondata,
+				From:        from,
+			}
+			//update
+			idAndSet := []map[string]interface{}{}
+			_id := map[string]interface{}{
+				"_id": tmp["_id"],
+			}
+			set := map[string]interface{}{
+				"$set": map[string]interface{}{
+					"state": 1,
+				},
+			}
+			idAndSet = append(idAndSet, _id) //first element is the query condition
+			idAndSet = append(idAndSet, set) //second element is the update payload
+			lock.Lock()
+			if from == "lua" {
+				// delete(tmp, "title")
+				// delete(tmp, "href")
+				// delete(tmp, "jsondata")
+				// delete(tmp, "spidercode")
+				// delete(tmp, "publishtime")
+				delete(tmp, "from")
+				delete(tmp, "_id")
+				SpiderDataAll[id] = tmp //record the remaining fields of this record (everything except those deleted above), to be sent to the save service
+			}
+			//assemble the data
+			if SpiderData[spidercode] == nil {
+				SpiderCodeArr = append(SpiderCodeArr, spidercode) //record every spider
+				tmpArr := []*Data{}
+				tmpArr = append(tmpArr, d)
+				SpiderData[spidercode] = tmpArr
+			} else {
+				sd := SpiderData[spidercode]
+				sd = append(sd, d)
+				SpiderData[spidercode] = sd
+			}
+			update = append(update, idAndSet)
+			if len(update) > 500 {
+				Mgo.UpdateBulk(util.Config.TmpCollName, update...) //update the state field
+				update = [][]map[string]interface{}{}              //clear the batch after updating
+			}
+			lock.Unlock()
+		}(tmp)
+		tmp = map[string]interface{}{}
+		if index%500 == 0 {
+			logger.Debug("current:", index)
+		}
+	}
+	wg.Wait()
+	lock.Lock()
+	if len(update) > 0 {
+		Mgo.UpdateBulk(util.Config.TmpCollName, update...) //update the state field
+		update = [][]map[string]interface{}{}              //clear the batch after updating
+	}
+	lock.Unlock()
+	logger.Debug("query finished...", index, n, "spider count:", len(SpiderCodeArr), len(SpiderData))
+	return true
+}
+
+//start re-crawl
+func Start() {
+	defer qu.Catch()
+	logger.Debug("start executing...")
+	if len(ScriptDataArr) > 0 {
+		spiderchan := make(chan bool, util.Config.SpiderChan)
+		spiderwg := &sync.WaitGroup{}
+		for n, sd := range ScriptDataArr {
+			spiderwg.Add(1)
+			spiderchan <- true
+			go func(sd *ScriptData) {
+				defer func() {
+					<-spiderchan
+					spiderwg.Done()
+					sd.SP.L.Close()
+				}()
+				dataBytes, _ := json.Marshal(sd.D)
+				tmp := map[string]interface{}{}
+				if json.Unmarshal(dataBytes, &tmp) == nil {
+					sd.SP.DownloadDetailItem(tmp)
+				} else {
+					logger.Error("Json Data Error", sd.D.Id)
+				}
+			}(sd)
+			if n%500 == 0 {
+				logger.Debug("current:", n)
+			}
+		}
+		spiderwg.Wait()
+	}
+	logger.Debug("execution finished...")
+}
+*/