wcj před 5 roky
rodič
revize
a718f9d7e7

+ 1 - 0
src/jfw/modules/pushsubscribe/src/match/job/job.go

@@ -25,5 +25,6 @@ var Jobs = &jobs{
 		savePool:             make(chan bool, Config.SavePoolSize),
 		savePool:             make(chan bool, Config.SavePoolSize),
 		saveWaitGroup:        &sync.WaitGroup{},
 		saveWaitGroup:        &sync.WaitGroup{},
 		userMapLock:          &sync.Mutex{},
 		userMapLock:          &sync.Mutex{},
+		allProject:           &sync.Map{},
 	},
 	},
 }
 }

+ 34 - 19
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -32,6 +32,11 @@ const (
 	ProjectQuery = `{"query":{"filtered":{"filter":{"term":{"list.infoid":"%s"}}}},"_source":["_id","list.infoid"],"sort":{"id":"desc"},"from":0,"size":1}`
 	ProjectQuery = `{"query":{"filtered":{"filter":{"term":{"list.infoid":"%s"}}}},"_source":["_id","list.infoid"],"sort":{"id":"desc"},"from":0,"size":1}`
 )
 )
 
 
+type Project struct {
+	Id               string
+	List_last_infoid string
+}
+
 type MatchJob struct {
 type MatchJob struct {
 	matchPool            chan bool
 	matchPool            chan bool
 	matchWaitGroup       *sync.WaitGroup
 	matchWaitGroup       *sync.WaitGroup
@@ -43,6 +48,7 @@ type MatchJob struct {
 	savePool             chan bool
 	savePool             chan bool
 	saveWaitGroup        *sync.WaitGroup
 	saveWaitGroup        *sync.WaitGroup
 	userMapLock          *sync.Mutex
 	userMapLock          *sync.Mutex
+	allProject           *sync.Map
 }
 }
 
 
 //定时任务,匹配数据,存库
 //定时任务,匹配数据,存库
@@ -90,6 +96,7 @@ func (m *MatchJob) Execute() {
 			break
 			break
 		}
 		}
 	}
 	}
+	m.allProject = &sync.Map{}
 	if endTime == nil {
 	if endTime == nil {
 		endTime = time.Now().Unix()
 		endTime = time.Now().Unix()
 	}
 	}
@@ -106,7 +113,6 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
 	logger.Info("第", batchIndex, "批开始保存到", Pushspace_temp, "表。。。")
 	logger.Info("第", batchIndex, "批开始保存到", Pushspace_temp, "表。。。")
 	index := 0
 	index := 0
 	var saveBatch []map[string]interface{}
 	var saveBatch []map[string]interface{}
-	user_infoId := map[string]map[string]bool{}
 	lock := &sync.Mutex{}
 	lock := &sync.Mutex{}
 	for u, i := range *userMap {
 	for u, i := range *userMap {
 		m.savePool <- true
 		m.savePool <- true
@@ -169,7 +175,6 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
 			}
 			}
 			lock.Lock()
 			lock.Lock()
 			defer lock.Unlock()
 			defer lock.Unlock()
-			user_infoId[user.Id] = infoIdMap
 			saveBatch = append(saveBatch, map[string]interface{}{
 			saveBatch = append(saveBatch, map[string]interface{}{
 				"s_m_openid":    user.S_m_openid,
 				"s_m_openid":    user.S_m_openid,
 				"a_m_openid":    user.A_m_openid,
 				"a_m_openid":    user.A_m_openid,
@@ -213,11 +218,11 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]
 		saveBatch = []map[string]interface{}{}
 		saveBatch = []map[string]interface{}{}
 	}
 	}
 	logger.Info("第", batchIndex, "批保存到", Pushspace_temp, "表结束。。", index)
 	logger.Info("第", batchIndex, "批保存到", Pushspace_temp, "表结束。。", index)
-	m.ToRelationProject(projectUserMap, user_infoId)
+	m.ToRelationProject(projectUserMap)
 }
 }
 
 
 //关联项目
 //关联项目
-func (m *MatchJob) ToRelationProject(projectUser *sync.Map, user_infoId map[string]map[string]bool) {
+func (m *MatchJob) ToRelationProject(projectUser *sync.Map) {
 	logger.Info("开始关联项目。。。")
 	logger.Info("开始关联项目。。。")
 	index := 0
 	index := 0
 	var updateproject [][]map[string]interface{}
 	var updateproject [][]map[string]interface{}
@@ -232,33 +237,43 @@ func (m *MatchJob) ToRelationProject(projectUser *sync.Map, user_infoId map[stri
 				<-m.savePool
 				<-m.savePool
 				m.saveWaitGroup.Done()
 				m.saveWaitGroup.Done()
 			}()
 			}()
-			projects := elastic.Get(Projectset, Projectset, fmt.Sprintf(ProjectQuery, _id))
-			if projects == nil || len(*projects) == 0 {
-				return
-			}
-			list := (*projects)[0]["list"].([]interface{})
-			if len(list) == 0 {
-				return
+			list_last_infoid := ""
+			projectId := ""
+			if value, ok := m.allProject.Load(_id); ok {
+				project, _ := value.(*Project)
+				projectId = project.Id
+				list_last_infoid = project.List_last_infoid
+			} else {
+				projects := elastic.Get(Projectset, Projectset, fmt.Sprintf(ProjectQuery, _id))
+				if projects == nil || len(*projects) == 0 {
+					return
+				}
+				list := (*projects)[0]["list"].([]interface{})
+				if len(list) == 0 {
+					return
+				}
+				list_last, _ := list[len(list)-1].(map[string]interface{})
+				list_last_infoid = util.ObjToString(list_last["infoid"])
+				projectId, _ := (*projects)[0]["_id"].(string)
+				m.allProject.Store(_id, &Project{
+					Id:               projectId,
+					List_last_infoid: list_last_infoid,
+				})
 			}
 			}
-			list_last, _ := list[len(list)-1].(map[string]interface{})
-			list_last_infoid := util.ObjToString(list_last["infoid"])
-			if list_last_infoid == "" {
+			if projectId == "" || list_last_infoid == "" {
 				return
 				return
 			}
 			}
 			lock.Lock()
 			lock.Lock()
 			defer lock.Unlock()
 			defer lock.Unlock()
 			for _, user := range *users {
 			for _, user := range *users {
-				if user_infoId[user.Id] == nil || !user_infoId[user.Id][_id] {
-					continue
-				}
 				updateproject = append(updateproject, []map[string]interface{}{
 				updateproject = append(updateproject, []map[string]interface{}{
 					map[string]interface{}{
 					map[string]interface{}{
-						"projectid": (*projects)[0]["_id"],
+						"projectid": projectId,
 						"userid":    user.Id,
 						"userid":    user.Id,
 					},
 					},
 					map[string]interface{}{
 					map[string]interface{}{
 						"$set": map[string]interface{}{
 						"$set": map[string]interface{}{
-							"projectid":  (*projects)[0]["_id"],
+							"projectid":  projectId,
 							"infoid":     _id,
 							"infoid":     _id,
 							"userid":     user.Id,
 							"userid":     user.Id,
 							"maxid":      list_last_infoid,
 							"maxid":      list_last_infoid,