main.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "github.com/robfig/cron/v3"
  8. "github.com/spf13/viper"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "log"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  19. saveEsSp = make(chan bool, 5)
  20. EsBulkSize = 50
  21. Es *es.Elastic
  22. Mgo *mongodb.MongodbSim //bidding 地址
  23. MgoP *mongodb.MongodbSim // 项目地址
  24. portraitIndex = "" // 画像索引
  25. portraitMgo = "" // MongoDB 的表名
  26. GF GlobalConf
  27. // 情报分类一级标签
  28. topInfos = []string{"情报_物业", "情报_环境采购", "情报_印务商机", "情报_家具招投标", "情报_车辆租赁"}
  29. )
  30. type PortraitData struct {
  31. Buyer string `json:"buyer"`
  32. Area string `json:"area"`
  33. City string `json:"city"`
  34. Class string `json:"class"`
  35. BusinessType string `json:"business_type"`
  36. Lasttime int64 `json:"lasttime"`
  37. ProjectCount int64 `json:"project_count"`
  38. ProjectMoney float64 `json:"project_money"`
  39. UpdateTime int64 `json:"update_time"`
  40. }
  41. func InitConfig() (err error) {
  42. viper.SetConfigFile("config.toml") // 指定配置文件路径
  43. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  44. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  45. viper.AddConfigPath("./")
  46. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  47. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  48. err = viper.ReadInConfig() // 查找并读取配置文件
  49. if err != nil { // 处理读取配置文件的错误
  50. return
  51. }
  52. err = viper.Unmarshal(&GF)
  53. return err
  54. }
  55. func Init() {
  56. InitConfig()
  57. // 正式环境
  58. Es = &es.Elastic{
  59. //S_esurl: "http://127.0.0.1:19908",
  60. S_esurl: GF.Es.URL,
  61. I_size: 5,
  62. Username: GF.Es.Username,
  63. Password: GF.Es.Password,
  64. }
  65. Es.InitElasticSize()
  66. //测试环境
  67. //Es = &es.Elastic{
  68. // S_esurl: "http://192.168.3.149:9201",
  69. // I_size: 5,
  70. // Username: "",
  71. // Password: "",
  72. //}
  73. //Es.InitElasticSize()
  74. // bidding 地址
  75. Mgo = &mongodb.MongodbSim{
  76. MongodbAddr: GF.Mongo.Host,
  77. //MongodbAddr: "127.0.0.1:27083",
  78. Size: 10,
  79. DbName: GF.Mongo.DB,
  80. UserName: GF.Mongo.Username,
  81. Password: GF.Mongo.Password,
  82. Direct: GF.Mongo.Direct,
  83. }
  84. Mgo.InitPool()
  85. // 抽取库项目表
  86. MgoP = &mongodb.MongodbSim{
  87. MongodbAddr: GF.Mongop.Host,
  88. Size: 10,
  89. DbName: GF.Mongop.DB,
  90. UserName: GF.Mongop.Username,
  91. Password: GF.Mongop.Password,
  92. Direct: GF.Mongop.Direct,
  93. }
  94. MgoP.InitPool()
  95. portraitIndex = GF.Env.PortraitIndex
  96. portraitMgo = GF.Env.PortraitMgo
  97. if portraitIndex == "" || portraitMgo == "" {
  98. log.Fatalln("画像索引或者MongoDB数据表为空")
  99. }
  100. if GF.Env.Curs < 1 {
  101. log.Fatalln("参数 curs 并发数不能低于1")
  102. } else {
  103. log.Println("参数 curs 并发数", GF.Env.Curs)
  104. }
  105. }
  106. func main() {
  107. Init()
  108. go SaveEsMethod() // 生索引
  109. //定时任务
  110. local, _ := time.LoadLocation("Asia/Shanghai")
  111. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  112. _, err := c.AddFunc(GF.Env.Spec, dealIncData)
  113. if err != nil {
  114. log.Println("AddFunc err", err)
  115. }
  116. c.Start()
  117. defer c.Stop()
  118. select {}
  119. }
  120. // dealIncData 处理增量数据
  121. func dealIncData() {
  122. now := time.Now()
  123. yesterday := time.Date(now.Year(), now.Month(), now.Day()+GF.Env.Days, 0, 0, 0, 0, now.Location())
  124. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  125. where := map[string]interface{}{
  126. "pici": map[string]interface{}{
  127. "$gt": yesterday.Unix(),
  128. "$lte": today.Unix(),
  129. },
  130. }
  131. url := GF.Es.URL
  132. username := GF.Es.Username
  133. password := GF.Es.Password
  134. // 创建 Elasticsearch 客户端
  135. client, err := elastic.NewClient(
  136. elastic.SetURL(url),
  137. elastic.SetBasicAuth(username, password),
  138. elastic.SetSniff(false),
  139. )
  140. if err != nil {
  141. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  142. }
  143. defer util.Catch()
  144. sess := MgoP.GetMgoConn()
  145. defer MgoP.DestoryMongoConn(sess)
  146. var wg sync.WaitGroup
  147. ch := make(chan map[string]interface{}, 10000)
  148. // 并行处理结果
  149. for i := 0; i < GF.Env.Curs; i++ {
  150. wg.Add(1)
  151. go func() {
  152. defer wg.Done()
  153. for hit := range ch {
  154. processProjectData(hit, client, Mgo)
  155. }
  156. }()
  157. }
  158. count := 0
  159. it := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(where).Select(nil).Iter()
  160. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  161. if count%5000 == 0 {
  162. log.Println("current:", count, tmp["buyer"])
  163. }
  164. // 没有采购单位
  165. if util.ObjToString(tmp["buyer"]) == "" {
  166. continue
  167. }
  168. ch <- tmp
  169. }
  170. close(ch) // 关闭通道
  171. wg.Wait()
  172. //处理完毕生索引
  173. incDataEs()
  174. log.Println("增量数据处理完毕")
  175. }
  176. // dealAllData 处理存量数据,
  177. func dealAllData() {
  178. /**
  179. 循环采购单位存量数据,
  180. */
  181. url := "http://172.17.4.184:19908"
  182. //url := "http://127.0.0.1:19908"
  183. username := "jybid"
  184. password := "Top2023_JEB01i@31"
  185. index := "buyer" //索引名称
  186. //index := "projectset" //索引名称
  187. // 创建 Elasticsearch 客户端
  188. client, err := elastic.NewClient(
  189. elastic.SetURL(url),
  190. elastic.SetBasicAuth(username, password),
  191. elastic.SetSniff(false),
  192. )
  193. if err != nil {
  194. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  195. }
  196. MgoB := &mongodb.MongodbSim{
  197. MongodbAddr: "172.17.189.140:27080",
  198. //MongodbAddr: "127.0.0.1:27083",
  199. Size: 10,
  200. DbName: "qfw",
  201. UserName: "SJZY_RWbid_ES",
  202. Password: "SJZY@B4i4D5e6S",
  203. //Direct: true,
  204. }
  205. MgoB.InitPool()
  206. ctx := context.Background()
  207. //开始滚动搜索
  208. scrollID := ""
  209. scroll := "10m"
  210. searchSource := elastic.NewSearchSource().
  211. //Query(query).
  212. Size(10000).
  213. Sort("_doc", true) //升序排序
  214. //Sort("_doc", false) //降序排序
  215. searchService := client.Scroll(index).
  216. Size(10000).
  217. Scroll(scroll).
  218. SearchSource(searchSource)
  219. res, err := searchService.Do(ctx)
  220. if err != nil {
  221. if err == io.EOF {
  222. fmt.Println("没有数据")
  223. } else {
  224. panic(err)
  225. }
  226. }
  227. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  228. fmt.Println("总数是:", res.TotalHits())
  229. total := 0
  230. for len(res.Hits.Hits) > 0 {
  231. for k, hit := range res.Hits.Hits {
  232. if k%1000 == 0 {
  233. log.Println("当前:", k)
  234. }
  235. var doc map[string]interface{}
  236. err := json.Unmarshal(hit.Source, &doc)
  237. if err != nil {
  238. log.Printf("解析文档失败:%s", err)
  239. continue
  240. }
  241. //处理查询结果
  242. portrait := PortraitData{
  243. Buyer: util.ObjToString(doc["name"]),
  244. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  245. Class: "情报_物业",
  246. }
  247. // 构建查询
  248. query := elastic.NewBoolQuery().
  249. Must(
  250. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  251. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  252. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  253. )
  254. // 创建搜索服务
  255. searchService2 := client.Search().
  256. Index("projectset"). // 替换为你的索引名称
  257. Query(query).
  258. Sort("lasttime", false). // false表示降序
  259. Size(1).
  260. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  261. // 执行查询
  262. searchResult, err := searchService2.Do(context.Background())
  263. if err != nil {
  264. log.Fatalf("Error getting response: %s", err)
  265. }
  266. // 处理结果
  267. if searchResult.Hits.TotalHits.Value > 0 {
  268. portrait.ProjectCount = searchResult.TotalHits()
  269. for _, hit := range searchResult.Hits.Hits {
  270. var doc2 map[string]interface{}
  271. err := json.Unmarshal(hit.Source, &doc2)
  272. if err != nil {
  273. log.Printf("解析文档失败:%s", err)
  274. continue
  275. }
  276. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  277. portrait.Area = util.ObjToString(doc2["area"])
  278. portrait.City = util.ObjToString(doc2["city"])
  279. }
  280. // 处理聚合结果
  281. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  282. portrait.ProjectMoney = *agg.Value
  283. } else {
  284. log.Println("Aggregation not found")
  285. }
  286. //写入MongoDB
  287. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  288. } else {
  289. continue
  290. }
  291. }
  292. total = total + len(res.Hits.Hits)
  293. scrollID = res.ScrollId
  294. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  295. log.Println("current count:", total)
  296. if err != nil {
  297. if err == io.EOF {
  298. // 滚动到最后一批数据,退出循环
  299. break
  300. }
  301. log.Println("滚动搜索失败:", err, res)
  302. break // 处理错误时退出循环
  303. }
  304. }
  305. // 在循环外调用 ClearScroll
  306. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  307. if err != nil {
  308. log.Printf("清理滚动搜索失败:%s", err)
  309. }
  310. fmt.Println("结束~~~~~~~~~~~~~~~")
  311. }
  312. // dealAllDataB 处理存量数据,协程处理全部类型
  313. func dealAllDataB() {
  314. url := "http://172.17.4.184:19908"
  315. username := "jybid"
  316. password := "Top2023_JEB01i@31"
  317. index := "buyer"
  318. // 创建 Elasticsearch 客户端
  319. client, err := elastic.NewClient(
  320. elastic.SetURL(url),
  321. elastic.SetBasicAuth(username, password),
  322. elastic.SetSniff(false),
  323. )
  324. if err != nil {
  325. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  326. }
  327. MgoB := &mongodb.MongodbSim{
  328. MongodbAddr: "172.17.189.140:27080",
  329. Size: 10,
  330. DbName: "qfw",
  331. UserName: "SJZY_RWbid_ES",
  332. Password: "SJZY@B4i4D5e6S",
  333. }
  334. MgoB.InitPool()
  335. ctx := context.Background()
  336. scroll := "10m"
  337. searchSource := elastic.NewSearchSource().
  338. Size(10000).
  339. Sort("_doc", true)
  340. searchService := client.Scroll(index).
  341. Size(10000).
  342. Scroll(scroll).
  343. SearchSource(searchSource)
  344. res, err := searchService.Do(ctx)
  345. if err != nil {
  346. if err == io.EOF {
  347. fmt.Println("没有数据")
  348. return
  349. } else {
  350. panic(err)
  351. }
  352. }
  353. fmt.Println("总数是:", res.TotalHits())
  354. var wg sync.WaitGroup
  355. ch := make(chan *elastic.SearchHit, 10000)
  356. // 并行处理结果
  357. for i := 0; i < 10; i++ {
  358. wg.Add(1)
  359. go func() {
  360. defer wg.Done()
  361. for hit := range ch {
  362. processHit(hit, client, MgoB)
  363. }
  364. }()
  365. }
  366. num := 0
  367. for len(res.Hits.Hits) > 0 {
  368. for _, hit := range res.Hits.Hits {
  369. num++
  370. ch <- hit
  371. if num%1000 == 0 {
  372. log.Println("current num:", num)
  373. }
  374. }
  375. scrollID := res.ScrollId
  376. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  377. if err != nil {
  378. if err == io.EOF {
  379. close(ch)
  380. break
  381. }
  382. log.Println("滚动搜索失败:", err, res)
  383. close(ch)
  384. break
  385. }
  386. }
  387. wg.Wait()
  388. // 在循环外调用 ClearScroll
  389. _, err = client.ClearScroll().ScrollId(res.ScrollId).Do(ctx)
  390. if err != nil {
  391. log.Printf("清理滚动搜索失败:%s", err)
  392. }
  393. fmt.Println("结束~~~~~~~~~~~~~~~")
  394. }
  395. func processHit(hit *elastic.SearchHit, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  396. var doc map[string]interface{}
  397. err := json.Unmarshal(hit.Source, &doc)
  398. if err != nil {
  399. log.Printf("解析文档失败:%s", err)
  400. return
  401. }
  402. for _, v := range topInfos {
  403. portrait := PortraitData{
  404. Buyer: util.ObjToString(doc["name"]),
  405. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  406. Class: v,
  407. }
  408. query := elastic.NewBoolQuery().
  409. Must(
  410. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  411. elastic.NewTermQuery("tag_topinformation", v),
  412. )
  413. searchService2 := client.Search().
  414. Index("projectset").
  415. Query(query).
  416. Sort("lasttime", false).
  417. Size(1).
  418. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  419. searchResult, err := searchService2.Do(context.Background())
  420. if err != nil {
  421. log.Fatalf("Error getting response: %s", err)
  422. }
  423. if searchResult.Hits.TotalHits.Value > 0 {
  424. portrait.ProjectCount = searchResult.TotalHits()
  425. for _, hit := range searchResult.Hits.Hits {
  426. var doc2 map[string]interface{}
  427. err := json.Unmarshal(hit.Source, &doc2)
  428. if err != nil {
  429. log.Printf("解析文档失败:%s", err)
  430. continue
  431. }
  432. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  433. portrait.Area = util.ObjToString(doc2["area"])
  434. portrait.City = util.ObjToString(doc2["city"])
  435. }
  436. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  437. portrait.ProjectMoney = *agg.Value
  438. } else {
  439. log.Println("Aggregation not found")
  440. }
  441. MgoB.Save(portraitMgo, structToMap(portrait))
  442. }
  443. }
  444. }
  445. // processProjectData 处理项目表增量数据
  446. func processProjectData(tmp map[string]interface{}, client *elastic.Client, MgoB *mongodb.MongodbSim) {
  447. for _, v := range topInfos {
  448. existsWhere := map[string]interface{}{
  449. "buyer": tmp["buyer"],
  450. "class": v,
  451. }
  452. portrait := PortraitData{
  453. Buyer: util.ObjToString(tmp["buyer"]),
  454. BusinessType: getStr(util.ObjToString(tmp["buyerclass"])),
  455. Class: v,
  456. }
  457. query := elastic.NewBoolQuery().
  458. Must(
  459. elastic.NewTermQuery("buyer", util.ObjToString(tmp["buyer"])),
  460. elastic.NewTermQuery("tag_topinformation", v),
  461. )
  462. searchService2 := client.Search().
  463. Index("projectset").
  464. Query(query).
  465. Sort("lasttime", false).
  466. Size(1).
  467. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  468. searchResult, err := searchService2.Do(context.Background())
  469. if err != nil {
  470. log.Fatalf("Error getting response: %s", err)
  471. }
  472. if searchResult.Hits.TotalHits.Value > 0 {
  473. portrait.ProjectCount = searchResult.TotalHits()
  474. for _, hit := range searchResult.Hits.Hits {
  475. var doc2 map[string]interface{}
  476. err = json.Unmarshal(hit.Source, &doc2)
  477. if err != nil {
  478. log.Printf("解析文档失败:%s", err)
  479. continue
  480. }
  481. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  482. portrait.Area = util.ObjToString(doc2["area"])
  483. portrait.City = util.ObjToString(doc2["city"])
  484. }
  485. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  486. portrait.ProjectMoney = *agg.Value
  487. } else {
  488. log.Println("Aggregation not found")
  489. }
  490. portrait.UpdateTime = time.Now().Unix()
  491. //
  492. exist, _ := MgoB.FindOne(portraitMgo, existsWhere)
  493. if exist != nil && len(*exist) > 0 {
  494. // 存在只更新部分内容
  495. update := map[string]interface{}{
  496. "lasttime": portrait.Lasttime,
  497. "project_money": portrait.ProjectMoney,
  498. "project_count": portrait.ProjectCount,
  499. "update_time": portrait.UpdateTime,
  500. }
  501. if util.Int64All((*exist)["project_count"]) != portrait.ProjectCount || util.Int64All((*exist)["lasttime"]) != portrait.Lasttime || util.Float64All((*exist)["project_money"]) != portrait.ProjectMoney {
  502. id := mongodb.BsonIdToSId((*exist)["_id"])
  503. MgoB.UpdateById(portraitMgo, id, map[string]interface{}{"$set": update})
  504. }
  505. } else {
  506. //不存在直接保存
  507. MgoB.Save(portraitMgo, structToMap(portrait))
  508. }
  509. }
  510. }
  511. }
  512. // incDataEs 增量数据处理生索引
  513. func incDataEs() {
  514. now := time.Now()
  515. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  516. where := map[string]interface{}{
  517. "update_time": map[string]interface{}{
  518. "$gte": today.Unix(),
  519. },
  520. }
  521. defer util.Catch()
  522. sess := Mgo.GetMgoConn()
  523. defer Mgo.DestoryMongoConn(sess)
  524. count := 0
  525. it := sess.DB("qfw").C(portraitMgo).Find(where).Select(nil).Iter()
  526. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  527. if count%1000 == 0 {
  528. log.Println("current:", count)
  529. }
  530. id := mongodb.BsonIdToSId(tmp["_id"])
  531. tmp["id"] = id
  532. tmp["_id"] = id
  533. delete(tmp, "update_time")
  534. saveEsPool <- tmp
  535. }
  536. log.Println("数据处理完毕")
  537. }
  538. // allDataEs 处理存量数据到es
  539. func allDataEs() {
  540. MgoB := &mongodb.MongodbSim{
  541. MongodbAddr: "172.17.189.140:27080",
  542. //MongodbAddr: "127.0.0.1:27083",
  543. Size: 10,
  544. DbName: "qfw",
  545. UserName: "SJZY_RWbid_ES",
  546. Password: "SJZY@B4i4D5e6S",
  547. //Direct: true,
  548. }
  549. MgoB.InitPool()
  550. // 测试环境
  551. //MgoB := &mongodb.MongodbSim{
  552. // MongodbAddr: "192.168.3.206:27002",
  553. // Size: 10,
  554. // DbName: "qfw_data",
  555. // UserName: "root",
  556. // Password: "root",
  557. // //Direct: true,
  558. //}
  559. //MgoB.InitPool()
  560. defer util.Catch()
  561. sess := MgoB.GetMgoConn()
  562. defer MgoB.DestoryMongoConn(sess)
  563. count := 0
  564. it := sess.DB(MgoB.DbName).C(portraitMgo).Find(nil).Select(nil).Iter()
  565. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  566. if count%5000 == 0 {
  567. log.Println("current:", count, tmp["_id"])
  568. }
  569. id := mongodb.BsonIdToSId(tmp["_id"])
  570. tmp["id"] = id
  571. tmp["_id"] = id
  572. saveEsPool <- tmp
  573. }
  574. log.Println("数据处理完毕")
  575. }
  576. func SaveEsMethod() {
  577. arru := make([]map[string]interface{}, EsBulkSize)
  578. indexu := 0
  579. for {
  580. select {
  581. case v := <-saveEsPool:
  582. arru[indexu] = v
  583. indexu++
  584. if indexu == EsBulkSize {
  585. saveEsSp <- true
  586. go func(arru []map[string]interface{}) {
  587. defer func() {
  588. <-saveEsSp
  589. }()
  590. Es.BulkSave(portraitIndex, arru)
  591. }(arru)
  592. arru = make([]map[string]interface{}, EsBulkSize)
  593. indexu = 0
  594. }
  595. case <-time.After(1000 * time.Millisecond):
  596. if indexu > 0 {
  597. saveEsSp <- true
  598. go func(arru []map[string]interface{}) {
  599. defer func() {
  600. <-saveEsSp
  601. }()
  602. Es.BulkSave(portraitIndex, arru)
  603. }(arru[:indexu])
  604. arru = make([]map[string]interface{}, EsBulkSize)
  605. indexu = 0
  606. }
  607. }
  608. }
  609. }