Browse Source

Merge branch 'dev/v2.4.21.1_rjj' of jianyu/qmx_admin into hotfix/v2.4.21.1

renjiaojiao 1 year ago
parent
commit
d249e7a340

+ 14 - 0
src/github.com/mongo_util/README.md

@@ -0,0 +1,14 @@
+# 说明
+MongoDb go驱动封装
+
+## 功能介绍
+
+## 使用方法
+
+## 变更说明
+
+## 注意事项
+
+golang.org包的更新
+go mod edit -replace=golang.org/x/sys@v0.0.0=github.com/golang/sys@latest
+go mod edit -replace=golang.org/x/text@v0.0.0=github.com/golang/text@latest

+ 15 - 0
src/github.com/mongo_util/go.mod

@@ -0,0 +1,15 @@
+module app.yhyue.com/BP/mongo_util
+
+
+
+
+
+require go.mongodb.org/mongo-driver v1.3.2
+
+replace golang.org/x/crypto v0.0.0 => github.com/golang/crypto v1.2.3
+
+replace golang.org/x/sys v0.0.0 => github.com/golang/sys v0.0.0-20200420163511-1957bb5e6d1f
+
+replace golang.org/x/text v0.0.0 => github.com/golang/text v0.3.2
+
+go 1.14

+ 83 - 0
src/github.com/mongo_util/mongo_util.go

@@ -0,0 +1,83 @@
+/**
+MongoDb工具包
+Auth:hongbo
+Date:2020-4-21
+*/
+package mongo_util
+
+import (
+	"context"
+	"errors"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readconcern"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+	"go.mongodb.org/mongo-driver/mongo/writeconcern"
+	"time"
+)
+
+//
+type MongoUtil struct {
+	url    string //MongoDb库链接地址
+	client *mongo.Client
+	opts   *options.ClientOptions
+}
+
+// 创建MongoUtil
+func NewMongoUtil(url string, //mongo 连接url
+	maxConnIdle int64, //最大空闲时间
+	maxPoolSize uint64, //连接池最大数
+	onlyUseSecondarNode bool, //只使用辅助节点
+) (error, *MongoUtil) {
+	var err error
+	var want *readpref.ReadPref
+	if onlyUseSecondarNode {
+		want, err = readpref.New(readpref.SecondaryMode) //表示只使用辅助节点
+	} else {
+		want, err = readpref.New(readpref.PrimaryPreferredMode) //表示首选使用主节点
+	}
+	if err != nil {
+		return err, nil
+	}
+	wc := writeconcern.New(writeconcern.WMajority())
+	readconcern.Majority()
+	//链接mongo服务
+	opt := options.Client().ApplyURI(url)
+	opt.SetLocalThreshold(3 * time.Second)                           //只使用与mongo操作耗时小于3秒的
+	opt.SetMaxConnIdleTime(time.Duration(maxConnIdle) * time.Second) //指定连接可以保持空闲的最大毫秒数
+	opt.SetMaxPoolSize(maxPoolSize)                                  //使用最大的连接数
+	//定义读取数据策略
+	opt.SetReadPreference(want)                //表示只使用辅助节点
+	opt.SetReadConcern(readconcern.Majority()) //指定查询应返回实例的最新数据确认为,已写入副本集中的大多数成员
+	opt.SetWriteConcern(wc)                    //请求确认写操作传播到大多数mongod实
+	return nil, &MongoUtil{opts: opt, url: url}
+}
+
+//从连接池中获取连接
+func (mu *MongoUtil) Client(checkUseableBefore bool) (error, *mongo.Client) {
+	var err error
+	var client *mongo.Client
+	if client, err = mongo.Connect(mu.getContext(10), mu.opts); err != nil {
+		return errors.New("获取连接失败"), nil
+	}
+	if checkUseableBefore {
+		//判断主节点服务是否可用
+		if err = client.Ping(mu.getContext(10), readpref.PrimaryPreferred()); err != nil {
+			return errors.New("ping主节点失败"), nil
+		}
+	}
+	mu.client = client
+	return nil, client
+}
+
+//销毁连接
+func (mu *MongoUtil) Destory(client *mongo.Client) error {
+	err := client.Disconnect(mu.getContext(10))
+	return err
+}
+
+//取得支持超时的上下文
+func (mu *MongoUtil) getContext(timeout uint64) (ctx context.Context) {
+	ctx, _ = context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
+	return
+}

