Prechádzať zdrojové kódy

多线程采集增加单个爬虫线程上限和newspider

maxiaoshan pred 3 rokmi
rodič
commit
bc2176196b
4 zmenené súbory: 78 pridaní a 15 odobraní
  1. 1 1
      src/config.json
  2. 60 6
      src/spider/handler.go
  3. 8 2
      src/spider/script.go
  4. 9 6
      src/spider/spider.go

+ 1 - 1
src/config.json

@@ -18,7 +18,7 @@
     "modal": 1,
     "ishistoryevent": false,
     "threadbasenum": 50,
-    "threadupperlimit": 100,
+    "threadupperlimit": 10,
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/test/spider_test.lua",
     "redisservers": "title_repeat_judgement=192.168.3.207:2679,title_repeat_fulljudgement=192.168.3.207:2679,title_repeat_listpagehref=192.168.3.207:1679",

+ 60 - 6
src/spider/handler.go

@@ -535,7 +535,7 @@ func UpdateSpiderByCodeState(code, state string) (bool, error) {
 					}
 					sp2.MUserName = v["modifyuser"]
 					sp2.MUserEmail = v["modifyemail"]
-					sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true) //更新上架,重载脚本
+					sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) //更新上架,重载脚本
 					Allspiders2.Store(k, sp2)
 					// up = true
 					// err = nil
@@ -688,9 +688,9 @@ func UpdateSpiderByCodeState(code, state string) (bool, error) {
 						}
 						sp2.MUserName = v["modifyuser"]
 						sp2.MUserEmail = v["modifyemail"]
-						sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true) //更新上架,重载脚本
-						Allspiders2.Store(k, sp2)                                                        //重载后放入集合
-						UpdateHighListDataByCode(k)                                                      //爬虫更新上架后,重置数据state=0
+						sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) //更新上架,重载脚本
+						Allspiders2.Store(k, sp2)                                                               //重载后放入集合
+						UpdateHighListDataByCode(k)                                                             //爬虫更新上架后,重置数据state=0
 						// up = true
 						// err = nil
 						logger.Info("Allspiders2上架重载脚本", sp2.Code)
@@ -891,7 +891,7 @@ func ReloadSpiderFile() {
 func NewSpider_New(code, luafile string, newstate bool) (*Spider, string) {
 	defer mu.Catch()
 	spider := &Spider{}
-	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, newstate)
+	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, newstate, false)
 	if err != "" {
 		return nil, err
 	}
@@ -952,7 +952,7 @@ func NewSpider_New(code, luafile string, newstate bool) (*Spider, string) {
 func NewSpider(code, luafile string) (*Spider, string) {
 	defer mu.Catch()
 	spider := &Spider{}
-	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, true)
+	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, true, false)
 	if err != "" {
 		return nil, err
 	}
@@ -1011,6 +1011,60 @@ func NewSpider(code, luafile string) (*Spider, string) {
 	return spider, ""
 }
 
