// Command main maintains per-buyer "portrait" records: for every buyer and
// top-level intelligence class it aggregates the project count, the summed
// project amount and the latest project from the "projectset" Elasticsearch
// index, stores the result in MongoDB and then bulk-indexes it into
// Elasticsearch.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"github.com/olivere/elastic/v7"
	"github.com/robfig/cron/v3"
	"github.com/spf13/viper"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// saveEsPool queues documents that SaveEsMethod bulk-saves into ES.
	saveEsPool = make(chan map[string]interface{}, 5000)

	// saveEsSp is a semaphore bounding the number of concurrent bulk saves.
	saveEsSp = make(chan bool, 5)

	// EsBulkSize is the number of documents sent per bulk request.
	EsBulkSize = 50

	// Es is the shared Elasticsearch wrapper used for bulk saving.
	Es *es.Elastic

	// Mgo is the "bidding" MongoDB connection; portraits are stored through it.
	Mgo *mongodb.MongodbSim

	// MgoP is the MongoDB connection holding the project collection.
	MgoP *mongodb.MongodbSim

	// portraitIndex is the name of the portrait ES index.
	portraitIndex = ""

	// portraitMgo is the name of the portrait MongoDB collection.
	portraitMgo = ""

	// GF holds the configuration loaded by InitConfig.
	GF GlobalConf

	// topInfos lists the top-level intelligence class tags.
	topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁", "情报_安防"}
)

// PortraitData is one portrait record: the aggregated project statistics of
// a single buyer within a single intelligence class.
type PortraitData struct {
	Buyer        string  `json:"buyer"`
	Area         string  `json:"area"`
	City         string  `json:"city"`
	Class        string  `json:"class"`
	BusinessType string  `json:"business_type"`
	Lasttime     int64   `json:"lasttime"`
	ProjectCount int64   `json:"project_count"`
	ProjectMoney float64 `json:"project_money"`
	UpdateTime   int64   `json:"update_time"`
}

// InitConfig reads the TOML configuration through viper and unmarshals it
// into GF. It returns the first read or unmarshal error.
func InitConfig() (err error) {
	// NOTE(review): SetConfigFile pins an explicit file; confirm whether the
	// name/type/search-path settings below are still consulted by viper.
	viper.SetConfigFile("config.toml") // explicit config file path
	viper.SetConfigName("config")      // config file name without extension
	viper.SetConfigType("toml")        // needed when the file name has no extension
	viper.AddConfigPath("./")
	viper.AddConfigPath("./conf/")  // also look in ./conf/
	viper.AddConfigPath("../conf/") // also look in ../conf/

	if err = viper.ReadInConfig(); err != nil {
		return err
	}
	return viper.Unmarshal(&GF)
}

// Init loads the configuration and initialises the global Elasticsearch and
// MongoDB handles. It terminates the process when the configuration cannot
// be loaded or when a mandatory setting is missing.
func Init() {
	// Without a configuration every connection below would be built from
	// zero values, so a load failure is fatal.
	if err := InitConfig(); err != nil {
		log.Fatalln("load config:", err)
	}

	// Production environment.
	Es = &es.Elastic{
		//S_esurl: "http://127.0.0.1:19908",
		S_esurl:  GF.Es.URL,
		I_size:   5,
		Username: GF.Es.Username,
		Password: GF.Es.Password,
	}
	Es.InitElasticSize()

	// Test environment.
	//Es = &es.Elastic{
	//	S_esurl:  "http://192.168.3.149:9201",
	//	I_size:   5,
	//	Username: "",
	//	Password: "",
	//}
	//Es.InitElasticSize()

	// "bidding" database.
	Mgo = &mongodb.MongodbSim{
		MongodbAddr: GF.Mongo.Host,
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   GF.Mongo.DB,
		UserName: GF.Mongo.Username,
		Password: GF.Mongo.Password,
		Direct:   GF.Mongo.Direct,
	}
	Mgo.InitPool()

	// Extraction database holding the project collection.
	MgoP = &mongodb.MongodbSim{
		MongodbAddr: GF.Mongop.Host,
		Size:        10,
		DbName:      GF.Mongop.DB,
		UserName:    GF.Mongop.Username,
		Password:    GF.Mongop.Password,
		Direct:      GF.Mongop.Direct,
	}
	MgoP.InitPool()

	portraitIndex = GF.Env.PortraitIndex
	portraitMgo = GF.Env.PortraitMgo
	if portraitIndex == "" || portraitMgo == "" {
		log.Fatalln("画像索引或者MongoDB数据表为空")
	}

	if GF.Env.Curs < 1 {
		log.Fatalln("参数 curs 并发数不能低于1")
	}
	log.Println("参数 curs 并发数", GF.Env.Curs)
}

