ソースを参照

Merge branch 'dev2.8.5' of http://192.168.3.207:10080/qmx/jy into dev2.8.5

xuzhiheng 5 年 前
コミット
0405163730
32 ファイル変更672 行追加577 行削除
  1. 2 1
      src/config.json
  2. 3 4
      src/jfw/active/active_seal.go
  3. 3 4
      src/jfw/front/adv.go
  4. 3 3
      src/jfw/modules/app/src/app/front/login.go
  5. 2 3
      src/jfw/modules/pushsubscribe/src/match/config.json
  6. 14 15
      src/jfw/modules/pushsubscribe/src/match/config/config.go
  7. 0 0
      src/jfw/modules/pushsubscribe/src/match/job/freematch.go
  8. 15 86
      src/jfw/modules/pushsubscribe/src/match/job/matchjob.go
  9. 0 0
      src/jfw/modules/pushsubscribe/src/match/job/vipmatch.go
  10. 0 89
      src/jfw/modules/pushsubscribe/src/match/util/util.go
  11. 1 1
      src/jfw/modules/pushsubscribe/src/public/rpccall.go
  12. 174 0
      src/jfw/modules/pushsubscribe/src/public/util.go
  13. 2 1
      src/jfw/modules/pushsubscribe/src/push/config.json
  14. 2 8
      src/jfw/modules/pushsubscribe/src/push/config/config.go
  15. 8 7
      src/jfw/modules/pushsubscribe/src/push/job/jobs.go
  16. 24 23
      src/jfw/modules/pushsubscribe/src/push/job/movejob.go
  17. 67 0
      src/jfw/modules/pushsubscribe/src/push/job/normalpush.go
  18. 7 0
      src/jfw/modules/pushsubscribe/src/push/job/pusher.go
  19. 141 119
      src/jfw/modules/pushsubscribe/src/push/job/pushjob.go
  20. 1 1
      src/jfw/modules/pushsubscribe/src/push/job/repairjob.go
  21. 67 0
      src/jfw/modules/pushsubscribe/src/push/job/specialpush.go
  22. 4 0
      src/jfw/modules/pushsubscribe/src/push/main.go
  23. 1 1
      src/jfw/modules/pushsubscribe/src/push/util/rpccall.go
  24. 0 115
      src/jfw/modules/pushsubscribe/src/push/util/util.go
  25. 1 2
      src/jfw/modules/subscribepay/src/config.json
  26. 11 0
      src/jfw/modules/subscribepay/src/config/config.go
  27. 15 0
      src/jfw/modules/subscribepay/src/message.json
  28. 58 9
      src/jfw/modules/subscribepay/src/timetask/timetask.go
  29. 28 8
      src/jfw/modules/weixin/src/jrpc/jrpc.go
  30. 1 2
      src/jfw/nodemgr/nodemgr.go
  31. 9 45
      src/jfw/public/rpccall.go
  32. 8 30
      src/jfw/timetask/followtimetask.go

+ 2 - 1
src/config.json

@@ -63,7 +63,8 @@
         "keysetIndex": "/wxkeyset/keyset/index?tiptext=%s",
         "followEntDetail": "/jylab/followent/detail/%s",
         "mymenu": "/front/wxMyOrder/myMenu",
-		"historypush": "/swordfish/historypush?times=%s"
+		"historypush": "/swordfish/historypush?times=%s",
+		"expireTip": "/front/vipsubscribe/renewPage/%s"
     },
     "jy_activeset": {
         "activitystartcode": "3201000000",

+ 3 - 4
src/jfw/active/active_seal.go

@@ -109,7 +109,6 @@ func SealSendMsg() {
 			"$lte": subtime,
 		},
 	}, `{"i_sealValue":-1}`, `{"s_nickname":1,"s_openid":1,"i_sealValue":1,"i_sealCount":1,"l_timestamp":1}`, false, -1, -1)