+// NewSpiderForThread builds a Spider for multi-threaded collection: it loads the Lua script with thread=true and copies the script's spider* variables onto the Spider; it returns (nil, errText) when LoadScript reports a non-empty error string, otherwise (spider, "").
+func NewSpiderForThread(code, luafile string) (*Spider, string) {
+	defer mu.Catch() // NOTE(review): presumably recovers from panics — confirm against mu.Catch
+	spider := &Spider{}
+	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, true, true) // newstate=true, thread=true
+	if err != "" { // LoadScript reports failure as a non-empty string rather than an error value
+		return nil, err
+	}
+	spider.Code = spider.GetVar("spiderCode")
+	spider.SCode = spider.Code // SCode and Script.SCode are set to the code read from the script
+	spider.Script.SCode = spider.Code
+	spider.Name = spider.GetVar("spiderName")
+	spider.Channel = spider.GetVar("spiderChannel")
+
+	// Disabled: spider.LastExecTime = GetLastExectime(spider.Code)
+	spider.DownDetail = spider.GetBoolVar("spiderDownDetailPage")
+	spider.Collection = spider.GetVar("spider2Collection")
+	spider.SpiderRunRate = int64(spider.GetIntVar("spiderRunRate"))
+	// Disabled: spider.Thread = int64(spider.GetIntVar("spiderThread"))
+	spider.StoreToMsgEvent = spider.GetIntVar("spiderStoreToMsgEvent")
+	spider.StoreMode = spider.GetIntVar("spiderStoreMode")
+	spider.CoverAttr = spider.GetVar("spiderCoverAttr")
+	spiderSleepBase := spider.GetIntVar("spiderSleepBase")
+	if spiderSleepBase == -1 { // -1 triggers the default; presumably GetIntVar's "not set" sentinel — TODO confirm
+		spider.SleepBase = 1000
+	} else {
+		spider.SleepBase = spiderSleepBase
+	}
+	spiderSleepRand := spider.GetIntVar("spiderSleepRand")
+	if spiderSleepRand == -1 { // same -1 fallback as SleepBase
+		spider.SleepRand = 1000
+	} else {
+		spider.SleepRand = spiderSleepRand
+	}
+	spiderTimeout := spider.GetIntVar("spiderTimeout")
+	if spiderTimeout == -1 { // same -1 fallback; default timeout value is 60
+		spider.Timeout = 60
+	} else {
+		spider.Timeout = int64(spiderTimeout)
+	}
+	spider.TargetChannelUrl = spider.GetVar("spiderTargetChannelUrl")
+
+	spider.UserName = spider.GetVar("spiderUserName")
+	spider.UserEmail = spider.GetVar("spiderUserEmail")
+	spider.UploadTime = spider.GetVar("spiderUploadTime")
+	// Historical back-fill flags (added later).
+	// Disabled debug: qu.Debug("-------", spider.GetBoolVar("spiderIsHistoricalMend"), spider.GetBoolVar("spiderIsMustDownload"))
+	spider.IsHistoricalMend = spider.GetBoolVar("spiderIsHistoricalMend")
+	spider.IsMustDownload = spider.GetBoolVar("spiderIsMustDownload")
+	// Flag distinguishing new vs. old spiders.
+	spider.IsCompete = spider.GetBoolVar("spiderIsCompete")
+	return spider, ""
+}
+
 //下载量入库
 func SaveDownCount(code string, addtotal bool, todayDowncount, todayRequestNum, yesterdayDowncount, yestoDayRequestNum int32) {
 	date := time.Unix(time.Now().Unix(), 0).Format(qu.Date_Short_Layout)

+ 8 - 2
src/spider/script.go

@@ -70,11 +70,11 @@ func init() {
 var TimeSleepChan = make(chan bool, 1)
 
 //加载文件
-func (s *Script) LoadScript(site, channel, user *string, code, script_file string, newstate bool) string {
+func (s *Script) LoadScript(site, channel, user *string, code, script_file string, newstate bool, thread bool) string {
 	defer mu.Catch()
 	s.SCode = code
 	s.ScriptFile = script_file
-	if util.Config.Working == 0 {
+	if util.Config.Working == 0 { //高性能模式
 		if newstate {
 			s.L = lua.NewState(lua.Options{
 				RegistrySize:        256 * 20,
@@ -82,6 +82,12 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 				IncludeGoStackTrace: false,
 			})
 		}
+	} else if thread { //多线程模式创建lua对象
+		s.L = lua.NewState(lua.Options{
+			RegistrySize:        256 * 20,
+			CallStackSize:       256,
+			IncludeGoStackTrace: false,
+		})
 	} else { //节能模式从CC池中获取lua.LState
 		if newstate { //队列模式的newstate主要区分是列表页爬虫CC还是三级页爬虫CC2
 			lState := <-CC2

+ 9 - 6
src/spider/spider.go

@@ -177,7 +177,7 @@ func (s *Spider) ExecJob(reload bool) {
 		}
 	}()
 	if reload && util.Config.Working == 0 { //高效模式,轮询调度时重载脚本
-		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true)
+		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
 	}
 	logger.Debug(s.Code, s.Name, "频率:", s.SpiderRunRate, ",", s.Timeout)
 	s.LastDowncount = 0
@@ -876,10 +876,13 @@ func (s *Spider) DownloadDetail(stype string) {
 		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=1采集三级页心跳
 	}
 	countNum := Mgo.Count("spider_highlistdata", q) //统计util.Config.DayNum天内未下载爬虫个数
-	logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum)
+	//logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum)
 	if countNum > 0 {
 		threadNum := countNum / util.Config.ThreadBaseNum //线程数
-		//logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum, "	thread num:", threadNum)
+		if threadNum > util.Config.ThreadUpperLimit {     //设置单个爬虫线程上限
+			threadNum = util.Config.ThreadUpperLimit
+		}
+		logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum, "	thread num:", threadNum)
 		list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 200)
 		if list != nil && len(*list) > 0 {
 			spChan := make(chan *Spider, threadNum+1) //初始化线程通道(+1表示基本的线程数)
@@ -1060,7 +1063,7 @@ func (s *Spider) DownloadDetail(stype string) {
 			}
 			if stype == "highlist" { //高性能模式下载完三级页数据,sp对象需要重载
 				//重载主线程sp
-				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true)
+				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
 			}
 		}
 	} else if stype == "highlist" { //高性能模式无数据sleep2分钟
@@ -1071,7 +1074,7 @@ func (s *Spider) DownloadDetail(stype string) {
 //初始化sp对象
 func NewSpiderByScript(num int, code string, info map[string]string, spChan chan *Spider) {
 	for i := 1; i <= num; i++ {
-		spTmp, errstr := NewSpider(code, info["script"])
+		spTmp, errstr := NewSpiderForThread(code, info["script"])
 		if errstr == "" && spTmp != nil { //脚本加载成功
 			spTmp.UserName = info["createuser"]
 			spTmp.UserEmail = info["createuseremail"]
@@ -1086,7 +1089,7 @@ func NewSpiderByScript(num int, code string, info map[string]string, spChan chan
 
 // AllThreadLog logs the current value of AllThreadNum and re-schedules itself via time.AfterFunc to run again after one minute.
 func AllThreadLog() {
-	logger.Info("当前三级页采集线程总数:", AllThreadNum)
+	logger.Info("Detail Download All Thread:", AllThreadNum)
 	time.AfterFunc(1*time.Minute, AllThreadLog)
 }