// main initialises the process, starts the background ES bulk saver and
// schedules dealIncData with the cron spec GF.Env.Spec (seconds field
// enabled, Asia/Shanghai time zone). It then blocks forever.
func main() {
	Init()
	go SaveEsMethod() // background ES indexer

	// Scheduled job.
	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// A nil location must not be handed to cron.
		log.Fatalln("load location Asia/Shanghai:", err)
	}
	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
	if _, err := c.AddFunc(GF.Env.Spec, dealIncData); err != nil {
		// Nothing would ever be scheduled; idling forever helps nobody.
		log.Fatalln("AddFunc err", err)
	}
	c.Start()
	defer c.Stop()

	select {}
}

// fillPortraitStats queries the "projectset" index for the projects of
// portrait.Buyer tagged with portrait.Class and fills ProjectCount,
// ProjectMoney, Lasttime, Area and City of portrait from the result. The
// latter three come from the project with the highest "lasttime".
//
// It returns false when no project matched, in which case portrait is left
// untouched, and a non-nil error when the search itself failed.
func fillPortraitStats(ctx context.Context, client *elastic.Client, portrait *PortraitData) (bool, error) {
	query := elastic.NewBoolQuery().Must(
		elastic.NewTermQuery("buyer", portrait.Buyer),
		elastic.NewTermQuery("tag_topinformation", portrait.Class),
	)
	res, err := client.Search().
		Index("projectset").
		Query(query).
		Sort("lasttime", false). // false means descending
		Size(1).
		Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice")).
		Do(ctx)
	if err != nil {
		return false, err
	}
	if res.Hits == nil || res.Hits.TotalHits == nil || res.Hits.TotalHits.Value <= 0 {
		return false, nil
	}

	portrait.ProjectCount = res.TotalHits()
	for _, hit := range res.Hits.Hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err != nil {
			log.Printf("解析文档失败:%s", err)
			continue
		}
		portrait.Lasttime = util.Int64All(doc["lasttime"])
		portrait.Area = util.ObjToString(doc["area"])
		portrait.City = util.ObjToString(doc["city"])
	}

	// Value is a pointer; guard it so a null sum cannot panic.
	if agg, found := res.Aggregations.Sum("total_price"); found && agg.Value != nil {
		portrait.ProjectMoney = *agg.Value
	} else {
		log.Println("Aggregation not found")
	}
	return true, nil
}

// dealIncData processes incremental data. It reads the projects whose
// "pici" lies in (start, end], where end is today's midnight and start is
// that midnight shifted by GF.Env.Days days, refreshes the portraits of
// their buyers with GF.Env.Curs workers and finally queues the refreshed
// portraits for indexing via incDataEs.
func dealIncData() {
	now := time.Now()
	// presumably GF.Env.Days is negative (e.g. -1 for "yesterday") — verify against config
	yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	where := map[string]interface{}{
		"pici": map[string]interface{}{
			"$gt":  yesterday.Unix(),
			"$lte": today.Unix(),
		},
	}

	// Create the Elasticsearch client for this run.
	client, err := elastic.NewClient(
		elastic.SetURL(GF.Es.URL),
		elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
		elastic.SetSniff(false),
	)
	if err != nil {
		// This runs inside the scheduler: skip the run instead of killing
		// the whole process; the next tick retries.
		log.Printf("创建 Elasticsearch 客户端失败:%s", err)
		return
	}
	// A client is created on every run, so release its background
	// goroutines when the run ends.
	defer client.Stop()

	defer util.Catch()
	sess := MgoP.GetMgoConn()
	defer MgoP.DestoryMongoConn(sess)

	var wg sync.WaitGroup
	ch := make(chan map[string]interface{}, 10000)
	// Process the projects in parallel.
	for i := 0; i < GF.Env.Curs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for hit := range ch {
				processProjectData(hit, client, Mgo)
			}
		}()
	}

	count := 0
	it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		// Hand the decoded document off and give the iterator a fresh map,
		// so a queued document can never be overwritten should Next decode
		// into the same map again.
		doc := tmp
		tmp = make(map[string]interface{})

		if count%5000 == 0 {
			log.Println("current:", count, doc["buyer"])
		}
		// Skip projects without a buyer.
		if util.ObjToString(doc["buyer"]) == "" {
			continue
		}
		ch <- doc
	}
	close(ch) // no more work for the workers
	wg.Wait()

	// All portraits refreshed: push them to the index.
	incDataEs()
	log.Println("增量数据处理完毕")
}

// dealAllData processes the full existing data set for the single class
// "情报_物业": it scrolls through every document of the "buyer" index and
// saves one portrait per buyer that has matching projects into the
// "wcc_project_portrait" collection.
//
// NOTE(review): endpoints and credentials are hard-coded in this one-off
// backfill; they should come from configuration.
func dealAllData() {
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"
	index := "buyer" // index to scroll
	//index := "projectset"

	// Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}
	defer client.Stop()

	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoB.InitPool()

	ctx := context.Background()
	// Start the scroll search.
	scroll := "10m"
	searchSource := elastic.NewSearchSource().
		//Query(query).
		Size(10000).
		Sort("_doc", true) // ascending; use false for descending
	res, err := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if err != nil {
		if err == io.EOF {
			// No result to iterate over; returning avoids touching a nil res.
			fmt.Println("没有数据")
			return
		}
		panic(err)
	}
	fmt.Println("总数是:", res.TotalHits())

	// Remember the scroll id separately: res may be nil once a later
	// scroll request fails.
	scrollID := res.ScrollId
	total := 0
	for res.Hits != nil && len(res.Hits.Hits) > 0 {
		for k, hit := range res.Hits.Hits {
			if k%1000 == 0 {
				log.Println("当前:", k)
			}
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}

			portrait := PortraitData{
				Buyer:        util.ObjToString(doc["name"]),
				BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
				Class:        "情报_物业",
			}
			found, err := fillPortraitStats(ctx, client, &portrait)
			if err != nil {
				log.Fatalf("Error getting response: %s", err)
			}
			if !found {
				continue
			}
			// Persist to MongoDB.
			MgoB.Save("wcc_project_portrait", structToMap(portrait))
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId
		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		log.Println("current count:", total)
		if err != nil {
			// io.EOF marks the regular end of the scroll.
			if err != io.EOF {
				log.Println("滚动搜索失败:", err, res)
			}
			break
		}
	}

	// Release the scroll context once, after the loop.
	if scrollID != "" {
		if _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("清理滚动搜索失败:%s", err)
		}
	}
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// dealAllDataB processes the full existing data set for every class in
// topInfos: it scrolls through the "buyer" index and fans the hits out to
// ten workers running processHit.
//
// NOTE(review): endpoints and credentials are hard-coded in this one-off
// backfill; they should come from configuration.
func dealAllDataB() {
	url := "http://172.17.4.184:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"
	index := "buyer"

	// Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}
	defer client.Stop()

	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		Size:        10,
		DbName:      "qfw",
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
	}
	MgoB.InitPool()

	ctx := context.Background()
	scroll := "10m"
	searchSource := elastic.NewSearchSource().
		Size(10000).
		Sort("_doc", true)
	res, err := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if err != nil {
		if err == io.EOF {
			fmt.Println("没有数据")
			return
		}
		panic(err)
	}
	fmt.Println("总数是:", res.TotalHits())

	var wg sync.WaitGroup
	ch := make(chan *elastic.SearchHit, 10000)
	// Process the hits in parallel.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for hit := range ch {
				processHit(hit, client, MgoB)
			}
		}()
	}

	// Remember the scroll id separately: res may be nil once a later
	// scroll request fails.
	scrollID := res.ScrollId
	num := 0
	for res.Hits != nil && len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			num++
			ch <- hit
			if num%1000 == 0 {
				log.Println("current num:", num)
			}
		}

		scrollID = res.ScrollId
		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		if err != nil {
			// io.EOF marks the regular end of the scroll.
			if err != io.EOF {
				log.Println("滚动搜索失败:", err, res)
			}
			break
		}
	}
	// Close exactly once, whichever way the loop ended; otherwise the
	// workers never finish and wg.Wait blocks forever.
	close(ch)
	wg.Wait()

	// Release the scroll context once, after the loop.
	if scrollID != "" {
		if _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("清理滚动搜索失败:%s", err)
		}
	}
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// processHit builds the portraits of one "buyer" index hit: for every class
// in topInfos that has matching projects it saves a portrait into the
// portraitMgo collection through MgoB. A failing search aborts the process,
// as this is part of the fail-fast backfill.
func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
	var doc map[string]interface{}
	if err := json.Unmarshal(hit.Source, &doc); err != nil {
		log.Printf("解析文档失败:%s", err)
		return
	}

	for _, v := range topInfos {
		portrait := PortraitData{
			Buyer:        util.ObjToString(doc["name"]),
			BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
			Class:        v,
		}
		found, err := fillPortraitStats(context.Background(), client, &portrait)
		if err != nil {
			log.Fatalf("Error getting response: %s", err)
		}
		if found {
			MgoB.Save(portraitMgo, structToMap(portrait))
		}
	}
}

