123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package main
- import (
- "fmt"
- "github.com/spf13/cobra"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "sync"
- )
- func dealerEs() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "dealer_es",
- Short: "Start processing dealer_es data",
- Run: func(cmd *cobra.Command, args []string) {
- InitEs()
- go SaveEs("medical_dealer_v1", "medical_dealer")
- taskDealerEs()
- },
- }
- return cmdClient
- }
- func taskDealerEs() {
- pool := make(chan bool, 5) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_dealer_baseinfo"))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("taskDealerEs---", zap.Int("finally id", finalId))
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_dealer_baseinfo", lastid)
- rows, err := MysqlM.DB.Query(q)
- if err != nil {
- log.Error("taskDealerEs---", zap.Error(err))
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Error("taskDealerEs---", zap.Error(err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count%500 == 0 {
- util.Debug("current-------", count, lastid)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- saveM := make(map[string]interface{})
- for _, f := range []string{"id", "name_id", "company_id", "dealer_name", "area_code", "city_code", "district_code"} {
- if tmp[f] != nil {
- saveM[f] = tmp[f]
- }
- }
- EsSaveCache <- saveM
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
|