// Command main syncs rows of dwd_f_crm_clue_info into Elasticsearch, either by
// polling TiDB on a cron schedule (taskAdd1) or by consuming change messages
// from Kafka (taskAdd).
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"time"

	elastic "app.yhyue.com/moapp/jybase/esv7"
	"github.com/robfig/cron/v3"
	"github.com/segmentio/kafka-go"
	"github.com/spf13/cobra"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
)

var (
	// Tidb is the shared database handle, configured and initialised in init.
	Tidb *mysqldb.Mysql
	// Pici is the polling watermark in Unix seconds. It is seeded from the
	// --pici flag of the "add" command and advanced by taskAdd1 to the largest
	// updatetime it has seen.
	Pici int64
)

// clueInfoFields lists the columns copied from a source row into the
// Elasticsearch document, in the order they are processed.
var clueInfoFields = []string{
	"id", "uid", "userid", "position_id", "seatNumber", "is_assign",
	"comeintime", "createtime", "updatetime", "cluename",
}

// init configures the Elasticsearch client and opens the TiDB connection.
//
// NOTE(review): endpoints and credentials are hard-coded in source; consider
// moving them to configuration or environment variables.
func init() {
	elastic.InitElasticSizeByAuth("http://172.17.4.184:19908", 10, "jybid", "Top2023_JEB01i@31")
	Tidb = &mysqldb.Mysql{
		Address:  "172.17.162.27:14000",
		DBName:   "jianyu_subjectdb",
		UserName: "root",
		PassWord: "Tibi#20211222",
	}
	Tidb.Init()
}

// main registers the sub-commands and runs the selected one. After a
// successful run it blocks forever so that background work started by a
// command (for example the cron scheduler of "add") keeps running.
func main() {
	rootCmd := &cobra.Command{Use: "my cmd"}
	// allData and allTestData are not defined in this part of the file.
	rootCmd.AddCommand(allData())
	rootCmd.AddCommand(allTestData())
	rootCmd.AddCommand(addData())
	if err := rootCmd.Execute(); err != nil {
		// Exit instead of falling through to the blocking select below: a
		// failed invocation has nothing left to wait for.
		log.Fatalln("rootCmd.Execute failed", err.Error())
	}
	select {}
}

// addData builds the "add" sub-command, which schedules taskAdd1 to run every
// five minutes. The --pici/-c flag seeds the Pici watermark.
func addData() *cobra.Command {
	cmdClient := &cobra.Command{
		Use:   "add",
		Short: "Start processing add data",
		Run: func(cmd *cobra.Command, args []string) {
			// The Kafka consumer (taskAdd) is currently not wired up here;
			// only the polling job runs.
			crn := cron.New(cron.WithSeconds())
			// Six-field spec (seconds enabled): second 0 of every 5th minute.
			if _, err := crn.AddFunc("0 */5 * * * *", taskAdd1); err != nil {
				log.Fatalln("register cron job failed:", err)
			}
			crn.Start()
		},
	}
	cmdClient.Flags().Int64VarP(&Pici, "pici", "c", 0, "pici time")
	return cmdClient
}

// taskAdd consumes change messages from Kafka and hands each payload to
// formatMsg until reading fails, then closes the reader.
func taskAdd() {
	// Create a reader with a consumer group ID for the clue-info topic.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"172.17.32.18:9094"},
		GroupID:  "g3", // consumer group id
		Topic:    "Jianyu_subjectdb_dwd_f_crm_clue_info",
		MaxBytes: 10e6, // 10MB
	})

	count := 0
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			// Report why consumption stops instead of ending silently.
			log.Println("read kafka message failed, stop consuming:", err)
			break
		}
		formatMsg(m.Value)
		count++
		if count%20000 == 0 {
			log.Println("current --- " + strconv.Itoa(count))
		}
	}

	// Close the reader before returning.
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

