newSendMsgService.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
  5. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  6. "app.yhyue.com/moapp/jybase/redis"
  7. "context"
  8. "errors"
  9. "fmt"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "log"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
// MsgGroupIdMap is looked up by message type (the functions in this file
// index it with int(in.MsgType)) and yields the group id that is then handed
// to SentWxTmplAndAppPush, DelRedis and SetMsgSummary.
//
// It is only declared here; presumably it is populated during service
// start-up elsewhere (not visible in this file). A lookup for an unknown
// message type, or on the still-nil map, yields 0.
var MsgGroupIdMap map[int]int
  18. func SetMsgSummary(newMsg, groupId, msgType int64) error {
  19. //更新所有消息
  20. if groupId == 11 {
  21. groupId = msgType
  22. //根据msgType更新待办二级分类汇总
  23. /*err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, msgType))
  24. if err1 != nil {
  25. //插入失败
  26. return err1
  27. }*/
  28. }
  29. err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId))
  30. if err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35. func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
  36. userIdArr := strings.Split(in.UserIds, ",")
  37. //positionIdArr := strings.Split(in.PositionIds, ",")
  38. if len(userIdArr) == 0 {
  39. return errors.New("无效的用户id")
  40. }
  41. //wg := &sync.WaitGroup{}
  42. group_id := MsgGroupIdMap[int(in.MsgType)]
  43. for i := 0; i < len(userIdArr); i++ {
  44. if userIdArr[i] == "" {
  45. continue
  46. }
  47. //查询
  48. /*var positionId int64
  49. if len(positionIdArr) == len(userIdArr) {
  50. positionId = common.Int64All(positionIdArr[i])
  51. }*/
  52. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", userIdArr[i]))
  53. var count uint64
  54. row.Scan(&count)
  55. if count > 0 { //存在则更新
  56. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, userIdArr[i]))
  57. if err1 != nil {
  58. log.Println("新用户update message_user_summary表出错,error", err1)
  59. return err1
  60. }
  61. } else {
  62. //新用户需要insert
  63. sql := "INSERT INTO message_user_summary values "
  64. sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", userIdArr[i], int(in.MsgLogId))
  65. log.Println("sql", sql)
  66. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  67. if err1 != nil {
  68. //插入失败
  69. log.Println("新用户insert message_user_summary表出错,error", err1)
  70. return err1
  71. }
  72. }
  73. //微信推送模板消息、app push
  74. pushData := WxTmplAndPush{
  75. MsgType: in.MsgType,
  76. Title: in.Title,
  77. Content: in.Content,
  78. WxPushUrl: in.WxPushUrl,
  79. AppPushUrl: in.AppPushUrl,
  80. ProductName: in.ProductName,
  81. OrderId: in.OrderId,
  82. OrderMoney: in.OrderMoney,
  83. Row4: in.Row4,
  84. }
  85. SentWxTmplAndAppPush(pushData, userIdArr[i], group_id, "", "")
  86. /*key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
  87. redis.Del(redisModule, key)
  88. if in.MsgType == 11 || in.MsgType == 12 {
  89. key1 := fmt.Sprintf(MsgClassCountKey, userIdArr[i], in.MsgType)
  90. redis.Del(redisModule, key1)
  91. }*/
  92. DelRedis(userIdArr[i], in.MsgType, group_id)
  93. }
  94. //wg.Wait()
  95. return nil
  96. }
  97. func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
  98. userIdArr := strings.Split(in.UserIds, ",")
  99. //positionIdArr := strings.Split(in.PositionIds, ",")
  100. if len(userIdArr) == 0 {
  101. return errors.New("无效的用户id")
  102. }
  103. wg := &sync.WaitGroup{}
  104. group_id := MsgGroupIdMap[int(in.MsgType)]
  105. var ids []string
  106. for _, v := range userIdArr {
  107. ids = append(ids, fmt.Sprintf(`'%s'`, v))
  108. if len(ids) == 1000 {
  109. idStr := strings.Join(ids, ",")
  110. go Update(idStr, in.MsgLogId)
  111. ids = []string{}
  112. }
  113. }
  114. if len(ids) > 0 {
  115. go Update(strings.Join(ids, ","), in.MsgLogId)
  116. }
  117. //p459 特殊处理 传过来的消息内容格式为 消息内容#jy#微信模板项目名称#jy#服务地址
  118. equityName, equityAddr := "", ""
  119. if in.MsgType == config.ConfigJson.EquityInfoMsgType {
  120. equityRs := strings.Split(in.Content, "#jy#")
  121. if len(equityRs) != 3 {
  122. log.Println("消息内容格式有误:", in.Content)
  123. return errors.New("无效的消息内容格式")
  124. }
  125. in.Content = equityRs[0]
  126. equityName = equityRs[1]
  127. equityAddr = equityRs[2]
  128. }
  129. for i := 0; i < len(userIdArr); i++ {
  130. if userIdArr[i] == "" {
  131. continue
  132. }
  133. //查询
  134. wg.Add(1)
  135. entity.SaveConcurrencyChan <- 1
  136. /*var positionId int64
  137. if len(positionIdArr) == len(userIdArr) {
  138. positionId = common.Int64All(positionIdArr[i])
  139. }*/
  140. go func(v string) {
  141. defer func() {
  142. <-entity.SaveConcurrencyChan
  143. wg.Done()
  144. }()
  145. //微信推送模板消息、app push
  146. pushData := WxTmplAndPush{
  147. MsgType: in.MsgType,
  148. Title: in.Title,
  149. Content: in.Content,
  150. WxPushUrl: in.WxPushUrl,
  151. AppPushUrl: in.AppPushUrl,
  152. ProductName: in.ProductName,
  153. OrderId: in.OrderId,
  154. OrderMoney: in.OrderMoney,
  155. Row4: in.Row4,
  156. }
  157. SentWxTmplAndAppPush(pushData, v, group_id, equityName, equityAddr)
  158. /*key := fmt.Sprintf(MsgCountKey, v, group_id)
  159. redis.Del(redisModule, key)
  160. if in.MsgType == 11 || in.MsgType == 12 {
  161. key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
  162. redis.Del(redisModule, key1)
  163. }*/
  164. DelRedis(v, in.MsgType, group_id)
  165. }(userIdArr[i])
  166. }
  167. wg.Wait()
  168. return nil
  169. }
  170. func DelRedis(userId string, msgType int64, groupId int) {
  171. key := fmt.Sprintf(MsgCountKey, userId, groupId)
  172. redis.Del(redisModule, key)
  173. if msgType == 11 || msgType == 12 {
  174. key1 := fmt.Sprintf(MsgClassCountKey, userId, msgType)
  175. redis.Del(redisModule, key1)
  176. }
  177. if groupId == 5 || groupId == 11 {
  178. redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, userId))
  179. }
  180. redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, userId))
  181. redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
  182. }
  183. func Update(str string, msgLogId int64) {
  184. log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
  185. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
  186. if err1 != nil {
  187. log.Printf("批量更新message_user_summary出错:%s", err1)
  188. return
  189. }
  190. }
  191. //附件下载、剑鱼币活动到期提醒存消息发送记录
  192. func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
  193. groupId := MsgGroupIdMap[int(in.MsgType)]
  194. id := entity.Mysql.Insert("message_send_log", map[string]interface{}{
  195. "send_usergroup_id": "",
  196. "send_usergroup_name": "",
  197. "msg_type": in.MsgType,
  198. "title": in.Title,
  199. "content": in.Content,
  200. "send_mode": 2,
  201. "send_time": time.Now().Format("2006-01-02 15:04:05"),
  202. "send_status": 4,
  203. "update_time": time.Now().Format("2006-01-02 15:04:05"),
  204. "link": in.Link,
  205. "isdel": 1,
  206. "send_userid": "",
  207. "update_user": "",
  208. "Sign": 5,
  209. "menu_name": "message",
  210. "group_id": groupId,
  211. })
  212. if id > 0 {
  213. //更新消息汇总表
  214. err := SetMsgSummary(id, int64(groupId), in.MsgType)
  215. if err != nil {
  216. log.Println("更新消息汇总表出错:", err)
  217. return 0
  218. }
  219. return id
  220. }
  221. return 0
  222. }
  223. func ConvertToBitmap(num int) (res []uint32) {
  224. binary := strconv.FormatInt(int64(num), 2)
  225. total := len(binary)
  226. for i := total - 1; i >= 0; i-- {
  227. if binary[i] == '1' {
  228. res = append(res, uint32(total-i))
  229. }
  230. }
  231. return
  232. }