+ 99 - 0
src/github.com/mongo_util/mongo_util_op.go

@@ -0,0 +1,99 @@
+package mongo_util
+
+import (
+	"encoding/json"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"log"
+	"runtime"
+	"strings"
+)
+
+//异常处理,主要是输出日志
+func panicHandler() {
+	if r := recover(); r != nil {
+		log.Println("[E]", r)
+		for skip := 1; ; skip++ {
+			_, file, line, ok := runtime.Caller(skip)
+			if !ok {
+				break
+			}
+			go log.Printf("%v,%v\n", file, line)
+		}
+	}
+}
+
+func ObjToOth(query interface{}) *bson.M {
+	return ObjToMQ(query, false)
+}
+func ObjToM(query interface{}) *bson.M {
+	return ObjToMQ(query, true)
+}
+func ObjToMQ(query interface{}, isQuery bool) *bson.M {
+	defer panicHandler()
+	data := make(bson.M)
+	if s2, ok2 := query.(*map[string]interface{}); ok2 {
+		data = bson.M(*s2)
+	} else if s3, ok3 := query.(*bson.M); ok3 {
+		return s3
+	} else if s, ok := query.(string); ok {
+		json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
+		if ss, oks := data["_id"]; oks && isQuery {
+			switch ss.(type) {
+			case string:
+				data["_id"], _ = primitive.ObjectIDFromHex(ss.(string))
+			case map[string]interface{}:
+				tmp := ss.(map[string]interface{})
+				for k, v := range tmp {
+					tmp[k], _ = primitive.ObjectIDFromHex(v.(string))
+				}
+				data["_id"] = tmp
+			}
+
+		}
+	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
+		data = s1
+	} else if s4, ok4 := query.(bson.M); ok4 {
+		data = s4
+	} else {
+		data = nil
+	}
+	return &data
+}
+
+//数据更新
+func (mu *MongoUtil) Update(db string, c string, query interface{}, set interface{}, multi bool) error {
+	coll := mu.client.Database(db).Collection(c)
+	var err error
+	if multi {
+		_, err = coll.UpdateMany(mu.getContext(10), ObjToM(query), ObjToM(set))
+	} else {
+		_, err = coll.UpdateOne(mu.getContext(10), ObjToM(query), ObjToM(set))
+	}
+	return err
+}
+
+//批量修改
+func (mu *MongoUtil) UpdateBulk(db string, c string, query interface{}, doc ...[]map[string]interface{}) error {
+	//TODO 暂未实现
+	//coll := mu.client.Database(db).Collection(c)
+	//var err error
+	//var operations []mongo.WriteModel
+	//
+	//operation := mongo.NewUpdateOneModel()
+	//operation.Filter=ObjToM(query)
+	return nil
+}
+
+//按ID删除
+func (mu *MongoUtil) DelById(db, c, id string) {
+	defer panicHandler()
+	coll := mu.client.Database(db).Collection(c)
+	_, mongonId := primitive.ObjectIDFromHex(id)
+	coll.FindOneAndDelete(mu.getContext(10), bson.M{"_id": mongonId})
+}
+
+//删除对象
+func Del(db, c string, query interface{}) {
+
+}

+ 41 - 0
src/github.com/mongo_util/mongo_util_test.go

@@ -0,0 +1,41 @@
+package mongo_util
+
+import (
+	"log"
+	"testing"
+)
+
+//池测试
+func Test_Pool(t *testing.T) {
+	_, mu := NewMongoUtil("mongodb://192.168.3.207:27092", 20000, 5, true)
+	log.Println(mu)
+	err, client := mu.Client(true)
+	if err != nil {
+		log.Fatal(err)
+	} else {
+		log.Println(client)
+	}
+	defer mu.Destory(client)
+}
+
+//连接池的性能测试
+func Benchmark_Pool(b *testing.B) {
+	_, mu := NewMongoUtil("mongodb://192.168.3.207:27092", 20000, 5, true)
+	log.Println(mu)
+	for i := 0; i < b.N; i++ {
+		_, client := mu.Client(true)
+		mu.Destory(client)
+	}
+}
+
+//连接池的性能测试
+func Benchmark_Pool_Without_Inittime(b *testing.B) {
+	b.StopTimer()
+	_, mu := NewMongoUtil("mongodb://192.168.3.207:27092", 20000, 5, true)
+	log.Println(mu)
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		_, client := mu.Client(true)
+		mu.Destory(client)
+	}
+}

