websocket.go 7.5 KB


  1. package front
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. //"net/http"
  7. "jfw/config"
  8. qutil "qfw/util"
  9. "qfw/util/redis"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/go-xweb/httpsession"
  14. "github.com/go-xweb/xweb"
  15. "golang.org/x/net/websocket"
  16. )
  17. //socket对象放在内存中,待rpc回调使用
  18. type Wss struct {
  19. Conn *websocket.Conn
  20. session *httpsession.Session
  21. Time int64
  22. }
  23. type MapSocket struct {
  24. Map map[string]*Wss
  25. Lock sync.Mutex
  26. }
  27. func (m *MapSocket) GC() {
  28. defer qutil.Catch()
  29. m.Lock.Lock()
  30. now := time.Now().Unix()
  31. for k, v := range m.Map {
  32. if now-v.Time > 3600 || v.Conn == nil {
  33. delete(m.Map, k)
  34. }
  35. }
  36. m.Lock.Unlock()
  37. time.AfterFunc(5*time.Minute, m.GC)
  38. }
  39. var MSPOOL = 20
  40. var MapSocketArr = make([]*MapSocket, MSPOOL)
  41. //初始化
  42. func init() {
  43. for i := 0; i < MSPOOL; i++ {
  44. ms := &MapSocket{Map: map[string]*Wss{}}
  45. go ms.GC()
  46. MapSocketArr[i] = ms
  47. }
  48. redis.InitRedisLogin(config.Sysconfig["redisaddrs"].(string))
  49. }
  50. //
  51. func GetRedisPoolVal() {
  52. redis.GetLoginVal("loginCode", GetWsByCode)
  53. // var res string
  54. // for {
  55. // res = redis.GetLoginVal("loginCode")
  56. // if res != "" {
  57. // param := strings.Split(res, ",")
  58. // ok := GetWsByCode(param)
  59. // log.Println("ok:", ok)
  60. // }
  61. // }
  62. }
  63. //根据代码和ws做映射
  64. func PutWsByCode(src string, ws *websocket.Conn, session *httpsession.Session) {
  65. defer qutil.Catch()
  66. n := HashVal(src) % MSPOOL
  67. ms := MapSocketArr[n]
  68. ms.Lock.Lock()
  69. if ms.Map[src] == nil {
  70. ms.Map[src] = &Wss{ws, session, time.Now().Unix()}
  71. } else {
  72. wss := ms.Map[src]
  73. if ws != nil {
  74. wss.Conn = ws
  75. }
  76. if session != nil {
  77. wss.session = session
  78. }
  79. wss.Time = time.Now().Unix()
  80. }
  81. ms.Lock.Unlock()
  82. }
  // HashVal returns the sum of the byte values in the second half of src, that
  // is, bytes len(src)/2 through the end. The empty string yields 0. The result
  // is never negative, so it can be reduced with % to pick a shard.
  func HashVal(src string) int {
  	sum := 0
  	for _, b := range []byte(src[len(src)/2:]) {
  		sum += int(b)
  	}
  	return sum
  }
  91. //rpc回调,写到前台
  92. func GetWsByCode(param []string) bool {
  93. if len(param) < 2 {
  94. return false
  95. }
  96. src := param[0]
  97. openid := param[1]
  98. if src == "" {
  99. return false
  100. }
  101. defer qutil.Catch()
  102. n := HashVal(src) % MSPOOL
  103. ms := MapSocketArr[n]
  104. defer func() {
  105. ms.Lock.Lock()
  106. delete(ms.Map, src)
  107. ms.Lock.Unlock()
  108. }()
  109. ms.Lock.Lock()
  110. wss := ms.Map[src]
  111. ms.Lock.Unlock()
  112. if wss != nil {
  113. session := wss.session
  114. if session == nil && wss.Conn != nil {
  115. session = wss.Conn.Sess
  116. }
  117. if session == nil {
  118. log.Println("error:rpc back has no session!")
  119. return false
  120. }
  121. infoData := LoginInfo(src, openid, session)
  122. if wss.Conn == nil {
  123. return true
  124. }
  125. sendmessage, _ := json.Marshal(infoData)
  126. if err := websocket.Message.Send(wss.Conn, string(sendmessage)); err != nil {
  127. log.Println("socket send fail..", err)
  128. return false
  129. } else {
  130. wss.Conn.Close()
  131. }
  132. return true
  133. } else {
  134. return false
  135. }
  136. }
  137. //用户登录信息
  138. func LoginInfo(shareid, openid string, Sess interface{}) (infoData map[string]interface{}) {
  139. if Sess == nil {
  140. return nil
  141. }
  142. sess, _ := Sess.(*httpsession.Session)
  143. infoData = make(map[string]interface{})
  144. if openid != "" {
  145. sess.Set("openid", openid)
  146. redisheadimg := redis.Get("other", "newUser-"+openid)
  147. if redisheadimg == nil {
  148. redisheadimg = ""
  149. }
  150. user, _ := mongodb.FindOneByField("user", `{"s_m_openid":"`+openid+`"}`, `{"s_nickname":1,"s_headimage":1,"s_m_openid":1,"_id":1}`)
  151. if user != nil && len(*user) > 0 {
  152. infoData["result"] = "ok"
  153. infoData["s_nickname"] = fmt.Sprint((*user)["s_nickname"])
  154. infoData["s_headimage"] = fmt.Sprint((*user)["s_headimage"])
  155. infoData["redisheadimg"] = fmt.Sprint(redisheadimg)
  156. infoData["encryptId"] = se.EncodeString(qutil.BsonIdToSId((*user)["_id"]))
  157. infoData["shareid"] = shareid
  158. infoData["openid"] = se.EncodeString(fmt.Sprint((*user)["s_m_openid"])) //add 20181116
  159. (*user)["shareid"] = shareid
  160. nick := fmt.Sprint((*user)["s_nickname"])
  161. sess.Set("nickname", nick)
  162. sess.Set("s_nickname", nick)
  163. sess.Set("s_m_openid", fmt.Sprint((*user)["s_m_openid"]))
  164. sess.Set("user", *user)
  165. sess.Set("userId", qutil.BsonIdToSId((*user)["_id"]))
  166. sess.Set("rpcBackUserInfo", infoData)
  167. }
  168. }
  169. return infoData
  170. }
  171. //登录关注
  172. func ServeWss(conn *websocket.Conn) {
  173. defer qutil.Catch()
  174. conn.Sess = xweb.RootApp().SessionManager.Session(conn.R, conn.W)
  175. var shareIds string
  176. for {
  177. var shareData string
  178. err := websocket.Message.Receive(conn, &shareData)
  179. if err != nil {
  180. //log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
  181. break
  182. } else {
  183. //心跳监测
  184. if shareData == "HeartBeat" {
  185. websocket.Message.Send(conn, "HeartBeat")
  186. continue
  187. }
  188. shareIds = shareData
  189. shareidlist := strings.Split(shareIds, "___")
  190. if shareIds != "" && len(shareidlist) > 1 {
  191. shareidnum := shareidlist[0]
  192. shareidkop := shareidlist[1]
  193. PutWsByCode(se.DecodeString(shareidnum), conn, nil)
  194. PutWsByCode(se.DecodeString(shareidkop), conn, nil)
  195. }
  196. }
  197. }
  198. }
  199. //实验室
  200. func QrToLabWss(conn *websocket.Conn) {
  201. defer qutil.Catch()
  202. var receive, userId string
  203. var qrToLab_ok, qrToLab_open_ok bool
  204. //接收消息
  205. go func() {
  206. defer qutil.Catch()
  207. for {
  208. err := websocket.Message.Receive(conn, &receive)
  209. if err != nil {
  210. receive = "close"
  211. //log.Println("websocket接收失败!", err)
  212. return
  213. }
  214. if receive == "close" { //关闭
  215. return
  216. } else if receive == "HeartBeat" { //心跳监测
  217. websocket.Message.Send(conn, "HeartBeat")
  218. } else { //接收到userid
  219. userId = se.DecodeString(receive)
  220. }
  221. }
  222. }()
  223. //发送消息
  224. for {
  225. time.Sleep(2 * time.Second)
  226. if receive == "close" { //接收到关闭信息
  227. conn.Close()
  228. return
  229. } else if userId == "" {
  230. continue
  231. }
  232. var reply string
  233. //是否进入实验室
  234. if !qrToLab_ok {
  235. qrToLab_ok, _ = redis.Exists("other", "qrToLab_"+userId)
  236. if qrToLab_ok {
  237. reply = "qrToLab_ok"
  238. }
  239. }
  240. //是否打开开关
  241. if !qrToLab_open_ok {
  242. qrToLab_open_ok, _ = redis.Exists("other", "qrToLab_open_"+userId)
  243. if qrToLab_open_ok {
  244. reply = "qrToLab_open_ok"
  245. }
  246. }
  247. if reply == "" {
  248. continue
  249. }
  250. if err := websocket.Message.Send(conn, reply); err != nil {
  251. redis.Del("other", "qrToLab_"+userId)
  252. redis.Del("other", "qrToLab_open_"+userId)
  253. //log.Println("websocket发送失败!", err)
  254. conn.Close()
  255. return
  256. }
  257. if reply == "qrToLab_ok" {
  258. redis.Del("other", "qrToLab_"+userId)
  259. } else if reply == "qrToLab_open_ok" {
  260. redis.Del("other", "qrToLab_open_"+userId)
  261. }
  262. if qrToLab_ok && qrToLab_open_ok {
  263. conn.Close()
  264. return
  265. }
  266. }
  267. }
  268. //电脑端强制分享,扫码分享到朋友圈
  269. func QrToShareTimeline(conn *websocket.Conn) {
  270. defer qutil.Catch()
  271. var receive string
  272. //接收消息
  273. sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
  274. userid, _ := sess.Get("userId").(string)
  275. key := ""
  276. if userid != "" {
  277. key = fmt.Sprintf("pcbiddetail_shareTimeline_%s", userid)
  278. }
  279. go func() {
  280. defer qutil.Catch()
  281. for {
  282. err := websocket.Message.Receive(conn, &receive)
  283. if err != nil {
  284. receive = "close"
  285. //log.Println("websocket接收失败!", err)
  286. return
  287. }
  288. if receive == "close" { //关闭
  289. return
  290. } else if receive == "HeartBeat" { //心跳监测
  291. websocket.Message.Send(conn, "HeartBeat")
  292. }
  293. }
  294. }()
  295. //发送消息
  296. for {
  297. time.Sleep(2 * time.Second)
  298. if receive == "close" { //接收到关闭信息
  299. conn.Close()
  300. return
  301. }
  302. reply := "n"
  303. if key != "" {
  304. ok, _ := redis.Exists("other", key)
  305. if ok {
  306. reply = "y"
  307. }
  308. }
  309. if err := websocket.Message.Send(conn, reply); err != nil {
  310. redis.Del("other", key)
  311. //log.Println("websocket发送失败!", err)
  312. conn.Close()
  313. return
  314. }
  315. if reply == "y" {
  316. redis.Del("other", key)
  317. conn.Close()
  318. return
  319. }
  320. }
  321. }