Browse Source

修复Pipe

wangchuanjin 4 years ago
parent
commit
60cddcf23a
1 changed files with 39 additions and 27 deletions
  1. 39 27
      mongodb/mongodb.go

+ 39 - 27
mongodb/mongodb.go

@@ -242,35 +242,48 @@ func (ms *MgoSess) All(v *[]map[string]interface{}) {
 }
 }
 func (ms *MgoSess) Iter() *MgoIter {
 func (ms *MgoSess) Iter() *MgoIter {
 	it := &MgoIter{}
 	it := &MgoIter{}
-	find := options.Find()
-	if ms.skip > 0 {
-		find.SetSkip(ms.skip)
-	}
-	if ms.limit > 0 {
-		find.SetLimit(ms.limit)
-	}
-	find.SetBatchSize(100)
-	if len(ms.sorts) > 0 {
-		sort := bson.D{}
-		for _, k := range ms.sorts {
-			switch k[:1] {
-			case "-":
-				sort = append(sort, bson.E{k[1:], -1})
-			case "+":
-				sort = append(sort, bson.E{k[1:], 1})
-			default:
-				sort = append(sort, bson.E{k, 1})
+	coll := ms.M.C.Database(ms.db).Collection(ms.coll)
+	var cur *mongo.Cursor
+	var err error
+	if ms.query != nil {
+		find := options.Find()
+		if ms.skip > 0 {
+			find.SetSkip(ms.skip)
+		}
+		if ms.limit > 0 {
+			find.SetLimit(ms.limit)
+		}
+		find.SetBatchSize(100)
+		if len(ms.sorts) > 0 {
+			sort := bson.D{}
+			for _, k := range ms.sorts {
+				switch k[:1] {
+				case "-":
+					sort = append(sort, bson.E{k[1:], -1})
+				case "+":
+					sort = append(sort, bson.E{k[1:], 1})
+				default:
+					sort = append(sort, bson.E{k, 1})
+				}
 			}
 			}
+			find.SetSort(sort)
+		}
+		if ms.fields != nil {
+			find.SetProjection(ms.fields)
+		}
+		cur, err = coll.Find(ms.M.Ctx, ms.query, find)
+		if err != nil {
+			log.Println("mgo find err", err.Error())
+		}
+	} else if ms.pipe != nil {
+		aggregate := options.Aggregate()
+		aggregate.SetBatchSize(100)
+		cur, err = coll.Aggregate(ms.M.Ctx, ms.pipe, aggregate)
+		if err != nil {
+			log.Println("mgo aggregate err", err.Error())
 		}
 		}
-		find.SetSort(sort)
-	}
-	if ms.fields != nil {
-		find.SetProjection(ms.fields)
 	}
 	}
-	cur, err := ms.M.C.Database(ms.db).Collection(ms.coll).Find(ms.M.Ctx, ms.query, find)
-	if err != nil {
-		log.Println("mgo find err", err.Error())
-	} else {
+	if err == nil {
 		it.Cursor = cur
 		it.Cursor = cur
 		it.Ctx = ms.M.Ctx
 		it.Ctx = ms.M.Ctx
 	}
 	}
@@ -681,7 +694,6 @@ func ObjToMQ(query interface{}, isQuery bool) *bson.M {
 				}
 				}
 				data["_id"] = tmp
 				data["_id"] = tmp
 			}
 			}
-
 		}
 		}
 	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
 	} else if s1, ok1 := query.(map[string]interface{}); ok1 {
 		data = s1
 		data = s1