Jianghan преди 11 месеца
родител
ревизия
e80546840e
променени са 1 файла, в които са добавени 46 реда и са изтрити 6 реда
  1. 46 6
      clueEs/main.go

+ 46 - 6
clueEs/main.go

@@ -35,6 +35,7 @@ func main() {
 	rootCmd.AddCommand(allData())
 	rootCmd.AddCommand(allTestData())
 	rootCmd.AddCommand(addData())
+	rootCmd.AddCommand(addTestData())
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
 	}
@@ -43,15 +44,29 @@ func main() {
 func addData() *cobra.Command {
 	cmdClient := &cobra.Command{
 		Use:   "add",
-		Short: "Start processing add data",
+		Short: "处理正式环境增量数据",
 		Run: func(cmd *cobra.Command, args []string) {
+			esIndex = "clue_info"
 			taskAdd()
 		},
 	}
 	return cmdClient
 }
 
+func addTestData() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "add-test",
+		Short: "处理测试环境增量数据",
+		Run: func(cmd *cobra.Command, args []string) {
+			esIndex = "clue_info_test"
+			taskTestAdd()
+		},
+	}
+	return cmdClient
+}
+
 func taskAdd() {
+	esIndex = "clue_info"
 	// 创建一个reader,指定GroupID,从 topic-A 消费消息
 	r := kafka.NewReader(kafka.ReaderConfig{
 		Brokers:  []string{"172.17.32.18:9094"},
@@ -81,6 +96,36 @@ func taskAdd() {
 	}
 }
 
+func taskTestAdd() {
+	// 创建一个reader,指定GroupID,从 topic-A 消费消息
+	r := kafka.NewReader(kafka.ReaderConfig{
+		Brokers:  []string{"172.17.32.18:9094"},
+		GroupID:  "g1", // 指定消费者组id
+		Topic:    "jianyu_subjectdb_test_dwd_f_crm_clue_info",
+		MaxBytes: 10e6, // 10MB
+	})
+
+	count := 0
+	// 接收消息
+	for {
+		m, err := r.ReadMessage(context.Background())
+		if err != nil {
+			break
+		}
+		//log.Println("data---", string(m.Value))
+		formatMsg(m.Value)
+		count++
+		if count%20000 == 0 {
+			log.Println("current --- " + strconv.Itoa(count))
+		}
+	}
+
+	// 程序退出前关闭Reader
+	if err := r.Close(); err != nil {
+		log.Fatal("failed to close reader:", err)
+	}
+}
+
 func formatMsg(msg []byte) {
 	msgInfo := make(map[string]interface{})
 	err := json.Unmarshal(msg, &msgInfo)
@@ -88,11 +133,6 @@ func formatMsg(msg []byte) {
 		log.Fatal("unmarshal msg err:", err.Error())
 	}
 	db := util.ObjToString(msgInfo["database"])
-	if db == "Jianyu_subjectdb" {
-		esIndex = "clue_info"
-	} else {
-		esIndex = "clue_info_test"
-	}
 	if datas, ok := msgInfo["data"].([]interface{}); ok && len(datas) > 0 {
 		data := datas[0].(map[string]interface{})
 		stype := util.ObjToString(msgInfo["type"])