Browse Source

更新chromedp采集;更新bidding改为发布nats消息

mxs 1 year ago
parent
commit
282a4d8056
9 changed files with 288 additions and 49 deletions
  1. 7 3
      src/config.json
  2. 2 2
      src/front/front.go
  3. 8 1
      src/front/lua.go
  4. 29 6
      src/spider/download.go
  5. 54 10
      src/spider/msgservice.go
  6. 95 6
      src/spider/script.go
  7. 19 18
      src/udptask/udptask.go
  8. 4 3
      src/util/config.go
  9. 70 0
      src/util/nats.go

+ 7 - 3
src/config.json

@@ -53,15 +53,20 @@
       "stype": "index"
     }
   ],
+  "nats": {
+    "natsurl": "192.168.3.240:19090",
+    "subscribe": "mgosave"
+  },
   "threadbasenum": 50,
   "threadupperlimit": 10,
   "msgname": "爬虫采集平台7100",
   "msgserveraddr": "spdata.jianyu360.com:801",
   "msgserveraddrfile": "spdata.jianyu360.com:802",
-  "tesseractadd": "http://test.qmx.top:1688",
-  "testdir": "res/test/spider_test.lua",
+  "msgserveraddrchromedp": "spdata.jianyu360.com:806",
   "serveraddress": "127.0.0.1:8030",
   "jsserveraddress":  "127.0.0.1:8031",
+  "tesseractadd": "http://test.qmx.top:1688",
+  "testdir": "res/test/spider_test.lua",
   "word":{
     "keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见)",
     "notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
@@ -109,5 +114,4 @@
       "write": true
     }
   }
-
 }

+ 2 - 2
src/front/front.go

@@ -52,7 +52,7 @@ type Front struct {
 	dataView   xweb.Mapper `xweb:"/front/task/dataview"`   //数据详情
 	dataSend   xweb.Mapper `xweb:"/front/task/datasend"`   //数据推送
 	dataUpdate xweb.Mapper `xweb:"/front/task/dataupdate"` //数据更新推送
-	dataDelete xweb.Mapper `xweb:"/front/task/datadelete"` //数据更新推送
+	dataDelete xweb.Mapper `xweb:"/front/task/datadelete"` //数据更新删除
 
 	//data
 	importData     xweb.Mapper `xweb:"/front/data/importdata"`     //数据导入
@@ -110,7 +110,7 @@ func (f *Front) Login() {
 	}
 }
 
