wcj 5 éve
szülő
commit
63e09080d8

+ 2 - 1
src/config.json

@@ -62,7 +62,8 @@
         "futureIndex": "/active/future/index",
         "keysetIndex": "/wxkeyset/keyset/index?tiptext=%s",
         "followEntDetail": "/jylab/followent/detail/%s",
-        "mymenu": "/front/wxMyOrder/myMenu"
+        "mymenu": "/front/wxMyOrder/myMenu",
+		"historypush": "/swordfish/historypush?pushdate=%s"
     },
     "jy_activeset": {
         "activitystartcode": "3201000000",

+ 3 - 4
src/jfw/front/front.go

@@ -830,10 +830,9 @@ func (m *Front) Sess(ostr string) error {
 						}(strs[1:])...)
 					}
 				}
-				if str[1] == "uid" {
-					if strings.Contains(actionurl, "historypush") {
-						actionurl = actionurl + "?times=" + str[2]
-					}
+				//后续这个判断要去掉
+				if str[1] == "uid" && str[3] == "rssset" && strings.Contains(actionurl, "historypush") {
+					actionurl = actionurl + "?times=" + str[2]
 				}
 				m.Redirect(actionurl)
 			} else {

+ 3 - 3
src/jfw/modules/pushsubscribe/src/match/config/config.go

@@ -4,7 +4,7 @@ import (
 	"qfw/util"
 )
 
-type sysConfig struct {
+type config struct {
 	ElasticPoolSize      int      `json:"elasticPoolSize"`
 	ElasticSearch        string   `json:"elasticSearch"`
 	RedisServers         string   `json:"redisServers"`
@@ -28,11 +28,11 @@ type taskConfig struct {
 }
 
 var (
-	SysConfig  *sysConfig
+	Config     *config
 	TaskConfig *taskConfig
 )
 
 func init() {
-	util.ReadConfig("./config.json", &SysConfig)
+	util.ReadConfig("./config.json", &Config)
 	util.ReadConfig("./task.json", &TaskConfig)
 }

+ 2 - 2
src/jfw/modules/pushsubscribe/src/match/job/job.go

@@ -1,7 +1,7 @@
 package job
 
 import (
-	"match/config"
+	. "match/config"
 	"sync"
 )
 
@@ -16,7 +16,7 @@ type jobs struct {
 var Jobs = &jobs{
 	Match: &MatchJob{
 		datas:             &[]map[string]interface{}{},
-		matchPool:         make(chan bool, config.SysConfig.MatchPoolSize),
+		matchPool:         make(chan bool, Config.MatchPoolSize),
 		eachInfoWaitGroup: &sync.WaitGroup{},
 		saveWaitGroup:     &sync.WaitGroup{},
 		userMapLock:       &sync.Mutex{},

+ 14 - 11
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -24,7 +24,7 @@ import (
 
 var (
 	SaveFields = []string{"_id", "area", "city", "buyerclass", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type"}
-	MailReg    = regexp.MustCompile(SysConfig.MailReg)
+	MailReg    = regexp.MustCompile(Config.MailReg)
 )
 
 const (
@@ -115,7 +115,7 @@ func (m *MatchJob) Execute() {
 		}
 		m.ToMatch(user_batch_index, vipUser)
 		m.ToMatch(user_batch_index, freeUser)
-		if user_batch_size < SysConfig.UserBatch {
+		if user_batch_size < Config.UserBatch {
 			break
 		}
 	}
@@ -155,7 +155,7 @@ func (m *MatchJob) ToMatch(batchIndex int, matcher Matcher) {
 				Info: &info,
 				Keys: v2.Keys,
 			})
-			if len(array) == SysConfig.MaxPushSize {
+			if len(array) == Config.MaxPushSize {
 				break
 			}
 		}
@@ -227,9 +227,9 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) bool {
 	if count == 0 {
 		return false
 	}
-	if count > SysConfig.MaxSearch {
-		count = SysConfig.MaxSearch
-		logger.Info("目前数据多于", SysConfig.MaxSearch, ",只加载了", SysConfig.MaxSearch, "条!")
+	if count > Config.MaxSearch {
+		count = Config.MaxSearch
+		logger.Info("目前数据多于", Config.MaxSearch, ",只加载了", Config.MaxSearch, "条!")
 	}
 	var res []map[string]interface{}
 	sess := mongodb.GetMgoConn()
@@ -252,6 +252,9 @@ func (m *MatchJob) LoadBidding(lastId, newId string, lastTime int64) bool {
 		index++
 		_id := util.BsonIdToSId(tmp["_id"])
 		tmp["_id"] = _id
+		if util.ObjToString(tmp["area"]) == "A" {
+			tmp["area"] = "全国"
+		}
 		res = append(res, tmp)
 		//信息缓存3天
 		info := map[string]interface{}{}
@@ -283,8 +286,8 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 		"i_appid": 2,
 	}
 	_idq := map[string]interface{}{}
-	if len(SysConfig.TestIds) > 0 {
-		_idq["$in"] = mutil.ToObjectIds(SysConfig.TestIds)
+	if len(Config.TestIds) > 0 {
+		_idq["$in"] = mutil.ToObjectIds(Config.TestIds)
 	}
 	if m.lastUserId != "" {
 		_idq["$gt"] = bson.ObjectIdHex(m.lastUserId)
@@ -374,7 +377,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 		var allKeySet []*KeySet
 		var err error
 		if isVipUser {
-			if dayCount := redis.GetInt("pushcache_2_a", DayCountKey(nowymd, userId)); dayCount >= SysConfig.VipOneDayMaxPushSize {
+			if dayCount := redis.GetInt("pushcache_2_a", DayCountKey(nowymd, userId)); dayCount >= Config.VipOneDayMaxPushSize {
 				logger.Info("vip用户达到一天最大推送数量", userId, dayCount)
 				continue
 			}
@@ -425,7 +428,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 			//continue
 			//}
 			isFilter := false
-			for _, fv := range SysConfig.FilterWords {
+			for _, fv := range Config.FilterWords {
 				if fv == key {
 					isFilter = true
 					break
@@ -569,7 +572,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 		m.lastUserId = user.Id
 		temp = make(map[string]interface{})
 		n++
-		if n == SysConfig.UserBatch {
+		if n == Config.UserBatch {
 			break
 		}
 	}

+ 1 - 1
src/jfw/modules/pushsubscribe/src/match/job/timetask.go

@@ -22,7 +22,7 @@ type MatchTimeTask struct {
 func (m *MatchTimeTask) Execute() {
 	Jobs.Match.Execute()
 	util.WriteSysConfig("./task.json", &TaskConfig)
-	t := time.Duration(SysConfig.MatchDuration) * time.Minute
+	t := time.Duration(Config.MatchDuration) * time.Minute
 	logger.Info("start match job after", t)
 	time.AfterFunc(t, m.Execute)
 }

+ 3 - 3
src/jfw/modules/pushsubscribe/src/match/main.go

@@ -18,9 +18,9 @@ func main() {
 	flag.Parse()
 	logger.SetConsole(false)
 	logger.SetRollingDaily("./logs", "match.log")
-	mongodb.InitMongodbPool(SysConfig.MgoSize, SysConfig.MgoAddr, "qfw")
-	redis.InitRedis(SysConfig.RedisServers)
-	elastic.InitElasticSize(SysConfig.ElasticSearch, SysConfig.ElasticPoolSize)
+	mongodb.InitMongodbPool(Config.MgoSize, Config.MgoAddr, "qfw")
+	redis.InitRedis(Config.RedisServers)
+	elastic.InitElasticSize(Config.ElasticSearch, Config.ElasticPoolSize)
 	log.Println("订阅推送-匹配数据程序启动。。。")
 	if *modle == 1 {
 		job.Jobs.Match.Execute()

+ 1 - 1
src/jfw/modules/pushsubscribe/src/match/util/rpccall.go

@@ -12,7 +12,7 @@ import (
 func PcHelperIsOnLine(phone string) bool {
 	defer util.Catch()
 	var repl string
-	client, err := rpc.DialHTTP("tcp", SysConfig.PcHelper)
+	client, err := rpc.DialHTTP("tcp", Config.PcHelper)
 	if err != nil {
 		logger.Error(err.Error())
 		return false

+ 15 - 14
src/jfw/modules/pushsubscribe/src/push/config.json

@@ -1,24 +1,23 @@
 {
 	"rpcPort":"1122",
 	"jianyuDomain": "https://web-jydev-wcj.jianyu360.cn",
-	"cassandra": {
-		"cachesize": 10000,
-		"host": ["192.168.3.207"],
-		"open": true,
-		"size": 5,
-		"timeout": 20
-	},
+	"mysql": {
+        "dbName": "jianyu",
+        "address": "192.168.3.11:3366",
+        "userName": "root",
+        "passWord": "Topnet123"
+    },
 	"redisServers": "pushcache_2_a=192.168.3.128:5001,pushcache_2_b=192.168.3.128:5002",
 	"mail_content": "<tr><td><num>%d</num></td><td><div class='tit'><a style='color: #000;text-decoration: none;' href='%s?mail' >%s</a></div></td><td style='float: right;' class='infos' ><span class='%s'>%s</span><span class='%s'>%s</span><span class='%s'>%s</span><span class='time'>%s</span></td></tr>",
 	"mail_html": "<body><style> *,body,html{margin:0;padding:0;font-family: tahoma, arial, 'Hiragino Sans GB', 'Microsoft YaHei', 宋体, sans-serif;font-size:16px; }#all{margin:0 auto;width:1024px;overflow:hidden;}.head{margin:5x;margin-top:20px;}.des{padding-bottom:15px;border-bottom:1px solid #e8ecee;color: #686868;}td a:hover {color: #fe7379;text-decoration: underline;} .tit{width:560px;overflow: hidden;    white-space: nowrap;text-overflow: ellipsis;}.area {background-color: #2cb7ca;border-radius: 3px;color: #fff;padding: 1px 2px;}.type {background-color: #ffba00;border-radius: 3px;color: #fff;padding: 1px 2px;margin-left:5px;}.industry {background-color: #25c78c;border-radius: 3px;color: #fff;padding: 1px 2px;margin-left:5px;}.infos span{display:inline-block;margin-left:5px;}td{padding-top:8px;padding-bottom:8px;height:20px;line-height:20px;}num{padding:0 5px 0 0; font-size:16px;color:#2cb7ca;font-weight:bolder;}.keys{color:blue;} </style><div id='all'><div class='head'><IMG width='100px' src=http://www.zhaobiao.info/images/swordfish/sf_01.png /></div><div class='head des'>根据您设置的关键词 :<span class='keys'>%s</span>,剑鱼标讯为您推送30天之内的信息。点击标题可查看详情信息</div><table cellpadding='0' cellspacing='0'>%s</table></div> </body>",
-	"mail_title": "您有新的%s信息-剑鱼标讯",
+	"mail_title": "您有新的招标信息-剑鱼标讯",
 	"mails": [
 		{
 			"addr": "smtp.exmail.qq.com",
 			"port": 465,
 			"pwd": "ue9Rg9Sf4CVtdm5a",
 			"user": "public03@topnet.net.cn",
-			"mailPollSize": 5,
+			"mailPoolSize": 5,
 			"mailReTry":1
 		},
 		{
@@ -26,7 +25,7 @@
 			"port": 465,
 			"pwd": "Mu^$i21673",
 			"user": "public04@topnet.net.cn",
-			"mailPollSize": 4,
+			"mailPoolSize": 4,
 			"mailReTry":1
 		}
 	],
@@ -43,27 +42,29 @@
 	"appPushServiceRpc":"127.0.0.1:5566",
 	"pcHelper":"192.168.20.129:8082",
 	"oncePushTime": "9:00",
+	"refreshTime": "7:00",
 	"otherPushTimes":[
 		"07:30",
 		"14:00",
 		"18:00"
 	],
-	"cassandraPollSize":10,
+	"savePoolSize":10,
 	"pushPoolSize": 60,
 	"mergePoolSize": 40,
 	"movePoolSize": 40,
 	"minutePushSize": 300,
 	"fastigiumMinutePushSize": 100,
 	"fastigiumTime":"9-11",
-	"wxPollSize": 40,
-	"appPollSize": 50,
+	"wxPoolSize": 40,
+	"appPoolSize": 50,
 	"mailSleep":200,
-	"cassandraSleep":200,
+	"saveSleep":200,
 	"appSleep":5,
 	"wxSleep":5,
 	"pcHelperSleep":5,
 	"isPushMail":true,
 	"pushBatch":2,
 	"moveBatch":1000,
+	"moveDuration": 30,
 	"timeoutWarn":"http://10.171.112.160:19281/_send/_mail?program=testgo2&to=wangchuanjin@topnet.net.cn,wangkaiyue@topnet.net.cn&title=剑鱼平台报警_pushsubscribe_push&body=订阅推送放入通道超时"
 }

+ 21 - 13
src/jfw/modules/pushsubscribe/src/push/config/config.go

@@ -7,7 +7,7 @@ import (
 	"strings"
 )
 
-type sysConfig struct {
+type config struct {
 	JianyuDomain            string      `json:"jianyuDomain"`
 	Cassandra               *cassandra  `json:"cassandra"`
 	RedisServers            string      `json:"redisServers"`
@@ -32,30 +32,38 @@ type sysConfig struct {
 	PushBatch               int         `json:"pushBatch"`
 	OncePushTime            string      `json:"oncePushTime"`
 	OtherPushTimes          []string    `json:"otherPushTimes"`
-	WxPollSize              int         `json:"wxPollSize"`
+	RefreshTime             string      `json:"refreshTime"`
+	WxPoolSize              int         `json:"wxPoolSize"`
 	VipOneDayMaxPushSize    int         `json:"vipOneDayMaxPushSize"`
-	AppPollSize             int         `json:"appPollSize"`
+	AppPoolSize             int         `json:"appPoolSize"`
 	MailSleep               int         `json:"mailSleep"`
-	CassandraSleep          int         `json:"cassandraSleep"`
+	SaveSleep               int         `json:"saveSleep"`
 	AppSleep                int         `json:"appSleep"`
 	WxSleep                 int         `json:"wxSleep"`
 	PcHelperSleep           int         `json:"pcHelperSleep"`
 	IsPushMail              bool        `json:"isPushMail"`
-	CassandraPollSize       int         `json:"cassandraPollSize"`
+	SavePoolSize            int         `json:"savePoolSize"`
 	MinutePushSize          int         `json:"minutePushSize"`
 	FastigiumMinutePushSize int         `json:"fastigiumMinutePushSize"`
 	FastigiumTime           string      `json:"fastigiumTime"`
 	NinePushRedisTimeout    int         `json:"ninePushRedisTimeout"`
 	RpcPort                 string      `json:"rpcPort"`
 	MoveBatch               int         `json:"moveBatch"`
+	MoveDuration            int         `json:"moveDuration"`
 	TimeoutWarn             string      `json:"timeoutWarn"`
+	Mysql                   struct {
+		DbName   string
+		Address  string
+		UserName string
+		PassWord string
+	} `json:"mysql"`
 }
 type pushMail struct {
 	Addr         string `json:"addr"`
 	Port         int    `json:"port"`
 	Pwd          string `json:"pwd"`
 	User         string `json:"user"`
-	MailPollSize int    `json:"mailPollSize"`
+	MailPoolSize int    `json:"mailPoolSize"`
 	MailReTry    int    `json:"mailReTry"`
 }
 type cassandra struct {
@@ -70,28 +78,28 @@ var (
 	Gmails         []*mail.GmailAuth
 	Se             = util.SimpleEncrypt{Key: "topnet"}
 	Re             = regexp.MustCompile("<[^>]+>([^<]+)?<[^>]+>")
-	SysConfig      *sysConfig
+	Config         *config
 	WxGroupLen     int
 	FastigiumStart int
 	FastigiumEnd   int
 )
 
 func init() {
-	util.ReadConfig("./config.json", &SysConfig)
+	util.ReadConfig("./config.json", &Config)
 	//
-	if fastigiumTimes := strings.Split(SysConfig.FastigiumTime, "-"); len(fastigiumTimes) == 2 {
+	if fastigiumTimes := strings.Split(Config.FastigiumTime, "-"); len(fastigiumTimes) == 2 {
 		FastigiumStart = util.IntAll(fastigiumTimes[0])
 		FastigiumEnd = util.IntAll(fastigiumTimes[1])
 	}
-	WxGroupLen = len([]rune(SysConfig.WxGroup))
-	Gmails = make([]*mail.GmailAuth, len(SysConfig.Mails))
-	for k, v := range SysConfig.Mails {
+	WxGroupLen = len([]rune(Config.WxGroup))
+	Gmails = make([]*mail.GmailAuth, len(Config.Mails))
+	for k, v := range Config.Mails {
 		Gmails[k] = &mail.GmailAuth{
 			SmtpHost: v.Addr,
 			SmtpPort: v.Port,
 			User:     v.User,
 			Pwd:      v.Pwd,
-			PoolSize: v.MailPollSize,
+			PoolSize: v.MailPoolSize,
 			ReTry:    v.MailReTry,
 		}
 	}

+ 0 - 349
src/jfw/modules/pushsubscribe/src/push/job/dopush.go

@@ -1,349 +0,0 @@
-package job
-
-import (
-	"fmt"
-	"os"
-	. "public"
-	. "push/config"
-	putil "push/util"
-	"qfw/util"
-	"qfw/util/mail"
-	"qfw/util/mongodb"
-	"qfw/util/redis"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/donnie4w/go-logger/logger"
-	"gopkg.in/mgo.v2/bson"
-)
-
-func init() {
-	if SysConfig.MinutePushSize > 0 {
-		putil.LimitMaxOneMinutePush(&DoPush.minutePushPool, SysConfig.MinutePushSize)
-	}
-	if SysConfig.FastigiumMinutePushSize > 0 {
-		putil.LimitMaxOneMinutePush(&DoPush.fastigiumMinutePushPool, SysConfig.FastigiumMinutePushSize)
-	}
-}
-
-type doPush struct {
-	minutePushPool          chan bool
-	fastigiumMinutePushPool chan bool
-}
-
-func (d *doPush) Execute(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list, tempList SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
-	if wxPush == 1 || appPush == 1 || mailPush == 1 || u.PchelperPush == 1 {
-		if list != nil && tempList != nil {
-			isSaveSuccess, _, appStatus, _ = d.Do(taskType, true, 0, appPush, 0, u, &list)
-			if isSaveSuccess {
-				_, wxStatus, _, mailStatus = d.Do(taskType, false, wxPush, 0, mailPush, u, &tempList)
-			}
-		} else if list != nil {
-			isSaveSuccess, wxStatus, appStatus, mailStatus = d.Do(taskType, true, wxPush, appPush, mailPush, u, &list)
-		} else if tempList != nil {
-			_, wxStatus, appStatus, mailStatus = d.Do(taskType, false, wxPush, 0, mailPush, u, &tempList)
-			isSaveSuccess = true
-		}
-	}
-	return isSaveSuccess, wxStatus, appStatus, mailStatus
-}
-func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
-	defer util.Catch()
-	mailContent := ""
-	i := 0
-	jpushtitle := ""
-	lastInfoDate := int64(0)
-	TitleArray := []string{}
-	infos := []*MatchInfo{}
-	publishTitle := map[string]bool{}
-	dateymd := util.NowFormat(util.Date_yyyyMMdd)
-	dayCountKey := DayCountKey(dateymd, k.Id)
-	onceCountKey := DayCountKey(dateymd, k.Id)
-	dayCount := redis.GetInt("pushcache_2_a", dayCountKey)
-	isVipUser := IsVipUser(k.VipStatus)
-	for _, ks := range *sl {
-		k2 := *ks.Info
-		title := strings.Replace(k2["title"].(string), "\n", "", -1)
-		title = Re.ReplaceAllString(title, "$1")
-		area := util.ObjToString(k2["area"])
-		if area == "A" {
-			area = "全国"
-		}
-		newTitle := fmt.Sprintf("[%s]%s", area, title)
-		if publishTitle[newTitle] {
-			continue
-		}
-		publishTitle[newTitle] = true
-		i++
-		infos = append(infos, ks)
-		TitleArray = append(TitleArray, newTitle)
-		if i == 1 {
-			jpushtitle = title
-			lastInfoDate = util.Int64All(k2["publishtime"])
-		}
-		//增加行业的处理
-		industry := ""
-		industryclass := "industry"
-		if k2["s_subscopeclass"] != nil {
-			k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
-			if len(k2sub) > 0 {
-				industry = k2sub[0]
-				if industry != "" {
-					ss := strings.Split(industry, "_")
-					if len(ss) > 1 {
-						industry = ss[0]
-					}
-				}
-			}
-		}
-		if mailPush == 1 { //关于邮件的处理
-			mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
-			url := fmt.Sprintf("%s/article/mailprivate/%s.html", SysConfig.JianyuDomain, mailSid)
-			classArea := "area"
-			classType := "type"
-			infotype := util.ObjToString(k2["subtype"])
-			if infotype == "" {
-				infotype = util.ObjToString(k2["toptype"])
-			}
-			if infotype == "" {
-				infotype = util.ObjToString(k2["type"])
-				if infotype == "tender" {
-					infotype = "招标"
-				} else if infotype == "bid" {
-					infotype = "中标"
-				}
-			}
-			dates := util.LongToDate(k2["publishtime"], false)
-			//标题替换
-			otitle := title
-			for _, kw := range k.Keys {
-				kws := strings.Split(kw, "+")
-				n := 0
-				otitle2 := otitle
-				for _, kwn := range kws {
-					ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
-					if ot != otitle {
-						n++
-						otitle2 = ot
-					} else {
-						break
-					}
-				}
-				if n == len(kws) {
-					otitle = otitle2
-					break
-				}
-			}
-			if industry == "" {
-				industryclass = ""
-			}
-			mailContent += fmt.Sprintf(SysConfig.Mail_content, i, url, otitle, classArea, area, classType, infotype, industryclass, industry, dates)
-		}
-		if isVipUser {
-			if dayCount >= SysConfig.VipOneDayMaxPushSize {
-				break
-			}
-		} else {
-			//限制最大信息条数
-			if i >= SysConfig.MaxPushSize {
-				break
-			}
-		}
-		dayCount++
-	}
-	if i == 0 {
-		logger.Info("推送任务", taskType, "没有要推送的数据!", k.Id)
-		return
-	}
-	redis.Put("pushcache_2_a", dayCountKey, dayCount, 86400)
-	redis.Put("pushcache_2_a", onceCountKey, i, 86400)
-	//限制一分钟最大的推送数量
-	if d.fastigiumMinutePushPool != nil {
-		if hour := time.Now().Hour(); hour >= FastigiumStart && hour <= FastigiumEnd {
-			<-d.fastigiumMinutePushPool //高峰期
-		}
-	} else if d.minutePushPool != nil {
-		<-d.minutePushPool //正常期
-	}
-	if taskType != 0 && isSave {
-		//推送记录id
-		pushId := putil.SaveSendInfo(k, infos)
-		if pushId == "" {
-			logger.Info("推送任务", taskType, "保存到cassandra出错", k.Id)
-			return
-		}
-		logger.Info("推送任务", taskType, "成功保存到cassandra", pushId, k.Id)
-		isSaveSuccess = true
-	}
-	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
-	if isSaveSuccess {
-		//pc端助手推送
-		if k.S_m_openid != "" {
-			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "s_m_openid", k.S_m_openid)
-			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.S_m_openid})
-			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "s_m_openid", k.S_m_openid)
-		}
-		if k.Phone != "" {
-			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "phone", k.Phone)
-			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.Phone})
-			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "phone", k.Phone)
-		}
-	}
-	if wxPush == 1 {
-		logger.Info("推送任务", taskType, "开始微信推送", k.ApplyStatus, k.Id)
-		isPushOk := true
-		if k.ApplyStatus == 1 {
-			TmpTip := ""
-			minute := time.Now().Unix() - lastInfoDate
-			if minute > -1 && minute < 61 {
-				TmpTip = fmt.Sprintf("%d秒前发布的", minute)
-			} else {
-				minute = minute / 60
-				if minute < 121 {
-					if minute < 1 {
-						minute = 1
-					}
-					TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
-				}
-			}
-			Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
-			LastTip := ""
-			if i > 1 {
-				LastTip = fmt.Sprintf("...(共%d条)", i)
-			}
-			LastTipLen := len([]rune(LastTip))
-			wxTitleKeys := strings.Join(k.Keys, ";")
-			if len([]rune(wxTitleKeys)) > 8 {
-				wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
-			}
-			wxtitle := fmt.Sprintf(SysConfig.WxTitle, wxTitleKeys)
-			TitleLen := len([]rune(wxtitle))
-			reLen := 200 - TitleLen - 10 - WxGroupLen - len([]rune(Tip1))
-			WXTitle := ""
-			bshow := false
-			for n := 1; n < len(TitleArray)+1; n++ {
-				curTitle := TitleArray[n-1]
-				tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
-				ch := reLen - len([]rune(tmptitle))
-				if ch < LastTipLen { //加上后大于后辍,则没有完全显示
-					if ch == 0 && n == len(TitleArray) {
-						WXTitle = tmptitle
-						bshow = true
-					} else {
-						ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
-						if ch_1 > 8 {
-							curLen := len([]rune(curTitle))
-							if ch_1 > curLen {
-								ch_1 = curLen
-							}
-							WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
-						}
-					}
-				} else if ch == LastTipLen {
-					WXTitle = tmptitle
-					if n == len(TitleArray) {
-						bshow = true
-					}
-				} else {
-					WXTitle = tmptitle
-					if n == len(TitleArray) {
-						bshow = true
-					}
-				}
-			}
-			if bshow {
-				LastTip = ""
-			}
-			//推送微信
-			isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, wxtitle)
-			if isPushOk {
-				wxStatus = 1
-			} else {
-				wxStatus = -1
-			}
-		}
-		logger.Info("推送任务", taskType, "微信推送结束", k.ApplyStatus, isPushOk, k.Id)
-	}
-	if appPush == 1 {
-		logger.Info("推送任务", taskType, "开始app推送", k.Id)
-		descriptAppend := ""
-		if i > 1 {
-			descriptAppend = fmt.Sprintf("\n...(共%d条)", i)
-			jpushtitle = fmt.Sprintf("1. %s", jpushtitle)
-		}
-		go mongodb.Update("user", map[string]interface{}{
-			"_id": bson.ObjectIdHex(k.Id),
-		}, map[string]interface{}{
-			"$inc": map[string]interface{}{
-				"i_apppushunread": 1,
-			},
-		}, false, false)
-		isPushOk := putil.SendApp(map[string]interface{}{
-			"phoneType":      k.AppPhoneType,
-			"descript":       jpushtitle,
-			"descriptAppend": descriptAppend,
-			"type":           "bid",
-			"userId":         k.Id,
-			"url":            "/jyapp/free/sess/" + Se.EncodeString(k.Id+",_id,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"),
-			//"url":       "/jyapp/free/sess/" + Se.EncodeString(sess_openid+",_id,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"),
-			//"url":         "/jyapp/free/sess/" + push.Se.EncodeString(k.Openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushlist") + "__" + pushid,
-			"otherPushId": k.Opushid,
-			"jgPushId":    k.Jpushid, //极光-推送id
-		})
-		if isPushOk {
-			appStatus = 1
-		} else {
-			appStatus = -1
-		}
-		logger.Info("推送任务", taskType, "app推送结束", isPushOk, k.Id)
-	}
-	//发送邮件
-	if mailPush == 1 {
-		logger.Info("推送任务", taskType, "开始邮箱推送", k.Id)
-		html := fmt.Sprintf(SysConfig.Mail_html, strings.Replace(strings.Join(k.Keys, ";"), "+", " ", -1), mailContent)
-		subject := fmt.Sprintf(SysConfig.Mail_title, "招标")
-		isPushOk := d.SendMail(k.Email, subject, html, nil)
-		if isPushOk {
-			mailStatus = 1
-		} else {
-			mailStatus = -1
-		}
-		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, k.Id)
-	}
-	return
-}
-
-//推送邮件(含附件)
-func (d *doPush) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
-	if !SysConfig.IsPushMail || len(Gmails) == 0 {
-		return true
-	}
-	if SysConfig.MailSleep > 0 {
-		time.Sleep(time.Duration(SysConfig.MailSleep) * time.Millisecond)
-	}
-	defer util.Catch()
-	//生成附件
-	var fnamepath, rename string
-	// if len(fmdatas) > 0 { //开启导出
-	// 	fnamepath, rename = putil.GetBidInfoXlsx(fmdatas)
-	// }
-	status := false
-	index := len(email) % 2
-	if index >= len(Gmails) {
-		index = 0
-	}
-	gmail := Gmails[index]
-	for i := 0; i < len(Gmails); i++ {
-		status = mail.GSendMail("剑鱼标讯", email, "", "", subject, html, fnamepath, rename, gmail)
-		if status {
-			break
-		} else {
-			gmail = Gmails[index^1]
-		}
-	}
-	if fnamepath != "" {
-		os.Remove(fnamepath)
-	}
-	return status
-}

+ 14 - 11
src/jfw/modules/pushsubscribe/src/push/job/job.go

@@ -10,25 +10,28 @@ const (
 	DbName   = "qfw"
 )
 
-var DoPush = &doPush{}
-
 var Jobs = struct {
-	Push   *pushJob
-	Repair *repairJob
+	Move    *moveJob
+	Refresh *refreshJob
+	Push    *pushJob
+	Repair  *repairJob
 }{
-	Push: &pushJob{
-		pool:      make(chan bool, config.SysConfig.PushPoolSize),
-		wait:      &sync.WaitGroup{},
-		lock:      &sync.Mutex{},
+	Move: &moveJob{
 		moveLock:  &sync.Mutex{},
 		moveWait:  &sync.WaitGroup{},
-		movePool:  make(chan bool, config.SysConfig.MovePoolSize),
+		movePool:  make(chan bool, config.Config.MovePoolSize),
 		mergeLock: &sync.Mutex{},
 		mergeWait: &sync.WaitGroup{},
-		mergePool: make(chan bool, config.SysConfig.MergePoolSize),
+		mergePool: make(chan bool, config.Config.MergePoolSize),
+	},
+	Refresh: &refreshJob{},
+	Push: &pushJob{
+		pool: make(chan bool, config.Config.PushPoolSize),
+		wait: &sync.WaitGroup{},
+		lock: &sync.Mutex{},
 	},
 	Repair: &repairJob{
-		pool: make(chan bool, config.SysConfig.PushPoolSize),
+		pool: make(chan bool, config.Config.PushPoolSize),
 		wait: &sync.WaitGroup{},
 		lock: &sync.Mutex{},
 	},

+ 273 - 0
src/jfw/modules/pushsubscribe/src/push/job/movejob.go

@@ -0,0 +1,273 @@
+package job
+
+import (
+	. "public"
+	. "push/config"
+	putil "push/util"
+	"qfw/util"
+	"qfw/util/mongodb"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	mgo "gopkg.in/mgo.v2"
+)
+
+type moveJob struct {
+	info      map[string]interface{}
+	ids       []interface{}
+	isVipUser bool
+	moveLock  *sync.Mutex
+	moveWait  *sync.WaitGroup
+	movePool  chan bool
+	mergeLock *sync.Mutex
+	mergeWait *sync.WaitGroup
+	mergePool chan bool
+}
+
+func (m *moveJob) Execute() {
+	defer util.Catch()
+	Jobs.Push.lock.Lock()
+	defer Jobs.Push.lock.Unlock()
+	logger.Info("开始迁移数据。。。")
+	nowUnix := time.Now().Unix()
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	it := sess.DB(DbName).C("pushspace_temp").Find(map[string]interface{}{
+		"timestamp": map[string]interface{}{
+			"$lt": nowUnix,
+		},
+	}).Sort("userid").Iter()
+	moves := map[string]*moveJob{}
+	logger.Info("开始遍历pushspace_temp")
+	index, number, length := 0, 0, 0
+	for data := make(map[string]interface{}); it.Next(&data); {
+		m.movePool <- true
+		m.moveWait.Add(1)
+		index++
+		go func(temp map[string]interface{}) {
+			defer func() {
+				<-m.movePool
+				m.moveWait.Done()
+				m.moveLock.Unlock()
+			}()
+			userId := util.ObjToString(temp["userid"])
+			isVipUser := IsVipUser(util.IntAll(temp["vipstatus"]))
+			maxPushSize := Config.MaxPushSize
+			if isVipUser {
+				maxPushSize = Config.VipOneDayMaxPushSize
+			}
+			m.moveLock.Lock()
+			move := moves[userId]
+			if move != nil {
+				list, _ := move.info["list"].(SortList)
+				idMap := map[string]bool{}
+				for _, v := range list {
+					idMap[util.ObjToString((*v.Info)["_id"])] = true
+				}
+				newList := putil.ToSortList(temp["list"])
+				for _, v := range newList {
+					if idMap[util.ObjToString((*v.Info)["_id"])] {
+						continue
+					}
+					list = append(list, v)
+				}
+				sort.Sort(list)
+				if len(list) > maxPushSize {
+					list = list[:maxPushSize]
+				}
+				temp["list"] = list
+				move.info = temp
+				move.isVipUser = isVipUser
+				move.ids = append(move.ids, temp["_id"])
+			} else {
+				temp["list"] = putil.ToSortList(temp["list"])
+				move = &moveJob{
+					info:      temp,
+					ids:       []interface{}{temp["_id"]},
+					isVipUser: isVipUser,
+				}
+			}
+			moves[userId] = move
+			length++
+			if length == Config.MoveBatch {
+				m.Merge(&number, nowUnix, moves)
+				length = 0
+				moves = map[string]*moveJob{}
+			}
+		}(data)
+		data = make(map[string]interface{})
+		if index%500 == 0 {
+			logger.Info("pushspace_temp加载到内存:", index)
+		}
+	}
+	m.moveWait.Wait()
+	if length > 0 {
+		m.Merge(&number, nowUnix, moves)
+		length = 0
+		moves = map[string]*moveJob{}
+	}
+	logger.Info("迁移数据结束。。。", index)
+}
+func (m *moveJob) Merge(number *int, nowUnix int64, moves map[string]*moveJob) {
+	*number++
+	logger.Info("第", *number, "次开始合并数据")
+	index := 0
+	saveArray := []map[string]interface{}{}
+	saveArray_delete := []interface{}{}
+	updateArray_query := []map[string]interface{}{}
+	updateArray_set := []map[string]interface{}{}
+	updateArray_delete := []interface{}{}
+	for u, m := range moves {
+		m.mergePool <- true
+		m.mergeWait.Add(1)
+		go func(userId string, move *moveJob) {
+			defer func() {
+				<-m.mergePool
+				m.mergeWait.Done()
+			}()
+			sess := mongodb.GetMgoConn()
+			defer mongodb.DestoryMongoConn(sess)
+			var data map[string]interface{}
+			sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
+			if data == nil { //批量新增
+				m.mergeLock.Lock()
+				saveArray = append(saveArray, move.info)
+				saveArray_delete = append(saveArray_delete, move.ids...)
+				if len(saveArray) == BulkSize {
+					m.SaveBulk(sess, &saveArray, &saveArray_delete)
+				}
+				m.mergeLock.Unlock()
+			} else { //批量更新
+				setMap := map[string]interface{}{}
+				for _, field := range MoveFields {
+					if move.info[field] == nil {
+						continue
+					}
+					setMap[field] = move.info[field]
+				}
+				//
+				newListOrig, _ := move.info["list"].(SortList)
+				if newListOrig == nil || len(newListOrig) == 0 {
+					return
+				}
+				pushAll := make(map[string]interface{})
+				oldList := putil.ToSortList(data["list"])
+				idMap := map[string]bool{}
+				for _, vv := range oldList {
+					idMap[util.ObjToString((*vv.Info)["_id"])] = true
+				}
+				newList := make(SortList, 0)
+				//去重
+				for _, vv := range newListOrig {
+					if idMap[util.ObjToString((*vv.Info)["_id"])] {
+						continue
+					}
+					newList = append(newList, vv)
+				}
+				pLength := len(newList)
+				if pLength == 0 {
+					return
+				}
+				rLength := len(oldList)
+				maxPushSize := Config.MaxPushSize
+				if move.isVipUser {
+					maxPushSize = Config.VipOneDayMaxPushSize
+				}
+				if rLength+pLength > maxPushSize {
+					newList = append(newList, oldList...)
+					sort.Sort(newList)
+					setMap["list"] = newList[:maxPushSize]
+					setMap["size"] = maxPushSize
+				} else { //追加
+					setMap["size"] = rLength + pLength
+					pushAll["list"] = newList
+				}
+				upSet := map[string]interface{}{
+					"$set": setMap,
+				}
+				if len(pushAll) > 0 {
+					upSet["$pushAll"] = pushAll
+				}
+				m.mergeLock.Lock()
+				updateArray_delete = append(updateArray_delete, move.ids...)
+				updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
+				updateArray_set = append(updateArray_set, upSet)
+				if len(updateArray_query) == BulkSize {
+					m.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
+				}
+				m.mergeLock.Unlock()
+			}
+		}(u, m)
+		index++
+		if index%500 == 0 {
+			logger.Info("第", *number, "次合并数据:", index)
+		}
+	}
+	m.mergeWait.Wait()
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	if len(saveArray) > 0 {
+		m.SaveBulk(sess, &saveArray, &saveArray_delete)
+	}
+	if len(updateArray_query) > 0 {
+		m.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
+	}
+	logger.Info("第", *number, "次合并数据结束。。。", index)
+}
+func (m *moveJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
+	coll := sess.DB(DbName).C("pushspace")
+	bulk := coll.Bulk()
+	for _, v := range *saves {
+		bulk.Insert(v)
+	}
+	_, err := bulk.Run()
+	if nil != err {
+		logger.Info("BulkError", err)
+	} else {
+		m.DelBulk(sess, deletes)
+	}
+	*saves = []map[string]interface{}{}
+}
+func (m *moveJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
+	coll := sess.DB(DbName).C("pushspace")
+	bulk := coll.Bulk()
+	for k, v := range *array_q {
+		bulk.Update(v, (*array_s)[k])
+	}
+	_, err := bulk.Run()
+	if nil != err {
+		logger.Info("UpdateBulkError", err)
+	} else {
+		m.DelBulk(sess, array_d)
+	}
+	*array_q = []map[string]interface{}{}
+	*array_s = []map[string]interface{}{}
+}
+func (m *moveJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
+	coll := sess.DB(DbName).C("pushspace_temp")
+	count := 0
+	bulk := coll.Bulk()
+	for _, v := range *array {
+		count++
+		bulk.Remove(map[string]interface{}{
+			"_id": v,
+		})
+		if count == BulkSize {
+			_, err := bulk.Run()
+			if nil != err {
+				logger.Info("DelBulkError", err)
+			}
+			count = 0
+			bulk = coll.Bulk()
+		}
+	}
+	if count > 0 {
+		_, err := bulk.Run()
+		if nil != err {
+			logger.Info("DelBulkError", err)
+		}
+	}
+	*array = []interface{}{}
+}

+ 357 - 349
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -1,18 +1,22 @@
 package job
 
 import (
+	"fmt"
 	"net/http"
+	"os"
 	. "public"
 	. "push/config"
 	putil "push/util"
 	"qfw/util"
+	"qfw/util/mail"
 	"qfw/util/mongodb"
-	"sort"
+	"qfw/util/redis"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/donnie4w/go-logger/logger"
-	mgo "gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
 )
 
@@ -20,276 +24,33 @@ var (
 	MoveFields = []string{"s_m_openid", "a_m_openid", "phone", "usertype", "jpushid", "opushid", "words", "ratemode", "wxpush", "apppush", "mailpush", "pchelperpush", "timestamp", "subscribe", "applystatus", "appphonetype", "email", "size", "modifydate", "mergeorder", "nickname", "firstpushtime"}
 )
 
-type Move struct {
-	Info map[string]interface{}
-	Ids  []interface{}
+func init() {
+	if Config.MinutePushSize > 0 {
+		putil.LimitMaxOneMinutePush(&Jobs.Push.minutePushPool, Config.MinutePushSize)
+	}
+	if Config.FastigiumMinutePushSize > 0 {
+		putil.LimitMaxOneMinutePush(&Jobs.Push.fastigiumMinutePushPool, Config.FastigiumMinutePushSize)
+	}
 }
 
 type pushJob struct {
-	taskType  int
-	pool      chan bool
-	wait      *sync.WaitGroup
-	lock      *sync.Mutex
-	moveLock  *sync.Mutex
-	moveWait  *sync.WaitGroup
-	movePool  chan bool
-	mergeLock *sync.Mutex
-	mergeWait *sync.WaitGroup
-	mergePool chan bool
-	lastId    string
-	users     *[]map[string]interface{}
+	taskType                int
+	pool                    chan bool
+	wait                    *sync.WaitGroup
+	lock                    *sync.Mutex
+	lastId                  string
+	users                   *[]map[string]interface{}
+	vipTempSave             bool
+	minutePushPool          chan bool
+	fastigiumMinutePushPool chan bool
 }
 
-//taskType 1--实时推送 2--实时推送+一天三次的8点推送 3--一天三次推送 4--九点推送
-func (p *pushJob) Execute(taskType int, isMoveDatas bool) {
+//taskType 1--一天三次推送 2--九点推送
+func (p *pushJob) Execute(taskType int) {
+	defer util.Catch()
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	p.taskType = taskType
-	logger.Info("开始推送任务。。。", p.taskType)
-	if isMoveDatas {
-		p.Move()
-	}
-	p.Push()
-}
-func (p *pushJob) Move() {
-	defer util.Catch()
-	logger.Info("推送任务", p.taskType, "开始迁移数据。。。")
-	nowUnix := time.Now().Unix()
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	it := sess.DB(DbName).C("pushspace_temp").Find(map[string]interface{}{
-		"timestamp": map[string]interface{}{
-			"$lt": nowUnix,
-		},
-	}).Sort("userid").Iter()
-	moves := map[string]*Move{}
-	logger.Info("推送任务", p.taskType, "开始遍历pushspace_temp")
-	index, number, length := 0, 0, 0
-	for data := make(map[string]interface{}); it.Next(&data); {
-		p.movePool <- true
-		p.moveWait.Add(1)
-		index++
-		go func(temp map[string]interface{}) {
-			defer func() {
-				<-p.movePool
-				p.moveWait.Done()
-				p.moveLock.Unlock()
-			}()
-			userId := util.ObjToString(temp["userid"])
-			p.moveLock.Lock()
-			move := moves[userId]
-			if move != nil {
-				list, _ := move.Info["list"].(SortList)
-				idMap := map[string]bool{}
-				for _, v := range list {
-					idMap[util.ObjToString((*v.Info)["_id"])] = true
-				}
-				newList := putil.ToSortList(temp["list"])
-				for _, v := range newList {
-					if idMap[util.ObjToString((*v.Info)["_id"])] {
-						continue
-					}
-					list = append(list, v)
-				}
-				sort.Sort(list)
-				if len(list) > SysConfig.MaxPushSize {
-					list = list[:SysConfig.MaxPushSize]
-				}
-				temp["list"] = list
-				move.Info = temp
-				move.Ids = append(move.Ids, temp["_id"])
-			} else {
-				temp["list"] = putil.ToSortList(temp["list"])
-				move = &Move{
-					Info: temp,
-					Ids:  []interface{}{temp["_id"]},
-				}
-			}
-			moves[userId] = move
-			length++
-			if length == SysConfig.MoveBatch {
-				p.Merge(&number, nowUnix, moves)
-				length = 0
-				moves = map[string]*Move{}
-			}
-		}(data)
-		data = make(map[string]interface{})
-		if index%500 == 0 {
-			logger.Info("推送任务", p.taskType, "pushspace_temp加载到内存:", index)
-		}
-	}
-	p.moveWait.Wait()
-	if length > 0 {
-		p.Merge(&number, nowUnix, moves)
-		length = 0
-		moves = map[string]*Move{}
-	}
-	logger.Info("推送任务", p.taskType, "迁移数据结束。。。", index)
-}
-func (p *pushJob) Merge(number *int, nowUnix int64, moves map[string]*Move) {
-	*number++
-	logger.Info("推送任务", p.taskType, "第", *number, "次开始合并数据")
-	index := 0
-	saveArray := []map[string]interface{}{}
-	saveArray_delete := []interface{}{}
-	updateArray_query := []map[string]interface{}{}
-	updateArray_set := []map[string]interface{}{}
-	updateArray_delete := []interface{}{}
-	for u, m := range moves {
-		p.mergePool <- true
-		p.mergeWait.Add(1)
-		go func(userId string, move *Move) {
-			defer func() {
-				<-p.mergePool
-				p.mergeWait.Done()
-			}()
-			sess := mongodb.GetMgoConn()
-			defer mongodb.DestoryMongoConn(sess)
-			var data map[string]interface{}
-			sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
-			if data == nil { //批量新增
-				p.mergeLock.Lock()
-				saveArray = append(saveArray, move.Info)
-				saveArray_delete = append(saveArray_delete, move.Ids...)
-				if len(saveArray) == BulkSize {
-					p.SaveBulk(sess, &saveArray, &saveArray_delete)
-				}
-				p.mergeLock.Unlock()
-			} else { //批量更新
-				setMap := map[string]interface{}{}
-				for _, field := range MoveFields {
-					if move.Info[field] == nil {
-						continue
-					}
-					setMap[field] = move.Info[field]
-				}
-				//
-				newListOrig, _ := move.Info["list"].(SortList)
-				if newListOrig == nil || len(newListOrig) == 0 {
-					return
-				}
-				pushAll := make(map[string]interface{})
-				for _, v := range []string{"", "temp"} {
-					oldList := putil.ToSortList(data[v+"list"])
-					if v == "temp" && oldList == nil {
-						continue
-					}
-					idMap := map[string]bool{}
-					for _, vv := range oldList {
-						idMap[util.ObjToString((*vv.Info)["_id"])] = true
-					}
-					newList := make(SortList, 0)
-					//去重
-					for _, vv := range newListOrig {
-						if idMap[util.ObjToString((*vv.Info)["_id"])] {
-							continue
-						}
-						newList = append(newList, vv)
-					}
-					pLength := len(newList)
-					if pLength == 0 {
-						continue
-					}
-					rLength := len(oldList)
-					if rLength+pLength > SysConfig.MaxPushSize {
-						newList = append(newList, oldList...)
-						sort.Sort(newList)
-						setMap[v+"list"] = newList[:SysConfig.MaxPushSize]
-						setMap[v+"size"] = SysConfig.MaxPushSize
-					} else { //追加
-						setMap[v+"size"] = rLength + pLength
-						pushAll[v+"list"] = newList
-					}
-				}
-				upSet := map[string]interface{}{
-					"$set": setMap,
-				}
-				if len(pushAll) > 0 {
-					upSet["$pushAll"] = pushAll
-				}
-				p.mergeLock.Lock()
-				updateArray_delete = append(updateArray_delete, move.Ids...)
-				updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
-				updateArray_set = append(updateArray_set, upSet)
-				if len(updateArray_query) == BulkSize {
-					p.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
-				}
-				p.mergeLock.Unlock()
-			}
-		}(u, m)
-		index++
-		if index%500 == 0 {
-			logger.Info("推送任务", p.taskType, "第", *number, "次合并数据:", index)
-		}
-	}
-	p.mergeWait.Wait()
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	if len(saveArray) > 0 {
-		p.SaveBulk(sess, &saveArray, &saveArray_delete)
-	}
-	if len(updateArray_query) > 0 {
-		p.UpdateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
-	}
-	logger.Info("推送任务", p.taskType, "第", *number, "次合并数据结束。。。", index)
-}
-func (p *pushJob) SaveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace")
-	bulk := coll.Bulk()
-	for _, v := range *saves {
-		bulk.Insert(v)
-	}
-	_, err := bulk.Run()
-	if nil != err {
-		logger.Info("BulkError", err)
-	} else {
-		p.DelBulk(sess, deletes)
-	}
-	*saves = []map[string]interface{}{}
-}
-func (p *pushJob) UpdateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace")
-	bulk := coll.Bulk()
-	for k, v := range *array_q {
-		bulk.Update(v, (*array_s)[k])
-	}
-	_, err := bulk.Run()
-	if nil != err {
-		logger.Info("UpdateBulkError", err)
-	} else {
-		p.DelBulk(sess, array_d)
-	}
-	*array_q = []map[string]interface{}{}
-	*array_s = []map[string]interface{}{}
-}
-func (p *pushJob) DelBulk(sess *mgo.Session, array *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace_temp")
-	count := 0
-	bulk := coll.Bulk()
-	for _, v := range *array {
-		count++
-		bulk.Remove(map[string]interface{}{
-			"_id": v,
-		})
-		if count == BulkSize {
-			_, err := bulk.Run()
-			if nil != err {
-				logger.Info("DelBulkError", err)
-			}
-			count = 0
-			bulk = coll.Bulk()
-		}
-	}
-	if count > 0 {
-		_, err := bulk.Run()
-		if nil != err {
-			logger.Info("DelBulkError", err)
-		}
-	}
-	*array = []interface{}{}
-}
-func (p *pushJob) Push() {
-	defer util.Catch()
 	logger.Info("推送任务", p.taskType, "开始推送。。。")
 	batch_index := 0
 	for {
@@ -300,10 +61,10 @@ func (p *pushJob) Push() {
 			select {
 			case <-time.After(5 * time.Minute):
 				isTake = false
-				logger.Error("推送任务", p.taskType, "推送放入通道超时,", temp["userid"], len(DoPush.minutePushPool), len(DoPush.fastigiumMinutePushPool))
+				logger.Error("推送任务", p.taskType, "推送放入通道超时,", temp["userid"], len(p.minutePushPool), len(p.fastigiumMinutePushPool))
 				go func() {
-					if SysConfig.TimeoutWarn != "" {
-						if _, err := http.Get(SysConfig.TimeoutWarn); err != nil {
+					if Config.TimeoutWarn != "" {
+						if _, err := http.Get(Config.TimeoutWarn); err != nil {
 							logger.Error("发送告警邮件错误", err)
 						}
 					}
@@ -344,56 +105,17 @@ func (p *pushJob) Push() {
 				}
 				logger.Info("推送任务", p.taskType, "开始推送用户", "userType", u.UserType, "userId", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid, "phoneType", u.AppPhoneType, "rateMode", u.RateMode, "email", u.Email)
 				wxPush, appPush, mailPush := 0, 0, 0
-				if p.taskType == 1 {
-					if u.WxPush == 1 {
-						if u.ApplyStatus == 1 {
-							wxPush = -1
-						} else {
-							wxPush = 1
-						}
-					}
-					if u.AppPush == 1 {
-						appPush = 1
-					}
-					if u.MailPush == 1 {
-						if u.UserType == 0 && u.ApplyStatus == 1 {
-							mailPush = -1
-						} else if u.UserType == 5 && u.ApplyStatus == 1 && u.AppPush == 0 {
-							mailPush = -1
-						} else {
-							mailPush = 1
-						}
-					}
-				} else if p.taskType == 2 || p.taskType == 4 {
-					if u.WxPush == 1 {
-						wxPush = 1
-					}
-					if u.AppPush == 1 {
-						appPush = 1
-					}
-					if u.MailPush == 1 {
-						mailPush = 1
-					}
-				} else if p.taskType == 3 {
-					if u.WxPush == 1 && u.ApplyStatus == 1 {
-						wxPush = 1
-					}
-					if u.MailPush == 1 {
-						mailPush = 1
-					}
+				if u.WxPush == 1 {
+					wxPush = 1
 				}
-				list := putil.ToSortList(v["list"])
-				tempList := putil.ToSortList(v["templist"])
-				t_wxPush, t_mailPush := util.IntAll(v["tempwxpush"]), util.IntAll(v["tempmailpush"])
-				if tempList != nil {
-					if wxPush == 1 && t_wxPush == 0 {
-						wxPush = 0
-					}
-					if mailPush == 1 && t_mailPush == 0 {
-						mailPush = 0
-					}
+				if u.AppPush == 1 {
+					appPush = 1
 				}
-				logger.Info("推送任务", p.taskType, "用户接收方式", "userId", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "pchelperPush", u.PchelperPush, "t_wxPush", t_wxPush, "t_mailPush", t_mailPush)
+				if u.MailPush == 1 {
+					mailPush = 1
+				}
+				list := putil.ToSortList(v["list"])
+				logger.Info("推送任务", p.taskType, "用户接收方式", "userId", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "pchelperPush", u.PchelperPush)
 				if wxPush != 1 && appPush != 1 && mailPush != 1 {
 					return
 				}
@@ -413,7 +135,7 @@ func (p *pushJob) Push() {
 						mailPush = 0
 					}
 				}
-				isSaveSuccess, wxStatus, appStatus, mailStatus := DoPush.Execute(p.taskType, wxPush, appPush, mailPush, u, list, tempList)
+				isSaveSuccess, wxStatus, appStatus, mailStatus := p.Push(p.taskType, wxPush, appPush, mailPush, u, list)
 				if isSaveSuccess {
 					if u.FirstPushTime == 0 {
 						go mongodb.Update("user", map[string]interface{}{
@@ -430,28 +152,9 @@ func (p *pushJob) Push() {
 				//判断是否要删除数据
 				sess := mongodb.GetMgoConn()
 				defer mongodb.DestoryMongoConn(sess)
-				if wxPush == -1 || mailPush == -1 {
-					//如果该用户还有微信或者邮箱推送,把list字段的值挪到templist
-					update := map[string]interface{}{}
-					set := map[string]interface{}{
-						"tempwxpush":   wxPush,
-						"tempmailpush": mailPush,
-					}
-					if tempList == nil {
-						update["$rename"] = map[string]interface{}{"list": "templist"}
-					} else {
-						update["$unset"] = map[string]interface{}{"list": ""}
-					}
-					update["$set"] = set
-					err := sess.DB(DbName).C("pushspace").UpdateId(v["_id"], update)
-					if err != nil {
-						logger.Error("推送任务", p.taskType, "update error", err)
-					}
-				} else {
-					err := sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
-					if err != nil {
-						logger.Error("推送任务", p.taskType, "remove error", err)
-					}
+				err := sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
+				if err != nil {
+					logger.Error("推送任务", p.taskType, "remove error", err)
 				}
 				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
 					f_count, err := sess.DB(DbName).C("pushspace").FindId(v["_id"]).Count()
@@ -493,7 +196,7 @@ func (p *pushJob) Push() {
 				}
 			}(temp, isTake)
 		}
-		if batch_size < SysConfig.PushBatch {
+		if batch_size < Config.PushBatch {
 			break
 		}
 	}
@@ -509,16 +212,11 @@ func (p *pushJob) OncePushBatch(batch_index int) int {
 	defer mongodb.DestoryMongoConn(sess)
 	var query map[string]interface{}
 	//根据任务类型,查找ratemode
-	if p.taskType == 1 || p.taskType == 2 {
+	if p.taskType == 1 {
 		query = map[string]interface{}{
 			"ratemode": 1,
 		}
-	} else if p.taskType == 3 {
-		query = map[string]interface{}{
-			"ratemode":    1,
-			"applystatus": 1,
-		}
-	} else if p.taskType == 4 {
+	} else if p.taskType == 2 {
 		query = map[string]interface{}{
 			"ratemode": 2,
 		}
@@ -526,9 +224,9 @@ func (p *pushJob) OncePushBatch(batch_index int) int {
 		logger.Error("taskType error", p.taskType)
 		return i
 	}
-	if len(SysConfig.TestIds) > 0 {
+	if len(Config.TestIds) > 0 {
 		query["userid"] = map[string]interface{}{
-			"$in": SysConfig.TestIds,
+			"$in": Config.TestIds,
 		}
 	}
 	if p.lastId != "" {
@@ -543,10 +241,320 @@ func (p *pushJob) OncePushBatch(batch_index int) int {
 		p.lastId = util.BsonIdToSId(temp["_id"])
 		*p.users = append(*p.users, temp)
 		temp = make(map[string]interface{})
-		if i == SysConfig.PushBatch {
+		if i == Config.PushBatch {
 			break
 		}
 	}
 	logger.Info("推送任务", p.taskType, "第", batch_index, "批用户加载结束", p.lastId)
 	return i
 }
+
+func (p *pushJob) Push(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
+	if wxPush == 1 || appPush == 1 || mailPush == 1 || u.PchelperPush == 1 {
+		if list != nil {
+			isSaveSuccess, wxStatus, appStatus, mailStatus = p.DoPush(taskType, true, wxPush, appPush, mailPush, u, &list)
+		}
+	}
+	return isSaveSuccess, wxStatus, appStatus, mailStatus
+}
+func (p *pushJob) DoPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxStatus, appStatus, mailStatus int) {
+	defer util.Catch()
+	mailContent := ""
+	jpushtitle := ""
+	lastInfoDate := int64(0)
+	TitleArray := []string{}
+	infos := []*MatchInfo{}
+	infosLength := 0
+	publishTitle := map[string]bool{}
+	dateymd := util.NowFormat(util.Date_yyyyMMdd)
+	dayCountKey := DayCountKey(dateymd, k.Id)
+	onceCountKey := OnceCountKey(dateymd, k.Id)
+	dayCount := redis.GetInt("pushcache_2_a", dayCountKey)
+	isVipUser := IsVipUser(k.VipStatus)
+	now := time.Now()
+	for _, ks := range *sl {
+		k2 := *ks.Info
+		title := strings.Replace(k2["title"].(string), "\n", "", -1)
+		title = Re.ReplaceAllString(title, "$1")
+		area := util.ObjToString(k2["area"])
+		if area == "A" {
+			area = "全国"
+		}
+		newTitle := fmt.Sprintf("[%s]%s", area, title)
+		if publishTitle[newTitle] {
+			continue
+		}
+		publishTitle[newTitle] = true
+		infosLength++
+		infos = append(infos, ks)
+		TitleArray = append(TitleArray, newTitle)
+		if infosLength == 1 {
+			jpushtitle = title
+			lastInfoDate = util.Int64All(k2["publishtime"])
+		}
+		//增加行业的处理
+		industry := ""
+		industryclass := "industry"
+		if k2["s_subscopeclass"] != nil {
+			k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
+			if len(k2sub) > 0 {
+				industry = k2sub[0]
+				if industry != "" {
+					ss := strings.Split(industry, "_")
+					if len(ss) > 1 {
+						industry = ss[0]
+					}
+				}
+			}
+		}
+		if mailPush == 1 { //关于邮件的处理
+			mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
+			url := fmt.Sprintf("%s/article/mailprivate/%s.html", Config.JianyuDomain, mailSid)
+			classArea := "area"
+			classType := "type"
+			infotype := util.ObjToString(k2["subtype"])
+			if infotype == "" {
+				infotype = util.ObjToString(k2["toptype"])
+			}
+			if infotype == "" {
+				infotype = util.ObjToString(k2["type"])
+				if infotype == "tender" {
+					infotype = "招标"
+				} else if infotype == "bid" {
+					infotype = "中标"
+				}
+			}
+			dates := util.LongToDate(k2["publishtime"], false)
+			//标题替换
+			otitle := title
+			for _, kw := range k.Keys {
+				kws := strings.Split(kw, "+")
+				n := 0
+				otitle2 := otitle
+				for _, kwn := range kws {
+					ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
+					if ot != otitle {
+						n++
+						otitle2 = ot
+					} else {
+						break
+					}
+				}
+				if n == len(kws) {
+					otitle = otitle2
+					break
+				}
+			}
+			if industry == "" {
+				industryclass = ""
+			}
+			mailContent += fmt.Sprintf(Config.Mail_content, infosLength, url, otitle, classArea, area, classType, infotype, industryclass, industry, dates)
+		}
+		if isVipUser {
+			if dayCount >= Config.VipOneDayMaxPushSize {
+				break
+			}
+		} else {
+			//限制最大信息条数
+			if infosLength >= Config.MaxPushSize {
+				break
+			}
+		}
+		dayCount++
+	}
+	if infosLength == 0 {
+		logger.Info("推送任务", taskType, "没有要推送的数据!", k.Id)
+		return
+	}
+	redis.Put("pushcache_2_a", dayCountKey, dayCount, 86400)
+	redis.Put("pushcache_2_a", onceCountKey, infosLength, 86400)
+	//限制一分钟最大的推送数量
+	if p.fastigiumMinutePushPool != nil {
+		if hour := now.Hour(); hour >= FastigiumStart && hour <= FastigiumEnd {
+			<-p.fastigiumMinutePushPool //高峰期
+		}
+	} else if p.minutePushPool != nil {
+		<-p.minutePushPool //正常期
+	}
+	pushDate := ""
+	if taskType != 0 && isSave {
+		//推送记录id
+		pushDate = putil.SaveSendInfo(k, infos)
+		if pushDate == "" {
+			logger.Info("推送任务", taskType, "保存出错", k.Id)
+			return
+		}
+		logger.Info("推送任务", taskType, "保存成功", pushDate, k.Id)
+		isSaveSuccess = true
+	}
+	if isVipUser && (now.Day() == 28 || now.Weekday().String() == "Friday") && (k.RateMode == 3 || k.RateMode == 4) {
+
+	}
+	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
+	if isSaveSuccess {
+		//pc端助手推送
+		if k.S_m_openid != "" {
+			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "s_m_openid", k.S_m_openid)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.S_m_openid})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "s_m_openid", k.S_m_openid)
+		}
+		if k.Phone != "" {
+			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "phone", k.Phone)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.Phone})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "phone", k.Phone)
+		}
+	}
+	if wxPush == 1 {
+		logger.Info("推送任务", taskType, "开始微信推送", k.ApplyStatus, k.Id)
+		isPushOk := true
+		if k.ApplyStatus == 1 {
+			TmpTip := ""
+			minute := now.Unix() - lastInfoDate
+			if minute > -1 && minute < 61 {
+				TmpTip = fmt.Sprintf("%d秒前发布的", minute)
+			} else {
+				minute = minute / 60
+				if minute < 121 {
+					if minute < 1 {
+						minute = 1
+					}
+					TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
+				}
+			}
+			Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
+			LastTip := ""
+			if infosLength > 1 {
+				LastTip = fmt.Sprintf("...(共%d条)", infosLength)
+			}
+			LastTipLen := len([]rune(LastTip))
+			wxTitleKeys := strings.Join(k.Keys, ";")
+			if len([]rune(wxTitleKeys)) > 8 {
+				wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
+			}
+			wxtitle := fmt.Sprintf(Config.WxTitle, wxTitleKeys)
+			TitleLen := len([]rune(wxtitle))
+			reLen := 200 - TitleLen - 10 - WxGroupLen - len([]rune(Tip1))
+			WXTitle := ""
+			bshow := false
+			for n := 1; n < len(TitleArray)+1; n++ {
+				curTitle := TitleArray[n-1]
+				tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
+				ch := reLen - len([]rune(tmptitle))
+				if ch < LastTipLen { //加上后大于后辍,则没有完全显示
+					if ch == 0 && n == len(TitleArray) {
+						WXTitle = tmptitle
+						bshow = true
+					} else {
+						ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
+						if ch_1 > 8 {
+							curLen := len([]rune(curTitle))
+							if ch_1 > curLen {
+								ch_1 = curLen
+							}
+							WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
+						}
+					}
+				} else if ch == LastTipLen {
+					WXTitle = tmptitle
+					if n == len(TitleArray) {
+						bshow = true
+					}
+				} else {
+					WXTitle = tmptitle
+					if n == len(TitleArray) {
+						bshow = true
+					}
+				}
+			}
+			if bshow {
+				LastTip = ""
+			}
+			//推送微信
+			isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, wxtitle, pushDate)
+			if isPushOk {
+				wxStatus = 1
+			} else {
+				wxStatus = -1
+			}
+		}
+		logger.Info("推送任务", taskType, "微信推送结束", k.ApplyStatus, isPushOk, k.Id)
+	}
+	if appPush == 1 {
+		logger.Info("推送任务", taskType, "开始app推送", k.Id)
+		descriptAppend := ""
+		if infosLength > 1 {
+			descriptAppend = fmt.Sprintf("\n...(共%d条)", infosLength)
+			jpushtitle = fmt.Sprintf("1. %s", jpushtitle)
+		}
+		go mongodb.Update("user", map[string]interface{}{
+			"_id": bson.ObjectIdHex(k.Id),
+		}, map[string]interface{}{
+			"$inc": map[string]interface{}{
+				"i_apppushunread": 1,
+			},
+		}, false, false)
+		isPushOk := putil.SendApp(map[string]interface{}{
+			"phoneType":      k.AppPhoneType,
+			"descript":       jpushtitle,
+			"descriptAppend": descriptAppend,
+			"type":           "bid",
+			"userId":         k.Id,
+			"url":            "/jyapp/free/sess/" + Se.EncodeString(k.Id+",_id,"+strconv.Itoa(int(now.Unix()))+",historypush"),
+			"otherPushId":    k.Opushid,
+			"jgPushId":       k.Jpushid, //极光-推送id
+		})
+		if isPushOk {
+			appStatus = 1
+		} else {
+			appStatus = -1
+		}
+		logger.Info("推送任务", taskType, "app推送结束", isPushOk, k.Id)
+	}
+	//发送邮件
+	if mailPush == 1 {
+		logger.Info("推送任务", taskType, "开始邮箱推送", k.Id)
+		html := fmt.Sprintf(Config.Mail_html, strings.Replace(strings.Join(k.Keys, ";"), "+", " ", -1), mailContent)
+		isPushOk := p.SendMail(k.Email, Config.Mail_title, html, nil)
+		if isPushOk {
+			mailStatus = 1
+		} else {
+			mailStatus = -1
+		}
+		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, k.Id)
+	}
+	return
+}
+
+//推送邮件(含附件)
+func (p *pushJob) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
+	if !Config.IsPushMail || len(Gmails) == 0 {
+		return true
+	}
+	if Config.MailSleep > 0 {
+		time.Sleep(time.Duration(Config.MailSleep) * time.Millisecond)
+	}
+	defer util.Catch()
+	//生成附件
+	var fnamepath, rename string
+	// if len(fmdatas) > 0 { //开启导出
+	// 	fnamepath, rename = putil.GetBidInfoXlsx(fmdatas)
+	// }
+	status := false
+	index := len(email) % 2
+	if index >= len(Gmails) {
+		index = 0
+	}
+	gmail := Gmails[index]
+	for i := 0; i < len(Gmails); i++ {
+		status = mail.GSendMail("剑鱼标讯", email, "", "", subject, html, fnamepath, rename, gmail)
+		if status {
+			break
+		} else {
+			gmail = Gmails[index^1]
+		}
+	}
+	if fnamepath != "" {
+		os.Remove(fnamepath)
+	}
+	return status
+}

+ 13 - 0
src/jfw/modules/pushsubscribe/src/push/job/refreshjob.go

@@ -0,0 +1,13 @@
+package job
+
+import "qfw/util/mongodb"
+
+var pool = make(chan bool, 20)
+
+type refreshJob struct{}
+
+func (r *refreshJob) Execute() {
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	//sess.DB("qfw").C("user").Find()
+}

+ 3 - 4
src/jfw/modules/pushsubscribe/src/push/job/repairjob.go

@@ -72,8 +72,7 @@ func (r *repairJob) Execute(param string) bool {
 					mailPush = 1
 				}
 				list := putil.ToSortList(v["list"])
-				tempList := putil.ToSortList(v["templist"])
-				_, wxStatus, appStatus, mailStatus := DoPush.Execute(0, wxPush, appPush, mailPush, u, list, tempList)
+				_, wxStatus, appStatus, mailStatus := Jobs.Push.Push(0, wxPush, appPush, mailPush, u, list)
 				sess := mongodb.GetMgoConn()
 				defer mongodb.DestoryMongoConn(sess)
 				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
@@ -112,7 +111,7 @@ func (r *repairJob) Execute(param string) bool {
 				}
 			}(temp)
 		}
-		if batch_size < SysConfig.PushBatch || param != "all" {
+		if batch_size < Config.PushBatch || param != "all" {
 			break
 		}
 	}
@@ -151,7 +150,7 @@ func (r *repairJob) OncePushBatch(batch_index int, param string) int {
 		r.lastId = util.BsonIdToSId(temp["_id"])
 		*r.users = append(*r.users, temp)
 		temp = make(map[string]interface{})
-		if param == "all" && i == SysConfig.PushBatch {
+		if param == "all" && i == Config.PushBatch {
 			break
 		}
 	}

+ 56 - 14
src/jfw/modules/pushsubscribe/src/push/job/timetask.go

@@ -9,12 +9,16 @@ import (
 )
 
 type timeTask struct {
-	NinePush  *NinePushTimeTask  //九点推送
+	Refresh   *RefreshTimeTask   //每天7点刷新用户信息
+	Move      *MoveTimeTask      //迁移数据
+	OncePush  *OncePushTimeTask  //九点推送
 	OtherPush *OtherPushTimeTask //一天三次
 }
 
 var Task = &timeTask{
-	NinePush:  &NinePushTimeTask{},  //九点推送
+	Refresh:   &RefreshTimeTask{},   //每天7点刷新用户信息
+	Move:      &MoveTimeTask{},      //迁移数据
+	OncePush:  &OncePushTimeTask{},  //九点推送
 	OtherPush: &OtherPushTimeTask{}, //一天三次
 }
 
@@ -22,10 +26,10 @@ type OtherPushTimeTask struct {
 }
 
 func (o *OtherPushTimeTask) Execute() {
-	for _, otherpushtime := range SysConfig.OtherPushTimes {
+	for _, otherpushtime := range Config.OtherPushTimes {
 		h_m := strings.Split(otherpushtime, ":")
 		if len(h_m) != 2 {
-			log.Fatalln("error:otherpushtimes", otherpushtime)
+			log.Fatalln("OtherPushTimeTask", otherpushtime)
 			return
 		}
 		now := time.Now()
@@ -34,25 +38,25 @@ func (o *OtherPushTimeTask) Execute() {
 			newDate = newDate.AddDate(0, 0, 1)
 		}
 		sub := newDate.Sub(now)
-		log.Println("start", otherpushtime, "pushjob after", sub)
+		log.Println("start", otherpushtime, "OtherPushTimeTask after", sub)
 		time.AfterFunc(sub, func() {
-			go Jobs.Push.Execute(2, true)
+			go Jobs.Push.Execute(1)
 			ticker := time.NewTicker(time.Hour * 24)
 			for {
 				select {
 				case <-ticker.C:
-					go Jobs.Push.Execute(2, true)
+					go Jobs.Push.Execute(1)
 				}
 			}
 		})
 	}
 }
 
-type NinePushTimeTask struct {
+type OncePushTimeTask struct {
 }
 
-func (n *NinePushTimeTask) Execute() {
-	h_m := strings.Split(SysConfig.OncePushTime, ":")
+func (o *OncePushTimeTask) Execute() {
+	h_m := strings.Split(Config.OncePushTime, ":")
 	if len(h_m) == 2 {
 		now := time.Now()
 		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
@@ -60,18 +64,56 @@ func (n *NinePushTimeTask) Execute() {
 			newDate = newDate.AddDate(0, 0, 1)
 		}
 		sub := newDate.Sub(now)
-		log.Println("start", SysConfig.OncePushTime, "pushjob after", sub)
+		log.Println("start", Config.OncePushTime, "OncePushTimeTask after", sub)
 		time.AfterFunc(sub, func() {
-			go Jobs.Push.Execute(4, true)
+			go Jobs.Push.Execute(2)
 			ticker := time.NewTicker(time.Hour * 24)
 			for {
 				select {
 				case <-ticker.C:
-					go Jobs.Push.Execute(4, true)
+					go Jobs.Push.Execute(2)
 				}
 			}
 		})
 	} else {
-		log.Fatalln("error:oncepushtime", SysConfig.OtherPushTimes)
+		log.Fatalln("OncePushTimeTask", Config.OtherPushTimes)
 	}
 }
+
+type RefreshTimeTask struct {
+}
+
+func (r *RefreshTimeTask) Execute() {
+	h_m := strings.Split(Config.RefreshTime, ":")
+	if len(h_m) == 2 {
+		now := time.Now()
+		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
+		if newDate.Before(now) {
+			newDate = newDate.AddDate(0, 0, 1)
+		}
+		sub := newDate.Sub(now)
+		log.Println("start", Config.OncePushTime, "RefreshTimeTask after", sub)
+		time.AfterFunc(sub, func() {
+			go Jobs.Refresh.Execute()
+			ticker := time.NewTicker(time.Hour * 24)
+			for {
+				select {
+				case <-ticker.C:
+					go Jobs.Refresh.Execute()
+				}
+			}
+		})
+	} else {
+		log.Fatalln("RefreshTimeTask", Config.RefreshTime)
+	}
+}
+
+type MoveTimeTask struct {
+}
+
+func (m *MoveTimeTask) Execute() {
+	time.AfterFunc(time.Duration(Config.MoveDuration), func() {
+		Jobs.Move.Execute()
+		Task.Move.Execute()
+	})
+}

+ 11 - 25
src/jfw/modules/pushsubscribe/src/push/main.go

@@ -10,59 +10,45 @@ import (
 	. "push/config"
 	"push/job"
 	prpc "push/rpc"
-	"push/util"
 	"qfw/util/mongodb"
 	"qfw/util/redis"
-	ca "ucbsutil/cassandra"
 
 	"github.com/donnie4w/go-logger/logger"
 )
 
 func main() {
 	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务")
-	taskType := flag.Int("t", 1, "1 实时推送;2 实时推送+一天三次的推送;3 一天三次推送;4 九点推送")
-	customCaDate := flag.String("d", "", "自定义日志库,推送数据时间;例(2006-01-02)")
-	moveDatas := flag.String("v", "y", "是否迁移数据")
+	taskType := flag.Int("t", 1, "1 一天三次推送;2 九点推送")
 	flag.Parse()
 	logger.SetConsole(false)
 	logger.SetRollingDaily("./logs", "push.log")
-	mongodb.InitMongodbPool(SysConfig.MgoSize, SysConfig.MgoAddr, "qfw")
-	redis.InitRedis(SysConfig.RedisServers)
-	//初始化cassandra
-	ca.ViewCacheLen = true
-	ca.InitCassandra("jianyu",
-		SysConfig.Cassandra.Size,
-		SysConfig.Cassandra.Host,
-		map[string]int{
-			"cachesize": SysConfig.Cassandra.Cachesize,
-			"timeout":   SysConfig.Cassandra.Timeout,
-		},
-	)
+	mongodb.InitMongodbPool(Config.MgoSize, Config.MgoAddr, "qfw")
+	log.Println("mongodb初始化完成!")
+	redis.InitRedis(Config.RedisServers)
+	log.Println("redis初始化完成!")
 	//register rpc
 	err := rpc.Register(new(prpc.Rpc))
 	if err != nil {
 		log.Fatalln("register rpc error", err)
 	}
 	rpc.HandleHTTP()
-	lis, err := net.Listen("tcp", ":"+SysConfig.RpcPort)
+	lis, err := net.Listen("tcp", ":"+Config.RpcPort)
 	if err != nil {
 		log.Fatalln("listen rpc error", err)
 	} else {
 		go http.Serve(lis, nil)
 	}
-	if *customCaDate != "" {
-		util.CustomCaDate = *customCaDate
-	}
 	//
 	log.Println("订阅推送-推送程序启动。。。")
-	isMoveDatas := *moveDatas == "y"
 	if *modle == 1 {
-		job.Jobs.Push.Execute(*taskType, isMoveDatas)
+		job.Jobs.Push.Execute(*taskType)
 	} else {
+		go job.Task.Move.Execute()
+		go job.Task.Refresh.Execute()
 		go job.Task.OtherPush.Execute()
-		go job.Task.NinePush.Execute()
+		go job.Task.OncePush.Execute()
 		if *modle == 2 {
-			job.Jobs.Push.Execute(*taskType, isMoveDatas)
+			job.Jobs.Push.Execute(*taskType)
 		}
 		<-chan bool(nil)
 	}

BIN
src/jfw/modules/pushsubscribe/src/push/push.exe~


+ 20 - 0
src/jfw/modules/pushsubscribe/src/push/util/db.go

@@ -0,0 +1,20 @@
+package util
+
+import (
+	"log"
+	. "push/config"
+	"qfw/util/mysql"
+)
+
+var Mysql *mysql.Mysql
+
+func init() {
+	Mysql = &mysql.Mysql{
+		DBName:   Config.Mysql.DbName,
+		Address:  Config.Mysql.Address,
+		UserName: Config.Mysql.UserName,
+		PassWord: Config.Mysql.PassWord,
+	}
+	Mysql.Init()
+	log.Println("mysql初始化完成!")
+}

+ 16 - 16
src/jfw/modules/pushsubscribe/src/push/util/rpccall.go

@@ -18,32 +18,32 @@ import (
 )
 
 var (
-	wxPushPool  = make(chan bool, SysConfig.WxPollSize)
-	appPushPool = make(chan bool, SysConfig.AppPollSize)
+	wxPushPool  = make(chan bool, Config.WxPoolSize)
+	appPushPool = make(chan bool, Config.AppPoolSize)
 )
 
 //微信远程调用,实现模板发送消息
-func SendWeixin(k *UserInfo, remark, title string) bool {
+func SendWeixin(k *UserInfo, remark, title, pushDate string) bool {
 	wxPushPool <- true
 	defer func() {
 		<-wxPushPool
 	}()
-	if SysConfig.WxSleep > 0 {
-		time.Sleep(time.Duration(SysConfig.WxSleep) * time.Millisecond)
+	if Config.WxSleep > 0 {
+		time.Sleep(time.Duration(Config.WxSleep) * time.Millisecond)
 	}
 	now := time.Now()
 	p := &qrpc.NotifyMsg{
 		Openid:      k.S_m_openid,
 		Title:       title,
 		Remark:      remark,
-		Detail:      SysConfig.WxGroup,
+		Detail:      Config.WxGroup,
 		Date:        "",
 		Service:     util.FormatDate(&now, util.Date_Short_Layout),
-		Color:       SysConfig.WxColor,
-		DetailColor: SysConfig.WxDetailColor,
-		Url:         SysConfig.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",rssset"),
+		Color:       Config.WxColor,
+		DetailColor: Config.WxDetailColor,
+		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"+"__"+pushDate),
 	}
-	ok, res := jy.WxPush(SysConfig.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
+	ok, res := jy.WxPush(Config.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
 	if !ok && (strings.Contains(res, "[46004]") || strings.Contains(res, "[65302]") || strings.Contains(res, "[43004]") || strings.Contains(res, "[40003]")) {
 		mongodb.Update("user", map[string]interface{}{"_id": bson.ObjectIdHex(k.Id)}, map[string]interface{}{
 			"$set": map[string]interface{}{
@@ -58,20 +58,20 @@ func SendApp(m map[string]interface{}) bool {
 	defer func() {
 		<-appPushPool
 	}()
-	if SysConfig.AppSleep > 0 {
-		time.Sleep(time.Duration(SysConfig.AppSleep) * time.Millisecond)
+	if Config.AppSleep > 0 {
+		time.Sleep(time.Duration(Config.AppSleep) * time.Millisecond)
 	}
-	return jy.AppPush(SysConfig.AppPushServiceRpc, m)
+	return jy.AppPush(Config.AppPushServiceRpc, m)
 }
 
 //
 func SendPcHelper(m map[string]interface{}) bool {
 	defer util.Catch()
-	if SysConfig.PcHelperSleep > 0 {
-		time.Sleep(time.Duration(SysConfig.PcHelperSleep) * time.Millisecond)
+	if Config.PcHelperSleep > 0 {
+		time.Sleep(time.Duration(Config.PcHelperSleep) * time.Millisecond)
 	}
 	var repl string
-	client, err := rpc.DialHTTP("tcp", SysConfig.PcHelper)
+	client, err := rpc.DialHTTP("tcp", Config.PcHelper)
 	if err != nil {
 		logger.Error(err.Error())
 		return false

+ 15 - 141
src/jfw/modules/pushsubscribe/src/push/util/util.go

@@ -6,17 +6,15 @@ import (
 	. "public"
 	. "push/config"
 	"qfw/util"
-	"qfw/util/jy"
 	"sort"
+	"strings"
 	"time"
-	ca "ucbsutil/cassandra"
 
 	"github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
-var cassandraPoll = make(chan bool, SysConfig.CassandraPollSize)
-var CustomCaDate = ""
+var savePool = make(chan bool, Config.SavePoolSize)
 
 //重新设置用户类型
 func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
@@ -106,150 +104,26 @@ func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int
 }
 
 //保存发送信息
-func SaveSendInfo(k *UserInfo, infos []*MatchInfo) string {
-	cassandraPoll <- true
+func SaveSendInfo(k *UserInfo, matchInfos []*MatchInfo) string {
+	savePool <- true
 	defer func() {
-		<-cassandraPoll
+		<-savePool
 	}()
-	if SysConfig.CassandraSleep > 0 {
-		time.Sleep(time.Duration(SysConfig.CassandraSleep) * time.Millisecond)
+	if Config.SaveSleep > 0 {
+		time.Sleep(time.Duration(Config.SaveSleep) * time.Millisecond)
 	}
-	now := time.Now()
-	if CustomCaDate != "" {
-		h := now.Hour()
-		m := now.Minute()
-		now, _ = time.Parse(util.Date_Short_Layout, CustomCaDate)
-		now = now.Add(time.Hour * time.Duration(h)).Add(time.Minute * time.Duration(m))
+	unix := time.Now().Unix()
+	values := []interface{}{}
+	for _, matchInfo := range matchInfos {
+		values = append(values, k.Id, util.ObjToString((*matchInfo.Info)["_id"]), unix, strings.Join(matchInfo.Keys, " "), util.ObjToString((*matchInfo.Info)["area"]), util.ObjToString((*matchInfo.Info)["city"]), util.ObjToString((*matchInfo.Info)["buyerclass"]))
 	}
-	date := now.Unix()
-	dateymd := now.Format(util.Date_yyyyMMdd)
-	var saves []*map[string]interface{}
-	for _, v := range infos {
-		wxpush := map[string]interface{}{
-			"dateymd":    dateymd,
-			"uid":        k.Id,
-			"date":       date,
-			"pushinfo":   fmt.Sprint((*v.Info)["_id"]),
-			"keys":       v.Keys,
-			"area":       util.ObjToString((*v.Info)["area"]),
-			"city":       util.ObjToString((*v.Info)["city"]),
-			"buyerclass": util.ObjToString((*v.Info)["buyerclass"]),
-		}
-		time.Sleep(200 * time.Millisecond)
-		if ca.Save("jy_pushhistory", wxpush) {
-			(*v.Info)["matchkeys"] = v.Keys
-			saves = append(saves, v.Info)
-		}
-	}
-	if len(saves) > 0 {
-		updateRedis(date, k, &saves)
-		return fmt.Sprint(date)
+	savecount := Mysql.InsertBatch("pushsubscribe", []string{"userid", "infoid", "date", "matchkeys", "area", "city", "buyerclass"}, values)
+	if int(savecount) != len(values) {
+		logger.Error(k.Id, "批量保存有问题", len(values), savecount)
 	}
-	return ""
+	return fmt.Sprint(unix)
 }
 
-func updateRedis(date int64, k *UserInfo, infos *[]*map[string]interface{}) {
-	pc_a, pc_a_err := jy.HistoryPush.GetPushCache_A(k.Id)
-	pc_b, pc_b_err := jy.HistoryPush.GetPushCache_B(k.Id)
-	if pc_a_err != nil {
-		logger.Error("updateRedis error", k.Id, pc_a_err, pc_b_err)
-		return
-	}
-	if pc_a == nil {
-		return
-	}
-	list_a := []map[string]interface{}{}
-	list_b := []map[string]interface{}{}
-	for k, info := range *infos {
-		newInfo := jy.HistoryPush.InfoFormat(&jy.PushCa{
-			Date:   date,
-			InfoId: util.ObjToString((*info)["_id"]),
-			Index:  k + 1,
-		}, info)
-		list_a = append(list_a, newInfo)
-	}
-	length_a := len(list_a) + len(pc_a.Infos)
-	switch pc_a.Type {
-	case 0, 1:
-		pc_a.Type = 1
-		list_a = append(list_a, pc_a.Infos...)
-		if length_a > 500 {
-			if pc_b != nil {
-				list_b = append(list_b, list_a[500:]...)
-			}
-			list_a = list_a[:500]
-			pc_a.Count = 500
-		} else if length_a == 500 {
-			pc_a.Count = 0
-		} else {
-			pc_a.Count = length_a
-		}
-		break
-	case 2:
-		list_a = append(list_a, pc_a.Infos...)
-		pc_a.Type = 3
-		if length_a > 500 {
-			if pc_b != nil {
-				list_b = append(list_b, list_a[500:]...)
-			}
-			list_a = list_a[:500]
-			pc_a.Count = 500 - len(*infos)
-		} else if length_a == 500 {
-			pc_a.Count = len(pc_a.Infos)
-		}
-		break
-	case 3:
-		if length_a > 500 {
-			if pc_a.Count > 0 && pc_a.Count < len(pc_a.Infos) {
-				pc_a_a := pc_a.Infos[:500-pc_a.Count]
-				pc_a_b := pc_a.Infos[500-pc_a.Count:]
-				list_a = append(list_a, pc_a_a...)
-				if len(list_a) > 500 {
-					pc_a.Type = 1
-					pc_a.Count = 500
-					if pc_b != nil {
-						list_b = append(list_b, list_a[500:]...)
-						list_b = append(list_b, pc_a_b...)
-					}
-					list_a = list_a[:500]
-				} else if len(list_a) == 500 {
-					pc_a.Type = 1
-					pc_a.Count = 0
-					if pc_b != nil {
-						list_b = append(list_b, pc_a_b...)
-					}
-				} else {
-					needLength := 500 - len(list_a)
-					if needLength > len(pc_a_b) {
-						list_a = append(list_a, pc_a_b...)
-						pc_a.Count = len(pc_a_b)
-					} else {
-						list_a = append(list_a, pc_a_b[:needLength]...)
-						if pc_b != nil {
-							list_b = append(list_b, pc_a_b[needLength:]...)
-						}
-						pc_a.Count = needLength
-					}
-				}
-			} else {
-				jy.HistoryPush.ClearPushCache(k.Id)
-				logger.Error(k.Id, "count in redis cache is error", pc_a.Count, len(pc_a.Infos))
-			}
-		} else {
-			list_a = append(list_a, pc_a.Infos...)
-		}
-		break
-	}
-	pc_a.Infos = list_a
-	jy.HistoryPush.PutPushCache_A(k.Id, pc_a)
-	if pc_b != nil {
-		list_b = append(list_b, pc_b...)
-		if len(list_b) > 3500 {
-			list_b = list_b[:3500]
-		}
-		jy.HistoryPush.PutPushCache_B(k.Id, list_b)
-	}
-}
 func ToObjectIds(ids []string) []bson.ObjectId {
 	_ids := []bson.ObjectId{}
 	for _, v := range ids {

+ 1 - 1
src/jfw/modules/subscribepay/src/timetask/timetask.go

@@ -156,7 +156,7 @@ func crontab(flag bool, c string, f func()) {
 	}
 	now := time.Now()
 	t := time.Date(now.Year(), now.Month(), now.Day(), qutil.IntAll(array[0]), qutil.IntAll(array[1]), 0, 0, time.Local)
-	if t.Sub(now).Nanoseconds() < 0 {
+	if t.Before(now) {
 		t = t.AddDate(0, 0, 1)
 	}
 	timer := time.NewTimer(t.Sub(now))