Browse Source

修改剑鱼

renzheng 9 years ago
parent
commit
25bd1edd81

+ 2 - 0
push/src/main.go

@@ -10,6 +10,7 @@ import (
 	"net/rpc"
 	"qfw/push"
 	"qfw/push/bid"
+	"qfw/push/listdb"
 	"qfw/push/rpcpush"
 	"qfw/push/tender"
 	"qfw/util"
@@ -29,6 +30,7 @@ func init() {
 	rpcpush.PushInfoScopeDays = util.IntAll(push.PushConfig["pushInfoScopeDays"])
 	tender.MaxPushSize = bid.MaxPushSize
 	mongodb.InitMongodbPool(util.IntAll(push.PushConfig["mgoSize"]), push.PushConfig["mgoAddr"].(string), "qfw")
+	listdb.Inits()
 }
 
 var jobs []push.PushJobFace = []push.PushJobFace{

+ 66 - 44
push/src/qfw/push/dopush/dopush.go

@@ -1,22 +1,22 @@
 package dopush
 
 import (
-	"sort"
 	"container/list"
 	"fmt"
 	"log"
 	"math/rand"
 	"qfw/push"
 	"qfw/push/dfa"
+	"qfw/push/listdb"
 	"qfw/util"
 	"qfw/util/mongodb"
 	qrpc "qfw/util/rpc"
 	"regexp"
 	"runtime"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
-	"qfw/push/listdb"
 )
 
 var se util.SimpleEncrypt
@@ -45,14 +45,42 @@ func (b *Pjob) CreateUserInterestWord() {
 	b.Dfa.AddWord(words...)
 }
 
-type sortList []map[string]interface{}
+type sortList []*map[string]interface{}
+
+func (s sortList) Len() int {
+	return len(s)
+}
 
 func (s sortList) Less(i, j int) bool {
-    return util.Int64All(s[i]["publishtime"]) < util.Int64All(s[j]["publishtime"])
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 0; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	return util.Int64All((*s[i])["publishtime"]) > util.Int64All((*s[j])["publishtime"])
 }
 
 func (s sortList) Swap(i, j int) {
-    s[i], s[j] = s[j], s[i]
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 0; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	s[i], s[j] = s[j], s[i]
 }
 
 //内存数据推送
@@ -81,64 +109,59 @@ func (p *Pjob) EachInfoForView(mopenid, words string) map[string]interface{} {
 			words: &ss,
 		}
 	} else {
-		*p.Cache = push.InitCache(p.Stype, mopenid,true)
+		*p.Cache = push.InitCache(p.Stype, mopenid, true)
 	}
 	p.CreateUserInterestWord()
 	userMap := &map[*push.MemberInterest]sortList{}
-	LDB:=listdb.GetDb(p.Stype)
-	
+	LDB := listdb.GetDb(p.Stype)
 	LDB.Lock.Lock()
-	L1:
-	for e := LDB.DB.Back(); e != nil; e = e.Prev(){
-		tmp:=e.Value.(map[string]interface{})
-			title := util.ObjToString(tmp["title"])
-			if title != "" {
-				//返回匹配到的词组
-				res := p.Dfa.Analy(title)
-				if len(res) > 0 {
-					province := tmp["area"].(string)
-					provinceVal := push.GetChoiceCode(province)
-					if "A" != province {
-						tmp["title"] = `[<span class='area'>` + province + `</span>]` + title
-					}
-					for _, v := range res {
-						//根据关键词返回用户指针
-						tw := (*p.Cache)[v]
-						if tw != nil {
-							//遍历用户加入到此条信息上
-							for _, v2 := range *tw {
-								if v2.Province == "A" || v2.ProvinceVal&provinceVal > 0 {
-									s := (*userMap)[v2]
-									if s == nil {
-										s = sortList{}
-										(*userMap)[v2] = s
-									}
-									s=append(s,tmp)
-									if len(s) > 50 {
-										break L1
-									}
+L1:
+	for e := LDB.DB.Back(); e != nil; e = e.Prev() {
+		tmp := e.Value.(*map[string]interface{})
+		title := util.ObjToString((*tmp)["title"])
+		if title != "" {
+			//返回匹配到的词组
+			res := p.Dfa.Analy(title)
+			if len(res) > 0 {
+				province := (*tmp)["area"].(string)
+				provinceVal := push.GetChoiceCode(province)
+				for _, v := range res {
+					//根据关键词返回用户指针
+					tw := (*p.Cache)[v]
+					if tw != nil {
+						//遍历用户加入到此条信息上
+						for _, v2 := range *tw {
+							if v2.Province == "A" || v2.ProvinceVal&provinceVal > 0 {
+								s := (*userMap)[v2]
+								if s == nil {
+									s = sortList{}
+								}
+								s = append(s, tmp)
+								(*userMap)[v2] = s
+								if len(s) > 50 {
+									break L1
 								}
 							}
 						}
 					}
 				}
 			}
+		}
 	}
 	LDB.Lock.Unlock()
 	mcontent := map[string]interface{}{}
 	mcontent["s_type"] = p.Stype
 	bmatch := false
-	for kk, vv := range *(userMap) {
+	for kk, v := range *(userMap) {
 		k := *kk
-		v := vv
 		str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下"+p.StypeName+"信息:</div>", strings.Join(k.Interest, ";"))
 		//发送内容组合
 		i := 0
 		publishTimes := map[string]interface{}{}
 		publishTitle := map[string]bool{}
 		sort.Sort(v)
-		for ks := v.Front(); ks != nil; ks = ks.Next() {
-			k2 := *(ks.Value.(*map[string]interface{}))
+		for _, kn := range v {
+			k2 := *kn
 			title := strings.Replace(k2["title"].(string), "\n", "", -1)
 			if !publishTitle[title] {
 				publishTitle[title] = true
@@ -164,13 +187,13 @@ func (p *Pjob) EachInfoForView(mopenid, words string) map[string]interface{} {
 			break
 		}
 	}
-	return mcontent	
+	return mcontent
 }
 
 func (p *Pjob) DoPush(mopenid, stime string, opr int, ltime int64) bool {
 	log.Println("开始执行任务:", p.StypeName, stime)
 	p.Cache = new(map[string]*[]*push.MemberInterest)
-	*p.Cache = push.InitCache(p.Stype, mopenid,false)
+	*p.Cache = push.InitCache(p.Stype, mopenid, false)
 	p.CreateUserInterestWord()
 	return EachAllBidInfo(p.Stype, "["+p.StypeName+"信息]", p.StypeName, ltime, p.MaxPushSize, p.Dfa, p.Cache, opr)
 }
@@ -367,7 +390,6 @@ func SendWeixin(k *push.MemberInterest, TITLE, ShortTitle, str, stype string, no
 
 }
 
-
 /**
 //遍历数据只做标题预览
 func (p *Pjob) EachInfoForViewOld(mopenid, words string) map[string]interface{} {

+ 61 - 48
push/src/qfw/push/listdb/listdb.go

@@ -1,81 +1,94 @@
 package listdb
 
 import (
-	"time"
 	"container/list"
-	"sync"
+	"log"
 	"qfw/util"
 	"qfw/util/mongodb"
+	"sync"
+	"time"
 )
 
-type ListDB struct{
-	Lock *sync.Mutex
-	DB *list.List
-	Stype string
+type ListDB struct {
+	Lock      *sync.Mutex
+	DB        *list.List
+	Stype     string
+	timestamp int64
 }
-var fields map[string]int=map[string]int{
-	"type" : 1,
-	"comeintime": 1, 
-    "publishtime": 1, 
-    "title": 1,
-    "area" : 1,
-    "href": 1,
-    "areaval" : 1,
+
+var fields map[string]interface{} = map[string]interface{}{
+	"type":        1,
+	"comeintime":  1,
+	"publishtime": 1,
+	"title":       1,
+	"area":        1,
+	"href":        1,
+	"areaval":     1,
 }
 
-var Bid,Tender *ListDB
+var Bid, Tender *ListDB
 
-func init(){
-	Bid=&ListDB{
-		Lock:new(sync.Mutex),
-		DB:list.New(),
-		Stype:"bid",
+func Inits() {
+	Bid = &ListDB{
+		Lock:      new(sync.Mutex),
+		DB:        list.New(),
+		Stype:     "bid",
+		timestamp: 0,
 	}
 	go Bid.update()
-	
-	Tender=&ListDB{
-		Lock:new(sync.Mutex),
-		DB:list.New(),
-		Stype:"tender",
+
+	Tender = &ListDB{
+		Lock:      new(sync.Mutex),
+		DB:        list.New(),
+		Stype:     "tender",
+		timestamp: 0,
 	}
 	go Tender.update()
 }
 
-func GetDb(stype string) *ListDB{
-	switch stype{
-		case "tender":
-			return Tender
-		case "bid":
-			return Bid
-		default:
-			return nil
+func GetDb(stype string) *ListDB {
+	switch stype {
+	case "tender":
+		return Tender
+	case "bid":
+		return Bid
+	default:
+		return nil
 	}
 }
 
-func (l *ListDB) update(){
+func (l *ListDB) update() {
 	l.Lock.Lock()
-	util.Try(func(){	
-		q:=map[string]interface{}{
-			"type":l.Stype,
-		}
-		if l.DB.Len()>0 {
-			data:=l.DB.Back().Value.(map[string]interface{})
-			q["comeintime"]=map[string]interface{}{
-				"$gt":util.Int64All(data["comeintime"]),
-			}
+	util.Try(func() {
+		q := map[string]interface{}{
+			"type": l.Stype,
+			"comeintime": map[string]interface{}{
+				"$gt": l.timestamp,
+			},
 		}
 		session := mongodb.GetMgoConn()
 		defer mongodb.DestoryMongoConn(session)
 		//此处不排序,谁查谁排序(不排序会造成发布时间乱序)
 		query := session.DB("qfw").C("bidding").Find(q).Select(fields).Sort("publishtime").Iter()
 		for tmp := new(map[string]interface{}); query.Next(tmp); {
+			province := (*tmp)["area"].(string)
+			if "A" != province {
+				title := util.ObjToString((*tmp)["title"])
+				(*tmp)["title"] = `[<span class='area'>` + province + `</span>]` + title
+			}
 			l.DB.PushBack(tmp)
-			if l.DB.Len()>150000{
-				l.DB.Remove(l.DB.Front())	
+			last := util.Int64All((*tmp)["comeintime"])
+			if last > l.timestamp {
+				l.timestamp = last
+			}
+			if l.DB.Len() > 120000 {
+				l.DB.Remove(l.DB.Front())
 			}
 			tmp = new(map[string]interface{})
 		}
-	},func(e interface{}){})
+	}, func(e interface{}) {
+		log.Println(e)
+	})
 	l.Lock.Unlock()
-	time.AfterFunc(15*time.Minute,l.update)
-}
+	time.AfterFunc(10*time.Minute, l.update)
+}

+ 5 - 2
push/src/qfw/push/rpcpush/rpcpush.go

@@ -15,10 +15,14 @@ type PushInfo struct {
 }
 
 var PushInfoScopeDays int
+var rpcchan = make(chan bool, 50)
 
 //RPC调用结果预览
 func (p *PushInfo) ResultView(data *qrpc.PushData, Reply *[]byte) error {
-	log.Println("rpc_data", data)
+	rpcchan <- true
+	defer func() {
+		<-rpcchan
+	}()
 	util.Try(func() {
 		for k, v := range data.PushType {
 			pj := dopush.Pjob{
@@ -36,7 +40,6 @@ func (p *PushInfo) ResultView(data *qrpc.PushData, Reply *[]byte) error {
 
 //RPC调用推送信息
 func (p *PushInfo) PushMsg(data *qrpc.PushData, Reply *int) error {
-	log.Println("rpc_data", data)
 	util.Try(func() {
 		for k, v := range data.PushType {
 			//昨天到今天的数据