newSendMsgService.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
  5. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/redis"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "github.com/zeromicro/go-zero/core/logx"
  12. "log"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. var MsgGroupIdMap map[int]int
  19. func SetMsgSummary(newMsg, groupId, msgType int64) error {
  20. //更新所有消息
  21. if groupId == 11 {
  22. groupId = msgType
  23. //根据msgType更新待办二级分类汇总
  24. /*err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, msgType))
  25. if err1 != nil {
  26. //插入失败
  27. return err1
  28. }*/
  29. }
  30. err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([toUInt64(%d)])) where group_id = %d`, newMsg, groupId))
  31. if err != nil {
  32. return err
  33. }
  34. return nil
  35. }
  36. func NewUserSendMsg(in *message.NewUserInsertMsgReq) string {
  37. userIdArr := strings.Split(in.UserIds, ",")
  38. positionIdArr := strings.Split(in.PositionIds, ",")
  39. if len(userIdArr) == 0 {
  40. return "无效的用户id"
  41. }
  42. wg := &sync.WaitGroup{}
  43. group_id := MsgGroupIdMap[int(in.MsgType)]
  44. for i := 0; i < len(userIdArr); i++ {
  45. if userIdArr[i] == "" {
  46. continue
  47. }
  48. //查询
  49. wg.Add(1)
  50. entity.SaveConcurrencyChan <- 1
  51. var positionId int64
  52. if len(positionIdArr) == len(userIdArr) {
  53. positionId = common.Int64All(positionIdArr[i])
  54. }
  55. go func(v string, positionId int64) {
  56. defer func() {
  57. <-entity.SaveConcurrencyChan
  58. wg.Done()
  59. }()
  60. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT COUNT(*) from message_user_summary where userId = '%s'", v))
  61. var count uint64
  62. row.Scan(&count)
  63. if count > 0 { //存在则更新
  64. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, in.MsgLogId, v))
  65. if err1 != nil {
  66. log.Println("新用户update message_user_summary表出错,error", err1)
  67. return
  68. }
  69. } else {
  70. //新用户需要insert
  71. sql := "INSERT INTO message_user_summary values "
  72. sql += fmt.Sprintf(" ('%s',bitmapBuild([toUInt64(%d)]),bitmapBuild([toUInt64(0)])) ", v, int(in.MsgLogId))
  73. fmt.Println("sql", sql)
  74. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  75. if err1 != nil {
  76. //插入失败
  77. log.Println("新用户insert message_user_summary表出错,error", err1)
  78. return
  79. }
  80. }
  81. //微信推送模板消息、app push
  82. pushData := WxTmplAndPush{
  83. MsgType: in.MsgType,
  84. Title: in.Title,
  85. Content: in.Content,
  86. WxPushUrl: in.WxPushUrl,
  87. AppPushUrl: in.AppPushUrl,
  88. ProductName: in.ProductName,
  89. OrderId: in.OrderId,
  90. OrderMoney: in.OrderMoney,
  91. Row4: in.Row4,
  92. }
  93. SentWxTmplAndAppPush(pushData, v, group_id, "", "")
  94. key := fmt.Sprintf(MsgCountKey, v, group_id)
  95. redis.Del(redisModule, key)
  96. if in.MsgType == 11 || in.MsgType == 12 {
  97. key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
  98. redis.Del(redisModule, key1)
  99. }
  100. }(userIdArr[i], positionId)
  101. }
  102. wg.Wait()
  103. return ""
  104. }
  105. func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
  106. userIdArr := strings.Split(in.UserIds, ",")
  107. //positionIdArr := strings.Split(in.PositionIds, ",")
  108. if len(userIdArr) == 0 {
  109. return errors.New("无效的用户id")
  110. }
  111. wg := &sync.WaitGroup{}
  112. group_id := MsgGroupIdMap[int(in.MsgType)]
  113. str := ""
  114. i := 0
  115. for _, v := range userIdArr {
  116. i++
  117. if i != 1 {
  118. str += ","
  119. }
  120. str += "'" + v + "'"
  121. if i == 1000 {
  122. go Update(str, in.MsgLogId)
  123. str = ""
  124. i = 0
  125. }
  126. }
  127. if i > 0 {
  128. go Update(str, in.MsgLogId)
  129. }
  130. //p459 特殊处理 传过来的消息内容格式为 消息内容#jy#微信模板项目名称#jy#服务地址
  131. equityName, equityAddr := "", ""
  132. if in.MsgType == config.ConfigJson.EquityInfoMsgType {
  133. equityRs := strings.Split(in.Content, "#jy#")
  134. if len(equityRs) != 3 {
  135. log.Println("消息内容格式有误:", in.Content)
  136. return errors.New("无效的消息内容格式")
  137. }
  138. in.Content = equityRs[0]
  139. equityName = equityRs[1]
  140. equityAddr = equityRs[2]
  141. }
  142. for i := 0; i < len(userIdArr); i++ {
  143. if userIdArr[i] == "" {
  144. continue
  145. }
  146. //查询
  147. wg.Add(1)
  148. entity.SaveConcurrencyChan <- 1
  149. /*var positionId int64
  150. if len(positionIdArr) == len(userIdArr) {
  151. positionId = common.Int64All(positionIdArr[i])
  152. }*/
  153. go func(v string) {
  154. defer func() {
  155. <-entity.SaveConcurrencyChan
  156. wg.Done()
  157. }()
  158. //微信推送模板消息、app push
  159. pushData := WxTmplAndPush{
  160. MsgType: in.MsgType,
  161. Title: in.Title,
  162. Content: in.Content,
  163. WxPushUrl: in.WxPushUrl,
  164. AppPushUrl: in.AppPushUrl,
  165. ProductName: in.ProductName,
  166. OrderId: in.OrderId,
  167. OrderMoney: in.OrderMoney,
  168. Row4: in.Row4,
  169. }
  170. SentWxTmplAndAppPush(pushData, v, group_id, equityName, equityAddr)
  171. key := fmt.Sprintf(MsgCountKey, v, group_id)
  172. redis.Del(redisModule, key)
  173. if in.MsgType == 11 || in.MsgType == 12 {
  174. key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
  175. redis.Del(redisModule, key1)
  176. }
  177. }(userIdArr[i])
  178. }
  179. wg.Wait()
  180. return nil
  181. }
  182. func Update(str string, msgLogId int64) {
  183. fmt.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
  184. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
  185. if err1 != nil {
  186. log.Printf("批量更新message_user_summary出错:%s", err1)
  187. return
  188. }
  189. }
  190. //附件下载、剑鱼币活动到期提醒存消息发送记录
  191. func InsertMsgSendLog(in *message.MultipleSaveMsgReq) int64 {
  192. groupId := MsgGroupIdMap[int(in.MsgType)]
  193. id := entity.Mysql.Insert("message_send_log", map[string]interface{}{
  194. "send_usergroup_id": "",
  195. "send_usergroup_name": "",
  196. "msg_type": in.MsgType,
  197. "title": in.Title,
  198. "content": in.Content,
  199. "send_mode": 2,
  200. "send_time": time.Now().Format("2006-01-02 15:04:05"),
  201. "send_status": 4,
  202. "update_time": time.Now().Format("2006-01-02 15:04:05"),
  203. "link": in.Link,
  204. "isdel": 1,
  205. "send_userid": "",
  206. "update_user": "",
  207. "Sign": 5,
  208. "menu_name": "message",
  209. "group_id": groupId,
  210. })
  211. if id > 0 {
  212. //更新消息汇总表
  213. err := SetMsgSummary(id, int64(groupId), in.MsgType)
  214. if err != nil {
  215. log.Println("更新消息汇总表出错:", err)
  216. return 0
  217. }
  218. return id
  219. }
  220. return 0
  221. }
  222. func ConvertToBitmap(num int) (res []uint32) {
  223. binary := strconv.FormatInt(int64(num), 2)
  224. total := len(binary)
  225. for i := total - 1; i >= 0; i-- {
  226. if binary[i] == '1' {
  227. res = append(res, uint32(total-i))
  228. }
  229. }
  230. return
  231. }
  232. type WxTmplAndPush struct {
  233. MsgType int64
  234. Title string
  235. Content string
  236. WxPushUrl string
  237. AppPushUrl string
  238. ProductName string
  239. OrderId string
  240. OrderMoney string
  241. Row4 string
  242. SendUserId string
  243. }
  244. func SentWxTmplAndAppPush(this WxTmplAndPush, v string, group_id int, equityName, equityAddr string) {
  245. nTime := time.Now().Format("2006-01-02 15:04:05")
  246. //发送消息成功,推送微信、app
  247. //fmt.Println("this.MsgType", this.MsgType)
  248. pushConfig, err := GetWxTmplConfig(this.MsgType)
  249. if err != nil {
  250. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  251. }
  252. p := &WxTmplPush{
  253. Config: pushConfig,
  254. }
  255. p.MgoId = v
  256. if this.MsgType == 10 {
  257. this.Title = this.ProductName
  258. this.Content = this.OrderId
  259. nTime = this.OrderMoney
  260. }
  261. // 消息模版 工单类型 {{thing19.DATA}} 工单标题 {{thing6.DATA}} 项目名称 {{thing13.DATA}} 服务时间 {{time25.DATA}} 服务地址 {{thing26.DATA}}
  262. if this.MsgType != 1 && this.MsgType != 10 {
  263. if this.MsgType == config.ConfigJson.EquityInfoMsgType {
  264. // p459 服务地址特殊处理
  265. err = p.SendMsg(this.WxPushUrl, this.Title, equityName, nTime, this.Row4, equityAddr)
  266. } else {
  267. err = p.SendMsg(this.WxPushUrl, this.Title, this.Content, nTime, this.Row4, "")
  268. }
  269. if err != nil {
  270. logx.Error(fmt.Sprintf("SendWxTmplMsg uId %s Error %s", v, err.Error()))
  271. } else {
  272. logx.Infof("SendWxTmplMsg uId success %s ", v)
  273. }
  274. }
  275. if this.MsgType == 1 {
  276. mst := new(WxTmplConfig)
  277. mst.Switch = AppPushMsgType[group_id]
  278. p.Config = mst
  279. }
  280. uData := p.GetUserPushInfo()
  281. //app推送
  282. if this.MsgType != 10 {
  283. category := ""
  284. if this.SendUserId == "cbgl" {
  285. category = "服务通知_工作事项"
  286. }
  287. if err = AppPushMsg(uData, AppPushMsgType[group_id], this.AppPushUrl, this.Title, this.Content, this.MsgType, category); err != nil {
  288. logx.Error(fmt.Sprintf("SendAppMsg uId %s Error %s", v, err.Error()))
  289. }
  290. }
  291. }