package front import ( qutil "app.yhyue.com/moapp/jybase/common" . "app.yhyue.com/moapp/jybase/date" "app.yhyue.com/moapp/jybase/encrypt" "app.yhyue.com/moapp/jybase/go-xweb/httpsession" "app.yhyue.com/moapp/jybase/go-xweb/xweb" "app.yhyue.com/moapp/jybase/redis" "app.yhyue.com/moapp/jypkg/golang.org/x/net/websocket" "app.yhyue.com/moapp/jypkg/public" "encoding/json" "fmt" . "jy/src/jfw/config" "jy/src/jfw/jyutil" "log" "strconv" "strings" "sync" "time" ) // socket对象放在内存中,待rpc回调使用 type Wss struct { Conn *websocket.Conn session *httpsession.Session Time int64 } type MapSocket struct { Map map[string]*Wss Lock sync.Mutex } func (m *MapSocket) GC() { defer qutil.Catch() m.Lock.Lock() now := time.Now().Unix() for k, v := range m.Map { if now-v.Time > 3600 || v.Conn == nil { delete(m.Map, k) } } m.Lock.Unlock() time.AfterFunc(5*time.Minute, m.GC) } var MSPOOL = 20 var MapSocketArr = make([]*MapSocket, MSPOOL) // 初始化 func init() { for i := 0; i < MSPOOL; i++ { ms := &MapSocket{Map: map[string]*Wss{}} go ms.GC() MapSocketArr[i] = ms } redis.InitRedisLogin(public.DbConf.Redis.Login.Address) } func GetRedisPoolVal() { redis.GetLoginVal("loginCode", GetWsByCode) // var res string // for { // res = redis.GetLoginVal("loginCode") // if res != "" { // param := strings.Split(res, ",") // ok := GetWsByCode(param) // log.Println("ok:", ok) // } // } } // 根据代码和ws做映射 func PutWsByCode(src string, ws *websocket.Conn, session *httpsession.Session) { defer qutil.Catch() n := HashVal(src) % MSPOOL ms := MapSocketArr[n] ms.Lock.Lock() if ms.Map[src] == nil { ms.Map[src] = &Wss{ws, session, time.Now().Unix()} } else { wss := ms.Map[src] if ws != nil { wss.Conn = ws } if session != nil { wss.session = session } wss.Time = time.Now().Unix() } ms.Lock.Unlock() } // 计算代码的hash值 func HashVal(src string) int { check := 0 for i := len(src) / 2; i < len(src); i++ { check += int(src[i]) } return check } // rpc回调,写到前台 func GetWsByCode(param []string) bool { if len(param) < 2 { return false } src := param[0] openid := param[1] if src == "" { return false } defer qutil.Catch() n := HashVal(src) % MSPOOL ms := MapSocketArr[n] defer func() { ms.Lock.Lock() delete(ms.Map, src) ms.Lock.Unlock() }() ms.Lock.Lock() wss := ms.Map[src] ms.Lock.Unlock() if wss != nil { session := wss.session if session == nil && wss.Conn != nil { session = wss.Conn.Sess } if session == nil { log.Println("error:rpc back has no session!") return false } infoData := LoginInfo(src, openid, session) //用户信息正常 且 当前二维码 code被绑定 if infoData != nil { baseUserId := qutil.Int64All(session.Get("base_user_id")) if erStr := redis.GetStr("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)); erStr != "" && baseUserId > 0 { infoData["cValue"] = encrypt.SE.EncodeString(strconv.FormatInt(baseUserId, 10)) infoData["cName"] = jyutil.KeepLoginCookieName expiresHour := qutil.IntAll(Sysconfig["setSessionTimeout"]) expires := time.Now().Add(time.Duration(expiresHour) * time.Hour) infoData["expires"] = expires.Format("2006-01-02 15:04:05") //往cookie加入标识 保证用户登录状态延期 if ok := redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)) && redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, erStr)); !ok { log.Println(fmt.Sprintf("%d 清除用户登录延期标识 异常", baseUserId)) } //登录source 更新 //收到webSocket消息后,您无法设置cookie.一旦建立了webSocket连接,它就是一个开放的TCP套接字,协议不再是http,因此没有内置的方式来交换cookie. //jyutil.SetCookieValueForAutoLogin(wss.Conn.W, baseUserId) 不可用 //您可以将自己的webSocket消息发送回客户端,告知它设置cookie,然后在客户端中侦听该消息,当它收到该消息时,它可以在浏览器中设置cookie. } } if wss.Conn == nil { return true } sendmessage, _ := json.Marshal(infoData) if err := websocket.Message.Send(wss.Conn, string(sendmessage)); err != nil { log.Println("socket send fail..", err) return false } else { wss.Conn.Close() } return true } else { return false } } // 用户登录信息 func LoginInfo(shareid, openid string, Sess interface{}) map[string]interface{} { if Sess == nil { return nil } sess, _ := Sess.(*httpsession.Session) if openid != "" { ok, user, infoData := FindUserAndCreateSess(openid, sess, "pc", true, true) if ok { (*user)["shareid"] = shareid sess.Set("user", *user) infoData["shareid"] = shareid sess.Set("rpcBackUserInfo", infoData) } return infoData } return nil } // ServeWss 登录关注 func ServeWss(conn *websocket.Conn) { defer qutil.Catch() conn.Sess = xweb.RootApp().SessionManager.Session(conn.R, conn.W) var shareIds string for { var shareData string err := websocket.Message.Receive(conn, &shareData) if err != nil { //log.Println("前台socket关闭,后台socket断开并退出循环。。。。") break } else { //心跳监测 if shareData == "HeartBeat" { websocket.Message.Send(conn, "HeartBeat") continue } shareIds = shareData shareidlist := strings.Split(shareIds, "___") if shareIds != "" && len(shareidlist) > 1 { shareidnum := shareidlist[0] shareidkop := shareidlist[1] PutWsByCode(se.DecodeString(shareidnum), conn, nil) PutWsByCode(se.DecodeString(shareidkop), conn, nil) } } } } // 实验室 func QrToLabWss(conn *websocket.Conn) { defer qutil.Catch() var receive, userId string var qrToLab_ok, qrToLab_open_ok bool //接收消息 go func() { defer qutil.Catch() for { err := websocket.Message.Receive(conn, &receive) if err != nil { receive = "close" //log.Println("websocket接收失败!", err) return } if receive == "close" { //关闭 return } else if receive == "HeartBeat" { //心跳监测 websocket.Message.Send(conn, "HeartBeat") } else { //接收到userid userId = se.DecodeString(receive) } } }() //发送消息 for { time.Sleep(2 * time.Second) if receive == "close" { //接收到关闭信息 conn.Close() return } else if userId == "" { continue } var reply string //是否进入实验室 if !qrToLab_ok { qrToLab_ok, _ = redis.Exists("other", "qrToLab_"+userId) if qrToLab_ok { reply = "qrToLab_ok" } } //是否打开开关 if !qrToLab_open_ok { qrToLab_open_ok, _ = redis.Exists("other", "qrToLab_open_"+userId) if qrToLab_open_ok { reply = "qrToLab_open_ok" } } if reply == "" { continue } if err := websocket.Message.Send(conn, reply); err != nil { redis.Del("other", "qrToLab_"+userId) redis.Del("other", "qrToLab_open_"+userId) //log.Println("websocket发送失败!", err) conn.Close() return } if reply == "qrToLab_ok" { redis.Del("other", "qrToLab_"+userId) } else if reply == "qrToLab_open_ok" { redis.Del("other", "qrToLab_open_"+userId) } if qrToLab_ok && qrToLab_open_ok { conn.Close() return } } } // 电脑端强制分享,扫码分享到朋友圈 func QrToShareTimeline(conn *websocket.Conn) { defer qutil.Catch() var receive string //接收消息 sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W) userid, _ := sess.Get("userId").(string) key := "" if userid != "" { key = fmt.Sprintf("pcbiddetail_shareTimeline_%s", userid) } go func() { defer qutil.Catch() for { err := websocket.Message.Receive(conn, &receive) if err != nil { receive = "close" //log.Println("websocket接收失败!", err) return } if receive == "close" { //关闭 return } else if receive == "HeartBeat" { //心跳监测 websocket.Message.Send(conn, "HeartBeat") } } }() //发送消息 for { time.Sleep(2 * time.Second) if receive == "close" { //接收到关闭信息 conn.Close() return } reply := "n" if key != "" { ok, _ := redis.Exists("other", key) if ok { reply = "y" } } if err := websocket.Message.Send(conn, reply); err != nil { redis.Del("other", key) //log.Println("websocket发送失败!", err) conn.Close() return } if reply == "y" { redis.Del("other", key) conn.Close() return } } } // pc弹框 最新一条消息 func GetBuoyMsg(conn *websocket.Conn) { defer qutil.Catch() sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W) userid, _ := sess.Get("userId").(string) if userid == "" { return } messageCenter, _ := Sysconfig["messageCenter"].(map[string]interface{}) appid, _ := messageCenter["appid"].(string) dbName, _ := messageCenter["dbName"].(string) createtime := qutil.ObjToString(messageCenter["createtime"]) interval := qutil.IntAllDef(messageCenter["interval"], 300) limitCount := qutil.IntAllDef(messageCenter["limitCount"], 3) go func() { defer qutil.Catch() for { var receive string err := websocket.Message.Receive(conn, &receive) if err != nil { //log.Println("websocket接收失败!", err) return } if receive == "close" { //关闭 return } else if receive == "HeartBeat" { //心跳监测 websocket.Message.Send(conn, "HeartBeat") } } }() //发送消息 counter := 0 isFirst := true for { counter++ if counter%5 == 0 { if err := websocket.Message.Send(conn, "HeartBeat"); err != nil { conn.Close() return } } if isFirst || counter >= interval { counter = 0 timeA := time.Now().AddDate(0, 0, -qutil.IntAllDef(messageCenter["limitDay"], 7)) timeB, err := time.ParseInLocation(Date_Full_Layout, createtime, time.Local) if err == nil && timeA.After(timeB) { createtime = FormatDate(&timeA, Date_Full_Layout) } list := public.BaseMysql.SelectBySql(`select id,title,link,content,show_content,pc_tip,isdel,isRead from `+dbName+`.message where receive_userid=? and appid=? and createtime>? order by createtime desc limit ?`, userid, appid, createtime, limitCount) if list != nil { for _, v := range *list { if qutil.IntAll(v["pc_tip"]) == 1 || qutil.IntAll(v["isdel"]) != 1 || qutil.IntAll(v["isRead"]) == 1 { continue } id := qutil.Int64All(v["id"]) reply := fmt.Sprintf(`{"id":%d,"title":"%s","content":"%s","show_content":"%s","link":"%s"}`, id, qutil.ObjToString(v["title"]), qutil.ObjToString(v["content"]), qutil.ObjToString(v["show_content"]), strings.Split(qutil.ObjToString(v["link"]), ",")[0]) if err := websocket.Message.Send(conn, reply); err != nil { //log.Println("websocket发送失败!", err) conn.Close() return } if id > 0 { public.BaseMysql.UpdateOrDeleteBySql(`update `+dbName+`.message set pc_tip=1 where id=?`, id) } time.Sleep(time.Second) } } } isFirst = false time.Sleep(time.Second) } }