123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- package front
- import (
- qutil "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- "app.yhyue.com/moapp/jybase/encrypt"
- "app.yhyue.com/moapp/jybase/go-xweb/httpsession"
- "app.yhyue.com/moapp/jybase/go-xweb/xweb"
- "app.yhyue.com/moapp/jybase/redis"
- "app.yhyue.com/moapp/jypkg/golang.org/x/net/websocket"
- "app.yhyue.com/moapp/jypkg/public"
- "encoding/json"
- "fmt"
- . "jy/src/jfw/config"
- "jy/src/jfw/jyutil"
- "log"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- // socket对象放在内存中,待rpc回调使用
- type Wss struct {
- Conn *websocket.Conn
- session *httpsession.Session
- Time int64
- }
- type MapSocket struct {
- Map map[string]*Wss
- Lock sync.Mutex
- }
- func (m *MapSocket) GC() {
- defer qutil.Catch()
- m.Lock.Lock()
- now := time.Now().Unix()
- for k, v := range m.Map {
- if now-v.Time > 3600 || v.Conn == nil {
- delete(m.Map, k)
- }
- }
- m.Lock.Unlock()
- time.AfterFunc(5*time.Minute, m.GC)
- }
- var MSPOOL = 20
- var MapSocketArr = make([]*MapSocket, MSPOOL)
- // 初始化
- func init() {
- for i := 0; i < MSPOOL; i++ {
- ms := &MapSocket{Map: map[string]*Wss{}}
- go ms.GC()
- MapSocketArr[i] = ms
- }
- redis.InitRedisLogin(public.DbConf.Redis.Login.Address)
- }
- func GetRedisPoolVal() {
- redis.GetLoginVal("loginCode", GetWsByCode)
- // var res string
- // for {
- // res = redis.GetLoginVal("loginCode")
- // if res != "" {
- // param := strings.Split(res, ",")
- // ok := GetWsByCode(param)
- // log.Println("ok:", ok)
- // }
- // }
- }
- // 根据代码和ws做映射
- func PutWsByCode(src string, ws *websocket.Conn, session *httpsession.Session) {
- defer qutil.Catch()
- n := HashVal(src) % MSPOOL
- ms := MapSocketArr[n]
- ms.Lock.Lock()
- if ms.Map[src] == nil {
- ms.Map[src] = &Wss{ws, session, time.Now().Unix()}
- } else {
- wss := ms.Map[src]
- if ws != nil {
- wss.Conn = ws
- }
- if session != nil {
- wss.session = session
- }
- wss.Time = time.Now().Unix()
- }
- ms.Lock.Unlock()
- }
- // 计算代码的hash值
- func HashVal(src string) int {
- check := 0
- for i := len(src) / 2; i < len(src); i++ {
- check += int(src[i])
- }
- return check
- }
- // rpc回调,写到前台
- func GetWsByCode(param []string) bool {
- if len(param) < 2 {
- return false
- }
- src := param[0]
- openid := param[1]
- if src == "" {
- return false
- }
- defer qutil.Catch()
- n := HashVal(src) % MSPOOL
- ms := MapSocketArr[n]
- defer func() {
- ms.Lock.Lock()
- delete(ms.Map, src)
- ms.Lock.Unlock()
- }()
- ms.Lock.Lock()
- wss := ms.Map[src]
- ms.Lock.Unlock()
- if wss != nil {
- session := wss.session
- if session == nil && wss.Conn != nil {
- session = wss.Conn.Sess
- }
- if session == nil {
- log.Println("error:rpc back has no session!")
- return false
- }
- infoData := LoginInfo(src, openid, session)
- //用户信息正常 且 当前二维码 code被绑定
- if infoData != nil {
- baseUserId := qutil.Int64All(session.Get("base_user_id"))
- if erStr := redis.GetStr("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)); erStr != "" && baseUserId > 0 {
- infoData["cValue"] = encrypt.SE.EncodeString(strconv.FormatInt(baseUserId, 10))
- infoData["cName"] = jyutil.KeepLoginCookieName
- expiresHour := qutil.IntAll(Sysconfig["setSessionTimeout"])
- expires := time.Now().Add(time.Duration(expiresHour) * time.Hour)
- infoData["expires"] = expires.Format("2006-01-02 15:04:05")
- //往cookie加入标识 保证用户登录状态延期
- if ok := redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)) && redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, erStr)); !ok {
- log.Println(fmt.Sprintf("%d 清除用户登录延期标识 异常", baseUserId))
- }
- //登录source 更新
- //收到webSocket消息后,您无法设置cookie.一旦建立了webSocket连接,它就是一个开放的TCP套接字,协议不再是http,因此没有内置的方式来交换cookie.
- //jyutil.SetCookieValueForAutoLogin(wss.Conn.W, baseUserId) 不可用
- //您可以将自己的webSocket消息发送回客户端,告知它设置cookie,然后在客户端中侦听该消息,当它收到该消息时,它可以在浏览器中设置cookie.
- }
- }
- if wss.Conn == nil {
- return true
- }
- sendmessage, _ := json.Marshal(infoData)
- if err := websocket.Message.Send(wss.Conn, string(sendmessage)); err != nil {
- log.Println("socket send fail..", err)
- return false
- } else {
- wss.Conn.Close()
- }
- return true
- } else {
- return false
- }
- }
- // 用户登录信息
- func LoginInfo(shareid, openid string, Sess interface{}) map[string]interface{} {
- if Sess == nil {
- return nil
- }
- sess, _ := Sess.(*httpsession.Session)
- if openid != "" {
- ok, user, infoData := FindUserAndCreateSess(openid, sess, "pc", true, true)
- if ok {
- (*user)["shareid"] = shareid
- sess.Set("user", *user)
- infoData["shareid"] = shareid
- sess.Set("rpcBackUserInfo", infoData)
- }
- return infoData
- }
- return nil
- }
- // ServeWss 登录关注
- func ServeWss(conn *websocket.Conn) {
- defer qutil.Catch()
- conn.Sess = xweb.RootApp().SessionManager.Session(conn.R, conn.W)
- var shareIds string
- for {
- var shareData string
- err := websocket.Message.Receive(conn, &shareData)
- if err != nil {
- //log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
- break
- } else {
- //心跳监测
- if shareData == "HeartBeat" {
- websocket.Message.Send(conn, "HeartBeat")
- continue
- }
- shareIds = shareData
- shareidlist := strings.Split(shareIds, "___")
- if shareIds != "" && len(shareidlist) > 1 {
- shareidnum := shareidlist[0]
- shareidkop := shareidlist[1]
- PutWsByCode(se.DecodeString(shareidnum), conn, nil)
- PutWsByCode(se.DecodeString(shareidkop), conn, nil)
- }
- }
- }
- }
- // 实验室
- func QrToLabWss(conn *websocket.Conn) {
- defer qutil.Catch()
- var receive, userId string
- var qrToLab_ok, qrToLab_open_ok bool
- //接收消息
- go func() {
- defer qutil.Catch()
- for {
- err := websocket.Message.Receive(conn, &receive)
- if err != nil {
- receive = "close"
- //log.Println("websocket接收失败!", err)
- return
- }
- if receive == "close" { //关闭
- return
- } else if receive == "HeartBeat" { //心跳监测
- websocket.Message.Send(conn, "HeartBeat")
- } else { //接收到userid
- userId = se.DecodeString(receive)
- }
- }
- }()
- //发送消息
- for {
- time.Sleep(2 * time.Second)
- if receive == "close" { //接收到关闭信息
- conn.Close()
- return
- } else if userId == "" {
- continue
- }
- var reply string
- //是否进入实验室
- if !qrToLab_ok {
- qrToLab_ok, _ = redis.Exists("other", "qrToLab_"+userId)
- if qrToLab_ok {
- reply = "qrToLab_ok"
- }
- }
- //是否打开开关
- if !qrToLab_open_ok {
- qrToLab_open_ok, _ = redis.Exists("other", "qrToLab_open_"+userId)
- if qrToLab_open_ok {
- reply = "qrToLab_open_ok"
- }
- }
- if reply == "" {
- continue
- }
- if err := websocket.Message.Send(conn, reply); err != nil {
- redis.Del("other", "qrToLab_"+userId)
- redis.Del("other", "qrToLab_open_"+userId)
- //log.Println("websocket发送失败!", err)
- conn.Close()
- return
- }
- if reply == "qrToLab_ok" {
- redis.Del("other", "qrToLab_"+userId)
- } else if reply == "qrToLab_open_ok" {
- redis.Del("other", "qrToLab_open_"+userId)
- }
- if qrToLab_ok && qrToLab_open_ok {
- conn.Close()
- return
- }
- }
- }
- // 电脑端强制分享,扫码分享到朋友圈
- func QrToShareTimeline(conn *websocket.Conn) {
- defer qutil.Catch()
- var receive string
- //接收消息
- sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
- userid, _ := sess.Get("userId").(string)
- key := ""
- if userid != "" {
- key = fmt.Sprintf("pcbiddetail_shareTimeline_%s", userid)
- }
- go func() {
- defer qutil.Catch()
- for {
- err := websocket.Message.Receive(conn, &receive)
- if err != nil {
- receive = "close"
- //log.Println("websocket接收失败!", err)
- return
- }
- if receive == "close" { //关闭
- return
- } else if receive == "HeartBeat" { //心跳监测
- websocket.Message.Send(conn, "HeartBeat")
- }
- }
- }()
- //发送消息
- for {
- time.Sleep(2 * time.Second)
- if receive == "close" { //接收到关闭信息
- conn.Close()
- return
- }
- reply := "n"
- if key != "" {
- ok, _ := redis.Exists("other", key)
- if ok {
- reply = "y"
- }
- }
- if err := websocket.Message.Send(conn, reply); err != nil {
- redis.Del("other", key)
- //log.Println("websocket发送失败!", err)
- conn.Close()
- return
- }
- if reply == "y" {
- redis.Del("other", key)
- conn.Close()
- return
- }
- }
- }
- // pc弹框 最新一条消息
- func GetBuoyMsg(conn *websocket.Conn) {
- defer qutil.Catch()
- sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
- userid, _ := sess.Get("userId").(string)
- if userid == "" {
- return
- }
- messageCenter, _ := Sysconfig["messageCenter"].(map[string]interface{})
- appid, _ := messageCenter["appid"].(string)
- dbName, _ := messageCenter["dbName"].(string)
- createtime := qutil.ObjToString(messageCenter["createtime"])
- interval := qutil.IntAllDef(messageCenter["interval"], 300)
- limitCount := qutil.IntAllDef(messageCenter["limitCount"], 3)
- go func() {
- defer qutil.Catch()
- for {
- var receive string
- err := websocket.Message.Receive(conn, &receive)
- if err != nil {
- //log.Println("websocket接收失败!", err)
- return
- }
- if receive == "close" { //关闭
- return
- } else if receive == "HeartBeat" { //心跳监测
- websocket.Message.Send(conn, "HeartBeat")
- }
- }
- }()
- //发送消息
- counter := 0
- isFirst := true
- for {
- counter++
- if counter%5 == 0 {
- if err := websocket.Message.Send(conn, "HeartBeat"); err != nil {
- conn.Close()
- return
- }
- }
- if isFirst || counter >= interval {
- counter = 0
- timeA := time.Now().AddDate(0, 0, -qutil.IntAllDef(messageCenter["limitDay"], 7))
- timeB, err := time.ParseInLocation(Date_Full_Layout, createtime, time.Local)
- if err == nil && timeA.After(timeB) {
- createtime = FormatDate(&timeA, Date_Full_Layout)
- }
- list := public.BaseMysql.SelectBySql(`select id,title,link,content,show_content,pc_tip,isdel,isRead from `+dbName+`.message where receive_userid=? and appid=? and createtime>? order by createtime desc limit ?`, userid, appid, createtime, limitCount)
- if list != nil {
- for _, v := range *list {
- if qutil.IntAll(v["pc_tip"]) == 1 || qutil.IntAll(v["isdel"]) != 1 || qutil.IntAll(v["isRead"]) == 1 {
- continue
- }
- id := qutil.Int64All(v["id"])
- reply := fmt.Sprintf(`{"id":%d,"title":"%s","content":"%s","show_content":"%s","link":"%s"}`, id, qutil.ObjToString(v["title"]), qutil.ObjToString(v["content"]), qutil.ObjToString(v["show_content"]), strings.Split(qutil.ObjToString(v["link"]), ",")[0])
- if err := websocket.Message.Send(conn, reply); err != nil {
- //log.Println("websocket发送失败!", err)
- conn.Close()
- return
- }
- if id > 0 {
- public.BaseMysql.UpdateOrDeleteBySql(`update `+dbName+`.message set pc_tip=1 where id=?`, id)
- }
- time.Sleep(time.Second)
- }
- }
- }
- isFirst = false
- time.Sleep(time.Second)
- }
- }
|