main.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "github.com/spf13/viper"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  18. saveEsSp = make(chan bool, 5)
  19. EsBulkSize = 50
  20. Es *es.Elastic
  21. Mgo *mongodb.MongodbSim //bidding 地址
  22. MgoP *mongodb.MongodbSim // 项目地址
  23. portraitIndex = "project_portrait_v1" // 画像索引
  24. portraitMgo = "project_portrait" // MongoDB 的表名
  25. GF GlobalConf
  26. // 情报分类一级标签
  27. topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁"}
  28. )
  // PortraitData is one portrait record: the aggregated project statistics of a
  // single buyer within a single intelligence category. It is what the jobs in
  // this file save to MongoDB (after conversion with structToMap).
  type PortraitData struct {
  	// Buyer is the buyer (purchasing unit) name.
  	Buyer string `json:"buyer"`
  	// Area is copied from the "area" field of the buyer's most recent project.
  	Area string `json:"area"`
  	// City is copied from the "city" field of the buyer's most recent project.
  	City string `json:"city"`
  	// Class is the category tag matched against "tag_topinformation".
  	Class string `json:"class"`
  	// BusinessType is derived from the buyer's "buyerclass" value.
  	BusinessType string `json:"business_type"`
  	// Lasttime is the "lasttime" value of the buyer's most recent project.
  	Lasttime int64 `json:"lasttime"`
  	// ProjectCount is the number of matching projects.
  	ProjectCount int64 `json:"project_count"`
  	// ProjectMoney is the sum of "sortprice" over the matching projects.
  	ProjectMoney float64 `json:"project_money"`
  	// UpdateTime is the Unix time (seconds) at which the record was computed.
  	UpdateTime int64 `json:"update_time"`
  }
  40. func InitConfig() (err error) {
  41. viper.SetConfigFile("config.toml") // 指定配置文件路径
  42. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  43. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  44. viper.AddConfigPath("./")
  45. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  46. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  47. err = viper.ReadInConfig() // 查找并读取配置文件
  48. if err != nil { // 处理读取配置文件的错误
  49. return
  50. }
  51. err = viper.Unmarshal(&GF)
  52. return err
  53. }
  54. func Init() {
  55. InitConfig()
  56. // 正式环境
  57. Es = &es.Elastic{
  58. //S_esurl: "http://127.0.0.1:19908",
  59. S_esurl: GF.Es.URL,
  60. I_size: 5,
  61. Username: GF.Es.Username,
  62. Password: GF.Es.Password,
  63. }
  64. Es.InitElasticSize()
  65. //测试环境
  66. //Es = &es.Elastic{
  67. // S_esurl: "http://192.168.3.149:9201",
  68. // I_size: 5,
  69. // Username: "",
  70. // Password: "",
  71. //}
  72. //Es.InitElasticSize()
  73. // bidding 地址
  74. Mgo = &mongodb.MongodbSim{
  75. MongodbAddr: GF.Mongo.Host,
  76. //MongodbAddr: "127.0.0.1:27083",
  77. Size: 10,
  78. DbName: GF.Mongo.DB,
  79. UserName: GF.Mongo.Username,
  80. Password: GF.Mongo.Password,
  81. Direct: GF.Mongo.Direct,
  82. }
  83. Mgo.InitPool()
  84. // 抽取库项目表
  85. MgoP = &mongodb.MongodbSim{
  86. MongodbAddr: GF.Mongop.Host,
  87. Size: 10,
  88. DbName: GF.Mongop.DB,
  89. UserName: GF.Mongop.Username,
  90. Password: GF.Mongop.Password,
  91. Direct: GF.Mongop.Direct,
  92. }
  93. MgoP.InitPool()
  94. portraitIndex = GF.Env.PortraitIndex
  95. portraitMgo = GF.Env.PortraitMgo
  96. if portraitIndex == "" || portraitMgo == "" {
  97. log.Fatalln("画像索引或者MongoDB数据表为空")
  98. }
  99. }
  100. func main() {
  101. //Init()
  102. Es = &es.Elastic{
  103. S_esurl: "http://192.168.3.149:9201",
  104. I_size: 5,
  105. Username: "",
  106. Password: "",
  107. }
  108. Es.InitElasticSize()
  109. go SaveEsMethod() // 生索引
  110. //dealAllDataB()
  111. allDataEs()
  112. ////定时任务
  113. //local, _ := time.LoadLocation("Asia/Shanghai")
  114. //c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  115. //_, err := c.AddFunc(GF.Env.Spec, dealIncData)
  116. //if err != nil {
  117. // log.Println("AddFunc err", err)
  118. //}
  119. //
  120. //c.Start()
  121. //defer c.Stop()
  122. select {}
  123. }
  124. // dealIncData 处理增量数据
  125. func dealIncData() {
  126. now := time.Now()
  127. yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
  128. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  129. where := map[string]interface{}{
  130. "pici": map[string]interface{}{
  131. "$gt": yesterday.Unix(),
  132. "$lte": today.Unix(),
  133. },
  134. }
  135. url := GF.Es.URL
  136. username := GF.Es.Username
  137. password := GF.Es.Password
  138. // 创建 Elasticsearch 客户端
  139. client, err := elastic.NewClient(
  140. elastic.SetURL(url),
  141. elastic.SetBasicAuth(username, password),
  142. elastic.SetSniff(false),
  143. )
  144. if err != nil {
  145. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  146. }
  147. defer util.Catch()
  148. sess := MgoP.GetMgoConn()
  149. defer MgoP.DestoryMongoConn(sess)
  150. var wg sync.WaitGroup
  151. ch := make(chan map[string]interface{}, 10000)
  152. // 并行处理结果
  153. for i := 0; i < 2; i++ {
  154. wg.Add(1)
  155. go func() {
  156. defer wg.Done()
  157. for hit := range ch {
  158. processProjectData(hit, client, Mgo)
  159. }
  160. }()
  161. }
  162. count := 0
  163. it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
  164. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  165. if count%5000 == 0 {
  166. log.Println("current:", count, tmp["buyer"])
  167. }
  168. // 没有采购单位
  169. if util.ObjToString(tmp["buyer"]) == "" {
  170. continue
  171. }
  172. ch <- tmp
  173. }
  174. close(ch) // 关闭通道
  175. wg.Wait()
  176. //处理完毕生索引
  177. incDataEs()
  178. log.Println("增量数据处理完毕")
  179. }
  180. // dealAllData 处理存量数据,
  181. func dealAllData() {
  182. /**
  183. 循环采购单位存量数据,
  184. */
  185. url := "http://172.17.4.184:19908"
  186. //url := "http://127.0.0.1:19908"
  187. username := "jybid"
  188. password := "Top2023_JEB01i@31"
  189. index := "buyer" //索引名称
  190. //index := "projectset" //索引名称
  191. // 创建 Elasticsearch 客户端
  192. client, err := elastic.NewClient(
  193. elastic.SetURL(url),
  194. elastic.SetBasicAuth(username, password),
  195. elastic.SetSniff(false),
  196. )
  197. if err != nil {
  198. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  199. }
  200. MgoB := &mongodb.MongodbSim{
  201. MongodbAddr: "172.17.189.140:27080",
  202. //MongodbAddr: "127.0.0.1:27083",
  203. Size: 10,
  204. DbName: "qfw",
  205. UserName: "SJZY_RWbid_ES",
  206. Password: "SJZY@B4i4D5e6S",
  207. //Direct: true,
  208. }
  209. MgoB.InitPool()
  210. ctx := context.Background()
  211. //开始滚动搜索
  212. scrollID := ""
  213. scroll := "10m"
  214. searchSource := elastic.NewSearchSource().
  215. //Query(query).
  216. Size(10000).
  217. Sort("_doc", true) //升序排序
  218. //Sort("_doc", false) //降序排序
  219. searchService := client.Scroll(index).
  220. Size(10000).
  221. Scroll(scroll).
  222. SearchSource(searchSource)
  223. res, err := searchService.Do(ctx)
  224. if err != nil {
  225. if err == io.EOF {
  226. fmt.Println("没有数据")
  227. } else {
  228. panic(err)
  229. }
  230. }
  231. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  232. fmt.Println("总数是:", res.TotalHits())
  233. total := 0
  234. for len(res.Hits.Hits) > 0 {
  235. for k, hit := range res.Hits.Hits {
  236. if k%1000 == 0 {
  237. log.Println("当前:", k)
  238. }
  239. var doc map[string]interface{}
  240. err := json.Unmarshal(hit.Source, &doc)
  241. if err != nil {
  242. log.Printf("解析文档失败:%s", err)
  243. continue
  244. }
  245. //处理查询结果
  246. portrait := PortraitData{
  247. Buyer: util.ObjToString(doc["name"]),
  248. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  249. Class: "情报_物业",
  250. }
  251. // 构建查询
  252. query := elastic.NewBoolQuery().
  253. Must(
  254. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  255. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  256. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  257. )
  258. // 创建搜索服务
  259. searchService2 := client.Search().
  260. Index("projectset"). // 替换为你的索引名称
  261. Query(query).
  262. Sort("lasttime", false). // false表示降序
  263. Size(1).
  264. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  265. // 执行查询
  266. searchResult, err := searchService2.Do(context.Background())
  267. if err != nil {
  268. log.Fatalf("Error getting response: %s", err)
  269. }
  270. // 处理结果
  271. if searchResult.Hits.TotalHits.Value > 0 {
  272. portrait.ProjectCount = searchResult.TotalHits()
  273. for _, hit := range searchResult.Hits.Hits {
  274. var doc2 map[string]interface{}
  275. err := json.Unmarshal(hit.Source, &doc2)
  276. if err != nil {
  277. log.Printf("解析文档失败:%s", err)
  278. continue
  279. }
  280. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  281. portrait.Area = util.ObjToString(doc2["area"])
  282. portrait.City = util.ObjToString(doc2["city"])
  283. }
  284. // 处理聚合结果
  285. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  286. portrait.ProjectMoney = *agg.Value
  287. } else {
  288. log.Println("Aggregation not found")
  289. }
  290. //写入MongoDB
  291. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  292. } else {
  293. continue
  294. }
  295. }
  296. total = total + len(res.Hits.Hits)
  297. scrollID = res.ScrollId
  298. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  299. log.Println("current count:", total)
  300. if err != nil {
  301. if err == io.EOF {
  302. // 滚动到最后一批数据,退出循环
  303. break
  304. }
  305. log.Println("滚动搜索失败:", err, res)
  306. break // 处理错误时退出循环
  307. }
  308. }
  309. // 在循环外调用 ClearScroll
  310. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  311. if err != nil {
  312. log.Printf("清理滚动搜索失败:%s", err)
  313. }
  314. fmt.Println("结束~~~~~~~~~~~~~~~")
  315. }
  316. // dealAllDataB 处理存量数据,协程处理全部类型
  317. func dealAllDataB() {
  318. //url := "http://172.17.4.184:19908"
  319. //username := "jybid"
  320. //password := "Top2023_JEB01i@31"
  321. //index := "buyer"
  322. url := "http://192.168.3.149:9201"
  323. username := ""
  324. password := ""
  325. index := "buyer"
  326. // 创建 Elasticsearch 客户端
  327. client, err := elastic.NewClient(
  328. elastic.SetURL(url),
  329. elastic.SetBasicAuth(username, password),
  330. elastic.SetSniff(false),
  331. )
  332. if err != nil {
  333. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  334. }
  335. //MgoB := &mongodb.MongodbSim{
  336. // MongodbAddr: "172.17.189.140:27080",
  337. // Size: 10,
  338. // DbName: "qfw",
  339. // UserName: "SJZY_RWbid_ES",
  340. // Password: "SJZY@B4i4D5e6S",
  341. //}
  342. //MgoB.InitPool()
  343. MgoB := &mongodb.MongodbSim{
  344. MongodbAddr: "192.168.3.206:27002",
  345. Size: 10,
  346. DbName: "qfw_data",
  347. UserName: "root",
  348. Password: "root",
  349. }
  350. MgoB.InitPool()
  351. ctx := context.Background()
  352. scroll := "10m"
  353. searchSource := elastic.NewSearchSource().
  354. Size(10000).
  355. Sort("_doc", true)
  356. searchService := client.Scroll(index).
  357. Size(10000).
  358. Scroll(scroll).
  359. SearchSource(searchSource)
  360. res, err := searchService.Do(ctx)
  361. if err != nil {
  362. if err == io.EOF {
  363. fmt.Println("没有数据")
  364. return
  365. } else {
  366. panic(err)
  367. }
  368. }
  369. fmt.Println("总数是:", res.TotalHits())
  370. var wg sync.WaitGroup
  371. ch := make(chan *elastic.SearchHit, 10000)
  372. // 并行处理结果
  373. for i := 0; i < 10; i++ {
  374. wg.Add(1)
  375. go func() {
  376. defer wg.Done()
  377. for hit := range ch {
  378. processHit(hit, client, MgoB)
  379. }
  380. }()
  381. }
  382. num := 0
  383. for len(res.Hits.Hits) > 0 {
  384. for _, hit := range res.Hits.Hits {
  385. num++
  386. ch <- hit
  387. if num%1000 == 0 {
  388. log.Println("current num:", num)
  389. }
  390. }
  391. scrollID := res.ScrollId
  392. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  393. if err != nil {
  394. if err == io.EOF {
  395. close(ch)
  396. break
  397. }
  398. log.Println("滚动搜索失败:", err, res)
  399. close(ch)
  400. break
  401. }
  402. }
  403. wg.Wait()
  404. // 在循环外调用 ClearScroll
  405. _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
  406. if err != nil {
  407. log.Printf("清理滚动搜索失败:%s", err)
  408. }
  409. fmt.Println("结束~~~~~~~~~~~~~~~")
  410. }
  411. func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  412. var doc map[string]interface{}
  413. err := json.Unmarshal(hit.Source, &doc)
  414. if err != nil {
  415. log.Printf("解析文档失败:%s", err)
  416. return
  417. }
  418. for _, v := range topInfos {
  419. portrait := PortraitData{
  420. Buyer: util.ObjToString(doc["name"]),
  421. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  422. Class: v,
  423. }
  424. query := elastic.NewBoolQuery().
  425. Must(
  426. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  427. elastic.NewTermQuery("tag_topinformation", v),
  428. )
  429. searchService2 := client.Search().
  430. Index("projectset").
  431. Query(query).
  432. Sort("lasttime", false).
  433. Size(1).
  434. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  435. searchResult, err := searchService2.Do(context.Background())
  436. if err != nil {
  437. log.Fatalf("Error getting response: %s", err)
  438. }
  439. if searchResult.Hits.TotalHits.Value > 0 {
  440. portrait.ProjectCount = searchResult.TotalHits()
  441. for _, hit := range searchResult.Hits.Hits {
  442. var doc2 map[string]interface{}
  443. err := json.Unmarshal(hit.Source, &doc2)
  444. if err != nil {
  445. log.Printf("解析文档失败:%s", err)
  446. continue
  447. }
  448. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  449. portrait.Area = util.ObjToString(doc2["area"])
  450. portrait.City = util.ObjToString(doc2["city"])
  451. }
  452. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  453. portrait.ProjectMoney = *agg.Value
  454. } else {
  455. log.Println("Aggregation not found")
  456. }
  457. MgoB.Save("project_portrait", structToMap(portrait))
  458. }
  459. }
  460. }
  461. // processProjectData 处理项目表增量数据
  462. func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  463. for _, v := range topInfos {
  464. existsWhere := map[string]interface{}{
  465. "buyer": tmp["buyer"],
  466. "class": v,
  467. }
  468. portrait := PortraitData{
  469. Buyer: util.ObjToString(tmp["buyer"]),
  470. BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
  471. Class: v,
  472. }
  473. query := elastic.NewBoolQuery().
  474. Must(
  475. elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
  476. elastic.NewTermQuery("tag_topinformation", v),
  477. )
  478. searchService2 := client.Search().
  479. Index("projectset").
  480. Query(query).
  481. Sort("lasttime", false).
  482. Size(1).
  483. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  484. searchResult, err := searchService2.Do(context.Background())
  485. if err != nil {
  486. log.Fatalf("Error getting response: %s", err)
  487. }
  488. if searchResult.Hits.TotalHits.Value > 0 {
  489. portrait.ProjectCount = searchResult.TotalHits()
  490. for _, hit := range searchResult.Hits.Hits {
  491. var doc2 map[string]interface{}
  492. err = json.Unmarshal(hit.Source, &doc2)
  493. if err != nil {
  494. log.Printf("解析文档失败:%s", err)
  495. continue
  496. }
  497. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  498. portrait.Area = util.ObjToString(doc2["area"])
  499. portrait.City = util.ObjToString(doc2["city"])
  500. }
  501. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  502. portrait.ProjectMoney = *agg.Value
  503. } else {
  504. log.Println("Aggregation not found")
  505. }
  506. portrait.UpdateTime = time.Now().Unix()
  507. //
  508. exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
  509. if exist != nil && len(*exist) > 0 {
  510. // 存在只更新部分内容
  511. update := map[string]interface{}{
  512. "lasttime": portrait.Lasttime,
  513. "project_money": portrait.ProjectMoney,
  514. "project_count": portrait.ProjectCount,
  515. "update_time": portrait.UpdateTime,
  516. }
  517. if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
  518. id := mongodb.BsonIdToSId((*exist)["_id"])
  519. MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
  520. }
  521. } else {
  522. //不存在直接保存
  523. MgoB.Save(portraitMgo, structToMap(portrait))
  524. }
  525. }
  526. }
  527. }
  528. // incDataEs 增量数据处理生索引
  529. func incDataEs() {
  530. now := time.Now()
  531. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  532. where := map[string]interface{}{
  533. "update_time": map[string]interface{}{
  534. "$gte": today.Unix(),
  535. },
  536. }
  537. defer util.Catch()
  538. sess := Mgo.GetMgoConn()
  539. defer Mgo.DestoryMongoConn(sess)
  540. count := 0
  541. it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
  542. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  543. if count%1000 == 0 {
  544. log.Println("current:", count)
  545. }
  546. id := mongodb.BsonIdToSId(tmp["_id"])
  547. tmp["id"] = id
  548. tmp["_id"] = id
  549. delete(tmp, "update_time")
  550. saveEsPool <- tmp
  551. }
  552. log.Println("数据处理完毕")
  553. }
  554. // allDataEs 处理存量数据到es
  555. func allDataEs() {
  556. //MgoB := &mongodb.MongodbSim{
  557. // MongodbAddr: "172.17.189.140:27080",
  558. // //MongodbAddr: "127.0.0.1:27083",
  559. // Size: 10,
  560. // DbName: "qfw",
  561. // UserName: "SJZY_RWbid_ES",
  562. // Password: "SJZY@B4i4D5e6S",
  563. // //Direct: true,
  564. //}
  565. //MgoB.InitPool()
  566. // 测试环境
  567. MgoB := &mongodb.MongodbSim{
  568. MongodbAddr: "192.168.3.206:27002",
  569. Size: 10,
  570. DbName: "qfw_data",
  571. UserName: "root",
  572. Password: "root",
  573. //Direct: true,
  574. }
  575. MgoB.InitPool()
  576. defer util.Catch()
  577. sess := MgoB.GetMgoConn()
  578. defer MgoB.DestoryMongoConn(sess)
  579. count := 0
  580. it := sess.DB(MgoB.DbName).C(portraitMgo).Find(nil).Select(nil).Iter()
  581. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  582. if count%5000 == 0 {
  583. log.Println("current:", count, tmp["_id"])
  584. }
  585. id := mongodb.BsonIdToSId(tmp["_id"])
  586. tmp["id"] = id
  587. tmp["_id"] = id
  588. saveEsPool <- tmp
  589. }
  590. log.Println("数据处理完毕")
  591. }
  592. func SaveEsMethod() {
  593. arru := make([]map[string]interface{}, EsBulkSize)
  594. indexu := 0
  595. for {
  596. select {
  597. case v := <-saveEsPool:
  598. arru[indexu] = v
  599. indexu++
  600. if indexu == EsBulkSize {
  601. saveEsSp <- true
  602. go func(arru []map[string]interface{}) {
  603. defer func() {
  604. <-saveEsSp
  605. }()
  606. Es.BulkSave(portraitIndex, arru)
  607. }(arru)
  608. arru = make([]map[string]interface{}, EsBulkSize)
  609. indexu = 0
  610. }
  611. case <-time.After(1000 * time.Millisecond):
  612. if indexu > 0 {
  613. saveEsSp <- true
  614. go func(arru []map[string]interface{}) {
  615. defer func() {
  616. <-saveEsSp
  617. }()
  618. Es.BulkSave(portraitIndex, arru)
  619. }(arru[:indexu])
  620. arru = make([]map[string]interface{}, EsBulkSize)
  621. indexu = 0
  622. }
  623. }
  624. }
  625. }