newSendMsgService.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  5. "app.yhyue.com/moapp/jybase/common"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "log"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var MsgGroupIdMap map[int]int
  17. func SetMsgSummary(newMsg, groupId, msgType int64) error {
  18. //更新所有消息
  19. if groupId == 11 {
  20. //groupId = msgType
  21. //根据msgType更新待办二级分类汇总
  22. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, msgType))
  23. if err1 != nil {
  24. //插入失败
  25. return err1
  26. }
  27. }
  28. err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId))
  29. if err != nil {
  30. return err
  31. }
  32. return nil
  33. }
  34. func NewUserSendMsg(in *message.NewUserInsertMsgReq) string {
  35. userIdArr := strings.Split(in.UserIds, ",")
  36. positionIdArr := strings.Split(in.PositionIds, ",")
  37. if len(userIdArr) == 0 {
  38. return "无效的用户id"
  39. }
  40. wg := &sync.WaitGroup{}
  41. group_id := MsgGroupIdMap[int(in.MsgType)]
  42. for i := 0; i < len(userIdArr); i++ {
  43. if userIdArr[i] == "" {
  44. continue
  45. }
  46. //查询
  47. wg.Add(1)
  48. entity.SaveConcurrencyChan <- 1
  49. var positionId int64
  50. if len(positionIdArr) == len(userIdArr) {
  51. positionId = common.Int64All(positionIdArr[i])
  52. }
  53. go func(v string, positionId int64) {
  54. defer func() {
  55. <-entity.SaveConcurrencyChan
  56. wg.Done()
  57. }()
  58. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", v))
  59. var count uint64
  60. row.Scan(&count)
  61. if count > 0 { //存在则更新
  62. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, v))
  63. if err1 != nil {
  64. log.Println("新用户update message_user_summary表出错,error", err1)
  65. return
  66. }
  67. } else {
  68. //新用户需要insert
  69. sql := "INSERT INTO message_user_summary values "
  70. sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", v, int(in.MsgLogId))
  71. fmt.Println("sql", sql)
  72. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  73. if err1 != nil {
  74. //插入失败
  75. log.Println("新用户insert message_user_summary表出错,error", err1)
  76. return
  77. }
  78. }
  79. //微信推送模板消息、app push
  80. pushData := WxTmplAndPush{
  81. MsgType: in.MsgType,
  82. Title: in.Title,
  83. Content: in.Content,
  84. WxPushUrl: in.WxPushUrl,
  85. AppPushUrl: in.AppPushUrl,
  86. ProductName: in.ProductName,
  87. OrderId: in.OrderId,
  88. OrderMoney: in.OrderMoney,
  89. Row4: in.Row4,
  90. }
  91. SentWxTmplAndAppPush(pushData, v, group_id)
  92. }(userIdArr[i], positionId)
  93. }
  94. wg.Wait()
  95. return ""
  96. }
  97. func UpdateUserMsgSummary(in *message.BitmapSaveMsgReq) error {
  98. userIdArr := strings.Split(in.UserIds, ",")
  99. //positionIdArr := strings.Split(in.PositionIds, ",")
  100. if len(userIdArr) == 0 {
  101. return errors.New("无效的用户id")
  102. }
  103. wg := &sync.WaitGroup{}
  104. group_id := MsgGroupIdMap[int(in.MsgType)]
  105. str := ""
  106. for k, v := range userIdArr {
  107. if k != 0 {
  108. str += ","
  109. }
  110. str += "'" + v + "'"
  111. }
  112. fmt.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, in.MsgLogId, str))
  113. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, in.MsgLogId, str))
  114. if err1 != nil {
  115. return err1
  116. }
  117. for i := 0; i < len(userIdArr); i++ {
  118. if userIdArr[i] == "" {
  119. continue
  120. }
  121. //查询
  122. wg.Add(1)
  123. entity.SaveConcurrencyChan <- 1
  124. /*var positionId int64
  125. if len(positionIdArr) == len(userIdArr) {
  126. positionId = common.Int64All(positionIdArr[i])
  127. }*/
  128. go func(v string) {
  129. defer func() {
  130. <-entity.SaveConcurrencyChan
  131. wg.Done()
  132. }()
  133. //微信推送模板消息、app push
  134. pushData := WxTmplAndPush{
  135. MsgType: in.MsgType,
  136. Title: in.Title,
  137. Content: in.Content,
  138. WxPushUrl: in.WxPushUrl,
  139. AppPushUrl: in.AppPushUrl,
  140. ProductName: in.ProductName,
  141. OrderId: in.OrderId,
  142. OrderMoney: in.OrderMoney,
  143. Row4: in.Row4,
  144. }
  145. SentWxTmplAndAppPush(pushData, v, group_id)
  146. }(userIdArr[i])
  147. }
  148. wg.Wait()
  149. return nil
  150. }
  151. func ConvertToBitmap(num int) (res []uint32) {
  152. binary := strconv.FormatInt(int64(num), 2)
  153. total := len(binary)
  154. for i := total - 1; i >= 0; i-- {
  155. if binary[i] == '1' {
  156. res = append(res, uint32(total-i))
  157. }
  158. }
  159. return
  160. }
  161. type WxTmplAndPush struct {
  162. MsgType int64
  163. Title string
  164. Content string
  165. WxPushUrl string
  166. AppPushUrl string
  167. ProductName string
  168. OrderId string
  169. OrderMoney string
  170. Row4 string
  171. SendUserId string
  172. }
  173. func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int) {
  174. nTime := time.Now().Format("2006-01-02 15:04:05")
  175. //发送消息成功,推送微信、app
  176. //fmt.Println("this.MsgType", this.MsgType)
  177. pushConfig, err := GetWxTmplConfig(this.MsgType)
  178. if err != nil {
  179. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  180. }
  181. p := &WxTmplPush{
  182. Config: pushConfig,
  183. }
  184. p.MgoId = v
  185. if this.MsgType == 10 {
  186. this.Title = this.ProductName
  187. this.Content = this.OrderId
  188. nTime = this.OrderMoney
  189. }
  190. // 消息模版 工单类型 {{thing19.DATA}} 工单标题 {{thing6.DATA}} 项目名称 {{thing13.DATA}} 服务时间 {{time25.DATA}} 服务地址 {{thing26.DATA}}
  191. if this.MsgType != 1 && this.MsgType != 10 {
  192. err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4)
  193. if err != nil {
  194. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  195. } else {
  196. logx.Infof("SendWxTmplMsg uId success %s ", v)
  197. }
  198. }
  199. if this.MsgType == 1 {
  200. mst := new(WxTmplConfig)
  201. mst.Switch = AppPushMsgType[group_id]
  202. p.Config = mst
  203. }
  204. uData := p.GetUserPushInfo()
  205. //app推送
  206. if this.MsgType != 10 {
  207. category := ""
  208. if this.SendUserId == "cbgl" {
  209. category = "服务通知_工作事项"
  210. }
  211. if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
  212. logx.Error(fmt.Sprintf("SendAppMsg uId %s Error %s", v, err.Error()))
  213. }
  214. }
  215. }