package main import ( "fmt" "github.com/spf13/cobra" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "sync" ) func dealerEs() *cobra.Command { cmdClient := &cobra.Command{ Use: "dealer_es", Short: "Start processing dealer_es data", Run: func(cmd *cobra.Command, args []string) { InitEs() go SaveEs("medical_dealer_v1", "medical_dealer") taskDealerEs() }, } return cmdClient } func taskDealerEs() { pool := make(chan bool, 5) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_dealer_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("taskDealerEs---", zap.Int("finally id", finalId)) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_dealer_baseinfo", lastid) rows, err := MysqlM.DB.Query(q) if err != nil { log.Error("taskDealerEs---", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("taskDealerEs---", zap.Error(err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count%500 == 0 { util.Debug("current-------", count, lastid) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() saveM := make(map[string]interface{}) for _, f := range []string{"id", "name_id", "company_id", "dealer_name", "area_code", "city_code", "district_code"} { if tmp[f] != nil { saveM[f] = tmp[f] } } EsSaveCache <- saveM }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } }