// Source file: messageLog.go (1.5 KB)
  1. package common
  import (
  	"context"
  	"fmt"
  	"log"
  	"strings"
  	"time"

  	"app.yhyue.com/moapp/MessageCenter/entity"
  	"app.yhyue.com/moapp/jybase/common"
  )
  // Shared channels used by the message-log writer in this file.
  var (
  	// SP is a buffered channel with 5 slots. SaveTask sends a value before
  	// starting a save goroutine and receives one when that goroutine ends,
  	// so it bounds the number of saves running at the same time.
  	SP = make(chan bool, 5)

  	// SaveCache queues records (one map per record, up to 100000 buffered)
  	// until SaveTask collects them into batches.
  	SaveCache = make(chan map[string]interface{}, 100000)
  )
  14. // SaveMessageClockLog 保存消息点击记录
  15. func SaveMessageClockLog(saveData []map[string]interface{}) {
  16. sql := "INSERT INTO message_open_log (`userid`,`msg_log_id`,`platform`,`createtime`) values "
  17. for i, data := range saveData {
  18. if i != 0 {
  19. sql += ","
  20. }
  21. sql += fmt.Sprintf(" ('%s',%d,%d,%d) ", common.InterfaceToStr(data["userid"]), common.IntAll(data["msg_log_id"]), common.IntAll(data["platform"]), time.Now().Unix())
  22. }
  23. fmt.Println(sql)
  24. if err := entity.ClickhouseConn.Exec(context.Background(), sql); err != nil {
  25. log.Println("save 异常", err)
  26. return
  27. }
  28. }
  29. func SaveTask() {
  30. log.Println("message log Save...")
  31. arru := make([]map[string]interface{}, 500)
  32. indexu := 0
  33. for {
  34. select {
  35. case v := <-SaveCache:
  36. arru[indexu] = v
  37. indexu++
  38. if indexu == 500 {
  39. SP <- true
  40. go func(arru []map[string]interface{}) {
  41. defer func() {
  42. <-SP
  43. }()
  44. SaveMessageClockLog(arru)
  45. }(arru)
  46. arru = make([]map[string]interface{}, 500)
  47. indexu = 0
  48. }
  49. case <-time.After(1000 * time.Millisecond):
  50. if indexu > 0 {
  51. SP <- true
  52. go func(arru []map[string]interface{}) {
  53. defer func() {
  54. <-SP
  55. }()
  56. SaveMessageClockLog(arru)
  57. }(arru[:indexu])
  58. arru = make([]map[string]interface{}, 500)
  59. indexu = 0
  60. }
  61. }
  62. }
  63. }