123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- package main
- import (
- "context"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/mongo/readpref"
- "log"
- "time"
- )
- func main() {
- qqqq()
- //getHot()
- //QlmChannel()
- }
- // QlmChannel 千里马 channel
- func QlmChannel() {
- type Item struct {
- ID primitive.ObjectID `bson:"_id,omitempty"`
- Site string `bson:"site"`
- Channel string `bson:"channel"`
- Publishtime string `bson:"publishtime"`
- // Add other fields as needed
- }
- type GroupedData struct {
- Channel string `bson:"channel"`
- Count int `bson:"count"`
- }
- // 设置MongoDB连接选项
- //clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27081")
- clientOptions := options.Client().ApplyURI("mongodb://172.17.4.87:27080")
- //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
- clientOptions.SetReadPreference(readpref.Primary())
- clientOptions.SetDirect(true)
- // 连接MongoDB
- client, err := mongo.Connect(context.Background(), clientOptions)
- if err != nil {
- log.Println(err)
- }
- // 检查连接
- err = client.Ping(context.Background(), nil)
- if err != nil {
- log.Fatal(err)
- }
- // 选择数据库和集合
- database := client.Database("qlm")
- collection := database.Collection("data_merge")
- // 查询条件
- filter := bson.D{
- {"site", "千里马"},
- {"publishtime", bson.D{{"$gt", "2022-01-01"}}},
- }
- // 执行查询
- cursor, err := collection.Find(context.Background(), filter)
- if err != nil {
- log.Fatal(err)
- }
- defer cursor.Close(context.Background())
- // 手动分组查看数据
- count := 0
- groupedData := make(map[string]int)
- for cursor.Next(context.Background()) {
- var item Item
- err := cursor.Decode(&item)
- if err != nil {
- log.Fatal(err)
- }
- // 在这里可以根据需要进行其他逻辑处理
- // 手动分组
- groupedData[item.Channel]++
- count++
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- }
- // 输出结果
- //for channel, count := range groupedData {
- // fmt.Printf("Channel: %s, Count: %d\n", channel, count)
- //}
- // 将手动分组的数据保存到新的集合中
- var groupedDataList []interface{}
- for channel, count := range groupedData {
- groupedDataList = append(groupedDataList, GroupedData{Channel: channel, Count: count})
- }
- // 选择新的集合
- newCollection := database.Collection("wcc_qlm")
- // 插入数据
- _, err = newCollection.InsertMany(context.Background(), groupedDataList)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println("Data inserted into wcc_qlm collection.")
- }
// MongoConfig holds the settings used to open one MongoDB connection.
type MongoConfig struct {
	// URI is the connection string passed to ApplyURI.
	URI string
	// Database is the name of the database to operate on.
	Database string
	// Username and Password are applied as credentials only when both are
	// non-empty.
	Username, Password string
	// Direct is forwarded to SetDirect; true requests a direct connection to
	// the node named in URI.
	Direct bool
}
- func connectMongo(cfg MongoConfig) (*mongo.Client, error) {
- //clientOpts := options.Client().ApplyURI(cfg.URI)
- clientOpts := options.Client().
- ApplyURI(cfg.URI).
- SetDirect(cfg.Direct) // 设置为直连模式,适合单节点连接
- if cfg.Username != "" && cfg.Password != "" {
- cred := options.Credential{
- Username: cfg.Username,
- Password: cfg.Password,
- }
- clientOpts.SetAuth(cred)
- }
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- client, err := mongo.Connect(ctx, clientOpts)
- if err != nil {
- return nil, err
- }
- return client, nil
- }
- // qqqq 迁移MongoDB
- func qqqq() {
- srcCfg := MongoConfig{
- URI: "mongodb://127.0.0.1:27001",
- Database: "mixdata",
- Username: "",
- Password: "",
- Direct: true,
- }
- dstCfg := MongoConfig{
- URI: "mongodb://172.20.45.129:27002",
- Database: "mixdata",
- Username: "",
- Password: "",
- //Direct: true,
- }
- // 连接源和目标Mongo
- srcClient, err := connectMongo(srcCfg)
- if err != nil {
- log.Fatalf("连接源Mongo失败: %v", err)
- }
- defer srcClient.Disconnect(context.Background())
- dstClient, err := connectMongo(dstCfg)
- if err != nil {
- log.Fatalf("连接目标Mongo失败: %v", err)
- }
- defer dstClient.Disconnect(context.Background())
- srcDB := srcClient.Database(srcCfg.Database)
- dstDB := dstClient.Database(dstCfg.Database)
- // 获取源库集合列表
- ctx := context.Background()
- //collections, err := srcDB.ListCollectionNames(ctx, bson.M{})
- collections := []string{"special_trade_union"}
- if err != nil {
- log.Fatalf("获取集合列表失败: %v", err)
- }
- fmt.Printf("发现 %d 个集合,将逐个迁移\n", len(collections))
- batchSize := 1000 // 批量大小
- for _, collName := range collections {
- fmt.Printf("迁移集合: %s\n", collName)
- srcColl := srcDB.Collection(collName)
- dstColl := dstDB.Collection(collName)
- // 查询全部数据
- cursor, err := srcColl.Find(ctx, bson.M{})
- if err != nil {
- log.Printf("查询集合 %s 失败: %v", collName, err)
- continue
- }
- batchDocs := make([]interface{}, 0, batchSize)
- count := 0
- for cursor.Next(ctx) {
- var doc bson.M
- if err := cursor.Decode(&doc); err != nil {
- log.Printf("解析文档失败: %v", err)
- continue
- }
- batchDocs = append(batchDocs, doc)
- if len(batchDocs) >= batchSize {
- if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
- log.Printf("批量写入失败: %v", err)
- }
- count += len(batchDocs)
- log.Println("current:", collName, count)
- batchDocs = batchDocs[:0]
- }
- }
- // 插入剩余文档
- if len(batchDocs) > 0 {
- if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
- log.Printf("批量写入失败: %v", err)
- }
- count += len(batchDocs)
- }
- fmt.Printf("集合 %s 迁移完成,导入 %d 条数据\n", collName, count)
- cursor.Close(ctx)
- }
- }
- func insertBatch(ctx context.Context, coll *mongo.Collection, docs []interface{}) error {
- _, err := coll.InsertMany(ctx, docs)
- return err
- }
|