// taskAdd1 runs one polling round: it selects every row whose updatetime is at
// or after the Pici watermark, passes each row to elastic.UpdateNew, and
// advances Pici to the largest updatetime encountered.
//
// esIndex is not declared in this part of the file; formatMsg also assigns it.
// NOTE(review): confirm it holds the intended index when this job runs.
func taskAdd1() {
	// The query text is kept byte-identical to the original raw string,
	// including the line break before WHERE.
	sql := "SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM dwd_f_crm_clue_info \n" +
		"WHERE updatetime >= ? ORDER BY id ASC"
	sql1 := `SELECT count(1) FROM dwd_f_crm_clue_info WHERE updatetime >= ?`

	since := util.FormatDateByInt64(&Pici, util.Date_Full_Layout)
	log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, since))

	info := Tidb.SelectBySql(sql, since)
	if info != nil {
		for _, row := range *info {
			save := make(map[string]any, len(clueInfoFields))
			for _, field := range clueInfoFields {
				switch field {
				case "id":
					save[field] = fmt.Sprint(row[field])
				case "comeintime", "createtime", "updatetime":
					ts, ok := row[field].(time.Time)
					if !ok {
						// A failed assertion used to yield the zero time,
						// whose Unix value (-62135596800) was written to the
						// index. Leave the field out instead.
						if row[field] != nil {
							log.Printf("skip field %s: unexpected type %T", field, row[field])
						}
						continue
					}
					sec := ts.Unix()
					save[field] = sec
					if field == "updatetime" && sec > Pici {
						Pici = sec
					}
				default:
					save[field] = row[field]
				}
			}
			elastic.UpdateNew(esIndex, save)
		}
	}
	log.Printf("轮次结束,last time: %d", Pici)
}

// formatMsg decodes one JSON change message, selects the target index from its
// "database" field, and dispatches the first element of "data" to updateFun,
// insertFun or delFun according to the "type" field. Messages that cannot be
// decoded, or whose "data" is not a non-empty array of objects, are logged and
// skipped.
//
// NOTE(review): only data[0] is processed; confirm messages never carry more
// than one row.
func formatMsg(msg []byte) {
	msgInfo := make(map[string]any)
	if err := json.Unmarshal(msg, &msgInfo); err != nil {
		// One malformed message must not terminate the whole consumer.
		log.Println("unmarshal msg err:", err.Error())
		return
	}

	db := util.ObjToString(msgInfo["database"])
	if db == "Jianyu_subjectdb" {
		esIndex = "clue_info"
	} else {
		esIndex = "clue_info_test"
	}

	datas, ok := msgInfo["data"].([]any)
	if !ok || len(datas) == 0 {
		log.Println("msgInfo data error --- ")
		return
	}
	// Checked assertion: a non-object element would otherwise panic.
	data, ok := datas[0].(map[string]any)
	if !ok {
		log.Println("msgInfo data error --- ")
		return
	}

	stype := util.ObjToString(msgInfo["type"])
	log.Printf("db: %s, type: %s, id: %s", db, stype, util.ObjToString(data["cluename"]))
	switch stype {
	case "UPDATE":
		updateFun(data)
	case "INSERT":
		insertFun(data)
	case "DELETE":
		delFun(data)
	}
}

// clueDocFromMsg builds the Elasticsearch document for one message row: "id"
// is stringified, the three time columns are parsed with the time.DateTime
// layout and stored as Unix seconds, and every other field is copied as-is.
// A time column that cannot be parsed is logged and left out of the document.
//
// NOTE(review): time.Parse treats a zone-less value as UTC; confirm that this
// matches the timestamps taskAdd1 obtains from the database.
func clueDocFromMsg(data map[string]any) map[string]any {
	doc := make(map[string]any, len(clueInfoFields))
	for _, field := range clueInfoFields {
		switch field {
		case "id":
			doc[field] = util.ObjToString(data[field])
		case "comeintime", "createtime", "updatetime":
			raw := util.ObjToString(data[field])
			ts, err := time.Parse(time.DateTime, raw)
			if err != nil {
				// Previously the error was dropped and the zero time's Unix
				// value (-62135596800) was stored.
				log.Printf("skip field %s: cannot parse %q as datetime: %v", field, raw, err)
				continue
			}
			doc[field] = ts.Unix()
		default:
			doc[field] = data[field]
		}
	}
	return doc
}

// insertFun converts an inserted row and passes it to elastic.SaveNew.
func insertFun(data map[string]interface{}) {
	elastic.SaveNew(esIndex, clueDocFromMsg(data))
}

// updateFun converts an updated row and passes it to elastic.UpdateNew.
func updateFun(data map[string]interface{}) {
	elastic.UpdateNew(esIndex, clueDocFromMsg(data))
}

// delFun removes the document identified by the row's "id" via elastic.DelById.
func delFun(data map[string]interface{}) {
	elastic.DelById(esIndex, "", util.ObjToString(data["id"]))
}