Преглед изворни кода

bug处理,附件内容获取修改从oss获取

Jianghan пре 3 година
родитељ
комит
5903937d0e
6 измењених фајлова са 108 додато и 18 уклоњено
  1. 3 1
      jy_publishing/config.json
  2. 3 0
      jy_publishing/go.mod
  3. 6 0
      jy_publishing/go.sum
  4. 7 6
      jy_publishing/main.go
  5. 67 0
      jy_publishing/ossclient.go
  6. 22 11
      jy_publishing/task.go

+ 3 - 1
jy_publishing/config.json

@@ -17,6 +17,7 @@
   "es": {
     "addr": "http://192.168.3.206:9800",
     "index": "bidding",
+    "index_all": "biddingall",
     "itype": "bidding",
     "pool": 3
   },
@@ -31,7 +32,8 @@
     "concurrent": 1
   },
   "nsq_attachment": {
-    "addr": "192.168.3.13:4150",
+    "addr_p": "192.168.3.13:4150",
+    "addr_c": "192.168.3.13:4150",
     "topic": "attachment",
     "topic-result": "attachment-txt",
     "channel": "data-processing",

+ 3 - 0
jy_publishing/go.mod

@@ -5,8 +5,11 @@ go 1.16
 require (
 	app.yhyue.com/BP/servicerd v0.0.0-20201203055056-87643512f867
 	github.com/Chain-Zhang/pinyin v0.1.3
+	github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible
+	github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
 	github.com/golang/protobuf v1.5.2
 	github.com/nsqio/go-nsq v1.1.0
+	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/zeromicro/go-zero v1.3.2
 	google.golang.org/grpc v1.44.0
 	google.golang.org/protobuf v1.27.1

+ 6 - 0
jy_publishing/go.sum

@@ -67,6 +67,8 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
 github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
 github.com/alicebob/miniredis/v2 v2.17.0 h1:EwLdrIS50uczw71Jc7iVSxZluTKj5nfSP8n7ARRnJy0=
 github.com/alicebob/miniredis/v2 v2.17.0/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
+github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM=
+github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
 github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
 github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
@@ -74,6 +76,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
 github.com/aws/aws-sdk-go v1.29.15 h1:0ms/213murpsujhsnxnNKNeVouW60aJqSd992Ks3mxs=
 github.com/aws/aws-sdk-go v1.29.15/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg=
+github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
+github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -424,6 +428,8 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
 github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
+github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/shirou/gopsutil v2.20.6+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

+ 7 - 6
jy_publishing/main.go

@@ -60,12 +60,13 @@ func init() {
 		I_size:  util.IntAllDef(es["pool"], 10),
 	}
 	Index = util.ObjToString(es["index"])
-	IndexAll = util.ObjToString(es["bidding_all"])
+	IndexAll = util.ObjToString(es["index_all"])
 	Itype = util.ObjToString(es["itype"])
 	Es.InitElasticSize()
 
 	//加载敏感词文件
 	Ms = ms.NewMegaloscope("./rules.txt")
+	InitOss()
 	initEtcd()
 	initUdp()
 }
@@ -104,7 +105,7 @@ func jyNsqMethod() {
 	MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
 		IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
 		Addr:         util.ObjToString(cof["addr"]),
-		ConnectType:  0, //默认连接nsqd
+		ConnectType:  1, //默认连接nsqd
 		Topic:        util.ObjToString(cof["topic"]),
 		Channel:      util.ObjToString(cof["channel"]),
 		Concurrent:   util.IntAllDef(cof["concurrent"], 1), //并发数
@@ -126,15 +127,15 @@ func jyNsqMethod() {
 func attsNsqMethod() {
 	var err error
 	cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
-	MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr"]), util.ObjToString(cofAtts["topic"]), true)
+	MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
 	if err != nil {
 		Logger.Error(err.Error())
 	}
 	FileTopicResult = util.ObjToString(cofAtts["topic-result"])
 	MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
-		IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
-		Addr:         util.ObjToString(cofAtts["addr"]),
-		ConnectType:  0, //默认连接nsqd
+		IsJsonEncode: true,
+		Addr:         util.ObjToString(cofAtts["addr_c"]),
+		ConnectType:  1, //默认连接nsqd
 		Topic:        FileTopicResult,
 		Channel:      util.ObjToString(cofAtts["channel"]),
 		Concurrent:   util.IntAllDef(cofAtts["concurrent"], 1), //并发数

+ 67 - 0
jy_publishing/ossclient.go

@@ -0,0 +1,67 @@
+// ossclient
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"utils"
+
+	"github.com/aliyun/aliyun-oss-go-sdk/oss"
+)
+
+var (
+	ossEndpoint        = "oss-cn-beijing-internal.aliyuncs.com" // production: oss-cn-beijing-internal.aliyuncs.com; testing: oss-cn-beijing.aliyuncs.com
+	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"             // NOTE(review): access key id is committed in plain text; consider loading it from config/env and rotating it
+	ossAccessKeySecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"       // NOTE(review): access key secret is committed in plain text; consider loading it from config/env and rotating it
+	ossBucketName      = "topjy"
+	ossclient          *oss.Client
+)
+
+func InitOss() {
+	client, err := oss.New(ossEndpoint, ossAccessKeyId, ossAccessKeySecret)
+	if err != nil {
+		fmt.Println("Error:", err)
+		os.Exit(-1)
+	}
+	ossclient = client
+}
+
+// OssGetObject downloads the object named objectName from the bucket
+// ossBucketName and returns its whole content as a string.
+//
+// It returns "" when objectName is empty, when the bucket handle cannot be
+// obtained, when the download fails, or when reading the body fails; the
+// error is printed in each failing case.
+func OssGetObject(objectName string) string {
+	// NOTE(review): util.Catch is presumably a recover()-based panic guard
+	// (its definition is not visible here). Such a guard only has an effect
+	// when it runs as a deferred call, so it is deferred rather than called.
+	defer util.Catch()
+	// An empty name cannot address an object; skip the remote call.
+	if objectName == "" {
+		fmt.Println("Error:", "empty oss object name")
+		return ""
+	}
+	// Get a handle on the storage bucket.
+	bucket, err := ossclient.Bucket(ossBucketName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	// Download the object as a stream.
+	body, err := bucket.GetObject(objectName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	defer body.Close()
+	data, err := ioutil.ReadAll(body)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	return string(data)
+}
+
+// OssObjExists reports whether an object named fid exists in the bucket
+// ossBucketName. It returns false, after printing the error, when the
+// bucket handle cannot be obtained or the existence check itself fails.
+func OssObjExists(fid string) bool {
+	// NOTE(review): util.Catch is presumably a recover()-based panic guard
+	// (its definition is not visible here). Such a guard only has an effect
+	// when it runs as a deferred call, so it is deferred rather than called.
+	defer util.Catch()
+	// Get a handle on the storage bucket.
+	bucket, err := ossclient.Bucket(ossBucketName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		// Do not continue with the result of a failed Bucket call.
+		return false
+	}
+	// Ask OSS whether the object exists.
+	isExist, err := bucket.IsObjectExist(fid)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return false
+	}
+	return isExist
+}

+ 22 - 11
jy_publishing/task.go

@@ -79,10 +79,10 @@ func Sensitive(info map[string]interface{}) {
 		for _, m := range attsMap {
 			m1 := m.(map[string]interface{})
 			attsArr = append(attsArr, &pb.Request{
-				FileUrl:     util.ObjToString(m1["fid"]),
-				FileName:    util.ObjToString(m1["filename"]),
-				FileType:    util.ObjToString(m1["ftype"]),
-				ReturnType:  1,
+				FileUrl:  util.ObjToString(m1["fid"]),
+				FileName: util.ObjToString(m1["filename"]),
+				FileType: util.ObjToString(m1["ftype"]),
+				//ReturnType:  0,  // 不传
 				ExtractType: 0,
 			})
 		}
@@ -221,18 +221,24 @@ func InfoPub(info map[string]interface{}) {
 				if err := json.Unmarshal([]byte(s), &atts); err != nil {
 					Logger.Error("data Unmarshal Failed:", Field("error", err))
 				}
-				delete(atts, "uid")
-				delete(atts, "ossurl")
-				atts["url"] = "oss"
+				for _, i := range atts {
+					i2 := i.(map[string]interface{})
+					//delete(i2, "uid")
+					delete(i2, "ossurl")
+					i2["url"] = "oss"
+				}
 				saveMap["projectinfo"] = map[string]interface{}{"attachments": atts}
 			}
 		} else if f == "discern_attach" {
 			if s := util.ObjToString(tmp[f]); s != "" {
-				atts := map[string]interface{}{}
-				if err := json.Unmarshal([]byte(s), &atts); err != nil {
+				atts_txt := map[string]interface{}{}
+				if err := json.Unmarshal([]byte(s), &atts_txt); err != nil {
 					Logger.Error("data Unmarshal Failed:", Field("error", err))
 				}
-				saveMap[SaveFields[f]] = atts
+				for k, v := range atts_txt {
+					atts_txt[k] = map[string]interface{}{k: v}
+				}
+				saveMap[SaveFields[f]] = atts_txt
 			}
 		} else {
 			if s := util.ObjToString(tmp[f]); s != "" {
@@ -302,7 +308,8 @@ func taskAtts(obj map[string]interface{}) {
 		text := make(map[string]interface{})
 		at["state"] = r1["errorState"].(string)
 		if r1["errorState"].(string) == "200" {
-			at["sensitive"] = WordsIdentify(r1["textContent"].(string))
+			textContent := OssGetObject(util.ObjToString(r1["textUrl"]))
+			at["sensitive"] = WordsIdentify(textContent)
 			text["file_name"] = r1["fileName"].(string)
 			text["attach_url"] = r1["textUrl"].(string)
 		}
@@ -349,6 +356,10 @@ func JyRpcSensitive(req *jypb.SensitiveRequest) {
 // @Description 信息删除(es、bidding、extract、project)
 // @Author J 2022/4/8 4:37 PM:00
 func DelMethod(res string) {
+	if !bson.IsObjectIdHex(res) {
+		Logger.Error(" bidding del fail, id err" + res)
+		return
+	}
 	q := map[string]interface{}{"_id": mongodb.StringTOBsonId(res)}
 	b := MgoBid.Del(BidColl, q)
 	if !b {