package main import ( "app.yhyue.com/data_processing/common_utils/log" "esindex/config" "fmt" "go.uber.org/zap" "time" ) //buyerOnce 处理增量数据 func buyerOnce() { rowsPerPage := 1000 currentPage := 1 total := 0 for { log.Info("buyerOnce", zap.Int("currentPage", currentPage)) arrEs := make([]map[string]interface{}, 0) offset := (currentPage - 1) * rowsPerPage //year, month, day := 2022, time.October, 01 //now := time.Date(year, month, day, 0, 0, 0, 0, time.Local) now := time.Now() curTime := now.Format("2006-01-02") query := fmt.Sprintf(` SELECT b.name, t.id, t.name_id, t.createtime, t.updatetime, c.area, c.city, class.name AS buyerclass FROM dws_f_ent_tags AS t LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id LEFT JOIN code_area AS c ON b.city_code = c.code WHERE t.createtime > '%v' OR t.updatetime > '%v' ORDER BY t.createtime ASC LIMIT %d, %d; `, curTime, curTime, offset, rowsPerPage) result := MysqlB.SelectBySql(query) if result == nil { break } if len(*result) > 0 { for _, re := range *result { tmp := make(map[string]interface{}, 0) tmp["name"] = re["name"] tmp["buyer_name"] = re["name"] tmp["id"] = re["name_id"] tmp["province"] = re["area"] tmp["city"] = re["city"] tmp["buyerclass"] = re["buyerclass"] if re["createtime"] != nil { if createtime, ok := re["createtime"].(time.Time); ok { tmp["createtime"] = createtime.Unix() if re["updatetime"] != nil { if updatetime, ok := re["updatetime"].(time.Time); ok { tmp["updatetime"] = updatetime.Unix() } } else { tmp["updatetime"] = createtime.Unix() } } } sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"]) counts := MysqlB.SelectBySql(sql) if len(*counts) > 0 { tmp["is_contact"] = true } else { tmp["is_contact"] = false } arrEs = append(arrEs, tmp) } total = total + len(arrEs) err := Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs) if err != nil { log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err)) } } if len(arrEs) < rowsPerPage { break } // 继续查询下一页 currentPage++ } log.Info("buyerOnce", zap.Int("结束,总数是:", total)) } //buyerall 全量数据 func buyerall() { rowsPerPage := 5000 currentPage := 1 total := 0 for { log.Info("buyerall", zap.Int("currentPage", currentPage)) arrEs := make([]map[string]interface{}, 0) offset := (currentPage - 1) * rowsPerPage query := fmt.Sprintf(` SELECT b.name, t.id, t.name_id, t.createtime, t.updatetime, c.area, c.city, class.name AS buyerclass FROM dws_f_ent_tags AS t LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id LEFT JOIN code_area AS c ON b.city_code = c.code ORDER BY t.createtime ASC LIMIT %d, %d; `, offset, rowsPerPage) result := MysqlB.SelectBySql(query) if result == nil { break } if len(*result) > 0 { for _, re := range *result { tmp := make(map[string]interface{}, 0) tmp["name"] = re["name"] tmp["buyer_name"] = re["name"] tmp["id"] = re["name_id"] tmp["_id"] = re["name_id"] tmp["province"] = re["area"] tmp["city"] = re["city"] tmp["buyerclass"] = re["buyerclass"] if re["createtime"] != nil { if createtime, ok := re["createtime"].(time.Time); ok { tmp["createtime"] = createtime.Unix() if re["updatetime"] != nil { if updatetime, ok := re["updatetime"].(time.Time); ok { tmp["updatetime"] = updatetime.Unix() } } else { tmp["updatetime"] = createtime.Unix() } } } sql := fmt.Sprintf(`select id from dws_f_ent_contact where name_id = '%v'`, re["name_id"]) counts := MysqlB.SelectBySql(sql) if len(*counts) > 0 { tmp["is_contact"] = true } else { tmp["is_contact"] = false } arrEs = append(arrEs, tmp) } total = total + len(arrEs) //保存es Es.BulkSave(config.Conf.DB.Es.IndexBuyer, arrEs) } // 如果本次查询返回的数据不足每页请求的数量,说明已经查询到最后一页 if len(*result) < rowsPerPage { break } // 继续查询下一页 currentPage++ } log.Info("buyerall", zap.Int("结束,总数是:", total)) }