push.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package projecter
  2. import (
  3. "fmt"
  4. "sort"
  5. "sync"
  6. "time"
  7. util "app.yhyue.com/moapp/jybase/common"
  8. . "app.yhyue.com/moapp/jybase/date"
  9. elastic "app.yhyue.com/moapp/jybase/es"
  10. "app.yhyue.com/moapp/jybase/logger"
  11. . "app.yhyue.com/moapp/jybase/mongodb"
  12. "app.yhyue.com/moapp/jybase/redis"
  13. . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
  14. )
  15. type Pusher interface {
  16. Filter(unique string) (bool, []interface{}, *UserInfo)
  17. SaveToMysql(u *UserInfo, matchInfos *SortList, jobTime *time.Time) int64
  18. PushInfoKey(u *UserInfo, infoId string) string
  19. DayCountKey(u *UserInfo, ymd string) string
  20. ChildUserInfo(unique interface{}, parent *UserInfo) (*UserInfo, bool)
  21. }
  22. const (
  23. projectQuery = `{"query":{"bool":{"must":{"range":{"pici":{"gte":%d,"lt":%d}}}}}%s}`
  24. projectPagingQuery = `,"_source":["_id","list"]`
  25. )
  26. type Push struct {
  27. Mgo_Bidding *MongodbSim
  28. Mgo_Log *MongodbSim
  29. ProjectPushPoolSize int
  30. ProjectPublishTimeLimit int64
  31. Mgo_Log_DbName string
  32. Mgo_Bidding_DbName string
  33. Mgo_Bidding_Collection string
  34. MaxPushSize int
  35. MaxLoadProjectCount int
  36. Pici int64
  37. ProjectRetainDay int
  38. ProjectBatch int
  39. TestQuery map[string]interface{}
  40. MaxRelationProject int
  41. Pushspace_project string
  42. Unique string
  43. SelectField map[string]interface{}
  44. ProjectCount map[string]int
  45. }
  46. func (p *Push) Execute(pusher Pusher) int64 {
  47. defer util.Catch()
  48. now := time.Now()
  49. todayEnd := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local)
  50. unix := now.Unix()
  51. projectLength, projects := p.loadProject(unix)
  52. if projectLength > 0 {
  53. startId := ""
  54. batchIndex := 0
  55. //
  56. pushPool := make(chan bool, p.ProjectPushPoolSize)
  57. pushWait := &sync.WaitGroup{}
  58. ymd := NowFormat(Date_yyyyMMdd)
  59. for {
  60. p.ProjectCount = map[string]int{}
  61. batchIndex++
  62. batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
  63. logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
  64. index := 0
  65. for k, v := range *datas {
  66. pushPool <- true
  67. pushWait.Add(1)
  68. go func(ue string, list []map[string]interface{}) {
  69. defer util.Catch()
  70. defer func() {
  71. <-pushPool
  72. pushWait.Done()
  73. }()
  74. isFilter, uniques, parent := pusher.Filter(ue)
  75. if isFilter {
  76. p.Delete(ue)
  77. return
  78. }
  79. hasUpdate := false
  80. for _, unique := range uniques {
  81. sortList := &SortList{}
  82. length := 0
  83. i := len(list)
  84. oneHasUpdate := false
  85. userInfo, isMain := pusher.ChildUserInfo(unique, parent)
  86. for {
  87. var updateProject [][]map[string]interface{}
  88. for i > 0 {
  89. i--
  90. myInfoId := util.ObjToString(list[i]["infoid"])
  91. pi, ok := projects.Load(myInfoId)
  92. if !ok {
  93. continue
  94. }
  95. subtypes, _ := list[i]["subtypes"].([]interface{})
  96. maxPublishTime := util.Int64All(list[i]["maxpublishtime"])
  97. userSubtype := map[string]bool{}
  98. for _, t := range subtypes {
  99. userSubtype[util.ObjToString(t)] = true
  100. }
  101. //
  102. project, _ := pi.(map[string]interface{})
  103. project_list, _ := project["list"].([]interface{})
  104. var publishTime int64
  105. for _, pl := range project_list {
  106. plm, _ := pl.(map[string]interface{})
  107. publishTime = util.Int64All(plm["publishtime"])
  108. infoId, _ := plm["infoid"].(string)
  109. if publishTime <= maxPublishTime {
  110. continue
  111. } else if unix-publishTime > p.ProjectPublishTimeLimit { //发布时间7天之内
  112. continue
  113. } else if subtype, _ := plm["subtype"].(string); len(userSubtype) > 0 && !userSubtype[subtype] {
  114. continue
  115. } else if infoId == myInfoId {
  116. continue
  117. }
  118. isExists, err := redis.Exists(Pushcache_2_a, pusher.PushInfoKey(userInfo, infoId))
  119. if err != nil {
  120. logger.Error("推送信息判重出错", err)
  121. continue
  122. } else if isExists {
  123. continue
  124. }
  125. length++
  126. //满足条件,可以推送
  127. info := map[string]interface{}{}
  128. for _, field := range SaveBiddingField {
  129. if field == "_id" || plm[field] == nil {
  130. continue
  131. }
  132. info[field] = plm[field]
  133. }
  134. info["_id"] = infoId
  135. info["attachment_count"] = GetAttachmentCountById(p.Mgo_Bidding, p.Mgo_Bidding_DbName, p.Mgo_Bidding_Collection, infoId)
  136. *sortList = append(*sortList, &MatchInfo{Info: info})
  137. }
  138. if !hasUpdate && isMain && publishTime > maxPublishTime {
  139. updateProject = append(updateProject, []map[string]interface{}{
  140. map[string]interface{}{
  141. "_id": list[i]["_id"],
  142. },
  143. map[string]interface{}{
  144. "$set": map[string]interface{}{
  145. "maxpublishtime": publishTime,
  146. },
  147. },
  148. })
  149. if len(updateProject) == Mgo_BulkSize {
  150. break
  151. }
  152. }
  153. }
  154. if len(updateProject) > 0 {
  155. if p.Mgo_Log.UpSertMultiBulk(p.Pushspace_project, true, true, updateProject...) {
  156. oneHasUpdate = true
  157. }
  158. }
  159. if len(updateProject) < Mgo_BulkSize {
  160. break
  161. }
  162. }
  163. if oneHasUpdate {
  164. hasUpdate = true
  165. }
  166. if length == 0 {
  167. continue
  168. }
  169. sort.Sort(*sortList)
  170. dayCountKey := pusher.DayCountKey(userInfo, ymd)
  171. dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
  172. maxPushSize := userInfo.SubSet.MaxPushSize
  173. if maxPushSize <= 0 {
  174. maxPushSize = p.MaxPushSize
  175. }
  176. if err != nil {
  177. logger.Error(unique, "redis中获取当天推送总数出错", err)
  178. continue
  179. } else if dayCount >= maxPushSize {
  180. continue
  181. }
  182. needCount := maxPushSize - dayCount
  183. if length > needCount {
  184. length = needCount
  185. }
  186. saveArray := make(SortList, length)
  187. for sk, sl := range *sortList {
  188. saveArray[length-1-sk] = sl
  189. if sk == length-1 {
  190. break
  191. }
  192. }
  193. jobTime := time.Now()
  194. if jobTime.After(todayEnd) {
  195. jobTime = todayEnd
  196. }
  197. if pushDate := pusher.SaveToMysql(userInfo, &saveArray, &jobTime); pushDate > 0 {
  198. redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
  199. for _, v := range saveArray {
  200. redis.Put(Pushcache_2_a, pusher.PushInfoKey(userInfo, util.ObjToString(v.Info["_id"])), 1, OneDaySecond)
  201. }
  202. }
  203. }
  204. }(k, v)
  205. index++
  206. if index%500 == 0 {
  207. logger.Info("第", batchIndex, "次关联项目匹配:", index)
  208. }
  209. }
  210. pushWait.Wait()
  211. logger.Info("第", batchIndex, "次关联项目匹配结束", index)
  212. if batchCount < p.ProjectBatch {
  213. p.ProjectCount = map[string]int{}
  214. break
  215. }
  216. }
  217. }
  218. p.Clear()
  219. return unix
  220. }
  221. //分批次加载pushspace_project表数据
  222. func (p *Push) loadPushspace_project(batchIndex int, startId *string) (int, *map[string][]map[string]interface{}) {
  223. query := map[string]interface{}{}
  224. if *startId != "" {
  225. query["_id"] = map[string]interface{}{
  226. "$lt": StringTOBsonId(*startId),
  227. }
  228. }
  229. if p.TestQuery != nil {
  230. for k, v := range p.TestQuery {
  231. query[k] = v
  232. }
  233. }
  234. // query = map[string]interface{}{
  235. // "_id": StringTOBsonId("628602aff1583c11b8cab58b"),
  236. // }
  237. logger.Info("开始加载第", batchIndex, "批关联项目", query)
  238. index := 0
  239. //一个用户最多加载最新的一万个关联项目
  240. datas := map[string][]map[string]interface{}{}
  241. sess := p.Mgo_Log.GetMgoConn()
  242. defer p.Mgo_Log.DestoryMongoConn(sess)
  243. it := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).Find(query).Select(p.SelectField).Sort("-_id").Iter()
  244. for temp := make(map[string]interface{}); it.Next(&temp); {
  245. unique, _ := temp[p.Unique].(string)
  246. *startId = BsonIdToSId(temp["_id"])
  247. if p.ProjectCount[unique] < p.MaxRelationProject {
  248. datas[unique] = append(datas[unique], temp)
  249. }
  250. p.ProjectCount[unique] = p.ProjectCount[unique] + 1
  251. index++
  252. if index == p.ProjectBatch {
  253. break
  254. }
  255. temp = make(map[string]interface{})
  256. }
  257. logger.Info("第", batchIndex, "批关联项目加载结束", index, *startId)
  258. return index, &datas
  259. }
  260. //加载项目数据
  261. func (p *Push) loadProject(unix int64) (int64, *sync.Map) {
  262. defer util.Catch()
  263. projectMap := &sync.Map{}
  264. query := fmt.Sprintf(projectQuery, p.Pici, unix, projectPagingQuery)
  265. //query = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}}`
  266. logger.Info("开始加载项目。。。", query)
  267. count := int(elastic.Count(Es_Projectset, Es_Projectset, fmt.Sprintf(projectQuery, p.Pici, unix, "")))
  268. logger.Info("需要加载项目", count, "条")
  269. if count == 0 {
  270. return 0, projectMap
  271. } else if count > p.MaxLoadProjectCount {
  272. count = p.MaxLoadProjectCount
  273. logger.Info("需要加载的项目数超过限制,只加载", p.MaxLoadProjectCount, "条")
  274. }
  275. var length int64
  276. for t := 1; t <= 5; t++ {
  277. length = 0
  278. projectMap = &sync.Map{}
  279. elastic.VarEs.(*elastic.EsV7).Scroll(Es_Projectset, "5m", query, func(fv map[string]interface{}) bool {
  280. if int(length) > count {
  281. return false
  282. }
  283. list, _ := fv["list"].([]interface{})
  284. for _, vv := range list {
  285. vv_map, _ := vv.(map[string]interface{})
  286. infoid, _ := vv_map["infoid"].(string)
  287. projectMap.Store(infoid, fv)
  288. }
  289. length++
  290. return true
  291. })
  292. if int(length) >= count {
  293. break
  294. }
  295. logger.Info("加载项目", "第", t, "次加载数据完成,数据总数", length, count, ",由于数据量不够,重新加载")
  296. }
  297. logger.Info("加载项目结束。。。", length)
  298. return length, projectMap
  299. }
  300. //清理三个月前的数据
  301. func (p *Push) Clear() {
  302. logger.Info("开始清理过期的关联项目数据。。。")
  303. sess := p.Mgo_Log.GetMgoConn()
  304. defer p.Mgo_Log.DestoryMongoConn(sess)
  305. _, err := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{
  306. "createtime": map[string]interface{}{
  307. "$lt": time.Now().AddDate(0, 0, -p.ProjectRetainDay).Unix(),
  308. },
  309. })
  310. if err != nil {
  311. logger.Error("清理过期的关联项目数据错误", err)
  312. } else {
  313. logger.Info("清理过期的关联项目数据结束。。。")
  314. }
  315. }
  316. //删除
  317. func (p *Push) Delete(unique string) {
  318. sess := p.Mgo_Log.GetMgoConn()
  319. defer p.Mgo_Log.DestoryMongoConn(sess)
  320. sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{p.Unique: unique})
  321. }