push.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. package projecter
  2. import (
  3. "fmt"
  4. "sort"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. util "app.yhyue.com/moapp/jybase/common"
  9. . "app.yhyue.com/moapp/jybase/date"
  10. elastic "app.yhyue.com/moapp/jybase/esv1"
  11. . "app.yhyue.com/moapp/jybase/mongodb"
  12. "app.yhyue.com/moapp/jybase/redis"
  13. . "bp.jydev.jianyu360.cn/pushpkg/p"
  14. "github.com/donnie4w/go-logger/logger"
  15. )
  16. type Pusher interface {
  17. Filter(unique string) (bool, []interface{}, *UserInfo)
  18. SaveToMysql(u *UserInfo, matchInfos *SortList, jobTime *time.Time) int64
  19. PushInfoKey(u *UserInfo, infoId string) string
  20. DayCountKey(u *UserInfo, ymd string) string
  21. ChildUserInfo(unique interface{}, parent *UserInfo) (*UserInfo, bool)
  22. }
  23. const (
  24. projectQuery = `{"query":{"filtered":{"filter":{"bool":{"must":[{"range":{"pici":{"gt":%d,"lte":%d}}}]}}}}%s}`
  25. projectPagingQuery = `,"_source":["_id","list"],"from":%d,"size":%d`
  26. )
  27. type Push struct {
  28. Mgo_Bidding *MongodbSim
  29. Mgo_Log *MongodbSim
  30. ProjectPushPoolSize int
  31. ProjectPublishTimeLimit int64
  32. Mgo_Log_DbName string
  33. Mgo_Bidding_DbName string
  34. Mgo_Bidding_Collection string
  35. MaxPushSize int
  36. MaxLoadProjectCount int
  37. ProjectPagingSize int
  38. Pici int64
  39. LoadProjectPoolSize int
  40. ProjectRetainDay int
  41. ProjectBatch int
  42. TestQuery map[string]interface{}
  43. MaxRelationProject int
  44. Pushspace_project string
  45. Unique string
  46. SelectField map[string]interface{}
  47. ProjectCount map[string]int
  48. }
