main.go 15 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "sort"
  12. "strings"
  13. )
  14. func main() {
  15. //getProjectData() //从es 拉取数据
  16. //click() //写入clickhouse
  17. //click2()
  18. //dealData()
  19. //getProject()
  20. //getQyLimitData()
  21. getBiddingData()
  22. log.Println("over ------------------ over")
  23. }
  24. // getBiddingData 获取标讯数据
  25. func getBiddingData() {
  26. //url := "http://172.17.4.184:19908"
  27. url := "http://127.0.0.1:19908"
  28. username := "jybid"
  29. password := "Top2023_JEB01i@31"
  30. index := "bidding" //索引名称
  31. //index := "projectset" //索引名称
  32. // 创建 Elasticsearch 客户端
  33. client, err := elastic.NewClient(
  34. elastic.SetURL(url),
  35. elastic.SetBasicAuth(username, password),
  36. elastic.SetSniff(false),
  37. )
  38. if err != nil {
  39. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  40. }
  41. //85 抽取库
  42. //Mgo := &mongodb.MongodbSim{
  43. // //MongodbAddr: "127.0.0.1:27080",
  44. // MongodbAddr: "172.17.4.85:27080",
  45. // DbName: "top",
  46. // Size: 10,
  47. // //Direct: true,
  48. //}
  49. //Mgo.InitPool()
  50. MgoB := &mongodb.MongodbSim{
  51. //MongodbAddr: "172.17.189.140:27080",
  52. MongodbAddr: "127.0.0.1:27083",
  53. Size: 10,
  54. DbName: "qfw",
  55. UserName: "SJZY_RWbid_ES",
  56. Password: "SJZY@B4i4D5e6S",
  57. Direct: true,
  58. }
  59. MgoB.InitPool()
  60. //2023年01-01 2023-10-01,,1-3季度
  61. //2024-1 - 2024-4;1704038400-1711900800
  62. //2023-10-1 2024-1-1;1696089600-1704038400
  63. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  64. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  65. query := elastic.NewBoolQuery().
  66. Must(elastic.NewTermQuery("toptype", "结果")).
  67. Must(elastic.NewTermQuery("subtype", "招标"))
  68. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  69. //query := elastic.NewBoolQuery().
  70. // //北京,天津,河北,上海,江苏,浙江,安徽
  71. // //Must(elastic.NewTermQuery("area", "北京市")).
  72. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  73. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  74. // Must(rangeQuery)
  75. ctx := context.Background()
  76. //开始滚动搜索
  77. scrollID := ""
  78. scroll := "10m"
  79. searchSource := elastic.NewSearchSource().
  80. Query(query).
  81. Size(10000).
  82. Sort("_doc", true) //升序排序
  83. //Sort("_doc", false) //降序排序
  84. searchService := client.Scroll(index).
  85. Size(10000).
  86. Scroll(scroll).
  87. SearchSource(searchSource)
  88. res, err := searchService.Do(ctx)
  89. if err != nil {
  90. if err == io.EOF {
  91. fmt.Println("没有数据")
  92. } else {
  93. panic(err)
  94. }
  95. }
  96. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  97. fmt.Println("总数是:", res.TotalHits())
  98. total := 0
  99. for len(res.Hits.Hits) > 0 {
  100. for _, hit := range res.Hits.Hits {
  101. var doc map[string]interface{}
  102. err := json.Unmarshal(hit.Source, &doc)
  103. if err != nil {
  104. log.Printf("解析文档失败:%s", err)
  105. continue
  106. }
  107. delete(doc, "filetext")
  108. delete(doc, "detail")
  109. //存入新表
  110. err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  111. if err != nil {
  112. log.Println("error", doc["id"])
  113. }
  114. // 处理查询结果
  115. //area := util.ObjToString(doc["area"])
  116. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  117. //if !IsInStringArray(area, areas) {
  118. // continue
  119. //}
  120. //projectName := util.ObjToString(doc["projectname"])
  121. //if strings.Contains(projectName, "非政府") {
  122. // continue
  123. //}
  124. //buyerclass := util.ObjToString(doc["buyerclass"])
  125. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  126. // continue
  127. //}
  128. ////存入新表
  129. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  130. //if err != nil {
  131. // log.Println("error", doc["id"])
  132. //}
  133. }
  134. total = total + len(res.Hits.Hits)
  135. scrollID = res.ScrollId
  136. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  137. log.Println("current count:", total)
  138. if err != nil {
  139. if err == io.EOF {
  140. // 滚动到最后一批数据,退出循环
  141. break
  142. }
  143. log.Println("滚动搜索失败:", err, res)
  144. break // 处理错误时退出循环
  145. }
  146. }
  147. // 在循环外调用 ClearScroll
  148. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  149. if err != nil {
  150. log.Printf("清理滚动搜索失败:%s", err)
  151. }
  152. fmt.Println("结束~~~~~~~~~~~~~~~")
  153. }
  154. // getData 处理北京 京津翼数据 投标相关数据
  155. func getProjectData() {
  156. url := "http://172.17.4.184:19908"
  157. //url := "http://127.0.0.1:19908"
  158. username := "jybid"
  159. password := "Top2023_JEB01i@31"
  160. index := "qyxy" //索引名称
  161. //index := "projectset" //索引名称
  162. // 创建 Elasticsearch 客户端
  163. client, err := elastic.NewClient(
  164. elastic.SetURL(url),
  165. elastic.SetBasicAuth(username, password),
  166. elastic.SetSniff(false),
  167. )
  168. if err != nil {
  169. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  170. }
  171. //85 抽取库
  172. Mgo := &mongodb.MongodbSim{
  173. //MongodbAddr: "127.0.0.1:27080",
  174. MongodbAddr: "172.17.4.85:27080",
  175. DbName: "top",
  176. Size: 10,
  177. //Direct: true,
  178. }
  179. Mgo.InitPool()
  180. //MgoB := &mongodb.MongodbSim{
  181. // MongodbAddr: "172.17.189.140:27080",
  182. // //MongodbAddr: "127.0.0.1:27083",
  183. // Size: 10,
  184. // DbName: "qfw",
  185. // UserName: "SJZY_RWbid_ES",
  186. // Password: "SJZY@B4i4D5e6S",
  187. // //Direct: true,
  188. //}
  189. //MgoB.InitPool()
  190. //2023年01-01 2023-10-01,,1-3季度
  191. //2024-1 - 2024-4;1704038400-1711900800
  192. //2023-10-1 2024-1-1;1696089600-1704038400
  193. areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  194. rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  195. query := elastic.NewBoolQuery().
  196. Must(areaTermsQuery).
  197. Must(rangeQuery)
  198. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  199. //query := elastic.NewBoolQuery().
  200. // //北京,天津,河北,上海,江苏,浙江,安徽
  201. // //Must(elastic.NewTermQuery("area", "北京市")).
  202. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  203. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  204. // Must(rangeQuery)
  205. ctx := context.Background()
  206. //开始滚动搜索
  207. scrollID := ""
  208. scroll := "10m"
  209. searchSource := elastic.NewSearchSource().
  210. Query(query).
  211. Size(10000).
  212. Sort("_doc", true) //升序排序
  213. //Sort("_doc", false) //降序排序
  214. searchService := client.Scroll(index).
  215. Size(10000).
  216. Scroll(scroll).
  217. SearchSource(searchSource)
  218. res, err := searchService.Do(ctx)
  219. if err != nil {
  220. if err == io.EOF {
  221. fmt.Println("没有数据")
  222. } else {
  223. panic(err)
  224. }
  225. }
  226. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  227. fmt.Println("总数是:", res.TotalHits())
  228. total := 0
  229. for len(res.Hits.Hits) > 0 {
  230. for _, hit := range res.Hits.Hits {
  231. var doc map[string]interface{}
  232. err := json.Unmarshal(hit.Source, &doc)
  233. if err != nil {
  234. log.Printf("解析文档失败:%s", err)
  235. continue
  236. }
  237. delete(doc, "filetext")
  238. delete(doc, "detail")
  239. sWinner := util.ObjToString(doc["s_winner"])
  240. winners := strings.Split(sWinner, ",")
  241. for _, v := range winners {
  242. insert := doc
  243. insert["s_winner"] = v
  244. //存入新表
  245. err = Mgo.InsertOrUpdate("top", "wcc_allcity_2024Q1", insert)
  246. if err != nil {
  247. log.Println("error", doc["id"])
  248. }
  249. }
  250. // 处理查询结果
  251. //area := util.ObjToString(doc["area"])
  252. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  253. //if !IsInStringArray(area, areas) {
  254. // continue
  255. //}
  256. //projectName := util.ObjToString(doc["projectname"])
  257. //if strings.Contains(projectName, "非政府") {
  258. // continue
  259. //}
  260. //buyerclass := util.ObjToString(doc["buyerclass"])
  261. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  262. // continue
  263. //}
  264. ////存入新表
  265. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  266. //if err != nil {
  267. // log.Println("error", doc["id"])
  268. //}
  269. }
  270. total = total + len(res.Hits.Hits)
  271. scrollID = res.ScrollId
  272. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  273. log.Println("current count:", total)
  274. if err != nil {
  275. if err == io.EOF {
  276. // 滚动到最后一批数据,退出循环
  277. break
  278. }
  279. log.Println("滚动搜索失败:", err, res)
  280. break // 处理错误时退出循环
  281. }
  282. }
  283. // 在循环外调用 ClearScroll
  284. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  285. if err != nil {
  286. log.Printf("清理滚动搜索失败:%s", err)
  287. }
  288. fmt.Println("结束~~~~~~~~~~~~~~~")
  289. }
  290. // getQyxytData 获取企业数据
  291. func getQyxytData() {
  292. url := "http://172.17.4.184:19908"
  293. //url := "http://127.0.0.1:19908"
  294. username := "jybid"
  295. password := "Top2023_JEB01i@31"
  296. index := "qyxy" //索引名称
  297. // 创建 Elasticsearch 客户端
  298. client, err := elastic.NewClient(
  299. elastic.SetURL(url),
  300. elastic.SetBasicAuth(username, password),
  301. elastic.SetSniff(false),
  302. )
  303. if err != nil {
  304. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  305. }
  306. //85 抽取库
  307. //Mgo := &mongodb.MongodbSim{
  308. // //MongodbAddr: "127.0.0.1:27080",
  309. // MongodbAddr: "172.17.4.85:27080",
  310. // DbName: "top",
  311. // Size: 10,
  312. // //Direct: true,
  313. //}
  314. //Mgo.InitPool()
  315. MgoB := &mongodb.MongodbSim{
  316. MongodbAddr: "172.17.189.140:27080",
  317. //MongodbAddr: "127.0.0.1:27083",
  318. Size: 10,
  319. DbName: "qfw",
  320. UserName: "SJZY_RWbid_ES",
  321. Password: "SJZY@B4i4D5e6S",
  322. //Direct: true,
  323. }
  324. MgoB.InitPool()
  325. //2023年01-01 2023-10-01,,1-3季度
  326. //2024-1 - 2024-4;1704038400-1711900800
  327. //2023-10-1 2024-1-1;1696089600-1704038400
  328. //城市范围
  329. areaTermsQuery := elastic.NewTermsQuery("company_city", "平顶山市")
  330. rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  331. query := elastic.NewBoolQuery().
  332. Must(areaTermsQuery).
  333. Must(rangeQuery)
  334. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  335. //query := elastic.NewBoolQuery().
  336. // //北京,天津,河北,上海,江苏,浙江,安徽
  337. // //Must(elastic.NewTermQuery("area", "北京市")).
  338. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  339. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  340. // Must(rangeQuery)
  341. ctx := context.Background()
  342. //开始滚动搜索
  343. scrollID := ""
  344. scroll := "10m"
  345. searchSource := elastic.NewSearchSource().
  346. Query(query).
  347. Size(10000).
  348. Sort("_doc", true) //升序排序
  349. //Sort("_doc", false) //降序排序
  350. searchService := client.Scroll(index).
  351. Size(10000).
  352. Scroll(scroll).
  353. SearchSource(searchSource)
  354. res, err := searchService.Do(ctx)
  355. if err != nil {
  356. if err == io.EOF {
  357. fmt.Println("没有数据")
  358. } else {
  359. panic(err)
  360. }
  361. }
  362. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  363. fmt.Println("总数是:", res.TotalHits())
  364. total := 0
  365. for len(res.Hits.Hits) > 0 {
  366. for _, hit := range res.Hits.Hits {
  367. var doc map[string]interface{}
  368. err := json.Unmarshal(hit.Source, &doc)
  369. if err != nil {
  370. log.Printf("解析文档失败:%s", err)
  371. continue
  372. }
  373. //存入新表
  374. err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", doc)
  375. if err != nil {
  376. log.Println("error", doc["id"])
  377. }
  378. //sWinner := util.ObjToString(doc["s_winner"])
  379. //winners := strings.Split(sWinner, ",")
  380. //for _, v := range winners {
  381. // insert := doc
  382. // insert["s_winner"] = v
  383. // //存入新表
  384. // err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", insert)
  385. // if err != nil {
  386. // log.Println("error", doc["id"])
  387. // }
  388. //}
  389. // 处理查询结果
  390. //area := util.ObjToString(doc["area"])
  391. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  392. //if !IsInStringArray(area, areas) {
  393. // continue
  394. //}
  395. //projectName := util.ObjToString(doc["projectname"])
  396. //if strings.Contains(projectName, "非政府") {
  397. // continue
  398. //}
  399. //buyerclass := util.ObjToString(doc["buyerclass"])
  400. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  401. // continue
  402. //}
  403. ////存入新表
  404. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  405. //if err != nil {
  406. // log.Println("error", doc["id"])
  407. //}
  408. }
  409. total = total + len(res.Hits.Hits)
  410. scrollID = res.ScrollId
  411. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  412. log.Println("current count:", total)
  413. if err != nil {
  414. if err == io.EOF {
  415. // 滚动到最后一批数据,退出循环
  416. break
  417. }
  418. log.Println("滚动搜索失败:", err, res)
  419. break // 处理错误时退出循环
  420. }
  421. }
  422. // 在循环外调用 ClearScroll
  423. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  424. if err != nil {
  425. log.Printf("清理滚动搜索失败:%s", err)
  426. }
  427. fmt.Println("结束~~~~~~~~~~~~~~~")
  428. }
  429. // getQyLimitData 获取qyxy 条件数据
  430. func getQyLimitData() {
  431. //url := "http://172.17.4.184:19908"
  432. url := "http://127.0.0.1:19908"
  433. username := "jybid"
  434. password := "Top2023_JEB01i@31"
  435. index := "qyxy" //索引名称
  436. // 创建 Elasticsearch 客户端
  437. client, err := elastic.NewClient(
  438. elastic.SetURL(url),
  439. elastic.SetBasicAuth(username, password),
  440. elastic.SetSniff(false),
  441. )
  442. if err != nil {
  443. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  444. }
  445. // 构建查询
  446. query := elastic.NewBoolQuery().
  447. Must(elastic.NewMatchQuery("company_area", "河南")).
  448. Must(elastic.NewMatchQuery("company_status", "存续")).
  449. MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))
  450. // 执行查询
  451. searchResult, err := client.Search().Size(50).
  452. Index(index).
  453. Query(query).
  454. Do(context.Background())
  455. if err != nil {
  456. log.Fatalf("Error executing search: %s", err)
  457. }
  458. // 本地数据库
  459. MgoB := &mongodb.MongodbSim{
  460. MongodbAddr: "127.0.0.1:27017",
  461. Size: 10,
  462. DbName: "wcc",
  463. }
  464. MgoB.InitPool()
  465. for _, hit := range searchResult.Hits.Hits {
  466. var doc map[string]interface{}
  467. err := json.Unmarshal(hit.Source, &doc)
  468. if err != nil {
  469. log.Printf("解析文档失败:%s", err)
  470. continue
  471. }
  472. MgoB.SaveByOriID("wcc_henan_0428", doc)
  473. }
  474. }
  475. // IsInStringArray 判断数组中是否存在字符串
  476. func IsInStringArray(str string, arr []string) bool {
  477. // 先对字符串数组进行排序
  478. sort.Strings(arr)
  479. // 使用二分查找算法查找字符串
  480. pos := sort.SearchStrings(arr, str)
  481. // 如果找到了则返回 true,否则返回 false
  482. return pos < len(arr) && arr[pos] == str
  483. }