wangchuanjin 2 ماه پیش
والد
کامیت
8ccce8cb84
8فایلهای تغییر یافته به همراه274 افزوده شده و 178 حذف شده
  1. 0 69
      config.json
  2. 69 0
      config.yaml
  3. 21 1
      config/config.go
  4. 20 21
      main.go
  5. 8 2
      oss/http.go
  6. 56 11
      oss/oss.go
  7. 0 74
      util/heartbeat.go
  8. 100 0
      util/warn.go

+ 0 - 69
config.json

@@ -1,69 +0,0 @@
-{
-  "port": ":8011",
-  "oss_accounts": [
-    {
-      "id": "main",
-      "endpoint": "oss-cn-beijing.aliyuncs.com",
-      "access_key_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
-      "access_key_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
-    }
-  ],
-  "buckets": [
-    {
-      "bucket_id": "detail",
-      "account_id": "main",
-      "bucket_name": "jytest2022"
-    }
-  ],
-  "node": {
-    "node_name": "node1"
-  },
-  "redis": {
-    "address": "172.20.45.129:1712",
-    "password": ""
-  },
-  "elasticSearch": {
-    "address": "http://172.20.45.129:9206,http://172.20.45.130:9306",
-    "size": 10,
-    "version": "v7",
-    "userName": "",
-    "password": "",
-    "indexName": "bidding"
-  },
-  "email": {
-    "mails": [
-      {
-        "addr": "smtp.exmail.qq.com",
-        "port": 465,
-        "pwd": "ue9Rg9Sf4CVtdm5a",
-        "user": "public03@topnet.net.cn",
-        "mailPoolSize": 5,
-        "mailReTry": 1
-      },
-      {
-        "addr": "smtp.exmail.qq.com",
-        "port": 465,
-        "pwd": "Mu^$i21673",
-        "user": "public04@topnet.net.cn",
-        "mailPoolSize": 4,
-        "mailReTry": 1
-      }
-    ],
-    "title": "oos代理服务告警",
-    "recipients": [
-      "wangchuanjin@topnet.net.cn"
-    ]
-  },
-  "weixin": {
-    "webhook_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=295f5f6a-f58c-4765-8bae-9bd718c566a5"
-  },
-  "warnMaxNodeNum": 1,
-  "warnInterval": 600,
-  "logger": {
-    "filename": "./logs/oss.log",
-    "maxSize": 1024,
-    "maxBackups": 3,
-    "maxAge": 3,
-    "compress": true
-  }
-}

+ 69 - 0
config.yaml

@@ -0,0 +1,69 @@
+port: ":8011"
+oss_accounts:
+  - id: "main"
+    endpoint: "oss-cn-beijing.aliyuncs.com"
+    access_key_id: "LTAI4G5x9aoZx8dDamQ7vfZi"
+    access_key_secret: "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
+buckets:
+  - bucket_id: "detail"
+    account_id": "main"
+    bucket_name: "jytest2022"
+node:
+  node_name: "node1"
+redis:
+  address: "172.20.45.129:1712"
+  password: ""
+elasticSearch:
+  address: "http://172.20.45.129:9206,http://172.20.45.130:9306"
+  size: 10
+  version: "v7"
+  userName: ""
+  password: ""
+  indexName: "bidding"
+mongodb:
+  address: "172.20.45.128:27080"
+  size: 5
+  dbName: "qfw"
+  replSet: ""
+  userName: "JS2Z_Rbid_ProG"
+  password: "JS2z@S1e3aR5Ch"
+  collection: "bidding"
+email:
+  mails:
+    - addr: "smtp.exmail.qq.com"
+      port: 465
+      pwd: "ue9Rg9Sf4CVtdm5a"
+      user: "public03@topnet.net.cn"
+      mailPoolSize: 5
+      mailReTry: 1
+    - addr: "smtp.exmail.qq.com"
+      port: 465
+      pwd: "Mu^$i21673"
+      user: "public04@topnet.net.cn"
+      mailPoolSize: 4
+      mailReTry: 1
+  title: "oos代理服务告警"
+  recipients:
+    - "wangchuanjin@topnet.net.cn"
+weixin:
+  webhook_url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=295f5f6a-f58c-4765-8bae-9bd718c566a5"
+warnMaxNodeNum: 1
+warnInterval: 600
+logger:
+  filename: "./logs/oss.log"
+  maxSize: 1024
+  maxBackups: 3
+  maxAge: 3
+  compress: true
+downLoadPoolSize: 200
+getDetailFromEsPoolSize: 100
+getDetailFromMgoPoolSize: 100
+onlineNodesWarn: "在线节点数少于%d,当前在线节点数:%d"
+downLoadQueueWarn: "下载并发数%d,排队数量超过%d,请检查!"
+getDetailQueueWarn: "获取正文oss并发数%d,es并发数%d,mgo并发数%d,排队数量超过%d,请检查!"
+downLoadLineUpWarnSize: 10
+getDetailLineUpWarnSize: 10
+getDetailOrder:  #mgo es oss
+  - es
+  - mgo
+  - oss

