package main import ( "context" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" "log" ) func main() { getHot() //QlmChannel() } //QlmChannel 千里马 channel func QlmChannel() { type Item struct { ID primitive.ObjectID `bson:"_id,omitempty"` Site string `bson:"site"` Channel string `bson:"channel"` Publishtime string `bson:"publishtime"` // Add other fields as needed } type GroupedData struct { Channel string `bson:"channel"` Count int `bson:"count"` } // 设置MongoDB连接选项 //clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27081") clientOptions := options.Client().ApplyURI("mongodb://172.17.4.87:27080") //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083") clientOptions.SetReadPreference(readpref.Primary()) clientOptions.SetDirect(true) // 连接MongoDB client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { log.Println(err) } // 检查连接 err = client.Ping(context.Background(), nil) if err != nil { log.Fatal(err) } // 选择数据库和集合 database := client.Database("qlm") collection := database.Collection("data_merge") // 查询条件 filter := bson.D{ {"site", "千里马"}, {"publishtime", bson.D{{"$gt", "2022-01-01"}}}, } // 执行查询 cursor, err := collection.Find(context.Background(), filter) if err != nil { log.Fatal(err) } defer cursor.Close(context.Background()) // 手动分组查看数据 count := 0 groupedData := make(map[string]int) for cursor.Next(context.Background()) { var item Item err := cursor.Decode(&item) if err != nil { log.Fatal(err) } // 在这里可以根据需要进行其他逻辑处理 // 手动分组 groupedData[item.Channel]++ count++ if count%10000 == 0 { log.Println("current:", count) } } // 输出结果 //for channel, count := range groupedData { // fmt.Printf("Channel: %s, Count: %d\n", channel, count) //} // 将手动分组的数据保存到新的集合中 var groupedDataList []interface{} for channel, count := range groupedData { groupedDataList = append(groupedDataList, GroupedData{Channel: channel, Count: count}) } // 选择新的集合 newCollection := database.Collection("wcc_qlm") // 插入数据 _, err = newCollection.InsertMany(context.Background(), groupedDataList) if err != nil { log.Fatal(err) } fmt.Println("Data inserted into wcc_qlm collection.") }