+ 1 - 1
src/main.go

@@ -78,13 +78,13 @@ func main() {
 		http.HandleFunc(s, outProxy.DoProxy)
 		http.HandleFunc(s, outProxy.DoProxy)
 	}
 	}
 
 
-	go task.MsgAgain()
 	fmt.Println(config.SysConfigs.WebPort)
 	fmt.Println(config.SysConfigs.WebPort)
 	task.InitTask()
 	task.InitTask()
 	cn := cron.New()
 	cn := cron.New()
 	cn.AddFunc(config.SysConfigs.VipStateTask, task.VipStateTask)
 	cn.AddFunc(config.SysConfigs.VipStateTask, task.VipStateTask)
 	cn.Start()
 	cn.Start()
 	order.ShopTask()
 	order.ShopTask()
+	go task.MsgAgain()
 	err := endless.ListenAndServe(config.SysConfigs.WebPort, nil, func() {})
 	err := endless.ListenAndServe(config.SysConfigs.WebPort, nil, func() {})
 	if err != nil {
 	if err != nil {
 		fmt.Println("Listen Server Error", err)
 		fmt.Println("Listen Server Error", err)

+ 19 - 12
src/public/message.go

@@ -3,9 +3,11 @@ package public
 import (
 import (
 	"bytes"
 	"bytes"
 	"config"
 	"config"
+	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo/options"
 	"io"
 	"io"
 	"log"
 	"log"
 	"mongodb"
 	"mongodb"
@@ -74,7 +76,6 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
 		"iosPushUrl":  param.IosUrl,
 		"iosPushUrl":  param.IosUrl,
 	}
 	}
 
 
-	sess := util.MQFW.GetMgoConn()
 	filter := map[string]interface{}{
 	filter := map[string]interface{}{
 		"i_appid": 2,
 		"i_appid": 2,
 	}
 	}
@@ -86,28 +87,34 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
 			filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(qutil.ObjToString(lastId))}
 			filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(qutil.ObjToString(lastId))}
 		}
 		}
 	}
 	}
-	fields := map[string]interface{}{
-		"_id": 1,
-		//"s_name": 1,
-		"s_opushid": 1, "s_jpushid": 1, "s_appponetype": 1, "s_appversion": 1, "o_pushset": 1,
-	}
 	lastUserId := ""
 	lastUserId := ""
 	// mongo  库查询用户
 	// mongo  库查询用户
 	count := util.MQFW.Count("user", filter)
 	count := util.MQFW.Count("user", filter)
 	log.Println("user表用户量", count)
 	log.Println("user表用户量", count)
-	//调用中台次数
-	//a := 0
 	for f := 0; f < 5; f++ {
 	for f := 0; f < 5; f++ {
 		if lastUserId != "" {
 		if lastUserId != "" {
 			filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(lastUserId)}
 			filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(lastUserId)}
 		}
 		}
