websocket.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package front
  2. import (
  3. qutil "app.yhyue.com/moapp/jybase/common"
  4. . "app.yhyue.com/moapp/jybase/date"
  5. "app.yhyue.com/moapp/jybase/encrypt"
  6. "app.yhyue.com/moapp/jybase/go-xweb/httpsession"
  7. "app.yhyue.com/moapp/jybase/go-xweb/xweb"
  8. "app.yhyue.com/moapp/jybase/redis"
  9. "app.yhyue.com/moapp/jypkg/golang.org/x/net/websocket"
  10. "app.yhyue.com/moapp/jypkg/public"
  11. "encoding/json"
  12. "fmt"
  13. . "jy/src/jfw/config"
  14. "jy/src/jfw/jyutil"
  15. "log"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. // socket对象放在内存中,待rpc回调使用
  22. type Wss struct {
  23. Conn *websocket.Conn
  24. session *httpsession.Session
  25. Time int64
  26. }
  27. type MapSocket struct {
  28. Map map[string]*Wss
  29. Lock sync.Mutex
  30. }
  31. func (m *MapSocket) GC() {
  32. defer qutil.Catch()
  33. m.Lock.Lock()
  34. now := time.Now().Unix()
  35. for k, v := range m.Map {
  36. if now-v.Time > 3600 || v.Conn == nil {
  37. delete(m.Map, k)
  38. }
  39. }
  40. m.Lock.Unlock()
  41. time.AfterFunc(5*time.Minute, m.GC)
  42. }
  43. var MSPOOL = 20
  44. var MapSocketArr = make([]*MapSocket, MSPOOL)
  45. // 初始化
  46. func init() {
  47. for i := 0; i < MSPOOL; i++ {
  48. ms := &MapSocket{Map: map[string]*Wss{}}
  49. go ms.GC()
  50. MapSocketArr[i] = ms
  51. }
  52. redis.InitRedisLogin(public.DbConf.Redis.Login.Address)
  53. }
  54. func GetRedisPoolVal() {
  55. redis.GetLoginVal("loginCode", GetWsByCode)
  56. // var res string
  57. // for {
  58. // res = redis.GetLoginVal("loginCode")
  59. // if res != "" {
  60. // param := strings.Split(res, ",")
  61. // ok := GetWsByCode(param)
  62. // log.Println("ok:", ok)
  63. // }
  64. // }
  65. }
  66. // 根据代码和ws做映射
  67. func PutWsByCode(src string, ws *websocket.Conn, session *httpsession.Session) {
  68. defer qutil.Catch()
  69. n := HashVal(src) % MSPOOL
  70. ms := MapSocketArr[n]
  71. ms.Lock.Lock()
  72. if ms.Map[src] == nil {
  73. ms.Map[src] = &Wss{ws, session, time.Now().Unix()}
  74. } else {
  75. wss := ms.Map[src]
  76. if ws != nil {
  77. wss.Conn = ws
  78. }
  79. if session != nil {
  80. wss.session = session
  81. }
  82. wss.Time = time.Now().Unix()
  83. }
  84. ms.Lock.Unlock()
  85. }
  86. // 计算代码的hash值
  87. func HashVal(src string) int {
  88. check := 0
  89. for i := len(src) / 2; i < len(src); i++ {
  90. check += int(src[i])
  91. }
  92. return check
  93. }
  94. // rpc回调,写到前台
  95. func GetWsByCode(param []string) bool {
  96. if len(param) < 2 {
  97. return false
  98. }
  99. src := param[0]
  100. openid := param[1]
  101. if src == "" {
  102. return false
  103. }
  104. defer qutil.Catch()
  105. n := HashVal(src) % MSPOOL
  106. ms := MapSocketArr[n]
  107. defer func() {
  108. ms.Lock.Lock()
  109. delete(ms.Map, src)
  110. ms.Lock.Unlock()
  111. }()
  112. ms.Lock.Lock()
  113. wss := ms.Map[src]
  114. ms.Lock.Unlock()
  115. if wss != nil {
  116. session := wss.session
  117. if session == nil && wss.Conn != nil {
  118. session = wss.Conn.Sess
  119. }
  120. if session == nil {
  121. log.Println("error:rpc back has no session!")
  122. return false
  123. }
  124. infoData := LoginInfo(src, openid, session)
  125. //用户信息正常 且 当前二维码 code被绑定
  126. if infoData != nil {
  127. baseUserId := qutil.Int64All(session.Get("base_user_id"))
  128. if erStr := redis.GetStr("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)); erStr != "" && baseUserId > 0 {
  129. infoData["cValue"] = encrypt.SE.EncodeString(strconv.FormatInt(baseUserId, 10))
  130. infoData["cName"] = jyutil.KeepLoginCookieName
  131. expiresHour := qutil.IntAll(Sysconfig["setSessionTimeout"])
  132. expires := time.Now().Add(time.Duration(expiresHour) * time.Hour)
  133. infoData["expires"] = expires.Format("2006-01-02 15:04:05")
  134. //往cookie加入标识 保证用户登录状态延期
  135. if ok := redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, src)) && redis.Del("newother", fmt.Sprintf(jyutil.KeepLoginTimeKey, erStr)); !ok {
  136. log.Println(fmt.Sprintf("%d 清除用户登录延期标识 异常", baseUserId))
  137. }
  138. //登录source 更新
  139. //收到webSocket消息后,您无法设置cookie.一旦建立了webSocket连接,它就是一个开放的TCP套接字,协议不再是http,因此没有内置的方式来交换cookie.
  140. //jyutil.SetCookieValueForAutoLogin(wss.Conn.W, baseUserId) 不可用
  141. //您可以将自己的webSocket消息发送回客户端,告知它设置cookie,然后在客户端中侦听该消息,当它收到该消息时,它可以在浏览器中设置cookie.
  142. }
  143. }
  144. if wss.Conn == nil {
  145. return true
  146. }
  147. sendmessage, _ := json.Marshal(infoData)
  148. if err := websocket.Message.Send(wss.Conn, string(sendmessage)); err != nil {
  149. log.Println("socket send fail..", err)
  150. return false
  151. } else {
  152. wss.Conn.Close()
  153. }
  154. return true
  155. } else {
  156. return false
  157. }
  158. }
  159. // 用户登录信息
  160. func LoginInfo(shareid, openid string, Sess interface{}) map[string]interface{} {
  161. if Sess == nil {
  162. return nil
  163. }
  164. sess, _ := Sess.(*httpsession.Session)
  165. if openid != "" {
  166. ok, user, infoData := FindUserAndCreateSess(openid, sess, "pc", true, true)
  167. if ok {
  168. (*user)["shareid"] = shareid
  169. sess.Set("user", *user)
  170. infoData["shareid"] = shareid
  171. sess.Set("rpcBackUserInfo", infoData)
  172. }
  173. return infoData
  174. }
  175. return nil
  176. }
  177. // ServeWss 登录关注
  178. func ServeWss(conn *websocket.Conn) {
  179. defer qutil.Catch()
  180. conn.Sess = xweb.RootApp().SessionManager.Session(conn.R, conn.W)
  181. var shareIds string
  182. for {
  183. var shareData string
  184. err := websocket.Message.Receive(conn, &shareData)
  185. if err != nil {
  186. //log.Println("前台socket关闭,后台socket断开并退出循环。。。。")
  187. break
  188. } else {
  189. //心跳监测
  190. if shareData == "HeartBeat" {
  191. websocket.Message.Send(conn, "HeartBeat")
  192. continue
  193. }
  194. shareIds = shareData
  195. shareidlist := strings.Split(shareIds, "___")
  196. if shareIds != "" && len(shareidlist) > 1 {
  197. shareidnum := shareidlist[0]
  198. shareidkop := shareidlist[1]
  199. PutWsByCode(se.DecodeString(shareidnum), conn, nil)
  200. PutWsByCode(se.DecodeString(shareidkop), conn, nil)
  201. }
  202. }
  203. }
  204. }
  205. // 实验室
  206. func QrToLabWss(conn *websocket.Conn) {
  207. defer qutil.Catch()
  208. var receive, userId string
  209. var qrToLab_ok, qrToLab_open_ok bool
  210. //接收消息
  211. go func() {
  212. defer qutil.Catch()
  213. for {
  214. err := websocket.Message.Receive(conn, &receive)
  215. if err != nil {
  216. receive = "close"
  217. //log.Println("websocket接收失败!", err)
  218. return
  219. }
  220. if receive == "close" { //关闭
  221. return
  222. } else if receive == "HeartBeat" { //心跳监测
  223. websocket.Message.Send(conn, "HeartBeat")
  224. } else { //接收到userid
  225. userId = se.DecodeString(receive)
  226. }
  227. }
  228. }()
  229. //发送消息
  230. for {
  231. time.Sleep(2 * time.Second)
  232. if receive == "close" { //接收到关闭信息
  233. conn.Close()
  234. return
  235. } else if userId == "" {
  236. continue
  237. }
  238. var reply string
  239. //是否进入实验室
  240. if !qrToLab_ok {
  241. qrToLab_ok, _ = redis.Exists("other", "qrToLab_"+userId)
  242. if qrToLab_ok {
  243. reply = "qrToLab_ok"
  244. }
  245. }
  246. //是否打开开关
  247. if !qrToLab_open_ok {
  248. qrToLab_open_ok, _ = redis.Exists("other", "qrToLab_open_"+userId)
  249. if qrToLab_open_ok {
  250. reply = "qrToLab_open_ok"
  251. }
  252. }
  253. if reply == "" {
  254. continue
  255. }
  256. if err := websocket.Message.Send(conn, reply); err != nil {
  257. redis.Del("other", "qrToLab_"+userId)
  258. redis.Del("other", "qrToLab_open_"+userId)
  259. //log.Println("websocket发送失败!", err)
  260. conn.Close()
  261. return
  262. }
  263. if reply == "qrToLab_ok" {
  264. redis.Del("other", "qrToLab_"+userId)
  265. } else if reply == "qrToLab_open_ok" {
  266. redis.Del("other", "qrToLab_open_"+userId)
  267. }
  268. if qrToLab_ok && qrToLab_open_ok {
  269. conn.Close()
  270. return
  271. }
  272. }
  273. }
  274. // 电脑端强制分享,扫码分享到朋友圈
  275. func QrToShareTimeline(conn *websocket.Conn) {
  276. defer qutil.Catch()
  277. var receive string
  278. //接收消息
  279. sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
  280. userid, _ := sess.Get("userId").(string)
  281. key := ""
  282. if userid != "" {
  283. key = fmt.Sprintf("pcbiddetail_shareTimeline_%s", userid)
  284. }
  285. go func() {
  286. defer qutil.Catch()
  287. for {
  288. err := websocket.Message.Receive(conn, &receive)
  289. if err != nil {
  290. receive = "close"
  291. //log.Println("websocket接收失败!", err)
  292. return
  293. }
  294. if receive == "close" { //关闭
  295. return
  296. } else if receive == "HeartBeat" { //心跳监测
  297. websocket.Message.Send(conn, "HeartBeat")
  298. }
  299. }
  300. }()
  301. //发送消息
  302. for {
  303. time.Sleep(2 * time.Second)
  304. if receive == "close" { //接收到关闭信息
  305. conn.Close()
  306. return
  307. }
  308. reply := "n"
  309. if key != "" {
  310. ok, _ := redis.Exists("other", key)
  311. if ok {
  312. reply = "y"
  313. }
  314. }
  315. if err := websocket.Message.Send(conn, reply); err != nil {
  316. redis.Del("other", key)
  317. //log.Println("websocket发送失败!", err)
  318. conn.Close()
  319. return
  320. }
  321. if reply == "y" {
  322. redis.Del("other", key)
  323. conn.Close()
  324. return
  325. }
  326. }
  327. }
  328. // pc弹框 最新一条消息
  329. func GetBuoyMsg(conn *websocket.Conn) {
  330. defer qutil.Catch()
  331. sess := xweb.RootApp().SessionManager.Session(conn.R, conn.W)
  332. userid, _ := sess.Get("userId").(string)
  333. if userid == "" {
  334. return
  335. }
  336. messageCenter, _ := Sysconfig["messageCenter"].(map[string]interface{})
  337. appid, _ := messageCenter["appid"].(string)
  338. dbName, _ := messageCenter["dbName"].(string)
  339. createtime := qutil.ObjToString(messageCenter["createtime"])
  340. interval := qutil.IntAllDef(messageCenter["interval"], 300)
  341. limitCount := qutil.IntAllDef(messageCenter["limitCount"], 3)
  342. go func() {
  343. defer qutil.Catch()
  344. for {
  345. var receive string
  346. err := websocket.Message.Receive(conn, &receive)
  347. if err != nil {
  348. //log.Println("websocket接收失败!", err)
  349. return
  350. }
  351. if receive == "close" { //关闭
  352. return
  353. } else if receive == "HeartBeat" { //心跳监测
  354. websocket.Message.Send(conn, "HeartBeat")
  355. }
  356. }
  357. }()
  358. //发送消息
  359. counter := 0
  360. isFirst := true
  361. for {
  362. counter++
  363. if counter%5 == 0 {
  364. if err := websocket.Message.Send(conn, "HeartBeat"); err != nil {
  365. conn.Close()
  366. return
  367. }
  368. }
  369. if isFirst || counter >= interval {
  370. counter = 0
  371. timeA := time.Now().AddDate(0, 0, -qutil.IntAllDef(messageCenter["limitDay"], 7))
  372. timeB, err := time.ParseInLocation(Date_Full_Layout, createtime, time.Local)
  373. if err == nil && timeA.After(timeB) {
  374. createtime = FormatDate(&timeA, Date_Full_Layout)
  375. }
  376. list := public.BaseMysql.SelectBySql(`select id,title,link,content,show_content,pc_tip,isdel,isRead from `+dbName+`.message where receive_userid=? and appid=? and createtime>? order by createtime desc limit ?`, userid, appid, createtime, limitCount)
  377. if list != nil {
  378. for _, v := range *list {
  379. if qutil.IntAll(v["pc_tip"]) == 1 || qutil.IntAll(v["isdel"]) != 1 || qutil.IntAll(v["isRead"]) == 1 {
  380. continue
  381. }
  382. id := qutil.Int64All(v["id"])
  383. reply := fmt.Sprintf(`{"id":%d,"title":"%s","content":"%s","show_content":"%s","link":"%s"}`, id, qutil.ObjToString(v["title"]), qutil.ObjToString(v["content"]), qutil.ObjToString(v["show_content"]), strings.Split(qutil.ObjToString(v["link"]), ",")[0])
  384. if err := websocket.Message.Send(conn, reply); err != nil {
  385. //log.Println("websocket发送失败!", err)
  386. conn.Close()
  387. return
  388. }
  389. if id > 0 {
  390. public.BaseMysql.UpdateOrDeleteBySql(`update `+dbName+`.message set pc_tip=1 where id=?`, id)
  391. }
  392. time.Sleep(time.Second)
  393. }
  394. }
  395. }
  396. isFirst = false
  397. time.Sleep(time.Second)
  398. }
  399. }