|
@@ -7,10 +7,11 @@ import (
|
|
|
"go.uber.org/zap"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-//buyerOnce 处理增量数据
|
|
|
+// buyerOnce 处理增量数据
|
|
|
func buyerOnce() {
|
|
|
if len(specialNames) < 1 {
|
|
|
initSpecialNames()
|
|
@@ -190,7 +191,7 @@ func buyerOnce() {
|
|
|
log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
|
|
|
}
|
|
|
|
|
|
-//dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
|
|
|
+// dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
|
|
|
func initSpecialNames() {
|
|
|
sess := MgoS.GetMgoConn()
|
|
|
defer MgoS.DestoryMongoConn(sess)
|
|
@@ -201,7 +202,11 @@ func initSpecialNames() {
|
|
|
|
|
|
for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
- specialNames[tmp["company_name"].(string)] = true
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ specialNames[tmp["company_name"].(string)] = true
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -212,7 +217,11 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query2.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
- specialNames[tmp["company_name"].(string)] = true
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ specialNames[tmp["company_name"].(string)] = true
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -223,7 +232,11 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query3.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
- specialNames[tmp["company_name"].(string)] = true
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ specialNames[tmp["company_name"].(string)] = true
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -234,7 +247,11 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query4.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
- specialNames[tmp["company_name"].(string)] = true
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ specialNames[tmp["company_name"].(string)] = true
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -245,6 +262,9 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query5.Next(tmp); count++ {
|
|
|
if tmp["use_name"] != nil && tmp["use_name"] != "" {
|
|
|
+ if specialNames[tmp["use_name"].(string)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
specialNames[tmp["use_name"].(string)] = true
|
|
|
}
|
|
|
}
|
|
@@ -255,6 +275,9 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query6.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
specialNames[tmp["company_name"].(string)] = true
|
|
|
}
|
|
|
}
|
|
@@ -266,6 +289,9 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query7.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
specialNames[tmp["company_name"].(string)] = true
|
|
|
}
|
|
|
}
|
|
@@ -276,6 +302,9 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query8.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
specialNames[tmp["company_name"].(string)] = true
|
|
|
}
|
|
|
}
|
|
@@ -284,8 +313,203 @@ func initSpecialNames() {
|
|
|
count = 0
|
|
|
for tmp := make(map[string]interface{}); query9.Next(tmp); count++ {
|
|
|
if tmp["company_name"] != nil && tmp["company_name"] != "" {
|
|
|
+ if specialNames[tmp["company_name"].(string)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
specialNames[tmp["company_name"].(string)] = true
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+// buyerAll 采购单位全量数据
|
|
|
+func buyerAll() {
|
|
|
+ if len(specialNames) < 1 {
|
|
|
+ initSpecialNames()
|
|
|
+ }
|
|
|
+
|
|
|
+ countSql := fmt.Sprintf(`SELECT count(id) FROM dws_f_ent_tags `)
|
|
|
+ dataCounts := Mysql.CountBySql(countSql)
|
|
|
+ if dataCounts > 0 {
|
|
|
+ log.Info("buyerAll", zap.Any("采购单位全量:", dataCounts))
|
|
|
+ } else {
|
|
|
+ log.Info("buyerAll", zap.String("采购单位全量是0", "没有更新数据"))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ finalId := 0
|
|
|
+ lastSql := fmt.Sprintf(`SELECT id FROM dws_f_ent_tags ORDER BY id DESC LIMIT 1`)
|
|
|
+ lastInfo := Mysql.SelectBySql(lastSql)
|
|
|
+ if len(*lastInfo) > 0 {
|
|
|
+ finalId = util.IntAll((*lastInfo)[0]["id"])
|
|
|
+ } else {
|
|
|
+ log.Info("buyerAll", zap.String("获取最大ID失败", "没有数据"))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("buyerAll", zap.Int("finalId", finalId))
|
|
|
+ buyerPool := make(chan bool, 3) //控制线程数
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+
|
|
|
+ lastid, total := 0, 0
|
|
|
+ realCount := 0
|
|
|
+
|
|
|
+ for {
|
|
|
+ query := fmt.Sprintf(`
|
|
|
+ SELECT
|
|
|
+ b.name,
|
|
|
+ b.seo_id,
|
|
|
+ t.id,
|
|
|
+ t.name_id,
|
|
|
+ b.company_id,
|
|
|
+ t.createtime,
|
|
|
+ t.updatetime,
|
|
|
+ c.area,
|
|
|
+ c.city,
|
|
|
+ class.name AS buyerclass
|
|
|
+
|
|
|
+ FROM
|
|
|
+ dws_f_ent_tags AS t
|
|
|
+ LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
|
|
|
+ LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
|
|
|
+ LEFT JOIN code_area AS c ON b.city_code = c.code
|
|
|
+
|
|
|
+ WHERE t.id > %d
|
|
|
+ ORDER BY t.id ASC
|
|
|
+ LIMIT %d;
|
|
|
+ `, lastid, 200)
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ rows, err := Mysql.DB.QueryContext(ctx, query)
|
|
|
+ if err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("QueryContext err", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ if finalId == lastid {
|
|
|
+ log.Info("buyerAll over", zap.Any("total", total), zap.Any("lastid", lastid))
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ columns, err := rows.Columns()
|
|
|
+ if err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("rows.Columns", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ for rows.Next() {
|
|
|
+ scanArgs := make([]interface{}, len(columns))
|
|
|
+ values := make([]interface{}, len(columns))
|
|
|
+ ret := make(map[string]interface{})
|
|
|
+
|
|
|
+ for k := range values {
|
|
|
+ scanArgs[k] = &values[k]
|
|
|
+ }
|
|
|
+
|
|
|
+ err = rows.Scan(scanArgs...)
|
|
|
+ if err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("rows.Scan", err))
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ for i, col := range values {
|
|
|
+ if v, ok := col.([]uint8); ok {
|
|
|
+ ret[columns[i]] = string(v)
|
|
|
+ } else {
|
|
|
+ ret[columns[i]] = col
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ lastid = util.IntAll(ret["id"])
|
|
|
+ total++
|
|
|
+ if total%1000 == 0 {
|
|
|
+ log.Info("buyerAll", zap.Int("current total", total), zap.Int("lastid", lastid))
|
|
|
+ }
|
|
|
+
|
|
|
+ buyerPool <- true
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-buyerPool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ //MySQL 更新
|
|
|
+ update := map[string]interface{}{}
|
|
|
+ name := util.ObjToString(tmp["name"])
|
|
|
+ //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
|
|
|
+ if util.ObjToString(tmp["company_id"]) != "" || specialNames[name] {
|
|
|
+ realCount++
|
|
|
+ update["status"] = 1
|
|
|
+ } else if ruleBuyer(name) { //不符合条件,排除
|
|
|
+ update["status"] = -1
|
|
|
+ } else { //默认2,认为可信
|
|
|
+ realCount++
|
|
|
+ update["status"] = 2
|
|
|
+ }
|
|
|
+ //1.更新MySQL
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "name_id": tmp["name_id"],
|
|
|
+ }
|
|
|
+ if len(update) > 0 {
|
|
|
+ Mysql.Update("dws_f_ent_tags", where, update)
|
|
|
+ }
|
|
|
+
|
|
|
+ //2.生索引,status = 1或者2 才生索引
|
|
|
+ if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
|
|
|
+ data := make(map[string]interface{}, 0)
|
|
|
+ data["name"] = name
|
|
|
+ data["name_id"] = tmp["name_id"]
|
|
|
+ if ret["seo_id"] != nil {
|
|
|
+ data["seo_id"] = tmp["seo_id"]
|
|
|
+ }
|
|
|
+ data["id"] = tmp["name_id"]
|
|
|
+ data["buyer_name"] = name
|
|
|
+ data["province"] = tmp["area"]
|
|
|
+ data["city"] = tmp["city"]
|
|
|
+ data["buyerclass"] = tmp["buyerclass"]
|
|
|
+ if ret["createtime"] != nil {
|
|
|
+ if createtime, ok := tmp["createtime"].(time.Time); ok {
|
|
|
+ data["createtime"] = createtime.Unix()
|
|
|
+ if ret["updatetime"] != nil {
|
|
|
+ if updatetime, ok := tmp["updatetime"].(time.Time); ok {
|
|
|
+ data["updatetime"] = updatetime.Unix()
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ data["updatetime"] = createtime.Unix()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, tmp["name_id"])
|
|
|
+ counts := Mysql.CountBySql(sql)
|
|
|
+ if counts > 0 {
|
|
|
+ data["is_contact"] = true
|
|
|
+ } else {
|
|
|
+ data["is_contact"] = false
|
|
|
+ }
|
|
|
+ arrEs := make([]map[string]interface{}, 0) //最终生索引数据
|
|
|
+ arrEs = append(arrEs, data)
|
|
|
+ err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
|
|
|
+ if err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("InsertOrUpdate err", err))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }(ret)
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := rows.Err(); err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("rows.Err()", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ err = rows.Close()
|
|
|
+ if err != nil {
|
|
|
+ log.Info("buyerAll", zap.Any("rows.Close() err", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("buyerAll", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
|
|
|
+
|
|
|
+}
|