package main import ( "context" "log" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type MgoSess struct { Db string Coll string Query interface{} Sorts []string Hints interface{} fields interface{} limit int64 skip int64 M *MongodbSim } type MgoIter struct { Cursor *mongo.Cursor } func (mt *MgoIter) Next(result interface{}) bool { if mt.Cursor != nil { if mt.Cursor.Next(nil) { err := mt.Cursor.Decode(result) if err != nil { log.Println("mgo cur err", err.Error()) mt.Cursor.Close(nil) return false } return true } else { mt.Cursor.Close(nil) return false } } else { return false } } func (ms *MgoSess) DB(name string) *MgoSess { ms.Db = name return ms } func (ms *MgoSess) C(name string) *MgoSess { ms.Coll = name return ms } func (ms *MgoSess) Find(q interface{}) *MgoSess { ms.Query = q return ms } func (ms *MgoSess) Select(fields interface{}) *MgoSess { ms.fields = fields return ms } func (ms *MgoSess) Limit(limit int64) *MgoSess { ms.limit = limit return ms } func (ms *MgoSess) Skip(skip int64) *MgoSess { ms.skip = skip return ms } func (ms *MgoSess) Sort(sorts ...string) *MgoSess { ms.Sorts = sorts return ms } func (ms *MgoSess) Hint(hint interface{}) *MgoSess { ms.Hints = hint return ms } func (ms *MgoSess) Iter() *MgoIter { it := &MgoIter{} find := options.Find() if ms.skip > 0 { find.SetSkip(ms.skip) } if ms.limit > 0 { find.SetLimit(ms.limit) } find.SetBatchSize(300) if len(ms.Sorts) > 0 { sort := bson.M{} for _, k := range ms.Sorts { switch k[:1] { case "-": sort[k[1:]] = -1 case "+": sort[k[1:]] = 1 default: sort[k] = 1 } } find.SetSort(sort) } if ms.Hints != nil { find.SetHint(ms.Hints) } if ms.fields != nil { find.SetProjection(ms.fields) } cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find) if err != nil { log.Println("mgo find err", err.Error()) } else { it.Cursor = cur } return it } type MongodbSim struct { MongodbAddr string Size int // MinSize int DbName string C *mongo.Client Ctx context.Context ShortCtx context.Context pool chan bool } func (m *MongodbSim) GetMgoConn() *MgoSess { //m.Open() ms := &MgoSess{} ms.M = m return ms } func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) { //m.Close() ms.M = nil ms = nil } func (m *MongodbSim) InitPool() { opts := options.Client() opts.SetConnectTimeout(3 * time.Second) opts.ApplyURI("mongodb://" + m.MongodbAddr) opts.SetMaxPoolSize(uint64(m.Size)) m.pool = make(chan bool, m.Size) opts.SetMaxConnIdleTime(2 * time.Hour) m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour) m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute) client, err := mongo.Connect(m.ShortCtx, opts) if err != nil { log.Println("mgo init error:", err.Error()) } else { m.C = client log.Println("init success") } } func (m *MongodbSim) Open() { m.pool <- true } func (m *MongodbSim) Close() { <-m.pool } //批量插入 func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) var writes []mongo.WriteModel for _, d := range doc { write := mongo.NewUpdateOneModel() write.SetFilter(d[0]) write.SetUpdate(d[1]) write.SetUpsert(true) writes = append(writes, write) } _, e := coll.BulkWrite(m.Ctx, writes) if e != nil { log.Println("mgo upsert error:", e.Error()) return false } return true } //批量插入 func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) var writes []mongo.WriteModel for _, d := range doc { write := mongo.NewInsertOneModel() write.SetDocument(d) writes = append(writes, write) } _, e := coll.BulkWrite(m.Ctx, writes) if e != nil { log.Println("mgo savebulk error:", e.Error()) return false } return true } //保存 func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) r, err := coll.InsertOne(m.Ctx, doc) if err != nil { return nil } return r.InsertedID } //更新by Id func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) _, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc) if err != nil { return false } return true } //删除 func (m *MongodbSim) Delete(c, id string) int64 { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}) if err != nil { return 0 } return r.DeletedCount } func (m *MongodbSim) FindById(c, id string) map[string]interface{} { m.Open() defer m.Close() coll := m.C.Database(m.DbName).Collection(c) r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}) result := map[string]interface{}{} _ = r.Decode(&result) return result }