123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- package front
- import (
- "encoding/json"
- "fmt"
- "log"
- //"net/http"
- "jfw/config"
- qutil "qfw/util"
- "qfw/util/redis"
- "strings"
- "sync"
- "time"
- "github.com/go-xweb/httpsession"
- "github.com/go-xweb/xweb"
- "golang.org/x/net/websocket"
- )
- //socket对象放在内存中,待rpc回调使用
- type Wss struct {
- Conn *websocket.Conn
- session *httpsession.Session
- Time int64
- }
- type MapSocket struct {
- Map map[string]*Wss
- Lock sync.Mutex
- }
- func (m *MapSocket) GC() {
- defer qutil.Catch()
- m.Lock.Lock()
- now := time.Now().Unix()
- for k, v := range m.Map {
- if now-v.Time > 3600 || v.Conn == nil {
- delete(m.Map, k)
- }
- }
- m.Lock.Unlock()
- time.AfterFunc(5*time.Minute, m.GC)
- }
- var MSPOOL = 20
- var MapSocketArr = make([]*MapSocket, MSPOOL)
- //初始化
- func init() {
- for i := 0; i < MSPOOL; i++ {
- ms := &MapSocket{Map: map[string]*Wss{}}
- go ms.GC()
- MapSocketArr[i] = ms
- }
- redis.InitRedisLogin(config.Sysconfig["redisaddrs"].(string))
- }
- //
- func GetRedisPoolVal() {
- redis.GetLoginVal("loginCode", GetWsByCode)
- // var res string
- // for {
- // res = redis.GetLoginVal("loginCode")
- // if res != "" {
- // param := strings.Split(res, ",")
- // ok := GetWsByCode(param)
- // log.Println("ok:", ok)
- // }
- // }
- }
- //根据代码和ws做映射
- func PutWsByCode(src string, ws *websocket.Conn, session *httpsession.Session) {
- defer qutil.Catch()
- n := HashVal(src) % MSPOOL
- ms := MapSocketArr[n]
- ms.Lock.Lock()
- if ms.Map[src] == nil {
- ms.Map[src] = &Wss{ws, session, time.Now().Unix()}
- } else {
- wss := ms.Map[src]
- if ws != nil {
- wss.Conn = ws
- }
- if session != nil {
- wss.session = session
- }
- wss.Time = time.Now().Unix()
- }
- ms.Lock.Unlock()
- }
- //计算代码的hash值
- func HashVal(src string) int {
- check := 0
- for i := len(src) / 2; i < len(src); i++ {
- check += int(src[i])
- }
- return check
- }
- //rpc回调,写到前台
- func GetWsByCode(param []string) bool {
- if len(param) < 2 {
- return false
- }
- src := param[0]
- openid := param[1]
- if src == "" {
- return false
- }
- defer qutil.Catch()
- n := HashVal(src) % MSPOOL
- ms := MapSocketArr[n]
- defer func() {
- ms.Lock.Lock()
- delete(ms.Map, src)
- ms.Lock.Unlock()
- }()
- ms.Lock.Lock()
- wss := ms.Map[src]
- ms.Lock.Unlock()
- if wss != nil {
- session := wss.session
- if session == nil && wss.Conn != nil {
- session = wss.Conn.Sess
- }
- if session == nil {
- log.Println("error:rpc back has no session!")
- return false
- }
- infoData := LoginInfo(src, openid, session)
- if wss.Conn == nil {
- return true
- }
- sendmessage, _ := json.Marshal(infoData)
- if err := websocket.Message.Send(wss.Conn, string(sendmessage)); err != nil {
- log.Println("socket send fail..", err)
- return false
- } else {
- wss.Conn.Close()
- }
- return true
- } else {
- return false
- }
- }
- //用户登录信息
- func LoginInfo(shareid, openid string, Sess interface{}) (infoData map[string]interface{}) {
- if Sess == nil {
- return nil
- }
- sess, _ := Sess.(*httpsession.Session)
- infoData = make(map[string]interface{})
- if openid != "" {
- sess.Set("openid", openid)
- redisheadimg := redis.Get("other", "newUser-"+openid)
- if redisheadimg == nil {
- redisheadimg = ""
- }
- user, _ := mongodb.FindOneByField("user", `{"s_m_openid":"`+openid+`"}`, `{"s_nickname":1,"s_headimage":1,"s_m_openid":1,"_id":1}`)
- if user != nil && len(*user) > 0 {
- infoData["result"] = "ok"
- infoData["s_nickname"] = fmt.Sprint((*user)["s_nickname"])
- infoData["s_headimage"] = fmt.Sprint((*user)["s_headimage"])
- infoData["redisheadimg"] = fmt.Sprint(redisheadimg)
- infoData["encryptId"] = se.EncodeString(qutil.BsonIdToSId((*user)["_id"]))
- infoData["shareid"] = shareid
- infoData["openid"] = se.EncodeString(fmt.Sprint((*user)["s_m_openid"])) //add 20181116
- (*user)["shareid"] = shareid
- nick := fmt.Sprint((*user)["s_nickname"])
- sess.Set("nickname", nick)
- sess.Set("s_nickname", nick)
- sess.Set("s_m_openid", fmt.Sprint((*user)["s_m_openid"]))
- sess.Set("user", *user)
- sess.Set("userId", qutil.BsonIdToSId((*user)["_id"]))
- sess.Set("rpcBackUserInfo", infoData)
- }
- }
- return infoData
- }
- //登录关注
- func ServeWss(conn *websocket.Conn) {
- defer qutil.Catch()
- conn.Sess = xweb.RootApp().SessionManager.Session(conn.R, conn.W)
- var shareIds string
- for {
- var shareData string
- err := websocket.Message.Receive(conn, &shareData)
- if err != nil {
- //log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
- break
- } else {
- //心跳监测
- if shareData == "HeartBeat" {
- websocket.Message.Send(conn, "HeartBeat")
- continue
- }
- shareIds = shareData
- shareidlist := strings.Split(shareIds, "___")
- if shareIds != "" && len(shareidlist) > 1 {
- shareidnum := shareidlist[0]
- shareidkop := shareidlist[1]
- PutWsByCode(se.DecodeString(shareidnum), conn, nil)
- PutWsByCode(se.DecodeString(shareidkop), conn, nil)
- }
- }
- }
- }
- //实验室
- func QrToLabWss(conn *websocket.Conn) {
- defer qutil.Catch()
- var receive, userId string
- var qrToLab_ok, qrToLab_open_ok bool
- //接收消息
- go func() {
- defer qutil.Catch()
- for {
- err := websocket.Message.Receive(conn, &receive)
- if err != nil {
- receive = "close"
- //log.Println("websocket接收失败!", err)
- return
- }
- if receive == "close" { //关闭
- return
- } else if receive == "HeartBeat" { //心跳监测
- websocket.Message.Send(conn, "HeartBeat")
- } else { //接收到userid
- userId = se.DecodeString(receive)
- }
- }
- }()
- //发送消息
- for {
- time.Sleep(2 * time.Second)
- if receive == "close" { //接收到关闭信息
- conn.Close()
- return
- } else if userId == "" {
- continue
- }
- var reply string
- //是否进入实验室
- if !qrToLab_ok {
- qrToLab_ok, _ = redis.Exists("other", "qrToLab_"+userId)
- if qrToLab_ok {
- reply = "qrToLab_ok"
- }
- }
- //是否打开开关
- if !qrToLab_open_ok {
- qrToLab_open_ok, _ = redis.Exists("other", "qrToLab_open_"+userId)
- if qrToLab_open_ok {
- reply = "qrToLab_open_ok"
- }
- }
- if reply == "" {
- continue
- }
- if err := websocket.Message.Send(conn, reply); err != nil {
- redis.Del("other", "qrToLab_"+userId)
- redis.Del("other", "qrToLab_open_"+userId)
- //log.Println("websocket发送失败!", err)
- conn.Close()
- return
- }
- if reply == "qrToLab_ok" {
- redis.Del("other", "qrToLab_"+userId)
- } else if reply == "qrToLab_open_ok" {
- redis.Del("other", "qrToLab_open_"+userId)
- }
- if qrToLab_ok && qrToLab_open_ok {
- conn.Close()
- return
- }
- }
- }
- //电脑端强制分享,扫码分享到朋友圈
- func QrToShareTimeline(conn *websocket.Conn) {
- defer qutil.Catch()
- var receive string
- //接收消息
- sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
- userid, _ := sess.Get("userId").(string)
- key := ""
- if userid != "" {
- key = fmt.Sprintf("pcbiddetail_shareTimeline_%s", userid)
- }
- go func() {
- defer qutil.Catch()
- for {
- err := websocket.Message.Receive(conn, &receive)
- if err != nil {
- receive = "close"
- //log.Println("websocket接收失败!", err)
- return
- }
- if receive == "close" { //关闭
- return
- } else if receive == "HeartBeat" { //心跳监测
- websocket.Message.Send(conn, "HeartBeat")
- }
- }
- }()
- //发送消息
- for {
- time.Sleep(2 * time.Second)
- if receive == "close" { //接收到关闭信息
- conn.Close()
- return
- }
- reply := "n"
- if key != "" {
- ok, _ := redis.Exists("other", key)
- if ok {
- reply = "y"
- }
- }
- if err := websocket.Message.Send(conn, reply); err != nil {
- redis.Del("other", key)
- //log.Println("websocket发送失败!", err)
- conn.Close()
- return
- }
- if reply == "y" {
- redis.Del("other", key)
- conn.Close()
- return
- }
- }
- }
|