main.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package main
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"io"
  	"log"
  	"strings"
  	"time"

  	// "app.yhyue.com/moapp/jybase/common"
  	elastic "app.yhyue.com/moapp/jybase/es"
  	esV7 "github.com/olivere/elastic/v7"
  	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  )
  14. var (
  15. Es elastic.Es
  16. Mgo *mongodb.MongodbSim
  17. )
  18. func init() {
  19. Es = elastic.NewEs("v7", "http://172.17.4.184:19905", 20, "jybid", "Top2023_JEB01i@31")
  20. //Mgo = &mongodb.MongodbSim{
  21. // //MongodbAddr: "127.0.0.1:27080",
  22. // MongodbAddr: "172.17.4.85:27080",
  23. // DbName: "top",
  24. // Size: 10,
  25. // //Direct: true,
  26. //}
  27. //Mgo.InitPool()
  28. Mgo = &mongodb.MongodbSim{
  29. MongodbAddr: "172.17.189.140:27080",
  30. //MongodbAddr: "127.0.0.1:27083",
  31. Size: 10,
  32. DbName: "qfw",
  33. UserName: "SJZY_RWbid_ES",
  34. Password: "SJZY@B4i4D5e6S",
  35. //Direct: true,
  36. }
  37. Mgo.InitPool()
  38. }
  // MySource carries a raw JSON request body and exposes its "query" part
  // through a Source method, so it can be handed to a query builder that
  // expects that method (main passes it to Scroll(...).Query).
  type MySource struct {
  	// Querys is a JSON document expected to contain a top-level "query" key.
  	Querys string
  }

  // Source decodes Querys and returns the value stored under "query".
  //
  // It returns the decoding error when Querys is not valid JSON (previously
  // the error was dropped and a nil query was returned). When the document
  // has no "query" key the result is (nil, nil), as before.
  func (m *MySource) Source() (interface{}, error) {
  	var body map[string]interface{}
  	if err := json.Unmarshal([]byte(m.Querys), &body); err != nil {
  		return nil, err
  	}
  	return body["query"], nil
  }
  47. func main() {
  48. //filePath := "./file2.xlsx"
  49. // SE := qu.SimpleEncrypt{Key: "topJYBX2019"}
  50. // log.Println(SE.DecodeString("QltHc2AmagsIUFwVV0dybyZpAwECX0YK"))
  51. // return
  52. //xlFile, _ := xlsx.OpenFile(filePath)
  53. //获取行数
  54. // length := len(xlFile.Sheets[0].Rows)
  55. //开辟除表头外的行数的数组内存
  56. // resourceArr := make([]map[string]interface{}, length-1)
  57. //遍历sheet
  58. //for _, sheet := range xlFile.Sheets {
  59. //遍历每一行
  60. //for rowIndex, row := range sheet.Rows {
  61. //跳过第一行表头信息
  62. //if rowIndex == 0 {
  63. // continue
  64. //}
  65. //s_winner := row.Cells[0].Value
  66. counts := 0
  67. esCon := elastic.VarEs.(*elastic.EsV7)
  68. client := esCon.GetEsConn()
  69. defer esCon.DestoryEsConn(client)
  70. cc := &MySource{
  71. Querys: `{
  72. "query": {
  73. "bool": {
  74. "must": [
  75. {
  76. "terms": {
  77. "subtype": ["中标", "单一", "成交", "合同"]
  78. }
  79. },
  80. {
  81. "terms": {
  82. "area": ["北京", "上海", "江苏", "浙江", "广东"]
  83. }
  84. },
  85. {
  86. "range": {
  87. "comeintime": {
  88. "gte": 1640966400,
  89. "lt": 1703952000
  90. }
  91. }
  92. }
  93. ]
  94. }
  95. }
  96. }`,
  97. }
  98. ctx, _ := context.WithTimeout(context.Background(), 5*time.Hour)
  99. //游标查询,index不支持别名,只能写索引库的名称
  100. res, err := client.Scroll("bidding").Query(cc).Size(500).Do(ctx) //查询一条获取游标
  101. if err == nil {
  102. numDocs := 0
  103. scrollId := res.ScrollId
  104. count := 1
  105. for {
  106. if scrollId == "" {
  107. log.Println("ScrollId Is Error")
  108. break
  109. }
  110. var searchResult *esV7.SearchResult
  111. var err error
  112. if count == 1 {
  113. searchResult = res
  114. } else {
  115. searchResult, err = client.Scroll("projectset").Size(500).ScrollId(scrollId).Do(ctx) //查询
  116. if err != nil {
  117. if err.Error() == "EOS" { //迭代完毕
  118. log.Println("Es Search Data Over:", err)
  119. } else {
  120. log.Println("Es Search Data Error:", err)
  121. }
  122. break
  123. }
  124. }
  125. log.Println("此次处理条数 ", len(searchResult.Hits.Hits))
  126. for _, hit := range searchResult.Hits.Hits {
  127. //开始处理数据
  128. doc := make(map[string]interface{})
  129. if json.Unmarshal(hit.Source, &doc) == nil {
  130. delete(doc, "filetext")
  131. delete(doc, "detail")
  132. counts++
  133. log.Println("当前数量 ", counts)
  134. projectName := util.ObjToString(doc["projectname"])
  135. if strings.Contains(projectName, "非政府") {
  136. continue
  137. }
  138. buyerclass := util.ObjToString(doc["buyerclass"])
  139. if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  140. continue
  141. }
  142. err := Mgo.InsertOrUpdate("qfw", "wcc_bank_poc2", doc)
  143. if err != nil {
  144. log.Println("error", doc["id"])
  145. }
  146. }
  147. }
  148. scrollId = searchResult.ScrollId
  149. count++
  150. }
  151. client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
  152. log.Println("Result Data Count:", numDocs)
  153. } else {
  154. log.Println("查询失败 ", err)
  155. }
  156. }