wangchuanjin 2 ani în urmă
părinte
comite
83542ccf86
9 a modificat fișierele cu 286 adăugiri și 60 ștergeri
  1. 18 0
      fsnotify/README.md
  2. 113 0
      fsnotify/fsnotify.go
  3. 1 1
      fsw/fsw_test.go
  4. 1 0
      go.mod
  5. 1 0
      go.sum
  6. 114 45
      mongodb/mongodb.go
  7. 6 4
      mysql/mysql.go
  8. 31 8
      redis/redis.go
  9. 1 2
      sms/sms_test.go

+ 18 - 0
fsnotify/README.md

@@ -0,0 +1,18 @@
+#fsnotify监控文件
+
+## 第一版-wangshan
+#“sword->%s”:应用模块名称;“./test.json”:监控文件位置和名称;“true”:是否初始化;“func()”:逻辑处理;
+
+#fs.FSNotifyFUNC("sword->%s", "./test.json", true, func() {
+#	log.Println("------------")
+#	util.ReadConfig("./test.json", &TC)
+#})
+***
+
+## 20220325更新-wangshan
+#fs.GetNewWatch().WatchDir("./tmp", true, func() {
+#	util.ReadConfig("./tmp/ws.json", &WSConf)
+#	log.Println("name:", WSConf.Name)
+#})
+
+#默认5秒内 重复操作无效!!!

+ 113 - 0
fsnotify/fsnotify.go

@@ -0,0 +1,113 @@
+package fsnotify
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/fsnotify/fsnotify"
+)
+
+var applyAction = map[string]int64{}
+var applyTime int64 = 5
+
+type Watch struct {
+	watch *fsnotify.Watcher
+}
+
+//
+func GetNewWatch() *Watch {
+	watch, _ := fsnotify.NewWatcher()
+	return &Watch{
+		watch: watch,
+	}
+}
+
+//
+func (w *Watch) WatchDir(dir string, b bool, f func()) {
+	go func(b bool) {
+		if b {
+			f()
+		}
+	}(b)
+	filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+		//判断是否为目录,只监控目录
+		if info.IsDir() {
+			path, err := filepath.Abs(path)
+			if err != nil {
+				return err
+			}
+			err = w.watch.Add(path)
+			if err != nil {
+				return err
+			}
+			fmt.Println("监控:", path)
+		}
+		return nil
+	})
+	go func() {
+		for {
+			select {
+			case ev := <-w.watch.Events:
+				{
+					if strings.HasSuffix(ev.Name, ".json") && ev.Op&fsnotify.Write == fsnotify.Write {
+						go f()
+					}
+				}
+			case err := <-w.watch.Errors:
+				{
+					fmt.Println("error:", err)
+					return
+				}
+			}
+		}
+	}()
+}
+
+//
+func FSNotifyFUNC(name, dir string, flag bool, f func()) {
+	if flag {
+		f()
+	}
+	watch, err := fsnotify.NewWatcher()
+	if err != nil {
+		log.Println("watch new err", err)
+		return
+	}
+	defer watch.Close()
+	log.Println("dir:", dir)
+	err = watch.Add("../tmp")
+	if err != nil {
+		log.Println("watch add err", err)
+		return
+	}
+	go func() {
+		for {
+			select {
+			case ev := <-watch.Events:
+				{
+					if ev.Op&fsnotify.Write == fsnotify.Write {
+						now := time.Now().Unix()
+						//5秒之内 同一个文件不作操作
+						if applyAction[fmt.Sprintf(name, ev.Name)] != 0 && now-applyAction[fmt.Sprintf(name, ev.Name)] < applyTime {
+							continue
+						}
+						log.Println("修改文件 : ", fmt.Sprintf(name, ev.Name))
+						applyAction[fmt.Sprintf(name, ev.Name)] = now
+						time.Sleep(1 * time.Second)
+						//更新应用版本号
+						f()
+					}
+				}
+			case err := <-watch.Errors:
+				{
+					log.Println("watch error : ", err)
+					return
+				}
+			}
+		}
+	}()
+}

+ 1 - 1
fsw/fsw_test.go