-		log.Println("查询条件filter:", filter)
-		it := sess.DB("qfw").C("user").Find(filter).Select(fields).Sort(`{"_id":1}`).Iter()
-		for userInfo := make(map[string]interface{}); it.Next(&userInfo); {
+
+		_, client := util.NewMgo.Client(true)
+		defer util.NewMgo.Destory(client)
+		opt := &options.FindOptions{}
+		opt.Projection = bson.M{"_id": 1, "s_opushid": 1, "s_jpushid": 1, "s_appponetype": 1, "s_appversion": 1, "o_pushset": 1}
+		opt.Sort = bson.M{"_id": -1}
+		cursor, _ := client.Database("qfw").Collection("user").Find(context.Background(), filter)
+
+		for cursor.Next(context.Background()) {
+			var userInfo bson.M
+			err := cursor.Decode(&userInfo)
+			if err != nil {
+				log.Println(err)
+				break
+			}
 			if userInfo == nil || len(userInfo) == 0 {
 			if userInfo == nil || len(userInfo) == 0 {
 				continue
 				continue
 			}
 			}
 			userId := mongodb.BsonIdToSId(userInfo["_id"])
 			userId := mongodb.BsonIdToSId(userInfo["_id"])
+			fmt.Println("userId", userId)
 			//如果是补发消息前2000条需要判重
 			//如果是补发消息前2000条需要判重
 			if again && totalCount <= 4000 {
 			if again && totalCount <= 4000 {
 				if totalCount <= 4000 {
 				if totalCount <= 4000 {
@@ -136,7 +143,7 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
 				currentCount = 0
 				currentCount = 0
 
 
 			}
 			}
-			userInfo = make(map[string]interface{})
+			//userInfo = make(map[string]interface{})
 			if totalCount >= 2000 && totalCount%2000 == 0 {
 			if totalCount >= 2000 && totalCount%2000 == 0 {
 				//定时存发送到哪一位用户
 				//定时存发送到哪一位用户
 				redis.Put("qmx_filter", fmt.Sprintf(Redis_lastUserId_key, msgLogId), userId, 604800)
 				redis.Put("qmx_filter", fmt.Sprintf(Redis_lastUserId_key, msgLogId), userId, 604800)

+ 15 - 2
src/util/db.go

@@ -3,6 +3,7 @@ package util
 import (
 import (
 	. "config"
 	. "config"
 	"database/sql"
 	"database/sql"
+	"fmt"
 	"log"
 	"log"
 	"mongodb"
 	"mongodb"
 	"os"
 	"os"
@@ -11,10 +12,10 @@ import (
 	"qfw/util/redis"
 	"qfw/util/redis"
 	"time"
 	"time"
 
 
-	"github.com/coreos/etcd/clientv3"
-
 	"github.com/baiy/Cadmin-server-go/admin"
 	"github.com/baiy/Cadmin-server-go/admin"
+	"github.com/coreos/etcd/clientv3"
 	_ "github.com/go-sql-driver/mysql"
 	_ "github.com/go-sql-driver/mysql"
+	"github.com/mongo_util"
 )
 )
 
 
 var JysqlDB *mysql.Mysql
 var JysqlDB *mysql.Mysql
@@ -32,6 +33,10 @@ var JyqyfwMgo *mongodb.MongodbSim
 var EtcdCli *clientv3.Client
 var EtcdCli *clientv3.Client
 var GlobalCommonDataDB *mysql.Mysql
 var GlobalCommonDataDB *mysql.Mysql
 
 
+var (
+	NewMgo *mongo_util.MongoUtil
+)
+
 const (
 const (
 	TableJyLeadsRecord = "jy_leads_record" // 销售线索上传记录表
 	TableJyLeadsRecord = "jy_leads_record" // 销售线索上传记录表
 	TableJyLeads       = "jy_leads"        // 销售线索表
 	TableJyLeads       = "jy_leads"        // 销售线索表
@@ -94,6 +99,14 @@ func init() {
 	// 设置数据库操作对象
 	// 设置数据库操作对象
 	admin.SetDb(admindb)
 	admin.SetDb(admindb)
 
 
+	//新工具类
+	fmt.Println(SysConfigs.MongoDBServer, uint64(SysConfigs.MongodbPoolSize))
+	err, NewMgo = mongo_util.NewMongoUtil("mongodb://"+SysConfigs.MongoDBServer, 20000, uint64(SysConfigs.MongodbPoolSize), false)
+
+	if err != nil {
+		log.Println(err)
+	}
+
 	//连接MongoDB数据库
 	//连接MongoDB数据库
 	MQFW = &mongodb.MongodbSim{
 	MQFW = &mongodb.MongodbSim{
 		MongodbAddr: SysConfigs.MongoDBServer,
 		MongodbAddr: SysConfigs.MongoDBServer,