// Execute runs one related-project push job.
//
// It loads the projects via loadProject using the current Unix time as upper
// bound and, if any were loaded, reads the Pushspace_project collection batch
// by batch. For every owner key in a batch a goroutine (bounded by
// ProjectPushPoolSize) asks pusher which users to serve, collects the project
// infos that qualify, saves them through pusher.SaveToMysql and records the
// result in redis. Afterwards Clear is called.
//
// The returned value is the Unix timestamp taken at the start of the run.
// NOTE(review): util.Catch presumably recovers panics; in that case the
// function would return 0 and skip Clear — confirm against util.Catch.
func (p *Push) Execute(pusher Pusher) int64 {
	defer util.Catch()
	now := time.Now()
	// Last second of the current local day; used below to cap the job time.
	todayEnd := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local)
	unix := now.Unix()
	projectLength, projects := p.loadProject(unix)
	if projectLength > 0 {
		startId := ""
		batchIndex := 0
		// pushPool limits the number of per-owner goroutines running at once.
		pushPool := make(chan bool, p.ProjectPushPoolSize)
		pushWait := &sync.WaitGroup{}
		ymd := NowFormat(Date_yyyyMMdd)
		for {
			// The per-owner counters are restarted for every batch.
			p.ProjectCount = map[string]int{}
			batchIndex++
			batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
			logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
			index := 0
			for k, v := range *datas {
				// Blocks while the pool is full.
				pushPool <- true
				pushWait.Add(1)
				go func(ue string, list []map[string]interface{}) {
					defer util.Catch()
					defer func() {
						<-pushPool
						pushWait.Done()
					}()
					isFilter, uniques, parent := pusher.Filter(ue)
					if isFilter {
						// The owner is filtered out: drop all of its records.
						p.Delete(ue)
						return
					}
					// hasUpdate is shared by all users of this owner: once one
					// main user has written maxpublishtime, later users skip it.
					hasUpdate := false
					for _, unique := range uniques {
						sortList := &SortList{}
						length := 0
						// list is walked from its last element to its first.
						i := len(list)
						oneHasUpdate := false
						userInfo, isMain := pusher.ChildUserInfo(unique, parent)
						// Each pass of this loop handles at most Mgo_BulkSize
						// maxpublishtime updates and continues where the
						// previous pass stopped (i is shared).
						for {
							var updateProject [][]map[string]interface{}
							for i > 0 {
								i--
								myInfoId := util.ObjToString(list[i]["infoid"])
								pi, ok := projects.Load(myInfoId)
								if !ok {
									// No loaded project contains this info.
									continue
								}
								subtypes, _ := list[i]["subtypes"].([]interface{})
								maxPublishTime := util.Int64All(list[i]["maxpublishtime"])
								// Set of subtypes stored on this record; empty means no restriction.
								userSubtype := map[string]bool{}
								for _, t := range subtypes {
									userSubtype[util.ObjToString(t)] = true
								}
								// Inspect every entry of the project's "list".
								project, _ := pi.(map[string]interface{})
								project_list, _ := project["list"].([]interface{})
								var publishTime int64
								for _, pl := range project_list {
									plm, _ := pl.(map[string]interface{})
									publishTime = util.Int64All(plm["publishtime"])
									infoId, _ := plm["infoid"].(string)
									if publishTime <= maxPublishTime {
										// Not newer than what this record has already seen.
										continue
									} else if unix-publishTime > p.ProjectPublishTimeLimit { // older than the allowed publish window (original note: within 7 days)
										continue
									} else if subtype, _ := plm["subtype"].(string); len(userSubtype) > 0 && !userSubtype[subtype] {
										// Subtype is not among the record's subtypes.
										continue
									} else if infoId == myInfoId {
										// The info the record itself refers to is never pushed.
										continue
									}
									isExists, err := redis.Exists(Pushcache_2_a, pusher.PushInfoKey(userInfo, infoId))
									if err != nil {
										logger.Error("推送信息判重出错", err)
										continue
									} else if isExists {
										// Already pushed to this user.
										continue
									}
									length++
									// All conditions are met; this info can be pushed.
									info := map[string]interface{}{}
									for _, field := range SaveBiddingField {
										if field == "_id" || plm[field] == nil {
											continue
										}
										info[field] = plm[field]
									}
									info["_id"] = infoId
									info["attachment_count"] = GetAttachmentCountById(p.Mgo_Bidding, p.Mgo_Bidding_DbName, p.Mgo_Bidding_Collection, infoId)
									*sortList = append(*sortList, &MatchInfo{Info: &info})
								}
								// publishTime is the value of the last entry of project_list here.
								// NOTE(review): presumably the list is ordered by publish time so
								// this is the newest value — confirm.
								if !hasUpdate && isMain && publishTime > maxPublishTime {
									updateProject = append(updateProject, []map[string]interface{}{
										map[string]interface{}{
											"_id": list[i]["_id"],
										},
										map[string]interface{}{
											"$set": map[string]interface{}{
												"maxpublishtime": publishTime,
											},
										},
									})
									if len(updateProject) == Mgo_BulkSize {
										// Bulk is full: flush it before scanning further records.
										break
									}
								}
							}
							if len(updateProject) > 0 {
								if p.Mgo_Log.UpSertMultiBulk(p.Pushspace_project, true, true, updateProject...) {
									oneHasUpdate = true
								}
							}
							if len(updateProject) < Mgo_BulkSize {
								// A partial bulk means the whole list has been scanned.
								break
							}
						}
						if oneHasUpdate {
							hasUpdate = true
						}
						if length == 0 {
							continue
						}
						sort.Sort(*sortList)
						dayCountKey := pusher.DayCountKey(userInfo, ymd)
						dayCount, err := redis.GetNewInt(Pushcache_2_a, dayCountKey)
						// The user's own limit wins; fall back to the job-wide limit.
						maxPushSize := userInfo.SubSet.MaxPushSize
						if maxPushSize <= 0 {
							maxPushSize = p.MaxPushSize
						}
						if err != nil {
							logger.Error(unique, "redis中获取当天推送总数出错", err)
							continue
						} else if dayCount >= maxPushSize {
							// Daily limit already reached.
							continue
						}
						// Never push more than what is left of today's limit.
						needCount := maxPushSize - dayCount
						if length > needCount {
							length = needCount
						}
						// Copy the first length sorted entries in reverse order.
						saveArray := make(SortList, length)
						for sk, sl := range *sortList {
							saveArray[length-1-sk] = sl
							if sk == length-1 {
								break
							}
						}
						// The job time never exceeds the end of the day the run started on.
						jobTime := time.Now()
						if jobTime.After(todayEnd) {
							jobTime = todayEnd
						}
						if pushDate := pusher.SaveToMysql(userInfo, &saveArray, &jobTime); pushDate > 0 {
							// Saved: bump the daily counter and mark every info as pushed.
							redis.Put(Pushcache_2_a, dayCountKey, dayCount+length, OneDaySecond)
							for _, v := range saveArray {
								redis.Put(Pushcache_2_a, pusher.PushInfoKey(userInfo, util.ObjToString((*v.Info)["_id"])), 1, OneDaySecond)
							}
						}
					}
				}(k, v)
				index++
				if index%500 == 0 {
					logger.Info("第", batchIndex, "次关联项目匹配:", index)
				}
			}
			// Wait for every goroutine of this batch before loading the next one.
			pushWait.Wait()
			logger.Info("第", batchIndex, "次关联项目匹配结束", index)
			if batchCount < p.ProjectBatch {
				// A short batch is the last one.
				p.ProjectCount = map[string]int{}
				break
			}
		}
	}
	p.Clear()
	return unix
}
  224. //分批次加载pushspace_project表数据
  225. func (p *Push) loadPushspace_project(batchIndex int, startId *string) (int, *map[string][]map[string]interface{}) {
  226. query := map[string]interface{}{}
  227. if *startId != "" {
  228. query["_id"] = map[string]interface{}{
  229. "$lt": StringTOBsonId(*startId),
  230. }
  231. }
  232. if p.TestQuery != nil {
  233. for k, v := range p.TestQuery {
  234. query[k] = v
  235. }
  236. }
  237. // query = map[string]interface{}{
  238. // "_id": StringTOBsonId("628602aff1583c11b8cab58b"),
  239. // }
  240. logger.Info("开始加载第", batchIndex, "批关联项目", query)
  241. index := 0
  242. //一个用户最多加载最新的一万个关联项目
  243. datas := map[string][]map[string]interface{}{}
  244. sess := p.Mgo_Log.GetMgoConn()
  245. defer p.Mgo_Log.DestoryMongoConn(sess)
  246. it := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).Find(query).Select(p.SelectField).Sort("-_id").Iter()
  247. for temp := make(map[string]interface{}); it.Next(&temp); {
  248. unique, _ := temp[p.Unique].(string)
  249. *startId = BsonIdToSId(temp["_id"])
  250. if p.ProjectCount[unique] < p.MaxRelationProject {
  251. datas[unique] = append(datas[unique], temp)
  252. }
  253. p.ProjectCount[unique] = p.ProjectCount[unique] + 1
  254. index++
  255. if index == p.ProjectBatch {
  256. break
  257. }
  258. temp = make(map[string]interface{})
  259. }
  260. logger.Info("第", batchIndex, "批关联项目加载结束", index, *startId)
  261. return index, &datas
  262. }
  263. //加载项目数据
  264. func (p *Push) loadProject(unix int64) (int64, *sync.Map) {
  265. defer util.Catch()
  266. projectMap := &sync.Map{}
  267. query := fmt.Sprintf(projectQuery, p.Pici, unix, "")
  268. //query = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}}`
  269. logger.Info("开始加载项目。。。", query)
  270. count := int(elastic.Count(Es_Projectset, Es_Projectset, query))
  271. logger.Info("需要加载项目", count, "条")
  272. if count == 0 {
  273. return 0, projectMap
  274. } else if count > p.MaxLoadProjectCount {
  275. count = p.MaxLoadProjectCount
  276. logger.Info("需要加载的项目数超过限制,只加载", p.MaxLoadProjectCount, "条")
  277. }
  278. onceSize := p.ProjectPagingSize //ES一次查询这么多条
  279. if onceSize > count {
  280. onceSize = count
  281. }
  282. totalPage := int((count + onceSize - 1) / onceSize)
  283. logger.Info("加载项目一共", totalPage, "页")
  284. loadPool := make(chan bool, p.LoadProjectPoolSize)
  285. loadWait := &sync.WaitGroup{}
  286. var length int64
  287. for t := 0; t < 3; t++ {
  288. projectMap = &sync.Map{}
  289. for i := 0; i < totalPage; i++ {
  290. loadPool <- true
  291. loadWait.Add(1)
  292. go func(start int) {
  293. defer util.Catch()
  294. defer func() {
  295. <-loadPool
  296. loadWait.Done()
  297. }()
  298. size := onceSize
  299. if start == totalPage-1 && count%onceSize != 0 {
  300. size = count % onceSize
  301. }
  302. pagingQuery := fmt.Sprintf(projectQuery, p.Pici, unix, fmt.Sprintf(projectPagingQuery, start*onceSize, size))
  303. //pagingQuery = `{"query":{"filtered":{"filter":{"bool":{"must":[{"term": {"id": "5cef4c36a5cb26b9b7498a4c"}}]}}}}` + fmt.Sprintf(projectPagingQuery, start*onceSize, size) + `}`
  304. logger.Info("加载项目", "开始加载第", start+1, "页数据", pagingQuery)
  305. r := elastic.Get(Es_Projectset, Es_Projectset, pagingQuery)
  306. if r == nil {
  307. return
  308. }
  309. for _, v := range *r {
  310. length = atomic.AddInt64(&length, 1)
  311. list, _ := v["list"].([]interface{})
  312. for _, vv := range list {
  313. vv_map, _ := vv.(map[string]interface{})
  314. infoid, _ := vv_map["infoid"].(string)
  315. projectMap.Store(infoid, v)
  316. }
  317. }
  318. logger.Info("加载项目", "第", start+1, "页数据加载完成")
  319. }(i)
  320. }
  321. loadWait.Wait()
  322. if int(length) >= count-5 {
  323. break
  324. }
  325. logger.Info("加载项目", "第", t, "次加载数据完成,数据总数", length, ",由于数据量不够,重新加载")
  326. }
  327. logger.Info("加载项目结束。。。", length)
  328. return length, projectMap
  329. }
  330. //清理三个月前的数据
  331. func (p *Push) Clear() {
  332. logger.Info("开始清理过期的关联项目数据。。。")
  333. sess := p.Mgo_Log.GetMgoConn()
  334. defer p.Mgo_Log.DestoryMongoConn(sess)
  335. _, err := sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{
  336. "createtime": map[string]interface{}{
  337. "$lt": time.Now().AddDate(0, 0, -p.ProjectRetainDay).Unix(),
  338. },
  339. })
  340. if err != nil {
  341. logger.Error("清理过期的关联项目数据错误", err)
  342. } else {
  343. logger.Info("清理过期的关联项目数据结束。。。")
  344. }
  345. }
  346. //删除
  347. func (p *Push) Delete(unique string) {
  348. sess := p.Mgo_Log.GetMgoConn()
  349. defer p.Mgo_Log.DestoryMongoConn(sess)
  350. sess.DB(p.Mgo_Log_DbName).C(p.Pushspace_project).RemoveAll(map[string]interface{}{p.Unique: unique})
  351. }