wcj 6 年之前
父节点
当前提交
c34904e12e
共有 24 个文件被更改,包括 2650 次插入0 次删除
  1. 13 0
      src/jfw/modules/pushsubscribe/src/match/config.json
  2. 34 0
      src/jfw/modules/pushsubscribe/src/match/config/config.go
  3. 142 0
      src/jfw/modules/pushsubscribe/src/match/dfa/interestanalysis.go
  4. 45 0
      src/jfw/modules/pushsubscribe/src/match/dfa/interestanalysis_test.go
  5. 40 0
      src/jfw/modules/pushsubscribe/src/match/job/job.go
  6. 603 0
      src/jfw/modules/pushsubscribe/src/match/job/matchjob.go
  7. 27 0
      src/jfw/modules/pushsubscribe/src/match/job/timetask.go
  8. 30 0
      src/jfw/modules/pushsubscribe/src/match/main.go
  9. 二进制
      src/jfw/modules/pushsubscribe/src/match/src
  10. 1 0
      src/jfw/modules/pushsubscribe/src/match/task.json
  11. 102 0
      src/jfw/modules/pushsubscribe/src/match/util/util.go
  12. 65 0
      src/jfw/modules/pushsubscribe/src/public/entity.go
  13. 60 0
      src/jfw/modules/pushsubscribe/src/push/config.json
  14. 95 0
      src/jfw/modules/pushsubscribe/src/push/config/config.go
  15. 24 0
      src/jfw/modules/pushsubscribe/src/push/job/job.go
  16. 804 0
      src/jfw/modules/pushsubscribe/src/push/job/pushjob.go
  17. 139 0
      src/jfw/modules/pushsubscribe/src/push/job/timetask.go
  18. 56 0
      src/jfw/modules/pushsubscribe/src/push/main.go
  19. 二进制
      src/jfw/modules/pushsubscribe/src/push/src
  20. 93 0
      src/jfw/modules/pushsubscribe/src/push/util/excel.go
  21. 116 0
      src/jfw/modules/pushsubscribe/src/push/util/rpccall.go
  22. 160 0
      src/jfw/modules/pushsubscribe/src/push/util/util.go
  23. 1 0
      src/jfw/modules/pushsubscribe/src/push/xlsx/readme.txt
  24. 二进制
      src/jfw/modules/pushsubscribe/src/push/xlsx/temp.xlsx

+ 13 - 0
src/jfw/modules/pushsubscribe/src/match/config.json

@@ -0,0 +1,13 @@
+{
+	"elasticPoolSize": 10,
+	"elasticSearch": "http://192.168.3.18:9800",
+	"maxPushSize": 50,
+	"maxSearch": 5000,
+	"mgoAddr": "192.168.3.18:27080",
+	"mgoSize": 10,
+	"testids": ["5c8f4f4325ef8723d0bc1082"],
+	"filterWords":["项目","中标","公告"],
+	"matchPoolSize": 60,
+	"matchDuration": 1, 
+	"userBatch":2
+}

+ 34 - 0
src/jfw/modules/pushsubscribe/src/match/config/config.go

@@ -0,0 +1,34 @@
+package config
+
+import (
+	"qfw/util"
+)
+
+type sysConfig struct {
+	ElasticPoolSize int      `json:"elasticPoolSize"`
+	ElasticSearch   string   `json:"elasticSearch"`
+	MaxPushSize     int      `json:"maxPushSize"`
+	MaxSearch       int      `json:"maxSearch"`
+	MgoAddr         string   `json:"mgoAddr"`
+	MgoSize         int      `json:"mgoSize"`
+	TestIds         []string `json:"testIds"`
+	FilterWords     []string `json:"filterWords"`
+	MatchPoolSize   int      `json:"matchPoolSize"`
+	MatchDuration   int64    `json:"matchDuration"`
+	UserBatch       int      `json:"userBatch"`
+}
+
+type taskConfig struct {
+	StartTime string `json:"startTime"`
+	LastId    string `json:"lastId"`
+}
+
+var (
+	SysConfig  *sysConfig
+	TaskConfig *taskConfig
+)
+
+func init() {
+	util.ReadConfig("./config.json", &SysConfig)
+	util.ReadConfig("./task.json", &TaskConfig)
+}

+ 142 - 0
src/jfw/modules/pushsubscribe/src/match/dfa/interestanalysis.go

@@ -0,0 +1,142 @@
+/**
+ *兴趣分析
+ *
+ */
+package dfa
+
+import (
+	"log"
+	"strings"
+)
+
+//DFA实现
+type DFA struct {
+	link        map[string]interface{} //存放or
+	linkAnd     map[string]int         //存放and
+	linkAndWord map[string]interface{} //存放and中的拆分词
+
+}
+
+//添加词组,用于初始化,该方法是可以调用多次的
+func (d *DFA) AddWord(words ...string) {
+	if d.link == nil {
+		d.link = make(map[string]interface{})
+		d.linkAnd = make(map[string]int)
+		d.linkAndWord = make(map[string]interface{})
+	}
+	var nowMap *map[string]interface{}
+	for _, key := range words {
+		keys := strings.Split(key, "+")
+		lenkeys := len(keys)
+		if lenkeys > 1 {
+			d.linkAnd[key] = lenkeys
+			for k := 0; k < lenkeys; k++ {
+				minKey := keys[k]
+				nowMap = &d.linkAndWord
+				for i := 0; i < len(minKey); i++ {
+					kc := minKey[i : i+1]
+					if v, ok := (*nowMap)[kc]; ok {
+						nowMap, _ = v.(*map[string]interface{})
+					} else {
+						newMap := map[string]interface{}{}
+						newMap["YN"] = "N"
+						(*nowMap)[kc] = &newMap
+						nowMap = &newMap
+					}
+					if i == len(minKey)-1 {
+						(*nowMap)["YN"] = "Y"
+						if (*nowMap)["key"] == nil {
+							(*nowMap)["key"] = make(map[string]int)
+						}
+						(*nowMap)["key"].(map[string]int)[key] = k
+					}
+				}
+			}
+		} else {
+			nowMap = &d.link
+			for i := 0; i < len(key); i++ {
+				kc := key[i : i+1]
+				if v, ok := (*nowMap)[kc]; ok {
+					nowMap, _ = v.(*map[string]interface{})
+				} else {
+					newMap := map[string]interface{}{}
+					newMap["YN"] = "N"
+					(*nowMap)[kc] = &newMap
+					nowMap = &newMap
+				}
+
+				if i == len(key)-1 {
+					(*nowMap)["YN"] = "Y"
+				}
+			}
+		}
+	}
+}
+func (d *DFA) Clear() {
+	d.link = nil
+}
+
+//从给定的内容中找出匹配上的关键词
+func (d *DFA) Analy(src string) []string {
+	if d.link == nil {
+		log.Println("请先添加词组")
+		return []string{}
+	}
+	keywords := []string{}
+	tempMap := make(map[string][]bool)
+	for i := 0; i < len(src); i++ {
+		nowMap := &d.link
+		length := 0 // 匹配标识数默认为0
+		//flag := false // 敏感词结束标识位:用于敏感词只有1位的情况
+		for j := i; j < len(src); j++ {
+			word := src[j : j+1]
+			nowMap, _ = (*nowMap)[word].(*map[string]interface{})
+			if nowMap != nil {
+				length = length + 1
+				tag, _ := (*nowMap)["YN"].(string)
+				if "Y" == tag {
+					//flag = true
+					keywords = append(keywords, src[i:i+length])
+				}
+			} else {
+				break
+			}
+		}
+		nowMap = &d.linkAndWord
+		length = 0
+		for j := i; j < len(src); j++ {
+			word := src[j : j+1]
+			nowMap, _ = (*nowMap)[word].(*map[string]interface{})
+			if nowMap != nil {
+				length = length + 1
+				tag, _ := (*nowMap)["YN"].(string)
+				if "Y" == tag {
+					mkeys := (*nowMap)["key"].(map[string]int)
+					for k, v := range mkeys {
+						tempBool := tempMap[k]
+						if tempBool == nil {
+							tempBool = make([]bool, d.linkAnd[k])
+							tempMap[k] = tempBool
+						}
+						tempBool[v] = true
+					}
+				}
+			} else {
+				break
+			}
+		}
+	}
+	for k, v := range tempMap {
+		ball := true
+		for _, m := range v {
+			if !m {
+				ball = false
+				break
+			}
+		}
+		if ball {
+			keywords = append(keywords, k)
+		}
+	}
+	return keywords
+}

+ 45 - 0
src/jfw/modules/pushsubscribe/src/match/dfa/interestanalysis_test.go

@@ -0,0 +1,45 @@
+package dfa
+
+import (
+	"log"
+	"strings"
+	"testing"
+	"time"
+)
+
+var d *DFA = &DFA{}
+
+func copyMap(m map[string]int) (m2 map[string]int) {
+	m2 = make(map[string]int)
+	for k, v := range m {
+		m2[k] = v
+	}
+	return m2
+}
+
+func TestAnaly(t *testing.T) {
+	d.AddWord("办公", "办+楼", "河+省", "完+你们8")
+	log.Println(strings.Split("河+南+", "+")[2])
+	t1 := time.Now()
+	log.Println(d.Analy("这胡省锦河涛写给江泽民的信我们你们于办公楼上你完就是啊。"), "=====")
+	log.Println(time.Now().Sub(t1).Seconds())
+	d.Clear()
+	//log.Println(d.Analy("这是胡锦涛写给江泽民的信啊。"))
+
+}
+
+func Test_Label(t *testing.T) {
+	log.Println("000----")
+
+	for _, v := range []int{1, 2, 3, 4, 5} {
+		log.Println(v)
+	L1:
+		for _, vv := range []string{"a", "b", "c", "d"} {
+			log.Println(vv)
+			if vv == "add" {
+				break L1
+			}
+		}
+	}
+	log.Println("111----")
+}

+ 40 - 0
src/jfw/modules/pushsubscribe/src/match/job/job.go

