Browse Source

修改push中的时间

renzheng 9 years ago
parent
commit
29ba71a9d8

+ 75 - 0
common/src/qfw/util/mongodb/mongodbutil_test.go

@@ -1,12 +1,14 @@
 package mongodb
 package mongodb
 
 
 import (
 import (
+	"sync"
 	"fmt"
 	"fmt"
 	. "gopkg.in/mgo.v2/bson"
 	. "gopkg.in/mgo.v2/bson"
 	"log"
 	"log"
 	"regexp"
 	"regexp"
 	"testing"
 	"testing"
 	"time"
 	"time"
+	"qfw/util"
 )
 )
 
 
 func Test_orAnd(t *testing.T) {
 func Test_orAnd(t *testing.T) {
@@ -97,3 +99,76 @@ func Test_aggregate(t *testing.T) {
 	log.Println(res, sun)
 	log.Println(res, sun)
 
 
 }
 }
+
+//检测query正确性
+func Test_nextQuery(t *testing.T) {
+	InitMongodbPool(2, "192.168.3.18:27080", "qfw")
+		sess := GetMgoConn()
+		defer DestoryMongoConn(sess)
+	for i := 0; i < 50; i++ {
+		log.Println("-----", i)
+
+		query := sess.DB("qfw").C("enterprise").Find(nil).Select(map[string]interface{}{
+			"_id":   1,
+		}).Iter()
+		res := map[ObjectId]int{}
+		for tmp := make(map[string]interface{}); query.Next(tmp); {
+			util.Try(func(){
+				res[tmp["_id"].(ObjectId)]++
+			},func(e interface{}){})
+			tmp = make(map[string]interface{})
+		}
+
+		for k, v := range res {
+			if v > 1 {
+				log.Println(k, v)
+			}
+		}
+
+	}
+}
+
+
+	type Person struct{
+		Name string
+		Age int
+	}
+func Test_range(t *testing.T){
+
+	for t:=0;t<100;t++{
+		log.Println("-------",t)
+
+	m1:=&map[*Person]string{}
+	
+	for i:=0;i<1000000;i++{
+		p:=Person{
+			Name:fmt.Sprintf("N-%d",i),
+			Age:i,
+		}
+		(*m1)[&p]=fmt.Sprintf("String-%d",i)
+	}
+	m2:=map[int]int{}
+	ws:=new(sync.WaitGroup)
+	sm:=new(sync.Mutex)
+	
+	for k,_:=range *m1{
+		kk:=*k
+		ws.Add(1)
+		go func(p Person){
+			defer ws.Done()
+			sm.Lock()
+			m2[p.Age]++
+			sm.Unlock()
+		}(kk)
+	}
+	ws.Wait()
+	
+	for k, v := range m2 {
+			if v > 1 {
+				log.Println(k, v)
+			}
+		}
+	
+		}
+}
+

+ 18 - 11
push/src/qfw/push/cache.go

@@ -25,7 +25,7 @@ type MemberInterest struct {
 }
 }
 
 
 //初始化缓存,在每次执行任务时调用,
 //初始化缓存,在每次执行任务时调用,
