浏览代码

Merge branch 'dev2.8.5' of http://192.168.3.207:10080/qmx/jy into dev2.8.5

xuzhiheng 5 年之前
父节点
当前提交
6edeee5f6f

+ 3 - 3
src/jfw/modules/pushsubscribe/src/match/config.json

@@ -6,13 +6,13 @@
 	"vipMaxPushSize":2000,
 	"mgoAddr":"192.168.3.128:27080",
 	"mgoSize":10,
-	"testids":["5dce0129e1382317285a570a"],
+	"testids":["5d81c5a525ef8723ac0036f9"],
 	"filterWords":["项目","中标","公告"],
 	"matchPoolSize":60,
 	"savePoolSize":5,
 	"loadBiddingPoolSize":60,
 	"loadUserPoolSize":60,
-	"matchDuration":1, 
-	"userBatch":2,
+	"matchDuration":60, 
+	"userBatch":10,
 	"pcHelper":"127.0.0.1:8082"
 }

+ 1 - 12
src/jfw/modules/pushsubscribe/src/match/job/job.go

@@ -1,7 +1,6 @@
 package job
 
 import (
-	. "match/config"
 	"sync"
 )
 
@@ -15,16 +14,6 @@ type jobs struct {
 
 var Jobs = &jobs{
 	Match: &MatchJob{
-		matchPool:            make(chan bool, Config.MatchPoolSize),
-		matchWaitGroup:       &sync.WaitGroup{},
-		loadBiddingPool:      make(chan bool, Config.LoadBiddingPoolSize),
-		loadBiddingWaitGroup: &sync.WaitGroup{},
-		loadUserPool:         make(chan bool, Config.LoadBiddingPoolSize),
-		loadUserWaitGroup:    &sync.WaitGroup{},
-		loadUserLock:         &sync.Mutex{},
-		savePool:             make(chan bool, Config.SavePoolSize),
-		saveWaitGroup:        &sync.WaitGroup{},
-		userMapLock:          &sync.Mutex{},
-		allProject:           &sync.Map{},
+		allProject: &sync.Map{},
 	},
 }

+ 267 - 245
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -1,7 +1,6 @@
 package job
 
 import (
-	"container/list"
 	"encoding/json"
 	"fmt"
 	. "match/config"
@@ -38,17 +37,7 @@ type Project struct {
 }
 
 type MatchJob struct {
-	matchPool            chan bool
-	matchWaitGroup       *sync.WaitGroup
-	loadBiddingPool      chan bool
-	loadBiddingWaitGroup *sync.WaitGroup
-	loadUserPool         chan bool
-	loadUserWaitGroup    *sync.WaitGroup
-	loadUserLock         *sync.Mutex
-	savePool             chan bool
-	saveWaitGroup        *sync.WaitGroup
-	userMapLock          *sync.Mutex
-	allProject           *sync.Map
+	allProject *sync.Map
 }
 
 //定时任务,匹配数据,存库
@@ -106,202 +95,6 @@ func (m *MatchJob) Execute() {
 	//
 }
 
-func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]interface{}) {
-	logger.Info("开始匹配第", batchIndex, "批用户。。。")
-	userMap, projectUserMap := m.EachAllBidInfo(matcher, datas)
-	logger.Info("第", batchIndex, "批用户匹配结束。。。")
-	logger.Info("第", batchIndex, "批开始保存到", Pushspace_temp, "表。。。")
-	index := 0
-	var saveBatch []map[string]interface{}
-	lock := &sync.Mutex{}
-	for u, i := range *userMap {
-		m.savePool <- true
-		m.saveWaitGroup.Add(1)
-		go func(user *UserInfo, infos *list.List) {
-			defer func() {
-				<-m.savePool
-				m.saveWaitGroup.Done()
-			}()
-			var pushArray = make(SortList, 0)
-			for e := infos.Front(); e != nil; e = e.Next() {
-				matchInfo := *(e.Value.(*MatchInfo))
-				pushArray = append(pushArray, &matchInfo)
-			}
-			//取最新50条
-			sort.Sort(pushArray)
-			var array []*MatchInfo
-			titleMap := map[string]bool{}
-			infoIdMap := map[string]bool{}
-			size := 0
-			for _, v2 := range pushArray {
-				title := util.ObjToString((*v2.Info)["title"])
-				_id := util.ObjToString((*v2.Info)["_id"])
-				pushInfoKey := PushInfoKey(user.Id, _id)
-				if titleMap[title] {
-					continue
-				}
-				titleMap[title] = true
-				isExists, err := redis.Exists(Pushcache_2_a, pushInfoKey)
-				if err != nil {
-					logger.Error(pushInfoKey, "推送信息redis判重出错", err)
-				}
-				if isExists {
-					continue
-				}
-				redis.Put(Pushcache_2_a, pushInfoKey, 1, OneDaySecond)
-				info := map[string]interface{}{}
-				for _, field := range SaveFields {
-					if (*v2.Info)[field] == nil {
-						continue
-					}
-					info[field] = (*v2.Info)[field]
-				}
-				infoIdMap[_id] = true
-				array = append(array, &MatchInfo{
-					Info: &info,
-					Keys: v2.Keys,
-				})
-				size++
-				maxPushSize := Config.MaxPushSize
-				if IsVipUser(user.VipStatus) {
-					maxPushSize = Config.VipMaxPushSize
-				}
-				if size == maxPushSize {
-					break
-				}
-			}
-			if size == 0 {
-				return
-			}
-			lock.Lock()
-			defer lock.Unlock()
-			saveBatch = append(saveBatch, map[string]interface{}{
-				"s_m_openid":    user.S_m_openid,
-				"a_m_openid":    user.A_m_openid,
-				"phone":         user.Phone,
-				"jpushid":       user.Jpushid,
-				"opushid":       user.Opushid,
-				"appphonetype":  user.AppPhoneType,
-				"userid":        user.Id,
-				"ratemode":      user.RateMode,
-				"wxpush":        user.WxPush,
-				"apppush":       user.AppPush,
-				"mailpush":      user.MailPush,
-				"pchelperpush":  user.PcHelperPush,
-				"usertype":      user.UserType,
-				"email":         user.Email,
-				"list":          array,
-				"size":          size,
-				"subscribe":     user.Subscribe,
-				"applystatus":   user.ApplyStatus,
-				"words":         user.Keys,
-				"modifydate":    user.ModifyDate,
-				"mergeorder":    user.MergeOrder,
-				"timestamp":     time.Now().Unix(),
-				"nickname":      user.NickName,
-				"firstpushtime": user.FirstPushTime,
-				"vipstatus":     user.VipStatus,
-			})
-			if len(saveBatch) == BulkSize {
-				mongodb.SaveBulk(Pushspace_temp, saveBatch...)
-				saveBatch = []map[string]interface{}{}
-			}
-		}(u, i)
-		index++
-		if index%500 == 0 {
-			logger.Info(Pushspace_temp, index)
-		}
-	}
-	m.saveWaitGroup.Wait()
-	if len(saveBatch) > 0 {
-		mongodb.SaveBulk(Pushspace_temp, saveBatch...)
-		saveBatch = []map[string]interface{}{}
-	}
-	logger.Info("第", batchIndex, "批保存到", Pushspace_temp, "表结束。。", index)
-	m.ToRelationProject(projectUserMap)
-}
-
-//关联项目
-func (m *MatchJob) ToRelationProject(projectUser *sync.Map) {
-	logger.Info("开始关联项目。。。")
-	index := 0
-	var updateproject [][]map[string]interface{}
-	lock := &sync.Mutex{}
-	projectUser.Range(func(key interface{}, value interface{}) bool {
-		k, _ := key.(string)
-		v, _ := value.(*[]*UserInfo)
-		m.savePool <- true
-		m.saveWaitGroup.Add(1)
-		go func(_id string, users *[]*UserInfo) {
-			defer func() {
-				<-m.savePool
-				m.saveWaitGroup.Done()
-			}()
-			list_last_infoid := ""
-			projectId := ""
-			if value, ok := m.allProject.Load(_id); ok {
-				project, _ := value.(*Project)
-				projectId = project.Id
-				list_last_infoid = project.List_last_infoid
-			} else {
-				projects := elastic.Get(Projectset, Projectset, fmt.Sprintf(ProjectQuery, _id))
-				if projects == nil || len(*projects) == 0 {
-					return
-				}
-				list := (*projects)[0]["list"].([]interface{})
-				if len(list) == 0 {
-					return
-				}
-				list_last, _ := list[len(list)-1].(map[string]interface{})
-				list_last_infoid = util.ObjToString(list_last["infoid"])
-				projectId, _ := (*projects)[0]["_id"].(string)
-				m.allProject.Store(_id, &Project{
-					Id:               projectId,
-					List_last_infoid: list_last_infoid,
-				})
-			}
-			if projectId == "" || list_last_infoid == "" {
-				return
-			}
-			lock.Lock()
-			defer lock.Unlock()
-			for _, user := range *users {
-				updateproject = append(updateproject, []map[string]interface{}{
-					map[string]interface{}{
-						"projectid": projectId,
-						"userid":    user.Id,
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"projectid":  projectId,
-							"infoid":     _id,
-							"userid":     user.Id,
-							"maxid":      list_last_infoid,
-							"subtypes":   user.O_vipjy.SubTypes,
-							"createtime": time.Now().Unix(),
-						},
-					},
-				})
-			}
-			if len(updateproject) == BigBulkSize {
-				mongodb.NewUpdateBulk(Pushspace_project, true, true, updateproject...)
-				updateproject = [][]map[string]interface{}{}
-			}
-		}(k, v)
-		index++
-		if index%500 == 0 {
-			logger.Info("关联项目", index)
-		}
-		return true
-	})
-	m.saveWaitGroup.Wait()
-	if len(updateproject) > 0 {
-		mongodb.NewUpdateBulk(Pushspace_project, true, true, updateproject...)
-		updateproject = [][]map[string]interface{}{}
-	}
-	logger.Info("关联项目结束。。。", index)
-}
-
 //加载数据到内存中
 func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[string]interface{} {
 	defer util.Catch()
@@ -342,13 +135,15 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[stri
 		"buyerclass":      1,
 	}).Sort("_id").Iter()
 	index := 0
