123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package projecter
- import (
- "fmt"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- util "app.yhyue.com/moapp/jybase/common"
- elastic "app.yhyue.com/moapp/jybase/esv1"
- . "app.yhyue.com/moapp/jybase/mongodb"
- . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p"
- "app.yhyue.com/moapp/jybase/logger"
- )
- type ProjectMatcher interface {
- Unique(u *UserInfo) string
- UpSetAppend(u *UserInfo) map[string]interface{}
- }
// Project is the cached summary of one project, stored in Match.AllProject
// under the info id that led to it.
type Project struct {
	// Id is the "_id" of the project document returned by the search.
	Id string
	// List_last_publishtime is the "publishtime" of the last entry in the
	// project's "list".
	List_last_publishtime int64
}
- type Match struct {
- Pushspace_project string
- RelationProjectPoolSize int
- LoadProjectPoolSize int
- ProjectBatch int
- Mgo_Log *MongodbSim
- AllProject *sync.Map
- Unique string
- }
- //关联项目
- func (m *Match) Execute(projectMatcher ProjectMatcher, projectUser *map[*UserInfo]*[]string, myMatchId *map[string]map[string]bool) {
- logger.Info("开始关联项目。。。")
- index := 0
- var updateproject [][]map[string]interface{}
- //
- userPool := make(chan bool, m.RelationProjectPoolSize)
- searchPool := make(chan bool, m.LoadProjectPoolSize)
- userWaitGroup := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- for k, v := range *projectUser {
- userPool <- true
- userWaitGroup.Add(1)
- go func(u *UserInfo, _ids *[]string) {
- defer util.Catch()
- defer func() {
- <-userPool
- userWaitGroup.Done()
- }()
- newIds := []string{}
- unique := projectMatcher.Unique(u)
- needLength := int64(RelationProjectLimit - len((*myMatchId)[unique]))
- searchWaitGroup := &sync.WaitGroup{}
- for _, _id := range *_ids {
- if (*myMatchId)[unique] == nil || !(*myMatchId)[unique][_id] {
- if surplus := atomic.AddInt64(&needLength, -1); surplus <= 0 {
- continue
- }
- }
- if _, ok := m.AllProject.Load(_id); ok {
- continue
- }
- newIds = append(newIds, _id)
- if len(newIds) == m.ProjectBatch {
- searchPool <- true
- searchWaitGroup.Add(1)
- go func(searchIds []string) {
- defer util.Catch()
- defer func() {
- <-searchPool
- searchWaitGroup.Done()
- }()
- m.searchProjectBatch(searchIds)
- }(newIds)
- newIds = []string{}
- }
- }
- searchWaitGroup.Wait()
- if len(newIds) > 0 {
- m.searchProjectBatch(newIds)
- newIds = []string{}
- }
- for _, _id := range *_ids {
- var list_last_publishtime int64
- projectId := ""
- if value, ok := m.AllProject.Load(_id); ok {
- project, _ := value.(*Project)
- projectId = project.Id
- list_last_publishtime = project.List_last_publishtime
- } else {
- continue
- }
- if projectId == "" || list_last_publishtime <= 0 {
- continue
- }
- lock.Lock()
- upSet := map[string]interface{}{
- "projectid": projectId,
- "infoid": _id,
- "maxpublishtime": list_last_publishtime,
- "subtypes": u.SubSet.Subtypes,
- "createtime": time.Now().Unix(),
- }
- for uk, uv := range projectMatcher.UpSetAppend(u) {
- upSet[uk] = uv
- }
- updateproject = append(updateproject, []map[string]interface{}{
- map[string]interface{}{
- "projectid": projectId,
- m.Unique: unique,
- },
- map[string]interface{}{
- "$set": upSet,
- },
- })
- if len(updateproject) == Mgo_BulkSize {
- m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
- updateproject = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }
- }(k, v)
- userWaitGroup.Wait()
- index++
- if index%500 == 0 {
- logger.Info("关联项目:", index)
- }
- }
- if len(updateproject) > 0 {
- m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...)
- updateproject = [][]map[string]interface{}{}
- }
- logger.Info("关联项目结束。。。", index)
- }
- //
- func (m *Match) searchProjectBatch(_ids []string) {
- _idMap := map[string]bool{}
- for _, v := range _ids {
- _idMap[v] = true
- }
- projects := elastic.Get(Es_Projectset, Es_Projectset, fmt.Sprintf(Es_ProjectQuery, `"`+strings.Join(_ids, `","`)+`"`, len(_ids)))
- if projects == nil || len(*projects) == 0 {
- return
- }
- for _, v := range *projects {
- list, _ := v["list"].([]interface{})
- if len(list) == 0 {
- continue
- }
- list_last, _ := list[len(list)-1].(map[string]interface{})
- for _, vv := range list {
- vvMap, _ := vv.(map[string]interface{})
- if infoid := util.ObjToString(vvMap["infoid"]); _idMap[infoid] {
- m.AllProject.Store(infoid, &Project{
- Id: util.ObjToString(v["_id"]),
- List_last_publishtime: util.Int64All(list_last["publishtime"]),
- })
- break
- }
- }
- }
- }
|