// WxTmplAndPush is the per-message payload handed to SentWxTmplAndAppPush.
// It carries everything needed to send the WeChat template message and the
// app push for one message.
type WxTmplAndPush struct {
	// MsgType is the message type; it selects the WeChat template
	// configuration and the branches taken in SentWxTmplAndAppPush.
	MsgType int64
	// Title is passed as the title to both the WeChat template message and
	// the app push.
	Title string
	// Content is passed as the body to both the WeChat template message and
	// the app push.
	Content string
	// WxPushUrl is passed as the first argument to WxTmplPush.SendMsg.
	WxPushUrl string
	// AppPushUrl is passed to AppPushMsg.
	AppPushUrl string
	// ProductName replaces Title when MsgType is 10.
	ProductName string
	// OrderId replaces Content when MsgType is 10.
	OrderId string
	// OrderMoney replaces the time value when MsgType is 10.
	OrderMoney string
	// Row4 is passed to WxTmplPush.SendMsg as its fifth argument.
	Row4 string
	// SendUserId selects the app push category: the value "cbgl" yields
	// category "服务通知_工作事项", anything else an empty category.
	SendUserId string
}
  245. func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int, equityName, equityAddr string) {
  246. nTime := time.Now().Format("2006-01-02 15:04:05")
  247. //发送消息成功,推送微信、app
  248. //fmt.Println("this.MsgType", this.MsgType)
  249. pushConfig, err := GetWxTmplConfig(this.MsgType)
  250. if err != nil {
  251. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  252. }
  253. p := &WxTmplPush{
  254. Config: pushConfig,
  255. }
  256. p.MgoId = v
  257. if this.MsgType == 10 {
  258. this.Title = this.ProductName
  259. this.Content = this.OrderId
  260. nTime = this.OrderMoney
  261. }
  262. // 消息模版 工单类型 {{thing19.DATA}} 工单标题 {{thing6.DATA}} 项目名称 {{thing13.DATA}} 服务时间 {{time25.DATA}} 服务地址 {{thing26.DATA}}
  263. if this.MsgType != 1 && this.MsgType != 10 {
  264. if this.MsgType == config.ConfigJson.EquityInfoMsgType {
  265. // p459 服务地址特殊处理
  266. err = p.SendMsg(this.WxPushUrl, this.Title, equityName, nTime, this.Row4, equityAddr)
  267. } else {
  268. err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4, "")
  269. }
  270. if err != nil {
  271. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  272. } else {
  273. logx.Infof("SendWxTmplMsg uId success %s ", v)
  274. }
  275. }
  276. if this.MsgType == 1 {
  277. mst := new(WxTmplConfig)
  278. mst.Switch = AppPushMsgType[group_id]
  279. p.Config = mst
  280. }
  281. uData := p.GetUserPushInfo()
  282. //app推送
  283. if this.MsgType != 10 {
  284. category := ""
  285. if this.SendUserId == "cbgl" {
  286. category = "服务通知_工作事项"
  287. }
  288. if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
  289. logx.Error(fmt.Sprintf("SendAppMsg uId %s Error %s", v, err.Error()))
  290. }
  291. }
  292. }