match.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package projecter
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. util "app.yhyue.com/moapp/jybase/common"
  9. elastic "app.yhyue.com/moapp/jybase/es"
  10. . "app.yhyue.com/moapp/jybase/mongodb"
  11. . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
  12. "app.yhyue.com/moapp/jybase/logger"
  13. )
  14. type ProjectMatcher interface {
  15. Unique(u *UserInfo) string
  16. UpSetAppend(u *UserInfo) map[string]interface{}
  17. }
  18. type Project struct {
  19. Id string
  20. List_last_publishtime int64
  21. }
  22. type Match struct {
  23. Pushspace_project string
  24. RelationProjectPoolSize int
  25. LoadProjectPoolSize int
  26. RelationProjectBatch int
  27. RelationProjectLimit int
  28. Mgo_Log *MongodbSim
  29. AllProject *sync.Map
  30. Unique string
  31. }
  32. //关联项目
  33. func (m *Match) Execute(projectMatcher ProjectMatcher, projectUser *map[*UserInfo]*[]string, myMatchId *map[string]map[string]bool) {
  34. logger.Info("开始关联项目。。。")
  35. index := 0
  36. var updateproject [][]map[string]interface{}
  37. //
  38. userPool := make(chan bool, m.RelationProjectPoolSize)
  39. searchPool := make(chan bool, m.LoadProjectPoolSize)
  40. userWaitGroup := &sync.WaitGroup{}
  41. lock := &sync.Mutex{}
  42. for k, v := range *projectUser {
  43. userPool <- true
  44. userWaitGroup.Add(1)
  45. go func(u *UserInfo, _ids *[]string) {
  46. defer util.Catch()
  47. defer func() {
  48. <-userPool
  49. userWaitGroup.Done()
  50. }()
  51. unique := projectMatcher.Unique(u)
  52. needLength := int64(m.RelationProjectLimit - len((*myMatchId)[unique]))
  53. if needLength <= 0 {
  54. return
  55. }
  56. newIds := []string{}
  57. searchWaitGroup := &sync.WaitGroup{}
  58. for _, _id := range *_ids {
  59. if surplus := atomic.AddInt64(&needLength, -1); surplus < 0 {
  60. continue
  61. }
  62. if _, ok := m.AllProject.Load(_id); ok {
  63. continue
  64. }
  65. newIds = append(newIds, _id)
  66. if len(newIds) == m.RelationProjectBatch {
  67. searchPool <- true
  68. searchWaitGroup.Add(1)
  69. go func(searchIds []string) {
  70. defer util.Catch()
  71. defer func() {
  72. <-searchPool
  73. searchWaitGroup.Done()
  74. }()
  75. m.searchProjectBatch(searchIds)
  76. }(newIds)
  77. newIds = []string{}
  78. }
  79. }
  80. searchWaitGroup.Wait()
  81. if len(newIds) > 0 {
  82. m.searchProjectBatch(newIds)
  83. newIds = []string{}
  84. }
  85. for _, _id := range *_ids {
  86. var list_last_publishtime int64
  87. projectId := ""
  88. if value, ok := m.AllProject.Load(_id); ok {
  89. project, _ := value.(*Project)
  90. projectId = project.Id
  91. list_last_publishtime = project.List_last_publishtime
  92. } else {
  93. continue
  94. }
  95. if projectId == "" || list_last_publishtime <= 0 {
  96. continue
  97. }
  98. lock.Lock()
  99. upSet := map[string]interface{}{
  100. "projectid": projectId,
  101. "infoid": _id,
  102. "maxpublishtime": list_last_publishtime,
  103. "subtypes": u.SubSet.Subtypes,
  104. "createtime": time.Now().Unix(),
  105. }
  106. for uk, uv := range projectMatcher.UpSetAppend(u) {
  107. upSet[uk] = uv
  108. }
  109. updateproject = append(updateproject, []map[string]interface{}{
  110. map[string]interface{}{
  111. "projectid": projectId,
  112. m.Unique: unique,
  113. },
  114. map[string]interface{}{
  115. "$set": upSet,
  116. },
  117. })
  118. if len(updateproject) == Mgo_BulkSize {
  119. m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
  120. updateproject = [][]map[string]interface{}{}
  121. }
  122. lock.Unlock()
  123. }
  124. }(k, v)
  125. userWaitGroup.Wait()
  126. index++
  127. if index%500 == 0 {
  128. logger.Info("关联项目:", index)
  129. }
  130. }
  131. if len(updateproject) > 0 {
  132. m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
  133. updateproject = [][]map[string]interface{}{}
  134. }
  135. logger.Info("关联项目结束。。。", index)
  136. }
  137. //
  138. func (m *Match) searchProjectBatch(_ids []string) {
  139. _idMap := map[string]bool{}
  140. for _, v := range _ids {
  141. _idMap[v] = true
  142. }
  143. projects := elastic.Get(Es_Projectset, Es_Projectset, fmt.Sprintf(Es_ProjectQuery, `"`+strings.Join(_ids, `","`)+`"`, len(_ids)))
  144. if projects == nil || len(*projects) == 0 {
  145. return
  146. }
  147. for _, v := range *projects {
  148. list, _ := v["list"].([]interface{})
  149. if len(list) == 0 {
  150. continue
  151. }
  152. list_last, _ := list[len(list)-1].(map[string]interface{})
  153. for _, vv := range list {
  154. vvMap, _ := vv.(map[string]interface{})
  155. if infoid := util.ObjToString(vvMap["infoid"]); _idMap[infoid] {
  156. m.AllProject.Store(infoid, &Project{
  157. Id: util.ObjToString(v["_id"]),
  158. List_last_publishtime: util.Int64All(list_last["publishtime"]),
  159. })
  160. break
  161. }
  162. }
  163. }
  164. }