Răsfoiți Sursa

并发处理nats消息

mxs 1 an în urmă
părinte
comite
4c31481211
5 a modificat fișierele cu 89 adăugiri și 46 ștergeri
  1. 35 1
      client_test.go
  2. 2 1
      config.json
  3. 21 0
      file.go
  4. 26 41
      main.go
  5. 5 3
      nats.go

+ 35 - 1
client_test.go

@@ -22,7 +22,7 @@ func Test_NatsClient(t *testing.T) {
 		Data:     *tmp,   //数据内容
 	}
 	msgByte, _ := bson.Marshal(msg)
-	resp, err := Jnats.PubReqZip(Subscribe, msgByte, 10*time.Second)
+	resp, err := Jnats.PubReqZip(Subscribe, msgByte, 100*time.Second)
 	if err != nil {
 		fmt.Println("发布回执异常:", err)
 		return
@@ -35,3 +35,37 @@ func Test_NatsClient(t *testing.T) {
 		fmt.Println("解析数据失败")
 	}
 }
+
+// 并发流程测试
+func Test_NatsClients(t *testing.T) {
+	mgo := mongodb.MongodbSim{
+		MongodbAddr: "192.168.3.166:27082",
+		DbName:      "qfw",
+		Size:        1,
+	}
+	mgo.InitPool()
+	list, _ := mgo.Find("bidding", nil, nil, nil, false, 0, 10)
+	for i, l := range *list {
+		go func(index int, tmp map[string]interface{}) {
+			msg := &MsgInfo{
+				Id:       fmt.Sprint(index), //消息唯一id
+				CurrSetp: "test",            //当前步骤
+				Data:     tmp,               //数据内容
+			}
+			fmt.Println("发布:", index)
+			msgByte, _ := bson.Marshal(msg)
+			resp, err := Jnats.PubReqZip(Subscribe, msgByte, 10*time.Second)
+			if err != nil {
+				fmt.Println("发布回执异常:", err)
+				return
+			}
+			respMsg := &MsgInfo{}
+			if bson.Unmarshal(resp.Data, &respMsg) == nil {
+				fmt.Println("回应:", index, respMsg.Id)
+			} else {
+				fmt.Println("解析数据失败")
+			}
+		}(i, l)
+	}
+	select {}
+}

+ 2 - 1
config.json

@@ -1,7 +1,8 @@
 {
 	"webport": "8010",
 	"natsurl": "192.168.3.240:19090",
-	"subscribe": "dataprocess.file",
+	"natsthreads": 11,
+	"subscribe": "file",
 	"api": "http://172.17.145.179:19281/_send/_mail",
 	"to": "maxiaoshan@topnet.net.cn,zhangjinkun@topnet.net.cn",
 	"osssite": {

+ 21 - 0
file.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
+	su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
 	"os"
 	"regexp"
 	"strconv"
@@ -23,6 +24,26 @@ var (
 	SpecialTextReg = regexp.MustCompile("(原网页|见附件|下载附件|(查看|访问)(源网|原网)|详情请下载附件!|详情请访问原网页!)")
 )
 
+func DealFile(tmp map[string]interface{}) {
+	site := cu.ObjToString(tmp["site"])              //解析附件站点
+	if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
+		replace, filetext := AnalysisFile(true, limitRatio, tmp)
+		if replace { //替换正文
+			tmp["detail"] = filetext
+		}
+	} else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
+		//判断detail是否有效
+		detail := cu.ObjToString(tmp["detail"])
+		detail = su.FilterDetail(detail) //只保留文本内容
+		if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
+			replace, filetext := AnalysisFile(false, 0, tmp)
+			if replace { //替换正文
+				tmp["detail"] = filetext
+			}
+		}
+	}
+}
+
 func InitFileInfo() {
 	OssSite = map[string]float64{}
 	TimesLimit = cu.IntAll(Config["timeslimit"])

+ 26 - 41
main.go

@@ -6,7 +6,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
 	iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
-	su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
 	"log"
 	"net/http"
 	"time"
@@ -39,47 +38,33 @@ func main() {
 func SubscribeNats() {
 	//先消费,带压缩
 	Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
-		data := &MsgInfo{}
-		err := bson.Unmarshal(msg.Data, &data)
-		if err != nil {
-			log.Println("解析数据失败:", err)
-			data.Err = err.Error()
-			//SaveData()//保存异常数据
-		} else {
-			//处理数据
-			data.Stime = time.Now().Unix()
-			data.CurrSetp = Subscribe
-			DealFile(data.Data)
-			data.Etime = time.Now().Unix()
-		}
-		//消息回写
-		bs, _ := bson.Marshal(data)
-		err = msg.Respond(bs)
-		if err != nil {
-			fmt.Println("回执失败:", data.Id)
-			//SaveData()//保存异常数据
-		}
-	})
-}
-
-func DealFile(tmp map[string]interface{}) {
-	site := cu.ObjToString(tmp["site"])              //解析附件站点
-	if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
-		replace, filetext := AnalysisFile(true, limitRatio, tmp)
-		if replace { //替换正文
-			tmp["detail"] = filetext
-		}
-	} else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
-		//判断detail是否有效
-		detail := cu.ObjToString(tmp["detail"])
-		detail = su.FilterDetail(detail) //只保留文本内容
-		if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
-			replace, filetext := AnalysisFile(false, 0, tmp)
-			if replace { //替换正文
-				tmp["detail"] = filetext
+		NatsThreads <- true
+		go func(msg *nats.Msg) {
+			defer func() {
+				<-NatsThreads
+			}()
+			data := &MsgInfo{}
+			err := bson.Unmarshal(msg.Data, &data)
+			if err != nil {
+				log.Println("解析数据失败:", err)
+				data.Err = err.Error()
+				//SaveData()//保存异常数据
+			} else {
+				//处理数据
+				data.Stime = time.Now().Unix()
+				data.CurrSetp = Subscribe
+				DealFile(data.Data)
+				data.Etime = time.Now().Unix()
 			}
-		}
-	}
+			//消息回写
+			bs, _ := bson.Marshal(data)
+			err = msg.Respond(bs)
+			if err != nil {
+				fmt.Println("回执失败:", data.Id, data.Data["_id"])
+				//SaveData()//保存异常数据
+			}
+		}(msg)
+	})
 }
 
 /*func SendMail(bodyTextAll string) {

+ 5 - 3
nats.go

@@ -6,9 +6,10 @@ import (
 )
 
 var (
-	NatsUrl   string
-	Jnats     *jnats.Jnats
-	Subscribe string
+	NatsUrl     string
+	Jnats       *jnats.Jnats
+	Subscribe   string
+	NatsThreads chan bool
 )
 
 type MsgInfo struct {
@@ -42,6 +43,7 @@ type MsgInfo struct {
 // InitNats 初始化nats
 func InitNats() {
 	NatsUrl = cu.ObjToString(Config["natsurl"])
+	NatsThreads = make(chan bool, cu.IntAllDef(Config["natsthreads"], 10))
 	Jnats = jnats.NewJnats(NatsUrl)
 	Subscribe = cu.ObjToString(Config["subscribe"])
 }