浏览代码

Merge branch 'master' of ssh://192.168.3.207:10022/BP/mongodb-util

wangchuanjin 4 年之前
父节点
当前提交
be8722f622
共有 1 个文件被更改,包括 458 次插入0 次删除
  1. 458 0
      src/qfw/mongodbutil/mongodbutil.go

+ 458 - 0
src/qfw/mongodbutil/mongodbutil.go

@@ -0,0 +1,458 @@
+package mongodbutil
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	. "qfw/util"
+	"runtime"
+	"strings"
+	"time"
+
+	mgo "gopkg.in/mgo.v2"
+	. "gopkg.in/mgo.v2/bson"
+)
+
+type PoolConfig struct {
+	Alias string
+	Addr  string
+	DB    string
+	Size  int
+	Repl  string
+}
+
+const (
+	TIMEOUT = 300 * time.Second
+)
+
+//连接池
+var pool map[string]chan *mgo.Session
+var Config []PoolConfig
+
+//[mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
+//初始化mongodb连接池大小
+func InitMongodbPool() {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	pool = make(map[string]chan *mgo.Session)
+	for _, v := range Config {
+		pool[v.Alias] = make(chan *mgo.Session, v.Size)
+		ms := strings.Split(v.Addr, ",")
+		if v.Repl == "" && len(ms) > 1 {
+			v.Repl = "qfws"
+		}
+		for i := 0; i < v.Size; i++ {
+			sess, err := createConn(v.Alias)
+			if err == nil && sess.Ping() == nil {
+				pool[v.Alias] <- sess
+			}
+		}
+	}
+}
+
+//从连接池中取得mongodb连接,懒加载模式
+func GetMgoConn(alias string, sec ...int) (session *mgo.Session) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	timeout := TIMEOUT
+	if len(sec) == 1 && sec[0] > 4 {
+		timeout = time.Duration(sec[0]) * time.Second
+	}
+	if v, ok := pool[alias]; ok {
+		select {
+		case session = <-v:
+			if session != nil {
+				session.SetSocketTimeout(timeout)
+				session.SetSyncTimeout(timeout)
+			}
+			err := session.Ping()
+			if err == nil {
+				return session
+			} else {
+				session.Close()
+				session, err = createConn(alias)
+				if err == nil && session.Ping() == nil {
+					session.SetSocketTimeout(timeout)
+					session.SetSyncTimeout(timeout)
+					return session
+				}
+				return nil
+			}
+		case <-time.After(time.Second * 2):
+			//超时
+			return nil
+		}
+	} else {
+		return nil
+	}
+
+}
+
+//取session链接
+func createConn(alias string) (sess *mgo.Session, err error) {
+	for _, v := range Config {
+		if v.Alias == alias {
+			if v.Repl != "" {
+				info := mgo.DialInfo{
+					Addrs:          strings.Split(v.Addr, ","),
+					Timeout:        TIMEOUT,
+					ReplicaSetName: v.Repl,
+					Direct:         false,
+				}
+				return mgo.DialWithInfo(&info)
+			}
+			return mgo.Dial(v.Addr)
+		}
+	}
+	return
+}
+
+//释放连接,在取得连接后要使用defer DestoryMongoConn()进行释放
+func DestoryMongoConn(alias string, session *mgo.Session) {
+	if v, ok := pool[alias]; ok {
+		if session != nil {
+			session.SetSocketTimeout(TIMEOUT)
+			session.SetSyncTimeout(TIMEOUT)
+		}
+		v <- session
+	}
+}
+
+//查询单条对象
+func FindOne(c, alias, db string, query interface{}) *map[string]interface{} {
+	return FindOneByField(c, alias, db, query, nil)
+}
+
+//查询单条对象
+func FindOneByField(c, alias, db string, query interface{}, fields interface{}) *map[string]interface{} {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	res := Find(c, alias, db, query, nil, fields, true, -1, -1)
+	if nil != res && len(*res) > 0 {
+		return &((*res)[0])
+	}
+	return nil
+}
+
+//查询单条对象
+func FindById(c, alias, db, query string, fields interface{}) *map[string]interface{} {
+	sess := GetMgoConn(alias)
+	var res map[string]interface{}
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		res = make(map[string]interface{})
+		coll := sess.DB(db).C(c)
+		var err error
+		err = coll.FindId(ObjectIdHex(query)).Select(ObjToOth(fields)).One(&res)
+		if nil != err {
+			log.Println("FindByIdError", err)
+		}
+	}
+	return &res
+}
+
+//底层查询方法
+func Find(c, alias, db string, query interface{}, order interface{}, fields interface{}, single bool, start int, limit int) *[]map[string]interface{} {
+	sess := GetMgoConn(alias)
+	var res []map[string]interface{}
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		res = make([]map[string]interface{}, 1)
+		coll := sess.DB(db).C(c)
+		var err error
+		if single {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).One(&res[0])
+		} else if start > -1 {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).Sort(ObjToArr(order)...).Skip(start).Limit(limit).All(&res)
+		} else {
+			err = coll.Find(ObjToM(query)).Select(ObjToOth(fields)).Sort(ObjToArr(order)...).All(&res)
+		}
+		if nil != err {
+			log.Println("FindError", err)
+		}
+	}
+	return &res
+}
+
+//修改对象
+func Update(c, alias, db string, query interface{}, set interface{}, upsert bool, multi bool) bool {
+	sess := GetMgoConn(alias)
+	b := true
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				b = false
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		coll := sess.DB(db).C(c)
+		var err error
+		if upsert {
+			_, err = coll.Upsert(ObjToM(query), ObjToM(set))
+		} else {
+			if multi {
+				_, err = coll.UpdateAll(ObjToM(query), ObjToM(set))
+			} else {
+				err = coll.Update(ObjToM(query), ObjToM(set))
+			}
+		}
+		if nil != err {
+			log.Println("updateError", err)
+			b = false
+		}
+	}
+	return b
+}
+
+//删除对象
+func Del(c, alias, db string, query interface{}) bool {
+	sess := GetMgoConn(alias)
+	b := true
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		coll := sess.DB(db).C(c)
+		_, err := coll.RemoveAll(ObjToM(query))
+		if nil != err {
+			log.Println("CountError", err)
+			b = false
+		}
+	}
+	return b
+}
+
+//保存对象
+func Save(c, alias, db string, doc interface{}) string {
+	sess := GetMgoConn(alias)
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		coll := sess.DB(db).C(c)
+		obj := ObjToM(doc)
+		id := NewObjectId()
+		(*obj)["_id"] = id
+		err := coll.Insert(obj)
+		//log.Println(obj)
+		if nil != err {
+			log.Println("SaveError", err)
+			return ""
+		}
+		return (strings.Split(fmt.Sprintf("%s", id), `"`)[1])
+	}
+	return ""
+}
+
+//批量插入
+func SaveBulk(c, alias, db string, doc ...map[string]interface{}) bool {
+	sess := GetMgoConn(alias)
+	b := true
+	if sess != nil {
+		defer func() {
+			b = false
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		coll := sess.DB(db).C(c)
+		bulk := coll.Bulk()
+		for _, v := range doc {
+			bulk.Insert(v)
+		}
+		_, err := bulk.Run()
+		if nil != err {
+			log.Println("BulkError", err)
+			b = false
+		}
+	} else {
+		b = false
+	}
+	return b
+}
+
+//统计
+func Count(c, alias, db string, query interface{}) int {
+	sess := GetMgoConn(alias)
+	var n int = 0
+	if sess != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		defer DestoryMongoConn(alias, sess)
+		coll := sess.DB(db).C(c)
+		var err error
+		n, err = coll.Find(ObjToM(query)).Count()
+		if nil != err {
+			log.Println("CountError", err)
+		}
+	}
+	return n
+}
+
+//对象转数组
+func ObjToArr(obj interface{}) []string {
+	if s, ok := obj.(string); ok {
+		if strings.ContainsAny(s, "{") {
+			//暂时简单支持此种写法
+			var temp = make(M)
+			var str = []string{}
+			json.Unmarshal([]byte(s), &temp)
+			for k, v := range temp {
+				m := IntAll(v)
+				if m > 0 {
+					str = append(str, k)
+				} else {
+					str = append(str, "-"+k)
+				}
+			}
+			return str
+		} else {
+			return strings.Split(s, ",")
+		}
+	} else if s1, ok1 := obj.([]string); ok1 {
+		return s1
+	} else {
+		return []string{}
+	}
+}
+
+func ObjToOth(query interface{}) *M {
+	return ObjToMQ(query, false)
+}
+func ObjToM(query interface{}) *M {
+	return ObjToMQ(query, true)
+}
+
+//obj(string,M)转M,查询用到
+func ObjToMQ(query interface{}, isQuery bool) *M {
+	data := make(M)
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	if s2, ok2 := query.(*map[string]interface{}); ok2 {
+		data = M(*s2)
+	} else if s3, ok3 := query.(*M); ok3 {
+		return s3
+	} else if s, ok := query.(string); ok {
+		json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
+		if ss, oks := data["_id"]; oks && isQuery {
+			data["_id"] = ObjectIdHex(ss.(string))
+		}
+	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
+		data = s1
+	} else if s4, ok4 := query.(M); ok4 {
+		data = s4
+	} else {
+		data = nil
+	}
+	return &data
+}