123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package main
- import (
- util "app.yhyue.com/moapp/jybase/common"
- elastic "app.yhyue.com/moapp/jybase/esv7"
- "fmt"
- "github.com/spf13/cobra"
- "log"
- "sync"
- "time"
- )
// Package-level state shared by the full-data export commands.
var (
	// esIndex names the index that buffered documents are bulk-saved to.
	// Each command's Run handler overwrites it before calling taskAll.
	esIndex = "clue_info"

	// saveEs buffers converted row documents until they are bulk-saved.
	saveEs []map[string]interface{}

	// SaveEsLock is a mutex declared alongside saveEs.
	// NOTE(review): nothing in this file acquires it; saveEs is only used
	// from the sequential taskAll/taskinfo path here — confirm no other
	// file relies on it before adding concurrency.
	SaveEsLock = new(sync.Mutex)
)
- // @Author jianghan
- // @Description 全量数据
- // @Date 2024/8/2
- func allData() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "all",
- Short: "Start processing full data",
- Run: func(cmd *cobra.Command, args []string) {
- esIndex = "clue_info"
- taskAll("jianyu_subjectdb.dwd_f_crm_clue_info")
- },
- }
- return cmdClient
- }
- // @Author jianghan
- // @Description 全量数据(测试库)
- // @Date 2024/8/2
- func allTestData() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "all-test",
- Short: "Start processing test full data",
- Run: func(cmd *cobra.Command, args []string) {
- esIndex = "clue_info_test"
- taskAll("jianyu_subjectdb_test.dwd_f_crm_clue_info")
- },
- }
- return cmdClient
- }
- func taskAll(coll string) {
- finalId := 0
- lastInfo := Tidb.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC", coll))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Println("查询最后id---finalId---", finalId)
- lastid, count := 0, 0
- for {
- log.Println("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid)
- rows, err := Tidb.DB.Query(q)
- if err != nil {
- log.Println("mysql query err ", err)
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- log.Println("----finish----- count: ", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Println("mysql scan err ", err)
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%2000 == 0 {
- log.Println(fmt.Sprintf("current----, count: %d, lastid: %d", count, lastid))
- }
- taskinfo(ret)
- }
- _ = rows.Close()
- }
- if len(saveEs) > 0 {
- elastic.BulkSave(esIndex, "", &saveEs, false)
- saveEs = []map[string]interface{}{}
- }
- }
- func taskinfo(tmp map[string]interface{}) {
- save := make(map[string]interface{})
- for k, v := range tmp {
- if v == nil {
- continue
- }
- if k == "id" {
- save["id"] = util.ObjToString(tmp["id"])
- save["_id"] = util.ObjToString(tmp["id"])
- } else if k == "comeintime" || k == "createtime" || k == "updatetime" {
- t, _ := time.Parse(time.DateTime, util.ObjToString(tmp[k]))
- save[k] = t.Unix()
- } else {
- save[k] = tmp[k]
- }
- }
- if len(save) > 0 {
- saveEs = append(saveEs, save)
- }
- if len(saveEs) >= 200 {
- tmps := saveEs
- elastic.BulkSave(esIndex, "", &tmps, false)
- saveEs = []map[string]interface{}{}
- }
- }
|