@@ -28,7 +28,7 @@ var (

const (
    DB = "bidding"
-     MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id"],"sort":{"id":"desc"},"from":0,"size":1}`
+     MaxId = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
    ProjectQuery = `{"query":{"filtered":{"filter":{"term":{"list.infoid":"%s"}}}},"_source":["_id","list.infoid"],"sort":{"id":"desc"},"from":0,"size":1}`
)

@@ -48,24 +48,27 @@ type MatchJob struct {
// Scheduled task: match the data and save it to the database
func (m *MatchJob) Execute() {
    defer util.Catch()
-     lastId := util.ObjToString(TaskConfig.LastId)
-     logger.Info("开始匹配数据任务。。。", lastId)
+     logger.Info("开始匹配数据任务。。。", TaskConfig.LastId, TaskConfig.LastTime)
+     startId := util.ObjToString(TaskConfig.LastId)
    // Get the maximum id for this run's query
    idQuery := ""
-     if lastId == "" {
-         idQuery = strings.Replace(fmt.Sprintf(MaxId, lastId), `"gt"`, `"gte"`, -1)
+     if startId == "" {
+         idQuery = strings.Replace(fmt.Sprintf(MaxId, startId), `"gt"`, `"gte"`, -1)
    } else {
-         idQuery = fmt.Sprintf(MaxId, lastId)
+         idQuery = fmt.Sprintf(MaxId, startId)
    }
    resId := elastic.Get(DB, DB, idQuery)
-     newId := ""
+     endId := ""
+     var endTime interface{}
    if resId != nil && *resId != nil && len(*resId) == 1 {
-         newId = util.ObjToString((*resId)[0]["_id"])
+         endId = util.ObjToString((*resId)[0]["_id"])
+         endTime = (*resId)[0]["comeintime"]
    } else {
        logger.Info("获取本次查询的最大id的时候,未查找到数据!", idQuery)
        return
    }
-     datas := m.LoadBidding(lastId, newId, TaskConfig.StartTime)
+     st, _ := time.ParseInLocation(util.Date_Full_Layout, TaskConfig.LastTime, time.Local)
+     datas := m.LoadBidding(startId, endId, st.Unix())
    if datas == nil || len(*datas) == 0 {
        return
    }
@@ -87,10 +90,13 @@ func (m *MatchJob) Execute() {
            break
        }
    }
-     logger.Info("匹配数据任务结束。。。", newId)
+     if endTime == nil {
+         endTime = time.Now().Unix()
+     }
+     TaskConfig.LastTime = util.FormatDateWithObj(&endTime, util.Date_Full_Layout)
+     TaskConfig.LastId = endId
+     logger.Info("匹配数据任务结束。。。", TaskConfig.LastId, TaskConfig.LastTime)
    //
-     TaskConfig.StartTime = time.Now().Unix()
-     TaskConfig.LastId = newId
}

func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]interface{}) {
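
// The Execute changes above replace the old (StartTime, LastId) checkpoint with
// (LastId, LastTime), where LastTime comes from the comeintime of the newest
// document returned by the MaxId query and is written back only after a
// successful run. A minimal, self-contained sketch of that windowing idea,
// assuming illustrative names (Checkpoint, nextWindow) and a full date-time
// layout standing in for util.Date_Full_Layout:
package main

import (
    "fmt"
    "time"
)

// Checkpoint mirrors the two fields the job now persists between runs.
type Checkpoint struct {
    LastId   string // _id of the newest document handled by the previous run
    LastTime string // comeintime of that document, as a formatted date-time string
}

// nextWindow turns the stored checkpoint into the (startId, since) pair that
// bounds the next run: only documents with id greater than startId and
// comeintime at or after since should be loaded.
func nextWindow(cp Checkpoint) (startId string, since int64, err error) {
    const layout = "2006-01-02 15:04:05" // assumed equivalent of util.Date_Full_Layout
    st, err := time.ParseInLocation(layout, cp.LastTime, time.Local)
    if err != nil {
        return "", 0, err
    }
    return cp.LastId, st.Unix(), nil
}

func main() {
    cp := Checkpoint{LastId: "5da71f96a5cb26b9b79b8e6c", LastTime: "2019-10-16 12:00:00"}
    startId, since, err := nextWindow(cp)
    if err != nil {
        fmt.Println("bad checkpoint:", err)
        return
    }
    fmt.Println("load documents with id >", startId, "and comeintime >=", since)
}
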
@@ -116,9 +122,23 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
        // Take the latest 50 records
        sort.Sort(pushArray)
        var array []*MatchInfo
+         titleMap := map[string]bool{}
        size := 0
        for _, v2 := range pushArray {
-             size++
+             title := util.ObjToString((*v2.Info)["title"])
+             pushInfoKey := PushInfoKey(user.Id, util.ObjToString((*v2.Info)["_id"]))
+             if titleMap[title] {
+                 continue
+             }
+             titleMap[title] = true
+             isExists, err := redis.Exists(Pushcache_2_a, pushInfoKey)
+             if err != nil {
+                 logger.Error("推送信息redis判重出错", err)
+             }
+             if isExists {
+                 continue
+             }
+             redis.Put(Pushcache_2_a, pushInfoKey, 1, OneDaySecond)
            info := map[string]interface{}{}
            for _, field := range SaveFields {
                if (*v2.Info)[field] == nil {
@@ -130,14 +150,18 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
                Info: &info,
                Keys: v2.Keys,
            })
+             size++
            maxPushSize := Config.MaxPushSize
            if IsVipUser(user.VipStatus) {
                maxPushSize = Config.VipMaxPushSize
            }
-             if len(array) == maxPushSize {
+             if size == maxPushSize {
                break
            }
        }
+         if size == 0 {
+             return
+         }
        saveBatch = append(saveBatch, map[string]interface{}{
            "s_m_openid": user.S_m_openid,
            "a_m_openid": user.A_m_openid,
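
// In the ToMatch loop above, an item now has to pass two filters before it
// counts toward the per-user cap: a per-batch title de-duplication (titleMap)
// and a one-day "already pushed" check keyed by (user, info) in Redis
// (Pushcache_2_a / PushInfoKey), and size is incremented only for items that
// are actually appended, so the maxPushSize comparison counts real pushes.
// The sketch below reproduces that selection with an in-memory cache standing
// in for Redis; Candidate, PushCache and selectPushes are illustrative names,
// not the project's types.
package main

import "fmt"

// Candidate is the minimal shape of one matched info item.
type Candidate struct {
    Id    string
    Title string
}

// PushCache is the slice of cache behaviour the loop relies on: "was this
// (user, info) pushed recently?" plus marking it as pushed.
type PushCache interface {
    Exists(key string) bool
    Put(key string)
}

type memCache map[string]bool

func (c memCache) Exists(key string) bool { return c[key] }
func (c memCache) Put(key string)         { c[key] = true }

// selectPushes walks candidates in order (newest first), skips duplicate
// titles within the batch and items already pushed to this user, marks the
// survivors in the cache, and stops once maxPush items have been selected.
func selectPushes(userId string, candidates []Candidate, cache PushCache, maxPush int) []Candidate {
    seenTitles := map[string]bool{}
    var picked []Candidate
    for _, c := range candidates {
        if seenTitles[c.Title] {
            continue
        }
        seenTitles[c.Title] = true
        key := userId + "_" + c.Id // stand-in for PushInfoKey(user.Id, infoId)
        if cache.Exists(key) {
            continue
        }
        cache.Put(key) // the real code stores this with a one-day TTL (OneDaySecond)
        picked = append(picked, c)
        if len(picked) == maxPush {
            break
        }
    }
    return picked
}

func main() {
    cache := memCache{}
    candidates := []Candidate{
        {Id: "1", Title: "Project A tender"},
        {Id: "2", Title: "Project A tender"}, // duplicate title, skipped
        {Id: "3", Title: "Project B tender"},
    }
    fmt.Println(selectPushes("u1", candidates, cache, 50))
}
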
@@ -188,7 +212,8 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
func (m *MatchJob) ToRelationProject(projectUser *sync.Map) {
    logger.Info("开始关联项目。。。")
    index := 0
-     var saveBatch []map[string]interface{}
+     var setBatch []map[string]interface{}
+     var queryBatch []map[string]interface{}
    lock := &sync.Mutex{}
    projectUser.Range(func(key interface{}, value interface{}) bool {
        m.savePool <- true
@@ -216,18 +241,25 @@ func (m *MatchJob) ToRelationProject(projectUser *sync.Map) {
            lock.Lock()
            defer lock.Unlock()
            for _, user := range *users {
-                 saveBatch = append(saveBatch, map[string]interface{}{
-                     "projectid": (*projects)[0]["_id"],
-                     "infoid": _id,
-                     "userid": user.Id,
-                     "maxid": list_last_infoid,
-                     "toptypes": user.O_vipjy.TopTypes,
-                     "createtime": time.Now().Unix(),
+                 queryBatch = append(queryBatch, map[string]interface{}{
+                     "projectid": (*projects)[0]["_id"],
+                     "userid": user.Id,
+                 })
+                 setBatch = append(setBatch, map[string]interface{}{
+                     "$set": map[string]interface{}{
+                         "projectid": (*projects)[0]["_id"],
+                         "infoid": _id,
+                         "userid": user.Id,
+                         "maxid": list_last_infoid,
+                         "toptypes": user.O_vipjy.TopTypes,
+                         "createtime": time.Now().Unix(),
+                     },
                })
            }
-             if len(saveBatch) == BulkSize {
-                 mongodb.SaveBulk(Pushspace_project, saveBatch...)
-                 saveBatch = []map[string]interface{}{}
+             if len(setBatch) == BulkSize {
+                 mongodb.NewUpdateBulk(Pushspace_project, queryBatch, setBatch)
+                 setBatch = []map[string]interface{}{}
+                 queryBatch = []map[string]interface{}{}
            }
        }(k, v)
        index++
@@ -237,9 +269,10 @@ func (m *MatchJob) ToRelationProject(projectUser *sync.Map) {
        return true
    })
    m.saveWaitGroup.Wait()
-     if len(saveBatch) > 0 {
-         mongodb.SaveBulk(Pushspace_project, saveBatch...)
-         saveBatch = []map[string]interface{}{}
+     if len(setBatch) > 0 {
+         mongodb.NewUpdateBulk(Pushspace_project, queryBatch, setBatch)
+         setBatch = []map[string]interface{}{}
+         queryBatch = []map[string]interface{}{}
    }
    logger.Info("关联项目结束。。。", index)
}
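
// ToRelationProject now builds two parallel batches, a selector per relation
// (projectid + userid) in queryBatch and a matching "$set" document in
// setBatch, and hands both to mongodb.NewUpdateBulk, presumably so that
// re-running the job updates an existing relation rather than inserting a
// duplicate the way SaveBulk did. The batcher below sketches that pairing and
// flush-at-BulkSize pattern; updateBatcher and its out callback are
// illustrative stand-ins, not the project's mongodb helper.
package main

import "fmt"

type doc = map[string]interface{}

// updateBatcher collects (selector, update) pairs and flushes them together
// once the batch reaches its size limit, mirroring the queryBatch/setBatch
// handling above.
type updateBatcher struct {
    size    int
    queries []doc
    updates []doc
    out     func(queries, updates []doc) // e.g. a bulk-update call
}

// add buffers one relation as a selector plus its "$set" document and flushes
// automatically when the buffer reaches size.
func (b *updateBatcher) add(selector, set doc) {
    b.queries = append(b.queries, selector)
    b.updates = append(b.updates, doc{"$set": set})
    if len(b.queries) == b.size {
        b.flush()
    }
}

// flush writes out whatever is buffered; call it once more after the loop,
// just as the code above does after saveWaitGroup.Wait().
func (b *updateBatcher) flush() {
    if len(b.queries) == 0 {
        return
    }
    b.out(b.queries, b.updates)
    b.queries, b.updates = nil, nil
}

func main() {
    b := &updateBatcher{
        size: 2,
        out: func(queries, updates []doc) {
            fmt.Printf("bulk update of %d relations\n", len(queries))
        },
    }
    b.add(doc{"projectid": "p1", "userid": "u1"}, doc{"infoid": "i1"})
    b.add(doc{"projectid": "p1", "userid": "u2"}, doc{"infoid": "i1"})
    b.add(doc{"projectid": "p2", "userid": "u1"}, doc{"infoid": "i2"})
    b.flush()
}
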
@@ -264,7 +297,7 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[stri
        c_query["_id"] = idQuery
    }
    //c_query = map[string]interface{}{
-     //"_id": bson.ObjectIdHex("5da706f7a5cb26b9b778d08a"),
+     //"_id": bson.ObjectIdHex("5da71f96a5cb26b9b79b8e6c"),
    //}
    logger.Info("开始加载", Bidding, "数据", c_query)
    var res []map[string]interface{}
@@ -306,7 +339,7 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[stri
                }
                info[v] = temp[v]
            }
-             redis.Put("pushcache_1", "info_"+_id, info, 259200)
+             redis.Put(Pushcache_1, "info_"+_id, info, 259200)
        }(data)
        data = make(map[string]interface{})
        index++