|
@@ -0,0 +1,668 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "github.com/olivere/elastic/v7"
|
|
|
+ "github.com/robfig/cron/v3"
|
|
|
+ "github.com/spf13/viper"
|
|
|
+ "io"
|
|
|
+ util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
+ es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
+ "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
+ "log"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
|
|
|
+ saveEsSp = make(chan bool, 5)
|
|
|
+ EsBulkSize = 50
|
|
|
+ Es *es.Elastic
|
|
|
+ Mgo *mongodb.MongodbSim //bidding 地址
|
|
|
+ MgoP *mongodb.MongodbSim // 项目地址
|
|
|
+ portraitIndex = "" // 画像索引
|
|
|
+ portraitMgo = "" // MongoDB 的表名
|
|
|
+ GF GlobalConf
|
|
|
+ // 情报分类一级标签
|
|
|
+ topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁"}
|
|
|
+)
|
|
|
+
|
|
|
+type PortraitData struct {
|
|
|
+ Buyer string `json:"buyer"`
|
|
|
+ Area string `json:"area"`
|
|
|
+ City string `json:"city"`
|
|
|
+ Class string `json:"class"`
|
|
|
+ BusinessType string `json:"business_type"`
|
|
|
+ Lasttime int64 `json:"lasttime"`
|
|
|
+ ProjectCount int64 `json:"project_count"`
|
|
|
+ ProjectMoney float64 `json:"project_money"`
|
|
|
+ UpdateTime int64 `json:"update_time"`
|
|
|
+}
|
|
|
+
|
|
|
+func InitConfig() (err error) {
|
|
|
+ viper.SetConfigFile("config.toml") // 指定配置文件路径
|
|
|
+ viper.SetConfigName("config") // 配置文件名称(无扩展名)
|
|
|
+ viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
|
|
|
+
|
|
|
+ viper.AddConfigPath("./")
|
|
|
+ viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
|
|
|
+ viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
|
|
|
+ err = viper.ReadInConfig() // 查找并读取配置文件
|
|
|
+ if err != nil { // 处理读取配置文件的错误
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ err = viper.Unmarshal(&GF)
|
|
|
+
|
|
|
+ return err
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func Init() {
|
|
|
+ InitConfig()
|
|
|
+ // 正式环境
|
|
|
+ Es = &es.Elastic{
|
|
|
+ //S_esurl: "http://127.0.0.1:19908",
|
|
|
+ S_esurl: GF.Es.URL,
|
|
|
+ I_size: 5,
|
|
|
+ Username: GF.Es.Username,
|
|
|
+ Password: GF.Es.Password,
|
|
|
+ }
|
|
|
+ Es.InitElasticSize()
|
|
|
+ //测试环境
|
|
|
+ //Es = &es.Elastic{
|
|
|
+ // S_esurl: "http://192.168.3.149:9201",
|
|
|
+ // I_size: 5,
|
|
|
+ // Username: "",
|
|
|
+ // Password: "",
|
|
|
+ //}
|
|
|
+ //Es.InitElasticSize()
|
|
|
+
|
|
|
+ // bidding 地址
|
|
|
+ Mgo = &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: GF.Mongo.Host,
|
|
|
+ //MongodbAddr: "127.0.0.1:27083",
|
|
|
+ Size: 10,
|
|
|
+ DbName: GF.Mongo.DB,
|
|
|
+ UserName: GF.Mongo.Username,
|
|
|
+ Password: GF.Mongo.Password,
|
|
|
+ Direct: GF.Mongo.Direct,
|
|
|
+ }
|
|
|
+ Mgo.InitPool()
|
|
|
+
|
|
|
+ // 抽取库项目表
|
|
|
+ MgoP = &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: GF.Mongop.Host,
|
|
|
+ Size: 10,
|
|
|
+ DbName: GF.Mongop.DB,
|
|
|
+ UserName: GF.Mongop.Username,
|
|
|
+ Password: GF.Mongop.Password,
|
|
|
+ Direct: GF.Mongop.Direct,
|
|
|
+ }
|
|
|
+
|
|
|
+ MgoP.InitPool()
|
|
|
+
|
|
|
+ portraitIndex = GF.Env.PortraitIndex
|
|
|
+ portraitMgo = GF.Env.PortraitMgo
|
|
|
+ if portraitIndex == "" || portraitMgo == "" {
|
|
|
+ log.Fatalln("画像索引或者MongoDB数据表为空")
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+func main() {
|
|
|
+ Init()
|
|
|
+ go SaveEsMethod() // 生索引
|
|
|
+
|
|
|
+ //定时任务
|
|
|
+ local, _ := time.LoadLocation("Asia/Shanghai")
|
|
|
+ c := cron.New(cron.WithLocation(local), cron.WithSeconds())
|
|
|
+ _, err := c.AddFunc(GF.Env.Spec, dealIncData)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("AddFunc err", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ c.Start()
|
|
|
+ defer c.Stop()
|
|
|
+
|
|
|
+ select {}
|
|
|
+}
|
|
|
+
|
|
|
+// dealIncData 处理增量数据
|
|
|
+func dealIncData() {
|
|
|
+ now := time.Now()
|
|
|
+ yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
|
|
|
+ today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "pici": map[string]interface{}{
|
|
|
+ "$gt": yesterday.Unix(),
|
|
|
+ "$lte": today.Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ url := GF.Es.URL
|
|
|
+ username := GF.Es.Username
|
|
|
+ password := GF.Es.Password
|
|
|
+
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoP.GetMgoConn()
|
|
|
+ defer MgoP.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ ch := make(chan map[string]interface{}, 10000)
|
|
|
+
|
|
|
+ // 并行处理结果
|
|
|
+ for i := 0; i < 2; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ for hit := range ch {
|
|
|
+ processProjectData(hit, client, Mgo)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ count := 0
|
|
|
+ it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%5000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["buyer"])
|
|
|
+ }
|
|
|
+ // 没有采购单位
|
|
|
+ if util.ObjToString(tmp["buyer"]) == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ ch <- tmp
|
|
|
+ }
|
|
|
+
|
|
|
+ close(ch) // 关闭通道
|
|
|
+ wg.Wait()
|
|
|
+ //处理完毕生索引
|
|
|
+ incDataEs()
|
|
|
+ log.Println("增量数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// dealAllData 处理存量数据,
|
|
|
+func dealAllData() {
|
|
|
+ /**
|
|
|
+ 循环采购单位存量数据,
|
|
|
+ */
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ //url := "http://127.0.0.1:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+ index := "buyer" //索引名称
|
|
|
+ //index := "projectset" //索引名称
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ MgoB := &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: "172.17.189.140:27080",
|
|
|
+ //MongodbAddr: "127.0.0.1:27083",
|
|
|
+ Size: 10,
|
|
|
+ DbName: "qfw",
|
|
|
+ UserName: "SJZY_RWbid_ES",
|
|
|
+ Password: "SJZY@B4i4D5e6S",
|
|
|
+ //Direct: true,
|
|
|
+ }
|
|
|
+ MgoB.InitPool()
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ //开始滚动搜索
|
|
|
+ scrollID := ""
|
|
|
+ scroll := "10m"
|
|
|
+ searchSource := elastic.NewSearchSource().
|
|
|
+ //Query(query).
|
|
|
+ Size(10000).
|
|
|
+ Sort("_doc", true) //升序排序
|
|
|
+ //Sort("_doc", false) //降序排序
|
|
|
+
|
|
|
+ searchService := client.Scroll(index).
|
|
|
+ Size(10000).
|
|
|
+ Scroll(scroll).
|
|
|
+ SearchSource(searchSource)
|
|
|
+
|
|
|
+ res, err := searchService.Do(ctx)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ fmt.Println("没有数据")
|
|
|
+ } else {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
|
|
|
+ fmt.Println("总数是:", res.TotalHits())
|
|
|
+ total := 0
|
|
|
+ for len(res.Hits.Hits) > 0 {
|
|
|
+ for k, hit := range res.Hits.Hits {
|
|
|
+ if k%1000 == 0 {
|
|
|
+ log.Println("当前:", k)
|
|
|
+ }
|
|
|
+ var doc map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ //处理查询结果
|
|
|
+ portrait := PortraitData{
|
|
|
+ Buyer: util.ObjToString(doc["name"]),
|
|
|
+ BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
|
|
|
+ Class: "情报_物业",
|
|
|
+ }
|
|
|
+
|
|
|
+ // 构建查询
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ Must(
|
|
|
+ //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
|
|
|
+ elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
|
|
|
+ elastic.NewTermQuery("tag_topinformation", "情报_物业"),
|
|
|
+ )
|
|
|
+ // 创建搜索服务
|
|
|
+ searchService2 := client.Search().
|
|
|
+ Index("projectset"). // 替换为你的索引名称
|
|
|
+ Query(query).
|
|
|
+ Sort("lasttime", false). // false表示降序
|
|
|
+ Size(1).
|
|
|
+ Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
|
|
|
+
|
|
|
+ // 执行查询
|
|
|
+ searchResult, err := searchService2.Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Error getting response: %s", err)
|
|
|
+ }
|
|
|
+ // 处理结果
|
|
|
+ if searchResult.Hits.TotalHits.Value > 0 {
|
|
|
+ portrait.ProjectCount = searchResult.TotalHits()
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ var doc2 map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc2)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ portrait.Lasttime = util.Int64All(doc2["lasttime"])
|
|
|
+ portrait.Area = util.ObjToString(doc2["area"])
|
|
|
+ portrait.City = util.ObjToString(doc2["city"])
|
|
|
+ }
|
|
|
+ // 处理聚合结果
|
|
|
+ if agg, found := searchResult.Aggregations.Sum("total_price"); found {
|
|
|
+ portrait.ProjectMoney = *agg.Value
|
|
|
+ } else {
|
|
|
+ log.Println("Aggregation not found")
|
|
|
+ }
|
|
|
+ //写入MongoDB
|
|
|
+ MgoB.Save("wcc_project_portrait", structToMap(portrait))
|
|
|
+ } else {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ total = total + len(res.Hits.Hits)
|
|
|
+ scrollID = res.ScrollId
|
|
|
+ res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
|
|
|
+ log.Println("current count:", total)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ // 滚动到最后一批数据,退出循环
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("滚动搜索失败:", err, res)
|
|
|
+ break // 处理错误时退出循环
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 在循环外调用 ClearScroll
|
|
|
+ _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("清理滚动搜索失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Println("结束~~~~~~~~~~~~~~~")
|
|
|
+}
|
|
|
+
|
|
|
+// dealAllDataB 处理存量数据,协程处理全部类型
|
|
|
+func dealAllDataB() {
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+ index := "buyer"
|
|
|
+
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ MgoB := &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: "172.17.189.140:27080",
|
|
|
+ Size: 10,
|
|
|
+ DbName: "qfw",
|
|
|
+ UserName: "SJZY_RWbid_ES",
|
|
|
+ Password: "SJZY@B4i4D5e6S",
|
|
|
+ }
|
|
|
+ MgoB.InitPool()
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ scroll := "10m"
|
|
|
+ searchSource := elastic.NewSearchSource().
|
|
|
+ Size(10000).
|
|
|
+ Sort("_doc", true)
|
|
|
+
|
|
|
+ searchService := client.Scroll(index).
|
|
|
+ Size(10000).
|
|
|
+ Scroll(scroll).
|
|
|
+ SearchSource(searchSource)
|
|
|
+
|
|
|
+ res, err := searchService.Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ fmt.Println("没有数据")
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Println("总数是:", res.TotalHits())
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ ch := make(chan *elastic.SearchHit, 10000)
|
|
|
+
|
|
|
+ // 并行处理结果
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ for hit := range ch {
|
|
|
+ processHit(hit, client, MgoB)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ num := 0
|
|
|
+ for len(res.Hits.Hits) > 0 {
|
|
|
+ for _, hit := range res.Hits.Hits {
|
|
|
+ num++
|
|
|
+ ch <- hit
|
|
|
+ if num%1000 == 0 {
|
|
|
+ log.Println("current num:", num)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ scrollID := res.ScrollId
|
|
|
+ res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ close(ch)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("滚动搜索失败:", err, res)
|
|
|
+ close(ch)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ // 在循环外调用 ClearScroll
|
|
|
+ _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("清理滚动搜索失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Println("结束~~~~~~~~~~~~~~~")
|
|
|
+}
|
|
|
+
|
|
|
+func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
|
|
|
+ var doc map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range topInfos {
|
|
|
+ portrait := PortraitData{
|
|
|
+ Buyer: util.ObjToString(doc["name"]),
|
|
|
+ BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
|
|
|
+ Class: v,
|
|
|
+ }
|
|
|
+
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ Must(
|
|
|
+ elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
|
|
|
+ elastic.NewTermQuery("tag_topinformation", v),
|
|
|
+ )
|
|
|
+ searchService2 := client.Search().
|
|
|
+ Index("projectset").
|
|
|
+ Query(query).
|
|
|
+ Sort("lasttime", false).
|
|
|
+ Size(1).
|
|
|
+ Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
|
|
|
+
|
|
|
+ searchResult, err := searchService2.Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Error getting response: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if searchResult.Hits.TotalHits.Value > 0 {
|
|
|
+ portrait.ProjectCount = searchResult.TotalHits()
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ var doc2 map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc2)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ portrait.Lasttime = util.Int64All(doc2["lasttime"])
|
|
|
+ portrait.Area = util.ObjToString(doc2["area"])
|
|
|
+ portrait.City = util.ObjToString(doc2["city"])
|
|
|
+ }
|
|
|
+ if agg, found := searchResult.Aggregations.Sum("total_price"); found {
|
|
|
+ portrait.ProjectMoney = *agg.Value
|
|
|
+ } else {
|
|
|
+ log.Println("Aggregation not found")
|
|
|
+ }
|
|
|
+ MgoB.Save(portraitMgo, structToMap(portrait))
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// processProjectData 处理项目表增量数据
|
|
|
+func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
|
|
|
+ for _, v := range topInfos {
|
|
|
+ existsWhere := map[string]interface{}{
|
|
|
+ "buyer": tmp["buyer"],
|
|
|
+ "class": v,
|
|
|
+ }
|
|
|
+ portrait := PortraitData{
|
|
|
+ Buyer: util.ObjToString(tmp["buyer"]),
|
|
|
+ BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
|
|
|
+ Class: v,
|
|
|
+ }
|
|
|
+
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ Must(
|
|
|
+ elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
|
|
|
+ elastic.NewTermQuery("tag_topinformation", v),
|
|
|
+ )
|
|
|
+ searchService2 := client.Search().
|
|
|
+ Index("projectset").
|
|
|
+ Query(query).
|
|
|
+ Sort("lasttime", false).
|
|
|
+ Size(1).
|
|
|
+ Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
|
|
|
+
|
|
|
+ searchResult, err := searchService2.Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Error getting response: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if searchResult.Hits.TotalHits.Value > 0 {
|
|
|
+ portrait.ProjectCount = searchResult.TotalHits()
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ var doc2 map[string]interface{}
|
|
|
+ err = json.Unmarshal(hit.Source, &doc2)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ portrait.Lasttime = util.Int64All(doc2["lasttime"])
|
|
|
+ portrait.Area = util.ObjToString(doc2["area"])
|
|
|
+ portrait.City = util.ObjToString(doc2["city"])
|
|
|
+ }
|
|
|
+ if agg, found := searchResult.Aggregations.Sum("total_price"); found {
|
|
|
+ portrait.ProjectMoney = *agg.Value
|
|
|
+ } else {
|
|
|
+ log.Println("Aggregation not found")
|
|
|
+ }
|
|
|
+
|
|
|
+ portrait.UpdateTime = time.Now().Unix()
|
|
|
+ //
|
|
|
+ exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
|
|
|
+ if exist != nil && len(*exist) > 0 {
|
|
|
+ // 存在只更新部分内容
|
|
|
+ update := map[string]interface{}{
|
|
|
+ "lasttime": portrait.Lasttime,
|
|
|
+ "project_money": portrait.ProjectMoney,
|
|
|
+ "project_count": portrait.ProjectCount,
|
|
|
+ "update_time": portrait.UpdateTime,
|
|
|
+ }
|
|
|
+ if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
|
|
|
+ id := mongodb.BsonIdToSId((*exist)["_id"])
|
|
|
+ MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //不存在直接保存
|
|
|
+ MgoB.Save(portraitMgo, structToMap(portrait))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// incDataEs 增量数据处理生索引
|
|
|
+func incDataEs() {
|
|
|
+ now := time.Now()
|
|
|
+ today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "update_time": map[string]interface{}{
|
|
|
+ "$gte": today.Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+ it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("current:", count)
|
|
|
+ }
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ tmp["id"] = id
|
|
|
+ tmp["_id"] = id
|
|
|
+ delete(tmp, "update_time")
|
|
|
+ saveEsPool <- tmp
|
|
|
+ }
|
|
|
+ log.Println("数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+// allDataEs 处理存量数据到es
|
|
|
+func allDataEs() {
|
|
|
+ MgoB := &mongodb.MongodbSim{
|
|
|
+ MongodbAddr: "172.17.189.140:27080",
|
|
|
+ //MongodbAddr: "127.0.0.1:27083",
|
|
|
+ Size: 10,
|
|
|
+ DbName: "qfw",
|
|
|
+ UserName: "SJZY_RWbid_ES",
|
|
|
+ Password: "SJZY@B4i4D5e6S",
|
|
|
+ //Direct: true,
|
|
|
+ }
|
|
|
+ MgoB.InitPool()
|
|
|
+ // 测试环境
|
|
|
+ //MgoB := &mongodb.MongodbSim{
|
|
|
+ // MongodbAddr: "192.168.3.206:27002",
|
|
|
+ // Size: 10,
|
|
|
+ // DbName: "qfw_data",
|
|
|
+ // UserName: "root",
|
|
|
+ // Password: "root",
|
|
|
+ // //Direct: true,
|
|
|
+ //}
|
|
|
+ //MgoB.InitPool()
|
|
|
+
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+ count := 0
|
|
|
+ it := sess.DB(MgoB.DbName).C(portraitMgo).Find(nil).Select(nil).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%5000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["_id"])
|
|
|
+ }
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ tmp["id"] = id
|
|
|
+ tmp["_id"] = id
|
|
|
+
|
|
|
+ saveEsPool <- tmp
|
|
|
+ }
|
|
|
+ log.Println("数据处理完毕")
|
|
|
+}
|
|
|
+
|
|
|
+func SaveEsMethod() {
|
|
|
+ arru := make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-saveEsPool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == EsBulkSize {
|
|
|
+ saveEsSp <- true
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-saveEsSp
|
|
|
+ }()
|
|
|
+ Es.BulkSave(portraitIndex, arru)
|
|
|
+ }(arru)
|
|
|
+ arru = make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ saveEsSp <- true
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-saveEsSp
|
|
|
+ }()
|
|
|
+ Es.BulkSave(portraitIndex, arru)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([]map[string]interface{}, EsBulkSize)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|