main.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package main
  2. import (
  3. elastic "app.yhyue.com/moapp/jybase/esv7"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/robfig/cron/v3"
  8. "github.com/segmentio/kafka-go"
  9. "github.com/spf13/cobra"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
  12. "log"
  13. "strconv"
  14. "time"
  15. )
  16. var (
  17. Tidb *mysqldb.Mysql
  18. Pici int64
  19. )
  20. func init() {
  21. elastic.InitElasticSizeByAuth("http://172.17.4.184:19908", 10, "jybid", "Top2023_JEB01i@31")
  22. Tidb = &mysqldb.Mysql{
  23. Address: "172.17.162.27:14000",
  24. DBName: "jianyu_subjectdb",
  25. UserName: "root",
  26. PassWord: "Tibi#20211222",
  27. }
  28. Tidb.Init()
  29. }
  30. func main() {
  31. rootCmd := &cobra.Command{Use: "my cmd"}
  32. rootCmd.AddCommand(allData())
  33. rootCmd.AddCommand(allTestData())
  34. rootCmd.AddCommand(addData())
  35. if err := rootCmd.Execute(); err != nil {
  36. fmt.Println("rootCmd.Execute failed", err.Error())
  37. }
  38. select {}
  39. }
  40. func addData() *cobra.Command {
  41. cmdClient := &cobra.Command{
  42. Use: "add",
  43. Short: "Start processing add data",
  44. Run: func(cmd *cobra.Command, args []string) {
  45. //taskAdd()
  46. //taskAdd1()
  47. crn := cron.New(cron.WithSeconds())
  48. cronstr := "0 */5 * * * *"
  49. crn.AddFunc(cronstr, func() {
  50. taskAdd1()
  51. })
  52. crn.Start()
  53. },
  54. }
  55. cmdClient.Flags().Int64VarP(&Pici, "pici", "c", 0, "pici time")
  56. return cmdClient
  57. }
  58. func taskAdd() {
  59. // 创建一个reader,指定GroupID,从 topic-A 消费消息
  60. r := kafka.NewReader(kafka.ReaderConfig{
  61. Brokers: []string{"172.17.32.18:9094"},
  62. GroupID: "g3", // 指定消费者组id
  63. Topic: "Jianyu_subjectdb_dwd_f_crm_clue_info",
  64. MaxBytes: 10e6, // 10MB
  65. })
  66. count := 0
  67. // 接收消息
  68. for {
  69. m, err := r.ReadMessage(context.Background())
  70. if err != nil {
  71. break
  72. }
  73. //log.Println("data---", string(m.Value))
  74. formatMsg(m.Value)
  75. count++
  76. if count%20000 == 0 {
  77. log.Println("current --- " + strconv.Itoa(count))
  78. }
  79. }
  80. // 程序退出前关闭Reader
  81. if err := r.Close(); err != nil {
  82. log.Fatal("failed to close reader:", err)
  83. }
  84. }
  85. func taskAdd1() {
  86. sql := `SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM dwd_f_crm_clue_info WHERE updatetime >= ? ORDER BY id ASC`
  87. sql1 := `SELECT count(1) FROM dwd_f_crm_clue_info WHERE updatetime >= ?`
  88. log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, util.FormatDateByInt64(&Pici, util.Date_Full_Layout)))
  89. info := Tidb.SelectBySql(sql, util.FormatDateByInt64(&Pici, util.Date_Full_Layout))
  90. if info != nil && len(*info) > 0 {
  91. for _, data := range *info {
  92. save := make(map[string]interface{})
  93. for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
  94. if v == "id" {
  95. save[v] = fmt.Sprint(data[v])
  96. } else if v == "updatetime" {
  97. t1, _ := data[v].(time.Time)
  98. save[v] = t1.Unix()
  99. if t1.Unix() > Pici {
  100. Pici = t1.Unix()
  101. }
  102. } else if v == "comeintime" || v == "createtime" {
  103. t1, _ := data[v].(time.Time)
  104. save[v] = t1.Unix()
  105. } else {
  106. save[v] = data[v]
  107. }
  108. }
  109. elastic.UpdateNew(esIndex, save)
  110. }
  111. }
  112. log.Println(fmt.Sprintf("轮次结束,last time: %d", Pici))
  113. }
  114. func formatMsg(msg []byte) {
  115. msgInfo := make(map[string]interface{})
  116. err := json.Unmarshal(msg, &msgInfo)
  117. if err != nil {
  118. log.Fatal("unmarshal msg err:", err.Error())
  119. }
  120. db := util.ObjToString(msgInfo["database"])
  121. if db == "Jianyu_subjectdb" {
  122. esIndex = "clue_info"
  123. } else {
  124. esIndex = "clue_info_test"
  125. }
  126. if datas, ok := msgInfo["data"].([]interface{}); ok && len(datas) > 0 {
  127. data := datas[0].(map[string]interface{})
  128. stype := util.ObjToString(msgInfo["type"])
  129. log.Println(fmt.Sprintf("db: %s, type: %s, id: %s", db, stype, util.ObjToString(data["cluename"])))
  130. if stype == "UPDATE" {
  131. updateFun(data)
  132. } else if stype == "INSERT" {
  133. insertFun(data)
  134. } else if stype == "DELETE" {
  135. delFun(data)
  136. }
  137. } else {
  138. log.Println("msgInfo data error --- ")
  139. }
  140. }
  141. func insertFun(data map[string]interface{}) {
  142. save := make(map[string]interface{})
  143. for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
  144. if v == "id" {
  145. save[v] = util.ObjToString(data[v])
  146. } else if v == "comeintime" || v == "createtime" || v == "updatetime" {
  147. t, _ := time.Parse(time.DateTime, util.ObjToString(data[v]))
  148. save[v] = t.Unix()
  149. } else {
  150. save[v] = data[v]
  151. }
  152. }
  153. elastic.SaveNew(esIndex, save)
  154. }
  155. func updateFun(data map[string]interface{}) {
  156. update := make(map[string]interface{})
  157. for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
  158. if v == "id" {
  159. update[v] = util.ObjToString(data[v])
  160. } else if v == "comeintime" || v == "createtime" || v == "updatetime" {
  161. t, _ := time.Parse(time.DateTime, util.ObjToString(data[v]))
  162. update[v] = t.Unix()
  163. } else {
  164. update[v] = data[v]
  165. }
  166. }
  167. elastic.UpdateNew(esIndex, update)
  168. }
  169. func delFun(data map[string]interface{}) {
  170. elastic.DelById(esIndex, "", util.ObjToString(data["id"]))
  171. }