+	loadBiddingPool := make(chan bool, Config.LoadBiddingPoolSize)
+	loadBiddingWaitGroup := &sync.WaitGroup{}
 	for data := make(map[string]interface{}); it.Next(&data); {
-		m.loadBiddingPool <- true
-		m.loadBiddingWaitGroup.Add(1)
+		loadBiddingPool <- true
+		loadBiddingWaitGroup.Add(1)
 		go func(temp map[string]interface{}) {
 			defer func() {
-				<-m.loadBiddingPool
-				m.loadBiddingWaitGroup.Done()
+				<-loadBiddingPool
+				loadBiddingWaitGroup.Done()
 			}()
 			_id := util.BsonIdToSId(temp["_id"])
 			temp["_id"] = _id
@@ -369,10 +164,10 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) *[]map[stri
 		data = make(map[string]interface{})
 		index++
 		if index%500 == 0 {
-			logger.Info(Bidding, index)
+			logger.Info("加载", Bidding, "数据:", index)
 		}
 	}
-	m.loadBiddingWaitGroup.Wait()
+	loadBiddingWaitGroup.Wait()
 	logger.Info(Bidding, "数据已经加载结束。。。", index)
 	return &res
 }
@@ -410,13 +205,17 @@ func (m *MatchJob) OnceUserBatch(batchIndex int, lastUserId *string) (int, *VipU
 	//免费用户
 	title_key_user := make(map[string]*[]*UserInfo)
 	title_notkey_user := make(map[string]*[]*UserInfo)
+	//
+	loadUserPool := make(chan bool, Config.LoadUserPoolSize)
+	loadUserWaitGroup := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
 	for userTemp := make(map[string]interface{}); query.Next(&userTemp); {
-		m.loadUserPool <- true
-		m.loadUserWaitGroup.Add(1)
+		loadUserPool <- true
+		loadUserWaitGroup.Add(1)
 		go func(temp map[string]interface{}) {
 			defer func() {
-				<-m.loadUserPool
-				m.loadUserWaitGroup.Done()
+				<-loadUserPool
+				loadUserWaitGroup.Done()
 			}()
 			user, o_msgset := public.NewUserInfoByUserColl(temp)
 			if user == nil {
@@ -519,8 +318,8 @@ func (m *MatchJob) OnceUserBatch(batchIndex int, lastUserId *string) (int, *VipU
 			user.Keys = originalKeys
 			user.Key_notkey = key_notkey
 			/***************start*****************/
-			m.loadUserLock.Lock()
-			defer m.loadUserLock.Unlock()
+			lock.Lock()
+			defer lock.Unlock()
 			if isVipUser {
 				user.O_vipjy = &O_vipjy{
 					ProjectMatch: util.IntAll(o_msgset["i_projectmatch"]),
@@ -599,7 +398,7 @@ func (m *MatchJob) OnceUserBatch(batchIndex int, lastUserId *string) (int, *VipU
 			break
 		}
 	}
-	m.loadUserWaitGroup.Wait()
+	loadUserWaitGroup.Wait()
 	//
 	vip_title_pjob := &KeyDfa{
 		Key_user:    &vip_title_key_user,
@@ -653,26 +452,48 @@ func (m *MatchJob) MakeKeyUser(keys []string, user *UserInfo, key_user *map[stri
 }
 
 //遍历数据并执行推送操作
-func (m *MatchJob) EachAllBidInfo(matcher Matcher, datas *[]map[string]interface{}) (*map[*UserInfo]*list.List, *sync.Map) {
+func (m *MatchJob) EachAllBidInfo(matcher Matcher, datas *[]map[string]interface{}) (*map[*UserInfo]*SortList, *map[*UserInfo]*[]string) {
 	defer util.Catch()
 	logger.Info("开始匹配数据。。。")
-	userMap := map[*UserInfo]*list.List{}
-	relationProject := &sync.Map{}
+	userMap := map[*UserInfo]*SortList{}
+	projectUserMap := map[*UserInfo]*[]string{}
+	lock := &sync.Mutex{}
 	var index int
+	matchPool := make(chan bool, Config.MatchPoolSize)
+	matchWaitGroup := &sync.WaitGroup{}
 	for _, temp := range *datas {
-		m.matchPool <- true
-		m.matchWaitGroup.Add(1)
+		matchPool <- true
+		matchWaitGroup.Add(1)
 		go func(info map[string]interface{}) {
 			defer func() {
-				m.matchWaitGroup.Done()
-				<-m.matchPool
+				matchWaitGroup.Done()
+				<-matchPool
 			}()
 			users, projectUsers := matcher.Match(&info)
+			lock.Lock()
+			defer lock.Unlock()
 			if users != nil && len(*users) > 0 {
-				m.EachInfoToUser(users, &info, &userMap)
+				for k, v := range *users {
+					l := userMap[k]
+					if l == nil {
+						l = &SortList{}
+					}
+					*l = append(*l, &MatchInfo{
+						Info: &info,
+						Keys: v.Keys,
+					})
+					userMap[k] = l
+				}
 			}
 			if projectUsers != nil && len(*projectUsers) > 0 {
-				relationProject.Store(util.ObjToString(info["_id"]), projectUsers)
+				for _, v := range *projectUsers {
+					l := projectUserMap[v]
+					if l == nil {
+						l = &[]string{}
+					}
+					*l = append(*l, util.ObjToString(info["_id"]))
+					projectUserMap[v] = l
+				}
 			}
 		}(temp)
 		index++
@@ -680,24 +501,225 @@ func (m *MatchJob) EachAllBidInfo(matcher Matcher, datas *[]map[string]interface
 			logger.Info("匹配数据", index)
 		}
 	}
-	m.matchWaitGroup.Wait()
+	matchWaitGroup.Wait()
 	logger.Info("匹配数据结束。。。", index)
-	return &userMap, relationProject
+	return &userMap, &projectUserMap
+}
+func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher, datas *[]map[string]interface{}) {
+	logger.Info("开始匹配第", batchIndex, "批用户。。。")
+	userMap, projectUserMap := m.EachAllBidInfo(matcher, datas)
+	logger.Info("第", batchIndex, "批用户匹配结束。。。")
+	logger.Info("第", batchIndex, "批开始保存到", Pushspace_temp, "表。。。")
+	index := 0
+	var saveBatch []map[string]interface{}
+	myMatchId := map[string]map[string]bool{}
+	myFilterId := map[string]map[string]bool{}
+	//
+	savePool := make(chan bool, Config.SavePoolSize)
+	saveWaitGroup := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	for k, v := range *userMap {
+		savePool <- true
+		saveWaitGroup.Add(1)
+		go func(user *UserInfo, infos *SortList) {
+			defer func() {
+				<-savePool
+				saveWaitGroup.Done()
+			}()
+			//取最新50条
+			sort.Sort(infos)
+			var array []*MatchInfo
+			matchTitle := map[string]bool{}
+			matchId := map[string]bool{}
+			filterId := map[string]bool{}
+			size := 0
+			for _, v2 := range *infos {
+				title := util.ObjToString((*v2.Info)["title"])
+				_id := util.ObjToString((*v2.Info)["_id"])
+				pushInfoKey := PushInfoKey(user.Id, _id)
+				if matchTitle[title] {
+					filterId[_id] = true
+					continue
+				}
+				matchTitle[title] = true
+				isExists, err := redis.Exists(Pushcache_2_a, pushInfoKey)
+				if err != nil {
+					logger.Error(pushInfoKey, "推送信息redis判重出错", err)
+				}
+				if isExists {
+					filterId[_id] = true
+					continue
+				}
+				redis.Put(Pushcache_2_a, pushInfoKey, 1, OneDaySecond)
+				info := map[string]interface{}{}
+				for _, field := range SaveFields {
+					if (*v2.Info)[field] == nil {
+						continue
+					}
+					info[field] = (*v2.Info)[field]
+				}
+				matchId[_id] = true
+				array = append(array, &MatchInfo{
+					Info: &info,
+					Keys: v2.Keys,
+				})
+				size++
+				maxPushSize := Config.MaxPushSize
+				if IsVipUser(user.VipStatus) {
+					maxPushSize = Config.VipMaxPushSize
+				}
+				if size == maxPushSize {
+					break
+				}
+			}
+			if size == 0 {
+				return
+			}
+			lock.Lock()
+			defer lock.Unlock()
+			myMatchId[user.Id] = matchId
+			myFilterId[user.Id] = filterId
+			saveBatch = append(saveBatch, map[string]interface{}{
+				"s_m_openid":    user.S_m_openid,
+				"a_m_openid":    user.A_m_openid,
+				"phone":         user.Phone,
+				"jpushid":       user.Jpushid,
+				"opushid":       user.Opushid,
+				"appphonetype":  user.AppPhoneType,
+				"userid":        user.Id,
+				"ratemode":      user.RateMode,
+				"wxpush":        user.WxPush,
+				"apppush":       user.AppPush,
+				"mailpush":      user.MailPush,
+				"pchelperpush":  user.PcHelperPush,
+				"usertype":      user.UserType,
+				"email":         user.Email,
+				"list":          array,
+				"size":          size,
+				"subscribe":     user.Subscribe,
+				"applystatus":   user.ApplyStatus,
+				"words":         user.Keys,
+				"modifydate":    user.ModifyDate,
+				"mergeorder":    user.MergeOrder,
+				"timestamp":     time.Now().Unix(),
+				"nickname":      user.NickName,
+				"firstpushtime": user.FirstPushTime,
+				"vipstatus":     user.VipStatus,
+			})
+			if len(saveBatch) == BulkSize {
+				mongodb.SaveBulk(Pushspace_temp, saveBatch...)
+				saveBatch = []map[string]interface{}{}
+			}
+		}(k, v)
+		index++
+		if index%500 == 0 {
+			logger.Info("保存到", Pushspace_temp, "表:", index)
+		}
+	}
+	saveWaitGroup.Wait()
+	if len(saveBatch) > 0 {
+		mongodb.SaveBulk(Pushspace_temp, saveBatch...)
+		saveBatch = []map[string]interface{}{}
+	}
+	logger.Info("第", batchIndex, "批保存到", Pushspace_temp, "表结束。。。", index)
+	m.ToRelationProject(projectUserMap, &myMatchId, &myFilterId)
 }
 
-//遍历用户加入到此条信息上
-func (m *MatchJob) EachInfoToUser(users *map[*UserInfo]*MatchUser, info *map[string]interface{}, userMap *map[*UserInfo]*list.List) {
-	defer m.userMapLock.Unlock()
-	m.userMapLock.Lock()
-	for k, v := range *users {
-		l := (*userMap)[k]
-		if l == nil {
-			l = list.New()
+//关联项目
+func (m *MatchJob) ToRelationProject(projectUser *map[*UserInfo]*[]string, myMatchId, myFilterId *map[string]map[string]bool) {
+	logger.Info("开始关联项目。。。")
+	index := 0
+	var updateproject [][]map[string]interface{}
+	//
+	savePool := make(chan bool, Config.SavePoolSize)
+	saveWaitGroup := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	for k, v := range *projectUser {
+		savePool <- true
+		saveWaitGroup.Add(1)
+		go func(user *UserInfo, _ids *[]string) {
+			defer func() {
+				<-savePool
+				saveWaitGroup.Done()
+			}()
+			needLength := Config.MaxPushSize - len((*myMatchId)[user.Id])
+			for _, _id := range *_ids {
+				if (*myFilterId)[user.Id] != nil && (*myFilterId)[user.Id][_id] {
+					continue
+				}
+				//如果有信息类型,优先用订阅匹配上的信息,然后最多关联50条
+				if len(user.O_vipjy.SubTypes) > 0 {
+					if (*myMatchId)[user.Id] == nil || !(*myMatchId)[user.Id][_id] {
+						if needLength > 0 {
+							needLength--
+						} else {
+							continue
+						}
+					}
+				} else { //如果没有信息类型,只关联订阅匹配上的信息
+					if (*myMatchId)[user.Id] == nil || !(*myMatchId)[user.Id][_id] {
+						continue
+					}
+				}
+				list_last_infoid := ""
+				projectId := ""
+				if value, ok := m.allProject.Load(_id); ok {
+					project, _ := value.(*Project)
+					projectId = project.Id
+					list_last_infoid = project.List_last_infoid
+				} else {
+					projects := elastic.Get(Projectset, Projectset, fmt.Sprintf(ProjectQuery, _id))
+					if projects == nil || len(*projects) == 0 {
+						continue
+					}
+					list := (*projects)[0]["list"].([]interface{})
+					if len(list) == 0 {
+						continue
+					}
+					list_last, _ := list[len(list)-1].(map[string]interface{})
+					list_last_infoid = util.ObjToString(list_last["infoid"])
+					projectId, _ = (*projects)[0]["_id"].(string)
+					m.allProject.Store(_id, &Project{
+						Id:               projectId,
+						List_last_infoid: list_last_infoid,
+					})
+				}
+				if projectId == "" || list_last_infoid == "" {
+					continue
+				}
+				lock.Lock()
+				updateproject = append(updateproject, []map[string]interface{}{
+					map[string]interface{}{
+						"projectid": projectId,
+						"userid":    user.Id,
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"projectid":  projectId,
+							"infoid":     _id,
+							"userid":     user.Id,
+							"maxid":      list_last_infoid,
+							"subtypes":   user.O_vipjy.SubTypes,
+							"createtime": time.Now().Unix(),
+						},
+					},
+				})
+				if len(updateproject) == BigBulkSize {
+					mongodb.NewUpdateBulk(Pushspace_project, true, true, updateproject...)
+					updateproject = [][]map[string]interface{}{}
+				}
+				lock.Unlock()
+			}
+		}(k, v)
+		index++
+		if index%500 == 0 {
+			logger.Info("关联项目:", index)
 		}
-		l.PushBack(&MatchInfo{
-			Info: info,
-			Keys: v.Keys,
-		})
-		(*userMap)[k] = l
 	}
+	saveWaitGroup.Wait()
+	if len(updateproject) > 0 {
+		mongodb.NewUpdateBulk(Pushspace_project, true, true, updateproject...)
+		updateproject = [][]map[string]interface{}{}
+	}
+	logger.Info("关联项目结束。。。", index)
 }

+ 7 - 2
src/jfw/modules/pushsubscribe/src/match/job/timetask.go

@@ -20,8 +20,13 @@ type MatchTimeTask struct {
 }
 
 func (m *MatchTimeTask) Execute() {
-	Jobs.Match.Execute()
-	util.WriteSysConfig("./task.json", &TaskConfig)
+	hour := time.Hour
+	//23点到1点之间,不执行定时任务
+	//订阅付费有每天0点的定时任务删除pushspace_temp表的数据,会有冲突
+	if hour < 23 && hour > 1 {
+		Jobs.Match.Execute()
+		util.WriteSysConfig("./task.json", &TaskConfig)
+	}
 	t := time.Duration(Config.MatchDuration) * time.Minute
 	logger.Info("start match job after", t)
 	time.AfterFunc(t, m.Execute)

二进制
src/jfw/modules/pushsubscribe/src/match/match


+ 1 - 1
src/jfw/modules/pushsubscribe/src/match/task.json

@@ -1 +1 @@
-{"startTime":"","lastId":"5da69c3fa5cb26b9b77bec0d"}
+{"lastTime":"","lastId":"5da72482a5cb26b9b7a130fb"}

+ 14 - 13
src/jfw/modules/pushsubscribe/src/public/util.go

@@ -11,19 +11,20 @@ import (
 )
 
 const (
-	BulkSize          = 20
-	BigBulkSize       = 100
-	OneDaySecond      = 86400
-	User              = "user"
-	Bidding           = "bidding"
-	Projectset        = "projectset"
-	Pushspace_project = "pushspace_project"
-	Pushspace_temp    = "pushspace_temp"
-	Pushspace_vip     = "pushspace_vip"
-	Pushspace_fail    = "pushspace_fail"
-	Pushspace         = "pushspace"
-	Pushcache_1       = "pushcache_1"
-	Pushcache_2_a     = "pushcache_2_a"
+	BulkSize           = 20
+	BigBulkSize        = 100
+	OneDaySecond       = 86400
+	User               = "user"
+	Bidding            = "bidding"
+	Projectset         = "projectset"
+	Pushspace_project  = "pushspace_project"
+	Pushspace_temp     = "pushspace_temp"
+	Pushspace_vip      = "pushspace_vip"
+	Pushspace_vip_fail = "pushspace_vip_fail"
+	Pushspace_fail     = "pushspace_fail"
+	Pushspace          = "pushspace"
+	Pushcache_1        = "pushcache_1"
+	Pushcache_2_a      = "pushcache_2_a"
 )
 
 var (

文件差异内容过多而无法显示
+ 0 - 0
src/jfw/modules/pushsubscribe/src/push/config.json


+ 2 - 17
src/jfw/modules/pushsubscribe/src/push/job/jobs.go

@@ -1,7 +1,6 @@
 package job
 
 import (
-	. "push/config"
 	"sync"
 )
 
@@ -10,23 +9,9 @@ var Jobs = struct {
 	Push        *PushJob
 	ProjectPush *ProjectPushJob
 }{
-	Move: &MoveJob{
-		moveLock:  &sync.Mutex{},
-		moveWait:  &sync.WaitGroup{},
-		movePool:  make(chan bool, Config.MovePoolSize),
-		mergeLock: &sync.Mutex{},
-		mergeWait: &sync.WaitGroup{},
-		mergePool: make(chan bool, Config.MergePoolSize),
-	},
+	Move: &MoveJob{},
 	Push: &PushJob{
-		pool: make(chan bool, Config.PushPoolSize),
-		wait: &sync.WaitGroup{},
 		lock: &sync.Mutex{},
 	},
-	ProjectPush: &ProjectPushJob{
-		pushPool: make(chan bool, Config.ProjectPushPoolSize),
-		pushWait: &sync.WaitGroup{},
-		loadPool: make(chan bool, Config.LoadProjectPoolSize),
-		loadWait: &sync.WaitGroup{},
-	},
+	ProjectPush: &ProjectPushJob{},
 }

+ 24 - 22
src/jfw/modules/pushsubscribe/src/push/job/movejob.go

@@ -22,12 +22,6 @@ type MoveUser struct {
 }
 
 type MoveJob struct {
-	moveLock  *sync.Mutex
-	moveWait  *sync.WaitGroup
-	movePool  chan bool
-	mergeLock *sync.Mutex
-	mergeWait *sync.WaitGroup
-	mergePool chan bool
 }
 
 func (m *MoveJob) Execute() {
@@ -45,15 +39,18 @@ func (m *MoveJob) Execute() {
 	}).Sort("userid").Iter()
 	moveUsers := map[string]*MoveUser{}
 	index, number, length := 0, 0, 0
+	//
+	moveWait := &sync.WaitGroup{}
+	movePool := make(chan bool, Config.MovePoolSize)
+	moveLock := &sync.Mutex{}
 	for data := make(map[string]interface{}); it.Next(&data); {
-		m.movePool <- true
-		m.moveWait.Add(1)
+		movePool <- true
+		moveWait.Add(1)
 		index++
 		go func(temp map[string]interface{}) {
 			defer func() {
-				<-m.movePool
-				m.moveWait.Done()
-				m.moveLock.Unlock()
+				<-movePool
+				moveWait.Done()
 			}()
 			userId := util.ObjToString(temp["userid"])
 			isVipUser := IsVipUser(util.IntAll(temp["vipstatus"]))
@@ -61,7 +58,8 @@ func (m *MoveJob) Execute() {
 			if isVipUser {
 				maxPushSize = Config.VipMaxPushSize
 			}
-			m.moveLock.Lock()
+			moveLock.Lock()
+			defer moveLock.Unlock()
 			moveUser := moveUsers[userId]
 			if moveUser != nil {
 				list, _ := moveUser.info["list"].(*SortList)
@@ -109,7 +107,7 @@ func (m *MoveJob) Execute() {
 			logger.Info("迁移数据:", index)
 		}
 	}
-	m.moveWait.Wait()
+	moveWait.Wait()
 	if length > 0 {
 		m.merge(&number, nowUnix, moveUsers)
 		length = 0
@@ -127,26 +125,30 @@ func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUs
 	updateArray_set := []map[string]interface{}{}
 	updateArray_delete := []interface{}{}
 	invalidArray_delete := []interface{}{}
+	//
+	mergeLock := &sync.Mutex{}
+	mergeWait := &sync.WaitGroup{}
+	mergePool := make(chan bool, Config.MergePoolSize)
 	for k, v := range moveUsers {
-		m.mergePool <- true
-		m.mergeWait.Add(1)
+		mergePool <- true
+		mergeWait.Add(1)
 		go func(userId string, moveUser *MoveUser) {
 			defer func() {
-				<-m.mergePool
-				m.mergeWait.Done()
+				<-mergePool
+				mergeWait.Done()
 			}()
 			sess := mongodb.GetMgoConn()
 			defer mongodb.DestoryMongoConn(sess)
 			var data map[string]interface{}
 			sess.DB(DbName).C(Pushspace).Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
 			if data == nil { //批量新增
-				m.mergeLock.Lock()
+				mergeLock.Lock()
 				saveArray = append(saveArray, moveUser.info)
 				saveArray_delete = append(saveArray_delete, moveUser.ids...)
 				if len(saveArray) == BulkSize {
 					m.saveBulk(sess, &saveArray, &saveArray_delete)
 				}
-				m.mergeLock.Unlock()
+				mergeLock.Unlock()
 			} else { //批量更新
 				setMap := map[string]interface{}{}
 				for _, field := range MoveFields {
@@ -198,14 +200,14 @@ func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUs
 					}
 				}
 				upSet["$set"] = setMap
-				m.mergeLock.Lock()
+				mergeLock.Lock()
 				updateArray_delete = append(updateArray_delete, moveUser.ids...)
 				updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
 				updateArray_set = append(updateArray_set, upSet)
 				if len(updateArray_query) == BulkSize {
 					m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
 				}
-				m.mergeLock.Unlock()
+				mergeLock.Unlock()
 			}
 		}(k, v)
 		index++
@@ -213,7 +215,7 @@ func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUs
 			logger.Info("第", *number, "次合并数据:", index)
 		}
 	}
-	m.mergeWait.Wait()
+	mergeWait.Wait()
 	if len(saveArray) > 0 {
 		m.saveBulk(nil, &saveArray, &saveArray_delete)
 	}

+ 15 - 15
src/jfw/modules/pushsubscribe/src/push/job/projectjob.go

@@ -18,10 +18,6 @@ import (
 )
 
 type ProjectPushJob struct {
-	pushPool chan bool
-	pushWait *sync.WaitGroup
-	loadPool chan bool
-	loadWait *sync.WaitGroup
 }
 
 func (p *ProjectPushJob) Execute() {
@@ -32,18 +28,21 @@ func (p *ProjectPushJob) Execute() {
 	projects := p.loadProject()
 	startId := ""
 	batchIndex := 0
+	//
+	pushPool := make(chan bool, Config.ProjectPushPoolSize)
+	pushWait := &sync.WaitGroup{}
 	for {
 		batchIndex++
 		batchCount, datas := p.loadPushspace_project(batchIndex, &startId)
 		logger.Info("开始第", batchIndex, "次关联项目匹配。。。")
 		index := 0
 		for k, v := range *datas {
-			p.pushPool <- true
-			p.pushWait.Add(1)
+			pushPool <- true
+			pushWait.Add(1)
 			go func(userId string, list []map[string]interface{}) {
 				defer func() {
-					<-p.pushPool
-					p.pushWait.Done()
+					<-pushPool
+					pushWait.Done()
 				}()
 				sess := mongodb.GetMgoConn()
 				defer mongodb.DestoryMongoConn(sess)
@@ -176,7 +175,7 @@ func (p *ProjectPushJob) Execute() {
 				logger.Info("第", batchIndex, "次关联项目匹配:", index)
 			}
 		}
-		p.pushWait.Wait()
+		pushWait.Wait()
 		logger.Info("第", batchIndex, "次关联项目匹配结束", index)
 		if batchCount < Config.PushBatch {
 			break
@@ -240,7 +239,6 @@ func (p *ProjectPushJob) loadPushspace_project(batchIndex int, startId *string)
 		}
 		prevUserId = userId
 	}
-	p.loadWait.Wait()
 	logger.Info("第", batchIndex, "批关联项目加载结束", index, *startId)
 	return index, &datas
 }
@@ -264,13 +262,15 @@ func (p *ProjectPushJob) loadProject() *sync.Map {
 	}).Sort("_id").Iter()
 	projectMap := &sync.Map{}
 	index := 0
+	loadPool := make(chan bool, Config.LoadProjectPoolSize)
+	loadWait := &sync.WaitGroup{}
 	for m := make(map[string]interface{}); it.Next(&m); {
-		p.loadPool <- true
-		p.loadWait.Add(1)
+		loadPool <- true
+		loadWait.Add(1)
 		go func(data map[string]interface{}) {
 			defer func() {
-				<-p.loadPool
-				p.loadWait.Done()
+				<-loadPool
+				loadWait.Done()
 			}()
 			_id := util.BsonIdToSId(data["_id"])
 			projectMap.Store(_id, data)
@@ -281,7 +281,7 @@ func (p *ProjectPushJob) loadProject() *sync.Map {
 			logger.Info("加载项目:", index)
 		}
 	}
-	p.loadWait.Wait()
+	loadWait.Wait()
 	logger.Info("加载项目结束。。。", index)
 	return projectMap
 }

+ 8 - 7
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -35,8 +35,6 @@ func init() {
 }
 
 type PushJob struct {
-	pool                    chan bool
-	wait                    *sync.WaitGroup
 	lock                    *sync.Mutex
 	minutePushPool          chan bool
 	fastigiumMinutePushPool chan bool
@@ -74,6 +72,9 @@ func (p *PushJob) StartPush(pusher Pusher, taskType int) {
 	logger.Info("推送任务", taskType, "开始推送。。。")
 	batchIndex := 0
 	startId := ""
+	//
+	pushPool := make(chan bool, Config.PushPoolSize)
+	pushWait := &sync.WaitGroup{}
 	for {
 		batchIndex++
 		isBreak, users := pusher.OncePushBatch(taskType, batchIndex, &startId)
@@ -90,15 +91,15 @@ func (p *PushJob) StartPush(pusher Pusher, taskType int) {
 						}
 					}
 				}()
-			case p.pool <- true:
+			case pushPool <- true:
 			}
-			p.wait.Add(1)
+			pushWait.Add(1)
 			go func(v map[string]interface{}, take bool) {
 				defer func() {
 					if take {
-						<-p.pool
+						<-pushPool
 					}
-					p.wait.Done()
+					pushWait.Done()
 				}()
 				u, pushWay := pusher.GetUserInfo(v)
 				if u == nil {
@@ -116,7 +117,7 @@ func (p *PushJob) StartPush(pusher Pusher, taskType int) {
 			break
 		}
 	}
-	p.wait.Wait()
+	pushWait.Wait()
 	logger.Info("推送任务结束。。。", taskType)
 }
 

二进制
src/jfw/modules/pushsubscribe/src/push/push


+ 15 - 11
src/jfw/modules/pushsubscribe/src/push/pusher/specialpush.go

@@ -16,9 +16,7 @@ type SpecialPush struct{}
 
 //获取需要推送的用户
 func (s *SpecialPush) OncePushBatch(taskType, batchIndex int, startId *string) (bool, *[]map[string]interface{}) {
-	query := map[string]interface{}{
-		"status": 1,
-	}
+	query := map[string]interface{}{}
 	if taskType == 3 {
 		query["ratemode"] = 3
 	} else if taskType == 4 {
@@ -93,17 +91,23 @@ func (s *SpecialPush) GetPushParam(mailPush bool, u *UserInfo, sl *SortList) *pu
 
 //推送以后处理
 func (s *SpecialPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
-	if pushResult == nil || (pushResult.WxStatus != -1 && pushResult.AppStatus != -1 && pushResult.MailStatus != -1) {
-		s.deletePushspaceVip(user)
-	} else {
+	s.deletePushspaceVip(user)
+	if pushResult != nil && (pushResult.WxStatus == -1 || pushResult.AppStatus == -1 || pushResult.MailStatus == -1) {
 		sess := mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(sess)
-		err := sess.DB(DbName).C(Pushspace_vip).UpdateId(user["_id"], map[string]interface{}{
-			"failtime": time.Now().Unix(),
-			"status":   -1,
-		})
+		if pushResult.WxStatus == -1 {
+			user["wxfail"] = 1
+		}
+		if pushResult.AppStatus == -1 {
+			user["appfail"] = 1
+		}
+		if pushResult.MailStatus == -1 {
+			user["mailfail"] = 1
+		}
+		user["failtime"] = time.Now().Unix()
+		err := sess.DB(DbName).C(Pushspace_vip_fail).Insert(user)
 		if err != nil {
-			logger.Error(u.Id, "更新出错", err)
+			logger.Error(u.Id, "新出错", err)
 		}
 	}
 }

+ 4 - 6
src/web/templates/weixin/vipsubscribe/trial_info.html

@@ -15,9 +15,9 @@
     <link rel="stylesheet" href="/vipsubscribe/css/trial_info.css?v={{Msg "seo" "version"}}">
 </head>
 
-<body style="height: 100vh">
+<body style="height: 100vh; background: #34355A;">
 <div class="trial_info">
-    <form class="form" id="formInfo">
+    <form class="form">
         <div class="trial_body">
             <div class="banner">
                 <div class="shadow"></div>
@@ -161,7 +161,7 @@
             }
         });
         //提交
-        $('#formInfo').submit(function (e) {
+        $(".btn").on("click",function (e) {
             var name = $('.name').val();
             var tel = $('.phone').val();
             var code = $('.phonecode').val();
@@ -180,7 +180,6 @@
                     window.location.replace("/weixin/pay/vipsubscribe_trial");
                 }
             });
-            return false;
         });
     });
 
@@ -232,9 +231,8 @@
         })
     }
     {{else}}
-    $('#formInfo').submit(function (e) {
+    $(".btn").on("click",function (e) {
         window.location.replace("/weixin/pay/vipsubscribe_trial");
-        return false;
     });
     {{end}}
 

部分文件因为文件数量过多而无法显示