newSendMsgService.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
  5. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  6. "app.yhyue.com/moapp/jybase/redis"
  7. "context"
  8. "errors"
  9. "fmt"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "log"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var MsgGroupIdMap map[int]int
  18. func SetMsgSummary(newMsg, groupId, msgType int64) error {
  19. //更新所有消息
  20. gId := groupId
  21. if groupId == 11 {
  22. groupId = msgType
  23. //根据msgType更新待办二级分类汇总
  24. /*err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, msgType))
  25. if err1 != nil {
  26. //插入失败
  27. return err1
  28. }*/
  29. }
  30. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_summary where group_id = %d", msgType))
  31. var count uint64
  32. row.Scan(&count)
  33. var err error
  34. if count > 0 { //存在则更新
  35. err = entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId))
  36. } else {
  37. log.Println("插入消息汇总表:", fmt.Sprintf("INSERT INTO message_summary values (%d,bitmapBuild([toUInt64(%d)]),%d)", msgType, newMsg, gId))
  38. err = entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf("INSERT INTO message_summary values (%d,bitmapBuild([toUInt64(%d)]),%d)", msgType, newMsg, gId))
  39. }
  40. if err != nil {
  41. return err
  42. }
  43. return nil
  44. }
  45. func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
  46. userIdArr := strings.Split(in.UserIds, ",")
  47. //positionIdArr := strings.Split(in.PositionIds, ",")
  48. if len(userIdArr) == 0 {
  49. return errors.New("无效的用户id")
  50. }
  51. //wg := &sync.WaitGroup{}
  52. group_id := MsgGroupIdMap[int(in.MsgType)]
  53. for i := 0; i < len(userIdArr); i++ {
  54. if userIdArr[i] == "" {
  55. continue
  56. }
  57. //查询
  58. /*var positionId int64
  59. if len(positionIdArr) == len(userIdArr) {
  60. positionId = common.Int64All(positionIdArr[i])
  61. }*/
  62. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", userIdArr[i]))
  63. var count uint64
  64. row.Scan(&count)
  65. if count > 0 { //存在则更新
  66. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, userIdArr[i]))
  67. if err1 != nil {
  68. log.Println("新用户update message_user_summary表出错,error", err1)
  69. return err1
  70. }
  71. } else {
  72. //新用户需要insert
  73. sql := "INSERT INTO message_user_summary values "
  74. sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", userIdArr[i], int(in.MsgLogId))
  75. log.Println("sql", sql)
  76. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  77. if err1 != nil {
  78. //插入失败
  79. log.Println("新用户insert message_user_summary表出错,error", err1)
  80. return err1
  81. }
  82. }
  83. //微信推送模板消息、app push
  84. pushData := WxTmplAndPush{
  85. MsgType: in.MsgType,
  86. Title: in.Title,
  87. Content: in.Content,
  88. WxPushUrl: in.WxPushUrl,
  89. AppPushUrl: in.AppPushUrl,
  90. ProductName: in.ProductName,
  91. OrderId: in.OrderId,
  92. OrderMoney: in.OrderMoney,
  93. Row4: in.Row4,
  94. }
  95. SentWxTmplAndAppPush(pushData, userIdArr[i], group_id, "", "")
  96. /*key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
  97. redis.Del(redisModule, key)
  98. if in.MsgType == 11 || in.MsgType == 12 {
  99. key1 := fmt.Sprintf(MsgClassCountKey, userIdArr[i], in.MsgType)
  100. redis.Del(redisModule, key1)
  101. }*/
  102. DelRedis(userIdArr[i], in.MsgType, group_id)
  103. }
  104. //wg.Wait()
  105. return nil
  106. }
  107. func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
  108. userIdArr := strings.Split(in.UserIds, ",")
  109. //positionIdArr := strings.Split(in.PositionIds, ",")
  110. if len(userIdArr) == 0 {
  111. return errors.New("无效的用户id")
  112. }
  113. wg := &sync.WaitGroup{}
  114. group_id := MsgGroupIdMap[int(in.MsgType)]
  115. var ids []string
  116. for _, v := range userIdArr {
  117. ids = append(ids, v)
  118. if len(ids) == 1000 {
  119. go Update(ids, in.MsgLogId)
  120. ids = []string{}
  121. }
  122. }
  123. if len(ids) > 0 {
  124. go Update(ids, in.MsgLogId)
  125. }
  126. //p459 特殊处理 传过来的消息内容格式为 消息内容#jy#微信模板项目名称#jy#服务地址
  127. equityName, equityAddr := "", ""
  128. if in.MsgType == config.ConfigJson.EquityInfoMsgType {
  129. equityRs := strings.Split(in.Content, "#jy#")
  130. if len(equityRs) != 3 {
  131. log.Println("消息内容格式有误:", in.Content)
  132. return errors.New("无效的消息内容格式")
  133. }
  134. in.Content = equityRs[0]
  135. equityName = equityRs[1]
  136. equityAddr = equityRs[2]
  137. }
  138. for i := 0; i < len(userIdArr); i++ {
  139. if userIdArr[i] == "" {
  140. continue
  141. }
  142. //查询
  143. wg.Add(1)
  144. entity.SaveConcurrencyChan <- 1
  145. /*var positionId int64
  146. if len(positionIdArr) == len(userIdArr) {
  147. positionId = common.Int64All(positionIdArr[i])
  148. }*/
  149. go func(v string) {
  150. defer func() {
  151. <-entity.SaveConcurrencyChan
  152. wg.Done()
  153. }()
  154. //微信推送模板消息、app push
  155. pushData := WxTmplAndPush{
  156. MsgType: in.MsgType,
  157. Title: in.Title,
  158. Content: in.Content,
  159. WxPushUrl: in.WxPushUrl,
  160. AppPushUrl: in.AppPushUrl,
  161. ProductName: in.ProductName,
  162. OrderId: in.OrderId,
  163. OrderMoney: in.OrderMoney,
  164. Row4: in.Row4,
  165. }
  166. SentWxTmplAndAppPush(pushData, v, group_id, equityName, equityAddr)
  167. /*key := fmt.Sprintf(MsgCountKey, v, group_id)
  168. redis.Del(redisModule, key)
  169. if in.MsgType == 11 || in.MsgType == 12 {
  170. key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
  171. redis.Del(redisModule, key1)
  172. }*/
  173. DelRedis(v, in.MsgType, group_id)
  174. }(userIdArr[i])
  175. }
  176. wg.Wait()
  177. return nil
  178. }
  179. func DelRedis(userId string, msgType int64, groupId int) {
  180. key := fmt.Sprintf(MsgCountKey, userId, groupId)
  181. redis.Del(redisModule, key)
  182. if msgType == 11 || msgType == 12 {
  183. key1 := fmt.Sprintf(MsgClassCountKey, userId, msgType)
  184. redis.Del(redisModule, key1)
  185. }
  186. if groupId == 5 || groupId == 11 {
  187. redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, userId))
  188. }
  189. redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, userId))
  190. redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
  191. }
  192. func Update(ids []string, msgLogId int64) {
  193. // 查询是否存在 都存在updae 不存在的插入
  194. userMap := make(map[string]bool)
  195. row, err := entity.ClickhouseConn.Query(context.Background(), fmt.Sprintf("SELECT userId FROM message_user_summary where userId in ('%s')", strings.Join(ids, `','`)))
  196. if err != nil {
  197. log.Println("查询消息失败")
  198. return
  199. }
  200. for row.Next() {
  201. var userId string
  202. _ = row.Scan(&userId)
  203. if userId != "" {
  204. userMap[userId] = true
  205. }
  206. }
  207. if len(userMap) == len(ids) {
  208. log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
  209. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(ids, `','`)))
  210. if err1 != nil {
  211. log.Printf("批量更新message_user_summary出错:%s", err1)
  212. return
  213. }
  214. } else {
  215. log.Println("匹配到数据量:", len(userMap))
  216. var uData, sqlArr []string
  217. for _, id := range ids {
  218. if userMap[id] {
  219. uData = append(uData, id)
  220. } else {
  221. sqlArr = append(sqlArr, fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", id, msgLogId))
  222. }
  223. }
  224. log.Println("缺失数据量:", len(sqlArr))
  225. if len(uData) > 0 {
  226. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in ('%s')`, msgLogId, strings.Join(uData, `','`)))
  227. if err1 != nil {
  228. log.Printf("批量更新message_user_summary出错:%s", err1)
  229. }
  230. }
  231. if len(sqlArr) > 0 {
  232. //新用户需要insert
  233. sql := "INSERT INTO message_user_summary values %s"
  234. log.Println("sql", sql)
  235. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(sql, strings.Join(sqlArr, ",")))
  236. if err1 != nil {
  237. //插入失败
  238. log.Println("insert message_user_summary表出错,error", err1)
  239. }
  240. }
  241. }
  242. }
  243. // 附件下载、剑鱼币活动到期提醒存消息发送记录
  244. func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
  245. groupId := MsgGroupIdMap[int(in.MsgType)]
  246. id := entity.Mysql.Insert("message_send_log", map[string]interface{}{
  247. "send_usergroup_id": "",
  248. "send_usergroup_name": "",
  249. "msg_type": in.MsgType,
  250. "title": in.Title,
  251. "content": in.Content,
  252. "send_mode": 2,
  253. "send_time": time.Now().Format("2006-01-02 15:04:05"),
  254. "send_status": 4,
  255. "update_time": time.Now().Format("2006-01-02 15:04:05"),
  256. "link": in.Link,
  257. "isdel": 1,
  258. "send_userid": "",
  259. "update_user": "",
  260. "Sign": 5,
  261. "menu_name": "message",
  262. "group_id": groupId,
  263. })
  264. if id > 0 {
  265. //更新消息汇总表
  266. err := SetMsgSummary(id, int64(groupId), in.MsgType)
  267. if err != nil {
  268. log.Println("更新消息汇总表出错:", err)
  269. return 0
  270. }
  271. return id
  272. }
  273. return 0
  274. }
  275. func ConvertToBitmap(num int) (res []uint32) {
  276. binary := strconv.FormatInt(int64(num), 2)
  277. total := len(binary)
  278. for i := total - 1; i >= 0; i-- {
  279. if binary[i] == '1' {
  280. res = append(res, uint32(total-i))
  281. }
  282. }
  283. return
  284. }
  285. type WxTmplAndPush struct {
  286. MsgType int64
  287. Title string
  288. Content string
  289. WxPushUrl string
  290. AppPushUrl string
  291. ProductName string
  292. OrderId string
  293. OrderMoney string
  294. Row4 string
  295. SendUserId string
  296. }
  297. func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int, equityName, equityAddr string) {
  298. nTime := time.Now().Format("2006-01-02 15:04:05")
  299. //发送消息成功,推送微信、app
  300. //fmt.Println("this.MsgType", this.MsgType)
  301. pushConfig, err := GetWxTmplConfig(this.MsgType)
  302. if err != nil {
  303. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  304. }
  305. p := &WxTmplPush{
  306. Config: pushConfig,
  307. }
  308. p.MgoId = v
  309. if this.MsgType == 10 {
  310. this.Title = this.ProductName
  311. this.Content = this.OrderId
  312. nTime = this.OrderMoney
  313. }
  314. // 消息模版 工单类型 {{thing19.DATA}} 工单标题 {{thing6.DATA}} 项目名称 {{thing13.DATA}} 服务时间 {{time25.DATA}} 服务地址 {{thing26.DATA}}
  315. if this.MsgType != 1 && this.MsgType != 10 {
  316. if this.MsgType == config.ConfigJson.EquityInfoMsgType {
  317. // p459 服务地址特殊处理
  318. err = p.SendMsg(this.WxPushUrl, this.Title, equityName, nTime, this.Row4, equityAddr)
  319. } else {
  320. err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4, "")
  321. }
  322. if err != nil {
  323. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s this.MsgType:%d", v, err.Error(), this.MsgType))
  324. } else {
  325. logx.Infof("SendWxTmplMsg uId success %s ", v)
  326. }
  327. }
  328. if this.MsgType == 1 {
  329. mst := new(WxTmplConfig)
  330. mst.Switch = AppPushMsgType[group_id]
  331. p.Config = mst
  332. }
  333. uData := p.GetUserPushInfo()
  334. //app推送
  335. if this.MsgType != 10 {
  336. category := ""
  337. if this.SendUserId == "cbgl" {
  338. category = "服务通知_工作事项"
  339. }
  340. if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
  341. logx.Error(fmt.Sprintf("SendAppMsg uId %s Error %s", v, err.Error()))
  342. }
  343. }
  344. }