|
@@ -1,97 +1,146 @@
|
|
package main
|
|
package main
|
|
|
|
|
|
import (
|
|
import (
|
|
- util "app.yhyue.com/data_processing/common_utils"
|
|
|
|
"app.yhyue.com/data_processing/common_utils/log"
|
|
"app.yhyue.com/data_processing/common_utils/log"
|
|
- "app.yhyue.com/data_processing/common_utils/mongodb"
|
|
|
|
"esindex/config"
|
|
"esindex/config"
|
|
- "go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
|
|
|
+ "fmt"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap"
|
|
- "sync"
|
|
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
-var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
|
|
|
|
-
|
|
|
|
-func buyerEsTaskOnce() {
|
|
|
|
- defer util.Catch()
|
|
|
|
- arrEs := []map[string]interface{}{}
|
|
|
|
- buyerEsLock := &sync.Mutex{}
|
|
|
|
- pool := make(chan bool, 3)
|
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
|
-
|
|
|
|
- now := time.Now()
|
|
|
|
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
|
|
|
|
- curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
|
|
|
|
- task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
|
|
|
|
- task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
|
|
|
|
- log.Info("buyer 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
|
|
|
|
- //区间id
|
|
|
|
- q := map[string]interface{}{
|
|
|
|
- "_id": map[string]interface{}{
|
|
|
|
- "$gte": mongodb.StringTOBsonId(task_sid),
|
|
|
|
- "$lt": mongodb.StringTOBsonId(task_eid),
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- //mongo
|
|
|
|
- sess := MgoQ.GetMgoConn()
|
|
|
|
- defer MgoQ.DestoryMongoConn(sess)
|
|
|
|
-
|
|
|
|
- it_1 := sess.DB(MgoQ.DbName).C("buyer_enterprise").Find(&q).Select(map[string]interface{}{
|
|
|
|
- "buyer_name": 1,
|
|
|
|
- "institute_type": 1,
|
|
|
|
- "buyerclass": 1,
|
|
|
|
- "fixedphone": 1,
|
|
|
|
- "mobilephone": 1,
|
|
|
|
- "latestfixedphone": 1,
|
|
|
|
- "latestmobilephone": 1,
|
|
|
|
- "province": 1,
|
|
|
|
- "city": 1,
|
|
|
|
- }).Sort("_id").Iter()
|
|
|
|
- num_1 := 0
|
|
|
|
- for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
|
|
|
|
- if num_1%2000 == 0 && num_1 > 0 {
|
|
|
|
- log.Info("current", zap.Int("数量", num_1))
|
|
|
|
- }
|
|
|
|
- pool <- true
|
|
|
|
- wg.Add(1)
|
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
|
- defer func() {
|
|
|
|
- <-pool
|
|
|
|
- wg.Done()
|
|
|
|
- }()
|
|
|
|
- savetmp := map[string]interface{}{}
|
|
|
|
- _id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
|
- if util.ObjToString(tmp["buyerclass"]) != "" {
|
|
|
|
- savetmp["buyerclass"] = tmp["buyerclass"]
|
|
|
|
- }
|
|
|
|
- savetmp["_id"] = _id
|
|
|
|
- savetmp["name"] = tmp["buyer_name"]
|
|
|
|
- savetmp["buyer_name"] = tmp["buyer_name"]
|
|
|
|
- for _, f := range fieldArr {
|
|
|
|
- if val := util.ObjToString(tmp[f]); val != "" {
|
|
|
|
- savetmp[f] = val
|
|
|
|
|
|
+//buyerOnce 处理增量数据
|
|
|
|
+func buyerOnce() {
|
|
|
|
+ rowsPerPage := 1000
|
|
|
|
+ currentPage := 1
|
|
|
|
+ total := 0
|
|
|
|
+ for {
|
|
|
|
+ log.Info("buyerOnce", zap.Int("currentPage", currentPage))
|
|
|
|
+ arrEs := make([]map[string]interface{}, 0)
|
|
|
|
+ offset := (currentPage - 1) * rowsPerPage
|
|
|
|
+ //year, month, day := 2022, time.October, 01
|
|
|
|
+ //now := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
|
|
|
|
+ now := time.Now()
|
|
|
|
+ curTime := now.Format("2006-01-02")
|
|
|
|
+ insertquery := fmt.Sprintf(`
|
|
|
|
+ SELECT
|
|
|
|
+ b.name,
|
|
|
|
+ t.id,
|
|
|
|
+ t.name_id,
|
|
|
|
+ c.area,
|
|
|
|
+ c.city,
|
|
|
|
+ class.name AS buyerclass
|
|
|
|
+
|
|
|
|
+ FROM
|
|
|
|
+ dws_f_ent_tags AS t
|
|
|
|
+ LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
|
|
|
|
+ LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
|
|
|
|
+ LEFT JOIN code_area AS c ON b.city_code = c.code
|
|
|
|
+
|
|
|
|
+ WHERE t.createtime > '%v' OR t.updatetime > '%v'
|
|
|
|
+ ORDER BY t.id ASC
|
|
|
|
+ LIMIT %d, %d;
|
|
|
|
+ `, curTime, curTime, offset, rowsPerPage)
|
|
|
|
+
|
|
|
|
+ result := MysqlB.SelectBySql(insertquery)
|
|
|
|
+
|
|
|
|
+ if len(*result) > 0 {
|
|
|
|
+ for _, re := range *result {
|
|
|
|
+ tmp := make(map[string]interface{}, 0)
|
|
|
|
+ tmp["name"] = re["name"]
|
|
|
|
+ tmp["id"] = re["name_id"]
|
|
|
|
+ tmp["area"] = re["area"]
|
|
|
|
+ tmp["city"] = re["city"]
|
|
|
|
+ tmp["buyerclass"] = re["buyerclass"]
|
|
|
|
+ sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
|
|
|
|
+
|
|
|
|
+ counts := MysqlB.SelectBySql(sql)
|
|
|
|
+ if len(*counts) > 0 {
|
|
|
|
+ tmp["is_contact"] = true
|
|
|
|
+ } else {
|
|
|
|
+ tmp["is_contact"] = false
|
|
}
|
|
}
|
|
|
|
+ arrEs = append(arrEs, tmp)
|
|
}
|
|
}
|
|
- buyerEsLock.Lock()
|
|
|
|
- arrEs = append(arrEs, savetmp)
|
|
|
|
- if len(arrEs) >= EsBulkSize {
|
|
|
|
- tmps := arrEs
|
|
|
|
- Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
|
|
|
|
- arrEs = []map[string]interface{}{}
|
|
|
|
- }
|
|
|
|
- buyerEsLock.Unlock()
|
|
|
|
- }(tmp)
|
|
|
|
- tmp = make(map[string]interface{})
|
|
|
|
|
|
+ total = total + len(arrEs)
|
|
|
|
+
|
|
|
|
+ Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(arrEs) < rowsPerPage {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ // 继续查询下一页
|
|
|
|
+ currentPage++
|
|
}
|
|
}
|
|
|
|
|
|
- wg.Wait()
|
|
|
|
- buyerEsLock.Lock()
|
|
|
|
- if len(arrEs) > 0 {
|
|
|
|
- tmps := arrEs
|
|
|
|
- Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
|
|
|
|
- arrEs = []map[string]interface{}{}
|
|
|
|
|
|
+ log.Info("buyerOnce", zap.Int("结束,总数是:", total))
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//buyerall 全量数据
|
|
|
|
+func buyerall() {
|
|
|
|
+ rowsPerPage := 10000
|
|
|
|
+ currentPage := 1
|
|
|
|
+ total := 0
|
|
|
|
+
|
|
|
|
+ for {
|
|
|
|
+ log.Info("buyerall", zap.Int("currentPage", currentPage))
|
|
|
|
+ arrEs := make([]map[string]interface{}, 0)
|
|
|
|
+ offset := (currentPage - 1) * rowsPerPage
|
|
|
|
+ query := fmt.Sprintf(`
|
|
|
|
+ SELECT
|
|
|
|
+ b.name,
|
|
|
|
+ t.id,
|
|
|
|
+ t.name_id,
|
|
|
|
+ c.area,
|
|
|
|
+ c.city,
|
|
|
|
+ class.name AS buyerclass
|
|
|
|
+
|
|
|
|
+ FROM
|
|
|
|
+ dws_f_ent_tags AS t
|
|
|
|
+ LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
|
|
|
|
+ LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
|
|
|
|
+ LEFT JOIN code_area AS c ON b.city_code = c.code
|
|
|
|
+
|
|
|
|
+ ORDER BY t.id ASC
|
|
|
|
+
|
|
|
|
+ LIMIT %d, %d;
|
|
|
|
+ `, offset, rowsPerPage)
|
|
|
|
+
|
|
|
|
+ result := MysqlB.SelectBySql(query)
|
|
|
|
+
|
|
|
|
+ if len(*result) > 0 {
|
|
|
|
+ for _, re := range *result {
|
|
|
|
+ tmp := make(map[string]interface{}, 0)
|
|
|
|
+ tmp["name"] = re["name"]
|
|
|
|
+ tmp["id"] = re["name_id"]
|
|
|
|
+ tmp["_id"] = re["name_id"]
|
|
|
|
+ tmp["area"] = re["area"]
|
|
|
|
+ tmp["city"] = re["city"]
|
|
|
|
+ tmp["buyerclass"] = re["buyerclass"]
|
|
|
|
+
|
|
|
|
+ sql := fmt.Sprintf(`select * from dws_f_ent_contact where name_id = '%v'`, re["name_id"])
|
|
|
|
+
|
|
|
|
+ counts := MysqlB.SelectBySql(sql)
|
|
|
|
+ if len(*counts) > 0 {
|
|
|
|
+ tmp["is_contact"] = true
|
|
|
|
+ } else {
|
|
|
|
+ tmp["is_contact"] = false
|
|
|
|
+ }
|
|
|
|
+ arrEs = append(arrEs, tmp)
|
|
|
|
+ }
|
|
|
|
+ total = total + len(arrEs)
|
|
|
|
+ //保存es
|
|
|
|
+ Es.BulkSave(config.Conf.DB.Es.IndexBuyer, arrEs)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 如果本次查询返回的数据不足每页请求的数量,说明已经查询到最后一页
|
|
|
|
+ if len(*result) < rowsPerPage {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 继续查询下一页
|
|
|
|
+ currentPage++
|
|
}
|
|
}
|
|
- buyerEsLock.Unlock()
|
|
|
|
- log.Info("buyer over!", zap.Int("总计", num_1))
|
|
|
|
|
|
+
|
|
|
|
+ log.Info("buyerall", zap.Int("结束,总数是:", total))
|
|
}
|
|
}
|