Selaa lähdekoodia

Merge branch 'dev2.8.5' of http://192.168.3.207:10080/qmx/jy into dev2.8.5

wangkaiyue 5 vuotta sitten
vanhempi
commit
45a85217c7
30 muutettua tiedostoa jossa 548 lisäystä ja 518 poistoa
  1. 2 1
      src/jfw/front/dataExport.go
  2. 2 20
      src/jfw/modules/pushsubscribe/src/match/job/matchjob.go
  3. 22 1
      src/jfw/modules/pushsubscribe/src/public/util.go
  4. 0 1
      src/jfw/modules/pushsubscribe/src/push/config.json
  5. 0 2
      src/jfw/modules/pushsubscribe/src/push/config/config.go
  6. 0 5
      src/jfw/modules/pushsubscribe/src/push/job/jobs.go
  7. 8 8
      src/jfw/modules/pushsubscribe/src/push/job/movejob.go
  8. 0 67
      src/jfw/modules/pushsubscribe/src/push/job/normalpush.go
  9. 0 7
      src/jfw/modules/pushsubscribe/src/push/job/pusher.go
  10. 33 133
      src/jfw/modules/pushsubscribe/src/push/job/pushjob.go
  11. 15 113
      src/jfw/modules/pushsubscribe/src/push/job/repairjob.go
  12. 0 67
      src/jfw/modules/pushsubscribe/src/push/job/specialpush.go
  13. 14 48
      src/jfw/modules/pushsubscribe/src/push/job/timetask.go
  14. 0 1
      src/jfw/modules/pushsubscribe/src/push/main.go
  15. 150 0
      src/jfw/modules/pushsubscribe/src/push/pusher/normalpush.go
  16. 12 0
      src/jfw/modules/pushsubscribe/src/push/pusher/pusher.go
  17. 82 0
      src/jfw/modules/pushsubscribe/src/push/pusher/repairpush.go
  18. 47 0
      src/jfw/modules/pushsubscribe/src/push/pusher/specialpush.go
  19. 5 0
      src/jfw/modules/pushsubscribe/src/push/util/db.go
  20. 10 0
      src/jfw/modules/pushsubscribe/src/push/util/entity.go
  21. 2 3
      src/jfw/modules/pushsubscribe/src/push/util/rpccall.go
  22. 53 0
      src/jfw/modules/pushsubscribe/src/push/util/util.go
  23. 10 2
      src/jfw/modules/subscribepay/src/entity/dataexport.go
  24. 37 9
      src/jfw/modules/subscribepay/src/entity/subscribeVip.go
  25. 13 8
      src/jfw/modules/subscribepay/src/main_test.go
  26. 5 4
      src/jfw/modules/subscribepay/src/service/commonAction.go
  27. 20 7
      src/jfw/modules/subscribepay/src/service/orderListDetails.go
  28. 4 2
      src/jfw/modules/subscribepay/src/timetask/timetask.go
  29. 0 4
      src/web/templates/pc/myOrder.html
  30. 2 5
      src/web/templates/weixin/vipsubscribe/renew_pay.html

+ 2 - 1
src/jfw/front/dataExport.go

@@ -193,7 +193,8 @@ func (d *DataExport) QueryOrder() error {
 	} else {
 		query["order_status"] = map[string]interface{}{"ne": orderStatus_deleted}
 	}
-
+	//2.8.5  PC端 不显示vip订单
+	query["product_type"] = "历史数据"
 	//总数
 	countData := public.Mysql.Find(tableName_order, query, "", "", -1, 0)
 	count := len(*countData)

+ 2 - 20
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -301,25 +301,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 	logger.Info("开始加载第", user_batch_index, "批用户", q)
 	session := mongodb.GetMgoConn()
 	defer mongodb.DestoryMongoConn(session)
-	query := session.DB(DbName).C("user").Find(&q).Select(&map[string]interface{}{
-		"_id":             1,
-		"o_jy":            1,
-		"o_vipjy":         1,
-		"i_vip_status":    1,
-		"s_m_openid":      1,
-		"a_m_openid":      1,
-		"s_phone":         1,
-		"s_jpushid":       1,
-		"s_opushid":       1,
-		"i_ispush":        1,
-		"i_type":          1,
-		"i_supersearch":   1,
-		"s_appponetype":   1,
-		"i_applystatus":   1,
-		"a_mergeorder":    1,
-		"s_nickname":      1,
-		"l_firstpushtime": 1,
-	}).Iter()
+	query := session.DB(DbName).C("user").Find(&q).Select(public.UserCollFields).Iter()
 	n := 0
 	freeUser := NewFreeUser() //免费所有用户
 	vipUser := NewVipUser()   //vip所有用户
