12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package common
- import (
- "app.yhyue.com/moapp/MessageCenter/entity"
- "app.yhyue.com/moapp/jybase/common"
- "context"
- "fmt"
- "log"
- "time"
- )
- var (
- SP = make(chan bool, 5)
- SaveCache = make(chan map[string]interface{}, 100000)
- )
- // SaveMessageClockLog 保存消息点击记录
- func SaveMessageClockLog(saveData []map[string]interface{}) {
- sql := "INSERT INTO message_open_log (`userid`,`msg_log_id`,`platform`,`createtime`) values "
- for i, data := range saveData {
- if i != 0 {
- sql += ","
- }
- sql += fmt.Sprintf(" ('%s',%d,%d,%d) ", common.InterfaceToStr(data["userid"]), common.IntAll(data["msg_log_id"]), common.IntAll(data["platform"]), time.Now().Unix())
- }
- fmt.Println(sql)
- if err := entity.ClickhouseConn.Exec(context.Background(), sql); err != nil {
- log.Println("save 异常", err)
- return
- }
- }
- func SaveTask() {
- log.Println("message log Save...")
- arru := make([]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-SaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- SaveMessageClockLog(arru)
- }(arru)
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- SaveMessageClockLog(arru)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
|