Browse Source

更新批量保存es

wcc 1 year ago
parent
commit
4cb37e2b9c
2 changed files with 44 additions and 2 deletions
  1. 6 2
      elastic/elasticSim.go
  2. 38 0
      mongodb/mongodb.go

+ 6 - 2
elastic/elasticSim.go

@@ -170,8 +170,12 @@ func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
 			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
 			//}
 			id := util.ObjToString(v["_id"])
-			delete(v, "_id")
-			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+			doc := make(map[string]interface{}, 0)
+			for k, va := range v {
+				doc[k] = va
+			}
+			delete(doc, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
 		}
 		_, err := req.Do(context.Background())
 		if err != nil {

+ 38 - 0
mongodb/mongodb.go

@@ -562,6 +562,44 @@ func (m *MongodbSim) Update(c string, q, u interface{}, upsert bool, multi bool)
 	}
 	return true
 }
+func (m *MongodbSim) InsertOrUpdate(db, collectionName string, data map[string]interface{}) error {
+	defer catch()
+	m.Open()
+	defer m.Close()
+	// 获取 _id 字段的值
+	idValue, ok := data["_id"]
+	var idObj primitive.ObjectID
+	if ok {
+		if sid, ok := idValue.(string); ok {
+			idValue, _ = primitive.ObjectIDFromHex(sid)
+		}
+	} else {
+		// 如果 _id 不存在,则生成一个新的 ObjectID
+		idObj = primitive.NewObjectID()
+		data["_id"] = idObj.Hex()
+	}
+
+	// 根据 _id 字段进行查找和更新,如果不存在则插入新数据
+	filter := bson.M{"_id": idObj}
+	update := bson.M{"$set": data}
+
+	// 尝试更新记录,如果记录不存在则插入新数据
+	updateResult, err := m.C.Database(db).Collection(collectionName).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true))
+	if err != nil {
+		return err
+	}
+
+	// 如果未修改任何文档,则表示需要插入新数据
+	if updateResult.ModifiedCount == 0 {
+		_, err := m.C.Database(db).Collection(collectionName).InsertOne(context.Background(), data)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+
+}
 func (m *MongodbSim) UpdateById(c string, id interface{}, set interface{}) bool {
 	defer catch()
 	m.Open()