Bläddra i källkod

线索es数据处理

Jianghan 7 månader sedan
förälder
incheckning
3a18fe738d
1 ändrade filer med 12 tillägg och 10 borttagningar
  1. 12 10
      clueEs/main.go

+ 12 - 10
clueEs/main.go

@@ -17,6 +17,8 @@ import (
 
 var (
 	Tidb *mysqldb.Mysql
+
+	Pici int64
 )
 
 func init() {
@@ -45,23 +47,23 @@ func main() {
 }
 
 func addData() *cobra.Command {
-	var pici int64
+
 	cmdClient := &cobra.Command{
 		Use:   "add",
 		Short: "Start processing add data",
 		Run: func(cmd *cobra.Command, args []string) {
 			//taskAdd()
 
-			taskAdd1(pici)
+			//taskAdd1()
 			crn := cron.New(cron.WithSeconds())
 			cronstr := "0 */5 * * * *"
 			crn.AddFunc(cronstr, func() {
-				taskAdd1(pici)
+				taskAdd1()
 			})
 			crn.Start()
 		},
 	}
-	cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time")
+	cmdClient.Flags().Int64VarP(&Pici, "pici", "c", 0, "pici time")
 	return cmdClient
 }
 
@@ -95,11 +97,11 @@ func taskAdd() {
 	}
 }
 
-func taskAdd1(pici int64) {
+func taskAdd1() {
 	sql := `SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM dwd_f_crm_clue_info WHERE updatetime >= ? ORDER BY id ASC`
 	sql1 := `SELECT count(1) FROM dwd_f_crm_clue_info WHERE updatetime >= ?`
-	log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, util.FormatDateByInt64(&pici, util.Date_Full_Layout)))
-	info := Tidb.SelectBySql(sql, util.FormatDateByInt64(&pici, util.Date_Full_Layout))
+	log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, util.FormatDateByInt64(&Pici, util.Date_Full_Layout)))
+	info := Tidb.SelectBySql(sql, util.FormatDateByInt64(&Pici, util.Date_Full_Layout))
 	if info != nil && len(*info) > 0 {
 		for _, data := range *info {
 			save := make(map[string]interface{})
@@ -109,8 +111,8 @@ func taskAdd1(pici int64) {
 				} else if v == "updatetime" {
 					t1, _ := data[v].(time.Time)
 					save[v] = t1.Unix()
-					if t1.Unix() > pici {
-						pici = t1.Unix()
+					if t1.Unix() > Pici {
+						Pici = t1.Unix()
 					}
 				} else if v == "comeintime" || v == "createtime" {
 					t1, _ := data[v].(time.Time)
@@ -122,7 +124,7 @@ func taskAdd1(pici int64) {
 			elastic.UpdateNew(esIndex, save)
 		}
 	}
-	log.Println(fmt.Sprintf("轮次结束,last time: %d", pici))
+	log.Println(fmt.Sprintf("轮次结束,last time: %d", Pici))
 }
 
 func formatMsg(msg []byte) {