|
@@ -6,6 +6,7 @@ import (
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"math/big"
|
|
|
+ "reflect"
|
|
|
"runtime"
|
|
|
"strconv"
|
|
|
"strings"
|
|
@@ -17,23 +18,6 @@ import (
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
)
|
|
|
|
|
|
-type MgoSess struct {
|
|
|
- db string
|
|
|
- coll string
|
|
|
- query interface{}
|
|
|
- sorts []string
|
|
|
- fields interface{}
|
|
|
- limit int64
|
|
|
- skip int64
|
|
|
- pipe []map[string]interface{}
|
|
|
- all interface{}
|
|
|
- M *MongodbSim
|
|
|
-}
|
|
|
-
|
|
|
-type MgoIter struct {
|
|
|
- Cursor *mongo.Cursor
|
|
|
-}
|
|
|
-
|
|
|
func NewMgo(addr, db string, size int) *MongodbSim {
|
|
|
mgo := &MongodbSim{
|
|
|
MongodbAddr: addr,
|
|
@@ -44,18 +28,91 @@ func NewMgo(addr, db string, size int) *MongodbSim {
|
|
|
return mgo
|
|
|
}
|
|
|
|
|
|
+type Bluk struct {
|
|
|
+ ms *MgoSess
|
|
|
+ writes []mongo.WriteModel
|
|
|
+}
|
|
|
+
|
|
|
+func (b *Bluk) Insert(doc interface{}) {
|
|
|
+ write := mongo.NewInsertOneModel()
|
|
|
+ write.SetDocument(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Update(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateOneModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(false)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) UpdateAll(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateManyModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(false)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Upsert(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateOneModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(true)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Remove(doc interface{}) {
|
|
|
+ write := mongo.NewDeleteOneModel()
|
|
|
+ write.SetFilter(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) RemoveAll(doc interface{}) {
|
|
|
+ write := mongo.NewDeleteManyModel()
|
|
|
+ write.SetFilter(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Run() (*mongo.BulkWriteResult, error) {
|
|
|
+ return b.ms.M.C.Database(b.ms.db).Collection(b.ms.coll).BulkWrite(b.ms.M.Ctx, b.writes)
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+type MgoIter struct {
|
|
|
+ Cursor *mongo.Cursor
|
|
|
+ Ctx context.Context
|
|
|
+}
|
|
|
+
|
|
|
func (mt *MgoIter) Next(result interface{}) bool {
|
|
|
if mt.Cursor != nil {
|
|
|
- if mt.Cursor.Next(nil) {
|
|
|
- err := mt.Cursor.Decode(result)
|
|
|
+ if mt.Cursor.Next(mt.Ctx) {
|
|
|
+ rType := reflect.TypeOf(result)
|
|
|
+ rVal := reflect.ValueOf(result)
|
|
|
+ if rType.Kind() == reflect.Ptr {
|
|
|
+ rType = rType.Elem()
|
|
|
+ rVal = rVal.Elem()
|
|
|
+ }
|
|
|
+ var err error
|
|
|
+ if rType.Kind() == reflect.Map {
|
|
|
+ r := make(map[string]interface{})
|
|
|
+ err = mt.Cursor.Decode(&r)
|
|
|
+ if rVal.CanSet() {
|
|
|
+ rVal.Set(reflect.ValueOf(r))
|
|
|
+ } else {
|
|
|
+ for it := rVal.MapRange(); it.Next(); {
|
|
|
+ rVal.SetMapIndex(it.Key(), reflect.Value{})
|
|
|
+ }
|
|
|
+ for it := reflect.ValueOf(r).MapRange(); it.Next(); {
|
|
|
+ rVal.SetMapIndex(it.Key(), it.Value())
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ err = mt.Cursor.Decode(&result)
|
|
|
+ }
|
|
|
if err != nil {
|
|
|
log.Println("mgo cur err", err.Error())
|
|
|
- mt.Cursor.Close(nil)
|
|
|
+ mt.Cursor.Close(mt.Ctx)
|
|
|
return false
|
|
|
}
|
|
|
return true
|
|
|
} else {
|
|
|
- mt.Cursor.Close(nil)
|
|
|
+ mt.Cursor.Close(mt.Ctx)
|
|
|
return false
|
|
|
}
|
|
|
} else {
|
|
@@ -63,16 +120,31 @@ func (mt *MgoIter) Next(result interface{}) bool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//
|
|
|
+type MgoSess struct {
|
|
|
+ db string
|
|
|
+ coll string
|
|
|
+ query interface{}
|
|
|
+ sorts []string
|
|
|
+ fields interface{}
|
|
|
+ limit int64
|
|
|
+ skip int64
|
|
|
+ pipe []map[string]interface{}
|
|
|
+ all interface{}
|
|
|
+ M *MongodbSim
|
|
|
+}
|
|
|
+
|
|
|
func (ms *MgoSess) DB(name string) *MgoSess {
|
|
|
ms.db = name
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) C(name string) *MgoSess {
|
|
|
ms.coll = name
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
+func (ms *MgoSess) Bulk() *Bluk {
|
|
|
+ return &Bluk{ms: ms}
|
|
|
+}
|
|
|
func (ms *MgoSess) Find(q interface{}) *MgoSess {
|
|
|
if q == nil {
|
|
|
q = map[string]interface{}{}
|
|
@@ -80,12 +152,14 @@ func (ms *MgoSess) Find(q interface{}) *MgoSess {
|
|
|
ms.query = q
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
+func (ms *MgoSess) FindId(_id interface{}) *MgoSess {
|
|
|
+ ms.query = map[string]interface{}{"_id": _id}
|
|
|
+ return ms
|
|
|
+}
|
|
|
func (ms *MgoSess) Select(fields interface{}) *MgoSess {
|
|
|
ms.fields = fields
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) Limit(limit int64) *MgoSess {
|
|
|
ms.limit = limit
|
|
|
return ms
|
|
@@ -94,7 +168,6 @@ func (ms *MgoSess) Skip(skip int64) *MgoSess {
|
|
|
ms.skip = skip
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
|
|
|
ms.sorts = sorts
|
|
|
return ms
|
|
@@ -103,6 +176,46 @@ func (ms *MgoSess) Pipe(p []map[string]interface{}) *MgoSess {
|
|
|
ms.pipe = p
|
|
|
return ms
|
|
|
}
|
|
|
+func (ms *MgoSess) Insert(doc interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).InsertOne(ms.M.Ctx, doc)
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) Remove(filter interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, filter)
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) RemoveId(_id interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, map[string]interface{}{"_id": _id})
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) RemoveAll(filter interface{}) (*mongo.DeleteResult, error) {
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).DeleteMany(ms.M.Ctx, filter)
|
|
|
+}
|
|
|
+func (ms *MgoSess) Upsert(filter, update interface{}) (*mongo.UpdateResult, error) {
|
|
|
+ ct := options.Update()
|
|
|
+ ct.SetUpsert(true)
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, filter, update, ct)
|
|
|
+}
|
|
|
+func (ms *MgoSess) UpsertId(filter, update interface{}) (*mongo.UpdateResult, error) {
|
|
|
+ ct := options.Update()
|
|
|
+ ct.SetUpsert(true)
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, update, ct)
|
|
|
+}
|
|
|
+func (ms *MgoSess) UpdateId(filter, update interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, update)
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) Count() (int64, error) {
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).CountDocuments(ms.M.Ctx, ms.query)
|
|
|
+}
|
|
|
+func (ms *MgoSess) One(v *map[string]interface{}) {
|
|
|
+ of := options.FindOne()
|
|
|
+ of.SetProjection(ms.fields)
|
|
|
+ sr := ms.M.C.Database(ms.db).Collection(ms.coll).FindOne(ms.M.Ctx, ms.query, of)
|
|
|
+ if sr.Err() == nil {
|
|
|
+ sr.Decode(&v)
|
|
|
+ }
|
|
|
+}
|
|
|
func (ms *MgoSess) All(v *[]map[string]interface{}) {
|
|
|
cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Aggregate(ms.M.Ctx, ms.pipe)
|
|
|
if err == nil && cur.Err() == nil {
|
|
@@ -141,14 +254,11 @@ func (ms *MgoSess) Iter() *MgoIter {
|
|
|
log.Println("mgo find err", err.Error())
|
|
|
} else {
|
|
|
it.Cursor = cur
|
|
|
+ it.Ctx = ms.M.Ctx
|
|
|
}
|
|
|
return it
|
|
|
}
|
|
|
|
|
|
-func (ms *MgoSess) Count() (int64, error) {
|
|
|
- return ms.M.C.Database(ms.db).Collection(ms.coll).CountDocuments(ms.M.Ctx, ms.query)
|
|
|
-}
|
|
|
-
|
|
|
type MongodbSim struct {
|
|
|
MongodbAddr string
|
|
|
Size int
|
|
@@ -184,6 +294,8 @@ func (m *MongodbSim) Destroy() {
|
|
|
|
|
|
func (m *MongodbSim) InitPool() {
|
|
|
opts := options.Client()
|
|
|
+ registry := bson.NewRegistryBuilder().RegisterTypeMapEntry(bson.TypeArray, reflect.TypeOf([]interface{}{})).Build()
|
|
|
+ opts.SetRegistry(registry)
|
|
|
opts.SetConnectTimeout(3 * time.Second)
|
|
|
opts.SetHosts(strings.Split(m.MongodbAddr, ","))
|
|
|
//opts.ApplyURI("mongodb://" + m.MongodbAddr)
|
|
@@ -195,10 +307,10 @@ func (m *MongodbSim) InitPool() {
|
|
|
}
|
|
|
opts.SetAuth(cre)
|
|
|
}
|
|
|
- ms := strings.Split(m.MongodbAddr, ",")
|
|
|
+ /*ms := strings.Split(m.MongodbAddr, ",")
|
|
|
if m.ReplSet == "" && len(ms) > 1 {
|
|
|
m.ReplSet = "qfws"
|
|
|
- }
|
|
|
+ }*/
|
|
|
if m.ReplSet != "" {
|
|
|
opts.SetReplicaSet(m.ReplSet)
|
|
|
opts.SetDirect(false)
|
|
@@ -345,19 +457,6 @@ func (m *MongodbSim) Del(c string, q interface{}) bool {
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
-//删除表
|
|
|
-func (m *MongodbSim) DelColl(c string) bool {
|
|
|
- defer catch()
|
|
|
- m.Open()
|
|
|
- defer m.Close()
|
|
|
- err := m.C.Database(m.DbName).Collection(c).Drop(m.Ctx)
|
|
|
- if err != nil {
|
|
|
- log.Println("删除错误", err.Error())
|
|
|
- return false
|
|
|
- }
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
//按条件更新
|
|
|
func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool) bool {
|
|
|
defer catch()
|
|
@@ -400,7 +499,7 @@ func (m *MongodbSim) UpdateById(c string, id interface{}, set interface{}) bool
|
|
|
|
|
|
//批量更新
|
|
|
func (m *MongodbSim) UpdateBulkAll(db, c string, doc ...[]map[string]interface{}) bool {
|
|
|
- return m.upSertBulk(db, c, false, doc...)
|
|
|
+ return m.NewUpdateBulk(db, c, false, false, doc...)
|
|
|
}
|
|
|
|
|
|
func (m *MongodbSim) UpdateBulk(c string, doc ...[]map[string]interface{}) bool {
|
|
@@ -409,22 +508,35 @@ func (m *MongodbSim) UpdateBulk(c string, doc ...[]map[string]interface{}) bool
|
|
|
|
|
|
//批量插入
|
|
|
func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool {
|
|
|
- return m.upSertBulk(m.DbName, c, true, doc...)
|
|
|
+ return m.NewUpdateBulk(m.DbName, c, true, false, doc...)
|
|
|
+}
|
|
|
+
|
|
|
+//批量插入
|
|
|
+func (m *MongodbSim) UpSertMultiBulk(c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
|
|
|
+ return m.NewUpdateBulk(m.DbName, c, upsert, multi, doc...)
|
|
|
}
|
|
|
|
|
|
//批量插入
|
|
|
-func (m *MongodbSim) upSertBulk(db, c string, upsert bool, doc ...[]map[string]interface{}) bool {
|
|
|
+func (m *MongodbSim) NewUpdateBulk(db, c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
|
|
|
defer catch()
|
|
|
m.Open()
|
|
|
defer m.Close()
|
|
|
coll := m.C.Database(db).Collection(c)
|
|
|
var writes []mongo.WriteModel
|
|
|
for _, d := range doc {
|
|
|
- write := mongo.NewUpdateOneModel()
|
|
|
- write.SetFilter(d[0])
|
|
|
- write.SetUpdate(d[1])
|
|
|
- write.SetUpsert(upsert)
|
|
|
- writes = append(writes, write)
|
|
|
+ if multi {
|
|
|
+ write := mongo.NewUpdateManyModel()
|
|
|
+ write.SetFilter(d[0])
|
|
|
+ write.SetUpdate(d[1])
|
|
|
+ write.SetUpsert(upsert)
|
|
|
+ writes = append(writes, write)
|
|
|
+ } else {
|
|
|
+ write := mongo.NewUpdateOneModel()
|
|
|
+ write.SetFilter(d[0])
|
|
|
+ write.SetUpdate(d[1])
|
|
|
+ write.SetUpsert(upsert)
|
|
|
+ writes = append(writes, write)
|
|
|
+ }
|
|
|
}
|
|
|
br, e := coll.BulkWrite(m.Ctx, writes)
|
|
|
if e != nil {
|
|
@@ -480,9 +592,10 @@ func (m *MongodbSim) Find(c string, query interface{}, order interface{}, fields
|
|
|
defer catch()
|
|
|
m.Open()
|
|
|
defer m.Close()
|
|
|
- res := make([]map[string]interface{}, 1)
|
|
|
+ var res []map[string]interface{}
|
|
|
coll := m.C.Database(m.DbName).Collection(c)
|
|
|
if single {
|
|
|
+ res = make([]map[string]interface{}, 1)
|
|
|
of := options.FindOne()
|
|
|
of.SetProjection(ObjToOth(fields))
|
|
|
of.SetSort(ObjToM(order))
|
|
@@ -490,6 +603,7 @@ func (m *MongodbSim) Find(c string, query interface{}, order interface{}, fields
|
|
|
sr.Decode(&res[0])
|
|
|
}
|
|
|
} else {
|
|
|
+ res = []map[string]interface{}{}
|
|
|
of := options.Find()
|
|
|
of.SetProjection(ObjToOth(fields))
|
|
|
of.SetSort(ObjToM(order))
|
|
@@ -611,3 +725,12 @@ func StringTOBsonId(id string) (bid primitive.ObjectID) {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
+func ToObjectIds(ids []string) []primitive.ObjectID {
|
|
|
+ _ids := []primitive.ObjectID{}
|
|
|
+ for _, v := range ids {
|
|
|
+ _id, _ := primitive.ObjectIDFromHex(v)
|
|
|
+ _ids = append(_ids, _id)
|
|
|
+ }
|
|
|
+ return _ids
|
|
|
+}
|