maxiaoshan 1 年之前
父节点
当前提交
81cd19767a
共有 7 个文件被更改,包括 106 次插入5 次删除
  1. 3 2
      src/config.json
  2. 1 0
      src/main.go
  3. 1 1
      src/res/util/comm.lua
  4. 22 0
      src/spider/download.go
  5. 1 1
      src/spider/handler.go
  6. 61 0
      src/spider/msgservice.go
  7. 17 1
      src/spider/script.go

+ 3 - 2
src/config.json

@@ -1,9 +1,9 @@
 {
     "webport": "7400",
-    "mongodb_spider": "192.168.3.71:29099",
+    "mongodb_spider": "192.168.3.166:27082",
     "spider_dbsize": 50,
     "bideditor": {
-        "addr": "192.168.3.71:29099",
+        "addr": "192.168.3.166:27082",
         "db": "editor",
         "size": 5,
         "username": "",
@@ -13,6 +13,7 @@
     "msgname": "爬虫采集平台7100",
     "msgserveraddr": "spdata.jianyu360.com:801",
     "msgserveraddrfile": "spdata.jianyu360.com:802",
+    "msgserveraddrchromedp": "spdata.jianyu360.com:807",
 	"isdelay":false,
     "working": 0,
     "chansize": 4,

+ 1 - 0
src/main.go

@@ -49,6 +49,7 @@ func init() {
 	//启动消息服务
 	spider.InitMsgClient(Config.Msgserveraddr, Config.Msgname)
 	spider.InitMsgClientFile(Config.MsgserveraddrFile, Config.Msgname+"file")
+	spider.InitMsgClientChromedp(Config.MsgserveraddrChromedp, Config.Msgname+"chromedp")
 
 	//初始化网络存储服务
 	//InitWeedcl()

+ 1 - 1
src/res/util/comm.lua

@@ -511,7 +511,7 @@ end
 --确定模块的附件下载方法(获取title与href)
 --tags:模块选择器
 --withend:是否以文件类型为后缀,比如 .doc,true为后缀,false不为后缀
-filetype={"jpg","JPG","bid","pdf","png","PDF","docx","doc","xlsx","xls","zip","rar","swf","DOCX","DOC","PDF","XLSX","XLS","ZIP","RAR","SWF"}	 
+filetype={"jpeg","JPEG","jpg","JPG","bid","pdf","png","PDF","docx","doc","xlsx","xls","zip","rar","swf","DOCX","DOC","PDF","XLSX","XLS","ZIP","RAR","SWF"}
 function common.getFilesLinkByTag(href,tags,content,withend)
 	local dhtml = findOneHtml(tags, content)
 	--dhtml=dhtml.."<a href='/123.doc'>123.doc</a>"

+ 22 - 0
src/spider/download.go

@@ -306,6 +306,28 @@ func NewDownloadFile(downloaderid, url, method string, reqparam, head map[string
 		return nil
 	}
 }
+// DownloadByChrome sends chrometask to a chromedp download node through
+// MsclientChromedp and decodes the JSON reply into result.
+//
+// code is only used to tag error log lines. When downloaderid is empty the
+// task is sent with SENDTO_TYPE_RAND_RECIVER; otherwise it is sent
+// point-to-point to downloaderid, provided isAvailable accepts it. timeout is
+// doubled before being handed to Call.
+//
+// A nil result is returned when the client is not initialised, the requested
+// node is unavailable, the call fails, or the reply cannot be decoded as a
+// JSON array.
+//
+// NOTE(review): isAvailable is defined outside this hunk; confirm it consults
+// the chromedp node table (AlldownloaderChromedp) and not the regular one.
+func DownloadByChrome(code, downloaderid string, chrometask lu.ChromeTask, timeout int64) (result []interface{}) {
+	defer mu.Catch()
+	if MsclientChromedp == nil {
+		// InitMsgClientChromedp discards the NewClient error, so the client may be unset.
+		logger.Error(code + "方法DownloadByChrome,err:" + "chromedp message client is not initialised")
+		return nil
+	}
+	timeout *= 2
+	msgid := mu.UUID(8)
+	var ret []byte
+	var err error
+	if downloaderid == "" {
+		ret, err = MsclientChromedp.Call("", msgid, mu.SERVICE_DOWNLOAD, mu.SENDTO_TYPE_RAND_RECIVER, chrometask, timeout)
+	} else if isAvailable(downloaderid) {
+		ret, err = MsclientChromedp.Call(downloaderid, msgid, mu.SERVICE_DOWNLOAD, mu.SENDTO_TYPE_P2P, chrometask, timeout)
+	} else {
+		return nil
+	}
+	if err != nil {
+		logger.Error(code+"方法DownloadByChrome,err:"+err.Error(), timeout)
+		return nil
+	}
+	// Surface malformed replies instead of silently returning an empty result.
+	if err = json.Unmarshal(ret, &result); err != nil {
+		logger.Error(code+"方法DownloadByChrome,err:"+err.Error(), timeout)
+		return nil
+	}
+	return result
+}
 
 //下载点是否可用
 func isAvailable(code string) bool {

+ 1 - 1
src/spider/handler.go

@@ -1415,7 +1415,7 @@ func SaveHeartInfo() {
 		return true
 	})
 	logger.Info("更新心跳个数:", num)
-	time.AfterFunc(20*time.Minute, SaveHeartInfo)
+	time.AfterFunc(1*time.Second, SaveHeartInfo)
 }
 
 //保存7000节点爬虫转增量节点日志

+ 61 - 0
src/spider/msgservice.go

@@ -23,8 +23,10 @@ type DynamicIPMap struct {
 
 var Msclient *mu.Client
 var MsclientFile *mu.Client
+var MsclientChromedp *mu.Client
 var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
 var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
+var AlldownloaderChromedp map[string]DynamicIPMap = make(map[string]DynamicIPMap)
 
 //初始化,启动消息客户端
 func InitMsgClient(serveraddr, name string) {
@@ -56,6 +58,18 @@ func InitMsgClientFile(serveraddr, name string) {
 	go gc4AlldownloaderFile()
 }
 
+// InitMsgClientChromedp creates the chromedp message client, stores it in
+// MsclientChromedp, and starts the expiry sweep for AlldownloaderChromedp.
+//
+// serveraddr is the message server address and name is the client name. The
+// client subscribes to SERVICE_DOWNLOAD_APPEND_NODE and
+// SERVICE_DOWNLOAD_DELETE_NODE, which are handled by processeventChromedp.
+func InitMsgClientChromedp(serveraddr, name string) {
+	cfg := &mu.ClientConfig{
+		ClientName:      name,
+		MsgServerAddr:   serveraddr,
+		EventHandler:    processeventChromedp,
+		CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
+		ReadBufferSize:  200,
+		WriteBufferSize: 200,
+	}
+	// NOTE(review): the second value returned by NewClient is discarded, so a
+	// failed connection is not reported here — confirm this is intended.
+	MsclientChromedp, _ = mu.NewClient(cfg)
+	go gc4AlldownloaderChromedp()
+}
+
 //
 func processevent(p *mu.Packet) {
 	defer qu.Catch()
@@ -132,6 +146,43 @@ func processeventFile(p *mu.Packet) {
 	}
 }
 
+// processeventChromedp handles packets delivered to the chromedp message client.
+//
+//   - SERVICE_DOWNLOAD_APPEND_NODE: the business data is read as consecutive
+//     8-byte node codes; each is stored in AlldownloaderChromedp with an
+//     InvalidTime ten minutes from now.
+//   - SERVICE_DOWNLOAD_DELETE_NODE: the listed 8-byte codes are removed.
+//   - util.Config.Uploadevent: the JSON payload's "code" and "state" strings are
+//     passed to UpdateSpiderByCodeState and the outcome is written back to the
+//     sender. If either field is missing or not a string, a failure reply is
+//     sent instead of panicking on the type assertion.
+//
+// NOTE(review): InitMsgClientChromedp registers only the two node events in
+// CanHandleEvents; confirm whether the Uploadevent case is reachable.
+// NOTE(review): AlldownloaderChromedp is also modified by
+// gc4AlldownloaderChromedp, which is started in its own goroutine — verify
+// that access is serialised.
+func processeventChromedp(p *mu.Packet) {
+	defer qu.Catch()
+	const codeLen = 8 // node codes are packed as fixed-size 8-byte chunks
+	switch p.Event {
+	case mu.SERVICE_DOWNLOAD_APPEND_NODE:
+		data := p.GetBusinessData()
+		expires := time.Now().Unix() + 60*10
+		// A trailing partial chunk (fewer than codeLen bytes) is ignored.
+		for i := 0; i+codeLen <= len(data); i += codeLen {
+			code := string(data[i : i+codeLen])
+			AlldownloaderChromedp[code] = DynamicIPMap{
+				Code:        code,
+				InvalidTime: expires,
+			}
+		}
+	case mu.SERVICE_DOWNLOAD_DELETE_NODE:
+		data := p.GetBusinessData()
+		for i := 0; i+codeLen <= len(data); i += codeLen {
+			delete(AlldownloaderChromedp, string(data[i:i+codeLen]))
+		}
+	case int32(util.Config.Uploadevent):
+		param := map[string]interface{}{}
+		ret := map[string]interface{}{}
+		// A decode failure leaves the fields missing; that is reported by the
+		// validation below, so the error itself is not needed.
+		_ = json.Unmarshal(p.GetBusinessData(), &param)
+		code, codeOK := param["code"].(string)
+		state, stateOK := param["state"].(string)
+		if codeOK && stateOK {
+			b, err := UpdateSpiderByCodeState(code, state)
+			ret["b"] = b
+			ret["err"] = err
+		} else {
+			ret["b"] = false
+			ret["err"] = "code或state值不存在"
+		}
+		MsclientChromedp.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret)
+	}
+}
+
 //
 func gc4Alldownloader() {
 	n := time.Now().Unix()
@@ -154,6 +205,16 @@ func gc4AlldownloaderFile() {
 	util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderFile, TimeChan)
 }
 
+// gc4AlldownloaderChromedp removes every AlldownloaderChromedp entry whose
+// InvalidTime is earlier than the current Unix time, then re-schedules itself
+// through util.TimeAfterFunc with a one-minute delay.
+func gc4AlldownloaderChromedp() {
+	now := time.Now().Unix()
+	for _, node := range AlldownloaderChromedp {
+		if node.InvalidTime >= now {
+			continue // still valid
+		}
+		delete(AlldownloaderChromedp, node.Code)
+	}
+	util.TimeAfterFunc(time.Minute, gc4AlldownloaderChromedp, TimeChan)
+}
+
 //获取一个下载点
 func GetOneDownloader() string {
 	if len(Alldownloader) < 1 {

+ 17 - 1
src/spider/script.go

@@ -22,7 +22,6 @@ import (
 	"path"
 
 	qu "qfw/util"
-	_ "qfw/util/redis"
 	"regexp"
 	util "spiderutil"
 	"strconv"
@@ -1148,6 +1147,23 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		S.Push(lua.LString(contentHtml))
 		return 1
 	}))
+	//chromedp下载
+	s.L.SetGlobal("downloadByChrome", s.L.NewFunction(func(S *lua.LState) int {
+		timeout := S.ToInt64(-2)
+		taskStr := S.ToString(-1)
+		chromeActions := []util.ChromeActions{}
+		if json.Unmarshal([]byte(taskStr), &chromeActions) == nil {
+			chromeTask := util.ChromeTask{
+				TimeOut: timeout,
+				Actions: chromeActions,
+			}
+			ret := DownloadByChrome(s.SCode, s.Downloader, chromeTask, s.Timeout)
+			S.Push(util.MapToTable(S, ret))
+		} else {
+			S.Push(S.NewTable())
+		}
+		return 1
+	}))
 	return ""
 }
 func dealHref(pageListUrl, href string) string {