// processProjectData refreshes the portraits of the buyer of one project
// document (incremental path). For every class in topInfos with matching
// projects it either updates the statistics of the existing portrait, and
// only when count, latest time or amount actually changed, or saves a new
// portrait. A failing search is logged and that class is skipped, so one
// bad request cannot take down the scheduler process.
func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
	for _, v := range topInfos {
		existsWhere := map[string]interface{}{
			"buyer": tmp["buyer"],
			"class": v,
		}
		portrait := PortraitData{
			Buyer:        util.ObjToString(tmp["buyer"]),
			BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
			Class:        v,
		}

		found, err := fillPortraitStats(context.Background(), client, &portrait)
		if err != nil {
			log.Printf("Error getting response: %s", err)
			continue
		}
		if !found {
			continue
		}
		portrait.UpdateTime = time.Now().Unix()

		exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
		if exist == nil || len(*exist) == 0 {
			// Not stored yet: save the whole portrait.
			MgoB.Save(portraitMgo, structToMap(portrait))
			continue
		}

		// Already stored: only the statistics are updated.
		unchanged := util.Int64All((*exist)["project_count"]) == portrait.ProjectCount &&
			util.Int64All((*exist)["lasttime"]) == portrait.Lasttime &&
			util.Float64All((*exist)["project_money"]) == portrait.ProjectMoney
		if unchanged {
			continue
		}
		update := map[string]interface{}{
			"lasttime":      portrait.Lasttime,
			"project_money": portrait.ProjectMoney,
			"project_count": portrait.ProjectCount,
			"update_time":   portrait.UpdateTime,
		}
		id := mongodb.BsonIdToSId((*exist)["_id"])
		MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
	}
}

// incDataEs queues every portrait whose "update_time" is at or after today's
// midnight for indexing. The string form of "_id" is stored as both "id" and
// "_id", and "update_time" is removed before the document is queued.
func incDataEs() {
	now := time.Now()
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	where := map[string]interface{}{
		"update_time": map[string]interface{}{
			"$gte": today.Unix(),
		},
	}

	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	count := 0
	// Read from the database the portraits are written to through Mgo,
	// rather than from a hard-coded database name.
	it := sess.DB(Mgo.DbName).C(portraitMgo).Find(where).Select(nil).Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		// Fresh map per document: the queued one must not be overwritten
		// should Next decode into the same map again.
		doc := tmp
		tmp = make(map[string]interface{})

		if count%1000 == 0 {
			log.Println("current:", count)
		}
		id := mongodb.BsonIdToSId(doc["_id"])
		doc["id"] = id
		doc["_id"] = id
		delete(doc, "update_time")
		saveEsPool <- doc
	}
	log.Println("数据处理完毕")
}

// allDataEs queues every document of the portraitMgo collection for
// indexing. The string form of "_id" is stored as both "id" and "_id".
//
// NOTE(review): the MongoDB endpoint and credentials are hard-coded in this
// one-off backfill; they should come from configuration.
func allDataEs() {
	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoB.InitPool()

	// Test environment.
	//MgoB := &mongodb.MongodbSim{
	//	MongodbAddr: "192.168.3.206:27002",
	//	Size:        10,
	//	DbName:      "qfw_data",
	//	UserName:    "root",
	//	Password:    "root",
	//	//Direct: true,
	//}
	//MgoB.InitPool()

	defer util.Catch()
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	count := 0
	it := sess.DB(MgoB.DbName).C(portraitMgo).Find(nil).Select(nil).Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		// Fresh map per document: the queued one must not be overwritten
		// should Next decode into the same map again.
		doc := tmp
		tmp = make(map[string]interface{})

		if count%5000 == 0 {
			log.Println("current:", count, doc["_id"])
		}
		id := mongodb.BsonIdToSId(doc["_id"])
		doc["id"] = id
		doc["_id"] = id
		saveEsPool <- doc
	}
	log.Println("数据处理完毕")
}

// SaveEsMethod drains saveEsPool forever and bulk-saves the documents into
// portraitIndex. A batch is flushed when it reaches EsBulkSize documents or
// when no document arrived for one second; saveEsSp bounds the number of
// bulk saves in flight.
func SaveEsMethod() {
	batch := make([]map[string]interface{}, 0, EsBulkSize)

	// flush hands the current batch to a background bulk save and starts a
	// new, empty batch. It blocks while all semaphore slots are taken.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		saveEsSp <- true
		go func(docs []map[string]interface{}) {
			defer func() { <-saveEsSp }()
			Es.BulkSave(portraitIndex, docs)
		}(batch)
		batch = make([]map[string]interface{}, 0, EsBulkSize)
	}

	for {
		select {
		case v := <-saveEsPool:
			batch = append(batch, v)
			if len(batch) >= EsBulkSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			// Idle for a second: flush whatever is pending.
			flush()
		}
	}
}