-//获取用户的一级和二级菜单
+// 获取用户的一级和二级菜单
 func GetUserMenu(role string) []map[string]interface{} {
 	list := []map[string]interface{}{}
 	maps := map[string]interface{}{

+ 8 - 1
src/front/lua.go

@@ -356,6 +356,9 @@ func (f *Front) FilterEdit() {
 
 func (f *Front) FilterSave() {
 	defer qu.Catch()
+	//TODO:发送到保存服务且不做判重处理(repeat="bigdetail")
+	f.ServeJson(map[string]interface{}{"msg": "请联系开发人员", "success": false})
+	return
 	id := f.GetString("id")
 	msg := "保存失败"
 	success := false
@@ -400,10 +403,14 @@ func updateBidding(errId, reasons string, data, update map[string]interface{}) (
 		hrefTmp := qu.ObjToString(tmp["href"])
 		spidercodeTmp := qu.ObjToString(tmp["spidercode"])
 		if hrefTmp == href && spidercodeTmp == spidercode {
-			if MgoB.UpdateById("bidding", tmp["_id"], map[string]interface{}{"$set": update}) {
+			if PubNats(tmp["_id"], update) { //nats发布数据
 				TagToSpiderWarnErr(-1, errId, reasons, update) //spider_warn_err日志
 				return true, "修复成功;数据所在表:bidding;数据ID:" + mongodb.BsonIdToSId(tmp["_id"])
 			}
+			//if MgoB.UpdateById("bidding", tmp["_id"], map[string]interface{}{"$set": update}) {
+			//	TagToSpiderWarnErr(-1, errId, reasons, update) //spider_warn_err日志
+			//	return true, "修复成功;数据所在表:bidding;数据ID:" + mongodb.BsonIdToSId(tmp["_id"])
+			//}
 		}
 	}
 	return false, "未匹配到bidding信息"

+ 29 - 6
src/spider/download.go

@@ -1,4 +1,5 @@
-/**
+/*
+*
 GO代码相对简单,
 重点处理下载工具,爬虫启动,监控等。
 逻辑处理交给LUA处理
@@ -26,7 +27,7 @@ func init() {
 	regImg, _ = regexp.Compile(regImgStr)
 }
 
-//下载页面,发送消息,等待下载
+// 下载页面,发送消息,等待下载
 func Download(retLen *int64, downloaderid, url, method string, head map[string]interface{}, encoding string, useproxy, ishttps bool, code string, timeout int64) string {
 	defer mu.Catch()
 	msgid := mu.UUID(8)
@@ -83,7 +84,7 @@ func Download(retLen *int64, downloaderid, url, method string, head map[string]i
 	}
 }
 
-//下载页面,发送消息,等待下载
+// 下载页面,发送消息,等待下载
 func DownloadAdv(retLen *int64, downloaderid, url, method string, reqparam, head map[string]interface{}, mycookie []*http.Cookie, encoding string, useproxy, ishttps bool, code string, timeout int64) (string, []*http.Cookie, map[string]interface{}) {
 	defer mu.Catch()
 	msgid := mu.UUID(8)
@@ -145,7 +146,7 @@ func DownloadAdv(retLen *int64, downloaderid, url, method string, reqparam, head
 	}
 }
 
-//下载附件
+// 下载附件
 func DownloadFile_bak(downloaderid, url, method string, reqparam, head map[string]interface{}, mycookie []*http.Cookie, encoding string, useproxy, ishttps bool, code string, timeout int64) []byte {
 	defer mu.Catch()
 	msgid := mu.UUID(8)
@@ -306,8 +307,30 @@ func NewDownloadFile(downloaderid, url, method string, reqparam, head map[string
 		return nil
 	}
 }
+func DownloadByChrome(code, downloaderid string, chrometask lu.ChromeTask, timeout int64) (result []interface{}) {
+	defer mu.Catch()
+	timeout = timeout * 2
+	msgid := mu.UUID(8)
+	var ret []byte
+	var err error
+	if downloaderid == "" {
+		ret, err = MsclientChromedp.Call("", msgid, mu.SERVICE_DOWNLOAD, mu.SENDTO_TYPE_RAND_RECIVER, chrometask, timeout)
+	} else {
+		if isAvailable(downloaderid) {
+			ret, err = MsclientChromedp.Call(downloaderid, msgid, mu.SERVICE_DOWNLOAD, mu.SENDTO_TYPE_P2P, chrometask, timeout)
+		} else {
+			return
+		}
+	}
+	if err != nil {
+		str := code + "方法DownloadByChrome,err:" + err.Error()
+		logger.Error(str, timeout)
+	}
+	json.Unmarshal(ret, &result)
+	return
+}
 
-//下载点是否可用
+// 下载点是否可用
 func isAvailable(code string) bool {
 	b := false
 	for k, _ := range Alldownloader {
@@ -318,7 +341,7 @@ func isAvailable(code string) bool {
 	return b
 }
 
-//下载点是否可用
+// 下载点是否可用
 func isAvailableFile(code string) bool {
 	b := false
 	for k, _ := range AlldownloaderFile {

+ 54 - 10
src/spider/msgservice.go

@@ -18,10 +18,12 @@ type DynamicIPMap struct {
 
 var Msclient *mu.Client
 var MsclientFile *mu.Client
+var MsclientChromedp *mu.Client
 var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
 var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
+var AlldownloaderChromedp map[string]DynamicIPMap = make(map[string]DynamicIPMap)
 
-//初始化,启动消息客户端
+// 初始化,启动消息客户端
 func InitMsgClient(serveraddr, name string) {
 	Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
 		MsgServerAddr: serveraddr,
@@ -39,7 +41,7 @@ func InitMsgClient(serveraddr, name string) {
 	go gc4Alldownloader()
 }
 
-//初始化,启动消息客户端File
+// 初始化,启动消息客户端File
 func InitMsgClientFile(serveraddr, name string) {
 	MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
 		MsgServerAddr:   serveraddr,
@@ -51,7 +53,18 @@ func InitMsgClientFile(serveraddr, name string) {
 	go gc4AlldownloaderFile()
 }
 
-//
+// 初始化,启动消息客户端chromedp
+func InitMsgClientChromedp(serveraddr, name string) {
+	MsclientChromedp, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
+		MsgServerAddr:   serveraddr,
+		EventHandler:    processeventChromedp,
+		CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
+		ReadBufferSize:  200,
+		WriteBufferSize: 200,
+	})
+	go gc4AlldownloaderChromedp()
+}
+
 func processevent(p *mu.Packet) {
 	defer mu.Catch()
 	var data []byte
@@ -89,7 +102,6 @@ func processevent(p *mu.Packet) {
 	}
 }
 
-//
 func processeventFile(p *mu.Packet) {
 	defer mu.Catch()
 	var data []byte
@@ -127,7 +139,30 @@ func processeventFile(p *mu.Packet) {
 	}
 }
 
-//
+func processeventChromedp(p *mu.Packet) {
+	defer qu.Catch()
+	var data []byte
+	switch p.Event {
+	case mu.SERVICE_DOWNLOAD_APPEND_NODE:
+		data = p.GetBusinessData()
+		//log.Println("获取动态地址:", len(data), string(data))
+		for i := 0; i < len(data)/8; i++ {
+			code := string(data[i*8 : (i+1)*8])
+			AlldownloaderChromedp[code] = DynamicIPMap{
+				Code:        code,
+				InvalidTime: time.Now().Unix() + 60*10,
+			}
+		}
+	case mu.SERVICE_DOWNLOAD_DELETE_NODE:
+		data = p.GetBusinessData()
+		//log.Println("删除动态地址:", len(data), string(data))
+		for i := 0; i < len(data)/8; i++ {
+			code := string(data[i*8 : (i+1)*8])
+			delete(AlldownloaderChromedp, code)
+		}
+	}
+}
+
 func gc4Alldownloader() {
 	n := time.Now().Unix()
 	for _, v := range Alldownloader {
@@ -138,7 +173,6 @@ func gc4Alldownloader() {
 	util.TimeAfterFunc(1*time.Minute, gc4Alldownloader, TimeChan)
 }
 
-//
 func gc4AlldownloaderFile() {
 	n := time.Now().Unix()
 	for _, v := range AlldownloaderFile {
@@ -149,7 +183,7 @@ func gc4AlldownloaderFile() {
 	util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderFile, TimeChan)
 }
 
-//获取一个下载点
+// 获取一个下载点
 func GetOneDownloader() string {
 	if len(Alldownloader) < 1 {
 		return ""
@@ -169,7 +203,7 @@ func GetOneDownloader() string {
 	return retcode
 }
 
-//获取一个下载点
+// 获取一个下载点
 func GetOneDownloaderFile() string {
 	if len(AlldownloaderFile) < 1 {
 		return ""
@@ -188,6 +222,16 @@ func GetOneDownloaderFile() string {
 	return retcode
 }
 
+func gc4AlldownloaderChromedp() {
+	n := time.Now().Unix()
+	for _, v := range AlldownloaderChromedp {
+		if v.InvalidTime < n {
+			delete(AlldownloaderChromedp, v.Code)
+		}
+	}
+	util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderChromedp, TimeChan)
+}
+
 //调用消息保存
 //func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) bool {
 //	bs, _ := json.Marshal(data)
@@ -238,7 +282,7 @@ func GetOneDownloaderFile() string {
 //	return false
 //}
 
-//从微信端获取验证码
+// 从微信端获取验证码
 func GetCodeByWx(img []byte) (string, error) {
 	msgid := mu.UUID(8)
 	ret, err := GetMsgFromWx(msgid, img, true, 300)
@@ -250,7 +294,7 @@ func GetCodeByWx(img []byte) (string, error) {
 	return qu.ObjToString(tmp["content"]), err
 }
 
-//从微信获取验证码消息
+// 从微信获取验证码消息
 func GetMsgFromWx(msgid string, img []byte, falg bool, timeout int64) ([]byte, error) {
 	ret, err := Msclient.Call("", msgid, mu.SERVICE_DISTINGUISH, mu.SENDTO_TYPE_ALL_RECIVER,
 		map[string]interface{}{

+ 95 - 6
src/spider/script.go

@@ -38,7 +38,7 @@ const (
 
 var TimeSleepChan = make(chan bool, 1)
 
-//脚本
+// 脚本
 type Script struct {
 	SCode, ScriptFile string
 	Encoding          string
@@ -57,7 +57,7 @@ type Script struct {
 }
 
 var ErrFid = "a6879f0a8570256aa21fb978e6dabb50429a30dfacff697cf0b898abbc5c262e" //限制访问的附件
-//加载文件
+// 加载文件
 func (s *Script) LoadScript(site, channel, user *string, code, script_file string) string {
 	defer mu.Catch()
 	s.SCode = code
@@ -793,6 +793,32 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		S.Push(lua.LString(result))
 		return 1
 	}))
+	//aes cbc模式加密
+	s.L.SetGlobal("aesEncryptCBC", s.L.NewFunction(func(S *lua.LState) int {
+		origData := S.ToString(-3)
+		key := S.ToString(-2)
+		iv := S.ToString(-1)
+		bytekey := []byte(key)
+		byteorigData := []byte(origData)
+		byteiv := []byte(iv)
+		encrypted := util.AesCBCEncrypt(byteorigData, bytekey, byteiv)
+		// 将加密后的数据和初始向量进行Base64编码
+		result := base64.StdEncoding.EncodeToString(encrypted)
+		S.Push(lua.LString(result))
+		return 1
+	}))
+	//aes cbc模式解密
+	s.L.SetGlobal("aesDecryptCBC", s.L.NewFunction(func(S *lua.LState) int {
+		origData := S.ToString(-3)
+		key := S.ToString(-2)
+		iv := S.ToString(-1)
+		bytekey := []byte(key)
+		byteiv := []byte(iv)
+		data, _ := base64.StdEncoding.DecodeString(origData)
+		result := util.AesCBCDecrypter(data, bytekey, byteiv)
+		S.Push(lua.LString(result))
+		return 1
+	}))
 	//aes ecb模式加密
 	s.L.SetGlobal("aesEncryptECB", s.L.NewFunction(func(S *lua.LState) int {
 		origData := S.ToString(-2)
@@ -831,6 +857,31 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		S.Push(lua.LString(result))
 		return 1
 	}))
+	//des cbc模式加密
+	s.L.SetGlobal("desEncryptCBC", s.L.NewFunction(func(S *lua.LState) int {
+		origData := S.ToString(-3)
+		key := S.ToString(-2)
+		iv := S.ToString(-1)
+		bytekey := []byte(key)
+		byteorigData := []byte(origData)
+		byteiv := []byte(iv)
+		encrypted := util.DesCBCEncrypt(byteorigData, bytekey, byteiv)
+		result := base64.StdEncoding.EncodeToString(encrypted)
+		S.Push(lua.LString(result))
+		return 1
+	}))
+	//des cbc模式解密
+	s.L.SetGlobal("desDecryptCBC", s.L.NewFunction(func(S *lua.LState) int {
+		origData := S.ToString(-3)
+		key := S.ToString(-2)
+		iv := S.ToString(-1)
+		bytekey := []byte(key)
+		byteiv := []byte(iv)
+		data, _ := base64.StdEncoding.DecodeString(origData)
+		result := util.DesCBCDecrypter(data, bytekey, byteiv)
+		S.Push(lua.LString(result))
+		return 1
+	}))
 	//rsa 公钥加密
 	s.L.SetGlobal("rsaEncrypt", s.L.NewFunction(func(S *lua.LState) int {
 		origData := S.ToString(-2)
@@ -938,6 +989,7 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		head := S.ToTable(-2)
 		stype := S.ToString(-3)
 		path := S.ToString(-4)
+		proxy := S.ToBool(-5)
 		headMap := util.GetTable(head)
 		//qu.Debug("cookie----------", cookie)
 		//qu.Debug("headMap----------", headMap)
@@ -946,7 +998,7 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		if err == nil {
 			headJsonStr = string(headByte)
 		}
-		code, respHead, respCookie := codegrpc.GetCodeByPath(path, stype, headJsonStr, cookie)
+		code, respHead, respCookie := codegrpc.GetCodeByPath(path, stype, headJsonStr, cookie, proxy)
 		//qu.Debug("code====", code)
 		//qu.Debug("respHead====", respHead)
 		//qu.Debug("respCookie====", respCookie)
@@ -1012,6 +1064,43 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		S.Push(lua.LString(contentHtml))
 		return 1
 	}))
+	//chromedp下载
+	s.L.SetGlobal("downloadByChrome", s.L.NewFunction(func(S *lua.LState) int {
+		timeout := S.ToInt64(-2)
+		taskStr := S.ToString(-1)
+		cam := util.ChromeActionMap{}
+		if json.Unmarshal([]byte(taskStr), &cam) == nil {
+			if len(cam.BaseActions) > 0 {
+				if len(cam.RangeActions) > 0 && cam.RangeTimes > 0 {
+					for times := 1; times <= cam.RangeTimes; times++ {
+						cam.BaseActions = append(cam.BaseActions, cam.RangeActions...)
+					}
+				}
+				chromeTask := util.ChromeTask{
+					TimeOut: timeout,
+					Actions: cam.BaseActions,
+				}
+				ret := DownloadByChrome(s.SCode, s.Downloader, chromeTask, s.Timeout)
+				S.Push(util.MapToTable(S, ret))
+			} else {
+				S.Push(S.NewTable())
+			}
+		} else {
+			S.Push(S.NewTable())
+		}
+		return 1
+	}))
+	//针对中国招标投标公共服务平台三级页瑞数加密下载方法
+	s.L.SetGlobal("downloadByDataIntercept", s.L.NewFunction(func(S *lua.LState) int {
+		url := S.ToString(-4)
+		url_regex := S.ToString(-3)
+		timeout := S.ToInt(-2)
+		proxy := S.ToBool(-1)
+		headers := util.DownloadByDataIntercept(url, url_regex, timeout, proxy)
+		table := util.MapToLuaTable(S, headers)
+		S.Push(table)
+		return 1
+	}))
 	return ""
 }
 
@@ -1058,7 +1147,7 @@ func getChildrenLen(sq *gq.Selection) (ret int) {
 	return
 }
 
-//unicode转码
+// unicode转码
 func transUnic(str string) string {
 	buf := bytes.NewBuffer(nil)
 	i, j := 0, len(str)
@@ -1086,7 +1175,7 @@ func transUnic(str string) string {
 	return buf.String()
 }
 
-//取得变量
+// 取得变量
 func (s *Script) GetVar(key string) string {
 	return s.L.GetGlobal(key).String()
 }
@@ -1107,7 +1196,7 @@ func (s *Script) GetBoolVar(key string) bool {
 	return false
 }
 
-//设置睡眠时间
+// 设置睡眠时间
 func SleepTime(basetime int, times []time.Duration) {
 	st := 0 //记录最后睡眠时长
 	base := float64(basetime * 60)

+ 19 - 18
src/udptask/udptask.go

@@ -64,7 +64,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-//更新bidding表
+// 更新bidding表
 func UpdateBiddingData(gtid, lteid string) {
 	defer qu.Catch()
 	logger.Info("开始更新bidding数据...")
@@ -93,7 +93,7 @@ func UpdateBiddingData(gtid, lteid string) {
 	ch := make(chan bool, 2)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
-	arr := [][]map[string]interface{}{}
+	//arr := [][]map[string]interface{}{}
 	arrTmp := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		wg.Add(1)
@@ -103,24 +103,25 @@ func UpdateBiddingData(gtid, lteid string) {
 				<-ch
 				wg.Done()
 			}()
-			update := []map[string]interface{}{
-				map[string]interface{}{"_id": tmp["_id"]},
-				map[string]interface{}{
-					"$set": tmp,
-				},
-			}
+			//update := []map[string]interface{}{
+			//	map[string]interface{}{"_id": tmp["_id"]},
+			//	map[string]interface{}{
+			//		"$set": tmp,
+			//	},
+			//}
+			go util.PubNats(tmp["_id"], tmp) //nats发布数据
 			lock.Lock()
-			arr = append(arr, update)                         //线上bidding表
+			//arr = append(arr, update)                         //线上bidding表
 			arrTmp = append(arrTmp, []map[string]interface{}{ //任务表
 				map[string]interface{}{"_id": tmp["_id"]},
 				map[string]interface{}{
 					"$set": map[string]interface{}{"state": 2},
 				},
 			})
-			if len(arr) > 50 {
-				util.MgoB.UpdateBulk("bidding", arr...)
-				arr = [][]map[string]interface{}{}
-			}
+			//if len(arr) > 50 {
+			//	util.MgoB.UpdateBulk("bidding", arr...)
+			//	arr = [][]map[string]interface{}{}
+			//}
 			if len(arrTmp) > 50 {
 				util.MgoDT.UpdateBulk("bidding", arrTmp...)
 				arrTmp = [][]map[string]interface{}{}
@@ -130,17 +131,17 @@ func UpdateBiddingData(gtid, lteid string) {
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
-	if len(arr) > 0 {
-		util.MgoB.UpdateBulk("bidding", arr...)
-		arr = [][]map[string]interface{}{}
-	}
+	//if len(arr) > 0 {
+	//	util.MgoB.UpdateBulk("bidding", arr...)
+	//	arr = [][]map[string]interface{}{}
+	//}
 	if len(arrTmp) > 0 {
 		util.MgoDT.UpdateBulk("bidding", arrTmp...)
 		arrTmp = [][]map[string]interface{}{}
 	}
 }
 
-//发送udp的时候一个key参数
+// 发送udp的时候一个key参数
 func SendUdp(coll, gtid, lteid, stype, udpaddr string, udpport int) {
 	defer qu.Catch()
 	key := gtid + "-" + lteid + "-" + stype

+ 4 - 3
src/util/config.go

@@ -2,7 +2,6 @@ package util
 
 import (
 	codegrpc "analysiscode/client"
-	gojs "gorunjs/client"
 	"mongodb"
 	qu "qfw/util"
 	"spider"
@@ -90,11 +89,11 @@ func InitOther() {
 	//验证码识别client
 	codegrpc.InitCodeGrpcClient()
 	//go执行js服务
-	gojs.InitGoRunJsClient()
+	//gojs.InitGoRunJsClient()
 	//启动消息服务
 	spider.InitMsgClient(Config.Msgserveraddr, Config.Msgname)
 	spider.InitMsgClientFile(Config.MsgserveraddrFile, Config.Msgname+"file")
-
+	spider.InitMsgClientChromedp(Config.MsgserveraddrChromedp, Config.Msgname+"chromedp")
 	//初始化网络存储服务
 	OssInit(
 		qu.ObjToString(Config.OssInfo["ossEndpoint"]),
@@ -102,4 +101,6 @@ func InitOther() {
 		qu.ObjToString(Config.OssInfo["ossAccessKeySecret"]),
 		qu.ObjToString(Config.OssInfo["ossBucketName"]),
 	)
+	//nats
+	InitNats()
 }

+ 70 - 0
src/util/nats.go

@@ -0,0 +1,70 @@
+package util
+
+import (
+	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
+	qu "qfw/util"
+	sputil "spiderutil"
+	"time"
+)
+
+type MsgInfo struct {
+	Id       string                 //消息唯一id
+	CurrSetp string                 //当前步骤
+	NextSetp string                 //下个步骤,特殊流程增加
+	IsEnd    int                    //当前流程后结束 1-结束
+	Data     map[string]interface{} //数据内容
+	Extend   struct {               //有需要按照示例增加
+		Extract struct { //抽取
+
+		}
+		Repeat struct { //判重
+			SId string //原始id
+			RId string //被替换id
+		}
+		MgoSave struct { //mgo保存更新
+			SType string //更新u 保存s
+			Col   string //表
+		}
+		EsSave struct { //es保存更新
+			SType string //更新u 保存s
+			Index string //索引
+		}
+	}
+	Err   string //错误信息 有错误会告警并终止流程
+	Stime int64
+	Etime int64
+}
+
+var (
+	JnatsClient *sputil.Jnats
+)
+
+// InitNats 初始化nats
+func InitNats() {
+	fmt.Println("nats url:", sputil.Config.SpiderNats.NatsUrl)
+	NatsUrl := qu.ObjToString(sputil.Config.SpiderNats.NatsUrl)
+	JnatsClient = sputil.NewJnats(NatsUrl)
+}
+
+func PubNats(id interface{}, data map[string]interface{}) bool {
+	msgFlag := false
+	data["_id"] = id
+	msg := &MsgInfo{
+		Data: data,
+	}
+	msg.Extend.MgoSave.SType = "U"
+	msg.Extend.MgoSave.Col = "bidding"
+	byteMsg, err := bson.Marshal(msg)
+	if err == nil {
+		_, err := JnatsClient.PubReqZip(sputil.Config.SpiderNats.Subscribe, byteMsg, 5*time.Minute) //发布
+		if err != nil {
+			qu.Debug("信息发布失败:", data["_id"], data["title"], err)
+		} else {
+			msgFlag = true
+		}
+	} else {
+		qu.Debug("数据格式化失败:", err)
+	}
+	return msgFlag
+}