go-elastic.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package main
  import (
  	"bytes"
  	"context"
  	"encoding/json"
  	"fmt"
  	"log"
  	"time"

  	"github.com/elastic/go-elasticsearch/v7"
  	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  )
  11. func fixQyxy() {
  12. // 连接 Elasticsearch
  13. cfg := elasticsearch.Config{
  14. //Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908"
  15. Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908"
  16. Username: "jybid",
  17. Password: "Top2023_JEB01i@31",
  18. }
  19. es, err := elasticsearch.NewClient(cfg)
  20. if err != nil {
  21. log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
  22. }
  23. // 连接 MongoDB
  24. sess := MgoQy.GetMgoConn()
  25. defer MgoQy.DestoryMongoConn(sess)
  26. // 查询 MongoDB 需要处理的企业
  27. where := map[string]interface{}{"use_flag": 10}
  28. queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
  29. count := 0
  30. // 遍历 MongoDB 结果
  31. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  32. if count%100 == 0 {
  33. log.Println("current:", count, tmp["_id"])
  34. }
  35. id := util.ObjToString(tmp["_id"])
  36. // **删除 MongoDB 数据**
  37. whereDel := map[string]interface{}{"_id": tmp["_id"]}
  38. MgoQy.Delete("qyxy_std", whereDel)
  39. MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp)
  40. company_name := util.ObjToString(tmp["company_name"])
  41. if company_name == "无" || company_name == "|" || company_name == "" {
  42. continue
  43. }
  44. // 查询企业是否已经存在
  45. where2 := map[string]interface{}{
  46. "company_name": company_name,
  47. "use_flag": 0,
  48. }
  49. std, _ := MgoQy.FindOne("qyxy_std", where2)
  50. var newID string
  51. if len(*std) > 0 {
  52. newID = util.ObjToString((*std)["_id"])
  53. }
  54. // **构造查询 JSON**
  55. query := map[string]interface{}{
  56. "query": map[string]interface{}{
  57. "bool": map[string]interface{}{
  58. "must": []map[string]interface{}{
  59. {"term": map[string]interface{}{"entidlist": id}},
  60. },
  61. },
  62. },
  63. }
  64. queryBody, _ := json.Marshal(query)
  65. ctx := context.Background()
  66. res, err := es.Search(
  67. es.Search.WithContext(ctx),
  68. es.Search.WithIndex("projectset"),
  69. es.Search.WithTrackTotalHits(true),
  70. es.Search.WithPretty(),
  71. es.Search.WithBody(bytes.NewReader(queryBody)),
  72. )
  73. if err != nil {
  74. log.Println("ES 查询失败:", err)
  75. continue
  76. }
  77. defer res.Body.Close()
  78. // **解析初始查询结果**
  79. var esRes map[string]interface{}
  80. if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
  81. log.Println("解析查询结果失败:", err)
  82. continue
  83. }
  84. // 解析查询结果
  85. hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{})
  86. if !ok || len(hits) == 0 {
  87. continue
  88. }
  89. //log.Println("bbbbbb", len(hits), id, newID)
  90. for _, hit := range hits {
  91. doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{})
  92. esID := util.ObjToString(doc["id"])
  93. newEntidlist := make([]string, 0)
  94. //存入新表
  95. if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 {
  96. for _, v := range entidlist {
  97. list_id := util.ObjToString(v)
  98. if list_id != id && list_id != "-" {
  99. newEntidlist = append(newEntidlist, list_id)
  100. }
  101. }
  102. if newID != "" {
  103. newEntidlist = append(newEntidlist, newID)
  104. }
  105. //更新es
  106. esUpdate := map[string]interface{}{
  107. "entidlist": newEntidlist,
  108. }
  109. // 更新Es 数据
  110. updateEsPool <- []map[string]interface{}{
  111. {"_id": esID},
  112. esUpdate,
  113. }
  114. log.Println("aaaaaaaa", esID, newEntidlist)
  115. ////更新项目MongoDB
  116. MgoP.UpdateById("projectset_20230904", esID, map[string]interface{}{"$set": esUpdate})
  117. }
  118. }
  119. }
  120. log.Println("数据处理完毕")
  121. }
  122. func fixQyxy2() {
  123. // 连接 Elasticsearch
  124. cfg := elasticsearch.Config{
  125. //Addresses: []string{"http://172.17.4.184:19908"},
  126. Addresses: []string{"http://127.0.0.1:19908"},
  127. Username: "jybid",
  128. Password: "Top2023_JEB01i@31",
  129. }
  130. es, err := elasticsearch.NewClient(cfg)
  131. if err != nil {
  132. log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
  133. }
  134. // 连接 MongoDB
  135. sess := MgoQy.GetMgoConn()
  136. defer MgoQy.DestoryMongoConn(sess)
  137. // 查询 MongoDB 需要处理的企业
  138. where := map[string]interface{}{"use_flag": 10}
  139. queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
  140. count := 0
  141. // 遍历 MongoDB 结果
  142. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  143. if count%100 == 0 {
  144. log.Println("current:", count, tmp["_id"])
  145. }
  146. id := util.ObjToString(tmp["_id"])
  147. company_name := util.ObjToString(tmp["company_name"])
  148. if company_name == "无" || company_name == "|" || company_name == "" {
  149. continue
  150. }
  151. // 查询企业是否已经存在
  152. where2 := map[string]interface{}{
  153. "company_name": company_name,
  154. "use_flag": 0,
  155. }
  156. std, _ := MgoQy.FindOne("qyxy_std", where2)
  157. var newID string
  158. if len(*std) > 0 {
  159. newID = id
  160. }
  161. // **构造查询 JSON**
  162. query := map[string]interface{}{
  163. "query": map[string]interface{}{
  164. "bool": map[string]interface{}{
  165. "must": []map[string]interface{}{
  166. {"term": map[string]interface{}{"entidlist": id}},
  167. },
  168. },
  169. },
  170. "size": 500,
  171. "sort": []map[string]interface{}{
  172. {"_doc": map[string]string{"order": "asc"}},
  173. },
  174. }
  175. queryBody, _ := json.Marshal(query)
  176. // **初始化滚动查询**
  177. ctx := context.Background()
  178. //scrollTime := "2m"
  179. scrollTime, _ := time.ParseDuration("2m")
  180. res, err := es.Search(
  181. es.Search.WithContext(ctx),
  182. es.Search.WithIndex("projectset"),
  183. es.Search.WithBody(bytes.NewReader(queryBody)),
  184. es.Search.WithScroll(scrollTime),
  185. )
  186. if err != nil {
  187. log.Println("ES 查询失败:", err)
  188. continue
  189. }
  190. defer res.Body.Close()
  191. // **解析初始查询结果**
  192. var esRes map[string]interface{}
  193. if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
  194. log.Println("解析查询结果失败:", err)
  195. continue
  196. }
  197. scrollID, _ := esRes["_scroll_id"].(string)
  198. if scrollID == "" {
  199. log.Println("获取 Scroll ID 失败")
  200. continue
  201. }
  202. total := 0
  203. for {
  204. // 解析查询结果
  205. hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{})
  206. if !ok || len(hits) == 0 {
  207. break
  208. }
  209. // **批量更新 ES**
  210. //bulkUpdate := make([]byte, 0)
  211. for _, hit := range hits {
  212. doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{})
  213. //esID := util.ObjToString(doc["id"])
  214. newEntidlist := make([]string, 0)
  215. if entidlist, ok := doc["entidlist"].([]interface{}); ok {
  216. for _, v := range entidlist {
  217. list_id := util.ObjToString(v)
  218. if list_id != id && list_id != "-" {
  219. newEntidlist = append(newEntidlist, list_id)
  220. }
  221. }
  222. }
  223. if newID != "" {
  224. newEntidlist = append(newEntidlist, newID)
  225. }
  226. // 构造 Bulk Update 请求
  227. //updateData := map[string]interface{}{
  228. // "update": map[string]string{"_id": esID},
  229. //}
  230. //updateBody := map[string]interface{}{
  231. // "doc": map[string]interface{}{"entidlist": newEntidlist},
  232. //}
  233. //updateDataJSON, _ := json.Marshal(updateData)
  234. //updateBodyJSON, _ := json.Marshal(updateBody)
  235. //
  236. //bulkUpdate = append(bulkUpdate, append(updateDataJSON, '\n')...)
  237. //bulkUpdate = append(bulkUpdate, append(updateBodyJSON, '\n')...)
  238. }
  239. // 发送批量更新请求
  240. //if len(bulkUpdate) > 0 {
  241. // res, err := es.Bulk(bytes.NewReader(bulkUpdate), es.Bulk.WithContext(ctx), es.Bulk.WithIndex("projectset"))
  242. // if err != nil {
  243. // log.Println("批量更新 ES 失败:", err)
  244. // } else {
  245. // defer res.Body.Close()
  246. // log.Println("批量更新 ES 成功")
  247. // }
  248. //}
  249. total += len(hits)
  250. // **继续滚动查询**
  251. scrollReq := map[string]interface{}{
  252. "scroll": scrollTime,
  253. "scroll_id": scrollID,
  254. }
  255. scrollBody, _ := json.Marshal(scrollReq)
  256. res, err = es.Scroll(es.Scroll.WithContext(ctx), es.Scroll.WithBody(bytes.NewReader(scrollBody)))
  257. if err != nil {
  258. log.Println("滚动查询失败:", err)
  259. break
  260. }
  261. defer res.Body.Close()
  262. // **解析滚动查询响应**
  263. if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
  264. log.Println("解析滚动查询响应失败:", err)
  265. break
  266. }
  267. scrollID, _ = esRes["_scroll_id"].(string)
  268. if scrollID == "" {
  269. log.Println("滚动查询结束")
  270. break
  271. }
  272. log.Println("当前已更新:", total)
  273. }
  274. // **删除 MongoDB 数据**
  275. //whereDel := map[string]interface{}{"_id": id}
  276. //MgoQy.Delete("qyxy_std", whereDel)
  277. //MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp)
  278. // **清理滚动查询**
  279. if scrollID != "" {
  280. clearReq := map[string]interface{}{"scroll_id": []string{scrollID}}
  281. clearBody, _ := json.Marshal(clearReq)
  282. res, err := es.ClearScroll(es.ClearScroll.WithBody(bytes.NewReader(clearBody)))
  283. if err != nil {
  284. log.Println("清理滚动搜索失败:", err)
  285. } else {
  286. defer res.Body.Close()
  287. //log.Println("滚动查询清理成功")
  288. }
  289. }
  290. }
  291. log.Println("数据处理完毕")
  292. }