wcj 5 ani în urmă
părinte
comite
667384d974

+ 2 - 1
src/jfw/modules/pushsubscribe/src/match/config.json

@@ -4,12 +4,13 @@
 	"redisServers": "pushcache_1=192.168.3.128:5000,pushcache_2_a=192.168.3.128:5001",
 	"maxPushSize": 50,
 	"vipMaxPushSize": 2000,
-	"maxSearch": 5000,
+	"maxSearch": 50000,
 	"mgoAddr": "192.168.3.128:27080",
 	"mgoSize": 10,
 	"testids": ["5d81c5a525ef8723ac0036f9"],
 	"filterWords":["项目","中标","公告"],
 	"matchPoolSize": 60,
+	"loadBiddingPoolSize": 50,
 	"matchDuration": 1, 
 	"userBatch":2,
 	"pcHelper":"127.0.0.1:8082"

+ 15 - 14
src/jfw/modules/pushsubscribe/src/match/config/config.go

@@ -5,20 +5,21 @@ import (
 )
 
 type config struct {
-	ElasticPoolSize int      `json:"elasticPoolSize"`
-	ElasticSearch   string   `json:"elasticSearch"`
-	RedisServers    string   `json:"redisServers"`
-	MaxPushSize     int      `json:"maxPushSize"`
-	VipMaxPushSize  int      `json:"vipMaxPushSize"`
-	MaxSearch       int      `json:"maxSearch"`
-	MgoAddr         string   `json:"mgoAddr"`
-	MgoSize         int      `json:"mgoSize"`
-	TestIds         []string `json:"testIds"`
-	FilterWords     []string `json:"filterWords"`
-	MatchPoolSize   int      `json:"matchPoolSize"`
-	MatchDuration   int64    `json:"matchDuration"`
-	UserBatch       int      `json:"userBatch"`
-	PcHelper        string   `json:"pcHelper"`
+	ElasticPoolSize     int      `json:"elasticPoolSize"`
+	ElasticSearch       string   `json:"elasticSearch"`
+	RedisServers        string   `json:"redisServers"`
+	MaxPushSize         int      `json:"maxPushSize"`
+	VipMaxPushSize      int      `json:"vipMaxPushSize"`
+	MaxSearch           int      `json:"maxSearch"`
+	MgoAddr             string   `json:"mgoAddr"`
+	MgoSize             int      `json:"mgoSize"`
+	TestIds             []string `json:"testIds"`
+	FilterWords         []string `json:"filterWords"`
+	MatchPoolSize       int      `json:"matchPoolSize"`
+	LoadBiddingPoolSize int      `json:"loadBiddingPoolSize"`
+	MatchDuration       int64    `json:"matchDuration"`
+	UserBatch           int      `json:"userBatch"`
+	PcHelper            string   `json:"pcHelper"`
 }
 
 type taskConfig struct {

+ 8 - 6
src/jfw/modules/pushsubscribe/src/match/job/job.go

@@ -15,11 +15,13 @@ type jobs struct {
 
 var Jobs = &jobs{
 	Match: &MatchJob{
-		datas:             &[]map[string]interface{}{},
-		matchPool:         make(chan bool, Config.MatchPoolSize),
-		eachInfoWaitGroup: &sync.WaitGroup{},
-		saveWaitGroup:     &sync.WaitGroup{},
-		userMapLock:       &sync.Mutex{},
-		saveBatch:         []map[string]interface{}{},
+		datas:                &[]map[string]interface{}{},
+		matchPool:            make(chan bool, Config.MatchPoolSize),
+		loadBiddingPool:      make(chan bool, Config.LoadBiddingPoolSize),
+		loadBiddingWaitGroup: &sync.WaitGroup{},
+		eachInfoWaitGroup:    &sync.WaitGroup{},
+		saveWaitGroup:        &sync.WaitGroup{},
+		userMapLock:          &sync.Mutex{},
+		saveBatch:            []map[string]interface{}{},
 	},
 }

+ 0 - 7
src/jfw/modules/pushsubscribe/src/match/job/matcher.go

@@ -1,7 +0,0 @@
-package job
-
-import . "public"
-
-type Matcher interface {
-	Match(info *map[string]interface{}) *map[*UserInfo]*MatchUser
-}

+ 55 - 72
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -5,7 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	. "match/config"
-	"match/dfa"
+	. "match/matcher"
 	mutil "match/util"
 	"public"
 	. "public"
@@ -38,42 +38,16 @@ const (
 	DbName    = "qfw"
 )
 
-type Pjob struct {
-	InterestDfa    *dfa.DFA
-	NotInterestDfa *dfa.DFA
-	Key_user       *map[string]*[]*UserInfo
-	Notkey_user    *map[string]*[]*UserInfo
-}
-
-//所有用户的关键词和排除词
-func (p *Pjob) CreateDaf() {
-	//关键词
-	p.InterestDfa = &dfa.DFA{}
-	interestWords := make([]string, 0)
-	for k, _ := range *p.Key_user {
-		interestWords = append(interestWords, k)
-	}
-	p.InterestDfa.AddWord(interestWords...)
-	//排除关键词
-	p.NotInterestDfa = &dfa.DFA{}
-	notInterestWords := make([]string, 0)
-	for k, _ := range *p.Notkey_user {
-		notInterestWords = append(notInterestWords, k)
-	}
-	p.NotInterestDfa.AddWord(notInterestWords...)
-}
-
-type MatchUser struct {
-	Keys []string
-}
 type MatchJob struct {
-	datas             *[]map[string]interface{} //本次加载的数据
-	matchPool         chan bool
-	eachInfoWaitGroup *sync.WaitGroup
-	saveWaitGroup     *sync.WaitGroup
-	userMapLock       *sync.Mutex
-	lastUserId        string
-	saveBatch         []map[string]interface{}
+	datas                *[]map[string]interface{} //本次加载的数据
+	matchPool            chan bool
+	loadBiddingPool      chan bool
+	loadBiddingWaitGroup *sync.WaitGroup
+	eachInfoWaitGroup    *sync.WaitGroup
+	saveWaitGroup        *sync.WaitGroup
+	userMapLock          *sync.Mutex
+	lastUserId           string
+	saveBatch            []map[string]interface{}
 }
 
 //定时任务,匹配数据,存库
@@ -105,20 +79,20 @@ func (m *MatchJob) Execute() {
 		return
 	}
 	m.lastUserId = ""
-	user_batch_index := 0
+	batchIndex := 0
 	for {
-		user_batch_index++
-		user_batch_size, vipUser, freeUser := m.OnceUserBatch(user_batch_index)
-		if user_batch_size == 0 {
+		batchIndex++
+		batchSize, vipUser, freeUser := m.OnceUserBatch(batchIndex)
+		if batchSize == 0 {
 			break
 		}
-		if len(vipUser.Users > 0) {
-			m.ToMatch(user_batch_index, vipUser)
+		if len(vipUser.Users) > 0 {
+			m.ToMatch(batchIndex, vipUser)
 		}
-		if len(freeUser.Users > 0) {
-			m.ToMatch(user_batch_index, freeUser)
+		if len(*freeUser.Title_KeyDfa.Key_user) > 0 {
+			m.ToMatch(batchIndex, freeUser)
 		}
-		if user_batch_size < Config.UserBatch {
+		if batchSize < Config.UserBatch {
 			break
 		}
 	}
@@ -255,38 +229,47 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) bool {
 		"buyerclass":      1,
 	}).Sort("_id").Iter()
 	index := 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); {
-		index++
-		_id := util.BsonIdToSId(tmp["_id"])
-		tmp["_id"] = _id
-		if util.ObjToString(tmp["area"]) == "A" {
-			tmp["area"] = "全国"
-		}
-		res = append(res, tmp)
-		//信息缓存3天
-		info := map[string]interface{}{}
-		for _, v := range SaveFields {
-			if v == "_id" || tmp[v] == nil {
-				continue
+	for data := make(map[string]interface{}); it.Next(&data); {
+		m.loadBiddingPool <- true
+		m.loadBiddingWaitGroup.Add(1)
+		go func(temp map[string]interface{}) {
+			defer func() {
+				<-m.loadBiddingPool
+				m.loadBiddingWaitGroup.Done()
+			}()
+			_id := util.BsonIdToSId(temp["_id"])
+			temp["_id"] = _id
+			if util.ObjToString(temp["area"]) == "A" {
+				temp["area"] = "全国"
 			}
-			info[v] = tmp[v]
-		}
-		redis.Put("pushcache_1", "info_"+_id, info, 259200)
+			res = append(res, temp)
+			//信息缓存3天
+			info := map[string]interface{}{}
+			for _, v := range SaveFields {
+				if v == "_id" || temp[v] == nil {
+					continue
+				}
+				info[v] = temp[v]
+			}
+			redis.Put("pushcache_1", "info_"+_id, info, 259200)
+		}(data)
+		data = make(map[string]interface{})
+		index++
 		if index%500 == 0 {
 			logger.Info("当前加载数据:", index)
 		}
-		tmp = make(map[string]interface{})
 		if index >= count {
 			break
 		}
 	}
+	m.loadBiddingWaitGroup.Wait()
 	m.datas = &res
 	logger.Info(count, "条数据已经加载完成!")
 	return true
 }
 
 //初始化用户缓存
-func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser) {
+func (m *MatchJob) OnceUserBatch(batchIndex int) (int, *VipUser, *FreeUser) {
 	defer util.Catch()
 	//遍历用户
 	q := map[string]interface{}{
@@ -302,7 +285,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 	if len(_idq) > 0 {
 		q["_id"] = _idq
 	}
-	logger.Info("开始加载第", user_batch_index, "批用户", q)
+	logger.Info("开始加载第", batchIndex, "批用户", q)
 	session := mongodb.GetMgoConn()
 	defer mongodb.DestoryMongoConn(session)
 	query := session.DB(DbName).C("user").Find(&q).Select(public.UserCollFields).Iter()
@@ -346,7 +329,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 			logger.Error("获取用户关键词错误!", user.Id, err)
 			continue
 		}
-		logger.Info("第", user_batch_index, "批用户,userid", user.Id, "s_m_openid", user.S_m_openid, "a_m_openid", user.A_m_openid, "s_phone", user.Phone, "jpushid", user.Jpushid, "opushid", user.Opushid, "applystatus", user.ApplyStatus, "email", user.Email, "rateMode", user.RateMode, "wxpush", user.WxPush, "apppush", user.AppPush, "mailpush", user.MailPush, "vipstatus", user.VipStatus)
+		logger.Info("第", batchIndex, "批用户,userid", user.Id, "s_m_openid", user.S_m_openid, "a_m_openid", user.A_m_openid, "s_phone", user.Phone, "jpushid", user.Jpushid, "opushid", user.Opushid, "applystatus", user.ApplyStatus, "email", user.Email, "rateMode", user.RateMode, "wxpush", user.WxPush, "apppush", user.AppPush, "mailpush", user.MailPush, "vipstatus", user.VipStatus)
 		keys := []string{}                           //过滤后的关键词
 		notkeys := []string{}                        //排除词
 		key_notkey := map[string]map[string]bool{}   //关键词所对应的排除词
@@ -492,26 +475,26 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 		}
 	}
 	//
-	vip_title_pjob := &Pjob{
+	vip_title_pjob := &KeyDfa{
 		Key_user:    &vip_title_key_user,
 		Notkey_user: &vip_title_notkey_user,
 	}
 	vip_title_pjob.CreateDaf()
-	vip_detail_pjob := &Pjob{
+	vip_detail_pjob := &KeyDfa{
 		Key_user:    &vip_detail_key_user,
 		Notkey_user: &vip_detail_notkey_user,
 	}
 	vip_detail_pjob.CreateDaf()
-	vipUser.Title_Pjob = vip_title_pjob
-	vipUser.Detail_Pjob = vip_detail_pjob
+	vipUser.Title_KeyDfa = vip_title_pjob
+	vipUser.Detail_KeyDfa = vip_detail_pjob
 	//
-	title_pjob := &Pjob{
+	title_pjob := &KeyDfa{
 		Key_user:    &title_key_user,
 		Notkey_user: &title_notkey_user,
 	}
 	title_pjob.CreateDaf()
-	freeUser.Title_Pjob = title_pjob
-	logger.Info("第", user_batch_index, "批用户加载结束", n)
+	freeUser.Title_KeyDfa = title_pjob
+	logger.Info("第", batchIndex, "批用户加载结束", n)
 	return n, vipUser, freeUser
 }
 

+ 5 - 5
src/jfw/modules/pushsubscribe/src/match/job/freematch.go → src/jfw/modules/pushsubscribe/src/match/matcher/freematch.go

@@ -1,4 +1,4 @@
-package job
+package matcher
 
 import (
 	. "public"
@@ -7,7 +7,7 @@ import (
 
 //免费用户
 type FreeUser struct {
-	Title_Pjob *Pjob
+	Title_KeyDfa *KeyDfa
 }
 
 func NewFreeUser() *FreeUser {
@@ -17,10 +17,10 @@ func (f *FreeUser) Match(info *map[string]interface{}) *map[*UserInfo]*MatchUser
 	title, _ := (*info)["title"].(string)
 	title = strings.ToUpper(title)
 	//订阅词
-	keys := f.Title_Pjob.InterestDfa.Analy(title)
+	keys := f.Title_KeyDfa.Key.Analy(title)
 	//排除词
-	notkeys := f.Title_Pjob.NotInterestDfa.Analy(title)
-	users := f.GetFinalUser(keys, notkeys, f.Title_Pjob.Key_user, info)
+	notkeys := f.Title_KeyDfa.NotKey.Analy(title)
+	users := f.GetFinalUser(keys, notkeys, f.Title_KeyDfa.Key_user, info)
 	return users
 }
 

+ 38 - 0
src/jfw/modules/pushsubscribe/src/match/matcher/matcher.go

@@ -0,0 +1,38 @@
+package matcher
+
+import (
+	"match/dfa"
+	. "public"
+)
+
+type Matcher interface {
+	Match(info *map[string]interface{}) *map[*UserInfo]*MatchUser
+}
+type KeyDfa struct {
+	Key         *dfa.DFA
+	NotKey      *dfa.DFA
+	Key_user    *map[string]*[]*UserInfo
+	Notkey_user *map[string]*[]*UserInfo
+}
+
+//所有用户的关键词和排除词
+func (p *KeyDfa) CreateDaf() {
+	//关键词
+	p.Key = &dfa.DFA{}
+	keys := make([]string, 0)
+	for k, _ := range *p.Key_user {
+		keys = append(keys, k)
+	}
+	p.Key.AddWord(keys...)
+	//排除关键词
+	p.NotKey = &dfa.DFA{}
+	notKeys := make([]string, 0)
+	for k, _ := range *p.Notkey_user {
+		notKeys = append(notKeys, k)
+	}
+	p.NotKey.AddWord(notKeys...)
+}
+
+type MatchUser struct {
+	Keys []string
+}

+ 10 - 11
src/jfw/modules/pushsubscribe/src/match/job/vipmatch.go → src/jfw/modules/pushsubscribe/src/match/matcher/vipmatch.go

@@ -1,4 +1,4 @@
-package job
+package matcher
 
 import (
 	. "public"
@@ -8,13 +8,12 @@ import (
 //付费用户
 type VipUser struct {
 	Users           map[*UserInfo]bool
-	Title_Pjob      *Pjob
-	Detail_Pjob     *Pjob
+	Title_KeyDfa    *KeyDfa
+	Detail_KeyDfa   *KeyDfa
 	BuyerclassUsers map[string]map[*UserInfo]bool
 	AreaUsers       map[string]map[*UserInfo]bool
 	CityUsers       map[string]map[*UserInfo]bool
 	InfoTypeUsers   map[string]map[*UserInfo]bool
-	MatchJob        *MatchJob
 }
 
 func (v *VipUser) Add(k string, u *UserInfo, m *map[string]map[*UserInfo]bool) {
@@ -46,22 +45,22 @@ func (v *VipUser) Match(info *map[string]interface{}) *map[*UserInfo]*MatchUser
 	title, _ := (*info)["title"].(string)
 	title = strings.ToUpper(title)
 	//订阅词
-	keys := v.Title_Pjob.InterestDfa.Analy(title)
+	keys := v.Title_KeyDfa.Key.Analy(title)
 	//排除词
-	notkeys := v.Title_Pjob.NotInterestDfa.Analy(title)
-	title_users := v.GetFinalUser(keys, notkeys, v.Title_Pjob.Key_user)
+	notkeys := v.Title_KeyDfa.NotKey.Analy(title)
+	title_users := v.GetFinalUser(keys, notkeys, v.Title_KeyDfa.Key_user)
 	//开启智能匹配的用户,匹配projectscope
 	detail_users := &map[*UserInfo]*MatchUser{}
-	if v.Detail_Pjob != nil {
+	if v.Detail_KeyDfa != nil {
 		detail, _ := (*info)["projectscope"].(string)
 		if detail == "" {
 			detail, _ = (*info)["detail"].(string)
 		}
 		if detail != "" {
 			detail = strings.ToUpper(detail)
-			keys = v.Detail_Pjob.InterestDfa.Analy(detail)
-			notkeys = v.Detail_Pjob.NotInterestDfa.Analy(detail)
-			detail_users = v.GetFinalUser(keys, notkeys, v.Detail_Pjob.Key_user)
+			keys = v.Detail_KeyDfa.Key.Analy(detail)
+			notkeys = v.Detail_KeyDfa.NotKey.Analy(detail)
+			detail_users = v.GetFinalUser(keys, notkeys, v.Detail_KeyDfa.Key_user)
 			for d_k, d_u := range *detail_users {
 				if (*title_users)[d_k] != nil {
 					continue

+ 2 - 22
src/jfw/modules/pushsubscribe/src/public/entity.go

@@ -29,10 +29,9 @@ type UserInfo struct {
 	FirstPushTime int64                      //第一次推送时间
 	VipStatus     int                        // 1--试用 2--正式
 	MatchType     int                        //匹配方式 1-标题 2-正文
+	Size          int                        //模板消息中需要显示的数量,总条数
+	CreateTime    int64
 	O_jy          *O_jy
-	//
-	//Active int
-	//Fail   *Fail //失败重试
 }
 
 type O_jy struct {
@@ -49,25 +48,6 @@ type KeySet struct {
 	AppendKeys []string `json:"appendkey"` //附加词
 }
 
-type Fail struct {
-	Wx    int
-	App   int
-	Email int
-}
-
-type Bidding struct {
-	Id              string
-	Area            string
-	City            string
-	Buyerclass      string
-	Publishtime     int64
-	S_subscopeclass []string
-	Subtype         string
-	Title           string
-	Toptype         string
-	Type            string
-}
-
 type SortList []*MatchInfo
 
 func (s SortList) Len() int {

+ 1 - 1
src/jfw/modules/pushsubscribe/src/push/config.json

@@ -48,7 +48,7 @@
 		"14:00",
 		"18:00"
 	],
-	"vipPushDay": 7,
+	"vipPushDay": 1,
 	"vipPushWeek": "Friday",
 	"savePoolSize":10,
 	"pushPoolSize": 60,

+ 3 - 4
src/jfw/modules/pushsubscribe/src/push/job/jobs.go

@@ -19,10 +19,9 @@ var Jobs = struct {
 		mergePool: make(chan bool, Config.MergePoolSize),
 	},
 	Push: &PushJob{
-		pool:     make(chan bool, Config.PushPoolSize),
-		savePool: make(chan bool, Config.SavePoolSize),
-		wait:     &sync.WaitGroup{},
-		lock:     &sync.Mutex{},
+		pool: make(chan bool, Config.PushPoolSize),
+		wait: &sync.WaitGroup{},
+		lock: &sync.Mutex{},
 	},
 	Repair: &RepairJob{
 		pool: make(chan bool, Config.PushPoolSize),

+ 56 - 192
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -12,7 +12,6 @@ import (
 	"qfw/util"
 	"qfw/util/mail"
 	"qfw/util/mongodb"
-	"qfw/util/redis"
 	"strconv"
 	"strings"
 	"sync"
@@ -42,7 +41,6 @@ type PushJob struct {
 	lock                    *sync.Mutex
 	minutePushPool          chan bool
 	fastigiumMinutePushPool chan bool
-	savePool                chan bool
 }
 
 //taskType 1--一天三次推送 2--九点推送
@@ -58,7 +56,9 @@ func (p *PushJob) startPush(taskType int) {
 	defer util.Catch()
 	var pusher Pusher
 	if taskType == 1 || taskType == 2 {
-		pusher = &NormalPush{}
+		pusher = &NormalPush{
+			SavePool: make(chan bool, Config.SavePoolSize),
+		}
 	} else if taskType == 3 {
 		pusher = &SpecialPush{}
 	} else {
@@ -132,7 +132,7 @@ func (p *PushJob) startPush(taskType int) {
 						mailPush = 0
 					}
 				}
-				pushResult := p.selectPush(p.taskType, wxPush, appPush, mailPush, u, list)
+				pushResult := p.selectPush(pusher, p.taskType, wxPush, appPush, mailPush, u, list)
 				log.Println(pushResult)
 				pusher.AfterPush(pushResult, u, v)
 			}(temp, isTake)
@@ -146,126 +146,18 @@ func (p *PushJob) startPush(taskType int) {
 }
 
 //满足条件进行推送
-func (p *PushJob) selectPush(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (pushResult *putil.PushResult) {
+func (p *PushJob) selectPush(pusher Pusher, taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (pushResult *putil.PushResult) {
 	if wxPush == 1 || appPush == 1 || mailPush == 1 || u.PchelperPush == 1 {
-		pushResult = p.doPush(taskType, true, wxPush, appPush, mailPush, u, &list)
+		pushResult = p.doPush(pusher, taskType, wxPush, appPush, mailPush, u, &list)
 	}
 	return
 }
 
 //进入具体推送
-func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (pushResult *putil.PushResult) {
+func (p *PushJob) doPush(pusher Pusher, taskType int, wxPush, appPush, mailPush int, u *UserInfo, sl *SortList) (pushResult *putil.PushResult) {
 	defer util.Catch()
 	pushResult = &putil.PushResult{}
-	mailContent := ""
-	jpushtitle := ""
-	lastInfoDate := int64(0)
-	TitleArray := []string{}
-	infos := []*MatchInfo{}
-	infosLength := 0
-	publishTitle := map[string]bool{}
-	dateymd := util.NowFormat(util.Date_yyyyMMdd)
-	dayCountKey := DayCountKey(dateymd, k.Id)
-	onceCountKey := OnceCountKey(dateymd, k.Id)
-	dayCount := redis.GetInt("pushcache_2_a", dayCountKey)
-	isVipUser := IsVipUser(k.VipStatus)
 	now := time.Now()
-	for _, ks := range *sl {
-		k2 := *ks.Info
-		title := strings.Replace(k2["title"].(string), "\n", "", -1)
-		title = Re.ReplaceAllString(title, "$1")
-		area := util.ObjToString(k2["area"])
-		if area == "A" {
-			area = "全国"
-		}
-		newTitle := fmt.Sprintf("[%s]%s", area, title)
-		if publishTitle[newTitle] {
-			continue
-		}
-		publishTitle[newTitle] = true
-		infosLength++
-		infos = append(infos, ks)
-		TitleArray = append(TitleArray, newTitle)
-		if infosLength == 1 {
-			jpushtitle = title
-			lastInfoDate = util.Int64All(k2["publishtime"])
-		}
-		//增加行业的处理
-		industry := ""
-		industryclass := "industry"
-		if k2["s_subscopeclass"] != nil {
-			k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
-			if len(k2sub) > 0 {
-				industry = k2sub[0]
-				if industry != "" {
-					ss := strings.Split(industry, "_")
-					if len(ss) > 1 {
-						industry = ss[0]
-					}
-				}
-			}
-		}
-		if mailPush == 1 { //关于邮件的处理
-			mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
-			url := fmt.Sprintf("%s/article/mailprivate/%s.html", Config.JianyuDomain, mailSid)
-			classArea := "area"
-			classType := "type"
-			infotype := util.ObjToString(k2["subtype"])
-			if infotype == "" {
-				infotype = util.ObjToString(k2["toptype"])
-			}
-			if infotype == "" {
-				infotype = util.ObjToString(k2["type"])
-				if infotype == "tender" {
-					infotype = "招标"
-				} else if infotype == "bid" {
-					infotype = "中标"
-				}
-			}
-			dates := util.LongToDate(k2["publishtime"], false)
-			//标题替换
-			otitle := title
-			for _, kw := range k.Keys {
-				kws := strings.Split(kw, "+")
-				n := 0
-				otitle2 := otitle
-				for _, kwn := range kws {
-					ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
-					if ot != otitle {
-						n++
-						otitle2 = ot
-					} else {
-						break
-					}
-				}
-				if n == len(kws) {
-					otitle = otitle2
-					break
-				}
-			}
-			if industry == "" {
-				industryclass = ""
-			}
-			mailContent += fmt.Sprintf(Config.Mail_content, infosLength, url, otitle, classArea, area, classType, infotype, industryclass, industry, dates)
-		}
-		if isVipUser {
-			if dayCount >= Config.VipMaxPushSize {
-				break
-			}
-		} else {
-			//限制最大信息条数
-			if infosLength >= Config.MaxPushSize {
-				break
-			}
-		}
-		dayCount++
-	}
-	if infosLength == 0 {
-		logger.Info("推送任务", taskType, "没有要推送的数据!", k.Id)
-		return
-	}
-	redis.Put("pushcache_2_a", dayCountKey, dayCount, 86400)
-	redis.Put("pushcache_2_a", onceCountKey, infosLength, 86400)
 	//限制一分钟最大的推送数量
 	if p.fastigiumMinutePushPool != nil {
 		if hour := now.Hour(); hour >= FastigiumStart && hour <= FastigiumEnd {
@@ -274,44 +166,37 @@ func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 	} else if p.minutePushPool != nil {
 		<-p.minutePushPool //正常期
 	}
-	pushDate := ""
-	if taskType != 0 && isSave {
-		//推送记录id
-		pushDate = p.save(k, infos)
-		if pushDate == "" {
-			logger.Info("推送任务", taskType, "保存出错", k.Id)
-			return
-		}
-		logger.Info("推送任务", taskType, "保存成功", pushDate, k.Id)
-		pushResult.IsSaveSuccess = true
+	pushParam := pusher.GetPushParam(mailPush, u, sl)
+	if pushParam == nil {
+		logger.Info("推送任务", taskType, "没有要推送的数据!", u.Id)
+		return nil
 	}
-	if isVipUser && (k.RateMode == 3 || k.RateMode == 4) {
-		if now.Day() != Config.VipPushDay && now.Weekday().String() != Config.VipPushWeek {
-			pushResult.IsVipTempSave = true
-			return
-		} else {
-		}
+	pushResult.Infos = pushParam.Infos
+	pushResult.PushDate = pushParam.PushDate
+	pushResult.IsVipTempSave = pushParam.IsVipTempSave
+	if !pushParam.IsPush {
+		return pushResult
 	}
-	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
-	if pushResult.IsSaveSuccess {
+	logger.Info("推送任务", taskType, "开始进行终端推送", u.Id)
+	if pushResult.PushDate > 0 {
 		//pc端助手推送
-		if k.S_m_openid != "" {
-			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "s_m_openid", k.S_m_openid)
-			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.S_m_openid})
-			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "s_m_openid", k.S_m_openid)
+		if u.S_m_openid != "" {
+			logger.Info("推送任务", taskType, "开始助手推送", u.Id, "s_m_openid", u.S_m_openid)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": u.S_m_openid})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, u.Id, "s_m_openid", u.S_m_openid)
 		}
-		if k.Phone != "" {
-			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "phone", k.Phone)
-			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.Phone})
-			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "phone", k.Phone)
+		if u.Phone != "" {
+			logger.Info("推送任务", taskType, "开始助手推送", u.Id, "phone", u.Phone)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": u.Phone})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, u.Id, "phone", u.Phone)
 		}
 	}
 	if wxPush == 1 {
-		logger.Info("推送任务", taskType, "开始微信推送", k.ApplyStatus, k.Id)
+		logger.Info("推送任务", taskType, "开始微信推送", u.ApplyStatus, u.Id)
 		isPushOk := true
-		if k.ApplyStatus == 1 {
+		if u.ApplyStatus == 1 {
 			TmpTip := ""
-			minute := now.Unix() - lastInfoDate
+			minute := now.Unix() - pushParam.LastInfoDate
 			if minute > -1 && minute < 61 {
 				TmpTip = fmt.Sprintf("%d秒前发布的", minute)
 			} else {
@@ -325,13 +210,13 @@ func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			}
 			tip := util.If(TmpTip == "", "", TmpTip+":\n").(string)
 			lastTip := ""
-			if infosLength > 1 {
-				lastTip = fmt.Sprintf("...(共%d条)", infosLength)
+			if pushParam.InfosLength > 1 {
+				lastTip = fmt.Sprintf("...(共%d条)", pushParam.InfosLength)
 			}
 			LastTipLen := len([]rune(lastTip))
 			wxTitle := Config.VipWxTitle
-			if !isVipUser {
-				wxTitleKeys := strings.Join(k.Keys, ";")
+			if !pushParam.IsVipUser {
+				wxTitleKeys := strings.Join(u.Keys, ";")
 				if len([]rune(wxTitleKeys)) > 8 {
 					wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
 				}
@@ -341,12 +226,12 @@ func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			reLen := 200 - TitleLen - 10 - WxGroupLen - len([]rune(tip))
 			wxTplTitle := ""
 			bshow := false
-			for n := 1; n < len(TitleArray)+1; n++ {
-				curTitle := TitleArray[n-1]
+			for n := 1; n < len(pushParam.TitleArray)+1; n++ {
+				curTitle := pushParam.TitleArray[n-1]
 				tmptitle := wxTplTitle + fmt.Sprintf("%d %s\n", n, curTitle)
 				ch := reLen - len([]rune(tmptitle))
 				if ch < LastTipLen { //加上后大于后辍,则没有完全显示
-					if ch == 0 && n == len(TitleArray) {
+					if ch == 0 && n == len(pushParam.TitleArray) {
 						wxTplTitle = tmptitle
 						bshow = true
 					} else {
@@ -361,12 +246,12 @@ func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 					}
 				} else if ch == LastTipLen {
 					wxTplTitle = tmptitle
-					if n == len(TitleArray) {
+					if n == len(pushParam.TitleArray) {
 						bshow = true
 					}
 				} else {
 					wxTplTitle = tmptitle
-					if n == len(TitleArray) {
+					if n == len(pushParam.TitleArray) {
 						bshow = true
 					}
 				}
@@ -375,57 +260,57 @@ func (p *PushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 				lastTip = ""
 			}
 			//推送微信
-			isPushOk = putil.SendWeixin(k, tip+wxTplTitle+lastTip, wxTitle, pushDate)
+			isPushOk = putil.SendWeixin(u, tip+wxTplTitle+lastTip, wxTitle, pushParam.PushDate)
 			if isPushOk {
 				pushResult.WxStatus = 1
 			} else {
 				pushResult.WxStatus = -1
 			}
 		}
-		logger.Info("推送任务", taskType, "微信推送结束", k.ApplyStatus, isPushOk, k.Id)
+		logger.Info("推送任务", taskType, "微信推送结束", u.ApplyStatus, isPushOk, u.Id)
 	}
 	if appPush == 1 {
-		logger.Info("推送任务", taskType, "开始app推送", k.Id)
+		logger.Info("推送任务", taskType, "开始app推送", u.Id)
 		descriptAppend := ""
-		if infosLength > 1 {
-			descriptAppend = fmt.Sprintf("\n...(共%d条)", infosLength)
-			jpushtitle = fmt.Sprintf("1. %s", jpushtitle)
+		if pushParam.InfosLength > 1 {
+			descriptAppend = fmt.Sprintf("\n...(共%d条)", pushParam.InfosLength)
+			pushParam.JpushTitle = fmt.Sprintf("1. %s", pushParam.JpushTitle)
 		}
 		go mongodb.Update("user", map[string]interface{}{
-			"_id": bson.ObjectIdHex(k.Id),
+			"_id": bson.ObjectIdHex(u.Id),
 		}, map[string]interface{}{
 			"$inc": map[string]interface{}{
 				"i_apppushunread": 1,
 			},
 		}, false, false)
 		isPushOk := putil.SendApp(map[string]interface{}{
-			"phoneType":      k.AppPhoneType,
-			"descript":       jpushtitle,
+			"phoneType":      u.AppPhoneType,
+			"descript":       pushParam.JpushTitle,
 			"descriptAppend": descriptAppend,
 			"type":           "bid",
-			"userId":         k.Id,
-			"url":            "/jyapp/free/sess/" + Se.EncodeString(k.Id+",_id,"+strconv.Itoa(int(now.Unix()))+",historypush"),
-			"otherPushId":    k.Opushid,
-			"jgPushId":       k.Jpushid, //极光-推送id
+			"userId":         u.Id,
+			"url":            "/jyapp/free/sess/" + Se.EncodeString(u.Id+",_id,"+strconv.Itoa(int(now.Unix()))+",historypush"),
+			"otherPushId":    u.Opushid,
+			"jgPushId":       u.Jpushid, //极光-推送id
 		})
 		if isPushOk {
 			pushResult.AppStatus = 1
 		} else {
 			pushResult.AppStatus = -1
 		}
-		logger.Info("推送任务", taskType, "app推送结束", isPushOk, k.Id)
+		logger.Info("推送任务", taskType, "app推送结束", isPushOk, u.Id)
 	}
 	//发送邮件
 	if mailPush == 1 {
-		logger.Info("推送任务", taskType, "开始邮箱推送", k.Id)
-		html := fmt.Sprintf(Config.Mail_html, strings.Replace(strings.Join(k.Keys, ";"), "+", " ", -1), mailContent)
-		isPushOk := p.sendMail(k.Email, Config.Mail_title, html, nil)
+		logger.Info("推送任务", taskType, "开始邮箱推送", u.Id)
+		html := fmt.Sprintf(Config.Mail_html, strings.Replace(strings.Join(u.Keys, ";"), "+", " ", -1), pushParam.MailContent)
+		isPushOk := p.sendMail(u.Email, Config.Mail_title, html, nil)
 		if isPushOk {
 			pushResult.MailStatus = 1
 		} else {
 			pushResult.MailStatus = -1
 		}
-		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, k.Id)
+		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, u.Id)
 	}
 	return
 }
@@ -463,24 +348,3 @@ func (p *PushJob) sendMail(email, subject, html string, fmdatas []map[string]int
 	}
 	return status
 }
-
-//保存发送信息
-func (p *PushJob) save(k *UserInfo, matchInfos []*MatchInfo) string {
-	p.savePool <- true
-	defer func() {
-		<-p.savePool
-	}()
-	if Config.SaveSleep > 0 {
-		time.Sleep(time.Duration(Config.SaveSleep) * time.Millisecond)
-	}
-	unix := time.Now().Unix()
-	values := []interface{}{}
-	for _, matchInfo := range matchInfos {
-		values = append(values, k.Id, util.ObjToString((*matchInfo.Info)["_id"]), unix, strings.Join(matchInfo.Keys, " "), util.ObjToString((*matchInfo.Info)["area"]), util.ObjToString((*matchInfo.Info)["city"]), util.ObjToString((*matchInfo.Info)["buyerclass"]))
-	}
-	savecount := putil.Mysql.InsertBatch("pushsubscribe", []string{"userid", "infoid", "date", "matchkeys", "area", "city", "buyerclass"}, values)
-	if int(savecount) != len(values) {
-		logger.Error(k.Id, "批量保存有问题", len(values), savecount)
-	}
-	return fmt.Sprint(unix)
-}

+ 1 - 1
src/jfw/modules/pushsubscribe/src/push/job/repairjob.go

@@ -47,7 +47,7 @@ func (r *RepairJob) Execute(param string) bool {
 					mailPush = 1
 				}
 				list := putil.ToSortList(v["list"])
-				pushResult := Jobs.Push.selectPush(taskType, wxPush, appPush, mailPush, u, list)
+				pushResult := Jobs.Push.selectPush(pusher, taskType, wxPush, appPush, mailPush, u, list)
 				pusher.AfterPush(pushResult, u, v)
 			}(temp)
 		}

+ 15 - 46
src/jfw/modules/pushsubscribe/src/push/job/timetask.go

@@ -8,51 +8,23 @@ import (
 	"time"
 )
 
-type timeTask struct {
-	Move      *MoveTimeTask      //迁移数据
-	OncePush  *OncePushTimeTask  //九点推送
-	OtherPush *OtherPushTimeTask //一天三次
-}
+type TimeTask struct{}
 
-var Task = &timeTask{
-	Move:      &MoveTimeTask{},      //迁移数据
-	OncePush:  &OncePushTimeTask{},  //九点推送
-	OtherPush: &OtherPushTimeTask{}, //一天三次
-}
+func (t *TimeTask) Run() {
+	go t.push() //推送
+	go t.move() //迁移数据
 
-type OtherPushTimeTask struct {
 }
+func (t *TimeTask) push() {
+	for _, v := range Config.OtherPushTimes {
+		go t.crontab(v, 1)
 
-func (o *OtherPushTimeTask) Execute() {
-	for _, otherpushtime := range Config.OtherPushTimes {
-		h_m := strings.Split(otherpushtime, ":")
-		if len(h_m) != 2 {
-			log.Fatalln("OtherPushTimeTask", otherpushtime)
-			return
-		}
-		now := time.Now()
-		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
-		if newDate.Before(now) {
-			newDate = newDate.AddDate(0, 0, 1)
-		}
-		sub := newDate.Sub(now)
-		log.Println("start", otherpushtime, "OtherPushTimeTask after", sub)
-		timer := time.NewTimer(sub)
-		for {
-			select {
-			case <-timer.C:
-				go Jobs.Push.Execute(1)
-				timer.Reset(time.Hour * 24)
-			}
-		}
 	}
+	go t.crontab(Config.OncePushTime, 2)
 }
 
-type OncePushTimeTask struct {
-}
-
-func (o *OncePushTimeTask) Execute() {
-	h_m := strings.Split(Config.OncePushTime, ":")
+func (t *TimeTask) crontab(tm string, taskType int) {
+	h_m := strings.Split(tm, ":")
 	if len(h_m) == 2 {
 		now := time.Now()
 		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
@@ -60,26 +32,23 @@ func (o *OncePushTimeTask) Execute() {
 			newDate = newDate.AddDate(0, 0, 1)
 		}
 		sub := newDate.Sub(now)
-		log.Println("start", Config.OncePushTime, "OncePushTimeTask after", sub)
+		log.Println("start", tm, "after", sub)
 		timer := time.NewTimer(sub)
 		for {
 			select {
 			case <-timer.C:
-				go Jobs.Push.Execute(2)
+				go Jobs.Push.Execute(taskType)
 				timer.Reset(time.Hour * 24)
 			}
 		}
 	} else {
-		log.Fatalln("OncePushTimeTask", Config.OtherPushTimes)
+		log.Fatalln("crontab", tm)
 	}
 }
 
-type MoveTimeTask struct {
-}
-
-func (m *MoveTimeTask) Execute() {
+func (t *TimeTask) move() {
 	time.AfterFunc(time.Duration(Config.MoveDuration), func() {
 		Jobs.Move.Execute()
-		Task.Move.Execute()
+		t.move()
 	})
 }

+ 2 - 4
src/jfw/modules/pushsubscribe/src/push/main.go

@@ -17,7 +17,7 @@ import (
 )
 
 func main() {
-	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务")
+	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务")
 	move := flag.Int("v", 0, "1 优先迁移数据")
 	taskType := flag.Int("t", 1, "1 一天三次推送;2 九点推送")
 	flag.Parse()
@@ -50,8 +50,6 @@ func main() {
 	} else if *modle == 2 {
 		job.Jobs.Push.Execute(*taskType)
 	}
-	go job.Task.Move.Execute()
-	go job.Task.OtherPush.Execute()
-	go job.Task.OncePush.Execute()
+	(&job.TimeTask{}).Run()
 	<-chan bool(nil)
 }

+ 100 - 40
src/jfw/modules/pushsubscribe/src/push/pusher/normalpush.go

@@ -6,7 +6,8 @@ import (
 	putil "push/util"
 	"qfw/util"
 	"qfw/util/mongodb"
-	"sort"
+	"qfw/util/redis"
+	"strings"
 	"time"
 
 	"github.com/donnie4w/go-logger/logger"
@@ -14,7 +15,9 @@ import (
 )
 
 //正常推送,一天推送三次或者一天一次
-type NormalPush struct{}
+type NormalPush struct {
+	SavePool chan bool
+}
 
 //获取需要推送的用户
 func (n *NormalPush) OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{}) {
@@ -54,22 +57,21 @@ func (n *NormalPush) GetUserInfo(user map[string]interface{}) *UserInfo {
 
 //推送以后处理
 func (n *NormalPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
-	if pushResult.IsSaveSuccess {
-		if u.FirstPushTime == 0 {
-			go mongodb.Update("user", map[string]interface{}{
-				"_id": bson.ObjectIdHex(u.Id),
-			}, map[string]interface{}{
-				"$set": map[string]interface{}{
-					"l_firstpushtime": time.Now().Unix(),
-				},
-			}, false, false)
-		}
-		if pushResult.IsVipTempSave {
-			n.vipTempSave(u.Id, user)
-		}
-	} else {
+	if pushResult.PushDate == 0 {
 		return
 	}
+	if u.FirstPushTime == 0 {
+		go mongodb.Update("user", map[string]interface{}{
+			"_id": bson.ObjectIdHex(u.Id),
+		}, map[string]interface{}{
+			"$set": map[string]interface{}{
+				"l_firstpushtime": time.Now().Unix(),
+			},
+		}, false, false)
+	}
+	if pushResult.IsVipTempSave {
+		n.vipTempSave(u.Id, pushResult.PushDate, pushResult.Infos)
+	}
 	//判断是否要删除数据
 	sess := mongodb.GetMgoConn()
 	defer mongodb.DestoryMongoConn(sess)
@@ -79,6 +81,7 @@ func (n *NormalPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user m
 	}
 	if pushResult.WxStatus == -1 || pushResult.AppStatus == -1 || pushResult.MailStatus == -1 {
 		user["failtime"] = time.Now().Unix()
+		user["list"] = pushResult.Infos
 		if pushResult.WxStatus == -1 {
 			user["wxfail"] = 1
 		}
@@ -94,11 +97,61 @@ func (n *NormalPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user m
 		}
 	}
 }
+func (n *NormalPush) GetPushParam(mailPush int, u *UserInfo, sl *SortList) *putil.PushParam {
+	dateymd := util.NowFormat(util.Date_yyyyMMdd)
+	dayCountKey := DayCountKey(dateymd, u.Id)
+	onceCountKey := OnceCountKey(dateymd, u.Id)
+	dayCount := redis.GetInt("pushcache_2_a", dayCountKey)
+	pushParam := putil.NewPushParam(true, dayCount, mailPush, u, sl)
+	if pushParam == nil || pushParam.InfosLength == 0 {
+		return nil
+	}
+	redis.Put("pushcache_2_a", dayCountKey, dayCount+pushParam.InfosLength, 86400)
+	redis.Put("pushcache_2_a", onceCountKey, pushParam.InfosLength, 86400)
+	//推送记录id
+	pushParam.PushDate = n.save(u, pushParam.Infos)
+	if pushParam.PushDate == 0 {
+		logger.Info(u.Id, "保存出错", pushParam.InfosLength)
+		return pushParam
+	}
+	logger.Info(u.Id, "保存成功", pushParam.PushDate, pushParam.InfosLength)
+	now := time.Now()
+	if pushParam.IsVipUser && (u.RateMode == 3 || u.RateMode == 4) && now.Day() != Config.VipPushDay && now.Weekday().String() != Config.VipPushWeek {
+		pushParam.IsVipTempSave = true
+		pushParam.IsPush = false
+	}
+	return pushParam
+}
+
+//保存发送信息
+func (n *NormalPush) save(k *UserInfo, matchInfos []*MatchInfo) int64 {
+	n.SavePool <- true
+	defer func() {
+		<-n.SavePool
+	}()
+	if Config.SaveSleep > 0 {
+		time.Sleep(time.Duration(Config.SaveSleep) * time.Millisecond)
+	}
+	unix := time.Now().Unix()
+	values := []interface{}{}
+	for i := len(matchInfos) - 1; i >= 0; i-- {
+		matchInfo := matchInfos[i]
+		values = append(values, k.Id, util.ObjToString((*matchInfo.Info)["_id"]), unix, strings.Join(matchInfo.Keys, " "), util.ObjToString((*matchInfo.Info)["area"]), util.ObjToString((*matchInfo.Info)["city"]), util.ObjToString((*matchInfo.Info)["buyerclass"]))
+	}
+	savecount := putil.Mysql.InsertBatch("pushsubscribe", []string{"userid", "infoid", "date", "matchkeys", "area", "city", "buyerclass"}, values)
+	if int(savecount) != len(matchInfos) {
+		logger.Error(k.Id, "批量保存有问题", len(matchInfos), savecount)
+	}
+	if savecount == 0 {
+		return 0
+	}
+	return unix
+}
 
 //vip 每周 每月推送 暂时保存
-func (n *NormalPush) vipTempSave(userId string, v map[string]interface{}) {
-	newList := putil.ToSortList(v["list"])
+func (n *NormalPush) vipTempSave(userId string, pushdate int64, newList []*MatchInfo) {
 	pLength := len(newList)
+	logger.Info(userId, "开始保存到pushspace_vip表", pLength)
 	if pLength == 0 {
 		return
 	}
@@ -106,22 +159,21 @@ func (n *NormalPush) vipTempSave(userId string, v map[string]interface{}) {
 	defer mongodb.DestoryMongoConn(sess)
 	var data map[string]interface{}
 	coll := sess.DB(putil.DbName).C("pushspace_vip")
-	err := coll.Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"size": 1, "list": 1, "date": 1, "userid": 1}).One(&data)
-	if err != nil {
-		logger.Error(userId, "获取用户pushspace_vip数据出错", err)
-		return
-	}
+	coll.Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"size": 1, "list": 1, "date": 1, "userid": 1}).One(&data)
 	nowymd := util.NowFormat(util.Date_yyyyMMdd)
 	if data == nil { //批量新增
-		err = coll.Insert(map[string]interface{}{
-			"userid":     v["userid"],
-			"size":       v["size"],
-			"list":       v["list"],
+		if pLength > Config.MaxPushSize {
+			newList = newList[:Config.MaxPushSize]
+		}
+		err := coll.Insert(map[string]interface{}{
+			"userid":     userId,
+			"size":       pLength,
+			"list":       newList,
 			"date":       nowymd,
-			"createtime": time.Now().Unix(),
+			"createtime": pushdate,
 		})
 		if err != nil {
-			logger.Error(userId, "保存pushspace_vip出错", err)
+			logger.Error(userId, "保存pushspace_vip出错", err)
 			return
 		}
 	} else { //批量更新
@@ -129,22 +181,30 @@ func (n *NormalPush) vipTempSave(userId string, v map[string]interface{}) {
 		if nowymd != util.ObjToString(data["date"]) {
 			return
 		}
-		oldList := putil.ToSortList(data["list"])
-		if len(oldList)+pLength > Config.MaxPushSize {
-			newList = append(newList, oldList...)
-			sort.Sort(newList)
-			v["list"] = newList[:Config.MaxPushSize]
-		} else { //追加
-			upSet["$pushAll"] = map[string]interface{}{
-				"list": newList,
+		size := util.IntAll(data["size"])
+		set := map[string]interface{}{
+			"size": size + pLength,
+		}
+		oldList, _ := data["list"].([]interface{})
+		if size < Config.MaxPushSize {
+			count := Config.MaxPushSize - size
+			if pLength > count {
+				for i := 0; i < count; i++ {
+					oldList = append(oldList, newList[i])
+				}
+				set["list"] = oldList
+			} else { //追加
+				upSet["$pushAll"] = map[string]interface{}{
+					"list": newList,
+				}
 			}
 		}
-		v["size"] = util.IntAll(v["size"]) + pLength
-		upSet["$set"] = v
-		err = coll.UpdateId(data["_id"], upSet)
+		upSet["$set"] = set
+		err := coll.UpdateId(data["_id"], upSet)
 		if err != nil {
 			logger.Error(userId, "更新pushspace_vip出错", err)
 			return
 		}
 	}
+	logger.Info(userId, "保存到pushspace_vip表结束", pLength)
 }

+ 1 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/pusher.go

@@ -8,5 +8,6 @@ import (
 type Pusher interface {
 	OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{})
 	GetUserInfo(user map[string]interface{}) *UserInfo
+	GetPushParam(mailPush int, k *UserInfo, sl *SortList) *putil.PushParam
 	AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{})
 }

+ 10 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/repairpush.go

@@ -41,6 +41,16 @@ func (r *RepairPush) GetUserInfo(user map[string]interface{}) *UserInfo {
 	return putil.NewUserInfoByPushSpaceColl(user)
 }
 
+//获取推送参数
+func (r *RepairPush) GetPushParam(mailPush int, u *UserInfo, sl *SortList) *putil.PushParam {
+	pushParam := putil.NewPushParam(true, 0, mailPush, u, sl)
+	if pushParam != nil {
+		pushParam.IsPush = true
+	}
+	return pushParam
+
+}
+
 //推送以后处理
 func (r *RepairPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
 	sess := mongodb.GetMgoConn()

+ 27 - 2
src/jfw/modules/pushsubscribe/src/push/pusher/specialpush.go

@@ -15,7 +15,7 @@ type SpecialPush struct{}
 
 //获取需要推送的用户
 func (s *SpecialPush) OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{}) {
-	var query map[string]interface{}
+	query := map[string]interface{}{}
 	if len(Config.TestIds) > 0 {
 		query["userid"] = map[string]interface{}{
 			"$in": Config.TestIds,
@@ -38,10 +38,35 @@ func (s *SpecialPush) GetUserInfo(user map[string]interface{}) *UserInfo {
 		return nil
 	}
 	userInfo, _ := NewUserInfoByUserColl(*u)
+	if userInfo != nil {
+		userInfo.Size, _ = user["size"].(int)
+		userInfo.CreateTime, _ = user["createtime"].(int64)
+	}
 	return userInfo
 }
 
+func (s *SpecialPush) GetPushParam(mailPush int, u *UserInfo, sl *SortList) *putil.PushParam {
+	pushParam := putil.NewPushParam(false, 0, mailPush, u, sl)
+	if pushParam != nil {
+		pushParam.IsPush = true
+		pushParam.InfosLength = u.Size
+		pushParam.PushDate = u.CreateTime
+	}
+	return pushParam
+}
+
 //推送以后处理
 func (s *SpecialPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
-
+	if pushResult.PushDate == 0 {
+		return
+	}
+	if pushResult.WxStatus == 0 && pushResult.AppStatus == 0 && pushResult.MailStatus == 0 {
+		return
+	}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	err := sess.DB(putil.DbName).C("pushspace_vip").RemoveId(user["_id"])
+	if err != nil {
+		logger.Error(u.Id, "删除pushspace_vip数据出错", err)
+	}
 }

+ 134 - 1
src/jfw/modules/pushsubscribe/src/push/util/entity.go

@@ -1,10 +1,143 @@
 package util
 
+import (
+	"fmt"
+	. "public"
+	. "push/config"
+	"qfw/util"
+	"strings"
+)
+
 //推送返回结果
 type PushResult struct {
-	IsSaveSuccess bool
 	IsVipTempSave bool
 	WxStatus      int
 	AppStatus     int
 	MailStatus    int
+	PushDate      int64
+	Infos         []*MatchInfo
+}
+type PushParam struct {
+	JpushTitle    string
+	LastInfoDate  int64
+	TitleArray    []string
+	Infos         []*MatchInfo
+	InfosLength   int
+	IsVipUser     bool
+	MailContent   string
+	PushDate      int64
+	IsPush        bool
+	IsVipTempSave bool
+}
+
+func NewPushParam(isLimit bool, dayCount, mailPush int, u *UserInfo, sl *SortList) *PushParam {
+	mailContent := ""
+	jpushTitle := ""
+	lastInfoDate := int64(0)
+	titleArray := []string{}
+	infos := []*MatchInfo{}
+	infosLength := 0
+	publishTitle := map[string]bool{}
+	isVipUser := IsVipUser(u.VipStatus)
+	for _, ks := range *sl {
+		k2 := *ks.Info
+		title := strings.Replace(k2["title"].(string), "\n", "", -1)
+		title = Re.ReplaceAllString(title, "$1")
+		area := util.ObjToString(k2["area"])
+		if area == "A" {
+			area = "全国"
+		}
+		newTitle := fmt.Sprintf("[%s]%s", area, title)
+		if publishTitle[newTitle] {
+			continue
+		}
+		publishTitle[newTitle] = true
+		infosLength++
+		infos = append(infos, ks)
+		titleArray = append(titleArray, newTitle)
+		if infosLength == 1 {
+			jpushTitle = title
+			lastInfoDate = util.Int64All(k2["publishtime"])
+		}
+		//增加行业的处理
+		industry := ""
+		industryclass := "industry"
+		if k2["s_subscopeclass"] != nil {
+			k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
+			if len(k2sub) > 0 {
+				industry = k2sub[0]
+				if industry != "" {
+					ss := strings.Split(industry, "_")
+					if len(ss) > 1 {
+						industry = ss[0]
+					}
+				}
+			}
+		}
+		if mailPush == 1 { //关于邮件的处理
+			mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
+			url := fmt.Sprintf("%s/article/mailprivate/%s.html", Config.JianyuDomain, mailSid)
+			classArea := "area"
+			classType := "type"
+			infotype := util.ObjToString(k2["subtype"])
+			if infotype == "" {
+				infotype = util.ObjToString(k2["toptype"])
+			}
+			if infotype == "" {
+				infotype = util.ObjToString(k2["type"])
+				if infotype == "tender" {
+					infotype = "招标"
+				} else if infotype == "bid" {
+					infotype = "中标"
+				}
+			}
+			dates := util.LongToDate(k2["publishtime"], false)
+			//标题替换
+			otitle := title
+			for _, kw := range u.Keys {
+				kws := strings.Split(kw, "+")
+				n := 0
+				otitle2 := otitle
+				for _, kwn := range kws {
+					ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
+					if ot != otitle {
+						n++
+						otitle2 = ot
+					} else {
+						break
+					}
+				}
+				if n == len(kws) {
+					otitle = otitle2
+					break
+				}
+			}
+			if industry == "" {
+				industryclass = ""
+			}
+			mailContent += fmt.Sprintf(Config.Mail_content, infosLength, url, otitle, classArea, area, classType, infotype, industryclass, industry, dates)
+		}
+		if isLimit {
+			if isVipUser {
+				if dayCount >= Config.VipMaxPushSize {
+					break
+				}
+			} else {
+				//限制最大信息条数
+				if infosLength >= Config.MaxPushSize {
+					break
+				}
+			}
+			dayCount++
+		}
+	}
+	return &PushParam{
+		JpushTitle:   jpushTitle,
+		LastInfoDate: lastInfoDate,
+		TitleArray:   titleArray,
+		Infos:        infos,
+		InfosLength:  infosLength,
+		IsVipUser:    isVipUser,
+		MailContent:  mailContent,
+	}
 }

+ 3 - 4
src/jfw/modules/pushsubscribe/src/push/util/rpccall.go

@@ -2,7 +2,7 @@ package util
 
 import (
 	"encoding/json"
-	"log"
+	"fmt"
 	"net/rpc"
 	. "public"
 	. "push/config"
@@ -23,7 +23,7 @@ var (
 )
 
 //微信远程调用,实现模板发送消息
-func SendWeixin(k *UserInfo, remark, title, pushDate string) bool {
+func SendWeixin(k *UserInfo, remark, title string, pushDate int64) bool {
 	wxPushPool <- true
 	defer func() {
 		<-wxPushPool
@@ -41,10 +41,9 @@ func SendWeixin(k *UserInfo, remark, title, pushDate string) bool {
 		Service:     util.FormatDate(&now, util.Date_Short_Layout),
 		Color:       Config.WxColor,
 		DetailColor: Config.WxDetailColor,
-		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush") + "__" + pushDate,
+		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush") + "__" + fmt.Sprint(pushDate),
 	}
 	ok, res := qrpc.WxPush(Config.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
-	log.Println(ok, res)
 	if !ok && (strings.Contains(res, "[46004]") || strings.Contains(res, "[65302]") || strings.Contains(res, "[43004]") || strings.Contains(res, "[40003]")) {
 		mongodb.Update("user", map[string]interface{}{"_id": bson.ObjectIdHex(k.Id)}, map[string]interface{}{
 			"$set": map[string]interface{}{