mongodb.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. package mongodb
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.mongodb.org/mongo-driver/mongo"
  9. "go.mongodb.org/mongo-driver/mongo/options"
  10. "log"
  11. "math/big"
  12. "runtime"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. type MgoSess struct {
  18. db string
  19. coll string
  20. query interface{}
  21. sorts []string
  22. fields interface{}
  23. limit int64
  24. skip int64
  25. pipe []map[string]interface{}
  26. all interface{}
  27. M *MongodbSim
  28. }
  29. type MgoIter struct {
  30. Cursor *mongo.Cursor
  31. }
  32. func NewMgo(addr, db string, size int) *MongodbSim {
  33. mgo := &MongodbSim{
  34. MongodbAddr: addr,
  35. Size: size,
  36. DbName: db,
  37. }
  38. mgo.InitPool()
  39. return mgo
  40. }
  41. func (mt *MgoIter) Next(result interface{}) bool {
  42. if mt.Cursor != nil {
  43. if mt.Cursor.Next(nil) {
  44. err := mt.Cursor.Decode(result)
  45. if err != nil {
  46. log.Println("mgo cur err", err.Error())
  47. mt.Cursor.Close(nil)
  48. return false
  49. }
  50. return true
  51. } else {
  52. mt.Cursor.Close(nil)
  53. return false
  54. }
  55. } else {
  56. return false
  57. }
  58. }
  59. func (ms *MgoSess) DB(name string) *MgoSess {
  60. ms.db = name
  61. return ms
  62. }
  63. func (ms *MgoSess) C(name string) *MgoSess {
  64. ms.coll = name
  65. return ms
  66. }
  67. func (ms *MgoSess) Find(q interface{}) *MgoSess {
  68. if q == nil {
  69. q = map[string]interface{}{}
  70. }
  71. ms.query = q
  72. return ms
  73. }
  74. func (ms *MgoSess) Select(fields interface{}) *MgoSess {
  75. ms.fields = fields
  76. return ms
  77. }
  78. func (ms *MgoSess) Limit(limit int64) *MgoSess {
  79. ms.limit = limit
  80. return ms
  81. }
  82. func (ms *MgoSess) Skip(skip int64) *MgoSess {
  83. ms.skip = skip
  84. return ms
  85. }
  86. func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
  87. ms.sorts = sorts
  88. return ms
  89. }
  90. func (ms *MgoSess) Pipe(p []map[string]interface{}) *MgoSess {
  91. ms.pipe = p
  92. return ms
  93. }
  94. func (ms *MgoSess) All(v *[]map[string]interface{}) {
  95. cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Aggregate(ms.M.Ctx, ms.pipe)
  96. if err == nil && cur.Err() == nil {
  97. cur.All(ms.M.Ctx, v)
  98. }
  99. }
  100. func (ms *MgoSess) Iter() *MgoIter {
  101. it := &MgoIter{}
  102. find := options.Find()
  103. if ms.skip > 0 {
  104. find.SetSkip(ms.skip)
  105. }
  106. if ms.limit > 0 {
  107. find.SetLimit(ms.limit)
  108. }
  109. find.SetBatchSize(100)
  110. if len(ms.sorts) > 0 {
  111. sort := bson.M{}
  112. for _, k := range ms.sorts {
  113. switch k[:1] {
  114. case "-":
  115. sort[k[1:]] = -1
  116. case "+":
  117. sort[k[1:]] = 1
  118. default:
  119. sort[k] = 1
  120. }
  121. }
  122. find.SetSort(sort)
  123. }
  124. if ms.fields != nil {
  125. find.SetProjection(ms.fields)
  126. }
  127. cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Find(ms.M.Ctx, ms.query, find)
  128. if err != nil {
  129. log.Println("mgo find err", err.Error())
  130. } else {
  131. it.Cursor = cur
  132. }
  133. return it
  134. }
  135. func (ms *MgoSess) Count() (int64, error) {
  136. return ms.M.C.Database(ms.db).Collection(ms.coll).CountDocuments(ms.M.Ctx, ms.query)
  137. }
  138. type MongodbSim struct {
  139. MongodbAddr string
  140. Size int
  141. // MinSize int
  142. DbName string
  143. C *mongo.Client
  144. Ctx context.Context
  145. ShortCtx context.Context
  146. pool chan bool
  147. UserName string
  148. Password string
  149. ReplSet string
  150. }
  151. func (m *MongodbSim) GetMgoConn() *MgoSess {
  152. //m.Open()
  153. ms := &MgoSess{}
  154. ms.M = m
  155. return ms
  156. }
  157. func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
  158. //m.Close()
  159. ms.M = nil
  160. ms = nil
  161. }
  162. func (m *MongodbSim) Destroy() {
  163. //m.Close()
  164. m.C.Disconnect(nil)
  165. m.C = nil
  166. }
  167. func (m *MongodbSim) InitPool() {
  168. opts := options.Client()
  169. opts.SetConnectTimeout(3 * time.Second)
  170. opts.SetHosts(strings.Split(m.MongodbAddr, ","))
  171. //opts.ApplyURI("mongodb://" + m.MongodbAddr)
  172. opts.SetMaxPoolSize(uint64(m.Size))
  173. if m.UserName != "" && m.Password != "" {
  174. cre := options.Credential{
  175. Username: m.UserName,
  176. Password: m.Password,
  177. }
  178. opts.SetAuth(cre)
  179. }
  180. //ms := strings.Split(m.MongodbAddr, ",")
  181. //if m.ReplSet == "" && len(ms) > 1 {
  182. // m.ReplSet = "qfws"
  183. //}
  184. if m.ReplSet != "" {
  185. opts.SetReplicaSet(m.ReplSet)
  186. opts.SetDirect(false)
  187. }
  188. m.pool = make(chan bool, m.Size)
  189. opts.SetMaxConnIdleTime(2 * time.Hour)
  190. m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
  191. m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
  192. client, err := mongo.Connect(m.ShortCtx, opts)
  193. if err != nil {
  194. log.Println("mgo init error:", err.Error())
  195. } else {
  196. m.C = client
  197. }
  198. }
  199. func (m *MongodbSim) Open() {
  200. m.pool <- true
  201. }
  202. func (m *MongodbSim) Close() {
  203. <-m.pool
  204. }
  205. func (m *MongodbSim) Save(c string, doc interface{}) string {
  206. defer catch()
  207. m.Open()
  208. defer m.Close()
  209. coll := m.C.Database(m.DbName).Collection(c)
  210. obj := ObjToM(doc)
  211. id := primitive.NewObjectID()
  212. (*obj)["_id"] = id
  213. _, err := coll.InsertOne(m.Ctx, obj)
  214. if nil != err {
  215. log.Println("SaveError", err)
  216. return ""
  217. }
  218. return id.Hex()
  219. }
  220. //原_id不变
  221. func (m *MongodbSim) SaveByOriID(c string, doc interface{}) bool {
  222. defer catch()
  223. m.Open()
  224. defer m.Close()
  225. coll := m.C.Database(m.DbName).Collection(c)
  226. _, err := coll.InsertOne(m.Ctx, ObjToM(doc))
  227. if nil != err {
  228. log.Println("SaveByOriIDError", err)
  229. return false
  230. }
  231. return true
  232. }
  233. //批量插入
  234. func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
  235. defer catch()
  236. m.Open()
  237. defer m.Close()
  238. coll := m.C.Database(m.DbName).Collection(c)
  239. var writes []mongo.WriteModel
  240. for _, d := range doc {
  241. write := mongo.NewInsertOneModel()
  242. write.SetDocument(d)
  243. writes = append(writes, write)
  244. }
  245. br, e := coll.BulkWrite(m.Ctx, writes)
  246. if e != nil {
  247. b := strings.Index(e.Error(), "duplicate") > -1
  248. log.Println("mgo savebulk error:", e.Error())
  249. if br != nil {
  250. log.Println("mgo savebulk size:", br.InsertedCount)
  251. }
  252. return b
  253. }
  254. return true
  255. }
  256. //批量插入
  257. func (m *MongodbSim) SaveBulkInterface(c string, doc ...interface{}) bool {
  258. defer catch()
  259. m.Open()
  260. defer m.Close()
  261. coll := m.C.Database(m.DbName).Collection(c)
  262. var writes []mongo.WriteModel
  263. for _, d := range doc {
  264. write := mongo.NewInsertOneModel()
  265. write.SetDocument(d)
  266. writes = append(writes, write)
  267. }
  268. br, e := coll.BulkWrite(m.Ctx, writes)
  269. if e != nil {
  270. b := strings.Index(e.Error(), "duplicate") > -1
  271. log.Println("mgo SaveBulkInterface error:", e.Error())
  272. if br != nil {
  273. log.Println("mgo SaveBulkInterface size:", br.InsertedCount)
  274. }
  275. return b
  276. }
  277. return true
  278. }
  279. //按条件统计
  280. func (m *MongodbSim) Count(c string, q interface{}) int {
  281. r, _ := m.CountByErr(c, q)
  282. return r
  283. }
  284. //统计
  285. func (m *MongodbSim) CountByErr(c string, q interface{}) (int, error) {
  286. defer catch()
  287. m.Open()
  288. defer m.Close()
  289. res, err := m.C.Database(m.DbName).Collection(c).CountDocuments(m.Ctx, ObjToM(q))
  290. if err != nil {
  291. log.Println("统计错误", err.Error())
  292. return 0, err
  293. } else {
  294. return int(res), nil
  295. }
  296. }
  297. //按条件删除
  298. func (m *MongodbSim) Delete(c string, q interface{}) int64 {
  299. defer catch()
  300. m.Open()
  301. defer m.Close()
  302. res, err := m.C.Database(m.DbName).Collection(c).DeleteMany(m.Ctx, ObjToM(q))
  303. if err != nil && res == nil {
  304. log.Println("删除错误", err.Error())
  305. }
  306. return res.DeletedCount
  307. }
  308. //删除对象
  309. func (m *MongodbSim) Del(c string, q interface{}) bool {
  310. defer catch()
  311. m.Open()
  312. defer m.Close()
  313. _, err := m.C.Database(m.DbName).Collection(c).DeleteMany(m.Ctx, ObjToM(q))
  314. if err != nil {
  315. log.Println("删除错误", err.Error())
  316. return false
  317. }
  318. return true
  319. }
  320. //删除表
  321. func (m *MongodbSim) DelColl(c string) bool {
  322. defer catch()
  323. m.Open()
  324. defer m.Close()
  325. err := m.C.Database(m.DbName).Collection(c).Drop(m.Ctx)
  326. if err != nil {
  327. log.Println("删除错误", err.Error())
  328. return false
  329. }
  330. return true
  331. }
  332. //按条件更新
  333. func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool) bool {
  334. defer catch()
  335. m.Open()
  336. defer m.Close()
  337. ct := options.Update()
  338. if upsert {
  339. ct.SetUpsert(true)
  340. }
  341. coll := m.C.Database(m.DbName).Collection(c)
  342. var err error
  343. if multi {
  344. _, err = coll.UpdateMany(m.Ctx, ObjToM(q), ObjToM(u), ct)
  345. } else {
  346. _, err = coll.UpdateOne(m.Ctx, ObjToM(q), ObjToM(u), ct)
  347. }
  348. if err != nil {
  349. log.Println("删除错误", err.Error())
  350. return false
  351. }
  352. return true
  353. }
  354. func (m *MongodbSim) UpdateById(c string, id interface{}, set interface{}) bool {
  355. defer catch()
  356. m.Open()
  357. defer m.Close()
  358. q := make(map[string]interface{})
  359. if sid, ok := id.(string); ok {
  360. q["_id"], _ = primitive.ObjectIDFromHex(sid)
  361. } else {
  362. q["_id"] = id
  363. }
  364. _, err := m.C.Database(m.DbName).Collection(c).UpdateOne(m.Ctx, q, ObjToM(set))
  365. if nil != err {
  366. log.Println("UpdateByIdError", err)
  367. return false
  368. }
  369. return true
  370. }
  371. //批量更新
  372. func (m *MongodbSim) UpdateBulkAll(db, c string, doc ...[]map[string]interface{}) bool {
  373. return m.upSertBulk(db, c, false, doc...)
  374. }
  375. func (m *MongodbSim) UpdateBulk(c string, doc ...[]map[string]interface{}) bool {
  376. return m.UpdateBulkAll(m.DbName, c, doc...)
  377. }
  378. //批量插入
  379. func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool {
  380. return m.upSertBulk(m.DbName, c, true, doc...)
  381. }
  382. //批量插入
  383. func (m *MongodbSim) upSertBulk(db, c string, upsert bool, doc ...[]map[string]interface{}) bool {
  384. defer catch()
  385. m.Open()
  386. defer m.Close()
  387. coll := m.C.Database(db).Collection(c)
  388. var writes []mongo.WriteModel
  389. for _, d := range doc {
  390. write := mongo.NewUpdateOneModel()
  391. write.SetFilter(d[0])
  392. write.SetUpdate(d[1])
  393. write.SetUpsert(upsert)
  394. writes = append(writes, write)
  395. }
  396. br, e := coll.BulkWrite(m.Ctx, writes)
  397. if e != nil {
  398. log.Println("mgo upsert error:", e.Error())
  399. return br == nil || br.UpsertedCount == 0
  400. }
  401. // else {
  402. // if r.UpsertedCount != int64(len(doc)) {
  403. // log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
  404. // }
  405. // return true
  406. // }
  407. return true
  408. }
  409. //查询单条对象
  410. func (m *MongodbSim) FindOne(c string, query interface{}) (*map[string]interface{}, bool) {
  411. return m.FindOneByField(c, query, nil)
  412. }
  413. //查询单条对象
  414. func (m *MongodbSim) FindOneByField(c string, query interface{}, fields interface{}) (*map[string]interface{}, bool) {
  415. defer catch()
  416. res, ok := m.Find(c, query, nil, fields, true, -1, -1)
  417. if nil != res && len(*res) > 0 {
  418. return &((*res)[0]), ok
  419. }
  420. return nil, ok
  421. }
  422. //查询单条对象
  423. func (m *MongodbSim) FindById(c string, query string, fields interface{}) (*map[string]interface{}, bool) {
  424. defer catch()
  425. m.Open()
  426. defer m.Close()
  427. of := options.FindOne()
  428. of.SetProjection(ObjToOth(fields))
  429. b := false
  430. res := make(map[string]interface{})
  431. _id, err := primitive.ObjectIDFromHex(query)
  432. if err != nil {
  433. log.Println("_id error", err)
  434. return &res, b
  435. }
  436. sr := m.C.Database(m.DbName).Collection(c).FindOne(m.Ctx, map[string]interface{}{"_id": _id}, of)
  437. if sr.Err() == nil {
  438. b = true
  439. sr.Decode(&res)
  440. }
  441. return &res, b
  442. }
  443. //底层查询方法
  444. func (m *MongodbSim) Find(c string, query interface{}, order interface{}, fields interface{}, single bool, start int, limit int) (*[]map[string]interface{}, bool) {
  445. defer catch()
  446. m.Open()
  447. defer m.Close()
  448. res := make([]map[string]interface{}, 1)
  449. coll := m.C.Database(m.DbName).Collection(c)
  450. if single {
  451. of := options.FindOne()
  452. of.SetProjection(ObjToOth(fields))
  453. of.SetSort(ObjToM(order))
  454. if sr := coll.FindOne(m.Ctx, ObjToM(query), of); sr.Err() == nil {
  455. sr.Decode(&res[0])
  456. }
  457. } else {
  458. of := options.Find()
  459. of.SetProjection(ObjToOth(fields))
  460. of.SetSort(ObjToM(order))
  461. if start > -1 {
  462. of.SetSkip(int64(start))
  463. of.SetLimit(int64(limit))
  464. }
  465. cur, err := coll.Find(m.Ctx, ObjToM(query), of)
  466. if err == nil && cur.Err() == nil {
  467. cur.All(m.Ctx, &res)
  468. }
  469. }
  470. return &res, true
  471. }
  472. func ObjToOth(query interface{}) *bson.M {
  473. return ObjToMQ(query, false)
  474. }
  475. func ObjToM(query interface{}) *bson.M {
  476. return ObjToMQ(query, true)
  477. }
  478. //obj(string,M)转M,查询用到
  479. func ObjToMQ(query interface{}, isQuery bool) *bson.M {
  480. data := make(bson.M)
  481. defer catch()
  482. if s2, ok2 := query.(*map[string]interface{}); ok2 {
  483. data = bson.M(*s2)
  484. } else if s3, ok3 := query.(*bson.M); ok3 {
  485. return s3
  486. } else if s3, ok3 := query.(*primitive.M); ok3 {
  487. return s3
  488. } else if s, ok := query.(string); ok {
  489. json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
  490. if ss, oks := data["_id"]; oks && isQuery {
  491. switch ss.(type) {
  492. case string:
  493. data["_id"], _ = primitive.ObjectIDFromHex(ss.(string))
  494. case map[string]interface{}:
  495. tmp := ss.(map[string]interface{})
  496. for k, v := range tmp {
  497. tmp[k], _ = primitive.ObjectIDFromHex(v.(string))
  498. }
  499. data["_id"] = tmp
  500. }
  501. }
  502. } else if s1, ok1 := query.(map[string]interface{}); ok1 {
  503. data = s1
  504. } else if s4, ok4 := query.(bson.M); ok4 {
  505. data = s4
  506. } else if s4, ok4 := query.(primitive.M); ok4 {
  507. data = s4
  508. } else {
  509. data = nil
  510. }
  511. return &data
  512. }
  513. func intAllDef(num interface{}, defaultNum int) int {
  514. if i, ok := num.(int); ok {
  515. return int(i)
  516. } else if i0, ok0 := num.(int32); ok0 {
  517. return int(i0)
  518. } else if i1, ok1 := num.(float64); ok1 {
  519. return int(i1)
  520. } else if i2, ok2 := num.(int64); ok2 {
  521. return int(i2)
  522. } else if i3, ok3 := num.(float32); ok3 {
  523. return int(i3)
  524. } else if i4, ok4 := num.(string); ok4 {
  525. in, _ := strconv.Atoi(i4)
  526. return int(in)
  527. } else if i5, ok5 := num.(int16); ok5 {
  528. return int(i5)
  529. } else if i6, ok6 := num.(int8); ok6 {
  530. return int(i6)
  531. } else if i7, ok7 := num.(*big.Int); ok7 {
  532. in, _ := strconv.Atoi(fmt.Sprint(i7))
  533. return int(in)
  534. } else if i8, ok8 := num.(*big.Float); ok8 {
  535. in, _ := strconv.Atoi(fmt.Sprint(i8))
  536. return int(in)
  537. } else {
  538. return defaultNum
  539. }
  540. }
  541. //出错拦截
  542. func catch() {
  543. if r := recover(); r != nil {
  544. log.Println(r)
  545. for skip := 0; ; skip++ {
  546. _, file, line, ok := runtime.Caller(skip)
  547. if !ok {
  548. break
  549. }
  550. go log.Printf("%v,%v\n", file, line)
  551. }
  552. }
  553. }
  554. //根据bsonID转string
  555. func BsonIdToSId(uid interface{}) string {
  556. if uid == nil {
  557. return ""
  558. } else if u, ok := uid.(string); ok {
  559. return u
  560. } else if u, ok := uid.(primitive.ObjectID); ok {
  561. return u.Hex()
  562. } else {
  563. return ""
  564. }
  565. }
  566. func StringTOBsonId(id string) (bid primitive.ObjectID) {
  567. defer catch()
  568. if id != "" {
  569. bid, _ = primitive.ObjectIDFromHex(id)
  570. }
  571. return
  572. }