package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	util "app.yhyue.com/moapp/jybase/common"
	elastic "app.yhyue.com/moapp/jybase/esv7"
	"github.com/spf13/cobra"
)

var (
	// saveEs buffers documents that are waiting to be bulk-written to Elasticsearch.
	saveEs []map[string]interface{}
	// SaveEsLock is not referenced by the code in this file.
	// NOTE(review): presumably meant to guard saveEs — confirm against other files.
	SaveEsLock = &sync.Mutex{}
	// esIndex is the target index name; each command overwrites it before running.
	esIndex = "clue_info"
)

// allData builds the "all" command, which processes the full data set from
// jianyu_subjectdb.dwd_f_crm_clue_info into the "clue_info" index.
//
// Author: jianghan
// Date: 2024/8/2
func allData() *cobra.Command {
	cmdClient := &cobra.Command{
		Use:   "all",
		Short: "Start processing full data",
		Run: func(cmd *cobra.Command, args []string) {
			esIndex = "clue_info"
			taskAll("jianyu_subjectdb.dwd_f_crm_clue_info")
		},
	}
	return cmdClient
}

// allTestData builds the "all-test" command, which processes the full data set
// of the test database (jianyu_subjectdb_test.dwd_f_crm_clue_info) into the
// "clue_info_test" index.
//
// Author: jianghan
// Date: 2024/8/2
func allTestData() *cobra.Command {
	cmdClient := &cobra.Command{
		Use:   "all-test",
		Short: "Start processing test full data",
		Run: func(cmd *cobra.Command, args []string) {
			esIndex = "clue_info_test"
			taskAll("jianyu_subjectdb_test.dwd_f_crm_clue_info")
		},
	}
	return cmdClient
}

// taskAll walks the table coll in ascending id order, hands every row to
// taskinfo, and finally flushes whatever is still buffered in saveEs.
//
// The table is read in pages of up to 1000000 rows, each page starting after
// the last id already processed. The walk stops when the last processed id
// equals the largest id observed at start, when a query fails, or when a page
// makes no progress (so a deleted last row or a persistently failing scan can
// no longer spin forever).
func taskAll(coll string) {
	finalId := 0
	// Only the largest id is needed, so fetch a single row rather than every id.
	lastInfo := Tidb.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", coll))
	if lastInfo != nil && len(*lastInfo) > 0 {
		finalId = util.IntAll((*lastInfo)[0]["id"])
	}
	log.Println("查询最后id---finalId---", finalId)

	lastid, count := 0, 0
	for {
		log.Println("重新查询,lastid---", lastid)
		// Check for completion before querying so no result set is opened
		// (and left unclosed) on the final pass.
		if finalId == lastid {
			log.Println("----finish----- count: ", count)
			break
		}

		q := fmt.Sprintf("SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid)
		rows, err := Tidb.DB.Query(q)
		if err != nil {
			// rows is unusable after a failed query; stop instead of dereferencing it.
			log.Println("mysql query err ", err)
			break
		}
		columns, err := rows.Columns()
		if err != nil {
			log.Println("mysql columns err ", err)
			_ = rows.Close()
			break
		}

		pageStart := lastid
		// Scan targets are reused for every row of the page; each value is
		// copied into a fresh map before the next Scan overwrites it.
		values := make([]interface{}, len(columns))
		scanArgs := make([]interface{}, len(columns))
		for k := range values {
			scanArgs[k] = &values[k]
		}

		for rows.Next() {
			if err = rows.Scan(scanArgs...); err != nil {
				log.Println("mysql scan err ", err)
				break
			}
			ret := make(map[string]interface{}, len(columns))
			for i, col := range values {
				if v, ok := col.([]uint8); ok {
					ret[columns[i]] = string(v)
				} else {
					ret[columns[i]] = col
				}
			}
			lastid = util.IntAll(ret["id"])
			count++
			if count%2000 == 0 {
				log.Printf("current----, count: %d, lastid: %d", count, lastid)
			}
			taskinfo(ret)
		}
		if err = rows.Err(); err != nil {
			log.Println("mysql rows err ", err)
		}
		// Closing is best-effort: the page has already been consumed.
		_ = rows.Close()

		if lastid == pageStart {
			// Nothing was read past pageStart, so re-running the same query
			// would loop forever without ever reaching finalId.
			log.Println("no progress after lastid, stop. lastid:", lastid, "finalId:", finalId, "count:", count)
			break
		}
	}

	if len(saveEs) > 0 {
		elastic.BulkSave(esIndex, "", &saveEs, false)
		saveEs = []map[string]interface{}{}
	}
}

// taskinfo converts one table row into an Elasticsearch document, appends it
// to saveEs, and bulk-saves the buffer once it holds at least 200 documents.
//
// Nil columns are dropped. "id" is stored as a string under both "id" and
// "_id". "comeintime", "createtime" and "updatetime" are parsed with the
// time.DateTime layout and stored as Unix seconds; the layout carries no zone,
// so time.Parse treats the value as UTC. A time column that cannot be parsed
// is logged and left out of the document instead of being indexed as the zero
// time. Every other column is copied unchanged.
func taskinfo(tmp map[string]interface{}) {
	// batchSize is the buffer length that triggers a bulk save.
	const batchSize = 200

	save := make(map[string]interface{}, len(tmp)+1)
	for k, v := range tmp {
		if v == nil {
			continue
		}
		switch k {
		case "id":
			id := util.ObjToString(v)
			save["id"] = id
			save["_id"] = id
		case "comeintime", "createtime", "updatetime":
			t, err := time.Parse(time.DateTime, util.ObjToString(v))
			if err != nil {
				log.Println("time parse err ", k, err)
				continue
			}
			save[k] = t.Unix()
		default:
			save[k] = v
		}
	}
	if len(save) > 0 {
		saveEs = append(saveEs, save)
	}
	if len(saveEs) >= batchSize {
		tmps := saveEs
		elastic.BulkSave(esIndex, "", &tmps, false)
		saveEs = []map[string]interface{}{}
	}
}