Ver código fonte

更新 采购单位省市

wcc 8 horas atrás
pai
commit
315b5bb32c
2 arquivos alterados com 196 adições e 4 exclusões
  1. 196 4
      createEsIndex/buyertask.go
  2. BIN
      createEsIndex/createindex_es7_end

+ 196 - 4
createEsIndex/buyertask.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"database/sql"
 	"esindex/config"
 	"fmt"
 	"go.uber.org/zap"
@@ -11,11 +12,162 @@ import (
 	"time"
 )
 
+func buyerTest() {
+
+	names := []string{"海南东方新丝路职业学院", "海口旅游职业学院", "海南卫生健康职业学院", "海南科技职业大学", "海南省旅游学校"}
+
+	areaMap, err := loadCodeRegionMap(Mysql.DB)
+	if err != nil {
+		fmt.Println(err, areaMap)
+	}
+	for _, na := range names {
+
+		query := fmt.Sprintf(`
+ SELECT
+               b.name, 
+               b.area_code, 
+               b.city_code, 
+			   b.seo_id,
+               t.id,
+               t.name_id, 
+			   b.company_id,
+               t.createtime,
+               t.updatetime,
+               class.name AS buyerclass 
+              
+           FROM 
+               dws_f_ent_tags AS t 
+               LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
+               LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
+
+			WHERE  b.name = '%v' 
+        ;
+      `, na)
+
+		ctx := context.Background()
+		rows, err := Mysql.DB.QueryContext(ctx, query)
+		if err != nil {
+			log.Info("dealData", zap.Any("QueryContext err", err))
+		}
+
+		columns, err := rows.Columns()
+		if err != nil {
+			log.Info("buyerOnce", zap.Any("rows.Columns", err))
+		}
+
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+			//MySQL 更新
+			update := map[string]interface{}{}
+
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Info("buyerOnce", zap.Any("rows.Scan", err))
+				break
+			}
+
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+
+			name := util.ObjToString(ret["name"])
+			//company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
+			if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
+				update["status"] = 1
+			} else if ruleBuyer(name) { //不符合条件,排除
+				update["status"] = -1
+			} else { //默认2,认为可信
+				update["status"] = 2
+			}
+
+			//1.更新MySQL
+			where := map[string]interface{}{
+				"name_id": ret["name_id"],
+			}
+			if len(update) > 0 {
+				Mysql.Update("dws_f_ent_tags", where, update)
+			}
+
+			//2.生索引,status = 1或者2 才生索引
+			if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
+				data := make(map[string]interface{}, 0)
+				data["name"] = name
+				data["name_id"] = ret["name_id"]
+				if ret["seo_id"] != nil {
+					data["seo_id"] = ret["seo_id"]
+				}
+				data["id"] = ret["name_id"]
+				data["buyer_name"] = name
+				areaCode := util.ObjToString(ret["area_code"])
+				data["province"] = areaMap[areaCode].Province
+				cityCode := util.ObjToString(ret["city_code"])
+				data["city"] = areaMap[cityCode].City
+				data["buyerclass"] = ret["buyerclass"]
+				if ret["createtime"] != nil {
+					if createtime, ok := ret["createtime"].(time.Time); ok {
+						data["createtime"] = createtime.Unix()
+						if ret["updatetime"] != nil {
+							if updatetime, ok := ret["updatetime"].(time.Time); ok {
+								data["updatetime"] = updatetime.Unix()
+							}
+						} else {
+							data["updatetime"] = createtime.Unix()
+						}
+					}
+				}
+
+				sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
+				counts := Mysql.CountBySql(sql)
+				if counts > 0 {
+					data["is_contact"] = true
+				} else {
+					data["is_contact"] = false
+				}
+				arrEs := make([]map[string]interface{}, 0) //最终生索引数据
+				arrEs = append(arrEs, data)
+				err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
+				if err != nil {
+					log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
+				}
+				// 华为云新集群,存储标讯、项目、凭安数据
+				if config.Conf.DB.Es.Addr3 != "" {
+					err = Es3.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
+					if err != nil {
+						log.Info("buyerOnce", zap.Any("Addr3 InsertOrUpdate err", err))
+					}
+				}
+			}
+		}
+
+		rows.Close()
+
+		if err := rows.Err(); err != nil {
+			log.Info("buyerOnce", zap.Any("err", err))
+		}
+
+	}
+
+}
+
 // buyerOnce  处理增量数据
 func buyerOnce() {
 	if len(specialNames) < 1 {
 		initSpecialNames()
 	}
+	areaMap, err := loadCodeRegionMap(Mysql.DB)
+	if err != nil {
+		log.Info("buyerOnce", zap.Error(err), zap.Any("areaMap", areaMap))
+	}
 
 	rowsPerPage := 1000
 	now := time.Now()
@@ -50,6 +202,8 @@ func buyerOnce() {
 		query := fmt.Sprintf(`
  SELECT
                b.name, 
+               b.area_code, 
+               b.city_code, 
 			   b.seo_id,
                t.id,
                t.name_id, 
@@ -63,8 +217,7 @@ func buyerOnce() {
            FROM 
                dws_f_ent_tags AS t 
                LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
-               LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
-               LEFT JOIN code_area AS c ON b.city_code = c.code 
+               LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id 
 
 			WHERE  (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d ) 
            ORDER BY t.id ASC
@@ -148,8 +301,13 @@ func buyerOnce() {
 				}
 				data["id"] = ret["name_id"]
 				data["buyer_name"] = name
-				data["province"] = ret["area"]
-				data["city"] = ret["city"]
+				//data["province"] = ret["area"]
+				//data["city"] = ret["city"]
+				areaCode := util.ObjToString(ret["area_code"])
+				data["province"] = areaMap[areaCode].Province
+				cityCode := util.ObjToString(ret["city_code"])
+				data["city"] = areaMap[cityCode].City
+
 				data["buyerclass"] = ret["buyerclass"]
 				if ret["createtime"] != nil {
 					if createtime, ok := ret["createtime"].(time.Time); ok {
@@ -533,3 +691,37 @@ func buyerAll() {
 	log.Info("buyerAll", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
 
 }
+
+type RegionInfo struct {
+	Province string
+	City     string
+	District string
+}
+
+func loadCodeRegionMap(db *sql.DB) (map[string]RegionInfo, error) {
+	query := `SELECT code, area, city, district FROM code_area`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	codeMap := make(map[string]RegionInfo)
+	for rows.Next() {
+		var code, area, city, district sql.NullString
+		if err := rows.Scan(&code, &area, &city, &district); err != nil {
+			return nil, err
+		}
+		codeStr := code.String
+
+		codeMap[codeStr] = RegionInfo{
+			Province: area.String,
+			City:     city.String,
+			District: district.String,
+		}
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return codeMap, nil
+}

BIN
createEsIndex/createindex → createEsIndex/createindex_es7_end