qyxy.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. )
  12. // fixQyxy 修复企业数据
  13. func fixQyxy() {
  14. //c查询use_flag=10的 qyxy 数据,然后去
  15. //url := "http://172.17.4.184:19908"
  16. url := "http://127.0.0.1:19908"
  17. username := "jybid"
  18. password := "Top2023_JEB01i@31"
  19. //index := "bidding" //索引名称
  20. // 创建 Elasticsearch 客户端
  21. client, err := elastic.NewClient(
  22. elastic.SetURL(url),
  23. elastic.SetBasicAuth(username, password),
  24. elastic.SetSniff(false),
  25. )
  26. if err != nil {
  27. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  28. }
  29. //
  30. sess := MgoQy.GetMgoConn()
  31. defer MgoQy.DestoryMongoConn(sess)
  32. where := map[string]interface{}{
  33. "use_flag": 10,
  34. }
  35. queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Limit(2).Iter()
  36. count := 0
  37. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  38. if count%1000 == 0 {
  39. log.Println("current:", count, tmp["_id"])
  40. }
  41. id := util.ObjToString(tmp["_id"])
  42. company_name := util.ObjToString(tmp["company_name"])
  43. if company_name == "无" || company_name == "|" {
  44. continue
  45. }
  46. where2 := map[string]interface{}{
  47. "company_name": company_name,
  48. "use_flag": 0,
  49. }
  50. std, _ := MgoQy.FindOne("qyxy_std", where2)
  51. var newID string
  52. if len(*std) > 0 {
  53. newID = util.ObjToString(tmp["_id"])
  54. }
  55. query := elastic.NewBoolQuery().
  56. Must(
  57. elastic.NewTermQuery("entidlist", id), // 模糊匹配 projectname
  58. )
  59. ctx := context.Background()
  60. //开始滚动搜索
  61. scrollID := ""
  62. scroll := "10m"
  63. searchSource := elastic.NewSearchSource().
  64. Query(query).
  65. Size(10000).
  66. Sort("_doc", true) //升序排序
  67. //Sort("_doc", false) //降序排序
  68. searchService := client.Scroll("projectset").
  69. Size(10000).
  70. Scroll(scroll).
  71. SearchSource(searchSource)
  72. res, err := searchService.Do(ctx)
  73. if err != nil {
  74. if err == io.EOF {
  75. fmt.Println("没有数据")
  76. } else {
  77. panic(err)
  78. }
  79. }
  80. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  81. fmt.Println("总数是:", res.TotalHits())
  82. total := 0
  83. //1.处理更新es 数据
  84. for len(res.Hits.Hits) > 0 {
  85. for _, hit := range res.Hits.Hits {
  86. var doc map[string]interface{}
  87. err := json.Unmarshal(hit.Source, &doc)
  88. if err != nil {
  89. log.Printf("解析文档失败:%s", err)
  90. continue
  91. }
  92. esID := util.ObjToString(doc["id"])
  93. newEntidlist := make([]string, 0)
  94. //存入新表
  95. if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 {
  96. for _, v := range entidlist {
  97. list_id := util.ObjToString(v)
  98. if list_id != id && list_id != "-" {
  99. newEntidlist = append(newEntidlist, list_id)
  100. }
  101. }
  102. if newID != "" {
  103. newEntidlist = append(newEntidlist, newID)
  104. }
  105. //更新es
  106. esUpdate := map[string]interface{}{
  107. "entidlist": newEntidlist,
  108. }
  109. // 更新Es 数据
  110. updateEsPool <- []map[string]interface{}{
  111. {"_id": esID},
  112. esUpdate,
  113. }
  114. }
  115. }
  116. total = total + len(res.Hits.Hits)
  117. scrollID = res.ScrollId
  118. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  119. log.Println("current count:", total)
  120. if err != nil {
  121. if err == io.EOF {
  122. // 滚动到最后一批数据,退出循环
  123. break
  124. }
  125. log.Println("滚动搜索失败:", err, res)
  126. break // 处理错误时退出循环
  127. }
  128. }
  129. //2.删除MongoDB
  130. whereDel := map[string]interface{}{
  131. "_id": id,
  132. }
  133. MgoQy.Delete("qyxy_std", whereDel)
  134. MgoQy.Save("wcc_qyxy_std_delete", tmp)
  135. // 在循环外调用 ClearScroll
  136. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  137. if err != nil {
  138. log.Printf("清理滚动搜索失败:%s", err)
  139. }
  140. }
  141. log.Println("数据处理完毕")
  142. }
  143. // findData 找出企业信息中,company_type 不等于合体工商户的企业数据,然后写入一个临时表
  144. func findData() {
  145. Mgo := &mongodb.MongodbSim{
  146. MongodbAddr: "172.17.189.140:27080",
  147. //MongodbAddr: "127.0.0.1:27083",
  148. Size: 10,
  149. DbName: "mixdata",
  150. UserName: "SJZY_RWbid_ES",
  151. Password: "SJZY@B4i4D5e6S",
  152. //Direct: true,
  153. }
  154. Mgo.InitPool()
  155. sess := Mgo.GetMgoConn()
  156. defer Mgo.DestoryMongoConn(sess)
  157. //where := map[string]interface{}{
  158. // "company_type": map[string]interface{}{
  159. // "$ne": "个体工商户",
  160. // },
  161. //}
  162. query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(map[string]interface{}{"company_name": 1, "company_type": 1, "company_status": 1, "use_flag": 1}).Iter()
  163. count := 0
  164. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  165. if count%10000 == 0 {
  166. log.Println("current:", count)
  167. }
  168. companyType := util.ObjToString(tmp["company_type"])
  169. if companyType == "个体工商户" {
  170. continue
  171. }
  172. company_name := util.ObjToString(tmp["company_name"])
  173. whereN := map[string]interface{}{
  174. "company_name": company_name,
  175. }
  176. num := Mgo.Count("qyxy_std", whereN)
  177. if num > 1 {
  178. Mgo.Save("wcc_qyxy_20240311", tmp)
  179. }
  180. tmp = make(map[string]interface{})
  181. }
  182. log.Println("结束")
  183. }
  184. // getCompanyName
  185. func getCompanyName() {
  186. // 找出wcc_qyxy_20240311表中,公司名称有多个,并且一个use_flag=0,一个use_flag=10的数据
  187. Mgo := &mongodb.MongodbSim{
  188. MongodbAddr: "172.17.189.140:27080",
  189. //MongodbAddr: "127.0.0.1:27083",
  190. Size: 10,
  191. DbName: "mixdata",
  192. UserName: "SJZY_RWbid_ES",
  193. Password: "SJZY@B4i4D5e6S",
  194. //Direct: true,
  195. }
  196. Mgo.InitPool()
  197. companyMap := make(map[string]bool)
  198. sess := Mgo.GetMgoConn()
  199. defer Mgo.DestoryMongoConn(sess)
  200. query := sess.DB("mixdata").C("wcc_qyxy_20240311").Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1}).Iter()
  201. count := 0
  202. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  203. if count%10000 == 0 {
  204. log.Println("current:", count)
  205. }
  206. name := util.ObjToString(tmp["company_name"])
  207. if companyMap[name] {
  208. continue
  209. } else {
  210. companyMap[name] = true
  211. }
  212. where := map[string]interface{}{
  213. "company_name": name,
  214. }
  215. res, _ := Mgo.Find("wcc_qyxy_20240311", where, nil, nil, false, -1, -1)
  216. flaga := false
  217. flagb := false
  218. for _, v := range *res {
  219. if util.Int64All(v["use_flag"]) == 0 {
  220. flaga = true
  221. } else if util.Int64All(v["use_flag"]) == 10 {
  222. flagb = true
  223. }
  224. }
  225. // 存在0和10 二个状态
  226. if flaga && flagb {
  227. Mgo.Save("wcc_qyxy_name_0325", map[string]interface{}{"company_name": name})
  228. }
  229. tmp = make(map[string]interface{})
  230. }
  231. log.Println("结束")
  232. }
  233. // SpecialData 处理特殊企业数据,更新 qyxy_std 表use_flag
  234. func SpecialData() {
  235. // 处理 特殊企业 数据,
  236. tables := []string{"special_enterprise", "special_foundation", "special_law_office", "special_social_organ", "special_trade_union"}
  237. //1.先处理 special_enterprise 表数据,循环数据,根据company_name 去查询 qyxy_std 表,如果
  238. // 数据只有一条,并且 special_enterprise.company_id 等于 qyxy_std._id;就更新 qyxy_std表的 use_flag 字段
  239. // qyxy_std 表
  240. Mgo := &mongodb.MongodbSim{
  241. MongodbAddr: "172.17.189.140:27080",
  242. //MongodbAddr: "127.0.0.1:27083",
  243. Size: 10,
  244. DbName: "mixdata",
  245. UserName: "SJZY_RWbid_ES",
  246. Password: "SJZY@B4i4D5e6S",
  247. //Direct: true,
  248. }
  249. Mgo.InitPool()
  250. // 181 凭安库
  251. Mgo2 := &mongodb.MongodbSim{
  252. MongodbAddr: "172.17.4.181:27001",
  253. //MongodbAddr: "127.0.0.1:27001",
  254. DbName: "mixdata",
  255. Size: 10,
  256. UserName: "",
  257. Password: "",
  258. //Direct: true,
  259. }
  260. Mgo2.InitPool()
  261. sess := Mgo2.GetMgoConn()
  262. defer Mgo2.DestoryMongoConn(sess)
  263. for _, v := range tables {
  264. query := sess.DB("mixdata").C(v).Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1, "company_id": 1}).Iter()
  265. count := 0
  266. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  267. if count%10000 == 0 {
  268. log.Println("current:", count, "table - ", v, tmp["company_name"])
  269. }
  270. if _, ok := tmp["company_id"]; !ok {
  271. continue
  272. }
  273. where := map[string]interface{}{
  274. "_id": tmp["company_id"],
  275. }
  276. //update := map[string]interface{}{
  277. // "use_flag": tmp["use_flag"],
  278. //}
  279. //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  280. //name := util.ObjToString(tmp["company_name"])
  281. //where := map[string]interface{}{
  282. // "company_name": name,
  283. //}
  284. // 如果 ID 相同,std 表use_flag =10,更新
  285. datas, _ := Mgo.FindOne("qyxy_std", where)
  286. if len(*datas) == 0 {
  287. continue
  288. }
  289. //if util.Int64All((*datas)["use_flag"]) == util.Int64All(tmp["use_flag"]) {
  290. // continue
  291. //}
  292. //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
  293. data := *datas
  294. data["use_flag"] = tmp["use_flag"]
  295. Mgo.SaveByOriID("wcc_std_0411", data)
  296. }
  297. }
  298. log.Println("结束")
  299. }
  300. func StdData() {
  301. Mgo := &mongodb.MongodbSim{
  302. MongodbAddr: "172.17.189.140:27080",
  303. //MongodbAddr: "127.0.0.1:27083",
  304. Size: 10,
  305. DbName: "mixdata",
  306. UserName: "SJZY_RWbid_ES",
  307. Password: "SJZY@B4i4D5e6S",
  308. //Direct: true,
  309. }
  310. Mgo.InitPool()
  311. sess := Mgo.GetMgoConn()
  312. defer Mgo.DestoryMongoConn(sess)
  313. query := sess.DB("mixdata").C("wcc_std_0410").Find(nil).Select(nil).Iter()
  314. count := 0
  315. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  316. if count%10000 == 0 {
  317. log.Println("current:", count)
  318. }
  319. autoid := util.Int64All(tmp["autoid"])
  320. if autoid == 0 {
  321. continue
  322. }
  323. where := map[string]interface{}{
  324. "autoid": autoid,
  325. }
  326. datas, _ := Mgo.FindOne("qyxy_std", where)
  327. if len(*datas) == 0 {
  328. continue
  329. }
  330. Mgo.SaveByOriID("wcc_std_0411", datas)
  331. }
  332. log.Println("over")
  333. }