@@ -9,7 +9,7 @@ func TestMatch(t *testing.T) {
 	ReadFswDict("./baidu_fsw.dict")
 	ret := Match("这是什么啊,怎么会有胡锦涛的温总理名字")
 	log.Println(ret)
-	ret2 := Repl("这是什么啊,怎么会有胡锦涛的名字,还有江泽民,我去,什么玩意")
+	ret2 := Repl("这是什么啊,怎么台独会有胡锦涛的名字,还有江泽民,我去,什么玩意")
 	log.Println(ret2)
 }
 

+ 1 - 0
go.mod

@@ -7,6 +7,7 @@ require (
 	github.com/coscms/tagfast v0.0.0-20150925144250-2b69b2496250
 	github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f
 	github.com/donnie4w/go-logger v0.0.0-20170827050443-4740c51383f4
+	github.com/fsnotify/fsnotify v1.4.9
 	github.com/garyburd/redigo v1.6.2
 	github.com/go-sql-driver/mysql v1.6.0
 	github.com/golang-jwt/jwt/v4 v4.4.2

+ 1 - 0
go.sum

@@ -122,6 +122,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/garyburd/redigo v1.6.2 h1:yE/pwKCrbLpLpQICzYTeZ7JsTA/C53wFTJHaEtRqniM=
 github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=

+ 114 - 45
mongodb/mongodb.go

@@ -28,6 +28,18 @@ func NewMgo(addr, db string, size int) *MongodbSim {
 	return mgo
 }
 
+func NewMgoWithUser(addr, db, uname, upwd string, size int) *MongodbSim {
+	mgo := &MongodbSim{
+		MongodbAddr: addr,
+		Size:        size,
+		DbName:      db,
+		UserName:    uname,
+		Password:    upwd,
+	}
+	mgo.InitPool()
+	return mgo
+}
+
 type Bluk struct {
 	ms     *MgoSess
 	writes []mongo.WriteModel
@@ -135,7 +147,6 @@ type MgoSess struct {
 	fields interface{}
 	limit  int64
 	skip   int64
-	pipe   []map[string]interface{}
 	all    interface{}
 	M      *MongodbSim
 }
@@ -178,9 +189,12 @@ func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
 	ms.sorts = sorts
 	return ms
 }
-func (ms *MgoSess) Pipe(p []map[string]interface{}) *MgoSess {
-	ms.pipe = p
-	return ms
+func (ms *MgoSess) Pipe(p []map[string]interface{}) *pipe {
+	pe := &pipe{
+		ms:       ms,
+		pipeline: p,
+	}
+	return pe
 }
 func (ms *MgoSess) Insert(doc interface{}) error {
 	_, err := ms.M.C.Database(ms.db).Collection(ms.coll).InsertOne(ms.M.Ctx, doc)
@@ -235,53 +249,58 @@ func (ms *MgoSess) One(v *map[string]interface{}) {
 	}
 }
 func (ms *MgoSess) All(v *[]map[string]interface{}) {
-	cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Aggregate(ms.M.Ctx, ms.pipe)
+	of := options.Find()
+	if ms.fields != nil {
+		of.SetProjection(ms.fields)
+	}
+	if len(ms.sorts) > 0 {
+		of.SetSort(ms.toSort())
+	}
+	if ms.skip > 0 {
+		of.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		of.SetLimit(ms.limit)
+	}
+	cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Find(ms.M.Ctx, ms.query, of)
 	if err == nil && cur.Err() == nil {
 		cur.All(ms.M.Ctx, v)
 	}
 }
+func (ms *MgoSess) toSort() bson.D {
+	sort := bson.D{}
+	for _, k := range ms.sorts {
+		switch k[:1] {
+		case "-":
+			sort = append(sort, bson.E{k[1:], -1})
+		case "+":
+			sort = append(sort, bson.E{k[1:], 1})
+		default:
+			sort = append(sort, bson.E{k, 1})
+		}
+	}
+	return sort
+}
 func (ms *MgoSess) Iter() *MgoIter {
 	it := &MgoIter{}
 	coll := ms.M.C.Database(ms.db).Collection(ms.coll)
-	var cur *mongo.Cursor
-	var err error
-	if ms.query != nil {
-		find := options.Find()
-		if ms.skip > 0 {
-			find.SetSkip(ms.skip)
-		}
-		if ms.limit > 0 {
-			find.SetLimit(ms.limit)
-		}
-		find.SetBatchSize(100)
-		if len(ms.sorts) > 0 {
-			sort := bson.D{}
-			for _, k := range ms.sorts {
-				switch k[:1] {
-				case "-":
-					sort = append(sort, bson.E{k[1:], -1})
-				case "+":
-					sort = append(sort, bson.E{k[1:], 1})
-				default:
-					sort = append(sort, bson.E{k, 1})
-				}
-			}
-			find.SetSort(sort)
-		}
-		if ms.fields != nil {
-			find.SetProjection(ms.fields)
-		}
-		cur, err = coll.Find(ms.M.Ctx, ms.query, find)
-		if err != nil {
-			log.Println("mgo find err", err.Error())
-		}
-	} else if ms.pipe != nil {
-		aggregate := options.Aggregate()
-		aggregate.SetBatchSize(100)
-		cur, err = coll.Aggregate(ms.M.Ctx, ms.pipe, aggregate)
-		if err != nil {
-			log.Println("mgo aggregate err", err.Error())
-		}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.sorts) > 0 {
+		find.SetSort(ms.toSort())
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := coll.Find(ms.M.Ctx, ms.query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
 	}
 	if err == nil {
 		it.Cursor = cur
@@ -290,6 +309,33 @@ func (ms *MgoSess) Iter() *MgoIter {
 	return it
 }
 
+type pipe struct {
+	ms       *MgoSess
+	pipeline []map[string]interface{}
+}
+
+func (p *pipe) All(v *[]map[string]interface{}) {
+	cur, err := p.ms.M.C.Database(p.ms.db).Collection(p.ms.coll).Aggregate(p.ms.M.Ctx, p.pipeline)
+	if err == nil && cur.Err() == nil {
+		cur.All(p.ms.M.Ctx, v)
+	}
+}
+func (p *pipe) Iter() *MgoIter {
+	it := &MgoIter{}
+	coll := p.ms.M.C.Database(p.ms.db).Collection(p.ms.coll)
+	aggregate := options.Aggregate()
+	aggregate.SetBatchSize(100)
+	cur, err := coll.Aggregate(p.ms.M.Ctx, p.pipeline, aggregate)
+	if err != nil {
+		log.Println("mgo aggregate err", err.Error())
+	}
+	if err == nil {
+		it.Cursor = cur
+		it.Ctx = p.ms.M.Ctx
+	}
+	return it
+}
+
 type MongodbSim struct {
 	MongodbAddr string
 	Size        int
@@ -365,6 +411,21 @@ func (m *MongodbSim) Close() {
 	<-m.pool
 }
 
+//新建表并生成索引
+func (m *MongodbSim) CreateIndex(c string, models []mongo.IndexModel) bool {
+	defer catch()
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	names, err := coll.Indexes().CreateMany(m.Ctx, models)
+	if err == nil && len(names) > 0 {
+		return true
+	} else {
+		log.Println("CreateIndex Error:", err)
+		return false
+	}
+}
+
 func (m *MongodbSim) Save(c string, doc interface{}) string {
 	defer catch()
 	m.Open()
@@ -781,7 +842,7 @@ func ToObjectIds(ids []string) []primitive.ObjectID {
 
 //自动添加更新时间
 func autoUpdateTime(db, coll string, ue *bson.M) {
-	if db == "qfw" && coll == "user" {
+	if coll == "user" {
 		set := ObjToM((*ue)["$set"])
 		if *set == nil {
 			set = &bson.M{}
@@ -790,3 +851,11 @@ func autoUpdateTime(db, coll string, ue *bson.M) {
 		(*ue)["$set"] = set
 	}
 }
+
+func IsObjectIdHex(hex string) bool {
+	_, err := primitive.ObjectIDFromHex(hex)
+	if err != nil {
+		return false
+	}
+	return true
+}

+ 6 - 4
mysql/mysql.go

@@ -258,7 +258,7 @@ func (m *Mysql) SelectBySql(q string, args ...interface{}) *[]map[string]interfa
 func (m *Mysql) SelectBySqlByTx(tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
 	return m.Select(0, nil, tx, q, args...)
 }
-func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
+func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}) bool, tx *sql.Tx, q string, args ...interface{}) *[]map[string]interface{} {
 	var stmtOut *sql.Stmt
 	var err error
 	if tx == nil {
@@ -306,7 +306,9 @@ func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx
 		}
 		list = append(list, ret)
 		if bath > 0 && len(list) == bath {
-			f(&list)
+			if !f(&list) {
+				break
+			}
 			list = []map[string]interface{}{}
 		}
 	}
@@ -316,10 +318,10 @@ func (m *Mysql) Select(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx
 	}
 	return &list
 }
-func (m *Mysql) SelectByBath(bath int, f func(l *[]map[string]interface{}), q string, args ...interface{}) {
+func (m *Mysql) SelectByBath(bath int, f func(l *[]map[string]interface{}) bool, q string, args ...interface{}) {
 	m.SelectByBathByTx(bath, f, nil, q, args...)
 }
-func (m *Mysql) SelectByBathByTx(bath int, f func(l *[]map[string]interface{}), tx *sql.Tx, q string, args ...interface{}) {
+func (m *Mysql) SelectByBathByTx(bath int, f func(l *[]map[string]interface{}) bool, tx *sql.Tx, q string, args ...interface{}) {
 	m.Select(bath, f, tx, q, args...)
 }
 func (m *Mysql) FindOne(tableName string, query map[string]interface{}, fields, order string) *map[string]interface{} {

+ 31 - 8
redis/redis.go

@@ -15,7 +15,7 @@ var RedisPool map[string]*redigo.Pool
 
 //初始化redis 1为多个连接池,2为共用一个连接池
 func InitRedis(addrs string) {
-	InitRedisBySize(addrs, 300, 30, 240)
+	InitRedisBySize(addrs, 80, 10, 240)
 }
 
 func InitRedisBySize(addrs string, maxSize, maxIdle, timeout int) {
@@ -369,20 +369,27 @@ func DelByPattern(key string) {
 **/
 //自增计数器
 func Incr(code, key string) int64 {
+	ret, err := IncrByErr(code, key)
+	if nil != err {
+		log.Println("redisutil-INCR-Error", err)
+	}
+	return ret
+}
+
+//自增计数器
+func IncrByErr(code, key string) (int64, error) {
 	defer catch()
 	conn := RedisPool[code].Get()
 	defer conn.Close()
 	ret, err := conn.Do("INCR", key)
 	if nil != err {
-		log.Println("redisutil-INCR-Error", err)
+		return 0, err
+	}
+	if res, ok := ret.(int64); ok {
+		return res, nil
 	} else {
-		if res, ok := ret.(int64); ok {
-			return res
-		} else {
-			return 0
-		}
+		return 0, nil
 	}
-	return 0
 }
 
 //自减
@@ -656,3 +663,19 @@ func catch() {
 		}
 	}
 }
+
+//获取到期时间 -1未设置时间永久 -2到期
+func GetTTL(code, key string) int64 {
+	defer catch()
+	conn := RedisPool[code].Get()
+	defer conn.Close()
+	ret, err := conn.Do("TTL", key)
+	if nil != err {
+		log.Println("redisutil-GetError", err)
+		return 0
+	}
+	if res, ok := ret.(int64); ok {
+		return res
+	}
+	return 0
+}

+ 1 - 2
sms/sms_test.go

@@ -1,12 +1,11 @@
 package sms
 
 import (
-	"qfw/util/sms"
 	"testing"
 	"time"
 )
 
 func TestSendSms(t *testing.T) {
-	sms.SendSms("15639297172", "1", "#company#=企业服务网&#code#=D323")
+	SendSms("192.168.3.11:932", "02", "15037870765", "用户")
 	time.Sleep(2 * time.Minute)
 }