wcj 6 年之前
父节点
当前提交
6a2d67333f

+ 2 - 1
src/jfw/modules/pushsubscribe/src/match/config.json

@@ -11,5 +11,6 @@
 	"matchPoolSize": 60,
 	"matchDuration": 1, 
 	"userBatch":2,
-	"pcHelper":"127.0.0.1:8082"
+	"pcHelper":"127.0.0.1:8082",
+	"mailReg":"^.+@.+$"
 }

+ 1 - 0
src/jfw/modules/pushsubscribe/src/match/config/config.go

@@ -18,6 +18,7 @@ type sysConfig struct {
 	MatchDuration   int64    `json:"matchDuration"`
 	UserBatch       int      `json:"userBatch"`
 	PcHelper        string   `json:"pcHelper"`
+	MailReg         string   `json:"mailReg"`
 }
 
 type taskConfig struct {

+ 19 - 7
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -12,6 +12,7 @@ import (
 	"qfw/util/elastic"
 	"qfw/util/mongodb"
 	"qfw/util/redis"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -23,6 +24,7 @@ import (
 
 var (
 	SaveFields = []string{"_id", "area", "bidamount", "bidopentime", "budget", "buyer", "otitle", "projectname", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type", "winner"}
+	MailReg    = regexp.MustCompile(SysConfig.MailReg)
 )
 
 type Pjob struct {
@@ -335,13 +337,14 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) int {
 		isPush := util.IntAllDef(temp["i_ispush"], 1)
 		jpushid := util.ObjToString(temp["s_jpushid"])
 		opushid := util.ObjToString(temp["s_opushid"])
+		appPhoneType := util.ObjToString(temp["s_appponetype"])
 		pchelperPush := 0
 		//公众号取关用户
 		if userType == 0 && isPush == 0 {
 			continue
-		} else if userType == 2 && jpushid == "" && opushid == "" {
+		} else if userType == 2 && jpushid == "" && (opushid == "" || appPhoneType == "") {
 			continue
-		} else if (userType == 1 || (userType == 5 && isPush == 0)) && jpushid == "" && opushid == "" {
+		} else if (userType == 1 || (userType == 5 && isPush == 0)) && jpushid == "" && (opushid == "" || appPhoneType == "") {
 			if s_phone == "" || !mutil.PcHelperIsOnLine(s_phone) {
 				continue
 			} else {
@@ -351,6 +354,13 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) int {
 		applystatus := util.IntAll(temp["i_applystatus"])
 		o_msgset, _ := temp["o_jy"].(map[string]interface{})
 		wxpush, apppush, mailpush := mutil.ModeTransform(userType, o_msgset)
+		email := strings.TrimSpace(util.ObjToString(o_msgset["s_email"]))
+		if !MailReg.MatchString(email) {
+			mailpush = 0
+		}
+		if wxpush != 1 && apppush != 1 && mailpush != 1 {
+			continue
+		}
 		var allkeysTemp []elastic.KeyConfig
 		_bs, err := json.Marshal(o_msgset["a_key"])
 		if err == nil {
@@ -382,7 +392,9 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) int {
 		}
 		userId := fmt.Sprintf("%x", string(temp["_id"].(bson.ObjectId)))
 		smartset := util.IntAll(temp["i_smartset"])
-		logger.Info("第", user_batch_index, "批用户,userid", userId, "s_m_openid", s_m_openid, "a_m_openid", a_m_openid, "s_phone", s_phone, "jpushid", jpushid, "opushid", opushid, "applystatus", applystatus, "smartset", smartset)
+		rateMode := util.IntAllDef(o_msgset["i_ratemode"], 2)
+		dataExport := util.IntAll(temp["i_dataexport"])
+		logger.Info("第", user_batch_index, "批用户,userid", userId, "s_m_openid", s_m_openid, "a_m_openid", a_m_openid, "s_phone", s_phone, "jpushid", jpushid, "opushid", opushid, "applystatus", applystatus, "smartset", smartset, "email", email, "rateMode", rateMode, "dataExport", dataExport, "wxpush", wxpush, "apppush", apppush, "mailpush", mailpush)
 		keys := []string{}                           //过滤后的关键词
 		notkeys := []string{}                        //排除词
 		key_notkey := map[string]map[string]bool{}   //关键词所对应的排除词
@@ -443,19 +455,19 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) int {
 			AppPush:      apppush,
 			MailPush:     mailpush,
 			PchelperPush: pchelperPush,
-			Email:        util.ObjToString(o_msgset["s_email"]),
+			Email:        email,
 			S_m_openid:   s_m_openid,
 			A_m_openid:   a_m_openid,
 			Phone:        s_phone,
 			Jpushid:      jpushid,
 			Opushid:      opushid,
 			UserType:     userType,
-			RateMode:     util.IntAllDef(o_msgset["i_ratemode"], 2),
+			RateMode:     rateMode,
 			AllKeys:      allkeysTemp, //原始关键词
 			SmartSet:     smartset,
-			DataExport:   util.IntAll(temp["i_dataexport"]),
+			DataExport:   dataExport,
 			ModifyDate:   modifydate,
-			AppPhoneType: util.ObjToString(temp["s_appponetype"]),
+			AppPhoneType: appPhoneType,
 			ApplyStatus:  applystatus,
 			Subscribe:    isPush,
 			MergeOrder:   temp["a_mergeorder"],

+ 23 - 10
src/jfw/modules/pushsubscribe/src/push/job/dopush.go

@@ -32,12 +32,17 @@ func init() {
 		for {
 			select {
 			case <-t.C:
-				for i := 0; i < SysConfig.MinutePushSize-len(DoPush.minutePushPool); i++ {
+				logger.Info("开始执行1分钟定时任务。。。")
+				m_size := SysConfig.MinutePushSize - len(DoPush.minutePushPool)
+				for i := 0; i < m_size; i++ {
 					DoPush.minutePushPool <- true
 				}
-				for i := 0; i < SysConfig.FastigiumMinutePushSize-len(DoPush.fastigiumMinutePushPool); i++ {
+				logger.Info("1分钟定时任务 minutePushPool", m_size, len(DoPush.minutePushPool))
+				f_size := SysConfig.FastigiumMinutePushSize - len(DoPush.fastigiumMinutePushPool)
+				for i := 0; i < f_size; i++ {
 					DoPush.fastigiumMinutePushPool <- true
 				}
+				logger.Info("1分钟定时任务 fastigiumMinutePushPool", f_size, len(DoPush.fastigiumMinutePushPool))
 			}
 		}
 	}()
@@ -178,27 +183,32 @@ func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k
 		}
 	}
 	if i == 0 {
-		logger.Info("推送任务", taskType, "没有要推送的数据!", k.S_m_openid, k.A_m_openid, k.Phone)
+		logger.Info("推送任务", taskType, "没有要推送的数据!", k.Id)
 		return
 	}
 	if taskType != 0 && isSave {
 		//推送记录id
 		pushId := putil.SaveSendInfo(k, pushIds, infos)
 		if pushId == "" {
-			logger.Info("推送任务", taskType, "保存到cassandra出错", k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
+			logger.Info("推送任务", taskType, "保存到cassandra出错", k.Id)
 			return
 		} else {
-			logger.Info("推送任务", taskType, "成功保存到cassandra", pushId, k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
+			logger.Info("推送任务", taskType, "成功保存到cassandra", pushId, k.Id)
 		}
 		isSaveSuccess = true
 		//pc端助手推送
 		if k.S_m_openid != "" {
-			putil.SendPcHelper(map[string]interface{}{"clientCode": k.S_m_openid})
+			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "s_m_openid", k.S_m_openid)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.S_m_openid})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "s_m_openid", k.S_m_openid)
 		}
 		if k.Phone != "" {
-			putil.SendPcHelper(map[string]interface{}{"clientCode": k.Phone})
+			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "phone", k.Phone)
+			isPushOk := putil.SendPcHelper(map[string]interface{}{"clientCode": k.Phone})
+			logger.Info("推送任务", taskType, "助手推送结束", isPushOk, k.Id, "phone", k.Phone)
 		}
 	}
+	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
 	//限制一分钟最大的推送数量
 	hour := time.Now().Hour()
 	fastigiumStart, fastigiumEnd := 0, 0
@@ -213,6 +223,7 @@ func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k
 		<-d.minutePushPool //正常期
 	}
 	if wxPush == 1 {
+		logger.Info("推送任务", taskType, "开始微信推送", k.Id)
 		isPushOk := true
 		if k.ApplyStatus == 1 {
 			TmpTip := ""
@@ -288,9 +299,10 @@ func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k
 				wxStatus = -1
 			}
 		}
-		logger.Info("推送任务", taskType, "微信推送", isPushOk, k.Id, k.S_m_openid, k.RateMode, k.ApplyStatus)
+		logger.Info("推送任务", taskType, "微信推送结束", isPushOk, k.Id)
 	}
 	if appPush == 1 {
+		logger.Info("推送任务", taskType, "开始app推送", k.Id)
 		if len([]rune(jpushtitle)) > 80 {
 			jpushtitle = string([]rune(jpushtitle)[:80]) + "..."
 		}
@@ -320,10 +332,11 @@ func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k
 		} else {
 			appStatus = -1
 		}
-		logger.Info("推送任务", taskType, "app推送", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.AppPhoneType, k.Jpushid, k.Opushid, k.RateMode)
+		logger.Info("推送任务", taskType, "app推送结束", isPushOk, k.Id)
 	}
 	//发送邮件
 	if mailPush == 1 {
+		logger.Info("推送任务", taskType, "开始邮箱推送", k.Id)
 		html := fmt.Sprintf(SysConfig.Mail_html, strings.Replace(strings.Join(k.OriginalKeys, ";"), "+", " ", -1), mailContent)
 		subject := fmt.Sprintf(SysConfig.Mail_title, "招标")
 		isPushOk := d.SendMail(k.Email, subject, html, fmdatas)
@@ -332,7 +345,7 @@ func (d *doPush) Do(taskType int, isSave bool, wxPush, appPush, mailPush int, k
 		} else {
 			mailStatus = -1
 		}
-		logger.Info("推送任务", taskType, "发送邮件", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.Email, k.DataExport)
+		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, k.Id)
 	}
 	return
 }

