123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package main
- import (
- elastic "app.yhyue.com/moapp/jybase/esv7"
- "context"
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "github.com/segmentio/kafka-go"
- "github.com/spf13/cobra"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
- "log"
- "strconv"
- "time"
- )
- var (
- Tidb *mysqldb.Mysql
- Pici int64
- )
- func init() {
- elastic.InitElasticSizeByAuth("http://172.17.4.184:19908", 10, "jybid", "Top2023_JEB01i@31")
- Tidb = &mysqldb.Mysql{
- Address: "172.17.162.27:14000",
- DBName: "jianyu_subjectdb",
- UserName: "root",
- PassWord: "Tibi#20211222",
- }
- Tidb.Init()
- }
- func main() {
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(allData())
- rootCmd.AddCommand(allTestData())
- rootCmd.AddCommand(addData())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- select {}
- }
- func addData() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "add",
- Short: "Start processing add data",
- Run: func(cmd *cobra.Command, args []string) {
- //taskAdd()
- //taskAdd1()
- crn := cron.New(cron.WithSeconds())
- cronstr := "0 */5 * * * *"
- crn.AddFunc(cronstr, func() {
- taskAdd1()
- })
- crn.Start()
- },
- }
- cmdClient.Flags().Int64VarP(&Pici, "pici", "c", 0, "pici time")
- return cmdClient
- }
- func taskAdd() {
- // 创建一个reader,指定GroupID,从 topic-A 消费消息
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"172.17.32.18:9094"},
- GroupID: "g3", // 指定消费者组id
- Topic: "Jianyu_subjectdb_dwd_f_crm_clue_info",
- MaxBytes: 10e6, // 10MB
- })
- count := 0
- // 接收消息
- for {
- m, err := r.ReadMessage(context.Background())
- if err != nil {
- break
- }
- //log.Println("data---", string(m.Value))
- formatMsg(m.Value)
- count++
- if count%20000 == 0 {
- log.Println("current --- " + strconv.Itoa(count))
- }
- }
- // 程序退出前关闭Reader
- if err := r.Close(); err != nil {
- log.Fatal("failed to close reader:", err)
- }
- }
- func taskAdd1() {
- sql := `SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM dwd_f_crm_clue_info WHERE updatetime >= ? ORDER BY id ASC`
- sql1 := `SELECT count(1) FROM dwd_f_crm_clue_info WHERE updatetime >= ?`
- log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, util.FormatDateByInt64(&Pici, util.Date_Full_Layout)))
- info := Tidb.SelectBySql(sql, util.FormatDateByInt64(&Pici, util.Date_Full_Layout))
- if info != nil && len(*info) > 0 {
- for _, data := range *info {
- save := make(map[string]interface{})
- for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
- if v == "id" {
- save[v] = fmt.Sprint(data[v])
- } else if v == "updatetime" {
- t1, _ := data[v].(time.Time)
- save[v] = t1.Unix()
- if t1.Unix() > Pici {
- Pici = t1.Unix()
- }
- } else if v == "comeintime" || v == "createtime" {
- t1, _ := data[v].(time.Time)
- save[v] = t1.Unix()
- } else {
- save[v] = data[v]
- }
- }
- elastic.UpdateNew(esIndex, save)
- }
- }
- log.Println(fmt.Sprintf("轮次结束,last time: %d", Pici))
- }
- func formatMsg(msg []byte) {
- msgInfo := make(map[string]interface{})
- err := json.Unmarshal(msg, &msgInfo)
- if err != nil {
- log.Fatal("unmarshal msg err:", err.Error())
- }
- db := util.ObjToString(msgInfo["database"])
- if db == "Jianyu_subjectdb" {
- esIndex = "clue_info"
- } else {
- esIndex = "clue_info_test"
- }
- if datas, ok := msgInfo["data"].([]interface{}); ok && len(datas) > 0 {
- data := datas[0].(map[string]interface{})
- stype := util.ObjToString(msgInfo["type"])
- log.Println(fmt.Sprintf("db: %s, type: %s, id: %s", db, stype, util.ObjToString(data["cluename"])))
- if stype == "UPDATE" {
- updateFun(data)
- } else if stype == "INSERT" {
- insertFun(data)
- } else if stype == "DELETE" {
- delFun(data)
- }
- } else {
- log.Println("msgInfo data error --- ")
- }
- }
- func insertFun(data map[string]interface{}) {
- save := make(map[string]interface{})
- for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
- if v == "id" {
- save[v] = util.ObjToString(data[v])
- } else if v == "comeintime" || v == "createtime" || v == "updatetime" {
- t, _ := time.Parse(time.DateTime, util.ObjToString(data[v]))
- save[v] = t.Unix()
- } else {
- save[v] = data[v]
- }
- }
- elastic.SaveNew(esIndex, save)
- }
- func updateFun(data map[string]interface{}) {
- update := make(map[string]interface{})
- for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
- if v == "id" {
- update[v] = util.ObjToString(data[v])
- } else if v == "comeintime" || v == "createtime" || v == "updatetime" {
- t, _ := time.Parse(time.DateTime, util.ObjToString(data[v]))
- update[v] = t.Unix()
- } else {
- update[v] = data[v]
- }
- }
- elastic.UpdateNew(esIndex, update)
- }
- func delFun(data map[string]interface{}) {
- elastic.DelById(esIndex, "", util.ObjToString(data["id"]))
- }
|