123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- package main
- import (
- "app.yhyue.com/data_processing/common_utils/log"
- "esindex/config"
- "fmt"
- "go.uber.org/zap"
- "time"
- )
- //buyerOnce 处理增量数据
- func buyerOnce() {
- rowsPerPage := 1000
- currentPage := 1
- total := 0
- for {
- log.Info("buyerOnce", zap.Int("currentPage", currentPage))
- arrEs := make([]map[string]interface{}, 0)
- offset := (currentPage - 1) * rowsPerPage
- //year, month, day := 2022, time.October, 01
- //now := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
- now := time.Now()
- curTime := now.Format("2006-01-02")
- query := fmt.Sprintf(`
- SELECT
- b.name,
- t.id,
- t.name_id,
- t.createtime,
- t.updatetime,
- c.area,
- c.city,
- class.name AS buyerclass
-
- FROM
- dws_f_ent_tags AS t
- LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
- LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
- LEFT JOIN code_area AS c ON b.city_code = c.code
- WHERE t.createtime > '%v' OR t.updatetime > '%v'
- ORDER BY t.createtime ASC
- LIMIT %d, %d;
- `, curTime, curTime, offset, rowsPerPage)
- result := MysqlB.SelectBySql(query)
- if result == nil {
- break
- }
- if len(*result) > 0 {
- for _, re := range *result {
- tmp := make(map[string]interface{}, 0)
- tmp["name"] = re["name"]
- tmp["buyer_name"] = re["name"]
- tmp["id"] = re["name_id"]
- tmp["province"] = re["area"]
- tmp["city"] = re["city"]
- tmp["buyerclass"] = re["buyerclass"]
- if re["createtime"] != nil {
- if createtime, ok := re["createtime"].(time.Time); ok {
- tmp["createtime"] = createtime.Unix()
- if re["updatetime"] != nil {
- if updatetime, ok := re["updatetime"].(time.Time); ok {
- tmp["updatetime"] = updatetime.Unix()
- }
- } else {
- tmp["updatetime"] = createtime.Unix()
- }
- }
- }
- sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
- counts := MysqlB.SelectBySql(sql)
- if len(*counts) > 0 {
- tmp["is_contact"] = true
- } else {
- tmp["is_contact"] = false
- }
- arrEs = append(arrEs, tmp)
- }
- total = total + len(arrEs)
- err := Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
- if err != nil {
- log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
- }
- }
- if len(arrEs) < rowsPerPage {
- break
- }
- // 继续查询下一页
- currentPage++
- }
- log.Info("buyerOnce", zap.Int("结束,总数是:", total))
- }
- //buyerall 全量数据
- func buyerall() {
- rowsPerPage := 5000
- currentPage := 1
- total := 0
- for {
- log.Info("buyerall", zap.Int("currentPage", currentPage))
- arrEs := make([]map[string]interface{}, 0)
- offset := (currentPage - 1) * rowsPerPage
- query := fmt.Sprintf(`
- SELECT
- b.name,
- t.id,
- t.name_id,
- t.createtime,
- t.updatetime,
- c.area,
- c.city,
- class.name AS buyerclass
-
- FROM
- dws_f_ent_tags AS t
- LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
- LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
- LEFT JOIN code_area AS c ON b.city_code = c.code
- ORDER BY t.createtime ASC
-
- LIMIT %d, %d;
- `, offset, rowsPerPage)
- result := MysqlB.SelectBySql(query)
- if result == nil {
- break
- }
- if len(*result) > 0 {
- for _, re := range *result {
- tmp := make(map[string]interface{}, 0)
- tmp["name"] = re["name"]
- tmp["buyer_name"] = re["name"]
- tmp["id"] = re["name_id"]
- tmp["_id"] = re["name_id"]
- tmp["province"] = re["area"]
- tmp["city"] = re["city"]
- tmp["buyerclass"] = re["buyerclass"]
- if re["createtime"] != nil {
- if createtime, ok := re["createtime"].(time.Time); ok {
- tmp["createtime"] = createtime.Unix()
- if re["updatetime"] != nil {
- if updatetime, ok := re["updatetime"].(time.Time); ok {
- tmp["updatetime"] = updatetime.Unix()
- }
- } else {
- tmp["updatetime"] = createtime.Unix()
- }
- }
- }
- sql := fmt.Sprintf(`select id from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
- counts := MysqlB.SelectBySql(sql)
- if len(*counts) > 0 {
- tmp["is_contact"] = true
- } else {
- tmp["is_contact"] = false
- }
- arrEs = append(arrEs, tmp)
- }
- total = total + len(arrEs)
- //保存es
- Es.BulkSave(config.Conf.DB.Es.IndexBuyer, arrEs)
- }
- // 如果本次查询返回的数据不足每页请求的数量,说明已经查询到最后一页
- if len(*result) < rowsPerPage {
- break
- }
- // 继续查询下一页
- currentPage++
- }
- log.Info("buyerall", zap.Int("结束,总数是:", total))
- }
|