|
@@ -3,9 +3,11 @@ package public
|
|
import (
|
|
import (
|
|
"bytes"
|
|
"bytes"
|
|
"config"
|
|
"config"
|
|
|
|
+ "context"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
+ "go.mongodb.org/mongo-driver/mongo/options"
|
|
"io"
|
|
"io"
|
|
"log"
|
|
"log"
|
|
"mongodb"
|
|
"mongodb"
|
|
@@ -74,7 +76,6 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
|
|
"iosPushUrl": param.IosUrl,
|
|
"iosPushUrl": param.IosUrl,
|
|
}
|
|
}
|
|
|
|
|
|
- sess := util.MQFW.GetMgoConn()
|
|
|
|
filter := map[string]interface{}{
|
|
filter := map[string]interface{}{
|
|
"i_appid": 2,
|
|
"i_appid": 2,
|
|
}
|
|
}
|
|
@@ -86,28 +87,34 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
|
|
filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(qutil.ObjToString(lastId))}
|
|
filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(qutil.ObjToString(lastId))}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- fields := map[string]interface{}{
|
|
|
|
- "_id": 1,
|
|
|
|
- //"s_name": 1,
|
|
|
|
- "s_opushid": 1, "s_jpushid": 1, "s_appponetype": 1, "s_appversion": 1, "o_pushset": 1,
|
|
|
|
- }
|
|
|
|
lastUserId := ""
|
|
lastUserId := ""
|
|
// mongo 库查询用户
|
|
// mongo 库查询用户
|
|
count := util.MQFW.Count("user", filter)
|
|
count := util.MQFW.Count("user", filter)
|
|
log.Println("user表用户量", count)
|
|
log.Println("user表用户量", count)
|
|
- //调用中台次数
|
|
|
|
- //a := 0
|
|
|
|
for f := 0; f < 5; f++ {
|
|
for f := 0; f < 5; f++ {
|
|
if lastUserId != "" {
|
|
if lastUserId != "" {
|
|
filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(lastUserId)}
|
|
filter["_id"] = bson.M{"$gt": mongodb.StringTOBsonId(lastUserId)}
|
|
}
|
|
}
|
|
- log.Println("查询条件filter:", filter)
|
|
|
|
- it := sess.DB("qfw").C("user").Find(filter).Select(fields).Sort(`{"_id":1}`).Iter()
|
|
|
|
- for userInfo := make(map[string]interface{}); it.Next(&userInfo); {
|
|
|
|
|
|
+
|
|
|
|
+ _, client := util.NewMgo.Client(true)
|
|
|
|
+ defer util.NewMgo.Destory(client)
|
|
|
|
+ opt := &options.FindOptions{}
|
|
|
|
+ opt.Projection = bson.M{"_id": 1, "s_opushid": 1, "s_jpushid": 1, "s_appponetype": 1, "s_appversion": 1, "o_pushset": 1}
|
|
|
|
+ opt.Sort = bson.M{"_id": -1}
|
|
|
|
+ cursor, _ := client.Database("qfw").Collection("user").Find(context.Background(), filter)
|
|
|
|
+
|
|
|
|
+ for cursor.Next(context.Background()) {
|
|
|
|
+ var userInfo bson.M
|
|
|
|
+ err := cursor.Decode(&userInfo)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println(err)
|
|
|
|
+ break
|
|
|
|
+ }
|
|
if userInfo == nil || len(userInfo) == 0 {
|
|
if userInfo == nil || len(userInfo) == 0 {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
userId := mongodb.BsonIdToSId(userInfo["_id"])
|
|
userId := mongodb.BsonIdToSId(userInfo["_id"])
|
|
|
|
+ fmt.Println("userId", userId)
|
|
//如果是补发消息前2000条需要判重
|
|
//如果是补发消息前2000条需要判重
|
|
if again && totalCount <= 4000 {
|
|
if again && totalCount <= 4000 {
|
|
if totalCount <= 4000 {
|
|
if totalCount <= 4000 {
|
|
@@ -136,7 +143,7 @@ func AllUserSendMsg(param *Message, loginUserId int, loginUserName string, msgLo
|
|
currentCount = 0
|
|
currentCount = 0
|
|
|
|
|
|
}
|
|
}
|
|
- userInfo = make(map[string]interface{})
|
|
|
|
|
|
+ //userInfo = make(map[string]interface{})
|
|
if totalCount >= 2000 && totalCount%2000 == 0 {
|
|
if totalCount >= 2000 && totalCount%2000 == 0 {
|
|
//定时存发送到哪一位用户
|
|
//定时存发送到哪一位用户
|
|
redis.Put("qmx_filter", fmt.Sprintf(Redis_lastUserId_key, msgLogId), userId, 604800)
|
|
redis.Put("qmx_filter", fmt.Sprintf(Redis_lastUserId_key, msgLogId), userId, 604800)
|