match.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package projecter
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. util "app.yhyue.com/moapp/jybase/common"
  9. elastic "app.yhyue.com/moapp/jybase/esv1"
  10. . "app.yhyue.com/moapp/jybase/mongodb"
  11. . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
  12. "app.yhyue.com/moapp/jybase/logger"
  13. )
  14. type ProjectMatcher interface {
  15. Unique(u *UserInfo) string
  16. UpSetAppend(u *UserInfo) map[string]interface{}
  17. }
  18. type Project struct {
  19. Id string
  20. List_last_publishtime int64
  21. }
  22. type Match struct {
  23. Pushspace_project string
  24. RelationProjectPoolSize int
  25. LoadProjectPoolSize int
  26. ProjectBatch int
  27. Mgo_Log *MongodbSim
  28. AllProject *sync.Map
  29. Unique string
  30. }
  31. //关联项目
  32. func (m *Match) Execute(projectMatcher ProjectMatcher, projectUser *map[*UserInfo]*[]string, myMatchId *map[string]map[string]bool) {
  33. logger.Info("开始关联项目。。。")
  34. index := 0
  35. var updateproject [][]map[string]interface{}
  36. //
  37. userPool := make(chan bool, m.RelationProjectPoolSize)
  38. searchPool := make(chan bool, m.LoadProjectPoolSize)
  39. userWaitGroup := &sync.WaitGroup{}
  40. lock := &sync.Mutex{}
  41. for k, v := range *projectUser {
  42. userPool <- true
  43. userWaitGroup.Add(1)
  44. go func(u *UserInfo, _ids *[]string) {
  45. defer util.Catch()
  46. defer func() {
  47. <-userPool
  48. userWaitGroup.Done()
  49. }()
  50. newIds := []string{}
  51. unique := projectMatcher.Unique(u)
  52. needLength := int64(RelationProjectLimit - len((*myMatchId)[unique]))
  53. searchWaitGroup := &sync.WaitGroup{}
  54. for _, _id := range *_ids {
  55. if (*myMatchId)[unique] == nil || !(*myMatchId)[unique][_id] {
  56. if surplus := atomic.AddInt64(&needLength, -1); surplus <= 0 {
  57. continue
  58. }
  59. }
  60. if _, ok := m.AllProject.Load(_id); ok {
  61. continue
  62. }
  63. newIds = append(newIds, _id)
  64. if len(newIds) == m.ProjectBatch {
  65. searchPool <- true
  66. searchWaitGroup.Add(1)
  67. go func(searchIds []string) {
  68. defer util.Catch()
  69. defer func() {
  70. <-searchPool
  71. searchWaitGroup.Done()
  72. }()
  73. m.searchProjectBatch(searchIds)
  74. }(newIds)
  75. newIds = []string{}
  76. }
  77. }
  78. searchWaitGroup.Wait()
  79. if len(newIds) > 0 {
  80. m.searchProjectBatch(newIds)
  81. newIds = []string{}
  82. }
  83. for _, _id := range *_ids {
  84. var list_last_publishtime int64
  85. projectId := ""
  86. if value, ok := m.AllProject.Load(_id); ok {
  87. project, _ := value.(*Project)
  88. projectId = project.Id
  89. list_last_publishtime = project.List_last_publishtime
  90. } else {
  91. continue
  92. }
  93. if projectId == "" || list_last_publishtime <= 0 {
  94. continue
  95. }
  96. lock.Lock()
  97. upSet := map[string]interface{}{
  98. "projectid": projectId,
  99. "infoid": _id,
  100. "maxpublishtime": list_last_publishtime,
  101. "subtypes": u.SubSet.Subtypes,
  102. "createtime": time.Now().Unix(),
  103. }
  104. for uk, uv := range projectMatcher.UpSetAppend(u) {
  105. upSet[uk] = uv
  106. }
  107. updateproject = append(updateproject, []map[string]interface{}{
  108. map[string]interface{}{
  109. "projectid": projectId,
  110. m.Unique: unique,
  111. },
  112. map[string]interface{}{
  113. "$set": upSet,
  114. },
  115. })
  116. if len(updateproject) == Mgo_BulkSize {
  117. m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
  118. updateproject = [][]map[string]interface{}{}
  119. }
  120. lock.Unlock()
  121. }
  122. }(k, v)
  123. userWaitGroup.Wait()
  124. index++
  125. if index%500 == 0 {
  126. logger.Info("关联项目:", index)
  127. }
  128. }
  129. if len(updateproject) > 0 {
  130. m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
  131. updateproject = [][]map[string]interface{}{}
  132. }
  133. logger.Info("关联项目结束。。。", index)
  134. }
  135. //
  136. func (m *Match) searchProjectBatch(_ids []string) {
  137. _idMap := map[string]bool{}
  138. for _, v := range _ids {
  139. _idMap[v] = true
  140. }
  141. projects := elastic.Get(Es_Projectset, Es_Projectset, fmt.Sprintf(Es_ProjectQuery, `"`+strings.Join(_ids, `","`)+`"`, len(_ids)))
  142. if projects == nil || len(*projects) == 0 {
  143. return
  144. }
  145. for _, v := range *projects {
  146. list, _ := v["list"].([]interface{})
  147. if len(list) == 0 {
  148. continue
  149. }
  150. list_last, _ := list[len(list)-1].(map[string]interface{})
  151. for _, vv := range list {
  152. vvMap, _ := vv.(map[string]interface{})
  153. if infoid := util.ObjToString(vvMap["infoid"]); _idMap[infoid] {
  154. m.AllProject.Store(infoid, &Project{
  155. Id: util.ObjToString(v["_id"]),
  156. List_last_publishtime: util.Int64All(list_last["publishtime"]),
  157. })
  158. break
  159. }
  160. }
  161. }
  162. }