mongodb.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. package mongodb
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/big"
  8. "reflect"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "go.mongodb.org/mongo-driver/bson"
  14. "go.mongodb.org/mongo-driver/bson/primitive"
  15. "go.mongodb.org/mongo-driver/mongo"
  16. "go.mongodb.org/mongo-driver/mongo/options"
  17. )
  18. func NewMgo(addr, db string, size int) *MongodbSim {
  19. mgo := &MongodbSim{
  20. MongodbAddr: addr,
  21. Size: size,
  22. DbName: db,
  23. }
  24. mgo.InitPool()
  25. return mgo
  26. }
  27. func NewMgoQyfw(addr, db string, size int, replSet string) *MongodbSim {
  28. mgo := &MongodbSim{
  29. MongodbAddr: addr,
  30. Size: size,
  31. DbName: db,
  32. }
  33. mgo.ReplSet = replSet
  34. mgo.InitPool()
  35. return mgo
  36. }
  37. type Bluk struct {
  38. ms *MgoSess
  39. writes []mongo.WriteModel
  40. }
  41. func (b *Bluk) Insert(doc interface{}) {
  42. write := mongo.NewInsertOneModel()
  43. write.SetDocument(doc)
  44. b.writes = append(b.writes, write)
  45. }
  46. func (b *Bluk) Update(doc ...interface{}) {
  47. write := mongo.NewUpdateOneModel()
  48. write.SetFilter(doc[0])
  49. ue := ObjToM(doc[1])
  50. autoUpdateTime(b.ms.db, b.ms.coll, ue)
  51. write.SetUpdate(ue)
  52. write.SetUpsert(false)
  53. b.writes = append(b.writes, write)
  54. }
  55. func (b *Bluk) UpdateAll(doc ...interface{}) {
  56. write := mongo.NewUpdateManyModel()
  57. write.SetFilter(doc[0])
  58. ue := ObjToM(doc[1])
  59. autoUpdateTime(b.ms.db, b.ms.coll, ue)
  60. write.SetUpdate(ue)
  61. write.SetUpsert(false)
  62. b.writes = append(b.writes, write)
  63. }
  64. func (b *Bluk) Upsert(doc ...interface{}) {
  65. write := mongo.NewUpdateOneModel()
  66. write.SetFilter(doc[0])
  67. ue := ObjToM(doc[1])
  68. autoUpdateTime(b.ms.db, b.ms.coll, ue)
  69. write.SetUpdate(ue)
  70. write.SetUpsert(true)
  71. b.writes = append(b.writes, write)
  72. }
  73. func (b *Bluk) Remove(doc interface{}) {
  74. write := mongo.NewDeleteOneModel()
  75. write.SetFilter(doc)
  76. b.writes = append(b.writes, write)
  77. }
  78. func (b *Bluk) RemoveAll(doc interface{}) {
  79. write := mongo.NewDeleteManyModel()
  80. write.SetFilter(doc)
  81. b.writes = append(b.writes, write)
  82. }
  83. func (b *Bluk) Run() (*mongo.BulkWriteResult, error) {
  84. return b.ms.M.C.Database(b.ms.db).Collection(b.ms.coll).BulkWrite(b.ms.M.Ctx, b.writes)
  85. }
  86. //
  87. type MgoIter struct {
  88. Cursor *mongo.Cursor
  89. Ctx context.Context
  90. }
  91. func (mt *MgoIter) Next(result interface{}) bool {
  92. if mt.Cursor != nil {
  93. if mt.Cursor.Next(mt.Ctx) {
  94. rType := reflect.TypeOf(result)
  95. rVal := reflect.ValueOf(result)
  96. if rType.Kind() == reflect.Ptr {
  97. rType = rType.Elem()
  98. rVal = rVal.Elem()
  99. }
  100. var err error
  101. if rType.Kind() == reflect.Map {
  102. r := make(map[string]interface{})
  103. err = mt.Cursor.Decode(&r)
  104. if rVal.CanSet() {
  105. rVal.Set(reflect.ValueOf(r))
  106. } else {
  107. for it := rVal.MapRange(); it.Next(); {
  108. rVal.SetMapIndex(it.Key(), reflect.Value{})
  109. }
  110. for it := reflect.ValueOf(r).MapRange(); it.Next(); {
  111. rVal.SetMapIndex(it.Key(), it.Value())
  112. }
  113. }
  114. } else {
  115. err = mt.Cursor.Decode(&result)
  116. }
  117. if err != nil {
  118. log.Println("mgo cur err", err.Error())
  119. mt.Cursor.Close(mt.Ctx)
  120. return false
  121. }
  122. return true
  123. } else {
  124. mt.Cursor.Close(mt.Ctx)
  125. return false
  126. }
  127. } else {
  128. return false
  129. }
  130. }
  131. //
  132. type MgoSess struct {
  133. db string
  134. coll string
  135. query interface{}
  136. sorts []string
  137. fields interface{}
  138. limit int64
  139. skip int64
  140. pipe []map[string]interface{}
  141. all interface{}
  142. M *MongodbSim
  143. }
  144. func (ms *MgoSess) DB(name string) *MgoSess {
  145. ms.db = name
  146. return ms
  147. }
  148. func (ms *MgoSess) C(name string) *MgoSess {
  149. ms.coll = name
  150. return ms
  151. }
  152. func (ms *MgoSess) Bulk() *Bluk {
  153. return &Bluk{ms: ms}
  154. }
  155. func (ms *MgoSess) Find(q interface{}) *MgoSess {
  156. if q == nil {
  157. q = map[string]interface{}{}
  158. }
  159. ms.query = q
  160. return ms
  161. }
  162. func (ms *MgoSess) FindId(_id interface{}) *MgoSess {
  163. ms.query = map[string]interface{}{"_id": _id}
  164. return ms
  165. }
  166. func (ms *MgoSess) Select(fields interface{}) *MgoSess {
  167. ms.fields = fields
  168. return ms
  169. }
  170. func (ms *MgoSess) Limit(limit int64) *MgoSess {
  171. ms.limit = limit
  172. return ms
  173. }
  174. func (ms *MgoSess) Skip(skip int64) *MgoSess {
  175. ms.skip = skip
  176. return ms
  177. }
  178. func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
  179. ms.sorts = sorts
  180. return ms
  181. }
  182. func (ms *MgoSess) Pipe(p []map[string]interface{}) *MgoSess {
  183. ms.pipe = p
  184. return ms
  185. }
  186. func (ms *MgoSess) Insert(doc interface{}) error {
  187. _, err := ms.M.C.Database(ms.db).Collection(ms.coll).InsertOne(ms.M.Ctx, doc)
  188. return err
  189. }
  190. func (ms *MgoSess) Remove(filter interface{}) error {
  191. _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, filter)
  192. return err
  193. }
  194. func (ms *MgoSess) RemoveId(_id interface{}) error {
  195. _, err := ms.M.C.Database(ms.db).Collection(ms.coll).DeleteOne(ms.M.Ctx, map[string]interface{}{"_id": _id})
  196. return err
  197. }
  198. func (ms *MgoSess) RemoveAll(filter interface{}) (*mongo.DeleteResult, error) {
  199. return ms.M.C.Database(ms.db).Collection(ms.coll).DeleteMany(ms.M.Ctx, filter)
  200. }
  201. func (ms *MgoSess) Upsert(filter, update interface{}) (*mongo.UpdateResult, error) {
  202. ct := options.Update()
  203. ct.SetUpsert(true)
  204. ue := ObjToM(update)
  205. autoUpdateTime(ms.db, ms.coll, ue)
  206. return ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, filter, ue, ct)
  207. }
  208. func (ms *MgoSess) UpsertId(filter, update interface{}) (*mongo.UpdateResult, error) {
  209. ct := options.Update()
  210. ct.SetUpsert(true)
  211. ue := ObjToM(update)
  212. autoUpdateTime(ms.db, ms.coll, ue)
  213. return ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, ue, ct)
  214. }
  215. func (ms *MgoSess) UpdateId(filter, update interface{}) error {
  216. ue := ObjToM(update)
  217. autoUpdateTime(ms.db, ms.coll, ue)
  218. _, err := ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, map[string]interface{}{"_id": filter}, ue)
  219. return err
  220. }
  221. func (ms *MgoSess) Update(filter, update interface{}) error {
  222. ue := ObjToM(update)
  223. autoUpdateTime(ms.db, ms.coll, ue)
  224. _, err := ms.M.C.Database(ms.db).Collection(ms.coll).UpdateOne(ms.M.Ctx, filter, ue)
  225. return err
  226. }
  227. func (ms *MgoSess) Count() (int64, error) {
  228. return ms.M.C.Database(ms.db).Collection(ms.coll).CountDocuments(ms.M.Ctx, ms.query)
  229. }
  230. func (ms *MgoSess) One(v *map[string]interface{}) {
  231. of := options.FindOne()
  232. of.SetProjection(ms.fields)
  233. sr := ms.M.C.Database(ms.db).Collection(ms.coll).FindOne(ms.M.Ctx, ms.query, of)
  234. if sr.Err() == nil {
  235. sr.Decode(&v)
  236. }
  237. }
  238. func (ms *MgoSess) All(v *[]map[string]interface{}) {
  239. cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Aggregate(ms.M.Ctx, ms.pipe)
  240. if err == nil && cur.Err() == nil {
  241. cur.All(ms.M.Ctx, v)
  242. }
  243. }
  244. func (ms *MgoSess) Iter() *MgoIter {
  245. it := &MgoIter{}
  246. find := options.Find()
  247. if ms.skip > 0 {
  248. find.SetSkip(ms.skip)
  249. }
  250. if ms.limit > 0 {
  251. find.SetLimit(ms.limit)
  252. }
  253. find.SetBatchSize(100)
  254. if len(ms.sorts) > 0 {
  255. sort := bson.M{}
  256. for _, k := range ms.sorts {
  257. switch k[:1] {
  258. case "-":
  259. sort[k[1:]] = -1
  260. case "+":
  261. sort[k[1:]] = 1
  262. default:
  263. sort[k] = 1
  264. }
  265. }
  266. find.SetSort(sort)
  267. }
  268. if ms.fields != nil {
  269. find.SetProjection(ms.fields)
  270. }
  271. cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Find(ms.M.Ctx, ms.query, find)
  272. if err != nil {
  273. log.Println("mgo find err", err.Error())
  274. } else {
  275. it.Cursor = cur
  276. it.Ctx = ms.M.Ctx
  277. }
  278. return it
  279. }
  280. type MongodbSim struct {
  281. MongodbAddr string
  282. Size int
  283. // MinSize int
  284. DbName string
  285. C *mongo.Client
  286. Ctx context.Context
  287. ShortCtx context.Context
  288. pool chan bool
  289. UserName string
  290. Password string
  291. ReplSet string
  292. }
  293. func (m *MongodbSim) GetMgoConn() *MgoSess {
  294. //m.Open()
  295. ms := &MgoSess{}
  296. ms.M = m
  297. return ms
  298. }
  299. func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
  300. //m.Close()
  301. ms.M = nil
  302. ms = nil
  303. }
  304. func (m *MongodbSim) Destroy() {
  305. //m.Close()
  306. m.C.Disconnect(nil)
  307. m.C = nil
  308. }
  309. func (m *MongodbSim) InitPool() {
  310. opts := options.Client()
  311. registry := bson.NewRegistryBuilder().RegisterTypeMapEntry(bson.TypeArray, reflect.TypeOf([]interface{}{})).Build()
  312. opts.SetRegistry(registry)
  313. opts.SetConnectTimeout(3 * time.Second)
  314. opts.SetHosts(strings.Split(m.MongodbAddr, ","))
  315. //opts.ApplyURI("mongodb://" + m.MongodbAddr)
  316. opts.SetMaxPoolSize(uint64(m.Size))
  317. if m.UserName != "" && m.Password != "" {
  318. cre := options.Credential{
  319. Username: m.UserName,
  320. Password: m.Password,
  321. }
  322. opts.SetAuth(cre)
  323. }
  324. /*ms := strings.Split(m.MongodbAddr, ",")
  325. if m.ReplSet == "" && len(ms) > 1 {
  326. m.ReplSet = "qfws"
  327. }*/
  328. if m.ReplSet != "" {
  329. opts.SetReplicaSet(m.ReplSet)
  330. opts.SetDirect(false)
  331. }
  332. m.pool = make(chan bool, m.Size)
  333. opts.SetMaxConnIdleTime(2 * time.Hour)
  334. m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
  335. m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
  336. client, err := mongo.Connect(m.ShortCtx, opts)
  337. if err != nil {
  338. log.Println("mgo init error:", err.Error())
  339. } else {
  340. m.C = client
  341. }
  342. }
  343. func (m *MongodbSim) Open() {
  344. m.pool <- true
  345. }
  346. func (m *MongodbSim) Close() {
  347. <-m.pool
  348. }
  349. func (m *MongodbSim) Save(c string, doc interface{}) string {
  350. defer catch()
  351. m.Open()
  352. defer m.Close()
  353. coll := m.C.Database(m.DbName).Collection(c)
  354. obj := ObjToM(doc)
  355. id := primitive.NewObjectID()
  356. (*obj)["_id"] = id
  357. _, err := coll.InsertOne(m.Ctx, obj)
  358. if nil != err {
  359. log.Println("SaveError", err)
  360. return ""
  361. }
  362. return id.Hex()
  363. }
  364. //原_id不变
  365. func (m *MongodbSim) SaveByOriID(c string, doc interface{}) bool {
  366. defer catch()
  367. m.Open()
  368. defer m.Close()
  369. coll := m.C.Database(m.DbName).Collection(c)
  370. _, err := coll.InsertOne(m.Ctx, ObjToM(doc))
  371. if nil != err {
  372. log.Println("SaveByOriIDError", err)
  373. return false
  374. }
  375. return true
  376. }
  377. //批量插入
  378. func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
  379. defer catch()
  380. m.Open()
  381. defer m.Close()
  382. coll := m.C.Database(m.DbName).Collection(c)
  383. var writes []mongo.WriteModel
  384. for _, d := range doc {
  385. write := mongo.NewInsertOneModel()
  386. write.SetDocument(d)
  387. writes = append(writes, write)
  388. }
  389. br, e := coll.BulkWrite(m.Ctx, writes)
  390. if e != nil {
  391. b := strings.Index(e.Error(), "duplicate") > -1
  392. log.Println("mgo savebulk error:", e.Error())
  393. if br != nil {
  394. log.Println("mgo savebulk size:", br.InsertedCount)
  395. }
  396. return b
  397. }
  398. return true
  399. }
  400. //批量插入
  401. func (m *MongodbSim) SaveBulkInterface(c string, doc ...interface{}) bool {
  402. defer catch()
  403. m.Open()
  404. defer m.Close()
  405. coll := m.C.Database(m.DbName).Collection(c)
  406. var writes []mongo.WriteModel
  407. for _, d := range doc {
  408. write := mongo.NewInsertOneModel()
  409. write.SetDocument(d)
  410. writes = append(writes, write)
  411. }
  412. br, e := coll.BulkWrite(m.Ctx, writes)
  413. if e != nil {
  414. b := strings.Index(e.Error(), "duplicate") > -1
  415. log.Println("mgo SaveBulkInterface error:", e.Error())
  416. if br != nil {
  417. log.Println("mgo SaveBulkInterface size:", br.InsertedCount)
  418. }
  419. return b
  420. }
  421. return true
  422. }
  423. //按条件统计
  424. func (m *MongodbSim) Count(c string, q interface{}) int {
  425. r, _ := m.CountByErr(c, q)
  426. return r
  427. }
  428. //统计
  429. func (m *MongodbSim) CountByErr(c string, q interface{}) (int, error) {
  430. defer catch()
  431. m.Open()
  432. defer m.Close()
  433. res, err := m.C.Database(m.DbName).Collection(c).CountDocuments(m.Ctx, ObjToM(q))
  434. if err != nil {
  435. log.Println("统计错误", err.Error())
  436. return 0, err
  437. } else {
  438. return int(res), nil
  439. }
  440. }
  441. //按条件删除
  442. func (m *MongodbSim) Delete(c string, q interface{}) int64 {
  443. defer catch()
  444. m.Open()
  445. defer m.Close()
  446. res, err := m.C.Database(m.DbName).Collection(c).DeleteMany(m.Ctx, ObjToM(q))
  447. if err != nil && res == nil {
  448. log.Println("删除错误", err.Error())
  449. }
  450. return res.DeletedCount
  451. }
  452. //删除对象
  453. func (m *MongodbSim) Del(c string, q interface{}) bool {
  454. defer catch()
  455. m.Open()
  456. defer m.Close()
  457. _, err := m.C.Database(m.DbName).Collection(c).DeleteMany(m.Ctx, ObjToM(q))
  458. if err != nil {
  459. log.Println("删除错误", err.Error())
  460. return false
  461. }
  462. return true
  463. }
  464. //按条件更新
  465. func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool) bool {
  466. defer catch()
  467. m.Open()
  468. defer m.Close()
  469. ct := options.Update()
  470. if upsert {
  471. ct.SetUpsert(true)
  472. }
  473. coll := m.C.Database(m.DbName).Collection(c)
  474. ue := ObjToM(u)
  475. autoUpdateTime(m.DbName, c, ue)
  476. var err error
  477. if multi {
  478. _, err = coll.UpdateMany(m.Ctx, ObjToM(q), ue, ct)
  479. } else {
  480. _, err = coll.UpdateOne(m.Ctx, ObjToM(q), ue, ct)
  481. }
  482. if err != nil {
  483. log.Println("更新错误", err.Error())
  484. return false
  485. }
  486. return true
  487. }
  488. func (m *MongodbSim) UpdateById(c string, id interface{}, set interface{}) bool {
  489. defer catch()
  490. m.Open()
  491. defer m.Close()
  492. q := make(map[string]interface{})
  493. if sid, ok := id.(string); ok {
  494. q["_id"], _ = primitive.ObjectIDFromHex(sid)
  495. } else {
  496. q["_id"] = id
  497. }
  498. ue := ObjToM(set)
  499. autoUpdateTime(m.DbName, c, ue)
  500. _, err := m.C.Database(m.DbName).Collection(c).UpdateOne(m.Ctx, q, ue)
  501. if nil != err {
  502. log.Println("UpdateByIdError", err)
  503. return false
  504. }
  505. return true
  506. }
  507. //批量更新
  508. func (m *MongodbSim) UpdateBulkAll(db, c string, doc ...[]map[string]interface{}) bool {
  509. return m.NewUpdateBulk(db, c, false, false, doc...)
  510. }
  511. func (m *MongodbSim) UpdateBulk(c string, doc ...[]map[string]interface{}) bool {
  512. return m.UpdateBulkAll(m.DbName, c, doc...)
  513. }
  514. //批量插入
  515. func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) bool {
  516. return m.NewUpdateBulk(m.DbName, c, true, false, doc...)
  517. }
  518. //批量插入
  519. func (m *MongodbSim) UpSertMultiBulk(c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
  520. return m.NewUpdateBulk(m.DbName, c, upsert, multi, doc...)
  521. }
  522. //批量插入
  523. func (m *MongodbSim) NewUpdateBulk(db, c string, upsert, multi bool, doc ...[]map[string]interface{}) bool {
  524. defer catch()
  525. m.Open()
  526. defer m.Close()
  527. coll := m.C.Database(db).Collection(c)
  528. var writes []mongo.WriteModel
  529. for _, d := range doc {
  530. if multi {
  531. write := mongo.NewUpdateManyModel()
  532. write.SetFilter(d[0])
  533. ue := ObjToM(d[1])
  534. autoUpdateTime(m.DbName, c, ue)
  535. write.SetUpdate(ue)
  536. write.SetUpsert(upsert)
  537. writes = append(writes, write)
  538. } else {
  539. write := mongo.NewUpdateOneModel()
  540. write.SetFilter(d[0])
  541. ue := ObjToM(d[1])
  542. autoUpdateTime(m.DbName, c, ue)
  543. write.SetUpdate(ue)
  544. write.SetUpsert(upsert)
  545. writes = append(writes, write)
  546. }
  547. }
  548. br, e := coll.BulkWrite(m.Ctx, writes)
  549. if e != nil {
  550. log.Println("mgo upsert error:", e.Error())
  551. return br == nil || br.UpsertedCount == 0
  552. }
  553. // else {
  554. // if r.UpsertedCount != int64(len(doc)) {
  555. // log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
  556. // }
  557. // return true
  558. // }
  559. return true
  560. }
  561. //查询单条对象
  562. func (m *MongodbSim) FindOne(c string, query interface{}) (*map[string]interface{}, bool) {
  563. return m.FindOneByField(c, query, nil)
  564. }
  565. //查询单条对象
  566. func (m *MongodbSim) FindOneByField(c string, query interface{}, fields interface{}) (*map[string]interface{}, bool) {
  567. defer catch()
  568. res, ok := m.Find(c, query, nil, fields, true, -1, -1)
  569. if nil != res && len(*res) > 0 {
  570. return &((*res)[0]), ok
  571. }
  572. return nil, ok
  573. }
  574. //查询单条对象
  575. func (m *MongodbSim) FindById(c string, query string, fields interface{}) (*map[string]interface{}, bool) {
  576. defer catch()
  577. m.Open()
  578. defer m.Close()
  579. of := options.FindOne()
  580. of.SetProjection(ObjToOth(fields))
  581. res := make(map[string]interface{})
  582. _id, err := primitive.ObjectIDFromHex(query)
  583. if err != nil {
  584. log.Println("_id error", err)
  585. return &res, true
  586. }
  587. sr := m.C.Database(m.DbName).Collection(c).FindOne(m.Ctx, map[string]interface{}{"_id": _id}, of)
  588. if sr.Err() == nil {
  589. sr.Decode(&res)
  590. }
  591. return &res, true
  592. }
  593. //底层查询方法
  594. func (m *MongodbSim) Find(c string, query interface{}, order interface{}, fields interface{}, single bool, start int, limit int) (*[]map[string]interface{}, bool) {
  595. defer catch()
  596. m.Open()
  597. defer m.Close()
  598. var res []map[string]interface{}
  599. coll := m.C.Database(m.DbName).Collection(c)
  600. if single {
  601. res = make([]map[string]interface{}, 1)
  602. of := options.FindOne()
  603. of.SetProjection(ObjToOth(fields))
  604. of.SetSort(ObjToM(order))
  605. if sr := coll.FindOne(m.Ctx, ObjToM(query), of); sr.Err() == nil {
  606. sr.Decode(&res[0])
  607. }
  608. } else {
  609. res = []map[string]interface{}{}
  610. of := options.Find()
  611. of.SetProjection(ObjToOth(fields))
  612. of.SetSort(ObjToM(order))
  613. if start > -1 {
  614. of.SetSkip(int64(start))
  615. of.SetLimit(int64(limit))
  616. }
  617. cur, err := coll.Find(m.Ctx, ObjToM(query), of)
  618. if err == nil && cur.Err() == nil {
  619. cur.All(m.Ctx, &res)
  620. }
  621. }
  622. return &res, true
  623. }
  624. func ObjToOth(query interface{}) *bson.M {
  625. return ObjToMQ(query, false)
  626. }
  627. func ObjToM(query interface{}) *bson.M {
  628. return ObjToMQ(query, true)
  629. }
  630. //obj(string,M)转M,查询用到
  631. func ObjToMQ(query interface{}, isQuery bool) *bson.M {
  632. data := make(bson.M)
  633. defer catch()
  634. if s2, ok2 := query.(*map[string]interface{}); ok2 {
  635. data = bson.M(*s2)
  636. } else if s3, ok3 := query.(*bson.M); ok3 {
  637. return s3
  638. } else if s3, ok3 := query.(*primitive.M); ok3 {
  639. return s3
  640. } else if s, ok := query.(string); ok {
  641. json.Unmarshal([]byte(strings.Replace(s, "'", "\"", -1)), &data)
  642. if ss, oks := data["_id"]; oks && isQuery {
  643. switch ss.(type) {
  644. case string:
  645. data["_id"], _ = primitive.ObjectIDFromHex(ss.(string))
  646. case map[string]interface{}:
  647. tmp := ss.(map[string]interface{})
  648. for k, v := range tmp {
  649. tmp[k], _ = primitive.ObjectIDFromHex(v.(string))
  650. }
  651. data["_id"] = tmp
  652. }
  653. }
  654. } else if s1, ok1 := query.(map[string]interface{}); ok1 {
  655. data = s1
  656. } else if s4, ok4 := query.(bson.M); ok4 {
  657. data = s4
  658. } else if s4, ok4 := query.(primitive.M); ok4 {
  659. data = s4
  660. } else {
  661. data = nil
  662. }
  663. return &data
  664. }
  665. func intAllDef(num interface{}, defaultNum int) int {
  666. if i, ok := num.(int); ok {
  667. return int(i)
  668. } else if i0, ok0 := num.(int32); ok0 {
  669. return int(i0)
  670. } else if i1, ok1 := num.(float64); ok1 {
  671. return int(i1)
  672. } else if i2, ok2 := num.(int64); ok2 {
  673. return int(i2)
  674. } else if i3, ok3 := num.(float32); ok3 {
  675. return int(i3)
  676. } else if i4, ok4 := num.(string); ok4 {
  677. in, _ := strconv.Atoi(i4)
  678. return int(in)
  679. } else if i5, ok5 := num.(int16); ok5 {
  680. return int(i5)
  681. } else if i6, ok6 := num.(int8); ok6 {
  682. return int(i6)
  683. } else if i7, ok7 := num.(*big.Int); ok7 {
  684. in, _ := strconv.Atoi(fmt.Sprint(i7))
  685. return int(in)
  686. } else if i8, ok8 := num.(*big.Float); ok8 {
  687. in, _ := strconv.Atoi(fmt.Sprint(i8))
  688. return int(in)
  689. } else {
  690. return defaultNum
  691. }
  692. }
  693. //出错拦截
  694. func catch() {
  695. if r := recover(); r != nil {
  696. log.Println(r)
  697. for skip := 0; ; skip++ {
  698. _, file, line, ok := runtime.Caller(skip)
  699. if !ok {
  700. break
  701. }
  702. go log.Printf("%v,%v\n", file, line)
  703. }
  704. }
  705. }
  706. //根据bsonID转string
  707. func BsonIdToSId(uid interface{}) string {
  708. if uid == nil {
  709. return ""
  710. } else if u, ok := uid.(string); ok {
  711. return u
  712. } else if u, ok := uid.(primitive.ObjectID); ok {
  713. return u.Hex()
  714. } else {
  715. return ""
  716. }
  717. }
  718. func StringTOBsonId(id string) (bid primitive.ObjectID) {
  719. defer catch()
  720. if id != "" {
  721. bid, _ = primitive.ObjectIDFromHex(id)
  722. }
  723. return
  724. }
  725. func ToObjectIds(ids []string) []primitive.ObjectID {
  726. _ids := []primitive.ObjectID{}
  727. for _, v := range ids {
  728. _id, _ := primitive.ObjectIDFromHex(v)
  729. _ids = append(_ids, _id)
  730. }
  731. return _ids
  732. }
  733. //自动添加更新时间
  734. func autoUpdateTime(db, coll string, ue *bson.M) {
  735. if db == "qfw" && coll == "user" {
  736. set := ObjToM((*ue)["$set"])
  737. if *set == nil {
  738. set = &bson.M{}
  739. }
  740. (*set)["auto_updatetime"] = time.Now().Unix()
  741. (*ue)["$set"] = set
  742. }
  743. }