+ 21 - 1
config/config.go

@@ -3,6 +3,8 @@ package config
 import (
 	"app.yhyue.com/moapp/jybase/es"
 	"app.yhyue.com/moapp/jybase/go-xweb/log"
+	. "app.yhyue.com/moapp/jybase/mongodb"
+	"github.com/go-redis/redis/v8"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gctx"
 )
@@ -31,10 +33,28 @@ type PushMail struct {
 	MailReTry    int    `json:"mailReTry"`
 }
 
+var Rdb *redis.Client
+var Mgo *MongodbSim
+
 // LoadConfig 从指定的配置文件中加载配置
 func init() {
-	log.Println("开始初始化elasticSearch。。。")
+	log.Println("开始初始化。。。")
 	ctx := gctx.New()
 	es.NewEs(g.Config().MustGet(ctx, "elasticSearch.version").String(), g.Config().MustGet(ctx, "elasticSearch.address").String(), g.Config().MustGet(ctx, "elasticSearch.size").Int(), g.Config().MustGet(ctx, "elasticSearch.userName").String(), g.Config().MustGet(ctx, "elasticSearch.password").String())
 	log.Println("初始化elasticSearch结束。。。")
+	Rdb = redis.NewClient(&redis.Options{
+		Addr:     g.Config().MustGet(ctx, "redis.address").String(),
+		Password: g.Config().MustGet(ctx, "redis.password").String(),
+		DB:       0,
+	})
+	log.Println("初始化redis结束。。。")
+	Mgo = &MongodbSim{
+		MongodbAddr: g.Config().MustGet(ctx, "mongodb.address").String(),
+		Size:        g.Config().MustGet(ctx, "mongodb.size").Int(),
+		DbName:      g.Config().MustGet(ctx, "mongodb.dbName").String(),
+		UserName:    g.Config().MustGet(ctx, "mongodb.userName").String(),
+		Password:    g.Config().MustGet(ctx, "mongodb.password").String(),
+	}
+	Mgo.InitPool()
+	log.Println("初始化mongodb结束。。。")
 }

