main.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "github.com/spf13/viper"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  18. saveEsSp = make(chan bool, 5)
  19. EsBulkSize = 50
  20. Es *es.Elastic
  21. Mgo *mongodb.MongodbSim //bidding 地址
  22. MgoP *mongodb.MongodbSim // 项目地址
  23. portraitIndex = "project_portrait_v1" // 画像索引
  24. portraitMgo = "project_portrait" // MongoDB 的表名
  25. GF GlobalConf
  26. // 情报分类一级标签
  27. //topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁", "情报_安防"}
  28. topInfos = []string{"情报_安防"}
  29. )
  30. type PortraitData struct {
  31. Buyer string `json:"buyer"`
  32. Area string `json:"area"`
  33. City string `json:"city"`
  34. Class string `json:"class"`
  35. BusinessType string `json:"business_type"`
  36. Lasttime int64 `json:"lasttime"`
  37. ProjectCount int64 `json:"project_count"`
  38. ProjectMoney float64 `json:"project_money"`
  39. UpdateTime int64 `json:"update_time"`
  40. }
  41. func InitConfig() (err error) {
  42. viper.SetConfigFile("config.toml") // 指定配置文件路径
  43. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  44. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  45. viper.AddConfigPath("./")
  46. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  47. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  48. err = viper.ReadInConfig() // 查找并读取配置文件
  49. if err != nil { // 处理读取配置文件的错误
  50. return
  51. }
  52. err = viper.Unmarshal(&GF)
  53. return err
  54. }
  55. func Init() {
  56. InitConfig()
  57. // 正式环境
  58. Es = &es.Elastic{
  59. //S_esurl: "http://127.0.0.1:19908",
  60. S_esurl: GF.Es.URL,
  61. I_size: 5,
  62. Username: GF.Es.Username,
  63. Password: GF.Es.Password,
  64. }
  65. Es.InitElasticSize()
  66. //测试环境
  67. //Es = &es.Elastic{
  68. // S_esurl: "http://192.168.3.149:9201",
  69. // I_size: 5,
  70. // Username: "",
  71. // Password: "",
  72. //}
  73. //Es.InitElasticSize()
  74. // bidding 地址
  75. Mgo = &mongodb.MongodbSim{
  76. MongodbAddr: GF.Mongo.Host,
  77. //MongodbAddr: "127.0.0.1:27083",
  78. Size: 10,
  79. DbName: GF.Mongo.DB,
  80. UserName: GF.Mongo.Username,
  81. Password: GF.Mongo.Password,
  82. Direct: GF.Mongo.Direct,
  83. }
  84. Mgo.InitPool()
  85. // 抽取库项目表
  86. MgoP = &mongodb.MongodbSim{
  87. MongodbAddr: GF.Mongop.Host,
  88. Size: 10,
  89. DbName: GF.Mongop.DB,
  90. UserName: GF.Mongop.Username,
  91. Password: GF.Mongop.Password,
  92. Direct: GF.Mongop.Direct,
  93. }
  94. MgoP.InitPool()
  95. portraitIndex = GF.Env.PortraitIndex
  96. portraitMgo = GF.Env.PortraitMgo
  97. if portraitIndex == "" || portraitMgo == "" {
  98. log.Fatalln("画像索引或者MongoDB数据表为空")
  99. }
  100. }
  101. func main() {
  102. Init()
  103. go SaveEsMethod() // 生索引
  104. //dealAllDataB()
  105. allDataEs()
  106. //定时任务
  107. //local, _ := time.LoadLocation("Asia/Shanghai")
  108. //c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  109. //_, err := c.AddFunc(GF.Env.Spec, dealIncData)
  110. //if err != nil {
  111. // log.Println("AddFunc err", err)
  112. //}
  113. //
  114. //c.Start()
  115. //defer c.Stop()
  116. select {}
  117. }
  118. // dealIncData 处理增量数据
  119. func dealIncData() {
  120. now := time.Now()
  121. yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
  122. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  123. where := map[string]interface{}{
  124. "pici": map[string]interface{}{
  125. "$gt": yesterday.Unix(),
  126. "$lte": today.Unix(),
  127. },
  128. }
  129. url := GF.Es.URL
  130. username := GF.Es.Username
  131. password := GF.Es.Password
  132. // 创建 Elasticsearch 客户端
  133. client, err := elastic.NewClient(
  134. elastic.SetURL(url),
  135. elastic.SetBasicAuth(username, password),
  136. elastic.SetSniff(false),
  137. )
  138. if err != nil {
  139. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  140. }
  141. defer util.Catch()
  142. sess := MgoP.GetMgoConn()
  143. defer MgoP.DestoryMongoConn(sess)
  144. var wg sync.WaitGroup
  145. ch := make(chan map[string]interface{}, 10000)
  146. // 并行处理结果
  147. for i := 0; i < 2; i++ {
  148. wg.Add(1)
  149. go func() {
  150. defer wg.Done()
  151. for hit := range ch {
  152. processProjectData(hit, client, Mgo)
  153. }
  154. }()
  155. }
  156. count := 0
  157. it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
  158. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  159. if count%5000 == 0 {
  160. log.Println("current:", count, tmp["buyer"])
  161. }
  162. // 没有采购单位
  163. if util.ObjToString(tmp["buyer"]) == "" {
  164. continue
  165. }
  166. ch <- tmp
  167. }
  168. close(ch) // 关闭通道
  169. wg.Wait()
  170. //处理完毕生索引
  171. incDataEs()
  172. log.Println("增量数据处理完毕")
  173. }
  174. // dealAllData 处理存量数据,
  175. func dealAllData() {
  176. /**
  177. 循环采购单位存量数据,
  178. */
  179. url := "http://172.17.4.184:19908"
  180. //url := "http://127.0.0.1:19908"
  181. username := "jybid"
  182. password := "Top2023_JEB01i@31"
  183. index := "buyer" //索引名称
  184. //index := "projectset" //索引名称
  185. // 创建 Elasticsearch 客户端
  186. client, err := elastic.NewClient(
  187. elastic.SetURL(url),
  188. elastic.SetBasicAuth(username, password),
  189. elastic.SetSniff(false),
  190. )
  191. if err != nil {
  192. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  193. }
  194. MgoB := &mongodb.MongodbSim{
  195. MongodbAddr: "172.17.189.140:27080",
  196. //MongodbAddr: "127.0.0.1:27083",
  197. Size: 10,
  198. DbName: "qfw",
  199. UserName: "SJZY_RWbid_ES",
  200. Password: "SJZY@B4i4D5e6S",
  201. //Direct: true,
  202. }
  203. MgoB.InitPool()
  204. ctx := context.Background()
  205. //开始滚动搜索
  206. scrollID := ""
  207. scroll := "10m"
  208. searchSource := elastic.NewSearchSource().
  209. //Query(query).
  210. Size(10000).
  211. Sort("_doc", true) //升序排序
  212. //Sort("_doc", false) //降序排序
  213. searchService := client.Scroll(index).
  214. Size(10000).
  215. Scroll(scroll).
  216. SearchSource(searchSource)
  217. res, err := searchService.Do(ctx)
  218. if err != nil {
  219. if err == io.EOF {
  220. fmt.Println("没有数据")
  221. } else {
  222. panic(err)
  223. }
  224. }
  225. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  226. fmt.Println("总数是:", res.TotalHits())
  227. total := 0
  228. for len(res.Hits.Hits) > 0 {
  229. for k, hit := range res.Hits.Hits {
  230. if k%1000 == 0 {
  231. log.Println("当前:", k)
  232. }
  233. var doc map[string]interface{}
  234. err := json.Unmarshal(hit.Source, &doc)
  235. if err != nil {
  236. log.Printf("解析文档失败:%s", err)
  237. continue
  238. }
  239. //处理查询结果
  240. portrait := PortraitData{
  241. Buyer: util.ObjToString(doc["name"]),
  242. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  243. Class: "情报_物业",
  244. }
  245. // 构建查询
  246. query := elastic.NewBoolQuery().
  247. Must(
  248. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  249. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  250. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  251. )
  252. // 创建搜索服务
  253. searchService2 := client.Search().
  254. Index("projectset"). // 替换为你的索引名称
  255. Query(query).
  256. Sort("lasttime", false). // false表示降序
  257. Size(1).
  258. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  259. // 执行查询
  260. searchResult, err := searchService2.Do(context.Background())
  261. if err != nil {
  262. log.Fatalf("Error getting response: %s", err)
  263. }
  264. // 处理结果
  265. if searchResult.Hits.TotalHits.Value > 0 {
  266. portrait.ProjectCount = searchResult.TotalHits()
  267. for _, hit := range searchResult.Hits.Hits {
  268. var doc2 map[string]interface{}
  269. err := json.Unmarshal(hit.Source, &doc2)
  270. if err != nil {
  271. log.Printf("解析文档失败:%s", err)
  272. continue
  273. }
  274. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  275. portrait.Area = util.ObjToString(doc2["area"])
  276. portrait.City = util.ObjToString(doc2["city"])
  277. }
  278. // 处理聚合结果
  279. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  280. portrait.ProjectMoney = *agg.Value
  281. } else {
  282. log.Println("Aggregation not found")
  283. }
  284. //写入MongoDB
  285. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  286. } else {
  287. continue
  288. }
  289. }
  290. total = total + len(res.Hits.Hits)
  291. scrollID = res.ScrollId
  292. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  293. log.Println("current count:", total)
  294. if err != nil {
  295. if err == io.EOF {
  296. // 滚动到最后一批数据,退出循环
  297. break
  298. }
  299. log.Println("滚动搜索失败:", err, res)
  300. break // 处理错误时退出循环
  301. }
  302. }
  303. // 在循环外调用 ClearScroll
  304. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  305. if err != nil {
  306. log.Printf("清理滚动搜索失败:%s", err)
  307. }
  308. fmt.Println("结束~~~~~~~~~~~~~~~")
  309. }
  310. // dealAllDataB 处理存量数据,协程处理全部类型
  311. func dealAllDataB() {
  312. url := "http://172.17.4.184:19908"
  313. username := "jybid"
  314. password := "Top2023_JEB01i@31"
  315. index := "buyer"
  316. //url := "http://192.168.3.149:9201"
  317. //username := ""
  318. //password := ""
  319. //index := "buyer"
  320. // 创建 Elasticsearch 客户端
  321. client, err := elastic.NewClient(
  322. elastic.SetURL(url),
  323. elastic.SetBasicAuth(username, password),
  324. elastic.SetSniff(false),
  325. )
  326. if err != nil {
  327. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  328. }
  329. MgoB := &mongodb.MongodbSim{
  330. MongodbAddr: "172.17.189.140:27080",
  331. Size: 10,
  332. DbName: "qfw",
  333. UserName: "SJZY_RWbid_ES",
  334. Password: "SJZY@B4i4D5e6S",
  335. }
  336. MgoB.InitPool()
  337. //MgoB := &mongodb.MongodbSim{
  338. // MongodbAddr: "192.168.3.206:27002",
  339. // Size: 10,
  340. // DbName: "qfw_data",
  341. // UserName: "root",
  342. // Password: "root",
  343. //}
  344. //MgoB.InitPool()
  345. ctx := context.Background()
  346. scroll := "10m"
  347. searchSource := elastic.NewSearchSource().
  348. Size(10000).
  349. Sort("_doc", true)
  350. searchService := client.Scroll(index).
  351. Size(10000).
  352. Scroll(scroll).
  353. SearchSource(searchSource)
  354. res, err := searchService.Do(ctx)
  355. if err != nil {
  356. if err == io.EOF {
  357. fmt.Println("没有数据")
  358. return
  359. } else {
  360. panic(err)
  361. }
  362. }
  363. fmt.Println("总数是:", res.TotalHits())
  364. var wg sync.WaitGroup
  365. ch := make(chan *elastic.SearchHit, 10000)
  366. // 并行处理结果
  367. for i := 0; i < 10; i++ {
  368. wg.Add(1)
  369. go func() {
  370. defer wg.Done()
  371. for hit := range ch {
  372. processHit(hit, client, MgoB)
  373. }
  374. }()
  375. }
  376. num := 0
  377. for len(res.Hits.Hits) > 0 {
  378. for _, hit := range res.Hits.Hits {
  379. num++
  380. ch <- hit
  381. if num%1000 == 0 {
  382. log.Println("current num:", num)
  383. }
  384. }
  385. scrollID := res.ScrollId
  386. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  387. if err != nil {
  388. if err == io.EOF {
  389. close(ch)
  390. break
  391. }
  392. log.Println("滚动搜索失败:", err, res)
  393. close(ch)
  394. break
  395. }
  396. }
  397. wg.Wait()
  398. // 在循环外调用 ClearScroll
  399. _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
  400. if err != nil {
  401. log.Printf("清理滚动搜索失败:%s", err)
  402. }
  403. fmt.Println("结束~~~~~~~~~~~~~~~")
  404. }
  405. func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  406. var doc map[string]interface{}
  407. err := json.Unmarshal(hit.Source, &doc)
  408. if err != nil {
  409. log.Printf("解析文档失败:%s", err)
  410. return
  411. }
  412. for _, v := range topInfos {
  413. portrait := PortraitData{
  414. Buyer: util.ObjToString(doc["name"]),
  415. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  416. Class: v,
  417. }
  418. query := elastic.NewBoolQuery().
  419. Must(
  420. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  421. elastic.NewTermQuery("tag_topinformation", v),
  422. )
  423. searchService2 := client.Search().
  424. Index("projectset").
  425. Query(query).
  426. Sort("lasttime", false).
  427. Size(1).
  428. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  429. searchResult, err := searchService2.Do(context.Background())
  430. if err != nil {
  431. log.Fatalf("Error getting response: %s", err)
  432. }
  433. if searchResult.Hits.TotalHits.Value > 0 {
  434. portrait.ProjectCount = searchResult.TotalHits()
  435. for _, hit := range searchResult.Hits.Hits {
  436. var doc2 map[string]interface{}
  437. err := json.Unmarshal(hit.Source, &doc2)
  438. if err != nil {
  439. log.Printf("解析文档失败:%s", err)
  440. continue
  441. }
  442. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  443. portrait.Area = util.ObjToString(doc2["area"])
  444. portrait.City = util.ObjToString(doc2["city"])
  445. }
  446. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  447. portrait.ProjectMoney = *agg.Value
  448. } else {
  449. log.Println("Aggregation not found")
  450. }
  451. //insert := structToMap(portrait)
  452. //insert[""]
  453. MgoB.Save("project_portrait_0930", structToMap(portrait))
  454. }
  455. }
  456. }
  457. // processProjectData 处理项目表增量数据
  458. func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  459. for _, v := range topInfos {
  460. existsWhere := map[string]interface{}{
  461. "buyer": tmp["buyer"],
  462. "class": v,
  463. }
  464. portrait := PortraitData{
  465. Buyer: util.ObjToString(tmp["buyer"]),
  466. BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
  467. Class: v,
  468. }
  469. query := elastic.NewBoolQuery().
  470. Must(
  471. elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
  472. elastic.NewTermQuery("tag_topinformation", v),
  473. )
  474. searchService2 := client.Search().
  475. Index("projectset").
  476. Query(query).
  477. Sort("lasttime", false).
  478. Size(1).
  479. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  480. searchResult, err := searchService2.Do(context.Background())
  481. if err != nil {
  482. log.Fatalf("Error getting response: %s", err)
  483. }
  484. if searchResult.Hits.TotalHits.Value > 0 {
  485. portrait.ProjectCount = searchResult.TotalHits()
  486. for _, hit := range searchResult.Hits.Hits {
  487. var doc2 map[string]interface{}
  488. err = json.Unmarshal(hit.Source, &doc2)
  489. if err != nil {
  490. log.Printf("解析文档失败:%s", err)
  491. continue
  492. }
  493. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  494. portrait.Area = util.ObjToString(doc2["area"])
  495. portrait.City = util.ObjToString(doc2["city"])
  496. }
  497. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  498. portrait.ProjectMoney = *agg.Value
  499. } else {
  500. log.Println("Aggregation not found")
  501. }
  502. portrait.UpdateTime = time.Now().Unix()
  503. //
  504. exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
  505. if exist != nil && len(*exist) > 0 {
  506. // 存在只更新部分内容
  507. update := map[string]interface{}{
  508. "lasttime": portrait.Lasttime,
  509. "project_money": portrait.ProjectMoney,
  510. "project_count": portrait.ProjectCount,
  511. "update_time": portrait.UpdateTime,
  512. }
  513. if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
  514. id := mongodb.BsonIdToSId((*exist)["_id"])
  515. MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
  516. }
  517. } else {
  518. //不存在直接保存
  519. MgoB.Save(portraitMgo, structToMap(portrait))
  520. }
  521. }
  522. }
  523. }
  524. // incDataEs 增量数据处理生索引
  525. func incDataEs() {
  526. now := time.Now()
  527. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  528. where := map[string]interface{}{
  529. "update_time": map[string]interface{}{
  530. "$gte": today.Unix(),
  531. },
  532. }
  533. defer util.Catch()
  534. sess := Mgo.GetMgoConn()
  535. defer Mgo.DestoryMongoConn(sess)
  536. count := 0
  537. it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
  538. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  539. if count%1000 == 0 {
  540. log.Println("current:", count)
  541. }
  542. id := mongodb.BsonIdToSId(tmp["_id"])
  543. tmp["id"] = id
  544. tmp["_id"] = id
  545. delete(tmp, "update_time")
  546. saveEsPool <- tmp
  547. }
  548. log.Println("数据处理完毕")
  549. }
  550. // allDataEs 处理存量数据到es
  551. func allDataEs() {
  552. MgoB := &mongodb.MongodbSim{
  553. MongodbAddr: "172.17.189.140:27080",
  554. //MongodbAddr: "127.0.0.1:27083",
  555. Size: 10,
  556. DbName: "qfw",
  557. UserName: "SJZY_RWbid_ES",
  558. Password: "SJZY@B4i4D5e6S",
  559. //Direct: true,
  560. }
  561. MgoB.InitPool()
  562. // 测试环境
  563. //MgoB := &mongodb.MongodbSim{
  564. // MongodbAddr: "192.168.3.206:27002",
  565. // Size: 10,
  566. // DbName: "qfw_data",
  567. // UserName: "root",
  568. // Password: "root",
  569. // //Direct: true,
  570. //}
  571. //MgoB.InitPool()
  572. defer util.Catch()
  573. sess := MgoB.GetMgoConn()
  574. defer MgoB.DestoryMongoConn(sess)
  575. count := 0
  576. it := sess.DB(MgoB.DbName).C("project_portrait_0930").Find(nil).Select(nil).Iter()
  577. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  578. if count%5000 == 0 {
  579. log.Println("current:", count, tmp["_id"])
  580. }
  581. id := mongodb.BsonIdToSId(tmp["_id"])
  582. tmp["id"] = id
  583. tmp["_id"] = id
  584. saveEsPool <- tmp
  585. }
  586. log.Println("数据处理完毕")
  587. }
  588. func SaveEsMethod() {
  589. arru := make([]map[string]interface{}, EsBulkSize)
  590. indexu := 0
  591. for {
  592. select {
  593. case v := <-saveEsPool:
  594. arru[indexu] = v
  595. indexu++
  596. if indexu == EsBulkSize {
  597. saveEsSp <- true
  598. go func(arru []map[string]interface{}) {
  599. defer func() {
  600. <-saveEsSp
  601. }()
  602. Es.BulkSave(portraitIndex, arru)
  603. }(arru)
  604. arru = make([]map[string]interface{}, EsBulkSize)
  605. indexu = 0
  606. }
  607. case <-time.After(1000 * time.Millisecond):
  608. if indexu > 0 {
  609. saveEsSp <- true
  610. go func(arru []map[string]interface{}) {
  611. defer func() {
  612. <-saveEsSp
  613. }()
  614. Es.BulkSave(portraitIndex, arru)
  615. }(arru[:indexu])
  616. arru = make([]map[string]interface{}, EsBulkSize)
  617. indexu = 0
  618. }
  619. }
  620. }
  621. }