main.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "github.com/robfig/cron/v3"
  8. "github.com/spf13/viper"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "log"
  14. "sync"
  15. "time"
  16. )
// Package-level state shared by the portrait jobs in this file.
var (
	// saveEsPool queues documents for SaveEsMethod, which batches them and
	// bulk-saves them to Elasticsearch.
	saveEsPool = make(chan map[string]interface{}, 5000)
	// saveEsSp is used by SaveEsMethod as a semaphore: one slot is taken for
	// each bulk save that is in flight.
	saveEsSp = make(chan bool, 5)
	// EsBulkSize is the number of documents SaveEsMethod collects before it
	// triggers a bulk save.
	EsBulkSize = 50
	// Es is the Elasticsearch helper created in Init.
	Es *es.Elastic
	// Mgo is the MongoDB pool for the bidding database (created in Init).
	Mgo *mongodb.MongodbSim
	// MgoP is the MongoDB pool for the project database (created in Init).
	MgoP *mongodb.MongodbSim
	// portraitIndex is the Elasticsearch index name for portraits.
	portraitIndex = ""
	// portraitMgo is the MongoDB collection name for portraits.
	portraitMgo = ""
	// GF holds the configuration decoded by InitConfig.
	GF GlobalConf
	// topInfos lists the first-level intelligence classification tags; one
	// portrait is computed per buyer and tag.
	topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁"}
)
// PortraitData is one buyer portrait record: the aggregated project
// statistics of a single buyer for a single classification tag.
type PortraitData struct {
	// Buyer is the buyer name the portrait belongs to.
	Buyer string `json:"buyer"`
	// Area is copied from the most recent matching project.
	Area string `json:"area"`
	// City is copied from the most recent matching project.
	City string `json:"city"`
	// Class is the classification tag (one of topInfos) the portrait covers.
	Class string `json:"class"`
	// BusinessType is derived from the buyer's "buyerclass" value via getStr.
	BusinessType string `json:"business_type"`
	// Lasttime is the "lasttime" value of the most recent matching project.
	Lasttime int64 `json:"lasttime"`
	// ProjectCount is the total number of matching projects.
	ProjectCount int64 `json:"project_count"`
	// ProjectMoney is the sum of "sortprice" over the matching projects.
	ProjectMoney float64 `json:"project_money"`
	// UpdateTime is the Unix time (seconds) at which the record was computed
	// by the incremental job.
	UpdateTime int64 `json:"update_time"`
}
  41. func InitConfig() (err error) {
  42. viper.SetConfigFile("config.toml") // 指定配置文件路径
  43. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  44. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  45. viper.AddConfigPath("./")
  46. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  47. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  48. err = viper.ReadInConfig() // 查找并读取配置文件
  49. if err != nil { // 处理读取配置文件的错误
  50. return
  51. }
  52. err = viper.Unmarshal(&GF)
  53. return err
  54. }
  55. func Init() {
  56. InitConfig()
  57. // 正式环境
  58. Es = &es.Elastic{
  59. //S_esurl: "http://127.0.0.1:19908",
  60. S_esurl: GF.Es.URL,
  61. I_size: 5,
  62. Username: GF.Es.Username,
  63. Password: GF.Es.Password,
  64. }
  65. Es.InitElasticSize()
  66. //测试环境
  67. //Es = &es.Elastic{
  68. // S_esurl: "http://192.168.3.149:9201",
  69. // I_size: 5,
  70. // Username: "",
  71. // Password: "",
  72. //}
  73. //Es.InitElasticSize()
  74. // bidding 地址
  75. Mgo = &mongodb.MongodbSim{
  76. MongodbAddr: GF.Mongo.Host,
  77. //MongodbAddr: "127.0.0.1:27083",
  78. Size: 10,
  79. DbName: GF.Mongo.DB,
  80. UserName: GF.Mongo.Username,
  81. Password: GF.Mongo.Password,
  82. Direct: GF.Mongo.Direct,
  83. }
  84. Mgo.InitPool()
  85. // 抽取库项目表
  86. MgoP = &mongodb.MongodbSim{
  87. MongodbAddr: GF.Mongop.Host,
  88. Size: 10,
  89. DbName: GF.Mongop.DB,
  90. UserName: GF.Mongop.Username,
  91. Password: GF.Mongop.Password,
  92. Direct: GF.Mongop.Direct,
  93. }
  94. MgoP.InitPool()
  95. portraitIndex = GF.Env.PortraitIndex
  96. portraitMgo = GF.Env.PortraitMgo
  97. if portraitIndex == "" || portraitMgo == "" {
  98. log.Fatalln("画像索引或者MongoDB数据表为空")
  99. }
  100. }
  101. func main() {
  102. Init()
  103. go SaveEsMethod() // 生索引
  104. //定时任务
  105. local, _ := time.LoadLocation("Asia/Shanghai")
  106. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  107. _, err := c.AddFunc(GF.Env.Spec, dealIncData)
  108. if err != nil {
  109. log.Println("AddFunc err", err)
  110. }
  111. c.Start()
  112. defer c.Stop()
  113. select {}
  114. }
  115. // dealIncData 处理增量数据
  116. func dealIncData() {
  117. now := time.Now()
  118. yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
  119. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  120. where := map[string]interface{}{
  121. "pici": map[string]interface{}{
  122. "$gt": yesterday.Unix(),
  123. "$lte": today.Unix(),
  124. },
  125. }
  126. url := GF.Es.URL
  127. username := GF.Es.Username
  128. password := GF.Es.Password
  129. // 创建 Elasticsearch 客户端
  130. client, err := elastic.NewClient(
  131. elastic.SetURL(url),
  132. elastic.SetBasicAuth(username, password),
  133. elastic.SetSniff(false),
  134. )
  135. if err != nil {
  136. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  137. }
  138. defer util.Catch()
  139. sess := MgoP.GetMgoConn()
  140. defer MgoP.DestoryMongoConn(sess)
  141. var wg sync.WaitGroup
  142. ch := make(chan map[string]interface{}, 10000)
  143. // 并行处理结果
  144. for i := 0; i < 2; i++ {
  145. wg.Add(1)
  146. go func() {
  147. defer wg.Done()
  148. for hit := range ch {
  149. processProjectData(hit, client, Mgo)
  150. }
  151. }()
  152. }
  153. count := 0
  154. it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
  155. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  156. if count%5000 == 0 {
  157. log.Println("current:", count, tmp["buyer"])
  158. }
  159. // 没有采购单位
  160. if util.ObjToString(tmp["buyer"]) == "" {
  161. continue
  162. }
  163. ch <- tmp
  164. }
  165. close(ch) // 关闭通道
  166. wg.Wait()
  167. //处理完毕生索引
  168. incDataEs()
  169. log.Println("增量数据处理完毕")
  170. }
  171. // dealAllData 处理存量数据,
  172. func dealAllData() {
  173. /**
  174. 循环采购单位存量数据,
  175. */
  176. url := "http://172.17.4.184:19908"
  177. //url := "http://127.0.0.1:19908"
  178. username := "jybid"
  179. password := "Top2023_JEB01i@31"
  180. index := "buyer" //索引名称
  181. //index := "projectset" //索引名称
  182. // 创建 Elasticsearch 客户端
  183. client, err := elastic.NewClient(
  184. elastic.SetURL(url),
  185. elastic.SetBasicAuth(username, password),
  186. elastic.SetSniff(false),
  187. )
  188. if err != nil {
  189. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  190. }
  191. MgoB := &mongodb.MongodbSim{
  192. MongodbAddr: "172.17.189.140:27080",
  193. //MongodbAddr: "127.0.0.1:27083",
  194. Size: 10,
  195. DbName: "qfw",
  196. UserName: "SJZY_RWbid_ES",
  197. Password: "SJZY@B4i4D5e6S",
  198. //Direct: true,
  199. }
  200. MgoB.InitPool()
  201. ctx := context.Background()
  202. //开始滚动搜索
  203. scrollID := ""
  204. scroll := "10m"
  205. searchSource := elastic.NewSearchSource().
  206. //Query(query).
  207. Size(10000).
  208. Sort("_doc", true) //升序排序
  209. //Sort("_doc", false) //降序排序
  210. searchService := client.Scroll(index).
  211. Size(10000).
  212. Scroll(scroll).
  213. SearchSource(searchSource)
  214. res, err := searchService.Do(ctx)
  215. if err != nil {
  216. if err == io.EOF {
  217. fmt.Println("没有数据")
  218. } else {
  219. panic(err)
  220. }
  221. }
  222. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  223. fmt.Println("总数是:", res.TotalHits())
  224. total := 0
  225. for len(res.Hits.Hits) > 0 {
  226. for k, hit := range res.Hits.Hits {
  227. if k%1000 == 0 {
  228. log.Println("当前:", k)
  229. }
  230. var doc map[string]interface{}
  231. err := json.Unmarshal(hit.Source, &doc)
  232. if err != nil {
  233. log.Printf("解析文档失败:%s", err)
  234. continue
  235. }
  236. //处理查询结果
  237. portrait := PortraitData{
  238. Buyer: util.ObjToString(doc["name"]),
  239. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  240. Class: "情报_物业",
  241. }
  242. // 构建查询
  243. query := elastic.NewBoolQuery().
  244. Must(
  245. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  246. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  247. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  248. )
  249. // 创建搜索服务
  250. searchService2 := client.Search().
  251. Index("projectset"). // 替换为你的索引名称
  252. Query(query).
  253. Sort("lasttime", false). // false表示降序
  254. Size(1).
  255. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  256. // 执行查询
  257. searchResult, err := searchService2.Do(context.Background())
  258. if err != nil {
  259. log.Fatalf("Error getting response: %s", err)
  260. }
  261. // 处理结果
  262. if searchResult.Hits.TotalHits.Value > 0 {
  263. portrait.ProjectCount = searchResult.TotalHits()
  264. for _, hit := range searchResult.Hits.Hits {
  265. var doc2 map[string]interface{}
  266. err := json.Unmarshal(hit.Source, &doc2)
  267. if err != nil {
  268. log.Printf("解析文档失败:%s", err)
  269. continue
  270. }
  271. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  272. portrait.Area = util.ObjToString(doc2["area"])
  273. portrait.City = util.ObjToString(doc2["city"])
  274. }
  275. // 处理聚合结果
  276. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  277. portrait.ProjectMoney = *agg.Value
  278. } else {
  279. log.Println("Aggregation not found")
  280. }
  281. //写入MongoDB
  282. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  283. } else {
  284. continue
  285. }
  286. }
  287. total = total + len(res.Hits.Hits)
  288. scrollID = res.ScrollId
  289. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  290. log.Println("current count:", total)
  291. if err != nil {
  292. if err == io.EOF {
  293. // 滚动到最后一批数据,退出循环
  294. break
  295. }
  296. log.Println("滚动搜索失败:", err, res)
  297. break // 处理错误时退出循环
  298. }
  299. }
  300. // 在循环外调用 ClearScroll
  301. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  302. if err != nil {
  303. log.Printf("清理滚动搜索失败:%s", err)
  304. }
  305. fmt.Println("结束~~~~~~~~~~~~~~~")
  306. }
  307. // dealAllDataB 处理存量数据,协程处理全部类型
  308. func dealAllDataB() {
  309. url := "http://172.17.4.184:19908"
  310. username := "jybid"
  311. password := "Top2023_JEB01i@31"
  312. index := "buyer"
  313. // 创建 Elasticsearch 客户端
  314. client, err := elastic.NewClient(
  315. elastic.SetURL(url),
  316. elastic.SetBasicAuth(username, password),
  317. elastic.SetSniff(false),
  318. )
  319. if err != nil {
  320. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  321. }
  322. MgoB := &mongodb.MongodbSim{
  323. MongodbAddr: "172.17.189.140:27080",
  324. Size: 10,
  325. DbName: "qfw",
  326. UserName: "SJZY_RWbid_ES",
  327. Password: "SJZY@B4i4D5e6S",
  328. }
  329. MgoB.InitPool()
  330. ctx := context.Background()
  331. scroll := "10m"
  332. searchSource := elastic.NewSearchSource().
  333. Size(10000).
  334. Sort("_doc", true)
  335. searchService := client.Scroll(index).
  336. Size(10000).
  337. Scroll(scroll).
  338. SearchSource(searchSource)
  339. res, err := searchService.Do(ctx)
  340. if err != nil {
  341. if err == io.EOF {
  342. fmt.Println("没有数据")
  343. return
  344. } else {
  345. panic(err)
  346. }
  347. }
  348. fmt.Println("总数是:", res.TotalHits())
  349. var wg sync.WaitGroup
  350. ch := make(chan *elastic.SearchHit, 10000)
  351. // 并行处理结果
  352. for i := 0; i < 10; i++ {
  353. wg.Add(1)
  354. go func() {
  355. defer wg.Done()
  356. for hit := range ch {
  357. processHit(hit, client, MgoB)
  358. }
  359. }()
  360. }
  361. num := 0
  362. for len(res.Hits.Hits) > 0 {
  363. for _, hit := range res.Hits.Hits {
  364. num++
  365. ch <- hit
  366. if num%1000 == 0 {
  367. log.Println("current num:", num)
  368. }
  369. }
  370. scrollID := res.ScrollId
  371. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  372. if err != nil {
  373. if err == io.EOF {
  374. close(ch)
  375. break
  376. }
  377. log.Println("滚动搜索失败:", err, res)
  378. close(ch)
  379. break
  380. }
  381. }
  382. wg.Wait()
  383. // 在循环外调用 ClearScroll
  384. _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
  385. if err != nil {
  386. log.Printf("清理滚动搜索失败:%s", err)
  387. }
  388. fmt.Println("结束~~~~~~~~~~~~~~~")
  389. }
  390. func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  391. var doc map[string]interface{}
  392. err := json.Unmarshal(hit.Source, &doc)
  393. if err != nil {
  394. log.Printf("解析文档失败:%s", err)
  395. return
  396. }
  397. for _, v := range topInfos {
  398. portrait := PortraitData{
  399. Buyer: util.ObjToString(doc["name"]),
  400. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  401. Class: v,
  402. }
  403. query := elastic.NewBoolQuery().
  404. Must(
  405. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  406. elastic.NewTermQuery("tag_topinformation", v),
  407. )
  408. searchService2 := client.Search().
  409. Index("projectset").
  410. Query(query).
  411. Sort("lasttime", false).
  412. Size(1).
  413. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  414. searchResult, err := searchService2.Do(context.Background())
  415. if err != nil {
  416. log.Fatalf("Error getting response: %s", err)
  417. }
  418. if searchResult.Hits.TotalHits.Value > 0 {
  419. portrait.ProjectCount = searchResult.TotalHits()
  420. for _, hit := range searchResult.Hits.Hits {
  421. var doc2 map[string]interface{}
  422. err := json.Unmarshal(hit.Source, &doc2)
  423. if err != nil {
  424. log.Printf("解析文档失败:%s", err)
  425. continue
  426. }
  427. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  428. portrait.Area = util.ObjToString(doc2["area"])
  429. portrait.City = util.ObjToString(doc2["city"])
  430. }
  431. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  432. portrait.ProjectMoney = *agg.Value
  433. } else {
  434. log.Println("Aggregation not found")
  435. }
  436. MgoB.Save(portraitMgo, structToMap(portrait))
  437. }
  438. }
  439. }
  440. // processProjectData 处理项目表增量数据
  441. func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  442. for _, v := range topInfos {
  443. existsWhere := map[string]interface{}{
  444. "buyer": tmp["buyer"],
  445. "class": v,
  446. }
  447. portrait := PortraitData{
  448. Buyer: util.ObjToString(tmp["buyer"]),
  449. BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
  450. Class: v,
  451. }
  452. query := elastic.NewBoolQuery().
  453. Must(
  454. elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
  455. elastic.NewTermQuery("tag_topinformation", v),
  456. )
  457. searchService2 := client.Search().
  458. Index("projectset").
  459. Query(query).
  460. Sort("lasttime", false).
  461. Size(1).
  462. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  463. searchResult, err := searchService2.Do(context.Background())
  464. if err != nil {
  465. log.Fatalf("Error getting response: %s", err)
  466. }
  467. if searchResult.Hits.TotalHits.Value > 0 {
  468. portrait.ProjectCount = searchResult.TotalHits()
  469. for _, hit := range searchResult.Hits.Hits {
  470. var doc2 map[string]interface{}
  471. err = json.Unmarshal(hit.Source, &doc2)
  472. if err != nil {
  473. log.Printf("解析文档失败:%s", err)
  474. continue
  475. }
  476. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  477. portrait.Area = util.ObjToString(doc2["area"])
  478. portrait.City = util.ObjToString(doc2["city"])
  479. }
  480. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  481. portrait.ProjectMoney = *agg.Value
  482. } else {
  483. log.Println("Aggregation not found")
  484. }
  485. portrait.UpdateTime = time.Now().Unix()
  486. //
  487. exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
  488. if exist != nil && len(*exist) > 0 {
  489. // 存在只更新部分内容
  490. update := map[string]interface{}{
  491. "lasttime": portrait.Lasttime,
  492. "project_money": portrait.ProjectMoney,
  493. "project_count": portrait.ProjectCount,
  494. "update_time": portrait.UpdateTime,
  495. }
  496. if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
  497. id := mongodb.BsonIdToSId((*exist)["_id"])
  498. MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
  499. }
  500. } else {
  501. //不存在直接保存
  502. MgoB.Save(portraitMgo, structToMap(portrait))
  503. }
  504. }
  505. }
  506. }
  507. // incDataEs 增量数据处理生索引
  508. func incDataEs() {
  509. now := time.Now()
  510. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  511. where := map[string]interface{}{
  512. "update_time": map[string]interface{}{
  513. "$gte": today.Unix(),
  514. },
  515. }
  516. defer util.Catch()
  517. sess := Mgo.GetMgoConn()
  518. defer Mgo.DestoryMongoConn(sess)
  519. count := 0
  520. it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
  521. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  522. if count%1000 == 0 {
  523. log.Println("current:", count)
  524. }
  525. id := mongodb.BsonIdToSId(tmp["_id"])
  526. tmp["id"] = id
  527. tmp["_id"] = id
  528. delete(tmp, "update_time")
  529. saveEsPool <- tmp
  530. }
  531. log.Println("数据处理完毕")
  532. }
  533. // allDataEs 处理存量数据到es
  534. func allDataEs() {
  535. MgoB := &mongodb.MongodbSim{
  536. MongodbAddr: "172.17.189.140:27080",
  537. //MongodbAddr: "127.0.0.1:27083",
  538. Size: 10,
  539. DbName: "qfw",
  540. UserName: "SJZY_RWbid_ES",
  541. Password: "SJZY@B4i4D5e6S",
  542. //Direct: true,
  543. }
  544. MgoB.InitPool()
  545. // 测试环境
  546. //MgoB := &mongodb.MongodbSim{
  547. // MongodbAddr: "192.168.3.206:27002",
  548. // Size: 10,
  549. // DbName: "qfw_data",
  550. // UserName: "root",
  551. // Password: "root",
  552. // //Direct: true,
  553. //}
  554. //MgoB.InitPool()
  555. defer util.Catch()
  556. sess := MgoB.GetMgoConn()
  557. defer MgoB.DestoryMongoConn(sess)
  558. count := 0
  559. it := sess.DB(MgoB.DbName).C(portraitMgo).Find(nil).Select(nil).Iter()
  560. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  561. if count%5000 == 0 {
  562. log.Println("current:", count, tmp["_id"])
  563. }
  564. id := mongodb.BsonIdToSId(tmp["_id"])
  565. tmp["id"] = id
  566. tmp["_id"] = id
  567. saveEsPool <- tmp
  568. }
  569. log.Println("数据处理完毕")
  570. }
  571. func SaveEsMethod() {
  572. arru := make([]map[string]interface{}, EsBulkSize)
  573. indexu := 0
  574. for {
  575. select {
  576. case v := <-saveEsPool:
  577. arru[indexu] = v
  578. indexu++
  579. if indexu == EsBulkSize {
  580. saveEsSp <- true
  581. go func(arru []map[string]interface{}) {
  582. defer func() {
  583. <-saveEsSp
  584. }()
  585. Es.BulkSave(portraitIndex, arru)
  586. }(arru)
  587. arru = make([]map[string]interface{}, EsBulkSize)
  588. indexu = 0
  589. }
  590. case <-time.After(1000 * time.Millisecond):
  591. if indexu > 0 {
  592. saveEsSp <- true
  593. go func(arru []map[string]interface{}) {
  594. defer func() {
  595. <-saveEsSp
  596. }()
  597. Es.BulkSave(portraitIndex, arru)
  598. }(arru[:indexu])
  599. arru = make([]map[string]interface{}, EsBulkSize)
  600. indexu = 0
  601. }
  602. }
  603. }
  604. }