+ 20 - 21
main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"app.yhyue.com/moapp/jybase/endless"
-	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/gogf/gf/v2/os/gfsnotify"
@@ -24,37 +23,37 @@ func main() {
 	// 初始化OSS帐号与bucket信息
 	ossService.LoadOSSAccounts()
 	// 注册一个回调函数,当配置发生变更时会被调用
+	downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int()
+	getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
+	getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
 	gfsnotify.Add("./config.json", func(event *gfsnotify.Event) {
 		log.Println(event.String())
 		if event.IsWrite() || event.IsChmod() || event.IsRename() {
 			log.Println("配置文件有变化,更新内存。。。")
 			ossService.LoadOSSAccounts()
+			if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize {
+				downLoadPoolSize = poolSize
+				ossService.DownLoadPool = make(chan bool, downLoadPoolSize)
+			}
+			if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
+				getDetailFromEsPoolSize = poolSize
+				ossService.GetDetailFromEsPool = make(chan bool, getDetailFromEsPoolSize)
+			}
+			if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int(); getDetailFromMgoPoolSize != poolSize {
+				getDetailFromMgoPoolSize = poolSize
+				ossService.GetDetailFromMgoPool = make(chan bool, getDetailFromMgoPoolSize)
+			}
 		}
 	})
-	// 初始化Redis(用于节点心跳检测)
-	util.InitRedis()
-
 	// 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
 	go func() {
 		ticker := time.NewTicker(5 * time.Second)
-		var prevWarn int64
+		var onlineNodesPrevWarn, downLoadQueuePrevWarn int64
+		ctx := gctx.New()
 		for range ticker.C {
-			util.SendHeartbeat()
-			nodes, err := util.CheckOnlineNodes()
-			if err != nil {
-				log.Println("Heartbeat check error", err)
-				continue
-			}
-			warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
-			if len(nodes) < warnMaxNodeNum {
-				alertMsg := fmt.Sprintf("在线节点数少于%d: 当前在线节点数:%d", warnMaxNodeNum, len(nodes))
-				log.Println(alertMsg)
-				if nowUnix := time.Now().Unix(); nowUnix-prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
-					prevWarn = nowUnix
-					util.SendWeixinNotification(alertMsg)
-					util.SendEmailNotification(alertMsg)
-				}
-			}
+			util.SendHeartbeat(ctx)
+			util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
+			util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn)
 		}
 	}()
 	// 启动RPC服务:注册OSSService,实现接口调用

+ 8 - 2
oss/http.go

