|
@@ -17,23 +17,6 @@ import (
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
)
|
|
|
|
|
|
-type MgoSess struct {
|
|
|
- db string
|
|
|
- coll string
|
|
|
- query interface{}
|
|
|
- sorts []string
|
|
|
- fields interface{}
|
|
|
- limit int64
|
|
|
- skip int64
|
|
|
- pipe []map[string]interface{}
|
|
|
- all interface{}
|
|
|
- M *MongodbSim
|
|
|
-}
|
|
|
-
|
|
|
-type MgoIter struct {
|
|
|
- Cursor *mongo.Cursor
|
|
|
-}
|
|
|
-
|
|
|
func NewMgo(addr, db string, size int) *MongodbSim {
|
|
|
mgo := &MongodbSim{
|
|
|
MongodbAddr: addr,
|
|
@@ -44,6 +27,56 @@ func NewMgo(addr, db string, size int) *MongodbSim {
|
|
|
return mgo
|
|
|
}
|
|
|
|
|
|
+type Bluk struct {
|
|
|
+ ms *MgoSess
|
|
|
+ writes []mongo.WriteModel
|
|
|
+}
|
|
|
+
|
|
|
+func (b *Bluk) Insert(doc interface{}) {
|
|
|
+ write := mongo.NewInsertOneModel()
|
|
|
+ write.SetDocument(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Update(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateOneModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(false)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) UpdateAll(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateManyModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(false)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Upsert(doc ...interface{}) {
|
|
|
+ write := mongo.NewUpdateOneModel()
|
|
|
+ write.SetFilter(doc[0])
|
|
|
+ write.SetUpdate(doc[1])
|
|
|
+ write.SetUpsert(true)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Remove(doc interface{}) {
|
|
|
+ write := mongo.NewDeleteOneModel()
|
|
|
+ write.SetFilter(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) RemoveAll(doc interface{}) {
|
|
|
+ write := mongo.NewDeleteManyModel()
|
|
|
+ write.SetFilter(doc)
|
|
|
+ b.writes = append(b.writes, write)
|
|
|
+}
|
|
|
+func (b *Bluk) Run() (*mongo.BulkWriteResult, error) {
|
|
|
+ return b.ms.M.C.Database(b.ms.db).Collection(b.ms.coll).BulkWrite(b.ms.M.Ctx, b.writes)
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+type MgoIter struct {
|
|
|
+ Cursor *mongo.Cursor
|
|
|
+}
|
|
|
+
|
|
|
func (mt *MgoIter) Next(result interface{}) bool {
|
|
|
if mt.Cursor != nil {
|
|
|
if mt.Cursor.Next(nil) {
|
|
@@ -63,16 +96,31 @@ func (mt *MgoIter) Next(result interface{}) bool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//
|
|
|
+type MgoSess struct {
|
|
|
+ db string
|
|
|
+ coll string
|
|
|
+ query interface{}
|
|
|
+ sorts []string
|
|
|
+ fields interface{}
|
|
|
+ limit int64
|
|
|
+ skip int64
|
|
|
+ pipe []map[string]interface{}
|
|
|
+ all interface{}
|
|
|
+ M *MongodbSim
|
|
|
+}
|
|
|
+
|
|
|
func (ms *MgoSess) DB(name string) *MgoSess {
|
|
|
ms.db = name
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) C(name string) *MgoSess {
|
|
|
ms.coll = name
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
+func (ms *MgoSess) Bulk() *Bluk {
|
|
|
+ return &Bluk{ms: ms}
|
|
|
+}
|
|
|
func (ms *MgoSess) Find(q interface{}) *MgoSess {
|
|
|
if q == nil {
|
|
|
q = map[string]interface{}{}
|
|
@@ -80,12 +128,14 @@ func (ms *MgoSess) Find(q interface{}) *MgoSess {
|
|
|
ms.query = q
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
+func (ms *MgoSess) FindId(_id interface{}) *MgoSess {
|
|
|
+ ms.query = map[string]interface{}{"_id": _id}
|
|
|
+ return ms
|
|
|
+}
|
|
|
func (ms *MgoSess) Select(fields interface{}) *MgoSess {
|
|
|
ms.fields = fields
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) Limit(limit int64) *MgoSess {
|
|
|
ms.limit = limit
|
|
|
return ms
|
|
@@ -94,7 +144,6 @@ func (ms *MgoSess) Skip(skip int64) *MgoSess {
|
|
|
ms.skip = skip
|
|
|
return ms
|
|
|
}
|
|
|
-
|
|
|
func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
|
|
|
ms.sorts = sorts
|
|
|
return ms
|
|
@@ -103,9 +152,41 @@ func (ms *MgoSess) Pipe(p []map[string]interface{}) *MgoSess {
|
|
|
ms.pipe = p
|
|
|
return ms
|
|
|
}
|
|
|
+func (ms *MgoSess) Insert(doc interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).InsertOne(ms.M.Ctx, doc)
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) Remove(filter interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, filter)
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) RemoveId(_id interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, map[string]interface{}{"_id": _id})
|
|
|
+ return err
|
|
|
+}
|
|
|
+func (ms *MgoSess) RemoveAll(filter interface{}) (*mongo.DeleteResult, error) {
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).DeleteMany(ms.M.Ctx, filter)
|
|
|
+}
|
|
|
+func (ms *MgoSess) UpsertId(filter, update interface{}) (*mongo.UpdateResult, error) {
|
|
|
+ ct := options.Update()
|
|
|
+ ct.SetUpsert(true)
|
|
|
+ return ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, update, ct)
|
|
|
+}
|
|
|
+func (ms *MgoSess) UpdateId(filter, update interface{}) error {
|
|
|
+ _, err := ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, update)
|
|
|
+ return err
|
|
|
+}
|
|
|
func (ms *MgoSess) Count() (int64, error) {
|
|
|
return ms.M.C.Database(ms.db).Collection(ms.coll).CountDocuments(ms.M.Ctx, ms.query)
|
|
|
}
|
|
|
+func (ms *MgoSess) One(v *map[string]interface{}) {
|
|
|
+ of := options.FindOne()
|
|
|
+ of.SetProjection(ms.fields)
|
|
|
+ sr := ms.M.C.Database(ms.db).Collection(ms.coll).FindOne(ms.M.Ctx, ms.query, of)
|
|
|
+ if sr.Err() == nil {
|
|
|
+ sr.Decode(&v)
|
|
|
+ }
|
|
|
+}
|
|
|
func (ms *MgoSess) All(v *[]map[string]interface{}) {
|
|
|
cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Aggregate(ms.M.Ctx, ms.pipe)
|
|
|
if err == nil && cur.Err() == nil {
|
|
@@ -398,6 +479,11 @@ func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool
|
|
|
return m.NewUpdateBulk(m.DbName, c, true, false, doc...)
|
|
|
}
|
|
|
|
|
|
+//批量插入
|
|
|
+func (m *MongodbSim) UpSertMultiBulk(c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
|
|
|
+ return m.NewUpdateBulk(m.DbName, c, upsert, multi, doc...)
|
|
|
+}
|
|
|
+
|
|
|
//批量插入
|
|
|
func (m *MongodbSim) NewUpdateBulk(db, c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
|
|
|
defer catch()
|