+ 11 - 3
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -1,6 +1,9 @@
 package job
 
 import (
+	"fmt"
+	"net/http"
+	"net/url"
 	. "public"
 	. "push/config"
 	putil "push/util"
@@ -260,7 +263,12 @@ func (p *pushJob) Push() {
 		batch_index++
 		batch_size := p.OncePushBatch(batch_index)
 		for _, temp := range *p.users {
-			p.pool <- true
+			select {
+			case <-time.After(10 * time.Minute):
+				http.Get("http://123.56.236.148:19281/_send/_mail?program=testgo2&to=wangchuanjin@topnet.net.cn&title=push_剑鱼平台报警&body=" + url.QueryEscape(fmt.Sprintf("等待推送超过10分钟,%v minutePushPool %d fastigiumMinutePushPool %d", temp["userid"], len(DoPush.minutePushPool), len(DoPush.fastigiumMinutePushPool))))
+				p.pool <- true
+			case p.pool <- true:
+			}
 			p.wait.Add(1)
 			go func(v map[string]interface{}) {
 				defer func() {
@@ -292,7 +300,7 @@ func (p *pushJob) Push() {
 					MergeOrder:    v["mergeorder"],
 					FirstPushTime: util.Int64All(v["firstpushtime"]),
 				}
-				logger.Info("推送任务", p.taskType, "开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid)
+				logger.Info("推送任务", p.taskType, "开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid, "phoneType", u.AppPhoneType, "rateMode", u.RateMode, "smartSet", u.SmartSet, "email", u.Email, "dataExport", u.DataExport)
 				wxPush, appPush, mailPush := 0, 0, 0
 				if p.taskType == 1 {
 					if u.WxPush == 1 {
@@ -343,7 +351,7 @@ func (p *pushJob) Push() {
 						mailPush = 0
 					}
 				}
-				logger.Info("推送任务", p.taskType, "用户接收方式,userid", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "t_wxPush", t_wxPush, "t_mailPush", t_mailPush)
+				logger.Info("推送任务", p.taskType, "用户接收方式,userid", u.Id, "wxPush", wxPush, "appPush", appPush, "mailPush", mailPush, "pchelperPush", u.PchelperPush, "t_wxPush", t_wxPush, "t_mailPush", t_mailPush)
 				if wxPush != 1 && appPush != 1 && mailPush != 1 {
 					return
 				}

+ 1 - 4
src/jfw/modules/pushsubscribe/src/push/util/util.go

@@ -116,23 +116,20 @@ func SaveSendInfo(k *UserInfo, pushIds []string, infos []map[string]interface{})
 		time.Sleep(time.Duration(SysConfig.CassandraSleep) * time.Millisecond)
 	}
 	now := time.Now()
-
 	if CustomCaDate != "" {
 		h := now.Hour()
 		m := now.Minute()
 		now, _ = time.Parse(util.Date_Short_Layout, CustomCaDate)
 		now = now.Add(time.Hour * time.Duration(h)).Add(time.Minute * time.Duration(m))
 	}
-
 	date := now.Unix()
-
 	wxpush := map[string]interface{}{
 		"dateymd":  now.Format(util.Date_yyyyMMdd),
 		"uid":      k.Id,
 		"date":     date,
 		"pushinfo": strings.Join(pushIds, ","),
 	}
-	if ca.SaveCacheByTimeOut("jy_pushsubscribe", wxpush, 10) {
+	if ca.Save("jy_pushsubscribe", wxpush) {
 		updateRedis(date, k, infos)
 		return fmt.Sprint(date)
 	}

+ 1 - 1
src/jfw/modules/pushsubscribe/src/statistics/main.go

@@ -29,7 +29,7 @@ func main() {
 	redis.InitRedis(SysConfig.RedisServers)
 	mongodb.InitMongodbPool(SysConfig.MgoSize, SysConfig.MgoAddr, "qfw")
 	statistics()
-	ticker := time.NewTicker(3 * time.Minute)
+	ticker := time.NewTicker(30 * time.Minute)
 	for {
 		select {
 		case <-ticker.C: