|
@@ -7,11 +7,21 @@ import (
|
|
|
"github.com/olivere/elastic/v7"
|
|
|
"io"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
+ es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"log"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+var (
|
|
|
+ saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
|
|
|
+ saveEsSp = make(chan bool, 5)
|
|
|
+ EsBulkSize = 50
|
|
|
+ Es *es.Elastic
|
|
|
+ portraitIndex = "project_portrait" // 画像索引
|
|
|
+ portraitMgo = "wcc_project_portrait" // MongoDB 的表名
|
|
|
+)
|
|
|
+
|
|
|
type PortraitData struct {
|
|
|
Buyer string `json:"buyer"`
|
|
|
Area string `json:"area"`
|
|
@@ -23,9 +33,32 @@ type PortraitData struct {
|
|
|
ProjectMoney float64 `json:"project_money"`
|
|
|
}
|
|
|
|
|
|
+func Init() {
|
|
|
+ //Es = &es.Elastic{
|
|
|
+ // //S_esurl: "http://127.0.0.1:19908",
|
|
|
+ // S_esurl: "http://172.17.4.184:19908",
|
|
|
+ // I_size: 5,
|
|
|
+ // Username: "jybid",
|
|
|
+ // Password: "Top2023_JEB01i@31",
|
|
|
+ //}
|
|
|
+ //Es.InitElasticSize()
|
|
|
+ //测试环境
|
|
|
+ Es = &es.Elastic{
|
|
|
+ S_esurl: "http://192.168.3.149:9201",
|
|
|
+ I_size: 5,
|
|
|
+ Username: "",
|
|
|
+ Password: "",
|
|
|
+ }
|
|
|
+ Es.InitElasticSize()
|
|
|
+}
|
|
|
func main() {
|
|
|
+ Init()
|
|
|
+ go SaveEsMethod()
|
|
|
+ //dealAllData()
|
|
|
+ allDataEs() //处理存量数据到es
|
|
|
|
|
|
- dealAllData()
|
|
|
+ ch := make(chan bool, 1)
|
|
|
+ <-ch
|
|
|
}
|
|
|
|
|
|
// dealIncData 处理增量数据
|
|
@@ -195,3 +228,80 @@ func dealAllData() {
|
|
|
|
|
|
fmt.Println("结束~~~~~~~~~~~~~~~")
|
|
|
}
|
|
|
+
|
|
|
+// allDataEs 处理存量数据到es
|
|
|
+func allDataEs() {
|
|
|
+ //MgoB := &mongodb.MongodbSim{
|
|
|
+ // MongodbAddr: "172.17.189.140:27080",
|
|
|
+ // //MongodbAddr: "127.0.0.1:27083",
|
|
|
+ // Size: 10,
|
|
|
+ // DbName: "qfw",
|
|
|
+ // UserName: "SJZY_RWbid_ES",
|
|
|
+ // Password: "SJZY@B4i4D5e6S",
|
|
|
+ // //Direct: true,
|
|
|
+ //}
|
|
|
+ //MgoB.InitPool()
|
|
|
+ //
|
|
|
+ MgoB := &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: "192.168.3.206:27002",
|
|
|
+ Size: 10,
|
|
|
+ DbName: "qfw_data",
|
|
|
+ UserName: "root",
|
|
|
+ Password: "root",
|
|
|
+ //Direct: true,
|
|
|
+ }
|
|
|
+ MgoB.InitPool()
|
|
|
+
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+ it := sess.DB("qfw_data").C(portraitMgo).Find(nil).Select(nil).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%5000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["_id"])
|
|
|
+ }
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ tmp["id"] = id
|
|
|
+ tmp["_id"] = id
|
|
|
+
|
|
|
+ saveEsPool <- tmp
|
|
|
+ //Es.Save("project_portrait", tmp)
|
|
|
+ }
|
|
|
+ log.Println("数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+func SaveEsMethod() {
|
|
|
+ arru := make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-saveEsPool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == EsBulkSize {
|
|
|
+ saveEsSp <- true
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-saveEsSp
|
|
|
+ }()
|
|
|
+ Es.BulkSave(portraitIndex, arru)
|
|
|
+ }(arru)
|
|
|
+ arru = make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ saveEsSp <- true
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-saveEsSp
|
|
|
+ }()
|
|
|
+ Es.BulkSave(portraitIndex, arru)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|