-	var result rpc.RpcResult = "N"
 	if len(*list) > 0 {
 		//参与活动人数
 		var allCount = mongodb.Count("person_seal", nil)
@@ -140,7 +139,7 @@ func SealSendMsg() {
 					title = "【优秀员工证书】\n你的优秀员工证书已获得" + strconv.Itoa(count) + "个好友认证,打败全国" + lastVal + "%的网友,快去高调炫耀一下!"
 				}
 				webdomain := util.ObjToString(config.Sysconfig["webdomain"])
-				err := public.SendACTIVEApplyMsg(&rpc.NotifyMsg{
+				ok := public.SendACTIVEApplyMsg(&rpc.NotifyMsg{
 					Openid:  openid,
 					Title:   title,
 					Detail:  strconv.Itoa(ranking),
@@ -148,8 +147,8 @@ func SealSendMsg() {
 					Date:    "",
 					Remark:  remark,
 					Url:     webdomain + "/front/sess/" + sese.EncodeString(openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",sealSend"),
-				}, &result)
-				if err == nil {
+				})
+				if ok {
 					var updata = make(map[string]interface{})
 					updata["i_sendSign"] = 1
 					ok := mongodb.Update("person_seal", `{"s_openid":"`+openid+`"}`, map[string]interface{}{

+ 3 - 4
src/jfw/front/adv.go

@@ -111,7 +111,6 @@ func (a *Adv) DoInfo() error {
 				if len(id) > 0 {
 					advMsg = "T" //保存成功
 					//
-					var result rpc.RpcResult = "N"
 					var nowData = time.Now()
 					var dslData = util.FormatDate(&nowData, util.Date_Short_Layout)
 					var dtlData = util.FormatDate(&nowData, util.Date_Time_Layout)
@@ -119,7 +118,7 @@ func (a *Adv) DoInfo() error {
 					go func() {
 						for _, v := range advList {
 							if util.ObjToString(v) != "" {
-								err := public.SendLOGApplyMsg(&rpc.NotifyMsg{
+								ok := public.SendLOGApplyMsg(&rpc.NotifyMsg{
 									Openid:  openId,
 									Title:   "您好!您有新的客户信息。",
 									Detail:  "新客户信息",                 //手机号
@@ -127,8 +126,8 @@ func (a *Adv) DoInfo() error {
 									Date:    dslData + " " + dtlData, //推送时间
 									Remark:  "客户信息:" + PhoneNum,
 									Url:     "",
-								}, &result)
-								if err != nil {
+								})
+								if !ok {
 									log.Println("保函发送报错:", PhoneNum, openId)
 								}
 							}

+ 3 - 3
src/jfw/modules/app/src/app/front/login.go

@@ -10,8 +10,8 @@ import (
 	"log"
 	"net/http"
 	qutil "qfw/util"
-	"qfw/util/jy"
 	"qfw/util/redis"
+	qrpc "qfw/util/rpc"
 	"regexp"
 	"strings"
 	"time"
@@ -913,7 +913,7 @@ func afterLogin(user map[string]interface{}, session *httpsession.Session, rid,
 				})
 				//
 				log.Println("踢人下线", old_ponetype, userid, old_rid, old_oid)
-				jy.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
+				qrpc.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
 					"type":        "signOut",
 					"descript":    kickedTip,
 					"jgPushId":    old_rid,
@@ -1087,7 +1087,7 @@ func checkRepeatLogin(user *map[string]interface{}, rid, oid string) {
 		//
 		go func() {
 			log.Println("踢人下线", ponetype, userid, jpushid, opushid)
-			jy.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
+			qrpc.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
 				"type":        "signOut",
 				"descript":    kickedTip,
 				"jgPushId":    jpushid,

+ 2 - 3
src/jfw/modules/pushsubscribe/src/match/config.json

@@ -3,8 +3,8 @@
 	"elasticSearch": "http://192.168.3.128:9800",
 	"redisServers": "pushcache_1=192.168.3.128:5000,pushcache_2_a=192.168.3.128:5001",
 	"maxPushSize": 50,
+	"vipMaxPushSize": 2000,
 	"maxSearch": 5000,
-	"vipOneDayMaxPushSize": 2000,
 	"mgoAddr": "192.168.3.128:27080",
 	"mgoSize": 10,
 	"testids": ["5cee3e2a61fd002c800e2569"],
@@ -12,6 +12,5 @@
 	"matchPoolSize": 60,
 	"matchDuration": 1, 
 	"userBatch":2,
-	"pcHelper":"127.0.0.1:8082",
-	"mailReg":"^.+@.+$"
+	"pcHelper":"127.0.0.1:8082"
 }

+ 14 - 15
src/jfw/modules/pushsubscribe/src/match/config/config.go

@@ -5,21 +5,20 @@ import (
 )
 
 type config struct {
-	ElasticPoolSize      int      `json:"elasticPoolSize"`
-	ElasticSearch        string   `json:"elasticSearch"`
-	RedisServers         string   `json:"redisServers"`
-	MaxPushSize          int      `json:"maxPushSize"`
-	VipOneDayMaxPushSize int      `json:"vipOneDayMaxPushSize"`
-	MaxSearch            int      `json:"maxSearch"`
-	MgoAddr              string   `json:"mgoAddr"`
-	MgoSize              int      `json:"mgoSize"`
-	TestIds              []string `json:"testIds"`
-	FilterWords          []string `json:"filterWords"`
-	MatchPoolSize        int      `json:"matchPoolSize"`
-	MatchDuration        int64    `json:"matchDuration"`
-	UserBatch            int      `json:"userBatch"`
-	PcHelper             string   `json:"pcHelper"`
-	MailReg              string   `json:"mailReg"`
+	ElasticPoolSize int      `json:"elasticPoolSize"`
+	ElasticSearch   string   `json:"elasticSearch"`
+	RedisServers    string   `json:"redisServers"`
+	MaxPushSize     int      `json:"maxPushSize"`
+	VipMaxPushSize  int      `json:"vipMaxPushSize"`
+	MaxSearch       int      `json:"maxSearch"`
+	MgoAddr         string   `json:"mgoAddr"`
+	MgoSize         int      `json:"mgoSize"`
+	TestIds         []string `json:"testIds"`
+	FilterWords     []string `json:"filterWords"`
+	MatchPoolSize   int      `json:"matchPoolSize"`
+	MatchDuration   int64    `json:"matchDuration"`
+	UserBatch       int      `json:"userBatch"`
+	PcHelper        string   `json:"pcHelper"`
 }
 
 type taskConfig struct {

+ 0 - 0
src/jfw/modules/pushsubscribe/src/match/job/freeuser.go → src/jfw/modules/pushsubscribe/src/match/job/freematch.go


+ 15 - 86
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -7,12 +7,12 @@ import (
 	. "match/config"
 	"match/dfa"
 	mutil "match/util"
+	"public"
 	. "public"
 	"qfw/util"
 	"qfw/util/elastic"
 	"qfw/util/mongodb"
 	"qfw/util/redis"
-	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -24,7 +24,6 @@ import (
 
 var (
 	SaveFields = []string{"_id", "area", "city", "buyerclass", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type"}
-	MailReg    = regexp.MustCompile(Config.MailReg)
 )
 
 const (
@@ -155,7 +154,11 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher) {
 				Info: &info,
 				Keys: v2.Keys,
 			})
-			if len(array) == Config.MaxPushSize {
+			maxPushSize := Config.MaxPushSize
+			if IsVipUser(user.VipStatus) {
+				maxPushSize = Config.VipMaxPushSize
+			}
+			if len(array) == maxPushSize {
 				break
 			}
 		}
@@ -331,54 +334,13 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 	title_notkey_user := make(map[string]*[]*UserInfo)
 	nowymd := util.NowFormat(util.Date_yyyyMMdd)
 	for temp := make(map[string]interface{}); query.Next(temp); {
-		userId := fmt.Sprintf("%x", string(temp["_id"].(bson.ObjectId)))
-		s_m_openid := util.ObjToString(temp["s_m_openid"])
-		a_m_openid := util.ObjToString(temp["a_m_openid"])
-		s_phone := util.ObjToString(temp["s_phone"])
-		userType := mutil.GetUserType(s_m_openid, a_m_openid, s_phone, util.IntAll(temp["i_type"]))
-		isPush := util.IntAllDef(temp["i_ispush"], 1)
-		jpushid := util.ObjToString(temp["s_jpushid"])
-		opushid := util.ObjToString(temp["s_opushid"])
-		appPhoneType := util.ObjToString(temp["s_appponetype"])
-		pchelperPush := 0
-		//公众号取关用户
-		if userType == 0 && isPush == 0 {
-			logger.Info("过滤掉,公众号取关用户", userId)
-			continue
-		} else if userType == 2 && jpushid == "" && (opushid == "" || appPhoneType == "") {
-			logger.Info("过滤掉,app用户没有登录", userId)
-			continue
-		} else if (userType == 1 || (userType == 5 && isPush == 0)) && jpushid == "" && (opushid == "" || appPhoneType == "") {
-			if s_phone == "" || !mutil.PcHelperIsOnLine(s_phone) {
-				logger.Info("过滤掉,s_phone为空或者pc助手s_phone不在线", userId)
-				continue
-			} else {
-				pchelperPush = 1
-			}
-		}
-		applystatus := util.IntAll(temp["i_applystatus"])
-		vipStatus := util.IntAll(temp["i_vip_status"])
-		isVipUser := IsVipUser(vipStatus)
-		var o_msgset map[string]interface{}
-		if isVipUser {
-			o_msgset, _ = temp["o_vipjy"].(map[string]interface{})
-		} else {
-			o_msgset, _ = temp["o_jy"].(map[string]interface{})
-		}
-		wxpush, apppush, mailpush := mutil.ModeTransform(userType, o_msgset)
-		email := strings.TrimSpace(util.ObjToString(o_msgset["s_email"]))
-		if !MailReg.MatchString(email) {
-			mailpush = 0
-		}
-		if wxpush != 1 && apppush != 1 && mailpush != 1 {
-			logger.Info("过滤掉,wxpush apppush mailpush 都不是1", userId, wxpush, apppush, mailpush)
-			continue
-		}
+		user, o_msgset := public.NewUserInfo(temp)
+		isVipUser := IsVipUser(user.VipStatus)
 		var allKeySet []*KeySet
 		var err error
 		if isVipUser {
-			if dayCount := redis.GetInt("pushcache_2_a", DayCountKey(nowymd, userId)); dayCount >= Config.VipOneDayMaxPushSize {
-				logger.Info("vip用户达到一天最大推送数量", userId, dayCount)
+			if dayCount := redis.GetInt("pushcache_2_a", DayCountKey(nowymd, user.Id)); dayCount >= Config.VipMaxPushSize {
+				logger.Info("vip用户达到一天最大推送数量", user.Id, dayCount)
 				continue
 			}
 			vip_items, _ := o_msgset["a_items"].([]interface{})
@@ -395,11 +357,10 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 			allKeySet, err = m.GetKeySet(o_msgset["a_key"])
 		}
 		if err != nil {
-			logger.Error("获取用户关键词错误!", userId, err)
+			logger.Error("获取用户关键词错误!", user.Id, err)
 			continue
 		}
-		rateMode := util.IntAllDef(o_msgset["i_ratemode"], 2)
-		logger.Info("第", user_batch_index, "批用户,userid", userId, "s_m_openid", s_m_openid, "a_m_openid", a_m_openid, "s_phone", s_phone, "jpushid", jpushid, "opushid", opushid, "applystatus", applystatus, "email", email, "rateMode", rateMode, "wxpush", wxpush, "apppush", apppush, "mailpush", mailpush, "vipstatus", vipStatus)
+		logger.Info("第", user_batch_index, "批用户,userid", user.Id, "s_m_openid", user.S_m_openid, "a_m_openid", user.A_m_openid, "s_phone", user.Phone, "jpushid", user.Jpushid, "opushid", user.Opushid, "applystatus", user.ApplyStatus, "email", user.Email, "rateMode", user.RateMode, "wxpush", user.WxPush, "apppush", user.AppPush, "mailpush", user.MailPush, "vipstatus", user.VipStatus)
 		keys := []string{}                           //过滤后的关键词
 		notkeys := []string{}                        //排除词
 		key_notkey := map[string]map[string]bool{}   //关键词所对应的排除词
@@ -465,43 +426,11 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 			}
 		}
 		if !isVipUser && len(originalKeys) == 0 {
-			logger.Info("过滤掉,没有关键词", userId)
+			logger.Info("过滤掉,没有关键词", user.Id)
 			continue
 		}
-		modifydate := ""
-		md, _ := o_msgset["l_modifydate"].(int64)
-		if md > 0 {
-			modifydate = util.FormatDateByInt64(&md, util.Date_Short_Layout)
-		}
-		if modifydate == "" {
-			now := time.Now()
-			modifydate = util.FormatDate(&now, util.Date_Short_Layout)
-		}
-		user := &UserInfo{
-			Id:           userId,
-			Keys:         originalKeys,
-			Key_notkey:   key_notkey,
-			WxPush:       wxpush,
-			AppPush:      apppush,
-			MailPush:     mailpush,
-			PchelperPush: pchelperPush,
-			Email:        email,
-			S_m_openid:   s_m_openid,
-			A_m_openid:   a_m_openid,
-			Phone:        s_phone,
-			Jpushid:      jpushid,
-			Opushid:      opushid,
-			UserType:     userType,
-			RateMode:     rateMode,
-			ModifyDate:   modifydate,
-			AppPhoneType: appPhoneType,
-			ApplyStatus:  applystatus,
-			Subscribe:    isPush,
-			MatchType:    util.IntAllDef(o_msgset["i_matchway"], 1),
-			MergeOrder:   temp["a_mergeorder"],
-			NickName:     util.ObjToString(temp["s_nickname"]),
-			VipStatus:    vipStatus,
-		}
+		user.Keys = originalKeys
+		user.Key_notkey = key_notkey
 		/***************start*****************/
 		if isVipUser {
 			//vip付费-采购单位行业

+ 0 - 0
src/jfw/modules/pushsubscribe/src/match/job/vipuser.go → src/jfw/modules/pushsubscribe/src/match/job/vipmatch.go


+ 0 - 89
src/jfw/modules/pushsubscribe/src/match/util/util.go

@@ -1,98 +1,9 @@
 package util
 
 import (
-	"qfw/util"
-
 	"gopkg.in/mgo.v2/bson"
 )
 
-//重新设置用户类型
-func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
-	if userType == 0 {
-		if s_m_openid != "" && a_m_openid == "" && phone == "" {
-			userType = 0 //公众号
-		} else if s_m_openid == "" && phone != "" {
-			userType = 1 //app手机号
-		} else if s_m_openid == "" && a_m_openid != "" {
-			userType = 2 //app微信
-			//} else if s_m_openid != "" && a_m_openid == "" && phone == "" {
-			//userType = 3 //用户合并以后只有微信用户
-			//} else if s_m_openid == "" && (a_m_openid != "" || phone != "") {
-			//userType = 4 //用户合并以后只有app用户
-		} else if s_m_openid != "" && (a_m_openid != "" || phone != "") {
-			userType = 5 //用户合并以后公众号和app用户都有
-		} else {
-			userType = -1
-		}
-	}
-	return userType
-}
-
-//推送方式转换
-func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int) {
-	mode := util.IntAll(o_msgset["i_mode"])
-	wxpush := util.IntAll(o_msgset["i_wxpush"])
-	apppush := util.IntAll(o_msgset["i_apppush"])
-	mailpush := util.IntAll(o_msgset["i_mailpush"])
-	if wxpush == 1 || apppush == 1 || mailpush == 1 {
-		return wxpush, apppush, mailpush
-	}
-	//老的app用户
-	if userType == 1 || userType == 2 {
-		switch mode {
-		case 0, 1:
-			apppush = 1
-			break
-		case 2:
-			mailpush = 1
-			break
-		case 3:
-			apppush = 1
-			mailpush = 1
-			break
-		}
-		if apppush == 0 && mailpush == 0 {
-			apppush = 1
-		}
-	} else if userType == 0 {
-		switch mode {
-		case 0, 1:
-			wxpush = 1
-			break
-		case 2:
-			mailpush = 1
-			break
-		case 3:
-			wxpush = 1
-			mailpush = 1
-			break
-		}
-		if wxpush == 0 && mailpush == 0 {
-			wxpush = 1
-		}
-	} else {
-		switch mode {
-		case 0, 1, 3:
-			if userType == 3 {
-				wxpush = 1
-			} else if userType == 4 {
-				apppush = 1
-			} else if userType == 5 {
-				wxpush = 1
-				apppush = 1
-			}
-			if mode == 3 {
-				mailpush = 1
-			}
-			break
-		case 2:
-			mailpush = 1
-			break
-		}
-	}
-	return wxpush, apppush, mailpush
-}
-
 func ToObjectIds(ids []string) []bson.ObjectId {
 	_ids := []bson.ObjectId{}
 	for _, v := range ids {

+ 1 - 1
src/jfw/modules/pushsubscribe/src/match/util/rpccall.go → src/jfw/modules/pushsubscribe/src/public/rpccall.go

@@ -1,4 +1,4 @@
-package util
+package public
 
 import (
 	"net/rpc"

+ 174 - 0
src/jfw/modules/pushsubscribe/src/public/util.go

@@ -2,8 +2,17 @@ package public
 
 import (
 	"fmt"
+	"qfw/util"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
 )
 
+var MailReg = regexp.MustCompile("^.+@.+$")
+
 func IsVipUser(vipStatus int) bool {
 	if vipStatus == 1 || vipStatus == 2 {
 		return true
@@ -20,3 +29,168 @@ func DayCountKey(nowymd, id string) string {
 func OnceCountKey(nowymd, id string) string {
 	return fmt.Sprintf("oncecount_%s_%s", nowymd, id)
 }
+
+//重新设置用户类型
+func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
+	if userType == 0 {
+		if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			userType = 0 //公众号
+		} else if s_m_openid == "" && phone != "" {
+			userType = 1 //app手机号
+		} else if s_m_openid == "" && a_m_openid != "" {
+			userType = 2 //app微信
+			//} else if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			//userType = 3 //用户合并以后只有微信用户
+			//} else if s_m_openid == "" && (a_m_openid != "" || phone != "") {
+			//userType = 4 //用户合并以后只有app用户
+		} else if s_m_openid != "" && (a_m_openid != "" || phone != "") {
+			userType = 5 //用户合并以后公众号和app用户都有
+		} else {
+			userType = -1
+		}
+	}
+	return userType
+}
+
+//推送方式转换
+func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int) {
+	mode := util.IntAll(o_msgset["i_mode"])
+	wxpush := util.IntAll(o_msgset["i_wxpush"])
+	apppush := util.IntAll(o_msgset["i_apppush"])
+	mailpush := util.IntAll(o_msgset["i_mailpush"])
+	if wxpush == 1 || apppush == 1 || mailpush == 1 {
+		return wxpush, apppush, mailpush
+	}
+	//老的app用户
+	if userType == 1 || userType == 2 {
+		switch mode {
+		case 0, 1:
+			apppush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			apppush = 1
+			mailpush = 1
+			break
+		}
+		if apppush == 0 && mailpush == 0 {
+			apppush = 1
+		}
+	} else if userType == 0 {
+		switch mode {
+		case 0, 1:
+			wxpush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			wxpush = 1
+			mailpush = 1
+			break
+		}
+		if wxpush == 0 && mailpush == 0 {
+			wxpush = 1
+		}
+	} else {
+		switch mode {
+		case 0, 1, 3:
+			if userType == 3 {
+				wxpush = 1
+			} else if userType == 4 {
+				apppush = 1
+			} else if userType == 5 {
+				wxpush = 1
+				apppush = 1
+			}
+			if mode == 3 {
+				mailpush = 1
+			}
+			break
+		case 2:
+			mailpush = 1
+			break
+		}
+	}
+	return wxpush, apppush, mailpush
+}
+
+func NewUserInfo(temp map[string]interface{}) (user *UserInfo, o_msgset map[string]interface{}) {
+	userId := fmt.Sprintf("%x", string(temp["_id"].(bson.ObjectId)))
+	s_m_openid := util.ObjToString(temp["s_m_openid"])
+	a_m_openid := util.ObjToString(temp["a_m_openid"])
+	s_phone := util.ObjToString(temp["s_phone"])
+	userType := GetUserType(s_m_openid, a_m_openid, s_phone, util.IntAll(temp["i_type"]))
+	isPush := util.IntAllDef(temp["i_ispush"], 1)
+	jpushid := util.ObjToString(temp["s_jpushid"])
+	opushid := util.ObjToString(temp["s_opushid"])
+	appPhoneType := util.ObjToString(temp["s_appponetype"])
+	applystatus := util.IntAll(temp["i_applystatus"])
+	vipStatus := util.IntAll(temp["i_vip_status"])
+	if IsVipUser(vipStatus) {
+		o_msgset, _ = temp["o_vipjy"].(map[string]interface{})
+	} else {
+		o_msgset, _ = temp["o_jy"].(map[string]interface{})
+	}
+	wxpush, apppush, mailpush := ModeTransform(userType, o_msgset)
+	email := strings.TrimSpace(util.ObjToString(o_msgset["s_email"]))
+	rateMode := util.IntAllDef(o_msgset["i_ratemode"], 2)
+	pchelperPush := 0
+	//公众号取关用户
+	if userType == 0 && isPush == 0 {
+		logger.Info("过滤掉,公众号取关用户", userId)
+		return
+	} else if userType == 2 && jpushid == "" && (opushid == "" || appPhoneType == "") {
+		logger.Info("过滤掉,app用户没有登录", userId)
+		return
+	} else if (userType == 1 || (userType == 5 && isPush == 0)) && jpushid == "" && (opushid == "" || appPhoneType == "") {
+		if s_phone == "" || !PcHelperIsOnLine(s_phone) {
+			logger.Info("过滤掉,s_phone为空或者pc助手s_phone不在线", userId)
+			return
+		} else {
+			pchelperPush = 1
+		}
+	}
+	if !MailReg.MatchString(email) {
+		mailpush = 0
+	}
+	if wxpush != 1 && apppush != 1 && mailpush != 1 {
+		logger.Info("过滤掉,wxpush apppush mailpush 都不是1", userId, wxpush, apppush, mailpush)
+		return
+	}
+	modifydate := ""
+	md, _ := o_msgset["l_modifydate"].(int64)
+	if md > 0 {
+		modifydate = util.FormatDateByInt64(&md, util.Date_Short_Layout)
+	}
+	if modifydate == "" {
+		now := time.Now()
+		modifydate = util.FormatDate(&now, util.Date_Short_Layout)
+	}
+	user = &UserInfo{
+		Id:           userId,
+		WxPush:       wxpush,
+		AppPush:      apppush,
+		MailPush:     mailpush,
+		PchelperPush: pchelperPush,
+		Email:        email,
+		S_m_openid:   s_m_openid,
+		A_m_openid:   a_m_openid,
+		Phone:        s_phone,
+		Jpushid:      jpushid,
+		Opushid:      opushid,
+		UserType:     userType,
+		RateMode:     rateMode,
+		ModifyDate:   modifydate,
+		AppPhoneType: appPhoneType,
+		ApplyStatus:  applystatus,
+		Subscribe:    isPush,
+		MatchType:    util.IntAllDef(o_msgset["i_matchway"], 1),
+		MergeOrder:   temp["a_mergeorder"],
+		NickName:     util.ObjToString(temp["s_nickname"]),
+		VipStatus:    vipStatus,
+	}
+	return
+}

+ 2 - 1
src/jfw/modules/pushsubscribe/src/push/config.json

@@ -30,7 +30,7 @@
 		}
 	],
 	"maxPushSize": 50,
-	"vipOneDayMaxPushSize": 2000,
+	"vipMaxPushSize": 2000,
 	"mgoAddr": "192.168.3.128:27080",
 	"mgoSize": 10,
 	"testids": ["5cee3e2a61fd002c800e2569"],
@@ -38,6 +38,7 @@
 	"wxColor": "#2cb7ca",
 	"wxGroup": "招标信息",
 	"wxTitle": "根据你订阅的关键词“%s”,剑鱼标讯为你推送以下信息。如果不想继续收到此类信息,可进入招标订阅的设置页面取消订阅。",
+	"vipWxTitle": "根据你当前订阅,剑鱼标讯为你推送以下信息。如果不想继续收到此类信息,可进入招标订阅的设置页面取消订阅。",
 	"wxDetailColor":"#686868",
 	"appPushServiceRpc":"127.0.0.1:5566",
 	"pcHelper":"192.168.20.129:8082",

+ 2 - 8
src/jfw/modules/pushsubscribe/src/push/config/config.go

@@ -19,6 +19,7 @@ type config struct {
 	Mail_title              string      `json:"mail_title"`
 	Mails                   []*pushMail `json:"mails"`
 	MaxPushSize             int         `json:"maxPushSize"`
+	VipMaxPushSize          int         `json:"vipMaxPushSize"`
 	MgoAddr                 string      `json:"mgoAddr"`
 	MgoSize                 int         `json:"mgoSize"`
 	TestIds                 []string    `json:"testIds"`
@@ -26,6 +27,7 @@ type config struct {
 	WxColor                 string      `json:"wxColor"`
 	WxGroup                 string      `json:"wxGroup"`
 	WxTitle                 string      `json:"wxTitle"`
+	VipWxTitle              string      `json:"vipWxTitle"`
 	WxDetailColor           string      `json:"wxDetailColor"`
 	AppPushServiceRpc       string      `json:"appPushServiceRpc"`
 	PcHelper                string      `json:"pcHelper"`
@@ -34,7 +36,6 @@ type config struct {
 	OtherPushTimes          []string    `json:"otherPushTimes"`
 	RefreshTime             string      `json:"refreshTime"`
 	WxPoolSize              int         `json:"wxPoolSize"`
-	VipOneDayMaxPushSize    int         `json:"vipOneDayMaxPushSize"`
 	AppPoolSize             int         `json:"appPoolSize"`
 	MailSleep               int         `json:"mailSleep"`
 	SaveSleep               int         `json:"saveSleep"`
@@ -66,13 +67,6 @@ type pushMail struct {
 	MailPoolSize int    `json:"mailPoolSize"`
 	MailReTry    int    `json:"mailReTry"`
 }
-type cassandra struct {
-	Cachesize int      `json:"cachesize"`
-	Host      []string `json:"host"`
-	Open      bool     `json:"open"`
-	Size      int      `json:"size"`
-	Timeout   int      `json:"timeout"`
-}
 
 var (
 	Gmails         []*mail.GmailAuth

+ 8 - 7
src/jfw/modules/pushsubscribe/src/push/job/job.go → src/jfw/modules/pushsubscribe/src/push/job/jobs.go

@@ -1,7 +1,7 @@
 package job
 
 import (
-	"push/config"
+	. "push/config"
 	"sync"
 )
 
@@ -19,19 +19,20 @@ var Jobs = struct {
 	Move: &moveJob{
 		moveLock:  &sync.Mutex{},
 		moveWait:  &sync.WaitGroup{},
-		movePool:  make(chan bool, config.Config.MovePoolSize),
+		movePool:  make(chan bool, Config.MovePoolSize),
 		mergeLock: &sync.Mutex{},
 		mergeWait: &sync.WaitGroup{},
-		mergePool: make(chan bool, config.Config.MergePoolSize),
+		mergePool: make(chan bool, Config.MergePoolSize),
 	},
 	Refresh: &refreshJob{},
 	Push: &pushJob{
-		pool: make(chan bool, config.Config.PushPoolSize),
-		wait: &sync.WaitGroup{},
-		lock: &sync.Mutex{},
+		pool:     make(chan bool, Config.PushPoolSize),
+		savePool: make(chan bool, Config.SavePoolSize),
+		wait:     &sync.WaitGroup{},
+		lock:     &sync.Mutex{},
 	},
 	Repair: &repairJob{
-		pool: make(chan bool, config.Config.PushPoolSize),
+		pool: make(chan bool, Config.PushPoolSize),
 		wait: &sync.WaitGroup{},
 		lock: &sync.Mutex{},
 	},

+ 24 - 23
src/jfw/modules/pushsubscribe/src/push/job/movejob.go

@@ -56,7 +56,7 @@ func (m *moveJob) Execute() {
 			isVipUser := IsVipUser(util.IntAll(temp["vipstatus"]))
 			maxPushSize := Config.MaxPushSize
 			if isVipUser {
-				maxPushSize = Config.VipOneDayMaxPushSize
+				maxPushSize = Config.VipMaxPushSize
 			}
 			m.moveLock.Lock()
 			move := moves[userId]
@@ -92,7 +92,7 @@ func (m *moveJob) Execute() {
 			moves[userId] = move
 			length++
 			if length == Config.MoveBatch {
-				m.Merge(&number, nowUnix, moves)
+				m.merge(&number, nowUnix, moves)
 				length = 0
 				moves = map[string]*moveJob{}
 			}
@@ -104,13 +104,13 @@ func (m *moveJob) Execute() {
 	}
 	m.moveWait.Wait()
 	if length > 0 {
-		m.Merge(&number, nowUnix, moves)
+		m.merge(&number, nowUnix, moves)
 		length = 0
 		moves = map[string]*moveJob{}
 	}
 	logger.Info("迁移数据结束。。。", index)
 }
-func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
+func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 	*number++
 	logger.Info("第", *number, "次开始合并数据")
 	index := 0
@@ -130,13 +130,17 @@ func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 			sess := mongodb.GetMgoConn()
 			defer mongodb.DestoryMongoConn(sess)
 			var data map[string]interface{}
-			sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
+			err := sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
+			if err != nil {
+				logger.Error(userId, "获取用户pushspace表中出数据出错", err)
+				return
+			}
 			if data == nil { //批量新增
 				m.mergeLock.Lock()
 				saveArray = append(saveArray, move.info)
 				saveArray_delete = append(saveArray_delete, move.ids...)
 				if len(saveArray) == BulkSize {
-					m.SaveBulk(sess, &saveArray, &saveArray_delete)
+					m.saveBulk(sess, &saveArray, &saveArray_delete)
 				}
 				m.mergeLock.Unlock()
 			} else { //批量更新
@@ -152,7 +156,6 @@ func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 				if newListOrig == nil || len(newListOrig) == 0 {
 					return
 				}
-				pushAll := make(map[string]interface{})
 				oldList := putil.ToSortList(data["list"])
 				idMap := map[string]bool{}
 				for _, vv := range oldList {
@@ -173,8 +176,9 @@ func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 				rLength := len(oldList)
 				maxPushSize := Config.MaxPushSize
 				if move.isVipUser {
-					maxPushSize = Config.VipOneDayMaxPushSize
+					maxPushSize = Config.VipMaxPushSize
 				}
+				upSet := map[string]interface{}{}
 				if rLength+pLength > maxPushSize {
 					newList = append(newList, oldList...)
 					sort.Sort(newList)
@@ -182,20 +186,17 @@ func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 					setMap["size"] = maxPushSize
 				} else { //追加
 					setMap["size"] = rLength + pLength
-					pushAll["list"] = newList
-				}
-				upSet := map[string]interface{}{
-					"$set": setMap,
-				}
-				if len(pushAll) > 0 {
-					upSet["$pushAll"] = pushAll
+					upSet["$pushAll"] = map[string]interface{}{
+						"list": newList,
+					}
 				}
+				upSet["$set"] = setMap
 				m.mergeLock.Lock()
 				updateArray_delete = append(updateArray_delete, move.ids...)
 				updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
 				updateArray_set = append(updateArray_set, upSet)
 				if len(updateArray_query) == BulkSize {
-					m.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
+					m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
 				}
 				m.mergeLock.Unlock()
 			}
@@ -209,14 +210,14 @@ func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 	sess := mongodb.GetMgoConn()
 	defer mongodb.DestoryMongoConn(sess)
 	if len(saveArray) > 0 {
-		m.SaveBulk(sess, &saveArray, &saveArray_delete)
+		m.saveBulk(sess, &saveArray, &saveArray_delete)
 	}
 	if len(updateArray_query) > 0 {
-		m.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
+		m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
 	}
 	logger.Info("第", *number, "次合并数据结束。。。", index)
 }
-func (m *moveJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
+func (m *moveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
 	coll := sess.DB(DbName).C("pushspace")
 	bulk := coll.Bulk()
 	for _, v := range *saves {
@@ -226,11 +227,11 @@ func (m *moveJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, d
 	if nil != err {
 		logger.Info("BulkError", err)
 	} else {
-		m.DelBulk(sess, deletes)
+		m.delBulk(sess, deletes)
 	}
 	*saves = []map[string]interface{}{}
 }
-func (m *moveJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
+func (m *moveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
 	coll := sess.DB(DbName).C("pushspace")
 	bulk := coll.Bulk()
 	for k, v := range *array_q {
@@ -240,12 +241,12 @@ func (m *moveJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]i
 	if nil != err {
 		logger.Info("UpdateBulkError", err)
 	} else {
-		m.DelBulk(sess, array_d)
+		m.delBulk(sess, array_d)
 	}
 	*array_q = []map[string]interface{}{}
 	*array_s = []map[string]interface{}{}
 }
-func (m *moveJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
+func (m *moveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
 	coll := sess.DB(DbName).C("pushspace_temp")
 	count := 0
 	bulk := coll.Bulk()

+ 67 - 0
src/jfw/modules/pushsubscribe/src/push/job/normalpush.go

@@ -0,0 +1,67 @@
+package job
+
+import (
+	. "push/config"
+	"qfw/util"
+	"qfw/util/mongodb"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//正常推送,一天推送三次或者一天一次
+type NormalPush struct{}
+
+func (n *NormalPush) OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{}) {
+	users := &[]map[string]interface{}{}
+	i := 0
+	lastId := ""
+	var query map[string]interface{}
+	//根据任务类型,查找ratemode
+	if taskType == 1 {
+		query = map[string]interface{}{
+			"ratemode": map[string]interface{}{
+				"$in": []int{1, 3, 4},
+			},
+		}
+	} else if taskType == 2 {
+		query = map[string]interface{}{
+			"ratemode": 2,
+		}
+	} else {
+		logger.Error("taskType error", taskType)
+		return i, users
+	}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	if len(Config.TestIds) > 0 {
+		query["userid"] = map[string]interface{}{
+			"$in": Config.TestIds,
+		}
+	}
+	if *startId != "" {
+		query["_id"] = map[string]interface{}{
+			"$gt": bson.ObjectIdHex(*startId),
+		}
+	}
+	logger.Info("推送任务", taskType, "开始加载第", batchIndex, "批用户", query)
+	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		i++
+		lastId = util.BsonIdToSId(temp["_id"])
+		*users = append(*users, temp)
+		temp = make(map[string]interface{})
+		if i == Config.PushBatch {
+			break
+		}
+	}
+	logger.Info("推送任务", taskType, "第", batchIndex, "批用户加载结束", lastId)
+	return i, users
+}
+
+func (n *NormalPush) GetUser() {
+
+}
+func (n *NormalPush) AfterPush() {
+
+}

+ 7 - 0
src/jfw/modules/pushsubscribe/src/push/job/pusher.go

@@ -0,0 +1,7 @@
+package job
+
+type Pusher interface {
+	OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{})
+	GetUser()
+	AfterPush()
+}

+ 141 - 119
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -11,6 +11,7 @@ import (
 	"qfw/util/mail"
 	"qfw/util/mongodb"
 	"qfw/util/redis"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -38,25 +39,30 @@ type pushJob struct {
 	pool                    chan bool
 	wait                    *sync.WaitGroup
 	lock                    *sync.Mutex
-	lastId                  string
-	users                   *[]map[string]interface{}
-	vipTempSave             bool
 	minutePushPool          chan bool
 	fastigiumMinutePushPool chan bool
+	savePool                chan bool
 }
 
 //taskType 1--一天三次推送 2--九点推送
 func (p *pushJob) Execute(taskType int) {
 	defer util.Catch()
+	var pusher Pusher
+	if taskType == 1 || taskType == 2 {
+		pusher = &NormalPush{}
+	} else if taskType == 3 {
+		pusher = &SpecialPush{}
+	}
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	p.taskType = taskType
 	logger.Info("推送任务", p.taskType, "开始推送。。。")
-	batch_index := 0
+	batchIndex := 0
+	startId := ""
 	for {
-		batch_index++
-		batch_size := p.OncePushBatch(batch_index)
-		for _, temp := range *p.users {
+		batchIndex++
+		batch_size, users := pusher.OncePushBatch(taskType, batchIndex, &startId)
+		for _, temp := range *users {
 			isTake := true
 			select {
 			case <-time.After(5 * time.Minute):
@@ -114,11 +120,11 @@ func (p *pushJob) Execute(taskType int) {
 				if u.MailPush == 1 {
 					mailPush = 1
 				}
-				list := putil.ToSortList(v["list"])
 				logger.Info("推送任务", p.taskType, "用户接收方式", "userId", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "pchelperPush", u.PchelperPush)
 				if wxPush != 1 && appPush != 1 && mailPush != 1 {
 					return
 				}
+				list := putil.ToSortList(v["list"])
 				//再对取消关注以及app没有登录的用户进行过滤,但是依然可以进行助手推送
 				if u.Subscribe == 0 {
 					wxPush = 0
@@ -135,7 +141,7 @@ func (p *pushJob) Execute(taskType int) {
 						mailPush = 0
 					}
 				}
-				isSaveSuccess, wxStatus, appStatus, mailStatus := p.Push(p.taskType, wxPush, appPush, mailPush, u, list)
+				isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus := p.push(p.taskType, wxPush, appPush, mailPush, u, list)
 				if isSaveSuccess {
 					if u.FirstPushTime == 0 {
 						go mongodb.Update("user", map[string]interface{}{
@@ -146,6 +152,9 @@ func (p *pushJob) Execute(taskType int) {
 							},
 						}, false, false)
 					}
+					if isVipTempSave {
+						p.vipTempSave(u.Id, v)
+					}
 				} else {
 					return
 				}
@@ -157,41 +166,19 @@ func (p *pushJob) Execute(taskType int) {
 					logger.Error("推送任务", p.taskType, "remove error", err)
 				}
 				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
-					f_count, err := sess.DB(DbName).C("pushspace").FindId(v["_id"]).Count()
-					if err != nil {
-						logger.Error("推送任务", p.taskType, "find count error", err)
-						return
+					v["failtime"] = time.Now().Unix()
+					if wxStatus == -1 {
+						v["wxfail"] = 1
 					}
-					if f_count == 0 {
-						v["failtime"] = time.Now().Unix()
-						if wxStatus == -1 {
-							v["wxfail"] = 1
-						}
-						if appStatus == -1 {
-							v["appfail"] = 1
-						}
-						if mailStatus == -1 {
-							v["mailfail"] = 1
-						}
-						err := sess.DB(DbName).C("pushspace_fail").Insert(v)
-						if err != nil {
-							logger.Error("推送任务", p.taskType, "update error", err)
-						}
-					} else {
-						f_update := map[string]interface{}{}
-						if wxStatus == -1 {
-							f_update["wxfail"] = 1
-						}
-						if appStatus == -1 {
-							f_update["appfail"] = 1
-						}
-						if mailStatus == -1 {
-							f_update["mailfail"] = 1
-						}
-						err := sess.DB(DbName).C("pushspace_fail").UpdateId(v["_id"], map[string]interface{}{"$set": f_update})
-						if err != nil {
-							logger.Error("推送任务", p.taskType, "update error", err)
-						}
+					if appStatus == -1 {
+						v["appfail"] = 1
+					}
+					if mailStatus == -1 {
+						v["mailfail"] = 1
+					}
+					_, err := sess.DB(DbName).C("pushspace_fail").UpsertId(v["_id"], map[string]interface{}{"$set": v})
+					if err != nil {
+						logger.Error("推送任务", p.taskType, "update error", err)
 					}
 				}
 			}(temp, isTake)
@@ -201,63 +188,16 @@ func (p *pushJob) Execute(taskType int) {
 		}
 	}
 	p.wait.Wait()
-	p.lastId = ""
-	p.users = nil
 	logger.Info("推送任务结束。。。", p.taskType)
 }
-func (p *pushJob) OncePushBatch(batch_index int) int {
-	p.users = &[]map[string]interface{}{}
-	i := 0
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	var query map[string]interface{}
-	//根据任务类型,查找ratemode
-	if p.taskType == 1 {
-		query = map[string]interface{}{
-			"ratemode": 1,
-		}
-	} else if p.taskType == 2 {
-		query = map[string]interface{}{
-			"ratemode": 2,
-		}
-	} else {
-		logger.Error("taskType error", p.taskType)
-		return i
-	}
-	if len(Config.TestIds) > 0 {
-		query["userid"] = map[string]interface{}{
-			"$in": Config.TestIds,
-		}
-	}
-	if p.lastId != "" {
-		query["_id"] = map[string]interface{}{
-			"$gt": bson.ObjectIdHex(p.lastId),
-		}
-	}
-	logger.Info("推送任务", p.taskType, "开始加载第", batch_index, "批用户", query)
-	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
-	for temp := make(map[string]interface{}); it.Next(&temp); {
-		i++
-		p.lastId = util.BsonIdToSId(temp["_id"])
-		*p.users = append(*p.users, temp)
-		temp = make(map[string]interface{})
-		if i == Config.PushBatch {
-			break
-		}
-	}
-	logger.Info("推送任务", p.taskType, "第", batch_index, "批用户加载结束", p.lastId)
-	return i
-}
 
-func (p *pushJob) Push(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
+func (p *pushJob) push(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (isSaveSuccess, isVipTempSave bool, wxStatus, appStatus, mailStatus int) {
 	if wxPush == 1 || appPush == 1 || mailPush == 1 || u.PchelperPush == 1 {
-		if list != nil {
-			isSaveSuccess, wxStatus, appStatus, mailStatus = p.DoPush(taskType, true, wxPush, appPush, mailPush, u, &list)
-		}
+		isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus = p.doPush(taskType, true, wxPush, appPush, mailPush, u, &list)
 	}
-	return isSaveSuccess, wxStatus, appStatus, mailStatus
+	return isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus
 }
-func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
+func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess, isVipTempSave bool, wxStatus, appStatus, mailStatus int) {
 	defer util.Catch()
 	mailContent := ""
 	jpushtitle := ""
@@ -351,7 +291,7 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			mailContent += fmt.Sprintf(Config.Mail_content, infosLength, url, otitle, classArea, area, classType, infotype, industryclass, industry, dates)
 		}
 		if isVipUser {
-			if dayCount >= Config.VipOneDayMaxPushSize {
+			if dayCount >= Config.VipMaxPushSize {
 				break
 			}
 		} else {
@@ -379,7 +319,7 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 	pushDate := ""
 	if taskType != 0 && isSave {
 		//推送记录id
-		pushDate = putil.SaveSendInfo(k, infos)
+		pushDate = p.save(k, infos)
 		if pushDate == "" {
 			logger.Info("推送任务", taskType, "保存出错", k.Id)
 			return
@@ -387,8 +327,12 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 		logger.Info("推送任务", taskType, "保存成功", pushDate, k.Id)
 		isSaveSuccess = true
 	}
-	if isVipUser && (now.Day() == 28 || now.Weekday().String() == "Friday") && (k.RateMode == 3 || k.RateMode == 4) {
-
+	if isVipUser && (k.RateMode == 3 || k.RateMode == 4) {
+		if now.Day() != 28 && now.Weekday().String() != "Friday" {
+			isVipTempSave = true
+			return
+		} else {
+		}
 	}
 	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
 	if isSaveSuccess {
@@ -421,56 +365,59 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 					TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
 				}
 			}
-			Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
-			LastTip := ""
+			tip := util.If(TmpTip == "", "", TmpTip+":\n").(string)
+			lastTip := ""
 			if infosLength > 1 {
-				LastTip = fmt.Sprintf("...(共%d条)", infosLength)
+				lastTip = fmt.Sprintf("...(共%d条)", infosLength)
 			}
-			LastTipLen := len([]rune(LastTip))
-			wxTitleKeys := strings.Join(k.Keys, ";")
-			if len([]rune(wxTitleKeys)) > 8 {
-				wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
+			LastTipLen := len([]rune(lastTip))
+			wxTitle := Config.VipWxTitle
+			if !isVipUser {
+				wxTitleKeys := strings.Join(k.Keys, ";")
+				if len([]rune(wxTitleKeys)) > 8 {
+					wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
+				}
+				wxTitle = fmt.Sprintf(Config.WxTitle, wxTitleKeys)
 			}
-			wxtitle := fmt.Sprintf(Config.WxTitle, wxTitleKeys)
-			TitleLen := len([]rune(wxtitle))
-			reLen := 200 - TitleLen - 10 - WxGroupLen - len([]rune(Tip1))
-			WXTitle := ""
+			TitleLen := len([]rune(wxTitle))
+			reLen := 200 - TitleLen - 10 - WxGroupLen - len([]rune(tip))
+			wxTplTitle := ""
 			bshow := false
 			for n := 1; n < len(TitleArray)+1; n++ {
 				curTitle := TitleArray[n-1]
-				tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
+				tmptitle := wxTplTitle + fmt.Sprintf("%d %s\n", n, curTitle)
 				ch := reLen - len([]rune(tmptitle))
 				if ch < LastTipLen { //加上后大于后辍,则没有完全显示
 					if ch == 0 && n == len(TitleArray) {
-						WXTitle = tmptitle
+						wxTplTitle = tmptitle
 						bshow = true
 					} else {
-						ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
+						ch_1 := reLen - len([]rune(wxTplTitle)) - LastTipLen
 						if ch_1 > 8 {
 							curLen := len([]rune(curTitle))
 							if ch_1 > curLen {
 								ch_1 = curLen
 							}
-							WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
+							wxTplTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
 						}
 					}
 				} else if ch == LastTipLen {
-					WXTitle = tmptitle
+					wxTplTitle = tmptitle
 					if n == len(TitleArray) {
 						bshow = true
 					}
 				} else {
-					WXTitle = tmptitle
+					wxTplTitle = tmptitle
 					if n == len(TitleArray) {
 						bshow = true
 					}
 				}
 			}
 			if bshow {
-				LastTip = ""
+				lastTip = ""
 			}
 			//推送微信
-			isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, wxtitle, pushDate)
+			isPushOk = putil.SendWeixin(k, tip+wxTplTitle+lastTip, wxTitle, pushDate)
 			if isPushOk {
 				wxStatus = 1
 			} else {
@@ -514,7 +461,7 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 	if mailPush == 1 {
 		logger.Info("推送任务", taskType, "开始邮箱推送", k.Id)
 		html := fmt.Sprintf(Config.Mail_html, strings.Replace(strings.Join(k.Keys, ";"), "+", " ", -1), mailContent)
-		isPushOk := p.SendMail(k.Email, Config.Mail_title, html, nil)
+		isPushOk := p.sendMail(k.Email, Config.Mail_title, html, nil)
 		if isPushOk {
 			mailStatus = 1
 		} else {
@@ -526,7 +473,7 @@ func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 }
 
 //推送邮件(含附件)
-func (p *pushJob) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
+func (p *pushJob) sendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
 	if !Config.IsPushMail || len(Gmails) == 0 {
 		return true
 	}
@@ -558,3 +505,78 @@ func (p *pushJob) SendMail(email, subject, html string, fmdatas []map[string]int
 	}
 	return status
 }
+
+//保存发送信息
+func (p *pushJob) save(k *UserInfo, matchInfos []*MatchInfo) string {
+	p.savePool <- true
+	defer func() {
+		<-p.savePool
+	}()
+	if Config.SaveSleep > 0 {
+		time.Sleep(time.Duration(Config.SaveSleep) * time.Millisecond)
+	}
+	unix := time.Now().Unix()
+	values := []interface{}{}
+	for _, matchInfo := range matchInfos {
+		values = append(values, k.Id, util.ObjToString((*matchInfo.Info)["_id"]), unix, strings.Join(matchInfo.Keys, " "), util.ObjToString((*matchInfo.Info)["area"]), util.ObjToString((*matchInfo.Info)["city"]), util.ObjToString((*matchInfo.Info)["buyerclass"]))
+	}
+	savecount := putil.Mysql.InsertBatch("pushsubscribe", []string{"userid", "infoid", "date", "matchkeys", "area", "city", "buyerclass"}, values)
+	if int(savecount) != len(values) {
+		logger.Error(k.Id, "批量保存有问题", len(values), savecount)
+	}
+	return fmt.Sprint(unix)
+}
+
+//vip 每周 每月推送 暂时保存
+func (p *pushJob) vipTempSave(userId string, v map[string]interface{}) {
+	newList := putil.ToSortList(v["list"])
+	pLength := len(newList)
+	if pLength == 0 {
+		return
+	}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	var data map[string]interface{}
+	coll := sess.DB(DbName).C("pushspace_vip")
+	err := coll.Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"size": 1, "list": 1, "date": 1, "userid": 1}).One(&data)
+	if err != nil {
+		logger.Error(userId, "获取用户pushspace_vip数据出错", err)
+		return
+	}
+	nowymd := util.NowFormat(util.Date_yyyyMMdd)
+	if data == nil { //批量新增
+		err = coll.Insert(map[string]interface{}{
+			"userid":     v["userid"],
+			"size":       v["size"],
+			"list":       v["list"],
+			"date":       nowymd,
+			"createtime": time.Now().Unix(),
+		})
+		if err != nil {
+			logger.Error(userId, "保存pushspace_vip出错", err)
+			return
+		}
+	} else { //批量更新
+		upSet := map[string]interface{}{}
+		if nowymd != util.ObjToString(data["date"]) {
+			return
+		}
+		oldList := putil.ToSortList(data["list"])
+		if len(oldList)+pLength > Config.MaxPushSize {
+			newList = append(newList, oldList...)
+			sort.Sort(newList)
+			v["list"] = newList[:Config.MaxPushSize]
+		} else { //追加
+			upSet["$pushAll"] = map[string]interface{}{
+				"list": newList,
+			}
+		}
+		v["size"] = util.IntAll(v["size"]) + pLength
+		upSet["$set"] = v
+		err = coll.UpdateId(data["_id"], upSet)
+		if err != nil {
+			logger.Error(userId, "更新pushspace_vip出错", err)
+			return
+		}
+	}
+}

+ 1 - 1
src/jfw/modules/pushsubscribe/src/push/job/repairjob.go

@@ -72,7 +72,7 @@ func (r *repairJob) Execute(param string) bool {
 					mailPush = 1
 				}
 				list := putil.ToSortList(v["list"])
-				_, wxStatus, appStatus, mailStatus := Jobs.Push.Push(0, wxPush, appPush, mailPush, u, list)
+				_, _, wxStatus, appStatus, mailStatus := Jobs.Push.push(0, wxPush, appPush, mailPush, u, list)
 				sess := mongodb.GetMgoConn()
 				defer mongodb.DestoryMongoConn(sess)
 				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {

+ 67 - 0
src/jfw/modules/pushsubscribe/src/push/job/specialpush.go

@@ -0,0 +1,67 @@
+package job
+
+import (
+	. "push/config"
+	"qfw/util"
+	"qfw/util/mongodb"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//vip用户每月或者没周推送
+type SpecialPush struct{}
+
+func (s *SpecialPush) OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{}) {
+	users := &[]map[string]interface{}{}
+	i := 0
+	lastId := ""
+	var query map[string]interface{}
+	//根据任务类型,查找ratemode
+	if taskType == 1 {
+		query = map[string]interface{}{
+			"ratemode": map[string]interface{}{
+				"$in": []int{1, 3, 4},
+			},
+		}
+	} else if taskType == 2 {
+		query = map[string]interface{}{
+			"ratemode": 2,
+		}
+	} else {
+		logger.Error("taskType error", taskType)
+		return i, users
+	}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	if len(Config.TestIds) > 0 {
+		query["userid"] = map[string]interface{}{
+			"$in": Config.TestIds,
+		}
+	}
+	if *startId != "" {
+		query["_id"] = map[string]interface{}{
+			"$gt": bson.ObjectIdHex(*startId),
+		}
+	}
+	logger.Info("推送任务", taskType, "开始加载第", batchIndex, "批用户", query)
+	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		i++
+		lastId = util.BsonIdToSId(temp["_id"])
+		*users = append(*users, temp)
+		temp = make(map[string]interface{})
+		if i == Config.PushBatch {
+			break
+		}
+	}
+	logger.Info("推送任务", taskType, "第", batchIndex, "批用户加载结束", lastId)
+	return i, users
+}
+
+func (s *SpecialPush) GetUser() {
+
+}
+func (s *SpecialPush) AfterPush() {
+
+}

+ 4 - 0
src/jfw/modules/pushsubscribe/src/push/main.go

@@ -18,6 +18,7 @@ import (
 
 func main() {
 	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务")
+	move := flag.Int("v", 0, "1 优先迁移数据")
 	taskType := flag.Int("t", 1, "1 一天三次推送;2 九点推送")
 	flag.Parse()
 	logger.SetConsole(false)
@@ -47,6 +48,9 @@ func main() {
 		go job.Task.Refresh.Execute()
 		go job.Task.OtherPush.Execute()
 		go job.Task.OncePush.Execute()
+		if *move == 1 {
+			job.Jobs.Move.Execute()
+		}
 		if *modle == 2 {
 			job.Jobs.Push.Execute(*taskType)
 		}

+ 1 - 1
src/jfw/modules/pushsubscribe/src/push/util/rpccall.go

@@ -41,7 +41,7 @@ func SendWeixin(k *UserInfo, remark, title, pushDate string) bool {
 		Service:     util.FormatDate(&now, util.Date_Short_Layout),
 		Color:       Config.WxColor,
 		DetailColor: Config.WxDetailColor,
-		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"+"__"+pushDate),
+		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush") + "__" + pushDate,
 	}
 	ok, res := jy.WxPush(Config.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
 	if !ok && (strings.Contains(res, "[46004]") || strings.Contains(res, "[65302]") || strings.Contains(res, "[43004]") || strings.Contains(res, "[40003]")) {

+ 0 - 115
src/jfw/modules/pushsubscribe/src/push/util/util.go

@@ -2,128 +2,13 @@ package util
 
 import (
 	"encoding/json"
-	"fmt"
 	. "public"
-	. "push/config"
-	"qfw/util"
 	"sort"
-	"strings"
 	"time"
 
-	"github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
-var savePool = make(chan bool, Config.SavePoolSize)
-
-//重新设置用户类型
-func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
-	if userType == 0 {
-		if s_m_openid != "" && a_m_openid == "" && phone == "" {
-			userType = 0 //公众号
-		} else if s_m_openid == "" && phone != "" {
-			userType = 1 //app手机号
-		} else if s_m_openid == "" && a_m_openid != "" {
-			userType = 2 //app微信
-		} else if s_m_openid != "" && a_m_openid == "" && phone == "" {
-			userType = 3 //用户合并以后只有微信用户
-		} else if s_m_openid == "" && (a_m_openid != "" || phone != "") {
-			userType = 4 //用户合并以后只有app用户
-		} else if s_m_openid != "" && (a_m_openid != "" || phone != "") {
-			userType = 5 //用户合并以后公众号和app用户都有
-		} else {
-			userType = -1
-		}
-	}
-	return userType
-}
-
-//推送方式转换
-func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int) {
-	mode := util.IntAll(o_msgset["i_mode"])
-	wxpush := util.IntAll(o_msgset["i_wxpush"])
-	apppush := util.IntAll(o_msgset["i_apppush"])
-	mailpush := util.IntAll(o_msgset["i_mailpush"])
-	if wxpush == 1 || apppush == 1 || mailpush == 1 {
-		return wxpush, apppush, mailpush
-	}
-	//老的app用户
-	if userType == 1 || userType == 2 {
-		switch mode {
-		case 0, 1:
-			apppush = 1
-			break
-		case 2:
-			mailpush = 1
-			break
-		case 3:
-			apppush = 1
-			mailpush = 1
-			break
-		}
-		if apppush == 0 && mailpush == 0 {
-			apppush = 1
-		}
-	} else if userType == 0 {
-		switch mode {
-		case 0, 1:
-			wxpush = 1
-			break
-		case 2:
-			mailpush = 1
-			break
-		case 3:
-			wxpush = 1
-			mailpush = 1
-			break
-		}
-		if wxpush == 0 && mailpush == 0 {
-			wxpush = 1
-		}
-	} else {
-		switch mode {
-		case 0, 1, 3:
-			if userType == 3 {
-				wxpush = 1
-			} else if userType == 4 {
-				apppush = 1
-			} else if userType == 5 {
-				wxpush = 1
-				apppush = 1
-			}
-			if mode == 3 {
-				mailpush = 1
-			}
-			break
-		case 2:
-			mailpush = 1
-			break
-		}
-	}
-	return wxpush, apppush, mailpush
-}
-
-//保存发送信息
-func SaveSendInfo(k *UserInfo, matchInfos []*MatchInfo) string {
-	savePool <- true
-	defer func() {
-		<-savePool
-	}()
-	if Config.SaveSleep > 0 {
-		time.Sleep(time.Duration(Config.SaveSleep) * time.Millisecond)
-	}
-	unix := time.Now().Unix()
-	values := []interface{}{}
-	for _, matchInfo := range matchInfos {
-		values = append(values, k.Id, util.ObjToString((*matchInfo.Info)["_id"]), unix, strings.Join(matchInfo.Keys, " "), util.ObjToString((*matchInfo.Info)["area"]), util.ObjToString((*matchInfo.Info)["city"]), util.ObjToString((*matchInfo.Info)["buyerclass"]))
-	}
-	savecount := Mysql.InsertBatch("pushsubscribe", []string{"userid", "infoid", "date", "matchkeys", "area", "city", "buyerclass"}, values)
-	if int(savecount) != len(values) {
-		logger.Error(k.Id, "批量保存有问题", len(values), savecount)
-	}
-	return fmt.Sprint(unix)
-}
-
 func ToObjectIds(ids []string) []bson.ObjectId {
 	_ids := []bson.ObjectId{}
 	for _, v := range ids {

+ 1 - 2
src/jfw/modules/subscribepay/src/config.json

@@ -44,6 +44,5 @@
             "pwd": "ue9Rg9Sf4CVtdm5a",
             "user": "public03@topnet.net.cn"
         }
-    ],
-	"wxTplExpire":"Iky7z3veZ6hy-ISchmQSgBvRd-34KzSyuYr0_fUbesE"
+    ]
 }

+ 11 - 0
src/jfw/modules/subscribepay/src/config/config.go

@@ -3,6 +3,7 @@ package config
 import (
 	qutil "qfw/util"
 	"qfw/util/mail"
+	qrpc "qfw/util/rpc"
 )
 
 type config struct {
@@ -42,9 +43,18 @@ type timetaskConfig struct {
 	SyncVipUpgrade string
 	CheckIsExpire  string
 }
+type messageConfig struct {
+	WxTplExpire struct {
+		Id       string
+		First    *qrpc.TmplItem
+		Keyword2 *qrpc.TmplItem
+		Keyword3 *qrpc.TmplItem
+	}
+}
 
 var Config *config
 var TimetaskConfig *timetaskConfig
+var MessageConfig *messageConfig
 
 //发送邮件邮箱
 var GmailAuth []*mail.GmailAuth
@@ -60,6 +70,7 @@ func init() {
 	//程序配置文件
 	qutil.ReadConfig(&Config)
 	qutil.ReadConfig("./timetask.json", &TimetaskConfig)
+	qutil.ReadConfig("./message.json", &MessageConfig)
 	for _, v := range Config.Mail {
 		mail := &mail.GmailAuth{
 			SmtpHost: v.Addr,

+ 15 - 0
src/jfw/modules/subscribepay/src/message.json

@@ -0,0 +1,15 @@
+{
+	"wxTplExpire": {
+		"id": "3_VPNbD7fmfd8BsdjLW-a7FOP4wIhEGV7Jx-11-9c7g",
+		"first":{
+			"value":"您的VIP订阅服务%s到期,为不影响您的使用。请立即续费",
+			"color":"#FF0000"
+		},
+		"keyword2": {
+			"value":"-"
+		},
+		"keyword3": {
+			"value":"VIP订阅"
+		}
+	}
+}

+ 58 - 9
src/jfw/modules/subscribepay/src/timetask/timetask.go

@@ -1,11 +1,12 @@
 package timetask
 
 import (
-	"config"
+	. "config"
+	"fmt"
 	"log"
 	qutil "qfw/util"
-	"qfw/util/jy"
 	qrpc "qfw/util/rpc"
+	"strconv"
 	"strings"
 	"time"
 	"util"
@@ -15,6 +16,8 @@ const (
 	threeday = 259200
 )
 
+var Se = qutil.SimpleEncrypt{Key: "topnet"}
+
 func Run() {
 	go syncVipUpgrade()
 	go checkIsExpire()
@@ -23,7 +26,7 @@ func Run() {
 
 //vip升级 下个月生效 同步
 func syncVipUpgrade() {
-	crontab(true, config.TimetaskConfig.SyncVipUpgrade, func() {
+	crontab(true, TimetaskConfig.SyncVipUpgrade, func() {
 		log.Println("定时任务,开始同步vip升级数据")
 		sess := util.MQFW.GetMgoConn()
 		defer util.MQFW.DestoryMongoConn(sess)
@@ -68,7 +71,7 @@ func syncVipUpgrade() {
 
 //每天0点 检查试用、vip服务是否到期
 func checkIsExpire() {
-	crontab(true, config.TimetaskConfig.CheckIsExpire, func() {
+	crontab(true, TimetaskConfig.CheckIsExpire, func() {
 		log.Println("定时任务,开始更新vip状态")
 		now_unix := time.Now().Unix()
 		sess := util.MQFW.GetMgoConn()
@@ -111,7 +114,7 @@ func checkIsExpire() {
 
 //即将到期或者已到期发推送消息
 func expireRemind() {
-	crontab(false, config.TimetaskConfig.ExpireRemind, func() {
+	crontab(true, TimetaskConfig.ExpireRemind, func() {
 		log.Println("定时任务,开始推送消息")
 		sess := util.MQFW.GetMgoConn()
 		defer util.MQFW.DestoryMongoConn(sess)
@@ -121,6 +124,13 @@ func expireRemind() {
 				"$in": []int{1, 2},
 			},
 		}).Select(map[string]interface{}{
+			"s_m_openid":       1,
+			"s_jpushid":        1,
+			"s_opushid":        1,
+			"s_appponetype":    1,
+			"i_applystatus":    1,
+			"s_nickname":       1,
+			"i_ispush":         1,
 			"i_vip_status":     1,
 			"l_vip_endtime":    1,
 			"i_vip_expire_tip": 1,
@@ -128,12 +138,51 @@ func expireRemind() {
 		for m := make(map[string]interface{}); it.Next(&m); {
 			_id := qutil.BsonIdToSId(m["_id"])
 			i_vip_expire_tip := qutil.Int64All(m["i_vip_expire_tip"])
-			isPushOk := false
+			wxPushOk, appPushOk := false, false
 			if i_vip_expire_tip == 1 || i_vip_expire_tip == 2 {
-				jy.WxPush(config.Config.Weixinrpc, "WeiXinRpc.SendCustomTplMsg", &qrpc.NotifyMsg{})
-				log.Println("推送消息,用户", _id, m["i_vip_status"], m["l_vip_endtime"], i_vip_expire_tip)
+				tp := "will"
+				tpMsg := "即将"
+				if i_vip_expire_tip == 2 {
+					tp = "exprie"
+					tpMsg = "已"
+				}
+				l_vip_endtime := qutil.Int64All(m["l_vip_endtime"])
+				s_m_openid := qutil.ObjToString(m["s_m_openid"])
+				i_applystatus := qutil.IntAll(m["i_applystatus"])
+				isPushWx := qutil.IntAllDef(m["i_ispush"], 1)
+				s_jpushid := qutil.ObjToString(m["s_jpushid"])
+				s_opushid := qutil.ObjToString(m["s_opushid"])
+				s_appponetype := qutil.ObjToString(m["s_appponetype"])
+				log.Println("推送消息", _id, "i_vip_status", m["i_vip_status"], "l_vip_endtime", l_vip_endtime, "i_vip_expire_tip", i_vip_expire_tip, "s_m_openid", s_m_openid, "isPushWx", isPushWx, "i_applystatus", i_applystatus, "s_appponetype", s_appponetype, "s_jpushid", s_jpushid, "s_opushid", s_opushid)
+				if isPushWx == 1 && i_applystatus == 1 && s_m_openid != "" {
+					tmplData := map[string]*qrpc.TmplItem{
+						"first": &qrpc.TmplItem{
+							Value: fmt.Sprintf(MessageConfig.WxTplExpire.First.Value, tpMsg),
+							Color: MessageConfig.WxTplExpire.First.Color,
+						},
+						"keyword1": &qrpc.TmplItem{
+							Value: qutil.ObjToString(m["s_nickname"]),
+						},
+						"keyword2": &qrpc.TmplItem{
+							Value: MessageConfig.WxTplExpire.Keyword2.Value,
+						},
+						"keyword3": &qrpc.TmplItem{
+							Value: MessageConfig.WxTplExpire.Keyword3.Value,
+						},
+						"keyword4": &qrpc.TmplItem{
+							Value: qutil.FormatDateByInt64(&l_vip_endtime, qutil.Date_Short_Layout),
+						},
+					}
+					wxPushOk, _ = qrpc.WxSendTmplMsg(Config.Weixinrpc, &qrpc.WxTmplMsg{
+						OpenId:   s_m_openid,
+						TplId:    MessageConfig.WxTplExpire.Id,
+						TmplData: tmplData,
+						Url:      Config.WebDomain + "/front/sess/" + Se.EncodeString(s_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",expireTip") + "__" + tp,
+					})
+					log.Println("微信推送", _id, wxPushOk)
+				}
 			}
-			if isPushOk {
+			if wxPushOk || appPushOk {
 				util.MQFW.UpdateById("user", _id, map[string]interface{}{
 					"$set": map[string]interface{}{
 						"i_vip_expire_tip": 0,

+ 28 - 8
src/jfw/modules/weixin/src/jrpc/jrpc.go

@@ -47,7 +47,7 @@ func (w *WeiXinRpc) SubscribePush(param *qrpc.NotifyMsg, ret *qrpc.RpcResult) er
 		log.Println("SubscribePush error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SubscribePush success!", param.Openid)
+		log.Println("SubscribePush success", param.Openid)
 	}
 	return nil
 }
@@ -65,7 +65,7 @@ func (w *WeiXinRpc) SendPushMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResult) erro
 		log.Println("SendPushMsg error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SendPushMsg success!", param.Openid)
+		log.Println("SendPushMsg success", param.Openid)
 	}
 	return nil
 }
@@ -84,7 +84,7 @@ func (w *WeiXinRpc) SendBidOpenMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResult) e
 		log.Println("SendBidOpenMsg error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SendBidOpenMsg success!", param.Openid)
+		log.Println("SendBidOpenMsg success", param.Openid)
 	}
 	return nil
 }
@@ -103,7 +103,7 @@ func (w *WeiXinRpc) SendLOGApplyMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResult)
 		log.Println("SendLOGApplyMsg error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SendLOGApplyMsg success!", param.Openid)
+		log.Println("SendLOGApplyMsg success", param.Openid)
 	}
 	return nil
 }
@@ -124,7 +124,7 @@ func (w *WeiXinRpc) SendACTIVEApplyMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResul
 		log.Println("SendACTIVEApplyMsg error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SendACTIVEApplyMsg success!", param.Openid)
+		log.Println("SendACTIVEApplyMsg success", param.Openid)
 	}
 	return nil
 }
@@ -272,7 +272,7 @@ func (w *WeiXinRpc) SendFeedbackNotifyMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcRe
 	if err != nil {
 		log.Println(err.Error())
 	} else {
-		log.Println("send feedbackNotify success!")
+		log.Println("send feedbackNotify success")
 	}
 	return nil
 }
@@ -358,6 +358,26 @@ func (w *WeiXinRpc) GetPrepayId(param map[string]string, res *[]byte) error {
 	return nil
 }
 
+//最通用的发送模板消息
+func (w *WeiXinRpc) SendTmplMsg(param *qrpc.WxTmplMsg, ret *qrpc.RpcResult) error {
+	tmplData := make(weixin.TmplData)
+	for k, v := range param.TmplData {
+		tmplData[k] = weixin.TmplItem{
+			Value: v.Value,
+			Color: v.Color,
+		}
+	}
+	_, err := w.Wwx.PostTemplateMessage(param.OpenId, param.TplId, param.Url, tmplData)
+	if err != nil {
+		*ret = "N"
+		log.Println("SendTmplMsg error", err, param.OpenId)
+		return err
+	}
+	*ret = "Y"
+	log.Println("SendTmplMsg success", param.OpenId)
+	return nil
+}
+
 //自定义模板消息
 func (w *WeiXinRpc) SendCustomTplMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResult) error {
 	_, err := w.Wwx.PostTemplateMessage(param.Openid, param.TplId, param.Url,
@@ -372,7 +392,7 @@ func (w *WeiXinRpc) SendCustomTplMsg(param *qrpc.NotifyMsg, ret *qrpc.RpcResult)
 		log.Println("SendCustomTplMsg error", err, param.Openid)
 	} else {
 		*ret = "Y"
-		log.Println("SendCustomTplMsg success!", param.Openid)
+		log.Println("SendCustomTplMsg success", param.Openid)
 	}
 	return nil
 }
@@ -397,7 +417,7 @@ func (w *WeiXinRpc) SendCustomMsg(p *[]byte, ret *string) error {
 		log.Println("SendCustomMsg error", err, d["data"])
 	} else {
 		*ret = "Y"
-		log.Println("SendCustomMsg success!")
+		log.Println("SendCustomMsg success")
 	}
 	return nil
 }

+ 1 - 2
src/jfw/nodemgr/nodemgr.go

@@ -121,7 +121,6 @@ func MasterMgr() {
 					user, _ := public.MQFW.FindById("user", id, `{"s_m_openid":1}`)
 					if user != nil && (*user)["s_m_openid"] != nil {
 						openid, _ := (*user)["s_m_openid"].(string)
-						var res rpc.RpcResult = "N"
 						msg := rpc.NotifyMsg{
 							Openid:  openid,
 							Title:   "剑鱼节点问题",
@@ -131,7 +130,7 @@ func MasterMgr() {
 							Remark:  "",
 							Url:     webdomain + "/_set/_setMaster2019",
 						}
-						log.Println("发送模板消息", openid, public.SendBidOpenMsg(&msg, &res))
+						log.Println("发送模板消息", openid, public.SendBidOpenMsg(&msg))
 					}
 				}
 			}

+ 9 - 45
src/jfw/public/rpccall.go

@@ -18,57 +18,21 @@ func init() {
 }
 
 //发送管理员模板消息
-func SendBidOpenMsg(p *qrpc.NotifyMsg, repl *qrpc.RpcResult) (err error) {
-	util.Try(func() {
-		client, e := rpc.DialHTTP("tcp", rpcserver)
-		defer client.Close()
-		if e != nil {
-			err = e
-			log.Println(p.Openid + "---" + err.Error())
-			return
-		}
-		err = client.Call("WeiXinRpc.SendBidOpenMsg", p, &repl)
-		if err != nil {
-			log.Println(p.Openid + "---" + err.Error())
-		}
-	}, func(e interface{}) {})
-	return
+func SendBidOpenMsg(p *qrpc.NotifyMsg) bool {
+	ok, _ := qrpc.WxPush(rpcserver, "WeiXinRpc.SendBidOpenMsg", p)
+	return ok
 }
 
 //发送保函申请消息
-func SendLOGApplyMsg(p *qrpc.NotifyMsg, repl *qrpc.RpcResult) (err error) {
-	util.Try(func() {
-		client, e := rpc.DialHTTP("tcp", rpcserver)
-		defer client.Close()
-		if e != nil {
-			err = e
-			log.Println(p.Openid + "---" + err.Error())
-			return
-		}
-		err = client.Call("WeiXinRpc.SendLOGApplyMsg", p, &repl)
-		if err != nil {
-			log.Println(p.Openid + "---" + err.Error())
-		}
-	}, func(e interface{}) {})
-	return
+func SendLOGApplyMsg(p *qrpc.NotifyMsg) bool {
+	ok, _ := qrpc.WxPush(rpcserver, "WeiXinRpc.SendLOGApplyMsg", p)
+	return ok
 }
 
 //发送年终活动消息
-func SendACTIVEApplyMsg(p *qrpc.NotifyMsg, repl *qrpc.RpcResult) (err error) {
-	util.Try(func() {
-		client, e := rpc.DialHTTP("tcp", rpcserver)
-		defer client.Close()
-		if e != nil {
-			err = e
-			log.Println(p.Openid + "---" + err.Error())
-			return
-		}
-		err = client.Call("WeiXinRpc.SendACTIVEApplyMsg", p, &repl)
-		if err != nil {
-			log.Println(p.Openid + "---" + err.Error())
-		}
-	}, func(e interface{}) {})
-	return
+func SendACTIVEApplyMsg(p *qrpc.NotifyMsg) bool {
+	ok, _ := qrpc.WxPush(rpcserver, "WeiXinRpc.SendACTIVEApplyMsg", p)
+	return ok
 }
 
 //项目更新推送

+ 8 - 30
src/jfw/timetask/followtimetask.go

@@ -7,7 +7,6 @@ import (
 	public "jfw/public"
 	"log"
 	"qfw/util"
-	"qfw/util/jy"
 	"qfw/util/redis"
 	rpc "qfw/util/rpc"
 	"strconv"
@@ -57,15 +56,13 @@ func tip() {
 	if !ok || datas == nil || len(*datas) == 0 {
 		return
 	}
-	var ids []bson.ObjectId
-	var tips []bson.M
 	for _, v := range *datas {
 		_id := (v["_id"].(bson.ObjectId)).Hex()
 		userId, _ := v["s_userid"].(string)
 		l_bidopentime, _ := v["l_bidopentime"].(int64)
 		l_updatetime, _ := v["l_updatetime"].(int64)
 		projectname, _ := v["s_projectname"].(string)
-		userdata, ok := mongodb.FindById("user", userId, `{"s_m_openid":1,"a_m_openid":1,"s_phone":1,"s_nickname":1,"s_jpushid":1,"s_opushid":1,"s_appponetype":1,"i_type":1,"i_applystatus":1,"i_ispush":1}`)
+		userdata, ok := mongodb.FindById("user", userId, `{"s_m_openid":1,"s_nickname":1,"s_jpushid":1,"s_opushid":1,"s_appponetype":1,"i_type":1,"i_applystatus":1,"i_ispush":1}`)
 		if !ok || userdata == nil {
 			continue
 		}
@@ -107,18 +104,15 @@ func tip() {
 		title = fmt.Sprintf(title, projectname, bidopen)
 		//极光推送
 		s_m_openid, _ := (*userdata)["s_m_openid"].(string)
-		a_m_openid, _ := (*userdata)["a_m_openid"].(string)
-		s_phone, _ := (*userdata)["s_phone"].(string)
 		applystatus := util.IntAll((*userdata)["i_applystatus"])
 		isPushWx := util.IntAllDef((*userdata)["i_ispush"], 1)
 		jpushid, _ := (*userdata)["s_jpushid"].(string)
 		opushid, _ := (*userdata)["s_opushid"].(string)
-		var s_err string
-		var result rpc.RpcResult = "N" //发送模板消息返回结果
+		phoneType, _ := (*userdata)["s_appponetype"].(string)
+		log.Println("开标提醒", userId, applystatus, isPushWx, s_m_openid, phoneType, jpushid, opushid)
 		if isPushWx == 1 && applystatus == 1 && s_m_openid != "" {
-			log.Println("开标提醒-微信提醒", userId, s_m_openid)
 			//发送模板消息
-			err := public.SendBidOpenMsg(&rpc.NotifyMsg{
+			isPushOk := public.SendBidOpenMsg(&rpc.NotifyMsg{
 				Openid:  s_m_openid,
 				Title:   title,
 				Detail:  projectname,           //项目名称
@@ -126,14 +120,11 @@ func tip() {
 				Date:    utdate + " " + uttime, //用户设置提醒的时间
 				Remark:  followTipMsg["remark"].(string),
 				Url:     config.Sysconfig["webdomain"].(string) + "/front/sess/" + se.EncodeString(s_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",followset") + "__list__" + util.EncodeArticleId2ByCheck(_id),
-			}, &result)
-			if err != nil {
-				s_err = err.Error()
-			}
+			})
+			log.Println("开标提醒-微信提醒", userId, isPushOk)
 		}
 		if jpushid != "" || opushid != "" {
-			phoneType, _ := (*userdata)["s_appponetype"].(string)
-			isPushOk := jy.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
+			isPushOk := rpc.AppPush(config.Sysconfig["appPushServiceRpc"].(string), map[string]interface{}{
 				"phoneType":   phoneType,
 				"otherPushId": opushid,
 				"jgPushId":    jpushid,
@@ -142,20 +133,7 @@ func tip() {
 				"userId":      userId,
 				"url":         "/jyapp/free/sess/" + se.EncodeString(userId+",_id,"+strconv.Itoa(int(time.Now().Unix()))+",followset") + "__list__" + util.EncodeArticleId2ByCheck(_id),
 			})
-			if isPushOk {
-				result = "Y"
-			}
-			log.Println("开标提醒-app推送", userId, s_m_openid, a_m_openid, s_phone, isPushOk, "-----", jpushid)
-		}
-		ids = append(ids, v["_id"].(bson.ObjectId))
-		tips = append(tips, bson.M{"l_time": time.Now().Unix(), "s_status": result, "s_error": s_err})
-	}
-	//记录提醒日志
-	if len(ids) > 0 {
-		for k, v := range ids {
-			mongodb.Update("follow_project", bson.M{"_id": v}, bson.M{
-				"$push": bson.M{"a_tip": tips[k]},
-			}, false, false)
+			log.Println("开标提醒-app推送", userId, phoneType, jpushid, opushid, isPushOk)
 		}
 	}
 }