renzheng 7 жил өмнө
parent
commit
1b741a7116

+ 152 - 53
src/jfw/front/websocket.go

@@ -7,6 +7,7 @@ import (
 	qutil "qfw/util"
 	"qfw/util/redis"
 	"strings"
+	"sync"
 	"time"
 
 	//"github.com/go-xweb/xweb"
@@ -18,72 +19,170 @@ var upgrader = websocket.Upgrader{
 	WriteBufferSize: 1024,
 }
 
+//socket对象放在内存中,待rpc回调使用
+type Wss struct {
+	Conn *websocket.Conn
+	Time int64
+}
+type MapSocket struct {
+	Map  map[string]*Wss
+	Lock sync.Mutex
+}
+
+func (m *MapSocket) GC() {
+	defer qutil.Catch()
+	m.Lock.Lock()
+	now := time.Now().Unix()
+	for k, v := range m.Map {
+		if now-v.Time > 3600 || v.Conn == nil {
+			delete(m.Map, k)
+		}
+	}
+	m.Lock.Unlock()
+	time.AfterFunc(5*time.Minute, m.GC)
+}
+
+var MSPOOL = 20
+var MapSocketArr = make([]*MapSocket, MSPOOL)
+
+//初始化
+func init() {
+	for i := 0; i < MSPOOL; i++ {
+		ms := &MapSocket{Map: map[string]*Wss{}}
+		go ms.GC()
+		MapSocketArr[i] = ms
+	}
+}
+
+//根据代码和ws做映射
+func PutWsByCode(src string, ws *websocket.Conn) {
+	defer qutil.Catch()
+	n := HashVal(src) % MSPOOL
+	ms := MapSocketArr[n]
+	ms.Lock.Lock()
+	ms.Map[src] = &Wss{ws, time.Now().Unix()}
+	ms.Lock.Unlock()
+}
+
+//计算代码的hash值
+func HashVal(src string) int {
+	check := 0
+	for i := len(src) / 2; i < len(src); i++ {
+		check += int(src[i])
+	}
+	return check
+}
+
+//rpc回调,写到前台
+func GetWsByCode(src string) bool {
+	if src == "" {
+		return false
+	}
+	defer qutil.Catch()
+	n := HashVal(src) % MSPOOL
+	ms := MapSocketArr[n]
+	defer func() {
+		ms.Lock.Lock()
+		delete(ms.Map, src)
+		ms.Lock.Unlock()
+	}()
+	ms.Lock.Lock()
+	wss := ms.Map[src]
+	ms.Lock.Unlock()
+	sendmessage, _ := json.Marshal(src)
+	if err := wss.Conn.WriteMessage(websocket.TextMessage, sendmessage); err != nil {
+		log.Println("socket send fail..", err)
+		return false
+	} else {
+		//log.Println("登录后正常关闭!!!")
+		wss.Conn.Close()
+	}
+	return true
+}
+
 //登录关注
 func ServeWss(w http.ResponseWriter, r *http.Request) {
 	defer qutil.Catch()
 	conn, err := upgrader.Upgrade(w, r, nil)
 	var shareIds string
-	var shareid = ""
-	var openid = ""
-	go func() {
+	//var shareid = ""
+	//var openid = ""
+	if err == nil {
 		for {
 			_, shareData, err := conn.ReadMessage()
 			if err != nil {
 				log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
-				shareIds = "close"
-				return
-			}
-			shareIds = string(shareData)
-			if shareIds == "close" {
-				return
+				break
+			} else {
+				shareIds = string(shareData)
+				shareidlist := strings.Split(shareIds, "___")
+				if shareIds != "" && len(shareidlist) > 1 {
+					shareidnum := shareidlist[0]
+					shareidkop := shareidlist[1]
+					PutWsByCode(shareidnum, conn)
+					PutWsByCode(shareidkop, conn)
+				}
 			}
 			//log.Println("shareData:", shareIds, "---")
 		}
-	}()
-	for {
-		time.Sleep(2 * time.Second)
-		if shareIds == "close" {
-			//log.Println("!!!!socket关闭!!!退出循环")
-			conn.Close()
-			return
-		} else if shareIds == "" {
-			continue
-		}
-		shareidlist := strings.Split(shareIds, "___")
-		if shareIds != "" && len(shareidlist) > 1 {
-			shareidnum := strings.Split(shareIds, "___")[0]
-			shareidkop := strings.Split(shareIds, "___")[1]
-			//log.Println(se.DecodeString(shareidnum) + "--1--" + se.DecodeString(shareidkop))
-			shareid = shareidnum
-			openid = redis.GetStr("sso", "p_usershare_"+se.DecodeString(shareidnum))
-			if openid == "" {
-				openid = redis.GetStr("sso", "p_usershare_"+se.DecodeString(shareidkop))
-				shareid = shareidkop
-			}
-			if openid == "" {
-				shareid = ""
-			}
-		}
-		if shareid == "" {
-			continue
-		}
-		sendmessage, _ := json.Marshal(shareid)
-		if err := conn.WriteMessage(websocket.TextMessage, sendmessage); err != nil {
-			log.Println("socket send fail..", err)
-		}
-		if shareid != "" {
-			//log.Println("登录后正常关闭!!!")
-			conn.Close()
-			return
-		}
-	}
-	//sess := xweb.RootApp().SessionManager.Session(r, w)
-	//log.Println("sess-id:", sess.Id())
-	if err != nil {
-		log.Println(err)
-		return
 	}
-	//conn.Ch = make(chan bool, 1)
+	//	go func() {
+	//		for {
+	//			_, shareData, err := conn.ReadMessage()
+	//			if err != nil {
+	//				log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
+	//				shareIds = "close"
+	//				return
+	//			}
+	//			shareIds = string(shareData)
+	//			if shareIds == "close" {
+	//				return
+	//			}
+	//			//log.Println("shareData:", shareIds, "---")
+	//		}
+	//	}()
+	//	for {
+	//		time.Sleep(2 * time.Second)
+	//		if shareIds == "close" {
+	//			//log.Println("!!!!socket关闭!!!退出循环")
+	//			conn.Close()
+	//			return
+	//		} else if shareIds == "" {
+	//			continue
+	//		}
+	//		shareidlist := strings.Split(shareIds, "___")
+	//		if shareIds != "" && len(shareidlist) > 1 {
+	//			shareidnum := strings.Split(shareIds, "___")[0]
+	//			shareidkop := strings.Split(shareIds, "___")[1]
+	//			//log.Println(se.DecodeString(shareidnum) + "--1--" + se.DecodeString(shareidkop))
+	//			shareid = shareidnum
+	//			openid = redis.GetStr("sso", "p_usershare_"+se.DecodeString(shareidnum))
+	//			if openid == "" {
+	//				openid = redis.GetStr("sso", "p_usershare_"+se.DecodeString(shareidkop))
+	//				shareid = shareidkop
+	//			}
+	//			if openid == "" {
+	//				shareid = ""
+	//			}
+	//		}
+	//		if shareid == "" {
+	//			continue
+	//		}
+	//		sendmessage, _ := json.Marshal(shareid)
+	//		if err := conn.WriteMessage(websocket.TextMessage, sendmessage); err != nil {
+	//			log.Println("socket send fail..", err)
+	//		}
+	//		if shareid != "" {
+	//			//log.Println("登录后正常关闭!!!")
+	//			conn.Close()
+	//			return
+	//		}
+	//	}
+	//	//sess := xweb.RootApp().SessionManager.Session(r, w)
+	//	//log.Println("sess-id:", sess.Id())
+	//	if err != nil {
+	//		log.Println(err)
+	//	}
 }
 
 //实验室

+ 39 - 0
src/jfw/modules/weixin/src/wx/scanrpc.go

@@ -0,0 +1,39 @@
+package wx
+
+import (
+	"config"
+	"log"
+	"net/rpc"
+	"qfw/util"
+)
+
+var scanpool = make(chan bool, 20)
+var webrpcaddr string
+var method = "MyfollowRpc.WeixinScan"
+
+func init() {
+	webrpcaddr = config.Sysconfig["webrpcport"].(string)
+}
+
+func SendScanRpc(code string) {
+	scanpool <- true
+	defer func() {
+		<-scanpool
+	}()
+	util.Try(func() {
+		client, err := rpc.DialHTTP("tcp", webrpcaddr)
+		defer client.Close()
+		if err != nil {
+			log.Println(err.Error())
+			return
+		}
+		var repl bool
+		err = client.Call(method, &code, &repl)
+		if err != nil {
+			log.Println(err.Error())
+		}
+		if !repl {
+			log.Println("scan rpc error!", code)
+		}
+	}, func(e interface{}) {})
+}

+ 8 - 3
src/jfw/modules/weixin/src/wx/wx.go

@@ -588,7 +588,10 @@ func Subscribe(w ResponseWriter, r *Request) {
 		if source != "" && len(source) > 5 {
 			saveUserLog(source, openid)
 		}
-		redis.Put("sso", "p_usershare_"+source, openid, 600)
+		//redis.Put("sso", "p_usershare_"+source, openid, 600)
+		//修改为rpc回调
+		go SendScanRpc(source)
+
 		//redis.Put("sso", "p_userlogs_"+source, openid, 600)
 	}
 	go saveSubscribe(openid, r.Event, r.EventKey, 1)
@@ -823,8 +826,10 @@ func ScanHandler(w ResponseWriter, r *Request) {
 			}
 		}
 		if pccodepre != "32" {
-			log.Println("扫描登录:", r.EventKey)
-			redis.Put("sso", "p_usershare_"+r.EventKey, openid, 600)
+			//			log.Println("扫描登录:", r.EventKey)
+			//			redis.Put("sso", "p_usershare_"+r.EventKey, openid, 600)
+			//修改为rpc回调
+			go SendScanRpc(r.EventKey)
 		}
 	} else {
 		log.Println("scan-error,mongodb-conn-error")

+ 9 - 0
src/jfw/rpcfollow/rpc.go

@@ -2,6 +2,7 @@
 package rpcfollow
 
 import (
+	"jfw/front"
 	tools "jfw/tools"
 	"log"
 	"qfw/util"
@@ -14,6 +15,14 @@ import (
 
 type MyfollowRpc struct{}
 
+//微信扫码回调登录等操作
+func (c *MyfollowRpc) WeixinScan(param *string, ret *bool) error {
+	util.Try(func() {
+		*ret = front.GetWsByCode(*param)
+	}, func(e interface{}) {})
+	return nil
+}
+
 func (c *MyfollowRpc) MyFollowSet(param *frpc.FollowData, ret *string) error {
 	util.Try(func() {
 		*ret = setMyFollowKey(param.OpenId, param.Projectname)