-func InitCache(flag, m_openid string,bview bool) map[string]*[]*MemberInterest {
+func InitCache(flag, m_openid string, bview bool) map[string]*[]*MemberInterest {
 	defer func() {
 	defer func() {
 		if r := recover(); r != nil {
 		if r := recover(); r != nil {
 			log.Println("[E]", r)
 			log.Println("[E]", r)
@@ -42,18 +42,18 @@ func InitCache(flag, m_openid string,bview bool) map[string]*[]*MemberInterest {
 	cache := make(map[string]*[]*MemberInterest)
 	cache := make(map[string]*[]*MemberInterest)
 	q := map[string]interface{}{}
 	q := map[string]interface{}{}
 	if m_openid != "" {
 	if m_openid != "" {
-		if bview{
+		if bview {
 			q = map[string]interface{}{
 			q = map[string]interface{}{
-			"s_m_openid":                           m_openid,
-			}	
-		}else{
+				"s_m_openid": m_openid,
+			}
+		} else {
 			q = map[string]interface{}{
 			q = map[string]interface{}{
-			"o_msgset." + flag + ".i_switchstatus": 1,
-			"o_msgset." + flag + ".i_status":       1,
-			"s_m_openid":                           m_openid,
-			}	
+				"o_msgset." + flag + ".i_switchstatus": 1,
+				"o_msgset." + flag + ".i_status":       1,
+				"s_m_openid":                           m_openid,
+			}
 		}
 		}
-		
+
 	} else {
 	} else {
 		fixPush := util.ObjToString(PushConfig["fixPush"])
 		fixPush := util.ObjToString(PushConfig["fixPush"])
 		if len(fixPush) > 5 {
 		if len(fixPush) > 5 {
@@ -76,8 +76,10 @@ func InitCache(flag, m_openid string,bview bool) map[string]*[]*MemberInterest {
 		"o_msgset.l_modifydate": 1,
 		"o_msgset.l_modifydate": 1,
 		"s_m_openid":            1,
 		"s_m_openid":            1,
 	}).Iter()
 	}).Iter()
+	n := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); {
 	for tmp := make(map[string]interface{}); query.Next(tmp); {
 		util.Try(func() {
 		util.Try(func() {
+			n++
 			_id := fmt.Sprintf("%x", string(tmp["_id"].(bson.ObjectId)))
 			_id := fmt.Sprintf("%x", string(tmp["_id"].(bson.ObjectId)))
 			o_msgset := tmp["o_msgset"].(map[string]interface{})
 			o_msgset := tmp["o_msgset"].(map[string]interface{})
 			flagModule := o_msgset[flag].(map[string]interface{})
 			flagModule := o_msgset[flag].(map[string]interface{})
@@ -91,7 +93,11 @@ func InitCache(flag, m_openid string,bview bool) map[string]*[]*MemberInterest {
 					Interest:    a_key,
 					Interest:    a_key,
 					Openid:      util.ObjToString(tmp["s_m_openid"]),
 					Openid:      util.ObjToString(tmp["s_m_openid"]),
 				}
 				}
-				date := o_msgset["l_modifydate"]
+				log.Println("find-openid:", user.Openid)
+				date := flagModule["l_modifydate"]
+				if date == nil {
+					date = o_msgset["l_modifydate"]
+				}
 				if date != nil {
 				if date != nil {
 					util.Try(func() {
 					util.Try(func() {
 						user.InterestDate = date.(int64)
 						user.InterestDate = date.(int64)
@@ -114,6 +120,7 @@ func InitCache(flag, m_openid string,bview bool) map[string]*[]*MemberInterest {
 		})
 		})
 		tmp = make(map[string]interface{})
 		tmp = make(map[string]interface{})
 	}
 	}
+	log.Println(flag, "本次查询用户总数:", n)
 	return cache
 	return cache
 }
 }
 
 

+ 7 - 1
push/src/qfw/push/dopush/dopush.go

@@ -18,7 +18,7 @@ import (
 	"strings"
 	"strings"
 	"time"
 	"time"
 )
 )
-
+var wxsendpool = make(chan bool,50)
 var se util.SimpleEncrypt
 var se util.SimpleEncrypt
 var re *regexp.Regexp
 var re *regexp.Regexp
 
 
@@ -269,6 +269,8 @@ L1:
 		vv := *v
 		vv := *v
 		time.Sleep(50 * time.Millisecond)
 		time.Sleep(50 * time.Millisecond)
 		res = true
 		res = true
+		log.Println("send-openid:",kk.Openid)
+		wxsendpool<-true
 		go Send(&kk, &vv, now, TITLEA, ShortTitle, stype, MaxPushSize)
 		go Send(&kk, &vv, now, TITLEA, ShortTitle, stype, MaxPushSize)
 	}
 	}
 	return res
 	return res
@@ -288,6 +290,10 @@ func Send(k *push.MemberInterest, v *list.List, now time.Time, TITLEA, ShortTitl
 			}
 			}
 		}
 		}
 	}()
 	}()
+	
+	defer func(){
+		<-wxsendpool
+	}()
 	str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下信息:</div>", strings.Join(k.Interest, ";"))
 	str := fmt.Sprintf("<div>根据您设置的关键词(%s),给您推送以下信息:</div>", strings.Join(k.Interest, ";"))
 	//发送内容组合
 	//发送内容组合
 	i := 0
 	i := 0