wangchuanjin 5 vuotta sitten
vanhempi
commit
aeaf9c2f94

+ 6 - 1
pushentniche/src/match/config.json

@@ -1,7 +1,7 @@
 {
 	"elasticPoolSize":1,
 	"elasticSearch":"http://192.168.3.128:9800",
-	"redisServers": "pushcache_1=192.168.3.128:5000",
+	"redisServers": "pushcache_1=192.168.3.128:5000,pushcache_2_b=192.168.3.128:5000",
 	"mgoAddr":"192.168.3.128:27080",
 	"mgoSize":2,
 	"testQuery":{"i_entid":219,"i_deptid":478},
@@ -10,6 +10,11 @@
 	"matchPoolSize":60,
 	"savePoolSize":5,
 	"matchDuration":60,
+	"mergePoolSize":40,
+	"movePoolSize":40,
+	"moveBatch":1000,
+	"moveDuration":30,
+	"maxPushSize":50000,
 	"notMatchHour":[19,20,21,23,0,1,2,4,5],
 	"maxMatchSize":50,
 	"bulkSize":10,

+ 5 - 0
pushentniche/src/match/config/config.go

@@ -20,6 +20,11 @@ type config struct {
 	MatchPoolSize   int                    `json:"matchPoolSize"`
 	SavePoolSize    int                    `json:"savePoolSize"`
 	MatchDuration   int64                  `json:"matchDuration"`
+	MergePoolSize   int                    `json:"mergePoolSize"`
+	MovePoolSize    int                    `json:"movePoolSize"`
+	MoveBatch       int                    `json:"moveBatch"`
+	MoveDuration    int                    `json:"moveDuration"`
+	MaxPushSize     int                    `json:"maxPushSize"`
 	NotMatchHour    []int                  `json:"notMatchHour"`
 	MaxCustomer     int64                  `json:"maxCustomer"`
 	Mysql           struct {

+ 2 - 0
pushentniche/src/match/job/job.go

@@ -10,10 +10,12 @@ type Job interface {
 
 type jobs struct {
 	Match Job
+	Move  Job
 }
 
 var Jobs = &jobs{
 	Match: &MatchJob{
 		allProject: &sync.Map{},
 	},
+	Move: &MoveJob{},
 }

+ 12 - 12
pushentniche/src/push/job/movejob.go → pushentniche/src/match/job/movejob.go

@@ -2,9 +2,9 @@
 package job
 
 import (
+	. "match/config"
+	. "match/util"
 	. "public"
-	. "push/config"
-	putil "push/util"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"sort"
@@ -30,8 +30,8 @@ type MoveJob struct {
 
 func (m *MoveJob) Execute() {
 	defer util.Catch()
-	Jobs.Push.lock.Lock()
-	defer Jobs.Push.lock.Unlock()
+	MyLock.Lock(PushLock)
+	defer MyLock.UnLock(MatchLock, PushLock)
 	logger.Info("开始迁移数据。。。")
 	nowUnix := time.Now().Unix()
 	sess := mongodb.GetMgoConn()
@@ -46,7 +46,7 @@ func (m *MoveJob) Execute() {
 			query[k] = v
 		}
 	}
-	it := sess.DB(Config.Mongodb.DbName).C(Pushspace_entniche_temp).Find(query).Sort("entid", "deptid", "userid", "distributeid").Iter()
+	it := sess.DB(DbName).C(Pushspace_entniche_temp).Find(query).Sort("entid", "deptid", "userid", "distributeid").Iter()
 	moveUsers := map[string]*MoveUser{}
 	index, number, length := 0, 0, 0
 	//
@@ -80,7 +80,7 @@ func (m *MoveJob) Execute() {
 					idMap[util.ObjToString((*v.Info)["_id"])] = true
 					titleMap[util.ObjToString((*v.Info)["title"])] = true
 				}
-				newList := putil.ToSortList(temp["list"])
+				newList := ToSortList(temp["list"])
 				for _, v := range *newList {
 					if idMap[util.ObjToString((*v.Info)["_id"])] {
 						continue
@@ -97,7 +97,7 @@ func (m *MoveJob) Execute() {
 				moveUser.info = temp
 				moveUser.ids = append(moveUser.ids, temp["_id"])
 			} else {
-				temp["list"] = putil.ToSortList(temp["list"])
+				temp["list"] = ToSortList(temp["list"])
 				moveUser = &MoveUser{
 					info: temp,
 					ids:  []interface{}{temp["_id"]},
@@ -172,7 +172,7 @@ func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUs
 			} else {
 				return
 			}
-			sess.DB(Config.Mongodb.DbName).C(Pushspace_entniche).Find(query).Select(map[string]interface{}{"list": 1}).One(&data)
+			sess.DB(DbName).C(Pushspace_entniche).Find(query).Select(map[string]interface{}{"list": 1}).One(&data)
 			if data == nil { //批量新增
 				mergeLock.Lock()
 				saveArray = append(saveArray, moveUser.info)
@@ -190,7 +190,7 @@ func (m *MoveJob) merge(number *int, nowUnix int64, moveUsers map[string]*MoveUs
 					setMap[field] = moveUser.info[field]
 				}
 				//
-				oldList := putil.ToSortList(data["list"])
+				oldList := ToSortList(data["list"])
 				idMap := map[string]bool{}
 				for _, vv := range *oldList {
 					idMap[util.ObjToString((*vv.Info)["_id"])] = true
@@ -255,7 +255,7 @@ func (m *MoveJob) saveBulk(sess *mgo.Session, saves *[]map[string]interface{}, d
 		sess = mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(sess)
 	}
-	coll := sess.DB(Config.Mongodb.DbName).C(Pushspace_entniche)
+	coll := sess.DB(DbName).C(Pushspace_entniche)
 	bulk := coll.Bulk()
 	for _, v := range *saves {
 		bulk.Insert(v)
@@ -273,7 +273,7 @@ func (m *MoveJob) updateBulk(sess *mgo.Session, array_q, array_s *[]map[string]i
 		sess = mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(sess)
 	}
-	coll := sess.DB(Config.Mongodb.DbName).C(Pushspace_entniche)
+	coll := sess.DB(DbName).C(Pushspace_entniche)
 	bulk := coll.Bulk()
 	for k, v := range *array_q {
 		bulk.Update(v, (*array_s)[k])
@@ -292,7 +292,7 @@ func (m *MoveJob) delBulk(sess *mgo.Session, array *[]interface{}) {
 		sess = mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(sess)
 	}
-	coll := sess.DB(Config.Mongodb.DbName).C(Pushspace_entniche_temp)
+	coll := sess.DB(DbName).C(Pushspace_entniche_temp)
 	count := 0
 	bulk := coll.Bulk()
 	for _, v := range *array {

+ 18 - 12
pushentniche/src/match/job/timetask.go

@@ -8,18 +8,15 @@ import (
 	"github.com/donnie4w/go-logger/logger"
 )
 
-type timeTask struct {
-	Match *MatchTimeTask //匹配数据
+type TimeTask struct {
 }
 
-var Task = &timeTask{
-	Match: &MatchTimeTask{},
+func (t *TimeTask) Run() {
+	go t.match(false)
+	go t.move()
 }
 
-type MatchTimeTask struct {
-}
-
-func (m *MatchTimeTask) Execute(flag bool) {
+func (t *TimeTask) match(flag bool) {
 	hour := time.Now().Hour()
 	//23点到1点之间,不执行定时任务
 	//订阅付费有每天0点的定时任务删除pushspace_temp表的数据,会有冲突
@@ -39,9 +36,18 @@ func (m *MatchTimeTask) Execute(flag bool) {
 		Jobs.Match.Execute()
 		util.WriteSysConfig("./task.json", &TaskConfig)
 	}
-	t := time.Duration(Config.MatchDuration) * time.Minute
-	logger.Info("start match job after", t)
-	time.AfterFunc(t, func() {
-		m.Execute(true)
+	d := time.Duration(Config.MatchDuration) * time.Minute
+	logger.Info("start match job after", d)
+	time.AfterFunc(d, func() {
+		t.match(true)
+	})
+}
+
+func (t *TimeTask) move() {
+	d := time.Duration(Config.MoveDuration) * time.Minute
+	logger.Info("start move after", d)
+	time.AfterFunc(d, func() {
+		Jobs.Move.Execute()
+		t.move()
 	})
 }

+ 6 - 4
pushentniche/src/match/main.go

@@ -5,22 +5,24 @@ import (
 	"flag"
 	"log"
 	"match/job"
+	. "public"
 
 	"github.com/donnie4w/go-logger/logger"
 )
 
 func main() {
-	modle := flag.Int("m", 0, "默认:0;1 非定时任务匹配数据;")
+	modle := flag.Int("m", 0, "默认:0;1 非定时任务匹配数据;2 迁移数据;")
 	flag.Parse()
 	logger.SetConsole(false)
 	logger.SetRollingDaily("./logs", "match.log")
 	log.Println("订阅推送-匹配数据程序启动。。。")
-	//job.Jobs.Match.Execute()
-	//return
+	MyLock.UnLock(MatchLock)
 	if *modle == 1 {
 		job.Jobs.Match.Execute()
+	} else if *modle == 2 {
+		job.Jobs.Move.Execute()
 	} else {
-		go job.Task.Match.Execute(false)
+		go (&job.TimeTask{}).Run()
 		<-chan bool(nil)
 	}
 }

BIN
pushentniche/src/match/match


+ 43 - 0
pushentniche/src/public/entity.go

@@ -3,6 +3,8 @@ package public
 import (
 	"qfw/util"
 	"qfw/util/mysql"
+	"qfw/util/redis"
+	"time"
 
 	"github.com/donnie4w/go-logger/logger"
 )
@@ -200,3 +202,44 @@ type EntDistribute struct {
 	Buyerclass []interface{}
 	Items      []string
 }
+
+var MyLock = &myLock{}
+
+type myLock struct {
+}
+
+func (m *myLock) Lock(key string) {
+	for {
+		if m.incr("pushcache_2_b", key) == 1 {
+			if key == MatchLock {
+				redis.PutCKV("pushcache_2_b", PushLock, 1)
+			} else if key == PushLock {
+				redis.PutCKV("pushcache_2_b", MatchLock, 1)
+			}
+			break
+		}
+		time.Sleep(1 * time.Minute)
+	}
+}
+
+//自增计数器
+func (m *myLock) incr(code, key string) int64 {
+	defer util.Catch()
+	conn := redis.RedisPool[code].Get()
+	defer conn.Close()
+	ret, err := conn.Do("INCR", key)
+	if nil != err {
+		logger.Error("redis incr error", err)
+		return -1
+	} else {
+		if res, ok := ret.(int64); ok {
+			return res
+		} else {
+			return -1
+		}
+	}
+	return -1
+}
+func (m *myLock) UnLock(key ...interface{}) {
+	redis.Del("pushcache_2_b", key...)
+}

+ 20 - 0
pushentniche/src/public/util.go

@@ -1,9 +1,11 @@
 package public
 
 import (
+	"encoding/json"
 	"fmt"
 	"qfw/util"
 	"regexp"
+	"sort"
 )
 
 const (
@@ -23,6 +25,8 @@ const (
 	Entniche_distribute = "entniche_distribute"
 	Entniche_customer   = "entniche_customer"
 	Entniche_project    = "entniche_project"
+	MatchLock           = "pushentniche_match_lock"
+	PushLock            = "pushentniche_push_lock"
 )
 
 var (
@@ -59,3 +63,19 @@ func GetUnique(entId, deptId, userId int, disId string) string {
 	}
 	return ""
 }
+func ToSortList(list interface{}) *SortList {
+	sl := make(SortList, 0)
+	if list == nil {
+		return &sl
+	}
+	b, err := json.Marshal(list)
+	if err != nil {
+		return &sl
+	}
+	err = json.Unmarshal(b, &sl)
+	if err != nil {
+		return &sl
+	}
+	sort.Sort(sl)
+	return &sl
+}

+ 0 - 4
pushentniche/src/push/config.json

@@ -68,8 +68,6 @@
 	"pushPoolSize":5,
 	"projectPushPoolSize":5,
 	"loadProjectPoolSize":50,
-	"mergePoolSize":40,
-	"movePoolSize":40,
 	"minutePushSize":300,
 	"fastigiumMinutePushSize":100,
 	"fastigiumTime":"9-11",
@@ -79,8 +77,6 @@
 	"appSleep":5,
 	"isPushMail":true,
 	"pushBatch":10,
-	"moveBatch":1000,
-	"moveDuration":30,
 	"bulkSize":50,
 	"bigBulkSize":100,
 	"projectPublishTimeLimit":99999999,

+ 0 - 4
pushentniche/src/push/config/config.go

@@ -12,8 +12,6 @@ type config struct {
 	PushPoolSize            int                    `json:"pushPoolSize"`
 	ProjectPushPoolSize     int                    `json:"projectPushPoolSize"`
 	LoadProjectPoolSize     int                    `json:"loadProjectPoolSize"`
-	MergePoolSize           int                    `json:"mergePoolSize"`
-	MovePoolSize            int                    `json:"movePoolSize"`
 	Mail_content            string                 `json:"mail_content"`
 	Mail_html               string                 `json:"mail_html"`
 	Mail_title              string                 `json:"mail_title"`
@@ -41,8 +39,6 @@ type config struct {
 	FastigiumMinutePushSize int                    `json:"fastigiumMinutePushSize"`
 	FastigiumTime           string                 `json:"fastigiumTime"`
 	NinePushRedisTimeout    int                    `json:"ninePushRedisTimeout"`
-	MoveBatch               int                    `json:"moveBatch"`
-	MoveDuration            int                    `json:"moveDuration"`
 	BulkSize                int                    `json:"bulkSize"`
 	BigBulkSize             int                    `json:"bigBulkSize"`
 	ProjectPublishTimeLimit int64                  `json:"projectPublishTimeLimit"`

+ 0 - 2
pushentniche/src/push/job/jobs.go

@@ -7,11 +7,9 @@ import (
 )
 
 var Jobs = struct {
-	Move        *MoveJob
 	Push        *PushJob
 	ProjectPush *ProjectPushJob
 }{
-	Move: &MoveJob{},
 	Push: &PushJob{
 		lock: &sync.Mutex{},
 	},

+ 2 - 0
pushentniche/src/push/job/pushjob.go

@@ -37,6 +37,8 @@ type PushJob struct {
 
 //taskType 1--一天三次推送 2--九点推送
 func (p *PushJob) Execute(taskType int) {
+	MyLock.Lock(MatchLock)
+	defer MyLock.UnLock(PushLock, MatchLock)
 	p.lock.Lock()
 	defer func() {
 		ClearEnt()

+ 0 - 10
pushentniche/src/push/job/timetask.go

@@ -12,7 +12,6 @@ type TimeTask struct{}
 
 func (t *TimeTask) Run() {
 	go t.push() //推送
-	go t.move() //迁移数据
 	go t.crontab("01:00", func() {
 		Jobs.ProjectPush.Clear()
 	})
@@ -53,12 +52,3 @@ func (t *TimeTask) crontab(tm string, f func()) {
 		log.Fatalln("crontab", tm)
 	}
 }
-
-func (t *TimeTask) move() {
-	d := time.Duration(Config.MoveDuration) * time.Minute
-	log.Println("start move after", d)
-	time.AfterFunc(d, func() {
-		Jobs.Move.Execute()
-		t.move()
-	})
-}

+ 2 - 8
pushentniche/src/push/main.go

@@ -4,6 +4,7 @@ package main
 import (
 	"flag"
 	"log"
+	. "public"
 	. "push/job"
 
 	"github.com/donnie4w/go-logger/logger"
@@ -11,20 +12,13 @@ import (
 
 func main() {
 	modle := flag.Int("m", 0, "0 定时任务模式推送;1 非定时任务模式推送;2 定时任务模式推送之前先执行-t的任务;3 补推;4 关联项目推送")
-	move := flag.Int("v", 0, "1 优先迁移数据")
 	taskType := flag.Int("t", 1, "1 一天三次推送;2 九点推送;3 每周;4 每月;5 每周+每月")
 	flag.Parse()
 	logger.SetConsole(false)
 	logger.SetRollingDaily("./logs", "push.log")
 	//
 	log.Println("订阅推送-推送程序启动。。。")
-	//Jobs.Move.Execute()
-	//Jobs.Push.Execute(2)
-	//Jobs.ProjectPush.Execute()
-	//return
-	if *move == 1 {
-		Jobs.Move.Execute()
-	}
+	MyLock.UnLock(PushLock)
 	if *modle == 1 {
 		Jobs.Push.Execute(*taskType)
 		return

BIN
pushentniche/src/push/push


+ 0 - 18
pushentniche/src/push/util/util.go

@@ -1,13 +1,11 @@
 package util
 
 import (
-	"encoding/json"
 	. "public"
 	. "push/config"
 	"qfw/util"
 	"qfw/util/jy"
 	"qfw/util/mongodb"
-	"sort"
 	"strings"
 	"time"
 
@@ -19,22 +17,6 @@ var SavePool chan bool
 func init() {
 	SavePool = make(chan bool, Config.SavePoolSize)
 }
-func ToSortList(list interface{}) *SortList {
-	sl := make(SortList, 0)
-	if list == nil {
-		return &sl
-	}
-	b, err := json.Marshal(list)
-	if err != nil {
-		return &sl
-	}
-	err = json.Unmarshal(b, &sl)
-	if err != nil {
-		return &sl
-	}
-	sort.Sort(sl)
-	return &sl
-}
 
 //控制一分钟最大推送数,均匀调度
 func LimitMaxOneMinutePush(pushPoll *chan bool, maxOneMinute int) {