package projecter import ( "fmt" "strings" "sync" "sync/atomic" "time" util "app.yhyue.com/moapp/jybase/common" elastic "app.yhyue.com/moapp/jybase/es" . "app.yhyue.com/moapp/jybase/mongodb" . "bp.jydev.jianyu360.cn/BaseService/pushpkg/p" "app.yhyue.com/moapp/jybase/logger" ) type ProjectMatcher interface { Unique(u *UserInfo) string UpSetAppend(u *UserInfo) map[string]interface{} } type Project struct { Id string List_last_publishtime int64 } type Match struct { Pushspace_project string RelationProjectPoolSize int LoadProjectPoolSize int ProjectBatch int Mgo_Log *MongodbSim AllProject *sync.Map Unique string } //关联项目 func (m *Match) Execute(projectMatcher ProjectMatcher, projectUser *map[*UserInfo]*[]string, myMatchId *map[string]map[string]bool) { logger.Info("开始关联项目。。。") index := 0 var updateproject [][]map[string]interface{} // userPool := make(chan bool, m.RelationProjectPoolSize) searchPool := make(chan bool, m.LoadProjectPoolSize) userWaitGroup := &sync.WaitGroup{} lock := &sync.Mutex{} for k, v := range *projectUser { userPool <- true userWaitGroup.Add(1) go func(u *UserInfo, _ids *[]string) { defer util.Catch() defer func() { <-userPool userWaitGroup.Done() }() newIds := []string{} unique := projectMatcher.Unique(u) needLength := int64(RelationProjectLimit - len((*myMatchId)[unique])) searchWaitGroup := &sync.WaitGroup{} for _, _id := range *_ids { if (*myMatchId)[unique] == nil || !(*myMatchId)[unique][_id] { if surplus := atomic.AddInt64(&needLength, -1); surplus <= 0 { continue } } if _, ok := m.AllProject.Load(_id); ok { continue } newIds = append(newIds, _id) if len(newIds) == m.ProjectBatch { searchPool <- true searchWaitGroup.Add(1) go func(searchIds []string) { defer util.Catch() defer func() { <-searchPool searchWaitGroup.Done() }() m.searchProjectBatch(searchIds) }(newIds) newIds = []string{} } } searchWaitGroup.Wait() if len(newIds) > 0 { m.searchProjectBatch(newIds) newIds = []string{} } for _, _id := range *_ids { var list_last_publishtime int64 projectId := "" if value, ok := m.AllProject.Load(_id); ok { project, _ := value.(*Project) projectId = project.Id list_last_publishtime = project.List_last_publishtime } else { continue } if projectId == "" || list_last_publishtime <= 0 { continue } lock.Lock() upSet := map[string]interface{}{ "projectid": projectId, "infoid": _id, "maxpublishtime": list_last_publishtime, "subtypes": u.SubSet.Subtypes, "createtime": time.Now().Unix(), } for uk, uv := range projectMatcher.UpSetAppend(u) { upSet[uk] = uv } updateproject = append(updateproject, []map[string]interface{}{ map[string]interface{}{ "projectid": projectId, m.Unique: unique, }, map[string]interface{}{ "$set": upSet, }, }) if len(updateproject) == Mgo_BulkSize { m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...) updateproject = [][]map[string]interface{}{} } lock.Unlock() } }(k, v) userWaitGroup.Wait() index++ if index%500 == 0 { logger.Info("关联项目:", index) } } if len(updateproject) > 0 { m.Mgo_Log.UpSertMultiBulk(m.Pushspace_project, true, true, updateproject...) updateproject = [][]map[string]interface{}{} } logger.Info("关联项目结束。。。", index) } // func (m *Match) searchProjectBatch(_ids []string) { _idMap := map[string]bool{} for _, v := range _ids { _idMap[v] = true } projects := elastic.Get(Es_Projectset, Es_Projectset, fmt.Sprintf(Es_ProjectQuery, `"`+strings.Join(_ids, `","`)+`"`, len(_ids))) if projects == nil || len(*projects) == 0 { return } for _, v := range *projects { list, _ := v["list"].([]interface{}) if len(list) == 0 { continue } list_last, _ := list[len(list)-1].(map[string]interface{}) for _, vv := range list { vvMap, _ := vv.(map[string]interface{}) if infoid := util.ObjToString(vvMap["infoid"]); _idMap[infoid] { m.AllProject.Store(infoid, &Project{ Id: util.ObjToString(v["_id"]), List_last_publishtime: util.Int64All(list_last["publishtime"]), }) break } } } }