@@ -1,7 +1,9 @@
 package oss
 
 import (
+	"encoding/json"
 	"fmt"
+	"github.com/gogf/gf/v2/os/gctx"
 	"io"
 	"log"
 	"net/http"
@@ -107,14 +109,18 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
 
 // NodesHandler 接口用于查看当前在线节点信息(依赖heartbeat中redis心跳)
 func NodesHandler(w http.ResponseWriter, r *http.Request) {
-	nodes, err := util.GetOnlineNodesJSON() // 此函数在 heartbeat 模块中实现
+	nodes, err := util.GetOnlineNodes(gctx.New()) // 此函数在 heartbeat 模块中实现
+	var b []byte
+	if err == nil {
+		b, err = json.Marshal(nodes)
+	}
 	if err != nil {
 		log.Println("Fetch nodes failed: %v", err)
 		http.Error(w, "Fetch nodes failed: "+err.Error(), http.StatusInternalServerError)
 		return
 	}
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(nodes)
+	w.Write(b)
 }
 
 /* 根据标讯id获取正文

+ 56 - 11
oss/oss.go

@@ -11,14 +11,55 @@ import (
 	"log"
 	"net/http"
 	"sync"
+	"sync/atomic"
 
 	"app.yhyue.com/moapp/jybase/es"
 	ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"jygit.jydev.jianyu360.cn/BaseService/ossService/config"
+	"jygit.jydev.jianyu360.cn/BaseService/ossService/util"
 )
 
-// accountMap 用来缓存OSS帐号信息
-var accountMap sync.Map
+var (
+	// accountMap 用来缓存OSS帐号信息
+	accountMap           sync.Map
+	DownLoadPool         = make(chan bool, g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int())
+	GetDetailFromEsPool  = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
+	GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
+	getDetail            = map[string]func(bucketID, objectName string) []byte{
+		"oss": func(bucketID, objectName string) []byte {
+			b, _, e := DownloadAttachment(bucketID, objectName)
+			if e == nil && b != nil && len(b) > 0 {
+				return b
+			}
+			return nil
+		},
+		"es": func(bucketID, objectName string) []byte {
+			GetDetailFromEsPool <- true
+			defer func() {
+				<-GetDetailFromEsPool
+			}()
+			indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
+			list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
+			if list != nil && len(*list) > 0 {
+				detail, _ := (*list)[0]["detail"].(string)
+				return []byte(detail)
+			}
+			return nil
+		},
+		"mgo": func(bucketID, objectName string) []byte {
+			GetDetailFromMgoPool <- true
+			defer func() {
+				<-GetDetailFromMgoPool
+			}()
+			data, _ := config.Mgo.FindOneByField(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
+			if data != nil && len(*data) > 0 {
+				detail, _ := (*data)["detail"].(string)
+				return []byte(detail)
+			}
+			return nil
+		},
+	}
+)
 
 // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
 func GetBucket(bucketID string) (config.BucketInfo, error) {
@@ -106,6 +147,12 @@ func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled b
 
 // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
 func DownloadAttachment(bucketID, objectName string) ([]byte, http.Header, error) {
+	atomic.AddInt64(&util.DownLoadCounter, 1)
+	DownLoadPool <- true
+	defer func() {
+		<-DownLoadPool
+		atomic.AddInt64(&util.DownLoadCounter, -1)
+	}()
 	bucket, err := GetCachedBucket(bucketID)
 	if err != nil {
 		return nil, nil, err
@@ -159,15 +206,13 @@ func LoadOSSAccounts() {
 
 // 获取标讯正文,优先从oss中取,再从es中取
 func GetBidDetail(bucketID, objectName string) []byte {
-	b, _, e := DownloadAttachment(bucketID, objectName)
-	if e == nil && b != nil && len(b) > 0 {
-		return b
-	}
-	indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
-	list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
-	if list != nil && len(*list) > 0 {
-		detail, _ := (*list)[0]["detail"].(string)
-		return []byte(detail)
+	atomic.AddInt64(&util.GetDetailCounter, 1)
+	defer atomic.AddInt64(&util.GetDetailCounter, -1)
+	for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() {
+		detail := getDetail[v](bucketID, objectName)
+		if detail != nil && len(detail) > 0 {
+			return detail
+		}
 	}
 	return nil
 }

+ 0 - 74
util/heartbeat.go

@@ -1,74 +0,0 @@
-package util
-
-import (
-	"context"
-	"encoding/json"
-	"github.com/go-redis/redis/v8"
-	"github.com/gogf/gf/v2/frame/g"
-	"github.com/gogf/gf/v2/os/gctx"
-	"log"
-	"time"
-)
-
-var ctx = context.Background()
-var rdb *redis.Client
-
-// InitRedis 初始化redis客户端
-func InitRedis() {
-	ctx := gctx.New()
-	rdb = redis.NewClient(&redis.Options{
-		Addr:     g.Config().MustGet(ctx, "redis.address").String(),
-		Password: g.Config().MustGet(ctx, "redis.password").String(),
-		DB:       0,
-	})
-}
-
-// NodeInfo 描述节点心跳信息
-type NodeInfo struct {
-	NodeName  string `json:"node_name"`
-	Timestamp int64  `json:"timestamp"`
-}
-
-// SendHeartbeat 将节点心跳信息写入redis,过期时间为10秒
-func SendHeartbeat() {
-	nodeName := g.Config().MustGet(ctx, "node.node_name").String()
-	node := NodeInfo{
-		NodeName:  nodeName,
-		Timestamp: time.Now().Unix(),
-	}
-	data, err := json.Marshal(node)
-	if err != nil {
-		log.Println("Heartbeat marshal error", err)
-		return
-	}
-	// key 为 "node:<nodeName>"
-	rdb.Set(ctx, "node:"+nodeName, data, 10*time.Second)
-}
-
-// CheckOnlineNodes 检查在线节点数
-func CheckOnlineNodes() ([]NodeInfo, error) {
-	keys, err := rdb.Keys(ctx, "node:*").Result()
-	if err != nil {
-		return nil, err
-	}
-	var nodes []NodeInfo
-	for _, key := range keys {
-		data, err := rdb.Get(ctx, key).Result()
-		if err != nil {
-			continue
-		}
-		var node NodeInfo
-		json.Unmarshal([]byte(data), &node)
-		nodes = append(nodes, node)
-	}
-	return nodes, nil
-}
-
-// GetOnlineNodesJSON 返回在线节点信息的JSON格式(供HTTP接口调用)
-func GetOnlineNodesJSON() ([]byte, error) {
-	nodes, err := CheckOnlineNodes()
-	if err != nil {
-		return nil, err
-	}
-	return json.Marshal(nodes)
-}

+ 100 - 0
util/warn.go

@@ -0,0 +1,100 @@
+package util
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
+	"log"
+	"sync/atomic"
+	"time"
+)
+
+var DownLoadCounter int64
+var GetDetailCounter int64
+
+const OssService = "ossService"
+
+// NodeInfo 描述节点心跳信息
+type NodeInfo struct {
+	NodeName  string `json:"node_name"`
+	Timestamp int64  `json:"timestamp"`
+}
+
+// SendHeartbeat 将节点心跳信息写入redis,过期时间为10秒
+func SendHeartbeat(ctx context.Context) {
+	nodeName := g.Config().MustGet(ctx, "node.node_name").String()
+	node := NodeInfo{
+		NodeName:  nodeName,
+		Timestamp: time.Now().Unix(),
+	}
+	data, err := json.Marshal(node)
+	if err != nil {
+		log.Println("Heartbeat marshal error", err)
+		return
+	}
+	Rdb.Set(ctx, OssService+":"+nodeName, data, 10*time.Second)
+}
+
+// CheckOnlineNodes 检查在线节点数
+func CheckOnlineNodes(ctx context.Context, prevWarn *int64) {
+	nodes, _ := GetOnlineNodes(ctx)
+	warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
+	if len(nodes) < warnMaxNodeNum {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "onlineNodesWarn").String(), warnMaxNodeNum, len(nodes))
+		log.Println(alertMsg)
+		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
+			*prevWarn = nowUnix
+			SendWeixinNotification(alertMsg)
+			SendEmailNotification(alertMsg)
+		}
+	}
+}
+
+func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
+	keys, err := Rdb.Keys(ctx, OssService+":*").Result()
+	var nodes []NodeInfo
+	if err != nil {
+		log.Println("GetOnlineNodes keys error", err)
+		return nodes, err
+	} else {
+		for _, key := range keys {
+			data, err := Rdb.Get(ctx, key).Result()
+			if err != nil {
+				log.Println("GetOnlineNodes key error", err)
+				continue
+			}
+			var node NodeInfo
+			json.Unmarshal([]byte(data), &node)
+			nodes = append(nodes, node)
+		}
+	}
+	return nodes, nil
+}
+
+func CheckDownLoadQueue(ctx context.Context, prevWarn *int64) {
+	warnSize := g.Config().MustGet(ctx, "downLoadLineUpWarnSize").Int64()
+	if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downLoadQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), warnSize)
+		log.Println(alertMsg)
+		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
+			*prevWarn = nowUnix
+			SendWeixinNotification(alertMsg)
+			SendEmailNotification(alertMsg)
+		}
+	}
+}
+
+func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
+	warnSize := g.Config().MustGet(ctx, "getDetailLineUpWarnSize").Int64()
+	if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
+		log.Println(alertMsg)
+		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
+			*prevWarn = nowUnix
+			SendWeixinNotification(alertMsg)
+			SendEmailNotification(alertMsg)
+		}
+	}
+}