@@ -334,7 +316,7 @@ func (m *MatchJob) OnceUserBatch(user_batch_index int) (int, *VipUser, *FreeUser
 	title_notkey_user := make(map[string]*[]*UserInfo)
 	nowymd := util.NowFormat(util.Date_yyyyMMdd)
 	for temp := make(map[string]interface{}); query.Next(temp); {
-		user, o_msgset := public.NewUserInfo(temp)
+		user, o_msgset := public.NewUserInfoByUserColl(temp)
 		isVipUser := IsVipUser(user.VipStatus)
 		var allKeySet []*KeySet
 		var err error

+ 22 - 1
src/jfw/modules/pushsubscribe/src/public/util.go

@@ -13,6 +13,26 @@ import (
 
 var MailReg = regexp.MustCompile("^.+@.+$")
 
+var UserCollFields = map[string]interface{}{
+	"_id":             1,
+	"o_jy":            1,
+	"o_vipjy":         1,
+	"i_vip_status":    1,
+	"s_m_openid":      1,
+	"a_m_openid":      1,
+	"s_phone":         1,
+	"s_jpushid":       1,
+	"s_opushid":       1,
+	"i_ispush":        1,
+	"i_type":          1,
+	"i_supersearch":   1,
+	"s_appponetype":   1,
+	"i_applystatus":   1,
+	"a_mergeorder":    1,
+	"s_nickname":      1,
+	"l_firstpushtime": 1,
+}
+
 func IsVipUser(vipStatus int) bool {
 	if vipStatus == 1 || vipStatus == 2 {
 		return true
@@ -117,7 +137,8 @@ func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int
 	return wxpush, apppush, mailpush
 }
 
-func NewUserInfo(temp map[string]interface{}) (user *UserInfo, o_msgset map[string]interface{}) {
+//根据user表中的数据,生成UserInfo
+func NewUserInfoByUserColl(temp map[string]interface{}) (user *UserInfo, o_msgset map[string]interface{}) {
 	userId := fmt.Sprintf("%x", string(temp["_id"].(bson.ObjectId)))
 	s_m_openid := util.ObjToString(temp["s_m_openid"])
 	a_m_openid := util.ObjToString(temp["a_m_openid"])

+ 0 - 1
src/jfw/modules/pushsubscribe/src/push/config.json

@@ -43,7 +43,6 @@
 	"appPushServiceRpc":"127.0.0.1:5566",
 	"pcHelper":"192.168.20.129:8082",
 	"oncePushTime": "9:00",
-	"refreshTime": "7:00",
 	"otherPushTimes":[
 		"07:30",
 		"14:00",

+ 0 - 2
src/jfw/modules/pushsubscribe/src/push/config/config.go

@@ -9,7 +9,6 @@ import (
 
 type config struct {
 	JianyuDomain            string      `json:"jianyuDomain"`
-	Cassandra               *cassandra  `json:"cassandra"`
 	RedisServers            string      `json:"redisServers"`
 	PushPoolSize            int         `json:"pushPoolSize"`
 	MergePoolSize           int         `json:"mergePoolSize"`
@@ -34,7 +33,6 @@ type config struct {
 	PushBatch               int         `json:"pushBatch"`
 	OncePushTime            string      `json:"oncePushTime"`
 	OtherPushTimes          []string    `json:"otherPushTimes"`
-	RefreshTime             string      `json:"refreshTime"`
 	WxPoolSize              int         `json:"wxPoolSize"`
 	AppPoolSize             int         `json:"appPoolSize"`
 	MailSleep               int         `json:"mailSleep"`

+ 0 - 5
src/jfw/modules/pushsubscribe/src/push/job/jobs.go

@@ -5,11 +5,6 @@ import (
 	"sync"
 )
 
-const (
-	BulkSize = 200
-	DbName   = "qfw"
-)
-
 var Jobs = struct {
 	Move    *moveJob
 	Refresh *refreshJob

+ 8 - 8
src/jfw/modules/pushsubscribe/src/push/job/movejob.go

@@ -34,7 +34,7 @@ func (m *moveJob) Execute() {
 	nowUnix := time.Now().Unix()
 	sess := mongodb.GetMgoConn()
 	defer mongodb.DestoryMongoConn(sess)
-	it := sess.DB(DbName).C("pushspace_temp").Find(map[string]interface{}{
+	it := sess.DB(putil.DbName).C("pushspace_temp").Find(map[string]interface{}{
 		"timestamp": map[string]interface{}{
 			"$lt": nowUnix,
 		},
@@ -130,7 +130,7 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 			sess := mongodb.GetMgoConn()
 			defer mongodb.DestoryMongoConn(sess)
 			var data map[string]interface{}
-			err := sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
+			err := sess.DB(putil.DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1}).One(&data)
 			if err != nil {
 				logger.Error(userId, "获取用户pushspace表中出数据出错", err)
 				return
@@ -139,7 +139,7 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 				m.mergeLock.Lock()
 				saveArray = append(saveArray, move.info)
 				saveArray_delete = append(saveArray_delete, move.ids...)
-				if len(saveArray) == BulkSize {
+				if len(saveArray) == putil.BulkSize {
 					m.saveBulk(sess, &saveArray, &saveArray_delete)
 				}
 				m.mergeLock.Unlock()
@@ -195,7 +195,7 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 				updateArray_delete = append(updateArray_delete, move.ids...)
 				updateArray_query = append(updateArray_query, map[string]interface{}{"_id": data["_id"]})
 				updateArray_set = append(updateArray_set, upSet)
-				if len(updateArray_query) == BulkSize {
+				if len(updateArray_query) == putil.BulkSize {
 					m.updateBulk(sess, &updateArray_query, &updateArray_set, &updateArray_delete)
 				}
 				m.mergeLock.Unlock()
@@ -218,7 +218,7 @@ func (m *moveJob) merge(number *int, nowUnix int64, moves map[string]*moveJob) {
 	logger.Info("第", *number, "次合并数据结束。。。", index)
 }
 func (m *moveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, deletes *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace")
+	coll := sess.DB(putil.DbName).C("pushspace")
 	bulk := coll.Bulk()
 	for _, v := range *saves {
 		bulk.Insert(v)
@@ -232,7 +232,7 @@ func (m *moveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, d
 	*saves = []map[string]interface{}{}
 }
 func (m *moveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]interface{}, array_d *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace")
+	coll := sess.DB(putil.DbName).C("pushspace")
 	bulk := coll.Bulk()
 	for k, v := range *array_q {
 		bulk.Update(v, (*array_s)[k])
@@ -247,7 +247,7 @@ func (m *moveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]i
 	*array_s = []map[string]interface{}{}
 }
 func (m *moveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
-	coll := sess.DB(DbName).C("pushspace_temp")
+	coll := sess.DB(putil.DbName).C("pushspace_temp")
 	count := 0
 	bulk := coll.Bulk()
 	for _, v := range *array {
@@ -255,7 +255,7 @@ func (m *moveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
 		bulk.Remove(map[string]interface{}{
 			"_id": v,
 		})
-		if count == BulkSize {
+		if count == putil.BulkSize {
 			_, err := bulk.Run()
 			if nil != err {
 				logger.Info("DelBulkError", err)

+ 0 - 67
src/jfw/modules/pushsubscribe/src/push/job/normalpush.go

@@ -1,67 +0,0 @@
-package job
-
-import (
-	. "push/config"
-	"qfw/util"
-	"qfw/util/mongodb"
-
-	"github.com/donnie4w/go-logger/logger"
-	"gopkg.in/mgo.v2/bson"
-)
-
-//正常推送,一天推送三次或者一天一次
-type NormalPush struct{}
-
-func (n *NormalPush) OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{}) {
-	users := &[]map[string]interface{}{}
-	i := 0
-	lastId := ""
-	var query map[string]interface{}
-	//根据任务类型,查找ratemode
-	if taskType == 1 {
-		query = map[string]interface{}{
-			"ratemode": map[string]interface{}{
-				"$in": []int{1, 3, 4},
-			},
-		}
-	} else if taskType == 2 {
-		query = map[string]interface{}{
-			"ratemode": 2,
-		}
-	} else {
-		logger.Error("taskType error", taskType)
-		return i, users
-	}
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	if len(Config.TestIds) > 0 {
-		query["userid"] = map[string]interface{}{
-			"$in": Config.TestIds,
-		}
-	}
-	if *startId != "" {
-		query["_id"] = map[string]interface{}{
-			"$gt": bson.ObjectIdHex(*startId),
-		}
-	}
-	logger.Info("推送任务", taskType, "开始加载第", batchIndex, "批用户", query)
-	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
-	for temp := make(map[string]interface{}); it.Next(&temp); {
-		i++
-		lastId = util.BsonIdToSId(temp["_id"])
-		*users = append(*users, temp)
-		temp = make(map[string]interface{})
-		if i == Config.PushBatch {
-			break
-		}
-	}
-	logger.Info("推送任务", taskType, "第", batchIndex, "批用户加载结束", lastId)
-	return i, users
-}
-
-func (n *NormalPush) GetUser() {
-
-}
-func (n *NormalPush) AfterPush() {
-
-}

+ 0 - 7
src/jfw/modules/pushsubscribe/src/push/job/pusher.go

@@ -1,7 +0,0 @@
-package job
-
-type Pusher interface {
-	OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{})
-	GetUser()
-	AfterPush()
-}

+ 33 - 133
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -6,12 +6,12 @@ import (
 	"os"
 	. "public"
 	. "push/config"
+	. "push/pusher"
 	putil "push/util"
 	"qfw/util"
 	"qfw/util/mail"
 	"qfw/util/mongodb"
 	"qfw/util/redis"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -46,6 +46,14 @@ type pushJob struct {
 
 //taskType 1--一天三次推送 2--九点推送
 func (p *pushJob) Execute(taskType int) {
+	p.startPush(taskType)
+	if taskType == 2 {
+		p.startPush(3)
+	}
+}
+
+//开始推送
+func (p *pushJob) startPush(taskType int) {
 	defer util.Catch()
 	var pusher Pusher
 	if taskType == 1 || taskType == 2 {
@@ -61,7 +69,7 @@ func (p *pushJob) Execute(taskType int) {
 	startId := ""
 	for {
 		batchIndex++
-		batch_size, users := pusher.OncePushBatch(taskType, batchIndex, &startId)
+		isBreak, users := pusher.OncePushBatch(taskType, batchIndex, &startId)
 		for _, temp := range *users {
 			isTake := true
 			select {
@@ -85,29 +93,9 @@ func (p *pushJob) Execute(taskType int) {
 					}
 					p.wait.Done()
 				}()
-				words, _ := v["words"].([]interface{})
-				u := &UserInfo{
-					Id:            util.ObjToString(v["userid"]),
-					Keys:          util.ObjArrToStringArr(words),
-					WxPush:        util.IntAll(v["wxpush"]),
-					AppPush:       util.IntAll(v["apppush"]),
-					MailPush:      util.IntAll(v["mailpush"]),
-					PchelperPush:  util.IntAll(v["pchelperpush"]),
-					Email:         util.ObjToString(v["email"]),
-					S_m_openid:    util.ObjToString(v["s_m_openid"]),
-					A_m_openid:    util.ObjToString(v["a_m_openid"]),
-					Phone:         util.ObjToString(v["phone"]),
-					Jpushid:       util.ObjToString(v["jpushid"]),
-					Opushid:       util.ObjToString(v["opushid"]),
-					UserType:      util.IntAll(v["usertype"]),
-					RateMode:      util.IntAllDef(v["ratemode"], 1),
-					AppPhoneType:  util.ObjToString(v["appphonetype"]),
-					ApplyStatus:   util.IntAll(v["applystatus"]),
-					Subscribe:     util.IntAllDef(v["subscribe"], 1),
-					ModifyDate:    util.ObjToString(v["modifydate"]),
-					MergeOrder:    v["mergeorder"],
-					FirstPushTime: util.Int64All(v["firstpushtime"]),
-					VipStatus:     util.IntAll(v["vipstatus"]),
+				u := pusher.GetUserInfo(v)
+				if u == nil {
+					return
 				}
 				logger.Info("推送任务", p.taskType, "开始推送用户", "userType", u.UserType, "userId", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid, "phoneType", u.AppPhoneType, "rateMode", u.RateMode, "email", u.Email)
 				wxPush, appPush, mailPush := 0, 0, 0
@@ -141,49 +129,11 @@ func (p *pushJob) Execute(taskType int) {
 						mailPush = 0
 					}
 				}
-				isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus := p.push(p.taskType, wxPush, appPush, mailPush, u, list)
-				if isSaveSuccess {
-					if u.FirstPushTime == 0 {
-						go mongodb.Update("user", map[string]interface{}{
-							"_id": bson.ObjectIdHex(u.Id),
-						}, map[string]interface{}{
-							"$set": map[string]interface{}{
-								"l_firstpushtime": time.Now().Unix(),
-							},
-						}, false, false)
-					}
-					if isVipTempSave {
-						p.vipTempSave(u.Id, v)
-					}
-				} else {
-					return
-				}
-				//判断是否要删除数据
-				sess := mongodb.GetMgoConn()
-				defer mongodb.DestoryMongoConn(sess)
-				err := sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
-				if err != nil {
-					logger.Error("推送任务", p.taskType, "remove error", err)
-				}
-				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
-					v["failtime"] = time.Now().Unix()
-					if wxStatus == -1 {
-						v["wxfail"] = 1
-					}
-					if appStatus == -1 {
-						v["appfail"] = 1
-					}
-					if mailStatus == -1 {
-						v["mailfail"] = 1
-					}
-					_, err := sess.DB(DbName).C("pushspace_fail").UpsertId(v["_id"], map[string]interface{}{"$set": v})
-					if err != nil {
-						logger.Error("推送任务", p.taskType, "update error", err)
-					}
-				}
+				pushResult := p.selectPush(p.taskType, wxPush, appPush, mailPush, u, list)
+				pusher.AfterPush(pushResult, u, v)
 			}(temp, isTake)
 		}
-		if batch_size < Config.PushBatch {
+		if isBreak {
 			break
 		}
 	}
@@ -191,14 +141,18 @@ func (p *pushJob) Execute(taskType int) {
 	logger.Info("推送任务结束。。。", p.taskType)
 }
 
-func (p *pushJob) push(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (isSaveSuccess, isVipTempSave bool, wxStatus, appStatus, mailStatus int) {
+//满足条件进行推送
+func (p *pushJob) selectPush(taskType int, wxPush, appPush, mailPush int, u *UserInfo, list SortList) (pushResult *putil.PushResult) {
 	if wxPush == 1 || appPush == 1 || mailPush == 1 || u.PchelperPush == 1 {
-		isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus = p.doPush(taskType, true, wxPush, appPush, mailPush, u, &list)
+		pushResult = p.doPush(taskType, true, wxPush, appPush, mailPush, u, &list)
 	}
-	return isSaveSuccess, isVipTempSave, wxStatus, appStatus, mailStatus
+	return
 }
-func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess, isVipTempSave bool, wxStatus, appStatus, mailStatus int) {
+
+//进入具体推送
+func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (pushResult *putil.PushResult) {
 	defer util.Catch()
+	pushResult = &putil.PushResult{}
 	mailContent := ""
 	jpushtitle := ""
 	lastInfoDate := int64(0)
@@ -325,17 +279,17 @@ func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			return
 		}
 		logger.Info("推送任务", taskType, "保存成功", pushDate, k.Id)
-		isSaveSuccess = true
+		pushResult.IsSaveSuccess = true
 	}
 	if isVipUser && (k.RateMode == 3 || k.RateMode == 4) {
 		if now.Day() != 28 && now.Weekday().String() != "Friday" {
-			isVipTempSave = true
+			pushResult.IsVipTempSave = true
 			return
 		} else {
 		}
 	}
 	logger.Info("推送任务", taskType, "开始进行终端推送", k.Id)
-	if isSaveSuccess {
+	if pushResult.IsSaveSuccess {
 		//pc端助手推送
 		if k.S_m_openid != "" {
 			logger.Info("推送任务", taskType, "开始助手推送", k.Id, "s_m_openid", k.S_m_openid)
@@ -419,9 +373,9 @@ func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			//推送微信
 			isPushOk = putil.SendWeixin(k, tip+wxTplTitle+lastTip, wxTitle, pushDate)
 			if isPushOk {
-				wxStatus = 1
+				pushResult.WxStatus = 1
 			} else {
-				wxStatus = -1
+				pushResult.WxStatus = -1
 			}
 		}
 		logger.Info("推送任务", taskType, "微信推送结束", k.ApplyStatus, isPushOk, k.Id)
@@ -451,9 +405,9 @@ func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 			"jgPushId":       k.Jpushid, //极光-推送id
 		})
 		if isPushOk {
-			appStatus = 1
+			pushResult.AppStatus = 1
 		} else {
-			appStatus = -1
+			pushResult.AppStatus = -1
 		}
 		logger.Info("推送任务", taskType, "app推送结束", isPushOk, k.Id)
 	}
@@ -463,9 +417,9 @@ func (p *pushJob) doPush(taskType int, isSave bool, wxPush, appPush, mailPush in
 		html := fmt.Sprintf(Config.Mail_html, strings.Replace(strings.Join(k.Keys, ";"), "+", " ", -1), mailContent)
 		isPushOk := p.sendMail(k.Email, Config.Mail_title, html, nil)
 		if isPushOk {
-			mailStatus = 1
+			pushResult.MailStatus = 1
 		} else {
-			mailStatus = -1
+			pushResult.MailStatus = -1
 		}
 		logger.Info("推送任务", taskType, "邮箱推送结束", isPushOk, k.Id)
 	}
@@ -526,57 +480,3 @@ func (p *pushJob) save(k *UserInfo, matchInfos []*MatchInfo) string {
 	}
 	return fmt.Sprint(unix)
 }
-
-//vip 每周 每月推送 暂时保存
-func (p *pushJob) vipTempSave(userId string, v map[string]interface{}) {
-	newList := putil.ToSortList(v["list"])
-	pLength := len(newList)
-	if pLength == 0 {
-		return
-	}
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	var data map[string]interface{}
-	coll := sess.DB(DbName).C("pushspace_vip")
-	err := coll.Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"size": 1, "list": 1, "date": 1, "userid": 1}).One(&data)
-	if err != nil {
-		logger.Error(userId, "获取用户pushspace_vip数据出错", err)
-		return
-	}
-	nowymd := util.NowFormat(util.Date_yyyyMMdd)
-	if data == nil { //批量新增
-		err = coll.Insert(map[string]interface{}{
-			"userid":     v["userid"],
-			"size":       v["size"],
-			"list":       v["list"],
-			"date":       nowymd,
-			"createtime": time.Now().Unix(),
-		})
-		if err != nil {
-			logger.Error(userId, "保存pushspace_vip出错", err)
-			return
-		}
-	} else { //批量更新
-		upSet := map[string]interface{}{}
-		if nowymd != util.ObjToString(data["date"]) {
-			return
-		}
-		oldList := putil.ToSortList(data["list"])
-		if len(oldList)+pLength > Config.MaxPushSize {
-			newList = append(newList, oldList...)
-			sort.Sort(newList)
-			v["list"] = newList[:Config.MaxPushSize]
-		} else { //追加
-			upSet["$pushAll"] = map[string]interface{}{
-				"list": newList,
-			}
-		}
-		v["size"] = util.IntAll(v["size"]) + pLength
-		upSet["$set"] = v
-		err = coll.UpdateId(data["_id"], upSet)
-		if err != nil {
-			logger.Error(userId, "更新pushspace_vip出错", err)
-			return
-		}
-	}
-}

+ 15 - 113
src/jfw/modules/pushsubscribe/src/push/job/repairjob.go

@@ -1,34 +1,31 @@
 package job
 
 import (
-	"public"
-	. "push/config"
+	. "push/pusher"
 	putil "push/util"
 	"qfw/util"
-	"qfw/util/mongodb"
-	"strings"
 	"sync"
 
 	"github.com/donnie4w/go-logger/logger"
-	"gopkg.in/mgo.v2/bson"
 )
 
 //补推接口
 type repairJob struct {
-	pool   chan bool
-	wait   *sync.WaitGroup
-	lock   *sync.Mutex
-	lastId string
-	users  *[]map[string]interface{}
+	pool chan bool
+	wait *sync.WaitGroup
+	lock *sync.Mutex
 }
 
 func (r *repairJob) Execute(param string) bool {
 	logger.Info("开始补推任务。。。")
-	batch_index := 0
+	var pusher Pusher = &RepairPush{}
+	taskType := 0
+	batchIndex := 0
+	startId := ""
 	for {
-		batch_index++
-		batch_size := r.OncePushBatch(batch_index, param)
-		for _, temp := range *r.users {
+		batchIndex++
+		isBreak, users := pusher.OncePushBatch(taskType, batchIndex, &startId)
+		for _, temp := range *users {
 			r.pool <- true
 			r.wait.Add(1)
 			go func(v map[string]interface{}) {
@@ -36,29 +33,7 @@ func (r *repairJob) Execute(param string) bool {
 					<-r.pool
 					r.wait.Done()
 				}()
-				words, _ := v["words"].([]interface{})
-				u := &public.UserInfo{
-					Id:         util.ObjToString(v["userid"]),
-					Keys:       util.ObjArrToStringArr(words),
-					WxPush:     util.IntAll(v["wxpush"]),
-					AppPush:    util.IntAll(v["apppush"]),
-					MailPush:   util.IntAll(v["mailpush"]),
-					Email:      util.ObjToString(v["email"]),
-					S_m_openid: util.ObjToString(v["s_m_openid"]),
-					A_m_openid: util.ObjToString(v["a_m_openid"]),
-					Phone:      util.ObjToString(v["phone"]),
-					Jpushid:    util.ObjToString(v["jpushid"]),
-					Opushid:    util.ObjToString(v["opushid"]),
-					UserType:   util.IntAll(v["usertype"]),
-					RateMode:   util.IntAllDef(v["ratemode"], 1),
-					//SmartSet:     util.IntAllDef(v["smartset"], 1),
-					//DataExport:   util.IntAll(v["dataexport"]),
-					AppPhoneType: util.ObjToString(v["appphonetype"]),
-					ApplyStatus:  util.IntAll(v["applystatus"]),
-					Subscribe:    util.IntAllDef(v["subscribe"], 1),
-					ModifyDate:   util.ObjToString(v["modifydate"]),
-					MergeOrder:   v["mergeorder"],
-				}
+				u := pusher.GetUserInfo(v)
 				logger.Info("补推任务", "开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "subscribe", u.Subscribe, "applystatus", u.ApplyStatus, "jpushid", u.Jpushid, "opushid", u.Opushid)
 				wxPush, appPush, mailPush := 0, 0, 0
 				wxFail, appFail, mailFail := util.IntAll(v["wxfail"]), util.IntAll(v["appfail"]), util.IntAll(v["mailfail"])
@@ -72,88 +47,15 @@ func (r *repairJob) Execute(param string) bool {
 					mailPush = 1
 				}
 				list := putil.ToSortList(v["list"])
-				_, _, wxStatus, appStatus, mailStatus := Jobs.Push.push(0, wxPush, appPush, mailPush, u, list)
-				sess := mongodb.GetMgoConn()
-				defer mongodb.DestoryMongoConn(sess)
-				if wxStatus == -1 || appStatus == -1 || mailStatus == -1 {
-					set := map[string]interface{}{}
-					inc := map[string]interface{}{}
-					if wxStatus == -1 {
-						inc["wxfail"] = 1
-					} else {
-						set["wxfail"] = 0
-					}
-					if appStatus == -1 {
-						inc["appfail"] = 1
-					} else {
-						set["appfail"] = 0
-					}
-					if mailStatus == -1 {
-						inc["mailfail"] = 1
-					} else {
-						set["mailfail"] = 0
-					}
-					update := map[string]interface{}{
-						"$inc": inc,
-					}
-					if len(set) > 0 {
-						update["$set"] = set
-					}
-					err := sess.DB(DbName).C("pushspace_fail").UpdateId(v["_id"], update)
-					if err != nil {
-						logger.Error("补推任务,update error", err)
-					}
-				} else {
-					err := sess.DB(DbName).C("pushspace_fail").RemoveId(v["_id"])
-					if err != nil {
-						logger.Error("补推任务,update error", err)
-					}
-				}
+				pushResult := Jobs.Push.selectPush(taskType, wxPush, appPush, mailPush, u, list)
+				pusher.AfterPush(pushResult, u, v)
 			}(temp)
 		}
-		if batch_size < Config.PushBatch || param != "all" {
+		if isBreak || param != "all" {
 			break
 		}
 	}
 	r.wait.Wait()
-	r.lastId = ""
-	r.users = nil
 	logger.Info("补推任务结束。。。")
 	return true
 }
-
-func (r *repairJob) OncePushBatch(batch_index int, param string) int {
-	r.users = &[]map[string]interface{}{}
-	i := 0
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	query := map[string]interface{}{}
-	if param == "all" {
-		if r.lastId != "" {
-			query["_id"] = map[string]interface{}{
-				"$gt": bson.ObjectIdHex(r.lastId),
-			}
-		}
-	} else {
-		ids := []bson.ObjectId{}
-		for _, v := range strings.Split(param, ",") {
-			ids = append(ids, bson.ObjectIdHex(v))
-		}
-		query["_id"] = map[string]interface{}{
-			"$in": ids,
-		}
-	}
-	logger.Info("补推任务,开始加载第", batch_index, "批用户", query)
-	it := sess.DB(DbName).C("pushspace_fail").Find(query).Sort("_id").Iter()
-	for temp := make(map[string]interface{}); it.Next(&temp); {
-		i++
-		r.lastId = util.BsonIdToSId(temp["_id"])
-		*r.users = append(*r.users, temp)
-		temp = make(map[string]interface{})
-		if param == "all" && i == Config.PushBatch {
-			break
-		}
-	}
-	logger.Info("补推任务,第", batch_index, "批用户加载结束", r.lastId)
-	return i
-}

+ 0 - 67
src/jfw/modules/pushsubscribe/src/push/job/specialpush.go

@@ -1,67 +0,0 @@
-package job
-
-import (
-	. "push/config"
-	"qfw/util"
-	"qfw/util/mongodb"
-
-	"github.com/donnie4w/go-logger/logger"
-	"gopkg.in/mgo.v2/bson"
-)
-
-//vip用户每月或者没周推送
-type SpecialPush struct{}
-
-func (s *SpecialPush) OncePushBatch(taskType, batchIndex int, startId *string) (int, *[]map[string]interface{}) {
-	users := &[]map[string]interface{}{}
-	i := 0
-	lastId := ""
-	var query map[string]interface{}
-	//根据任务类型,查找ratemode
-	if taskType == 1 {
-		query = map[string]interface{}{
-			"ratemode": map[string]interface{}{
-				"$in": []int{1, 3, 4},
-			},
-		}
-	} else if taskType == 2 {
-		query = map[string]interface{}{
-			"ratemode": 2,
-		}
-	} else {
-		logger.Error("taskType error", taskType)
-		return i, users
-	}
-	sess := mongodb.GetMgoConn()
-	defer mongodb.DestoryMongoConn(sess)
-	if len(Config.TestIds) > 0 {
-		query["userid"] = map[string]interface{}{
-			"$in": Config.TestIds,
-		}
-	}
-	if *startId != "" {
-		query["_id"] = map[string]interface{}{
-			"$gt": bson.ObjectIdHex(*startId),
-		}
-	}
-	logger.Info("推送任务", taskType, "开始加载第", batchIndex, "批用户", query)
-	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
-	for temp := make(map[string]interface{}); it.Next(&temp); {
-		i++
-		lastId = util.BsonIdToSId(temp["_id"])
-		*users = append(*users, temp)
-		temp = make(map[string]interface{})
-		if i == Config.PushBatch {
-			break
-		}
-	}
-	logger.Info("推送任务", taskType, "第", batchIndex, "批用户加载结束", lastId)
-	return i, users
-}
-
-func (s *SpecialPush) GetUser() {
-
-}
-func (s *SpecialPush) AfterPush() {
-
-}

+ 14 - 48
src/jfw/modules/pushsubscribe/src/push/job/timetask.go

@@ -9,14 +9,12 @@ import (
 )
 
 type timeTask struct {
-	Refresh   *RefreshTimeTask   //每天7点刷新用户信息
 	Move      *MoveTimeTask      //迁移数据
 	OncePush  *OncePushTimeTask  //九点推送
 	OtherPush *OtherPushTimeTask //一天三次
 }
 
 var Task = &timeTask{
-	Refresh:   &RefreshTimeTask{},   //每天7点刷新用户信息
 	Move:      &MoveTimeTask{},      //迁移数据
 	OncePush:  &OncePushTimeTask{},  //九点推送
 	OtherPush: &OtherPushTimeTask{}, //一天三次
@@ -39,16 +37,14 @@ func (o *OtherPushTimeTask) Execute() {
 		}
 		sub := newDate.Sub(now)
 		log.Println("start", otherpushtime, "OtherPushTimeTask after", sub)
-		time.AfterFunc(sub, func() {
-			go Jobs.Push.Execute(1)
-			ticker := time.NewTicker(time.Hour * 24)
-			for {
-				select {
-				case <-ticker.C:
-					go Jobs.Push.Execute(1)
-				}
+		timer := time.NewTimer(sub)
+		for {
+			select {
+			case <-timer.C:
+				go Jobs.Push.Execute(1)
+				timer.Reset(time.Hour * 24)
 			}
-		})
+		}
 	}
 }
 
@@ -65,46 +61,16 @@ func (o *OncePushTimeTask) Execute() {
 		}
 		sub := newDate.Sub(now)
 		log.Println("start", Config.OncePushTime, "OncePushTimeTask after", sub)
-		time.AfterFunc(sub, func() {
-			go Jobs.Push.Execute(2)
-			ticker := time.NewTicker(time.Hour * 24)
-			for {
-				select {
-				case <-ticker.C:
-					go Jobs.Push.Execute(2)
-				}
+		timer := time.NewTimer(sub)
+		for {
+			select {
+			case <-timer.C:
+				go Jobs.Push.Execute(2)
+				timer.Reset(time.Hour * 24)
 			}
-		})
-	} else {
-		log.Fatalln("OncePushTimeTask", Config.OtherPushTimes)
-	}
-}
-
-type RefreshTimeTask struct {
-}
-
-func (r *RefreshTimeTask) Execute() {
-	h_m := strings.Split(Config.RefreshTime, ":")
-	if len(h_m) == 2 {
-		now := time.Now()
-		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
-		if newDate.Before(now) {
-			newDate = newDate.AddDate(0, 0, 1)
 		}
-		sub := newDate.Sub(now)
-		log.Println("start", Config.OncePushTime, "RefreshTimeTask after", sub)
-		time.AfterFunc(sub, func() {
-			go Jobs.Refresh.Execute()
-			ticker := time.NewTicker(time.Hour * 24)
-			for {
-				select {
-				case <-ticker.C:
-					go Jobs.Refresh.Execute()
-				}
-			}
-		})
 	} else {
-		log.Fatalln("RefreshTimeTask", Config.RefreshTime)
+		log.Fatalln("OncePushTimeTask", Config.OtherPushTimes)
 	}
 }
 

+ 0 - 1
src/jfw/modules/pushsubscribe/src/push/main.go

@@ -45,7 +45,6 @@ func main() {
 		job.Jobs.Push.Execute(*taskType)
 	} else {
 		go job.Task.Move.Execute()
-		go job.Task.Refresh.Execute()
 		go job.Task.OtherPush.Execute()
 		go job.Task.OncePush.Execute()
 		if *move == 1 {

+ 150 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/normalpush.go

@@ -0,0 +1,150 @@
+package pusher
+
+import (
+	. "public"
+	. "push/config"
+	putil "push/util"
+	"qfw/util"
+	"qfw/util/mongodb"
+	"sort"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//正常推送,一天推送三次或者一天一次
+type NormalPush struct{}
+
+//获取需要推送的用户
+func (n *NormalPush) OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{}) {
+	var query map[string]interface{}
+	//根据任务类型,查找ratemode
+	if taskType == 1 {
+		query = map[string]interface{}{
+			"ratemode": map[string]interface{}{
+				"$in": []int{1, 3, 4},
+			},
+		}
+	} else if taskType == 2 {
+		query = map[string]interface{}{
+			"ratemode": 2,
+		}
+	} else {
+		logger.Error("taskType error", taskType)
+		return true, nil
+	}
+	if len(Config.TestIds) > 0 {
+		query["userid"] = map[string]interface{}{
+			"$in": Config.TestIds,
+		}
+	}
+	if *startId != "" {
+		query["_id"] = map[string]interface{}{
+			"$gt": bson.ObjectIdHex(*startId),
+		}
+	}
+	return putil.GetPushDatas(taskType, batchIndex, "pushspace", startId, query)
+}
+
+//获取用户缓存信息
+func (n *NormalPush) GetUserInfo(user map[string]interface{}) *UserInfo {
+	return putil.NewUserInfoByPushSpaceColl(user)
+}
+
+//推送以后处理
+func (n *NormalPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
+	if pushResult.IsSaveSuccess {
+		if u.FirstPushTime == 0 {
+			go mongodb.Update("user", map[string]interface{}{
+				"_id": bson.ObjectIdHex(u.Id),
+			}, map[string]interface{}{
+				"$set": map[string]interface{}{
+					"l_firstpushtime": time.Now().Unix(),
+				},
+			}, false, false)
+		}
+		if pushResult.IsVipTempSave {
+			n.vipTempSave(u.Id, user)
+		}
+	} else {
+		return
+	}
+	//判断是否要删除数据
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	err := sess.DB(putil.DbName).C("pushspace").RemoveId(user["_id"])
+	if err != nil {
+		logger.Error("推送任务", u.Id, "remove error", err)
+	}
+	if pushResult.WxStatus == -1 || pushResult.AppStatus == -1 || pushResult.MailStatus == -1 {
+		user["failtime"] = time.Now().Unix()
+		if pushResult.WxStatus == -1 {
+			user["wxfail"] = 1
+		}
+		if pushResult.AppStatus == -1 {
+			user["appfail"] = 1
+		}
+		if pushResult.MailStatus == -1 {
+			user["mailfail"] = 1
+		}
+		_, err := sess.DB(putil.DbName).C("pushspace_fail").UpsertId(user["_id"], map[string]interface{}{"$set": user})
+		if err != nil {
+			logger.Error("推送任务", u.Id, "update error", err)
+		}
+	}
+}
+
+//vip 每周 每月推送 暂时保存
+func (n *NormalPush) vipTempSave(userId string, v map[string]interface{}) {
+	newList := putil.ToSortList(v["list"])
+	pLength := len(newList)
+	if pLength == 0 {
+		return
+	}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	var data map[string]interface{}
+	coll := sess.DB(putil.DbName).C("pushspace_vip")
+	err := coll.Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"size": 1, "list": 1, "date": 1, "userid": 1}).One(&data)
+	if err != nil {
+		logger.Error(userId, "获取用户pushspace_vip数据出错", err)
+		return
+	}
+	nowymd := util.NowFormat(util.Date_yyyyMMdd)
+	if data == nil { //批量新增
+		err = coll.Insert(map[string]interface{}{
+			"userid":     v["userid"],
+			"size":       v["size"],
+			"list":       v["list"],
+			"date":       nowymd,
+			"createtime": time.Now().Unix(),
+		})
+		if err != nil {
+			logger.Error(userId, "保存pushspace_vip出错", err)
+			return
+		}
+	} else { //批量更新
+		upSet := map[string]interface{}{}
+		if nowymd != util.ObjToString(data["date"]) {
+			return
+		}
+		oldList := putil.ToSortList(data["list"])
+		if len(oldList)+pLength > Config.MaxPushSize {
+			newList = append(newList, oldList...)
+			sort.Sort(newList)
+			v["list"] = newList[:Config.MaxPushSize]
+		} else { //追加
+			upSet["$pushAll"] = map[string]interface{}{
+				"list": newList,
+			}
+		}
+		v["size"] = util.IntAll(v["size"]) + pLength
+		upSet["$set"] = v
+		err = coll.UpdateId(data["_id"], upSet)
+		if err != nil {
+			logger.Error(userId, "更新pushspace_vip出错", err)
+			return
+		}
+	}
+}

+ 12 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/pusher.go

@@ -0,0 +1,12 @@
+package pusher
+
+import (
+	. "public"
+	putil "push/util"
+)
+
+type Pusher interface {
+	OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{})
+	GetUserInfo(user map[string]interface{}) *UserInfo
+	AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{})
+}

+ 82 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/repairpush.go

@@ -0,0 +1,82 @@
+package pusher
+
+import (
+	. "public"
+	putil "push/util"
+	"qfw/util/mongodb"
+	"strings"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//推送失败,补推
+type RepairPush struct{}
+
+//获取需要推送的用户
+func (r *RepairPush) OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{}) {
+	param := args[0].(string)
+	query := map[string]interface{}{}
+	if param == "all" {
+		if *startId != "" {
+			query["_id"] = map[string]interface{}{
+				"$gt": bson.ObjectIdHex(*startId),
+			}
+		}
+	} else {
+		ids := []bson.ObjectId{}
+		for _, v := range strings.Split(param, ",") {
+			ids = append(ids, bson.ObjectIdHex(v))
+		}
+		query["_id"] = map[string]interface{}{
+			"$in": ids,
+		}
+	}
+	batchIndex = -1
+	return putil.GetPushDatas(taskType, batchIndex, "pushspace_fail", startId, query)
+}
+
+//获取用户缓存信息
+func (r *RepairPush) GetUserInfo(user map[string]interface{}) *UserInfo {
+	return putil.NewUserInfoByPushSpaceColl(user)
+}
+
+//推送以后处理
+func (r *RepairPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	if pushResult.WxStatus == -1 || pushResult.AppStatus == -1 || pushResult.MailStatus == -1 {
+		set := map[string]interface{}{}
+		inc := map[string]interface{}{}
+		if pushResult.WxStatus == -1 {
+			inc["wxfail"] = 1
+		} else {
+			set["wxfail"] = 0
+		}
+		if pushResult.AppStatus == -1 {
+			inc["appfail"] = 1
+		} else {
+			set["appfail"] = 0
+		}
+		if pushResult.MailStatus == -1 {
+			inc["mailfail"] = 1
+		} else {
+			set["mailfail"] = 0
+		}
+		update := map[string]interface{}{
+			"$inc": inc,
+		}
+		if len(set) > 0 {
+			update["$set"] = set
+		}
+		err := sess.DB("qfw").C("pushspace_fail").UpdateId(user["_id"], update)
+		if err != nil {
+			logger.Error("补推任务,update error", err)
+		}
+	} else {
+		err := sess.DB(putil.DbName).C("pushspace_fail").RemoveId(user["_id"])
+		if err != nil {
+			logger.Error("补推任务,update error", err)
+		}
+	}
+}

+ 47 - 0
src/jfw/modules/pushsubscribe/src/push/pusher/specialpush.go

@@ -0,0 +1,47 @@
+package pusher
+
+import (
+	. "public"
+	. "push/config"
+	putil "push/util"
+	"qfw/util/mongodb"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+//vip用户每月或者没周推送
+type SpecialPush struct{}
+
+//获取需要推送的用户
+func (s *SpecialPush) OncePushBatch(taskType, batchIndex int, startId *string, args ...interface{}) (bool, *[]map[string]interface{}) {
+	var query map[string]interface{}
+	if len(Config.TestIds) > 0 {
+		query["userid"] = map[string]interface{}{
+			"$in": Config.TestIds,
+		}
+	}
+	if *startId != "" {
+		query["_id"] = map[string]interface{}{
+			"$gt": bson.ObjectIdHex(*startId),
+		}
+	}
+	return putil.GetPushDatas(taskType, batchIndex, "pushspace_vip", startId, query)
+}
+
+//获取用户最新信息
+func (s *SpecialPush) GetUserInfo(user map[string]interface{}) *UserInfo {
+	userId, _ := user["userid"].(string)
+	u := mongodb.FindById("user", userId, UserCollFields)
+	if u == nil || len(*u) == 0 {
+		logger.Error("user表中没有找到该用户信息", userId)
+		return nil
+	}
+	userInfo, _ := NewUserInfoByUserColl(*u)
+	return userInfo
+}
+
+//推送以后处理
+func (s *SpecialPush) AfterPush(pushResult *putil.PushResult, u *UserInfo, user map[string]interface{}) {
+
+}

+ 5 - 0
src/jfw/modules/pushsubscribe/src/push/util/db.go

@@ -6,6 +6,11 @@ import (
 	"qfw/util/mysql"
 )
 
+const (
+	BulkSize = 200
+	DbName   = "qfw"
+)
+
 var Mysql *mysql.Mysql
 
 func init() {

+ 10 - 0
src/jfw/modules/pushsubscribe/src/push/util/entity.go

@@ -0,0 +1,10 @@
+package util
+
+//推送返回结果
+type PushResult struct {
+	IsSaveSuccess bool
+	IsVipTempSave bool
+	WxStatus      int
+	AppStatus     int
+	MailStatus    int
+}

+ 2 - 3
src/jfw/modules/pushsubscribe/src/push/util/rpccall.go

@@ -6,7 +6,6 @@ import (
 	. "public"
 	. "push/config"
 	"qfw/util"
-	"qfw/util/jy"
 	"qfw/util/mongodb"
 	qrpc "qfw/util/rpc"
 	"strconv"
@@ -43,7 +42,7 @@ func SendWeixin(k *UserInfo, remark, title, pushDate string) bool {
 		DetailColor: Config.WxDetailColor,
 		Url:         Config.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush") + "__" + pushDate,
 	}
-	ok, res := jy.WxPush(Config.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
+	ok, res := qrpc.WxPush(Config.WeixinRpcServer, "WeiXinRpc.SubscribePush", p)
 	if !ok && (strings.Contains(res, "[46004]") || strings.Contains(res, "[65302]") || strings.Contains(res, "[43004]") || strings.Contains(res, "[40003]")) {
 		mongodb.Update("user", map[string]interface{}{"_id": bson.ObjectIdHex(k.Id)}, map[string]interface{}{
 			"$set": map[string]interface{}{
@@ -61,7 +60,7 @@ func SendApp(m map[string]interface{}) bool {
 	if Config.AppSleep > 0 {
 		time.Sleep(time.Duration(Config.AppSleep) * time.Millisecond)
 	}
-	return jy.AppPush(Config.AppPushServiceRpc, m)
+	return qrpc.AppPush(Config.AppPushServiceRpc, m)
 }
 
 //

+ 53 - 0
src/jfw/modules/pushsubscribe/src/push/util/util.go

@@ -3,9 +3,13 @@ package util
 import (
 	"encoding/json"
 	. "public"
+	. "push/config"
+	"qfw/util"
+	"qfw/util/mongodb"
 	"sort"
 	"time"
 
+	"github.com/donnie4w/go-logger/logger"
 	"gopkg.in/mgo.v2/bson"
 )
 
@@ -50,3 +54,52 @@ func LimitMaxOneMinutePush(pushPoll *chan bool, maxOneMinute int) {
 		}
 	}()
 }
+
+//根据pushspace表中的数据,生成UserInfo
+func NewUserInfoByPushSpaceColl(user map[string]interface{}) *UserInfo {
+	words, _ := user["words"].([]interface{})
+	return &UserInfo{
+		Id:            util.ObjToString(user["userid"]),
+		Keys:          util.ObjArrToStringArr(words),
+		WxPush:        util.IntAll(user["wxpush"]),
+		AppPush:       util.IntAll(user["apppush"]),
+		MailPush:      util.IntAll(user["mailpush"]),
+		PchelperPush:  util.IntAll(user["pchelperpush"]),
+		Email:         util.ObjToString(user["email"]),
+		S_m_openid:    util.ObjToString(user["s_m_openid"]),
+		A_m_openid:    util.ObjToString(user["a_m_openid"]),
+		Phone:         util.ObjToString(user["phone"]),
+		Jpushid:       util.ObjToString(user["jpushid"]),
+		Opushid:       util.ObjToString(user["opushid"]),
+		UserType:      util.IntAll(user["usertype"]),
+		RateMode:      util.IntAllDef(user["ratemode"], 1),
+		AppPhoneType:  util.ObjToString(user["appphonetype"]),
+		ApplyStatus:   util.IntAll(user["applystatus"]),
+		Subscribe:     util.IntAllDef(user["subscribe"], 1),
+		ModifyDate:    util.ObjToString(user["modifydate"]),
+		MergeOrder:    user["mergeorder"],
+		FirstPushTime: util.Int64All(user["firstpushtime"]),
+		VipStatus:     util.IntAll(user["vipstatus"]),
+	}
+}
+
+//获取一批次推送的数据
+func GetPushDatas(taskType, batchIndex int, collection string, startId *string, query map[string]interface{}) (bool, *[]map[string]interface{}) {
+	logger.Info(taskType, "开始加载第", batchIndex, "批用户", query)
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	users := []map[string]interface{}{}
+	i := 0
+	it := sess.DB(DbName).C(collection).Find(query).Sort("_id").Iter()
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		i++
+		*startId = util.BsonIdToSId(temp["_id"])
+		users = append(users, temp)
+		temp = make(map[string]interface{})
+		if batchIndex > 0 && i == Config.PushBatch {
+			break
+		}
+	}
+	logger.Info(taskType, "第", batchIndex, "批用户加载结束", *startId)
+	return i < Config.PushBatch, &users
+}

+ 10 - 2
src/jfw/modules/subscribepay/src/entity/dataexport.go

@@ -324,8 +324,16 @@ func SendMailToBJFinance(order *map[string]interface{}, pay_time, transaction_id
 				}
 			}
 		}
-		mail_title = "电子发票申请,剑鱼标讯历史数据订单【" + order_code + "】,请查收"
-		mailcontent = fmt.Sprintf(ExConf.Mail_invoice_finance_content, bill_title, company_flag, bill_company, taxnum_flag, bill_taxnum, order_code, create_time, pay_time, product_type, isShowTransaction, pay_way, transaction_id, offlineImgSrc, data_spec, data_count, order_money, user_mail, user_phone)
+		//历史数据导出
+		if product_type == "历史数据导出" {
+			mail_title = "电子发票申请,剑鱼标讯历史数据订单【" + order_code + "】,请查收"
+			mailcontent = fmt.Sprintf(ExConf.Mail_invoice_finance_content, bill_title, company_flag, bill_company, taxnum_flag, bill_taxnum, order_code, create_time, pay_time, product_type, isShowTransaction, pay_way, transaction_id, offlineImgSrc, data_spec, data_count, order_money, user_mail, user_phone)
+		} else if product_type == "VIP订阅导出" {
+			//vip
+			product_type = "VIP订阅"
+			mail_title = "电子发票申请,剑鱼标讯VIP订单【" + order_code + "】,请查收"
+			mailcontent = fmt.Sprintf("", bill_title, company_flag, bill_company, taxnum_flag, bill_taxnum, order_code, create_time, pay_time, product_type, isShowTransaction, pay_way, transaction_id, offlineImgSrc, data_spec, data_count, order_money, user_mail, user_phone)
+		}
 	}
 	//发送邮件
 	for _, finance_mail := range ExConf.Finance_emails {

+ 37 - 9
src/jfw/modules/subscribepay/src/entity/subscribeVip.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"log"
+	"pay"
 	"time"
 	"util"
 
@@ -387,18 +388,45 @@ func getSetMealPrice(c, i, u int) int {
 	}
 }
 
-//支付成功后,将该订单以外的所有订单状态改为已取消状态 已取消:-2
+//支付成功后,将该订单以外的所有订单状态改为已取消状态 已取消:-2  先关闭订单再改状态
 func PayCancel(userId string) bool {
-	bl := util.Mysql.Update("dataexport_order", bson.M{
+	queryMap := map[string]interface{}{
 		"user_id":      userId,
-		"product_type": "VIP订阅",
 		"order_status": 0,
-	}, bson.M{
-		"order_status": -2,
-	})
-	//逐个取消订单
-	if !bl {
+		"product_type": "VIP订阅",
+	}
+	i, order_len := 0, 0
+	order := util.Mysql.Find("dataexport_order", queryMap, "order_code,pay_way,out_trade_no,prepay_time,order_status", "create_time desc", -1, 0)
+	if order != nil {
+		order_len = len(*order)
+		//遍历未支付订单
+		for _, v := range *order {
+			flag := false
+			if qutil.Int64All(v["order_status"]) == 0 {
+				//关闭未支付订单
+				flag = pay.CloseDataExportOrder(qutil.ObjToString(v["pay_way"]), qutil.ObjToString(v["out_trade_no"]), qutil.ObjToString(v["prepay_time"]))
+
+			} else {
+				flag = true
+			}
+			if flag {
+				//更改未支付订单状态 -2  逐个取消订单
+				bl := util.Mysql.Update("dataexport_order", bson.M{
+					"user_id":      userId,
+					"product_type": "VIP订阅",
+					"order_status": 0,
+					"order_code":   v["order_code"],
+				}, bson.M{
+					"order_status": -2,
+				})
+				if bl {
+					i++
+				}
+			}
+		}
+	}
+	if i != order_len {
 		log.Printf("%s取消其他订单状态失败\n", userId)
 	}
-	return bl
+	return i == order_len //更改成功次数是否等于需要更改次数
 }

+ 13 - 8
src/jfw/modules/subscribepay/src/main_test.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"entity"
 	"fmt"
 	"log"
 	"testing"
@@ -15,16 +16,20 @@ func Test_asd(t *testing.T) {
 	fmt.Println(cc)
 }
 func Test_update(t *testing.T) {
-	if !util.MQFW.UpdateById("user", "5db11594f4e498161c85fcf2", bson.M{"$set": bson.M{"o_vipjy.i_trial": 1}}) {
+	if !util.MQFW.UpdateById("user", "5cda80dd2757344674c10c71", bson.M{"$set": bson.M{"o_vipjy.i_trial": 1}}) {
 		log.Println("更新使用状态出错")
 	}
 }
-
-func Test_CanTrial(t *testing.T) {
-	f := util.CanTrial("5db11594f4e498161c85fcf2")
-	fmt.Println(f)
+func Test_cancelOrder(t *testing.T) {
+	userId := "5d9fe93a27573439d033b294"
+	entity.PayCancel(userId)
 }
 
-func Test_SetTrialed(t *testing.T) {
-	util.SetTrialed("5db11594f4e498161c85fcf2")
-}
+//func Test_CanTrial(t *testing.T) {
+//	f := util.CanTrial("5d9fe93a27573439d033b294")
+//	fmt.Println(f)
+//}
+
+//func Test_SetTrialed(t *testing.T) {
+//	util.SetTrialed("5d9fe93a27573439d033b294")
+//}

+ 5 - 4
src/jfw/modules/subscribepay/src/service/commonAction.go

@@ -3,11 +3,12 @@ package service
 import (
 	"config"
 	"entity"
-	"github.com/go-xweb/xweb"
 	"pay"
 	qutil "qfw/util"
 	"time"
 	"util"
+
+	"github.com/go-xweb/xweb"
 )
 
 //付费公用方法
@@ -15,9 +16,9 @@ type CommonAction struct {
 	*xweb.Action
 	isPaySuccess xweb.Mapper `xweb:"/isPaySuccess"`          //数据导出是否支付成功
 	paySuccess   xweb.Mapper `xweb:"/dataReport/paySuccess"` //数据报告支付完成
-	deleteOrder xweb.Mapper `xweb:"/deleteOrder"` //删除订单
+	deleteOrder  xweb.Mapper `xweb:"/deleteOrder"`           //删除订单
 
-	applyInvoice xweb.Mapper `xweb:"/applyInvoice"`          //申请发票
+	applyInvoice xweb.Mapper `xweb:"/applyInvoice"` //申请发票
 }
 
 //----------------------------申请发票------------------------------------
@@ -44,7 +45,7 @@ func (d *CommonAction) ApplyInvoice() error {
 			"applyBill_taxnum":  applyBill_taxnum,
 			"applyBill_status":  applyBill_status,
 			"applyBill_type":    1,
-		})                                                   //修改操作
+		}) //修改操作
 	}
 	//判断条件
 	if updateBl {

+ 20 - 7
src/jfw/modules/subscribepay/src/service/orderListDetails.go

@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"pay"
 	qutil "qfw/util"
 	"regexp"
 	"strconv"
@@ -203,13 +204,26 @@ func (o *OrderListDetails) DeleteOrder() error {
 			o.SetRes(res, queryM)
 		}
 		var boo = false
-		if cancel == "cancel" {
-			//取消订单
-			boo = util.Mysql.Update(tableName_order, queryMap, map[string]interface{}{"order_status": -2})
-		} else {
-			//删除订单
-			boo = util.Mysql.Update(tableName_order, queryMap, map[string]interface{}{"order_status": -1})
+		var flag = false
+		//删除 取消前 先关闭订单
+		order := util.Mysql.FindOne(tableName_order, queryMap, "pay_way,out_trade_no,prepay_time,order_status", "")
+		if order != nil {
+			if qutil.IntAll((*order)["order_status"]) == 0 { //未支付状态下 删除订单需要先关闭订单
+				flag = pay.CloseDataExportOrder(qutil.ObjToString((*order)["pay_way"]), qutil.ObjToString((*order)["out_trade_no"]), qutil.ObjToString((*order)["prepay_time"]))
+			} else {
+				flag = true
+			}
 		}
+		if flag {
+			if cancel == "cancel" {
+				//取消订单
+				boo = util.Mysql.Update(tableName_order, queryMap, map[string]interface{}{"order_status": -2})
+			} else {
+				//删除订单
+				boo = util.Mysql.Update(tableName_order, queryMap, map[string]interface{}{"order_status": -1})
+			}
+		}
+
 		o.ServeJson(map[string]interface{}{
 			"success":     boo,
 			"res":         res,
@@ -222,7 +236,6 @@ func (o *OrderListDetails) DeleteOrder() error {
 
 //设置邮箱-发送验证码
 func (o *OrderListDetails) SetEmail() {
-	log.Println("--")
 	email := o.GetString("email")
 	userId := qutil.ObjToString(o.GetSession("userId"))
 	if userId != "" {

+ 4 - 2
src/jfw/modules/subscribepay/src/timetask/timetask.go

@@ -37,7 +37,7 @@ func syncVipUpgrade() {
 			"i_isvalid": map[string]interface{}{
 				"$ne": 1,
 			},
-		}).Select(map[string]interface{}{"s_userid": 1, "o_area": 1, "a_buyerclass": 1}).Iter()
+		}).Select(map[string]interface{}{"s_userid": 1, "o_area": 1, "a_buyerclass": 1, "o_buyset": 1}).Iter()
 		for m := make(map[string]interface{}); it.Next(&m); {
 			_id := qutil.BsonIdToSId(m["_id"])
 			s_userid := qutil.ObjToString(m["s_userid"])
@@ -46,11 +46,13 @@ func syncVipUpgrade() {
 				continue
 			}
 			o_area, _ := m["o_area"].(map[string]interface{})
+			o_buyset, _ := m["o_buyset"].(map[string]interface{})
 			a_buyerclass, _ := m["a_buyerclass"].([]string)
 			if util.MQFW.UpdateById("user", s_userid, map[string]interface{}{
 				"$set": map[string]interface{}{
 					"o_vipjy.o_area":       o_area,
 					"o_vipjy.a_buyerclass": a_buyerclass,
+					"o_vipjy.o_buyset":     o_buyset,
 				},
 			}) {
 				util.MQFW.UpdateById("vip_upgrade", _id, map[string]interface{}{
@@ -114,7 +116,7 @@ func checkIsExpire() {
 
 //即将到期或者已到期发推送消息
 func expireRemind() {
-	crontab(true, TimetaskConfig.ExpireRemind, func() {
+	crontab(false, TimetaskConfig.ExpireRemind, func() {
 		log.Println("定时任务,开始推送消息")
 		sess := util.MQFW.GetMgoConn()
 		defer util.MQFW.DestoryMongoConn(sess)

+ 0 - 4
src/web/templates/pc/myOrder.html

@@ -316,10 +316,6 @@
 							$(".backTop").click();
 							for (var i = 0; i < data.list.length; i++) {
 								var obj = data.list[i];
-								//vip订单不显示 2.8.5
-								if (obj.data_count==null){
-									continue;
-								}
 								var id = obj.id;
 								var orderCode = obj.order_code;
 								var publishTime = obj.filter_publishtime;

+ 2 - 5
src/web/templates/weixin/vipsubscribe/renew_pay.html

@@ -621,12 +621,9 @@
 	    $(".save_renew").on("click", function(){
 	    	console.log(nowUpgrade)
 	    	let times = $('.choose_item.select_cycle .info').attr('placeholder');
-	        let ok = times.match("月")
-	        if (ok !== null){
+	        if (times.match("月") !== null){
 	        	times = Number(times.replace("个月",""));
-	        }
-	        let oks = times.match("年")
-	        if (oks !== null){
+	        }else if (times.match("年") !== null){
 	        	times = 12 * Number(times.replace("年",""));
 	        }
 	    	// 当续费时间 + 当前已经买的时间超过36个月,给出提醒