@@ -0,0 +1,40 @@
+package job
+
+import (
+	"match/config"
+	. "public"
+	"sync"
+)
+
+const (
+	BulkSize  = 200
+	ShowField = `"_id","title","detail","projectscope","publishtime","toptype","subtype","type","area","href","areaval","infoformat",` +
+		`"projectname","buyer","winner","budget","bidamount","bidopentime","s_subscopeclass"`
+	SortQuery         = `{"publishtime":"desc"}`
+	DB                = "bidding"
+	IDRange           = `{"range":{"id":{"gt":"%s","lte":"%s"}}},{"range":{"publishtime":{"gt": %d}}}`
+	TimeRange         = `{"range":{"comeintime":{"gte":%d,"lte":%d}}}`
+	MaxId             = `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
+	Mongodb_ShowField = `{"title":1,"detail":1,"projectscope":1,"publishtime":1,"toptype":1,"subtype":1,"type":1,"area":1,"href":1,"areaval":1,"infoformat":1,"projectname":1,"buyer":1,"winner":1,"budget":1,"bidamount":1,"bidopentime":1,"s_subscopeclass":1}`
+	DbName            = "qfw"
+)
+
+type Job interface {
+	Execute()
+}
+
+type jobs struct {
+	Match Job
+}
+
+var Jobs = &jobs{
+	Match: &MatchJob{
+		datas:             &[]map[string]interface{}{},
+		users:             &map[string]*UserInfo{},
+		matchPool:         make(chan bool, config.SysConfig.MatchPoolSize),
+		eachInfoWaitGroup: &sync.WaitGroup{},
+		saveWaitGroup:     &sync.WaitGroup{},
+		userMapLock:       &sync.Mutex{},
+		saveBatch:         []map[string]interface{}{},
+	},
+}

+ 603 - 0
src/jfw/modules/pushsubscribe/src/match/job/matchjob.go

@@ -0,0 +1,603 @@
+package job
+
+import (
+	"container/list"
+	"encoding/json"
+	"fmt"
+	. "match/config"
+	"match/dfa"
+	putil "match/util"
+	. "public"
+	"qfw/util"
+	"qfw/util/elastic"
+	"qfw/util/mongodb"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	SaveFields = []string{"_id", "area", "bidamount", "bidopentime", "budget", "buyer", "otitle", "projectname", "publishtime", "s_subscopeclass", "subtype", "title", "toptype", "type", "winner"}
+)
+
+type Pjob struct {
+	InterestDfa    *dfa.DFA
+	NotInterestDfa *dfa.DFA
+	Key_user       *map[string]*[]*UserInfo
+	Notkey_user    *map[string]*[]*UserInfo
+}
+
+//所有用户的关键词和排除词
+func (p *Pjob) CreateDaf() {
+	//关键词
+	p.InterestDfa = &dfa.DFA{}
+	interestWords := make([]string, 0)
+	for k, _ := range *p.Key_user {
+		interestWords = append(interestWords, k)
+	}
+	p.InterestDfa.AddWord(interestWords...)
+	//排除关键词
+	p.NotInterestDfa = &dfa.DFA{}
+	notInterestWords := make([]string, 0)
+	for k, _ := range *p.Notkey_user {
+		notInterestWords = append(notInterestWords, k)
+	}
+	p.NotInterestDfa.AddWord(notInterestWords...)
+}
+
+type MatchUser struct {
+	User *UserInfo
+	Keys []string
+}
+type MatchJob struct {
+	datas             *[]map[string]interface{} //本次加载的数据
+	users             *map[string]*UserInfo     //所有用户
+	matchPool         chan bool
+	eachInfoWaitGroup *sync.WaitGroup
+	saveWaitGroup     *sync.WaitGroup
+	userMapLock       *sync.Mutex
+	lastUserId        string
+	saveBatch         []map[string]interface{}
+}
+
+//定时任务,匹配数据,存库
+func (m *MatchJob) Execute() {
+	defer util.Catch()
+	st, _ := time.ParseInLocation(util.Date_Full_Layout, TaskConfig.StartTime, time.Local)
+	lastTime := st.Unix()
+	_id := util.ObjToString(TaskConfig.LastId)
+	logger.Info("开始匹配数据任务。。。", _id, lastTime)
+	//获取本次查询的最大id
+	idQuery := ""
+	if _id == "" {
+		idQuery = strings.Replace(fmt.Sprintf(MaxId, _id), `"gt"`, `"gte"`, -1)
+	} else {
+		idQuery = fmt.Sprintf(MaxId, _id)
+	}
+	resId := elastic.Get(DB, DB, idQuery)
+	lastid := ""
+	var comeintime interface{}
+	if resId != nil && *resId != nil && len(*resId) == 1 {
+		lastid = util.ObjToString((*resId)[0]["_id"])
+		comeintime = (*resId)[0]["comeintime"]
+	} else {
+		logger.Info("匹配数据,获取本次查询的最大id的时候,未查找到数据!", idQuery)
+		return
+	}
+	if !m.LoadBidding(_id, lastid, lastTime) {
+		logger.Info("匹配数据,加载数据到内存中的时候,未查找到数据!")
+		return
+	}
+	m.lastUserId = ""
+	user_batch_index := 0
+	for {
+		user_batch_index++
+		user_batch_size := m.OnceUserBatch(user_batch_index)
+		if user_batch_size == 0 {
+			break
+		}
+		a_key_user := make(map[string]*[]*UserInfo)
+		a_notkey_user := make(map[string]*[]*UserInfo)
+		//开启智能订阅的用户
+		s_key_user := make(map[string]*[]*UserInfo)
+		s_notkey_user := make(map[string]*[]*UserInfo)
+		for _, v := range *m.users {
+			m.MakeKeyUser(v.Keys, v, &a_key_user)
+			m.MakeKeyUser(v.NotKeys, v, &a_notkey_user)
+			if v.SmartSet == 1 {
+				m.MakeKeyUser(v.Keys, v, &s_key_user)
+				m.MakeKeyUser(v.NotKeys, v, &s_notkey_user)
+			}
+		}
+		m.ToMatch(user_batch_index, a_key_user, a_notkey_user, s_key_user, s_notkey_user)
+		if user_batch_size < SysConfig.UserBatch {
+			break
+		}
+	}
+	//
+	logger.Info("匹配数据任务结束。。。", lastid, comeintime)
+	//
+	TaskConfig.LastId = lastid
+	TaskConfig.StartTime = util.FormatDateWithObj(&comeintime, util.Date_Full_Layout)
+	m.datas = &[]map[string]interface{}{}
+	m.users = &map[string]*UserInfo{}
+}
+func (m *MatchJob) ToMatch(batchIndex int, a_key_user, a_notkey_user, s_key_user, s_notkey_user map[string]*[]*UserInfo) {
+	logger.Info("匹配数据任务,开始匹配第", batchIndex, "批用户")
+	a_p := &Pjob{
+		Key_user:    &a_key_user,
+		Notkey_user: &a_notkey_user,
+	}
+	a_p.CreateDaf()
+	//
+	s_p := &Pjob{
+		Key_user:    &s_key_user,
+		Notkey_user: &s_notkey_user,
+	}
+	s_p.CreateDaf()
+	m.Save(a_p, s_p)
+	a_key_user = make(map[string]*[]*UserInfo)
+	a_notkey_user = make(map[string]*[]*UserInfo)
+	//开启智能订阅的用户
+	s_key_user = make(map[string]*[]*UserInfo)
+	s_notkey_user = make(map[string]*[]*UserInfo)
+	logger.Info("匹配数据任务,第", batchIndex, "批用户匹配结束")
+}
+func (m *MatchJob) Save(a_p, s_p *Pjob) {
+	logger.Info("匹配数据任务,开始保存到pushmail_temp表")
+	userMap := m.EachAllBidInfo(a_p, s_p)
+	//加锁,保存数据和转移数据不能同时进行
+	index := 0
+	for openid, listInfos := range *userMap {
+		var pushArray = make(SortList, 0)
+		for e := listInfos.Front(); e != nil; e = e.Next() {
+			matchInfo := *(e.Value.(*MatchInfo))
+			pushArray = append(pushArray, &matchInfo)
+		}
+		//取最新50条
+		sort.Sort(pushArray)
+		var array []*MatchInfo
+		size := 0
+		for _, v2 := range pushArray {
+			size++
+			info := map[string]interface{}{}
+			for _, field := range SaveFields {
+				if (*v2.Info)[field] == nil {
+					continue
+				}
+				info[field] = (*v2.Info)[field]
+			}
+			array = append(array, &MatchInfo{
+				Info: &info,
+				Keys: v2.Keys,
+			})
+			if len(array) == SysConfig.MaxPushSize {
+				break
+			}
+		}
+		user := (*m.users)[openid]
+		m.saveBatch = append(m.saveBatch, map[string]interface{}{
+			"s_m_openid":   user.S_m_openid,
+			"a_m_openid":   user.A_m_openid,
+			"phone":        user.Phone,
+			"jpushid":      user.Jpushid,
+			"opushid":      user.Opushid,
+			"appphonetype": user.AppPhoneType,
+			"userid":       user.Id,
+			"ratemode":     user.RateMode,
+			"wxpush":       user.WxPush,
+			"apppush":      user.AppPush,
+			"mailpush":     user.MailPush,
+			"smartset":     user.SmartSet,
+			"usertype":     user.UserType,
+			"email":        user.Email,
+			"dataexport":   user.DataExport,
+			"list":         array,
+			"size":         size,
+			"subscribe":    user.Subscribe,
+			"applystatus":  user.ApplyStatus,
+			"words":        user.OriginalKeys,
+			"modifydate":   user.ModifyDate,
+			"mergeorder":   user.MergeOrder,
+			"timestamp":    time.Now().Unix(),
+		})
+		if len(m.saveBatch) == BulkSize {
+			mongodb.SaveBulk("pushspace_temp", m.saveBatch...)
+			m.saveBatch = []map[string]interface{}{}
+		}
+		index++
+		if index%500 == 0 {
+			logger.Info("匹配数据任务,保存到pushmail_temp表", index)
+		}
+	}
+	if len(m.saveBatch) > 0 {
+		mongodb.SaveBulk("pushspace_temp", m.saveBatch...)
+		m.saveBatch = []map[string]interface{}{}
+	}
+	logger.Info("匹配数据任务,保存到pushmail_temp表结束", index)
+}
+
+//加载数据到内存中
+func (m *MatchJob) LoadBidding(_id, lastid string, lastTime int64) bool {
+	defer util.Catch()
+	c_query := map[string]interface{}{
+		"publishtime": map[string]interface{}{
+			"$gt": lastTime - 7*86400,
+		},
+		"extracttype": 1,
+	}
+	idQuery := map[string]interface{}{}
+	if _id != "" {
+		idQuery["$gt"] = bson.ObjectIdHex(_id)
+	}
+	if lastid != "" {
+		idQuery["$lte"] = bson.ObjectIdHex(lastid)
+	}
+	if len(idQuery) > 0 {
+		c_query["_id"] = idQuery
+	}
+	logger.Info("匹配数据任务,mongodb query:", c_query)
+	count := mongodb.Count("bidding", c_query)
+	logger.Info("匹配数据任务,本次数据共", count, "条")
+	if count == 0 {
+		return false
+	}
+	if count > SysConfig.MaxSearch {
+		count = SysConfig.MaxSearch
+		logger.Info("匹配数据任务,目前数据多于", SysConfig.MaxSearch, ",只加载了", SysConfig.MaxSearch, "条!")
+	}
+	var res []map[string]interface{}
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	it := sess.DB(DbName).C("bidding").Find(c_query).Select(mongodb.ObjToOth(Mongodb_ShowField)).Sort("_id").Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); {
+		index++
+		tmp["_id"] = util.BsonIdToSId(tmp["_id"])
+		res = append(res, tmp)
+		if index%500 == 0 {
+			logger.Info("匹配数据任务,当前加载数据:", index)
+		}
+		tmp = make(map[string]interface{})
+		if index >= count {
+			break
+		}
+	}
+	m.datas = &res
+	logger.Info("匹配数据任务,", count, "条数据已经加载完成!")
+	return true
+}
+
+//初始化用户缓存
+func (m *MatchJob) OnceUserBatch(user_batch_index int) int {
+	defer util.Catch()
+	m.users = &map[string]*UserInfo{}
+	//遍历用户
+	q := map[string]interface{}{
+		"i_appid": 2,
+	}
+	_idq := map[string]interface{}{}
+	if len(SysConfig.TestIds) > 0 {
+		_idq["$in"] = putil.ToObjectIds(SysConfig.TestIds)
+	}
+	if m.lastUserId != "" {
+		_idq["$gt"] = bson.ObjectIdHex(m.lastUserId)
+	}
+	if len(_idq) > 0 {
+		q["_id"] = _idq
+	}
+	logger.Info("匹配数据任务,开始加载第", user_batch_index, "批用户", q)
+	session := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(session)
+	query := session.DB(DbName).C("user").Find(&q).Select(&map[string]interface{}{
+		"_id":           1,
+		"o_jy":          1,
+		"s_m_openid":    1,
+		"a_m_openid":    1,
+		"s_phone":       1,
+		"s_jpushid":     1,
+		"s_opushid":     1,
+		"i_ispush":      1,
+		"i_dataexport":  1,
+		"i_type":        1,
+		"i_smartset":    1,
+		"i_supersearch": 1,
+		"s_appponetype": 1,
+		"i_applystatus": 1,
+		"a_mergeorder":  1,
+	}).Iter()
+	n := 0
+	for temp := make(map[string]interface{}); query.Next(temp); {
+		s_m_openid := util.ObjToString(temp["s_m_openid"])
+		a_m_openid := util.ObjToString(temp["a_m_openid"])
+		s_phone := util.ObjToString(temp["s_phone"])
+		userType := putil.GetUserType(s_m_openid, a_m_openid, s_phone, util.IntAllDef(temp["i_type"], 0))
+		isPush := util.IntAllDef(temp["i_ispush"], 1)
+		//公众号取关用户,后面还有pc助手推送,暂时不过滤app用户
+		if userType == 0 && isPush == 0 {
+			continue
+		}
+		applystatus := util.IntAll(temp["i_applystatus"])
+		o_msgset, _ := temp["o_jy"].(map[string]interface{})
+		wxpush, apppush, mailpush := putil.ModeTransform(userType, o_msgset)
+		jpushid := util.ObjToString(temp["s_jpushid"])
+		opushid := util.ObjToString(temp["s_opushid"])
+		var allkeysTemp []elastic.KeyConfig
+		_bs, err := json.Marshal(o_msgset["a_key"])
+		if err == nil {
+			json.Unmarshal(_bs, &allkeysTemp)
+		}
+		allkeys := []elastic.KeyConfig{}
+		if len(allkeysTemp) > 0 {
+			//一个字或者配置文件中的词,不推送
+			for _, vs := range allkeysTemp {
+				isFilter := false
+				vskey := strings.Replace(strings.Join(vs.Keys, ""), " ", "", -1)
+				if len([]rune(vskey)) == 1 {
+					continue
+				}
+				for _, fv := range SysConfig.FilterWords {
+					if fv == vskey {
+						isFilter = true
+						break
+					}
+				}
+				if !isFilter {
+					allkeys = append(allkeys, vs)
+				}
+			}
+		}
+		////////////////
+		if len(allkeys) == 0 {
+			continue
+		}
+		userId := fmt.Sprintf("%x", string(temp["_id"].(bson.ObjectId)))
+		logger.Info("匹配数据任务,第", user_batch_index, "批用户,userid", userId, "s_m_openid", s_m_openid, "a_m_openid", a_m_openid, "s_phone", s_phone, "jpushid", jpushid, "opushid", opushid, "applystatus", applystatus)
+		keys := []string{}                           //过滤后的关键词
+		notkeys := []string{}                        //排除词
+		key_notkey := map[string]map[string]bool{}   //关键词所对应的排除词
+		key_area := map[string]map[string]bool{}     //关键词所对应的信息范围
+		key_infotype := map[string]map[string]bool{} //关键词所对应的信息类型
+		for _, vs := range allkeys {
+			key := strings.Join(vs.Keys, "+")
+			keys = append(keys, key)
+			notkeys = append(notkeys, vs.NotKeys...)
+			//转大写
+			keyTemp := strings.ToUpper(key)
+			//建立与排除词的对应关系
+			for _, notkey := range vs.NotKeys {
+				notkeyTemp := strings.ToUpper(notkey)
+				if key_notkey[keyTemp] == nil {
+					key_notkey[keyTemp] = map[string]bool{}
+				}
+				key_notkey[keyTemp][notkeyTemp] = true
+			}
+			//建立与信息范围的对应关系
+			for _, area := range vs.Areas {
+				if key_area[keyTemp] == nil {
+					key_area[keyTemp] = map[string]bool{}
+				}
+				key_area[keyTemp][area] = true
+			}
+			//建立与信息类型的对应关系
+			for _, infotype := range vs.InfoTypes {
+				if key_infotype[keyTemp] == nil {
+					key_infotype[keyTemp] = map[string]bool{}
+				}
+				key_infotype[keyTemp][infotype] = true
+			}
+		}
+		//
+		keysTemp := []string{} //原始关键词
+		for _, vs := range allkeysTemp {
+			keysTemp = append(keysTemp, strings.Join(vs.Keys, "+"))
+		}
+		smartset := util.IntAllDef(temp["i_smartset"], 0)
+		modifydate := ""
+		md, _ := o_msgset["l_modifydate"].(int64)
+		if md > 0 {
+			modifydate = util.FormatDateByInt64(&md, util.Date_Short_Layout)
+		}
+		if modifydate == "" {
+			now := time.Now()
+			modifydate = util.FormatDate(&now, util.Date_Short_Layout)
+		}
+		user := &UserInfo{
+			Id:           userId,
+			OriginalKeys: keysTemp,
+			Keys:         keys, //原始关键词
+			NotKeys:      notkeys,
+			Key_notkey:   key_notkey,
+			Key_area:     key_area,
+			Key_infotype: key_infotype,
+			WxPush:       wxpush,
+			AppPush:      apppush,
+			MailPush:     mailpush,
+			Email:        util.ObjToString(o_msgset["s_email"]),
+			S_m_openid:   s_m_openid,
+			A_m_openid:   a_m_openid,
+			Phone:        s_phone,
+			Jpushid:      jpushid,
+			Opushid:      opushid,
+			UserType:     userType,
+			RateMode:     util.IntAllDef(o_msgset["i_ratemode"], 2),
+			AllKeys:      allkeysTemp, //原始关键词
+			SmartSet:     smartset,
+			DataExport:   util.IntAllDef(temp["i_dataexport"], 0),
+			ModifyDate:   modifydate,
+			AppPhoneType: util.ObjToString(temp["s_appponetype"]),
+			ApplyStatus:  applystatus,
+			Subscribe:    isPush,
+			MergeOrder:   temp["a_mergeorder"],
+		}
+		(*m.users)[user.Id] = user
+		m.lastUserId = user.Id
+		temp = make(map[string]interface{})
+		n++
+		if n == SysConfig.UserBatch {
+			break
+		}
+	}
+	logger.Info("匹配数据任务,第", user_batch_index, "批用户加载结束", n)
+	return n
+}
+
+//把用户挂在词下面
+func (m *MatchJob) MakeKeyUser(keys []string, user *UserInfo, key_user *map[string]*[]*UserInfo) {
+	mp := map[string]bool{}
+	for _, key := range keys {
+		v := strings.ToUpper(key)
+		if v == "" || mp[v] {
+			continue
+		}
+		mp[v] = true
+		var arr *[]*UserInfo
+		if nil == (*key_user)[v] {
+			arr = &[]*UserInfo{}
+			(*key_user)[v] = arr
+		} else {
+			arr = (*key_user)[v]
+			(*key_user)[v] = arr
+		}
+		*arr = append(*arr, user)
+	}
+}
+
+//遍历数据并执行推送操作
+func (m *MatchJob) EachAllBidInfo(a_p *Pjob, s_p *Pjob) *map[string]*list.List {
+	defer util.Catch()
+	logger.Info("匹配数据任务,开始遍历数据。。。")
+	userMap := &map[string]*list.List{}
+	var count int
+	for _, temp := range *m.datas {
+		m.matchPool <- true
+		m.eachInfoWaitGroup.Add(1)
+		count++
+		go func(info map[string]interface{}) {
+			defer func() {
+				m.eachInfoWaitGroup.Done()
+				<-m.matchPool
+			}()
+			title := util.ObjToString(info["title"])
+			if title == "" {
+				return
+			}
+			titleTemp := strings.ToUpper(title)
+			area := util.ObjToString(info["area"])
+			toptype := util.ObjToString(info["toptype"])
+			//订阅词
+			keys := a_p.InterestDfa.Analy(titleTemp)
+			//排除词
+			notkeys := a_p.NotInterestDfa.Analy(titleTemp)
+			users := m.GetFinalUser(keys, notkeys, a_p.Key_user, area, toptype, true)
+			//开启智能匹配的用户,匹配projectscope
+			if s_p != nil {
+				projectscope := util.ObjToString(info["projectscope"])
+				if projectscope == "" {
+					projectscope = util.ObjToString(info["detail"])
+				}
+				if projectscope != "" {
+					projectscopeTemp := strings.ToUpper(projectscope)
+					keys = s_p.InterestDfa.Analy(projectscopeTemp)
+					notkeys = s_p.NotInterestDfa.Analy(projectscopeTemp)
+					s_users := m.GetFinalUser(keys, notkeys, s_p.Key_user, area, toptype, false)
+					for _, s_u := range *s_users {
+						if (*users)[s_u.User.Id] != nil {
+							continue
+						}
+						(*users)[s_u.User.Id] = s_u
+					}
+				}
+			}
+			if len(*users) > 0 {
+				m.EachInfoToUser(users, &info, userMap)
+			}
+		}(temp)
+		if count%500 == 0 {
+			logger.Info("匹配数据任务,当前信息索引:", count)
+		}
+	}
+	m.eachInfoWaitGroup.Wait()
+	logger.Info("匹配数据任务,数据遍历完成!")
+	return userMap
+}
+
+//获取最终的用户,排除词、信息范围、信息类型之后的
+//返回匹配上的用户和没有匹配到的用户
+func (m *MatchJob) GetFinalUser(keys, notkeys []string, key_user *map[string]*[]*UserInfo, area, toptype string, flag bool) *map[string]*MatchUser {
+	keyMap := map[string]bool{}
+	for _, v := range keys {
+		keyMap[v] = true
+	}
+	y_users := map[string]*MatchUser{} //匹配到用户
+	//遍历所有用户
+	for k, us := range *key_user {
+		if !keyMap[k] { //改关键词没有匹配到的用户
+			continue
+		}
+		for _, u := range *us {
+			//获取该词下面所有的用户
+			//遍历我的排除词,如果存在的话,排除自己
+			isContinue := false
+			for _, notkey := range notkeys {
+				if u.Key_notkey[k][notkey] {
+					isContinue = true
+					break
+				}
+			}
+			if isContinue {
+				continue
+			}
+			//遍历我的信息范围,看该信息是不是在我的信息范围中
+			if len(u.Key_area[k]) > 0 && !u.Key_area[k][area] {
+				continue
+			}
+			//遍历我的信息类型,看该信息是不是在我的信息类型中
+			if len(u.Key_infotype[k]) > 0 && !u.Key_infotype[k][toptype] {
+				continue
+			}
+			matchUser := y_users[u.Id]
+			if matchUser == nil {
+				matchUser = &MatchUser{
+					User: u,
+					Keys: []string{},
+				}
+			}
+			matchUser.Keys = append(matchUser.Keys, k)
+			y_users[u.Id] = matchUser
+		}
+	}
+	//获取最终没有匹配到的用户,进行正文或者范围匹配
+	users := map[string]*MatchUser{}
+	for k, v := range *m.users {
+		if y_users[k] == nil {
+			continue
+		}
+		users[v.Id] = &MatchUser{
+			User: v,
+			Keys: y_users[k].Keys,
+		}
+	}
+	return &users
+}
+
+//遍历用户加入到此条信息上
+func (m *MatchJob) EachInfoToUser(users *map[string]*MatchUser, info *map[string]interface{}, userMap *map[string]*list.List) {
+	defer m.userMapLock.Unlock()
+	m.userMapLock.Lock()
+	for k, v := range *users {
+		l := (*userMap)[k]
+		if l == nil {
+			l = list.New()
+		}
+		l.PushBack(&MatchInfo{
+			Info: info,
+			Keys: v.Keys,
+		})
+		(*userMap)[k] = l
+	}
+}

+ 27 - 0
src/jfw/modules/pushsubscribe/src/match/job/timetask.go

@@ -0,0 +1,27 @@
+package job
+
+import (
+	"log"
+	. "match/config"
+	"qfw/util"
+	"time"
+)
+
+type timeTask struct {
+	Match *MatchTimeTask //匹配数据
+}
+
+var Task = &timeTask{
+	Match: &MatchTimeTask{},
+}
+
+type MatchTimeTask struct {
+}
+
+func (m *MatchTimeTask) Execute() {
+	Jobs.Match.Execute()
+	util.WriteSysConfig("./task.json", &TaskConfig)
+	t := time.Duration(SysConfig.MatchDuration) * time.Minute
+	log.Println("start match job after", t)
+	time.AfterFunc(t, m.Execute)
+}

+ 30 - 0
src/jfw/modules/pushsubscribe/src/match/main.go

@@ -0,0 +1,30 @@
+//订阅推送-匹配数据服务
+package main
+
+import (
+	"flag"
+	"log"
+	. "match/config"
+	"match/job"
+	"qfw/util/elastic"
+	"qfw/util/mongodb"
+
+	"github.com/donnie4w/go-logger/logger"
+)
+
+func main() {
+	modle := flag.Int("m", 0, "默认:0;1 匹配数据")
+	flag.Parse()
+	logger.SetConsole(false)
+	logger.SetRollingDaily("./logs", "match.log")
+	mongodb.InitMongodbPool(SysConfig.MgoSize, SysConfig.MgoAddr, "qfw")
+	elastic.InitElasticSize(SysConfig.ElasticSearch, SysConfig.ElasticPoolSize)
+	log.Println("订阅推送-匹配数据程序启动。。。")
+	if *modle == 1 {
+		job.Jobs.Match.Execute()
+	} else {
+		go job.Task.Match.Execute()
+		flag := make(chan bool)
+		<-flag
+	}
+}

二进制
src/jfw/modules/pushsubscribe/src/match/src


+ 1 - 0
src/jfw/modules/pushsubscribe/src/match/task.json

@@ -0,0 +1 @@
+{"startTime":"","lastId":"596f20265d11e1c7455dc5bc"}

+ 102 - 0
src/jfw/modules/pushsubscribe/src/match/util/util.go

@@ -0,0 +1,102 @@
+package util
+
+import (
+	"qfw/util"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+//重新设置用户类型
+func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
+	if userType == 0 {
+		if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			userType = 0 //公众号
+		} else if s_m_openid == "" && phone != "" {
+			userType = 1 //app手机号
+		} else if s_m_openid == "" && a_m_openid != "" {
+			userType = 2 //app微信
+		} else if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			userType = 3 //用户合并以后只有微信用户
+		} else if s_m_openid == "" && (a_m_openid != "" || phone != "") {
+			userType = 4 //用户合并以后只有app用户
+		} else if s_m_openid != "" && (a_m_openid != "" || phone != "") {
+			userType = 5 //用户合并以后公众号和app用户都有
+		} else {
+			userType = -1
+		}
+	}
+	return userType
+}
+
+//推送方式转换
+func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int) {
+	mode := util.IntAll(o_msgset["i_mode"])
+	wxpush := util.IntAll(o_msgset["i_wxpush"])
+	apppush := util.IntAll(o_msgset["i_apppush"])
+	mailpush := util.IntAll(o_msgset["i_mailpush"])
+	if wxpush == 1 || apppush == 1 || mailpush == 1 {
+		return wxpush, apppush, mailpush
+	}
+	//老的app用户
+	if userType == 1 || userType == 2 {
+		switch mode {
+		case 0, 1:
+			apppush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			apppush = 1
+			mailpush = 1
+			break
+		}
+		if apppush == 0 && mailpush == 0 {
+			apppush = 1
+		}
+	} else if userType == 0 {
+		switch mode {
+		case 0, 1:
+			wxpush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			wxpush = 1
+			mailpush = 1
+			break
+		}
+		if wxpush == 0 && mailpush == 0 {
+			wxpush = 1
+		}
+	} else {
+		switch mode {
+		case 0, 1, 3:
+			if userType == 3 {
+				wxpush = 1
+			} else if userType == 4 {
+				apppush = 1
+			} else if userType == 5 {
+				wxpush = 1
+				apppush = 1
+			}
+			if mode == 3 {
+				mailpush = 1
+			}
+			break
+		case 2:
+			mailpush = 1
+			break
+		}
+	}
+	return wxpush, apppush, mailpush
+}
+
+func ToObjectIds(ids []string) []bson.ObjectId {
+	_ids := []bson.ObjectId{}
+	for _, v := range ids {
+		_ids = append(_ids, bson.ObjectIdHex(v))
+	}
+	return _ids
+}

+ 65 - 0
src/jfw/modules/pushsubscribe/src/public/entity.go

@@ -0,0 +1,65 @@
+package public
+
+import (
+	"qfw/util"
+	"qfw/util/elastic"
+)
+
+type UserInfo struct {
+	Id           string                     //mongoid
+	Province     string                     //省份
+	Key_notkey   map[string]map[string]bool //关键词-排除词
+	Key_area     map[string]map[string]bool //关键词-信息范围
+	Key_infotype map[string]map[string]bool //关键词-信息类型
+	OriginalKeys []string                   //用户兴趣
+	Keys         []string                   //用户兴趣
+	NotKeys      []string                   //用户不感兴趣
+	S_m_openid   string                     //公众号openid
+	A_m_openid   string                     //app微信登录openid
+	Phone        string                     //app手机号登录
+	Jpushid      string
+	Opushid      string
+	InterestDate int64
+	WxPush       int
+	AppPush      int
+	MailPush     int
+	RateMode     int
+	SmartSet     int //智能订阅 1开启 0关闭
+	Email        string
+	DataExport   int //是否导出数据 1开启 0关闭
+	AllKeys      []elastic.KeyConfig
+	ModifyDate   string
+	AppPhoneType string
+	ApplyStatus  int
+	Subscribe    int
+	UserType     int
+	MergeOrder   interface{}
+	//
+	//Active int
+	//Fail   *Fail //失败重试
+}
+
+type Fail struct {
+	Wx    int
+	App   int
+	Email int
+}
+
+type SortList []*MatchInfo
+
+func (s SortList) Len() int {
+	return len(s)
+}
+
+func (s SortList) Less(i, j int) bool {
+	return util.Int64All((*s[i].Info)["publishtime"]) > util.Int64All((*s[j].Info)["publishtime"])
+}
+
+func (s SortList) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+type MatchInfo struct {
+	Info *map[string]interface{}
+	Keys []string
+}

+ 60 - 0
src/jfw/modules/pushsubscribe/src/push/config.json

@@ -0,0 +1,60 @@
+{
+	"jianyuDomain": "https://web-jydev-wcj.jianyu360.cn",
+	"cassandra": {
+		"cachesize": 10000,
+		"host": ["192.168.3.207"],
+		"open": true,
+		"size": 5,
+		"timeout": 20
+	},
+	"redisServers": "pushcache=192.168.3.18:3379",
+	"mail_content": "<tr><td><num>%d</num></td><td><div class='tit'><a style='color: #000;text-decoration: none;' href='%s?mail' >%s</a></div></td><td style='float: right;' class='infos' ><span class='%s'>%s</span><span class='%s'>%s</span><span class='%s'>%s</span><span class='time'>%s</span></td></tr>",
+	"mail_html": "<body><style> *,body,html{margin:0;padding:0;font-family: tahoma, arial, 'Hiragino Sans GB', 'Microsoft YaHei', 宋体, sans-serif;font-size:16px; }#all{margin:0 auto;width:1024px;overflow:hidden;}.head{margin:5x;margin-top:20px;}.des{padding-bottom:15px;border-bottom:1px solid #e8ecee;color: #686868;}td a:hover {color: #fe7379;text-decoration: underline;} .tit{width:560px;overflow: hidden;    white-space: nowrap;text-overflow: ellipsis;}.area {background-color: #2cb7ca;border-radius: 3px;color: #fff;padding: 1px 2px;}.type {background-color: #ffba00;border-radius: 3px;color: #fff;padding: 1px 2px;margin-left:5px;}.industry {background-color: #25c78c;border-radius: 3px;color: #fff;padding: 1px 2px;margin-left:5px;}.infos span{display:inline-block;margin-left:5px;}td{padding-top:8px;padding-bottom:8px;height:20px;line-height:20px;}num{padding:0 5px 0 0; font-size:16px;color:#2cb7ca;font-weight:bolder;}.keys{color:blue;} </style><div id='all'><div class='head'><IMG width='100px' src=http://www.zhaobiao.info/images/swordfish/sf_01.png /></div><div class='head des'>根据您设置的关键词 :<span class='keys'>%s</span>,剑鱼为您推送30天之内的信息。点击标题可查看详情信息</div><table cellpadding='0' cellspacing='0'>%s</table></div> </body>",
+	"mail_title": "您有新的%s信息-剑鱼招标订阅",
+	"mails": [{
+		"addr": "smtp.exmail.qq.com",
+		"port": 465,
+		"pwd": "ue9Rg9Sf4CVtdm5a",
+		"user": "public03@topnet.net.cn"
+	}],
+	"maxPushSize": 50,
+	"messyCodeEmailReg": "(@(126|163)\\.com)$",
+	"mgoAddr": "192.168.3.18:27080",
+	"mgoSize": 10,
+	"testids": ["5c8f4f4325ef8723d0bc1082"],
+	"weixinRpcServer": "127.0.0.1:8083",
+	"wxColor": "#2cb7ca",
+	"wxContent": "剑鱼推送",
+	"wxGroup": "招标信息",
+	"wxTitle": "根据你订阅的关键词“%s”,剑鱼为你推送以下信息。如果不想继续收到此类信息,可进入招标订阅的设置页面取消订阅。",
+	"wxDetailColor":"#686868",
+	"appPushServiceRpc":"127.0.0.1:5566",
+	"pcHelper":"192.168.20.129:8082",
+	"startPushTime":"08:00",
+	"endPushTime":"20:00",
+	"oncePushTime": "9:00",
+	"otherPushTimes":[
+		"14:00",
+		"18:00"
+	],
+	"cassandraPollSize":10,
+	"pushPoolSize": 60,
+	"minutePushSize": 300,
+	"fastigiumMinutePushSize": 100,
+	"fastigiumTime":"9-11",
+	"mailPollSize": 20,
+	"wxPollSize": 40,
+	"appPollSize": 50,
+	"pushDuration": 2,
+	"retry": 3,
+	"mailReTry":2,
+	"mailSleep":200,
+	"cassandraSleep":200,
+	"appSleep":5,
+	"wxSleep":5,
+	"pcHelperSleep":5,
+	"isPushMail":true,
+	"inactivityPushHour": 10,
+	"pushBatch":2,
+	"ninePushRedisTimeout": 172800
+}

+ 95 - 0
src/jfw/modules/pushsubscribe/src/push/config/config.go

@@ -0,0 +1,95 @@
+package config
+
+import (
+	"qfw/util"
+	"qfw/util/mail"
+	"regexp"
+)
+
+type sysConfig struct {
+	JianyuDomain            string      `json:"jianyuDomain"`
+	Cassandra               *cassandra  `json:"cassandra"`
+	RedisServers            string      `json:"redisServers"`
+	PushPoolSize            int         `json:"pushPoolSize"`
+	Mail_content            string      `json:"mail_content"`
+	Mail_html               string      `json:"mail_html"`
+	Mail_title              string      `json:"mail_title"`
+	Mails                   []*pushMail `json:"mails"`
+	MaxPushSize             int         `json:"maxPushSize"`
+	MessyCodeEmailReg       string      `json:"messyCodeEmailReg"`
+	MgoAddr                 string      `json:"mgoAddr"`
+	MgoSize                 int         `json:"mgoSize"`
+	TestIds                 []string    `json:"testIds"`
+	WeixinRpcServer         string      `json:"weixinRpcServer"`
+	WxColor                 string      `json:"wxColor"`
+	WxGroup                 string      `json:"wxGroup"`
+	WxContent               string      `json:"wxContent"`
+	WxTitle                 string      `json:"wxTitle"`
+	WxDetailColor           string      `json:"wxDetailColor"`
+	AppPushServiceRpc       string      `json:"appPushServiceRpc"`
+	PcHelper                string      `json:"pcHelper"`
+	PushDuration            int64       `json:"pushDuration"`
+	Retry                   int         `json:"retry"`
+	InactivityPushHour      int         `json:"inactivityPushHour"`
+	PushBatch               int         `json:"pushBatch"`
+	StartPushTime           string      `json:"startPushTime"`
+	EndPushTime             string      `json:"endPushTime"`
+	OncePushTime            string      `json:"oncePushTime"`
+	OtherPushTimes          []string    `json:"otherPushTimes"`
+	MailPollSize            int         `json:"mailPollSize"`
+	WxPollSize              int         `json:"wxPollSize"`
+	AppPollSize             int         `json:"appPollSize"`
+	MailReTry               int         `json:"mailReTry"`
+	MailSleep               int         `json:"mailSleep"`
+	CassandraSleep          int         `json:"cassandraSleep"`
+	AppSleep                int         `json:"appSleep"`
+	WxSleep                 int         `json:"wxSleep"`
+	PcHelperSleep           int         `json:"pcHelperSleep"`
+	IsPushMail              bool        `json:"isPushMail"`
+	CassandraPollSize       int         `json:"cassandraPollSize"`
+	MinutePushSize          int         `json:"minutePushSize"`
+	FastigiumMinutePushSize int         `json:"fastigiumMinutePushSize"`
+	FastigiumTime           string      `json:"fastigiumTime"`
+	NinePushRedisTimeout    int         `json:"ninePushRedisTimeout"`
+}
+type pushMail struct {
+	Addr string `json:"addr"`
+	Port int    `json:"port"`
+	Pwd  string `json:"pwd"`
+	User string `json:"user"`
+}
+type cassandra struct {
+	Cachesize int      `json:"cachesize"`
+	Host      []string `json:"host"`
+	Open      bool     `json:"open"`
+	Size      int      `json:"size"`
+	Timeout   int      `json:"timeout"`
+}
+
+var (
+	Gmails       chan *mail.GmailAuth
+	Gmail        *mail.GmailAuth
+	Se           = util.SimpleEncrypt{Key: "topnet"}
+	Re           = regexp.MustCompile("<[^>]+>([^<]+)?<[^>]+>")
+	SysConfig    *sysConfig
+	WxContentLen int
+)
+
+func init() {
+	//html过滤
+	util.ReadConfig("./config.json", &SysConfig)
+	//
+	WxContentLen = len([]rune(SysConfig.WxContent))
+	//Gmails = make(chan *mail.GmailAuth, len(SysConfig.Mails))
+	for _, v := range SysConfig.Mails {
+		Gmail = &mail.GmailAuth{
+			SmtpHost: v.Addr,
+			SmtpPort: v.Port,
+			User:     v.User,
+			Pwd:      v.Pwd,
+			PoolSize: SysConfig.MailPollSize,
+			ReTry:    SysConfig.MailReTry,
+		}
+		break
+	}
+}

+ 24 - 0
src/jfw/modules/pushsubscribe/src/push/job/job.go

@@ -0,0 +1,24 @@
+package job
+
+import (
+	"push/config"
+	"sync"
+)
+
+type Job interface {
+	MySelf() interface{}
+	Execute(taskType int, isMove bool)
+}
+
+type jobs struct {
+	Nine Job
+	Push Job
+}
+
+var Jobs = &jobs{
+	Push: &PushJob{
+		pushPool:    make(chan bool, config.SysConfig.PushPoolSize),
+		pushWait:    &sync.WaitGroup{},
+		pushJobLock: &sync.Mutex{},
+	},
+}

+ 804 - 0
src/jfw/modules/pushsubscribe/src/push/job/pushjob.go

@@ -0,0 +1,804 @@
+package job
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	. "public"
+	. "push/config"
+	putil "push/util"
+	"qfw/util"
+	"qfw/util/mail"
+	"qfw/util/mongodb"
+	"regexp"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	mgo "gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
+)
+
+const (
+	BulkSize = 200
+	DbName   = "qfw"
+)
+
+var (
+	messyCodeEmailReg = regexp.MustCompile(SysConfig.MessyCodeEmailReg)
+	MoveFields        = []string{"s_m_openid", "a_m_openid", "phone", "usertype", "jpushid", "opushid", "words", "ratemode", "wxpush", "apppush", "mailpush", "smartset", "timestamp", "subscribe", "applystatus", "appphonetype", "email", "size", "modifydate", "mergeorder"}
+)
+
+func init() {
+	mySelf := Jobs.Push.MySelf().(*PushJob)
+	//推送1分钟限制
+	mySelf.minutePushPool = make(chan bool, SysConfig.MinutePushSize)
+	mySelf.fastigiumMinutePushPool = make(chan bool, SysConfig.FastigiumMinutePushSize)
+	for i := 0; i < SysConfig.MinutePushSize; i++ {
+		mySelf.minutePushPool <- true
+	}
+	for i := 0; i < SysConfig.FastigiumMinutePushSize; i++ {
+		mySelf.fastigiumMinutePushPool <- true
+	}
+	go func() {
+		t := time.NewTicker(time.Minute)
+		for {
+			select {
+			case <-t.C:
+				for i := 0; i < SysConfig.MinutePushSize-len(mySelf.minutePushPool); i++ {
+					mySelf.minutePushPool <- true
+				}
+				for i := 0; i < SysConfig.FastigiumMinutePushSize-len(mySelf.fastigiumMinutePushPool); i++ {
+					mySelf.fastigiumMinutePushPool <- true
+				}
+			}
+		}
+	}()
+}
+
+type PushJob struct {
+	pushPool                chan bool
+	minutePushPool          chan bool
+	fastigiumMinutePushPool chan bool
+	pushWait                *sync.WaitGroup
+	pushJobLock             *sync.Mutex
+	lastId                  string
+	pushDatas               *[]map[string]interface{}
+}
+
+func (p *PushJob) MySelf() interface{} {
+	return p
+}
+
+//taskType 1--实时推送 2--实时推送+一天三次的8点推送 3--一天三次推送 4--九点推送
+func (p *PushJob) Execute(taskType int, isMoveDatas bool) {
+	p.pushJobLock.Lock()
+	defer p.pushJobLock.Unlock()
+	logger.Info("开始推送任务。。。", taskType)
+	if isMoveDatas {
+		p.Move()
+	}
+	p.Push(taskType)
+}
+func (p *PushJob) Move() {
+	logger.Info("推送任务,开始迁移数据。。。")
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	nowUnix := time.Now().Unix()
+	it := sess.DB(DbName).C("pushspace_temp").Find(map[string]interface{}{
+		"timestamp": map[string]interface{}{
+			"$lt": nowUnix,
+		},
+	}).Sort("_id").Iter()
+	_index := 0
+	pushspace_temp := map[string]map[string]interface{}{}
+	logger.Info("推送任务,开始把pushspace_temp表的数据加载到内存中")
+	for _temp := make(map[string]interface{}); it.Next(&_temp); {
+		userId := util.ObjToString(_temp["userid"])
+		if pushspace_temp[userId] != nil {
+			sl := make(SortList, 0)
+			if newList := p.ToSortList(_temp["list"], userId); newList != nil {
+				sl = append(sl, newList...)
+			}
+			oldList, _ := pushspace_temp[userId]["list"].(SortList)
+			sl = append(sl, oldList...)
+			sort.Sort(sl)
+			if len(sl) > SysConfig.MaxPushSize {
+				sl = sl[:SysConfig.MaxPushSize]
+			}
+			_temp["list"] = sl
+		} else {
+			_temp["list"] = p.ToSortList(_temp["list"], userId)
+		}
+		pushspace_temp[userId] = _temp
+		_temp = make(map[string]interface{})
+		_index++
+		if _index%500 == 0 {
+			logger.Info("推送任务,pushspace_temp表的数据加载到内存中:", _index)
+		}
+	}
+	logger.Info("推送任务,pushspace_temp表的数据加载完成")
+	index := 0
+	saveArray := []map[string]interface{}{}
+	updateArray_d := []map[string]interface{}{}
+	updateArray_q := []map[string]interface{}{}
+	updateArray_s := []map[string]interface{}{}
+	for userId, temp := range pushspace_temp {
+		var data map[string]interface{}
+		sess.DB(DbName).C("pushspace").Find(map[string]interface{}{"userid": userId}).Select(map[string]interface{}{"list": 1, "templist": 1}).One(&data)
+		if data == nil { //批量新增
+			saveArray = append(saveArray, temp)
+			if len(saveArray) == BulkSize {
+				p.SaveBulk(sess, &saveArray, nowUnix)
+			}
+		} else { //批量更新
+			setMap := map[string]interface{}{}
+			for _, field := range MoveFields {
+				if temp[field] == nil {
+					continue
+				}
+				setMap[field] = temp[field]
+			}
+			//
+			newList, _ := temp["list"].(SortList)
+			if newList == nil || len(newList) == 0 {
+				continue
+			}
+			pLength := len(newList)
+			pushAll := make(map[string]interface{})
+			for _, v := range []string{"", "temp"} {
+				oldList := p.ToSortList(data[v+"list"], userId)
+				if v == "temp" && oldList == nil {
+					continue
+				}
+				rLength := len(oldList)
+				if rLength+pLength > SysConfig.MaxPushSize {
+					newList = append(newList, oldList...)
+					if len(newList) > SysConfig.MaxPushSize {
+						newList = newList[:SysConfig.MaxPushSize]
+					}
+					setMap[v+"list"] = newList
+					setMap[v+"size"] = SysConfig.MaxPushSize
+				} else { //追加
+					setMap[v+"size"] = rLength + pLength
+					pushAll[v+"list"] = newList
+				}
+			}
+			upSet := map[string]interface{}{
+				"$set": setMap,
+			}
+			if len(pushAll) > 0 {
+				upSet["$pushAll"] = pushAll
+			}
+			updateArray_d = append(updateArray_d, map[string]interface{}{"userid": userId})
+			updateArray_q = append(updateArray_q, map[string]interface{}{"_id": data["_id"]})
+			updateArray_s = append(updateArray_s, upSet)
+			if len(updateArray_q) == BulkSize {
+				p.UpdateBulk(sess, &updateArray_d, &updateArray_q, &updateArray_s, nowUnix)
+			}
+		}
+		index++
+		if index%500 == 0 {
+			logger.Info("推送任务,迁移数据:", index)
+		}
+	}
+	if len(saveArray) > 0 {
+		p.SaveBulk(sess, &saveArray, nowUnix)
+	}
+	if len(updateArray_q) > 0 {
+		p.UpdateBulk(sess, &updateArray_d, &updateArray_q, &updateArray_s, nowUnix)
+	}
+	logger.Info("推送任务,迁移数据结束。。。", index)
+}
+func (p *PushJob) SaveBulk(sess *mgo.Session, array *[]map[string]interface{}, nowUnix int64) {
+	coll := sess.DB(DbName).C("pushspace")
+	bulk := coll.Bulk()
+	for _, v := range *array {
+		bulk.Insert(v)
+	}
+	_, err := bulk.Run()
+	if nil != err {
+		logger.Info("推送任务,BulkError", err)
+	} else {
+		p.DelBulk(sess, array, nowUnix)
+	}
+	*array = []map[string]interface{}{}
+}
+func (p *PushJob) UpdateBulk(sess *mgo.Session, array_d, array_q, array_s *[]map[string]interface{}, nowUnix int64) {
+	coll := sess.DB(DbName).C("pushspace")
+	bulk := coll.Bulk()
+	for k, v := range *array_q {
+		bulk.Update(v, (*array_s)[k])
+	}
+	_, err := bulk.Run()
+	if nil != err {
+		logger.Info("推送任务,UpdateBulkError", err)
+	} else {
+		p.DelBulk(sess, array_d, nowUnix)
+	}
+	*array_d = []map[string]interface{}{}
+	*array_q = []map[string]interface{}{}
+	*array_s = []map[string]interface{}{}
+}
+func (p *PushJob) DelBulk(sess *mgo.Session, array *[]map[string]interface{}, nowUnix int64) {
+	coll := sess.DB(DbName).C("pushspace_temp")
+	bulk := coll.Bulk()
+	for _, v := range *array {
+		bulk.RemoveAll(map[string]interface{}{
+			"userid": v["userid"],
+			"timestamp": map[string]interface{}{
+				"$lt": nowUnix,
+			},
+		})
+	}
+	_, err := bulk.Run()
+	if nil != err {
+		logger.Info("推送任务,DelBulkError", err)
+	}
+}
+func (p *PushJob) Push(taskType int) {
+	logger.Info("推送任务,开始推送。。。")
+	p.lastId = ""
+	batch_index := 0
+	for {
+		batch_index++
+		batch_size := p.OncePushBatch(batch_index, taskType)
+		for _, temp := range *p.pushDatas {
+			p.pushPool <- true
+			p.pushWait.Add(1)
+			go func(v map[string]interface{}) {
+				defer func() {
+					<-p.pushPool
+					p.pushWait.Done()
+				}()
+				words, _ := v["words"].([]interface{})
+				u := &UserInfo{
+					Id:           util.ObjToString(v["userid"]),
+					OriginalKeys: util.ObjArrToStringArr(words),
+					WxPush:       util.IntAll(v["wxpush"]),
+					AppPush:      util.IntAll(v["apppush"]),
+					MailPush:     util.IntAll(v["mailpush"]),
+					Email:        util.ObjToString(v["email"]),
+					S_m_openid:   util.ObjToString(v["s_m_openid"]),
+					A_m_openid:   util.ObjToString(v["a_m_openid"]),
+					Phone:        util.ObjToString(v["phone"]),
+					Jpushid:      util.ObjToString(v["jpushid"]),
+					Opushid:      util.ObjToString(v["opushid"]),
+					UserType:     util.IntAll(v["usertype"]),
+					RateMode:     util.IntAllDef(v["ratemode"], 1),
+					SmartSet:     util.IntAllDef(v["smartset"], 1),
+					DataExport:   util.IntAll(v["dataexport"]),
+					AppPhoneType: util.ObjToString(v["appphonetype"]),
+					ApplyStatus:  util.IntAll(v["applystatus"]),
+					Subscribe:    util.IntAllDef(v["subscribe"], 1),
+					ModifyDate:   util.ObjToString(v["modifydate"]),
+					MergeOrder:   v["mergeorder"],
+				}
+				logger.Info("推送任务,开始推送用户,userid", u.Id, "s_m_openid", u.S_m_openid, "a_m_openid", u.A_m_openid, "phone", u.Phone, "jpushid", u.Jpushid, "opushid", u.Opushid, "applystatus", u.ApplyStatus)
+				wxPush, appPush, mailPush := 0, 0, 0
+				list := p.ToSortList(v["list"], u.Id)
+				templist := p.ToSortList(v["templist"], u.Id)
+				if taskType == 1 {
+					if u.WxPush == 1 {
+						if u.ApplyStatus == 1 {
+							wxPush = -1
+						} else {
+							wxPush = 1
+						}
+					}
+					if u.AppPush == 1 {
+						appPush = 1
+					}
+					if u.MailPush == 1 {
+						mailPush = -1
+					}
+				} else if taskType == 2 || taskType == 4 {
+					if u.WxPush == 1 {
+						wxPush = 1
+					}
+					if u.AppPush == 1 {
+						appPush = 1
+					}
+					if u.MailPush == 1 {
+						mailPush = 1
+					}
+				} else if taskType == 3 {
+					if u.WxPush == 1 && u.ApplyStatus == 1 {
+						wxPush = 1
+					}
+					if u.MailPush == 1 {
+						mailPush = 1
+					}
+				} else if taskType == 4 {
+					if u.WxPush == 1 {
+						wxPush = 1
+					}
+					if u.AppPush == 1 {
+						appPush = 1
+					}
+					if u.MailPush == 1 {
+						mailPush = 1
+					}
+				}
+				//再对取消关注以及app没有登录的用户进行过滤
+				if u.Subscribe == 0 {
+					wxPush = 0
+				}
+				if u.Jpushid == "" && u.Opushid == "" {
+					appPush = 0
+				}
+				if mailPush != 0 {
+					if (u.UserType == 0 || u.UserType == 3) && u.Subscribe == 0 {
+						mailPush = 0
+					} else if (u.UserType == 1 || u.UserType == 2 || u.UserType == 4) && u.Jpushid == "" && u.Opushid == "" {
+						mailPush = 0
+					} else if u.UserType == 5 && u.Subscribe == 0 && u.Jpushid == "" && u.Opushid == "" {
+						mailPush = 0
+					}
+				}
+				t_wxpush, t_mailpush := util.IntAll(v["tempwxpush"]), util.IntAll(v["tempmailpush"])
+				if templist != nil {
+					if wxPush == 1 && t_wxpush == 0 {
+						wxPush = 0
+					}
+					if mailPush == 1 && t_mailpush == 0 {
+						mailPush = 0
+					}
+				}
+				logger.Info("推送任务,本次推送任务类型", taskType, "用户的接收方式", u.Id, wxPush, appPush, mailPush, t_wxpush, t_mailpush)
+				if wxPush == 1 || appPush == 1 || mailPush == 1 {
+					isSaveSuccess := false
+					//wxPushStatus, appPushStatus, mailPushStatus := 0, 0, 0
+					//开通订阅推送或邮箱推送的用户,由实时推送修改成九点推送,app推送用list字段,微信和邮箱推送用templist字段
+					if list != nil && templist != nil {
+						isSaveSuccess, _, _, _ = p.DealSend(taskType, true, 0, appPush, 0, u, &list)
+						if !isSaveSuccess {
+							return
+						}
+						p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
+					} else if list != nil {
+						isSaveSuccess, _, _, _ = p.DealSend(taskType, true, wxPush, appPush, mailPush, u, &list)
+						if !isSaveSuccess {
+							return
+						}
+					} else if templist != nil {
+						p.DealSend(taskType, false, wxPush, 0, mailPush, u, &templist)
+					}
+					/*if wxPush == 1 {
+						if wxPushStatus == -1 {
+							wxPush = -1
+						} else {
+							wxPush = 0
+						}
+					}
+					if mailPush == 1 {
+						if mailPushStatus == -1 {
+							mailPush = -1
+						} else {
+							mailPush = 0
+						}
+					}*/
+					//判断是否要删除数据
+					_sess := mongodb.GetMgoConn()
+					defer mongodb.DestoryMongoConn(_sess)
+					if wxPush == -1 || mailPush == -1 {
+						//如果该用户还有微信或者邮箱推送,把list字段的值挪到templist
+						update := map[string]interface{}{}
+						set := map[string]interface{}{
+							"tempwxpush":   wxPush,
+							"tempmailpush": mailPush,
+						}
+						if templist == nil {
+							update["$rename"] = map[string]interface{}{"list": "templist"}
+						} else {
+							update["$unset"] = map[string]interface{}{"list": ""}
+						}
+						update["$set"] = set
+						_sess.DB(DbName).C("pushspace").UpdateId(v["_id"], update)
+					} else {
+						_sess.DB(DbName).C("pushspace").RemoveId(v["_id"])
+					}
+				}
+			}(temp)
+		}
+		if batch_size < SysConfig.PushBatch {
+			break
+		}
+	}
+	p.pushWait.Wait()
+	logger.Info("推送任务结束。。。", taskType)
+}
+func (p *PushJob) DealSend(taskType int, isSave bool, wxPush, appPush, mailPush int, k *UserInfo, sl *SortList) (isSaveSuccess bool, wxPushStatus, appPushStatus, mailPushStatus int) {
+	defer util.Catch()
+	str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下信息:</div>", strings.Join(k.OriginalKeys, ";"))
+	mailContent := ""
+	//发送内容组合
+	i := 0
+	jpushtitle := ""
+	lastInfoDate := int64(0)
+	TitleArray := []string{}
+	o_pushinfo := map[string]map[string]interface{}{}
+	matchKey_infoIndex := map[string]string{}
+	publishTitle := map[string]bool{}
+	//邮件附件
+	var fmdatas = []map[string]interface{}{}
+	for _, ks := range *sl {
+		k2 := *ks.Info
+		title := strings.Replace(k2["title"].(string), "\n", "", -1)
+		area := util.ObjToString(k2["area"])
+		if area == "A" {
+			area = "全国"
+		}
+		newTitle := fmt.Sprintf("[%s]%s", area, title)
+		if publishTitle[newTitle] {
+			continue
+		}
+		publishTitle[title] = true
+		i++
+		TitleArray = append(TitleArray, Re.ReplaceAllString(newTitle, "$1"))
+		if i == 1 {
+			jpushtitle = title
+			lastInfoDate = util.Int64All(k2["publishtime"])
+		}
+		//_sid := util.EncodeArticleId(util.BsonIdToSId(k2["_id"]))
+		//url := fmt.Sprintf("%s/pcdetail/%s.html", Domain, _sid)
+		_sid := util.EncodeArticleId2ByCheck(util.ObjToString(k2["_id"]))
+		//增加行业的处理
+		industry := ""
+		industryclass := "industry"
+		if k2["s_subscopeclass"] != nil {
+			k2sub := strings.Split(util.ObjToString(k2["s_subscopeclass"]), ",")
+			if len(k2sub) > 0 {
+				industry = k2sub[0]
+				if industry != "" {
+					ss := strings.Split(industry, "_")
+					if len(ss) > 1 {
+						industry = ss[0]
+					}
+				}
+			}
+		}
+		if mailPush == 1 { //关于邮件的处理
+			mailSid := util.CommonEncodeArticle("mailprivate", util.ObjToString(k2["_id"]))
+			url := fmt.Sprintf("%s/article/mailprivate/%s.html", SysConfig.JianyuDomain, mailSid)
+			classArea := "area"
+			classType := "type"
+			types := util.ObjToString(k2["subtype"])
+			if types == "" {
+				types = util.ObjToString(k2["toptype"])
+				if types == "" {
+					types = "其他"
+				}
+			}
+			dates := util.LongToDate(k2["publishtime"], false)
+			//标题替换
+			otitle := title
+			for _, kw := range k.OriginalKeys {
+				kws := strings.Split(kw, "+")
+				n := 0
+				otitle2 := otitle
+				for _, kwn := range kws {
+					ot := strings.Replace(otitle2, kwn, "<span class='keys'>"+kwn+"</span>", 1)
+					if ot != otitle {
+						n++
+						otitle2 = ot
+					} else {
+						break
+					}
+				}
+				if n == len(kws) {
+					otitle = otitle2
+					break
+				}
+			}
+			if industry == "" {
+				industryclass = ""
+			}
+			mailContent += fmt.Sprintf(SysConfig.Mail_content, i, url, otitle, classArea, area, classType, types, industryclass, industry, dates)
+		}
+		str += "<div class='tslist'><span class='xh'>" + fmt.Sprintf("%d", i) + ".</span><a class='bt' target='_blank' eid='" + _sid + "' href='" + util.ObjToString(k2["href"]) + "'>[<span class='area'>" + area + "</span>]" + title + "</a></div>"
+		o_pushinfo[strconv.Itoa(i)] = map[string]interface{}{
+			"publishtime":   k2["publishtime"],
+			"stype":         util.ObjToString(k2["type"]),
+			"topstype":      util.ObjToString(k2["toptype"]),
+			"substype":      util.ObjToString(k2["subtype"]),
+			"subscopeclass": industry,
+			"buyer":         k2["buyer"],
+			"projectname":   k2["projectname"],
+			"budget":        k2["budget"],
+			"bidopentime":   k2["bidopentime"],
+			"winner":        k2["winner"],
+			"bidamount":     k2["bidamount"],
+		}
+		//附件数据
+		fmdata := map[string]interface{}{
+			"publishtime": k2["publishtime"],
+			"subtype":     k2["subtype"],
+			"buyer":       k2["buyer"],
+			"projectname": k2["projectname"],
+			"budget":      k2["budget"],
+			"bidopentime": k2["bidopentime"],
+			"winner":      k2["winner"],
+			"bidamount":   k2["bidamount"],
+		}
+		fmdatas = append(fmdatas, fmdata)
+		//匹配到的关键词
+		for _, key := range (*ks).Keys {
+			if matchKey_infoIndex[key] != "" {
+				matchKey_infoIndex[key] = matchKey_infoIndex[key] + ","
+			}
+			matchKey_infoIndex[key] = matchKey_infoIndex[key] + strconv.Itoa(i)
+		}
+		if i >= SysConfig.MaxPushSize {
+			//限制最大信息条数
+			break
+		}
+	}
+	if i == 0 {
+		logger.Info("推送任务,没有要推送的数据!", k.S_m_openid, k.A_m_openid, k.Phone)
+		return
+	}
+	if isSave {
+		//推送记录id
+		pushId := putil.SaveSendInfo(taskType, k, str, o_pushinfo, matchKey_infoIndex)
+		if pushId == "" {
+			logger.Info("推送任务,保存到cassandra出错", k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
+			return
+		} else {
+			logger.Info("推送任务,成功保存到cassandra", pushId, k.Id, k.S_m_openid, k.A_m_openid, k.Phone)
+		}
+		isSaveSuccess = true
+	}
+	//九点推送的时候,限制一分钟最大的推送数量
+	if taskType == 4 && (wxPush == 1 || appPush == 1) {
+		hour := time.Now().Hour()
+		fastigiumStart, fastigiumEnd := 0, 0
+		fastigiumTimes := strings.Split(SysConfig.FastigiumTime, "-")
+		if len(fastigiumTimes) == 2 {
+			fastigiumStart = util.IntAll(fastigiumTimes[0])
+			fastigiumEnd = util.IntAll(fastigiumTimes[1])
+		}
+		if hour >= fastigiumStart && hour <= fastigiumEnd {
+			<-p.fastigiumMinutePushPool //高峰期
+		} else {
+			<-p.minutePushPool //正常期
+		}
+	}
+	if wxPush == 1 {
+		isPushOk := true
+		if k.ApplyStatus == 1 {
+			TmpTip := ""
+			minute := time.Now().Unix() - lastInfoDate
+			if minute > -1 && minute < 61 {
+				TmpTip = fmt.Sprintf("%d秒前发布的", minute)
+			} else {
+				minute = minute / 60
+				if minute < 121 {
+					if minute < 1 {
+						minute = 1
+					}
+					TmpTip = fmt.Sprintf("%d分钟前发布的", minute)
+				}
+			}
+			Tip1 := util.If(TmpTip == "", "", TmpTip+":\n").(string)
+			LastTip := ""
+			if i > 1 {
+				LastTip = fmt.Sprintf("...(共%d条)", i)
+			}
+			LastTipLen := len([]rune(LastTip))
+			wxTitleKeys := strings.Join(k.OriginalKeys, ";")
+			if len([]rune(wxTitleKeys)) > 8 {
+				wxTitleKeys = string([]rune(wxTitleKeys)[:8]) + "..."
+			}
+			wxtitle := fmt.Sprintf(SysConfig.WxTitle, wxTitleKeys)
+			TitleLen := len([]rune(wxtitle))
+			GroupLen := len([]rune(k.ModifyDate))
+			reLen := 200 - TitleLen - GroupLen - WxContentLen - len([]rune(Tip1))
+			//if infoType == 2 {
+			//	reLen = reLen - 4
+			//}
+			WXTitle := ""
+			bshow := false
+			for n := 1; n < len(TitleArray)+1; n++ {
+				curTitle := TitleArray[n-1]
+				tmptitle := WXTitle + fmt.Sprintf("%d %s\n", n, curTitle)
+				ch := reLen - len([]rune(tmptitle))
+				if ch < LastTipLen { //加上后大于后辍,则没有完全显示
+					if ch == 0 && n == len(TitleArray) {
+						WXTitle = tmptitle
+						bshow = true
+					} else {
+						ch_1 := reLen - len([]rune(WXTitle)) - LastTipLen
+						if ch_1 > 8 {
+							curLen := len([]rune(curTitle))
+							if ch_1 > curLen {
+								ch_1 = curLen
+							}
+							WXTitle += fmt.Sprintf("%d %s\n", n, string([]rune(curTitle)[:ch_1-3]))
+						}
+					}
+				} else if ch == LastTipLen {
+					WXTitle = tmptitle
+					if n == len(TitleArray) {
+						bshow = true
+					}
+				} else {
+					WXTitle = tmptitle
+					if n == len(TitleArray) {
+						bshow = true
+					}
+				}
+			}
+			if bshow {
+				LastTip = ""
+			}
+			//推送微信
+			isPushOk = putil.SendWeixin(k, Tip1+WXTitle+LastTip, o_pushinfo, matchKey_infoIndex, wxtitle)
+			if isPushOk {
+				wxPushStatus = 1
+			} else {
+				wxPushStatus = -1
+			}
+		}
+		logger.Info("推送任务,微信推送", isPushOk, k.Id, k.S_m_openid, k.RateMode, k.ApplyStatus)
+	}
+	if appPush == 1 {
+		if len([]rune(jpushtitle)) > 80 {
+			jpushtitle = string([]rune(jpushtitle)[:80]) + "..."
+		}
+		if i > 1 {
+			jpushtitle = fmt.Sprintf("1. %s\n...(共%d条)", jpushtitle, i)
+		}
+		go mongodb.Update("user", map[string]interface{}{
+			"_id": bson.ObjectIdHex(k.Id),
+		}, map[string]interface{}{
+			"$inc": map[string]interface{}{
+				"i_apppushunread": 1,
+			},
+		}, false, false)
+		sess_openid := k.A_m_openid
+		if sess_openid == "" {
+			sess_openid = k.Phone
+		}
+		if sess_openid == "" {
+			sess_openid = k.S_m_openid
+		}
+		isPushOk := putil.SendApp(map[string]interface{}{
+			"phoneType": k.AppPhoneType,
+			"descript":  jpushtitle,
+			"type":      "bid",
+			"userId":    k.Id,
+			"openId":    sess_openid,
+			"url":       "/jyapp/free/sess/" + Se.EncodeString(sess_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",historypush"),
+			//"url":         "/jyapp/free/sess/" + push.Se.EncodeString(k.Openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",wxpushlist") + "__" + pushid,
+			"otherPushId": k.Opushid,
+			"jgPushId":    k.Jpushid, //极光-推送id
+		})
+		if isPushOk {
+			appPushStatus = 1
+		} else {
+			appPushStatus = -1
+		}
+		logger.Info("推送任务,app推送", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.AppPhoneType, k.Jpushid, k.Opushid, k.RateMode)
+	}
+	//发送邮件
+	if mailPush == 1 {
+		html := fmt.Sprintf(SysConfig.Mail_html, strings.Replace(strings.Join(k.OriginalKeys, ";"), "+", " ", -1), mailContent)
+		subject := fmt.Sprintf(SysConfig.Mail_title, "招标")
+		isPushOk := p.SendMail(k.Email, subject, html, fmdatas)
+		if isPushOk {
+			mailPushStatus = 1
+		} else {
+			mailPushStatus = -1
+		}
+		logger.Info("推送任务,发送邮件", isPushOk, k.Id, k.S_m_openid, k.A_m_openid, k.Phone, k.Email, k.DataExport)
+	}
+	if wxPush == 1 || appPush == 1 || (mailPush == 1 && wxPush == 0 && appPush == 0) {
+		//pc端助手推送
+		openid := k.S_m_openid
+		if openid == "" {
+			openid = k.Phone
+		}
+		if openid != "" {
+			putil.SendPcHelper(map[string]interface{}{"clientCode": openid})
+		}
+	}
+	return
+}
+
+//推送邮件(含附件)
+func (p *PushJob) SendMail(email, subject, html string, fmdatas []map[string]interface{}) bool {
+	if !SysConfig.IsPushMail {
+		return true
+	}
+	defer util.Catch()
+	//生成附件
+	var fnamepath, rename string
+	if len(fmdatas) > 0 { //开启导出
+		fnamepath, rename = putil.GetBidInfoXlsx(fmdatas)
+		if messyCodeEmailReg.MatchString(email) {
+			rename = time.Now().Format("2006-01-02") + ".xlsx"
+		}
+	}
+	//gmail := <-Gmails
+	//defer func() {
+	//Gmails <- gmail
+	//}()
+	status := mail.GSendMail("剑鱼招标订阅", email, "", "", subject, html, fnamepath, rename, Gmail)
+	if fnamepath != "" {
+		os.Remove(fnamepath)
+	}
+	if SysConfig.MailSleep > 0 {
+		time.Sleep(time.Duration(SysConfig.MailSleep) * time.Millisecond)
+	}
+	return status
+}
+
+/************************************************/
+func (p *PushJob) OncePushBatch(batch_index, taskType int) int {
+	p.pushDatas = &[]map[string]interface{}{}
+	i := 0
+	sess := mongodb.GetMgoConn()
+	defer mongodb.DestoryMongoConn(sess)
+	var query map[string]interface{}
+	//实时推送,只查找ratemode==1的用户
+	if taskType == 1 || taskType == 2 {
+		query = map[string]interface{}{
+			"ratemode": 1,
+		}
+	} else if taskType == 3 {
+		query = map[string]interface{}{
+			"ratemode":    1,
+			"applystatus": 1,
+		}
+	} else if taskType == 4 {
+		query = map[string]interface{}{
+			"ratemode": 2,
+		}
+	} else {
+		logger.Error("taskType error", taskType)
+		return i
+	}
+	if len(SysConfig.TestIds) > 0 {
+		query["userid"] = map[string]interface{}{
+			"$in": SysConfig.TestIds,
+		}
+	}
+	if p.lastId != "" {
+		query["_id"] = map[string]interface{}{
+			"$gt": bson.ObjectIdHex(p.lastId),
+		}
+	}
+	logger.Info("推送任务,开始加载第", batch_index, "批用户", query)
+	it := sess.DB(DbName).C("pushspace").Find(query).Sort("_id").Iter()
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		i++
+		p.lastId = util.BsonIdToSId(temp["_id"])
+		*p.pushDatas = append(*p.pushDatas, temp)
+		temp = make(map[string]interface{})
+		if i == SysConfig.PushBatch {
+			break
+		}
+	}
+	logger.Info("推送任务,第", batch_index, "批用户加载结束", p.lastId)
+	return i
+}
+func (p *PushJob) ToSortList(list interface{}, userId string) SortList {
+	if list == nil {
+		return nil
+	}
+	b, err := json.Marshal(list)
+	if err != nil {
+		return nil
+	}
+	sl := make(SortList, 0)
+	err = json.Unmarshal(b, &sl)
+	if err != nil {
+		return nil
+	}
+	sort.Sort(sl)
+	return sl
+}

+ 139 - 0
src/jfw/modules/pushsubscribe/src/push/job/timetask.go

@@ -0,0 +1,139 @@
+package job
+
+import (
+	"log"
+	. "push/config"
+	"qfw/util"
+	"strings"
+	"time"
+)
+
+type timeTask struct {
+	RealPush  *RealPushTimeTask  //实时推送
+	NinePush  *NinePushTimeTask  //九点推送
+	OtherPush *OtherPushTimeTask //一天三次
+}
+
+var Task = &timeTask{
+	RealPush:  &RealPushTimeTask{},  //实时推送
+	NinePush:  &NinePushTimeTask{},  //九点推送
+	OtherPush: &OtherPushTimeTask{}, //一天三次
+}
+
+type RealPushTimeTask struct {
+}
+
+func (r *RealPushTimeTask) Execute() {
+	s_h_m := strings.Split(SysConfig.StartPushTime, ":")
+	if len(s_h_m) != 2 {
+		log.Fatalln("error:startpushtime", SysConfig.StartPushTime)
+	}
+	now := time.Now()
+	start := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(s_h_m[0]), util.IntAll(s_h_m[1]), 0, 0, time.Local)
+	//程序启动在开始时间之前
+	if now.Before(start) {
+		sub := start.Sub(now)
+		log.Println("start", SysConfig.StartPushTime, "pushjob after", sub)
+		time.AfterFunc(sub, func() {
+			go r.run(2)
+			ticker := time.NewTicker(time.Hour * 24)
+			for {
+				select {
+				case <-ticker.C:
+					go r.run(2)
+				}
+			}
+		})
+	} else {
+		go r.run(1)
+		start = start.AddDate(0, 0, 1)
+		sub := start.Sub(now)
+		log.Println("start", SysConfig.StartPushTime, "pushjob after", sub)
+		timer := time.NewTimer(sub)
+		for {
+			select {
+			case <-timer.C:
+				timer.Reset(time.Hour * 24)
+				go r.run(2)
+			}
+		}
+	}
+}
+func (r *RealPushTimeTask) run(taskType int) {
+	e_h_m := strings.Split(SysConfig.EndPushTime, ":")
+	if len(e_h_m) != 2 {
+		log.Fatalln("endpushtime", SysConfig.EndPushTime)
+	}
+	now := time.Now()
+	end := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(e_h_m[0]), util.IntAll(e_h_m[1]), 0, 0, time.Local)
+	if now.Before(end) {
+		Jobs.Push.Execute(taskType, true)
+	}
+	//隔天的话,不继续
+	//判断下一轮是否还需要继续
+	if now.Day() != time.Now().Day() || time.Now().After(end) {
+		return
+	}
+	log.Println("start pushjob after", SysConfig.PushDuration, "m")
+	time.AfterFunc(time.Duration(SysConfig.PushDuration)*time.Minute, func() {
+		r.run(1)
+	})
+}
+
+type OtherPushTimeTask struct {
+}
+
+func (o *OtherPushTimeTask) Execute() {
+	for _, otherpushtime := range SysConfig.OtherPushTimes {
+		h_m := strings.Split(otherpushtime, ":")
+		if len(h_m) != 2 {
+			log.Fatalln("error:otherpushtimes", otherpushtime)
+			return
+		}
+		now := time.Now()
+		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
+		if newDate.Before(now) {
+			newDate = newDate.AddDate(0, 0, 1)
+		}
+		sub := newDate.Sub(now)
+		log.Println("start", otherpushtime, "pushjob after", sub)
+		time.AfterFunc(sub, func() {
+			go Jobs.Push.Execute(3, true)
+			ticker := time.NewTicker(time.Hour * 24)
+			for {
+				select {
+				case <-ticker.C:
+					go Jobs.Push.Execute(3, true)
+				}
+			}
+		})
+	}
+}
+
+type NinePushTimeTask struct {
+}
+
+func (n *NinePushTimeTask) Execute() {
+	h_m := strings.Split(SysConfig.OncePushTime, ":")
+	if len(h_m) == 2 {
+		now := time.Now()
+		newDate := time.Date(now.Year(), now.Month(), now.Day(), util.IntAll(h_m[0]), util.IntAll(h_m[1]), 0, 0, time.Local)
+		if newDate.Before(now) {
+			newDate = newDate.AddDate(0, 0, 1)
+		}
+		sub := newDate.Sub(now)
+		log.Println("start", SysConfig.OncePushTime, "pushjob after", sub)
+		time.AfterFunc(sub, func() {
+			go Jobs.Push.Execute(4, true)
+			ticker := time.NewTicker(time.Hour * 24)
+			for {
+				select {
+				case <-ticker.C:
+					go Jobs.Push.Execute(4, true)
+				}
+			}
+		})
+	} else {
+		log.Fatalln("error:oncepushtime", SysConfig.OtherPushTimes)
+	}
+}

+ 56 - 0
src/jfw/modules/pushsubscribe/src/push/main.go

@@ -0,0 +1,56 @@
+//订阅推送-推送服务
+package main
+
+import (
+	"flag"
+	"log"
+	. "push/config"
+	"push/job"
+	"qfw/util/mongodb"
+	"qfw/util/redis"
+	"time"
+	ca "ucbsutil/cassandra"
+
+	"github.com/donnie4w/go-logger/logger"
+)
+
+func main() {
+	sleep := flag.Int("s", 0, "程序启动完以后,实时推送休眠s分钟再开始")
+	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务")
+	taskType := flag.Int("t", 1, "1 实时推送;2 实时推送+一天三次的八点推送;3 一天三次推送;4 九点推送")
+	moveDatas := flag.String("v", "y", "是否迁移数据")
+	flag.Parse()
+	logger.SetConsole(false)
+	logger.SetRollingDaily("./logs", "push.log")
+	mongodb.InitMongodbPool(SysConfig.MgoSize, SysConfig.MgoAddr, "qfw")
+	redis.InitRedis(SysConfig.RedisServers)
+	//初始化cassandra
+	ca.ViewCacheLen = true
+	ca.InitCassandra("jianyu",
+		SysConfig.Cassandra.Size,
+		SysConfig.Cassandra.Host,
+		map[string]int{
+			"cachesize": SysConfig.Cassandra.Cachesize,
+			"timeout":   SysConfig.Cassandra.Timeout,
+		},
+	)
+	log.Println("订阅推送-推送程序启动。。。")
+	isMoveDatas := *moveDatas == "y"
+	if *modle == 1 {
+		job.Jobs.Push.Execute(*taskType, isMoveDatas)
+	} else {
+		if *modle == 2 {
+			job.Jobs.Push.Execute(*taskType, isMoveDatas)
+		}
+		go job.Task.OtherPush.Execute()
+		go job.Task.NinePush.Execute()
+		if *sleep > 0 {
+			log.Println("实时推送先休眠", *sleep, "m")
+			time.AfterFunc(time.Duration(*sleep)*time.Minute, job.Task.RealPush.Execute)
+		} else {
+			go job.Task.RealPush.Execute()
+		}
+		flag := make(chan bool)
+		<-flag
+	}
+}

二进制
src/jfw/modules/pushsubscribe/src/push/src


+ 93 - 0
src/jfw/modules/pushsubscribe/src/push/util/excel.go

@@ -0,0 +1,93 @@
+package util
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"os"
+	qu "qfw/util"
+	"time"
+
+	"github.com/tealeg/xlsx"
+)
+
+var Sheets = map[int]xlsx.Sheet{}
+
+func init() {
+	fx, err := xlsx.OpenFile("./xlsx/temp.xlsx")
+	if err != nil {
+		log.Println(err.Error())
+		os.Exit(0)
+	}
+	for k, st := range fx.Sheets {
+		Sheets[k] = *st
+	}
+}
+func GetBidInfoXlsx(data []map[string]interface{}) (fnamepath, rename string) {
+	fx := xlsx.NewFile()
+	sheet := Sheets[0]
+	style := xlsx.NewStyle()
+	style.Alignment.WrapText = true
+	//信息
+	for _, v := range data {
+		row := sheet.AddRow()
+
+		projectname := qu.ObjToString(v["projectname"])
+		if projectname == "" {
+			projectname = qu.ObjToString(v["title"])
+		}
+		cell := row.AddCell()
+		cell.SetValue(projectname)
+		cell.SetStyle(style)
+
+		cell = row.AddCell()
+		cell.SetValue(v["subtype"])
+
+		budget := qu.Float64All(v["budget"]) / float64(10000)
+		cell = row.AddCell()
+		if budget != 0 {
+			cell.SetValue(budget)
+		} else {
+			cell.SetValue("")
+		}
+
+		cell = row.AddCell()
+		cell.SetValue(v["buyer"])
+		cell.SetStyle(style)
+
+		bpt := v["bidopentime"]
+		bidopentime := qu.FormatDateWithObj(&bpt, "2006-01-02 15:04")
+		cell = row.AddCell()
+		cell.SetValue(bidopentime)
+
+		cell = row.AddCell()
+		cell.SetValue(v["winner"])
+		cell.SetStyle(style)
+
+		bidamount := qu.Float64All(v["bidamount"]) / float64(10000)
+		cell = row.AddCell()
+		if bidamount != 0 {
+			cell.SetValue(bidamount)
+		} else {
+			cell.SetValue("")
+		}
+
+		pt := v["publishtime"]
+		publishtime := qu.FormatDateWithObj(&pt, qu.Date_Short_Layout)
+		cell = row.AddCell()
+		cell.SetValue(publishtime)
+	}
+	fx.AppendSheet(sheet, "剑鱼")
+
+	t := time.Now()
+	rename = "剑鱼招标订阅_" + t.Format("2006-01-02") + "_推送信息表.xlsx"
+	fnamepath = "./xlsx/" + t.Format("20060102150405") + fmt.Sprint(rand.Intn(10000)) + ".xlsx"
+
+	err := fx.Save(fnamepath)
+	//log.Println("err", err)
+	if err != nil {
+		return "", ""
+	} else {
+		return fnamepath, rename
+	}
+}

+ 116 - 0
src/jfw/modules/pushsubscribe/src/push/util/rpccall.go

@@ -0,0 +1,116 @@
+package util
+
+import (
+	"encoding/json"
+	"net/rpc"
+	. "public"
+	. "push/config"
+	"qfw/util"
+	"qfw/util/mongodb"
+	qrpc "qfw/util/rpc"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	wxPushPool  = make(chan bool, SysConfig.WxPollSize)
+	appPushPool = make(chan bool, SysConfig.AppPollSize)
+)
+
+//微信远程调用,实现模板发送消息
+func SendWeixin(k *UserInfo, Remarks string, o_pushinfo map[string]map[string]interface{}, matchKey_infoIndex map[string]string, wxtitle string) bool {
+	wxPushPool <- true
+	defer func() {
+		util.Catch()
+		<-wxPushPool
+	}()
+	if SysConfig.WxSleep > 0 {
+		time.Sleep(time.Duration(SysConfig.WxSleep) * time.Millisecond)
+	}
+	var repl qrpc.RpcResult
+	now := time.Now()
+	p := &qrpc.NotifyMsg{
+		Openid:      k.S_m_openid,
+		Title:       wxtitle,
+		Remark:      Remarks,
+		Detail:      SysConfig.WxGroup,
+		Date:        "",
+		Service:     util.FormatDate(&now, util.Date_Short_Layout),
+		Color:       SysConfig.WxColor,
+		DetailColor: SysConfig.WxDetailColor,
+		Url:         SysConfig.JianyuDomain + "/front/sess/" + Se.EncodeString(k.S_m_openid+",uid,"+strconv.Itoa(int(time.Now().Unix()))+",rssset"),
+	}
+	client, err := rpc.DialHTTP("tcp", SysConfig.WeixinRpcServer)
+	if err != nil {
+		logger.Error(err.Error())
+		return false
+	}
+	defer client.Close()
+	err = client.Call("WeiXinRpc.SubscribePush", p, &repl)
+	if err != nil {
+		logger.Error(err.Error())
+	}
+	res := string(repl)
+	if strings.Contains(res, "[46004]") || strings.Contains(res, "[65302]") || strings.Contains(res, "[43004]") || strings.Contains(res, "[40003]") {
+		mongodb.Update("user", map[string]interface{}{"_id": bson.ObjectIdHex(k.Id)}, map[string]interface{}{
+			"$set": map[string]interface{}{
+				"i_ispush": 0,
+			},
+		}, false, false)
+		return true
+	}
+	if repl == "Y" {
+		return true
+	}
+	return false
+}
+func SendApp(m map[string]interface{}) bool {
+	appPushPool <- true
+	defer func() {
+		util.Catch()
+		<-appPushPool
+	}()
+	if SysConfig.AppSleep > 0 {
+		time.Sleep(time.Duration(SysConfig.AppSleep) * time.Millisecond)
+	}
+	var repl string
+	client, err := rpc.DialHTTP("tcp", SysConfig.AppPushServiceRpc)
+	if err != nil {
+		logger.Error(err.Error())
+		return false
+	}
+	defer client.Close()
+	b, _ := json.Marshal(m)
+	err = client.Call("Rpc.Push", b, &repl)
+	if err != nil {
+		logger.Error(err.Error())
+		return false
+	}
+	return repl == "y"
+}
+
+//
+func SendPcHelper(m map[string]interface{}) bool {
+	defer util.Catch()
+	if SysConfig.PcHelperSleep > 0 {
+		time.Sleep(time.Duration(SysConfig.PcHelperSleep) * time.Millisecond)
+	}
+	var repl string
+	client, err := rpc.DialHTTP("tcp", SysConfig.PcHelper)
+	if err != nil {
+		logger.Error(err.Error())
+		return false
+	}
+	defer client.Close()
+	b, _ := json.Marshal(m)
+	err = client.Call("Service.PushMsg", b, &repl)
+	if err != nil {
+		logger.Error(err.Error())
+		return false
+	}
+	return repl == "y"
+}

+ 160 - 0
src/jfw/modules/pushsubscribe/src/push/util/util.go

@@ -0,0 +1,160 @@
+package util
+
+import (
+	"encoding/json"
+	"fmt"
+	. "public"
+	. "push/config"
+	"qfw/util"
+	"qfw/util/redis"
+	"time"
+	ca "ucbsutil/cassandra"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var cassandraPoll = make(chan bool, SysConfig.CassandraPollSize)
+
+//重新设置用户类型
+func GetUserType(s_m_openid, a_m_openid, phone string, userType int) int {
+	if userType == 0 {
+		if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			userType = 0 //公众号
+		} else if s_m_openid == "" && phone != "" {
+			userType = 1 //app手机号
+		} else if s_m_openid == "" && a_m_openid != "" {
+			userType = 2 //app微信
+		} else if s_m_openid != "" && a_m_openid == "" && phone == "" {
+			userType = 3 //用户合并以后只有微信用户
+		} else if s_m_openid == "" && (a_m_openid != "" || phone != "") {
+			userType = 4 //用户合并以后只有app用户
+		} else if s_m_openid != "" && (a_m_openid != "" || phone != "") {
+			userType = 5 //用户合并以后公众号和app用户都有
+		} else {
+			userType = -1
+		}
+	}
+	return userType
+}
+
+//推送方式转换
+func ModeTransform(userType int, o_msgset map[string]interface{}) (int, int, int) {
+	mode := util.IntAll(o_msgset["i_mode"])
+	wxpush := util.IntAll(o_msgset["i_wxpush"])
+	apppush := util.IntAll(o_msgset["i_apppush"])
+	mailpush := util.IntAll(o_msgset["i_mailpush"])
+	if wxpush == 1 || apppush == 1 || mailpush == 1 {
+		return wxpush, apppush, mailpush
+	}
+	//老的app用户
+	if userType == 1 || userType == 2 {
+		switch mode {
+		case 0, 1:
+			apppush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			apppush = 1
+			mailpush = 1
+			break
+		}
+		if apppush == 0 && mailpush == 0 {
+			apppush = 1
+		}
+	} else if userType == 0 {
+		switch mode {
+		case 0, 1:
+			wxpush = 1
+			break
+		case 2:
+			mailpush = 1
+			break
+		case 3:
+			wxpush = 1
+			mailpush = 1
+			break
+		}
+		if wxpush == 0 && mailpush == 0 {
+			wxpush = 1
+		}
+	} else {
+		switch mode {
+		case 0, 1, 3:
+			if userType == 3 {
+				wxpush = 1
+			} else if userType == 4 {
+				apppush = 1
+			} else if userType == 5 {
+				wxpush = 1
+				apppush = 1
+			}
+			if mode == 3 {
+				mailpush = 1
+			}
+			break
+		case 2:
+			mailpush = 1
+			break
+		}
+	}
+	return wxpush, apppush, mailpush
+}
+
+//保存发送信息
+func SaveSendInfo(taskType int, k *UserInfo, str string, o_pushinfo map[string]map[string]interface{}, matchKey_infoIndex map[string]string) string {
+	cassandraPoll <- true
+	defer func() {
+		<-cassandraPoll
+	}()
+	if SysConfig.CassandraSleep > 0 {
+		time.Sleep(time.Duration(SysConfig.CassandraSleep) * time.Millisecond)
+	}
+	pushid := time.Now().Unix()
+	md, _ := json.Marshal(o_pushinfo)
+	openid := util.GetOldOpenid(k.S_m_openid, k.A_m_openid, k.Phone, k.MergeOrder)
+	wxpush := map[string]interface{}{
+		"id":       time.Now().Format(util.Date_Short_Layout),
+		"openid":   openid,
+		"date":     pushid,
+		"words":    k.OriginalKeys,
+		"uid":      k.Id,
+		"province": k.Province,
+		"interest": k.OriginalKeys,
+		"content":  str,
+		"pushinfo": string(md),
+		"size":     len(o_pushinfo),
+		"appid":    2,
+		"ratemode": k.RateMode,
+		"sendmode": 9000 + k.WxPush*100 + k.AppPush*10 + k.MailPush,
+		"smartset": k.SmartSet,
+		"matchki":  matchKey_infoIndex,
+	}
+	if ca.SaveCacheByTimeOut("jy_push", wxpush, 10) {
+		redisKey := "pushsubscribe_" + k.Id
+		if taskType == 4 {
+			redisDatas := redis.Get("pushcache", redisKey)
+			if redisDatas != nil {
+				list := []interface{}{wxpush}
+				redisList, _ := redisDatas.([]interface{})
+				list = append(list, redisList...)
+				if len(list) > 500 {
+					list = list[:500]
+				}
+				redis.Put("pushcache", redisKey, list, SysConfig.NinePushRedisTimeout)
+			}
+		} else {
+			redis.Del("pushcache", redisKey)
+		}
+		return fmt.Sprint(pushid)
+	}
+	return ""
+}
+func ToObjectIds(ids []string) []bson.ObjectId {
+	_ids := []bson.ObjectId{}
+	for _, v := range ids {
+		_ids = append(_ids, bson.ObjectIdHex(v))
+	}
+	return _ids
+}

+ 1 - 0
src/jfw/modules/pushsubscribe/src/push/xlsx/readme.txt

@@ -0,0 +1 @@
+excel数据导出临时目录

二进制
src/jfw/modules/pushsubscribe/src/push/xlsx/temp.xlsx