123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "github.com/spf13/viper"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "sync"
- "time"
- )
- var (
- saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
- saveEsSp = make(chan bool, 5)
- EsBulkSize = 50
- Es *es.Elastic
- Mgo *mongodb.MongodbSim //bidding 地址
- MgoP *mongodb.MongodbSim // 项目地址
- portraitIndex = "project_portrait_v1" // 画像索引
- portraitMgo = "project_portrait" // MongoDB 的表名
- GF GlobalConf
- // 情报分类一级标签
- //topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁", "情报_安防"}
- topInfos = []string{"情报_安防"}
- )
// PortraitData is one buyer's portrait for a single intelligence category.
// It is converted with structToMap before being written to MongoDB;
// presumably the json tags below become the stored field names (confirm in
// structToMap).
type PortraitData struct {
	Buyer        string  `json:"buyer"`         // purchasing unit name
	Area         string  `json:"area"`          // area of the buyer's newest matching project
	City         string  `json:"city"`          // city of the buyer's newest matching project
	Class        string  `json:"class"`         // intelligence tag, e.g. an entry of topInfos
	BusinessType string  `json:"business_type"` // buyerclass mapped through getStr
	Lasttime     int64   `json:"lasttime"`      // lasttime of the newest matching project
	ProjectCount int64   `json:"project_count"` // number of matching projects
	ProjectMoney float64 `json:"project_money"` // sum of sortprice over matching projects
	UpdateTime   int64   `json:"update_time"`   // Unix seconds of the last refresh
}
- func InitConfig() (err error) {
- viper.SetConfigFile("config.toml") // 指定配置文件路径
- viper.SetConfigName("config") // 配置文件名称(无扩展名)
- viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
- viper.AddConfigPath("./")
- viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
- viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
- err = viper.ReadInConfig() // 查找并读取配置文件
- if err != nil { // 处理读取配置文件的错误
- return
- }
- err = viper.Unmarshal(&GF)
- return err
- }
- func Init() {
- InitConfig()
- // 正式环境
- Es = &es.Elastic{
- //S_esurl: "http://127.0.0.1:19908",
- S_esurl: GF.Es.URL,
- I_size: 5,
- Username: GF.Es.Username,
- Password: GF.Es.Password,
- }
- Es.InitElasticSize()
- //测试环境
- //Es = &es.Elastic{
- // S_esurl: "http://192.168.3.149:9201",
- // I_size: 5,
- // Username: "",
- // Password: "",
- //}
- //Es.InitElasticSize()
- // bidding 地址
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: GF.Mongo.Host,
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: GF.Mongo.DB,
- UserName: GF.Mongo.Username,
- Password: GF.Mongo.Password,
- Direct: GF.Mongo.Direct,
- }
- Mgo.InitPool()
- // 抽取库项目表
- MgoP = &mongodb.MongodbSim{
- MongodbAddr: GF.Mongop.Host,
- Size: 10,
- DbName: GF.Mongop.DB,
- UserName: GF.Mongop.Username,
- Password: GF.Mongop.Password,
- Direct: GF.Mongop.Direct,
- }
- MgoP.InitPool()
- portraitIndex = GF.Env.PortraitIndex
- portraitMgo = GF.Env.PortraitMgo
- if portraitIndex == "" || portraitMgo == "" {
- log.Fatalln("画像索引或者MongoDB数据表为空")
- }
- }
- func main() {
- Init()
- go SaveEsMethod() // 生索引
- //dealAllDataB()
- allDataEs()
- //定时任务
- //local, _ := time.LoadLocation("Asia/Shanghai")
- //c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- //_, err := c.AddFunc(GF.Env.Spec, dealIncData)
- //if err != nil {
- // log.Println("AddFunc err", err)
- //}
- //
- //c.Start()
- //defer c.Stop()
- select {}
- }
- // dealIncData 处理增量数据
- func dealIncData() {
- now := time.Now()
- yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
- today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- where := map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": yesterday.Unix(),
- "$lte": today.Unix(),
- },
- }
- url := GF.Es.URL
- username := GF.Es.Username
- password := GF.Es.Password
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- defer util.Catch()
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- var wg sync.WaitGroup
- ch := make(chan map[string]interface{}, 10000)
- // 并行处理结果
- for i := 0; i < 2; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for hit := range ch {
- processProjectData(hit, client, Mgo)
- }
- }()
- }
- count := 0
- it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%5000 == 0 {
- log.Println("current:", count, tmp["buyer"])
- }
- // 没有采购单位
- if util.ObjToString(tmp["buyer"]) == "" {
- continue
- }
- ch <- tmp
- }
- close(ch) // 关闭通道
- wg.Wait()
- //处理完毕生索引
- incDataEs()
- log.Println("增量数据处理完毕")
- }
- // dealAllData 处理存量数据,
- func dealAllData() {
- /**
- 循环采购单位存量数据,
- */
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "buyer" //索引名称
- //index := "projectset" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- //Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for k, hit := range res.Hits.Hits {
- if k%1000 == 0 {
- log.Println("当前:", k)
- }
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- //处理查询结果
- portrait := PortraitData{
- Buyer: util.ObjToString(doc["name"]),
- BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
- Class: "情报_物业",
- }
- // 构建查询
- query := elastic.NewBoolQuery().
- Must(
- //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
- elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
- elastic.NewTermQuery("tag_topinformation", "情报_物业"),
- )
- // 创建搜索服务
- searchService2 := client.Search().
- Index("projectset"). // 替换为你的索引名称
- Query(query).
- Sort("lasttime", false). // false表示降序
- Size(1).
- Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
- // 执行查询
- searchResult, err := searchService2.Do(context.Background())
- if err != nil {
- log.Fatalf("Error getting response: %s", err)
- }
- // 处理结果
- if searchResult.Hits.TotalHits.Value > 0 {
- portrait.ProjectCount = searchResult.TotalHits()
- for _, hit := range searchResult.Hits.Hits {
- var doc2 map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc2)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- portrait.Lasttime = util.Int64All(doc2["lasttime"])
- portrait.Area = util.ObjToString(doc2["area"])
- portrait.City = util.ObjToString(doc2["city"])
- }
- // 处理聚合结果
- if agg, found := searchResult.Aggregations.Sum("total_price"); found {
- portrait.ProjectMoney = *agg.Value
- } else {
- log.Println("Aggregation not found")
- }
- //写入MongoDB
- MgoB.Save("wcc_project_portrait", structToMap(portrait))
- } else {
- continue
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
- // dealAllDataB 处理存量数据,协程处理全部类型
- func dealAllDataB() {
- url := "http://172.17.4.184:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "buyer"
- //url := "http://192.168.3.149:9201"
- //username := ""
- //password := ""
- //index := "buyer"
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- MgoB.InitPool()
- //MgoB := &mongodb.MongodbSim{
- // MongodbAddr: "192.168.3.206:27002",
- // Size: 10,
- // DbName: "qfw_data",
- // UserName: "root",
- // Password: "root",
- //}
- //MgoB.InitPool()
- ctx := context.Background()
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Size(10000).
- Sort("_doc", true)
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- return
- } else {
- panic(err)
- }
- }
- fmt.Println("总数是:", res.TotalHits())
- var wg sync.WaitGroup
- ch := make(chan *elastic.SearchHit, 10000)
- // 并行处理结果
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for hit := range ch {
- processHit(hit, client, MgoB)
- }
- }()
- }
- num := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- num++
- ch <- hit
- if num%1000 == 0 {
- log.Println("current num:", num)
- }
- }
- scrollID := res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- if err != nil {
- if err == io.EOF {
- close(ch)
- break
- }
- log.Println("滚动搜索失败:", err, res)
- close(ch)
- break
- }
- }
- wg.Wait()
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
- func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- return
- }
- for _, v := range topInfos {
- portrait := PortraitData{
- Buyer: util.ObjToString(doc["name"]),
- BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
- Class: v,
- }
- query := elastic.NewBoolQuery().
- Must(
- elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
- elastic.NewTermQuery("tag_topinformation", v),
- )
- searchService2 := client.Search().
- Index("projectset").
- Query(query).
- Sort("lasttime", false).
- Size(1).
- Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
- searchResult, err := searchService2.Do(context.Background())
- if err != nil {
- log.Fatalf("Error getting response: %s", err)
- }
- if searchResult.Hits.TotalHits.Value > 0 {
- portrait.ProjectCount = searchResult.TotalHits()
- for _, hit := range searchResult.Hits.Hits {
- var doc2 map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc2)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- portrait.Lasttime = util.Int64All(doc2["lasttime"])
- portrait.Area = util.ObjToString(doc2["area"])
- portrait.City = util.ObjToString(doc2["city"])
- }
- if agg, found := searchResult.Aggregations.Sum("total_price"); found {
- portrait.ProjectMoney = *agg.Value
- } else {
- log.Println("Aggregation not found")
- }
- //insert := structToMap(portrait)
- //insert[""]
- MgoB.Save("project_portrait_0930", structToMap(portrait))
- }
- }
- }
- // processProjectData 处理项目表增量数据
- func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
- for _, v := range topInfos {
- existsWhere := map[string]interface{}{
- "buyer": tmp["buyer"],
- "class": v,
- }
- portrait := PortraitData{
- Buyer: util.ObjToString(tmp["buyer"]),
- BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
- Class: v,
- }
- query := elastic.NewBoolQuery().
- Must(
- elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
- elastic.NewTermQuery("tag_topinformation", v),
- )
- searchService2 := client.Search().
- Index("projectset").
- Query(query).
- Sort("lasttime", false).
- Size(1).
- Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
- searchResult, err := searchService2.Do(context.Background())
- if err != nil {
- log.Fatalf("Error getting response: %s", err)
- }
- if searchResult.Hits.TotalHits.Value > 0 {
- portrait.ProjectCount = searchResult.TotalHits()
- for _, hit := range searchResult.Hits.Hits {
- var doc2 map[string]interface{}
- err = json.Unmarshal(hit.Source, &doc2)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- portrait.Lasttime = util.Int64All(doc2["lasttime"])
- portrait.Area = util.ObjToString(doc2["area"])
- portrait.City = util.ObjToString(doc2["city"])
- }
- if agg, found := searchResult.Aggregations.Sum("total_price"); found {
- portrait.ProjectMoney = *agg.Value
- } else {
- log.Println("Aggregation not found")
- }
- portrait.UpdateTime = time.Now().Unix()
- //
- exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
- if exist != nil && len(*exist) > 0 {
- // 存在只更新部分内容
- update := map[string]interface{}{
- "lasttime": portrait.Lasttime,
- "project_money": portrait.ProjectMoney,
- "project_count": portrait.ProjectCount,
- "update_time": portrait.UpdateTime,
- }
- if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
- id := mongodb.BsonIdToSId((*exist)["_id"])
- MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
- }
- } else {
- //不存在直接保存
- MgoB.Save(portraitMgo, structToMap(portrait))
- }
- }
- }
- }
- // incDataEs 增量数据处理生索引
- func incDataEs() {
- now := time.Now()
- today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- where := map[string]interface{}{
- "update_time": map[string]interface{}{
- "$gte": today.Unix(),
- },
- }
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- count := 0
- it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Println("current:", count)
- }
- id := mongodb.BsonIdToSId(tmp["_id"])
- tmp["id"] = id
- tmp["_id"] = id
- delete(tmp, "update_time")
- saveEsPool <- tmp
- }
- log.Println("数据处理完毕")
- }
- // allDataEs 处理存量数据到es
- func allDataEs() {
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- // 测试环境
- //MgoB := &mongodb.MongodbSim{
- // MongodbAddr: "192.168.3.206:27002",
- // Size: 10,
- // DbName: "qfw_data",
- // UserName: "root",
- // Password: "root",
- // //Direct: true,
- //}
- //MgoB.InitPool()
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- count := 0
- it := sess.DB(MgoB.DbName).C("project_portrait_0930").Find(nil).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%5000 == 0 {
- log.Println("current:", count, tmp["_id"])
- }
- id := mongodb.BsonIdToSId(tmp["_id"])
- tmp["id"] = id
- tmp["_id"] = id
- saveEsPool <- tmp
- }
- log.Println("数据处理完毕")
- }
- func SaveEsMethod() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveEsPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- Es.BulkSave(portraitIndex, arru)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- Es.BulkSave(portraitIndex, arru)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
|