main.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "gorm.io/driver/mysql"
  8. "gorm.io/gorm"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "sort"
  14. "strings"
  15. )
  16. func main() {
  17. /**
  18. getProjectData click 是一起使用的,统计获取中标企业信息
  19. */
  20. //getProjectDataFromEs() //1.拉取项目中标成交数据
  21. //click() //2.处理项目数据,写入clickhouse
  22. //click2()
  23. //dealData()
  24. //getProject()
  25. //getQyLimitData()
  26. //getBiddingData()
  27. //getQyxytData()
  28. //getTidb()
  29. log.Println("over ------------------ over")
  30. }
  31. // getBiddingData 获取标讯数据
  32. func getBiddingData() {
  33. //url := "http://172.17.4.184:19908"
  34. url := "http://127.0.0.1:19908"
  35. username := "jybid"
  36. password := "Top2023_JEB01i@31"
  37. index := "bidding" //索引名称
  38. //index := "projectset" //索引名称
  39. // 创建 Elasticsearch 客户端
  40. client, err := elastic.NewClient(
  41. elastic.SetURL(url),
  42. elastic.SetBasicAuth(username, password),
  43. elastic.SetSniff(false),
  44. )
  45. if err != nil {
  46. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  47. }
  48. //85 抽取库
  49. //Mgo := &mongodb.MongodbSim{
  50. // //MongodbAddr: "127.0.0.1:27080",
  51. // MongodbAddr: "172.17.4.85:27080",
  52. // DbName: "top",
  53. // Size: 10,
  54. // //Direct: true,
  55. //}
  56. //Mgo.InitPool()
  57. MgoB := &mongodb.MongodbSim{
  58. //MongodbAddr: "172.17.189.140:27080",
  59. MongodbAddr: "127.0.0.1:27083",
  60. Size: 10,
  61. DbName: "qfw",
  62. UserName: "SJZY_RWbid_ES",
  63. Password: "SJZY@B4i4D5e6S",
  64. Direct: true,
  65. }
  66. MgoB.InitPool()
  67. //2023年01-01 2023-10-01,,1-3季度
  68. //2024-1 - 2024-4;1704038400-1711900800
  69. //2023-10-1 2024-1-1;1696089600-1704038400
  70. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  71. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  72. query := elastic.NewBoolQuery().
  73. Must(elastic.NewTermQuery("toptype", "结果")).
  74. Must(elastic.NewTermQuery("subtype", "招标"))
  75. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  76. //query := elastic.NewBoolQuery().
  77. // //北京,天津,河北,上海,江苏,浙江,安徽
  78. // //Must(elastic.NewTermQuery("area", "北京市")).
  79. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  80. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  81. // Must(rangeQuery)
  82. ctx := context.Background()
  83. //开始滚动搜索
  84. scrollID := ""
  85. scroll := "10m"
  86. searchSource := elastic.NewSearchSource().
  87. Query(query).
  88. Size(10000).
  89. Sort("_doc", true) //升序排序
  90. //Sort("_doc", false) //降序排序
  91. searchService := client.Scroll(index).
  92. Size(10000).
  93. Scroll(scroll).
  94. SearchSource(searchSource)
  95. res, err := searchService.Do(ctx)
  96. if err != nil {
  97. if err == io.EOF {
  98. fmt.Println("没有数据")
  99. } else {
  100. panic(err)
  101. }
  102. }
  103. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  104. fmt.Println("总数是:", res.TotalHits())
  105. total := 0
  106. for len(res.Hits.Hits) > 0 {
  107. for _, hit := range res.Hits.Hits {
  108. var doc map[string]interface{}
  109. err := json.Unmarshal(hit.Source, &doc)
  110. if err != nil {
  111. log.Printf("解析文档失败:%s", err)
  112. continue
  113. }
  114. delete(doc, "filetext")
  115. delete(doc, "detail")
  116. //存入新表
  117. err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc)
  118. if err != nil {
  119. log.Println("error", doc["id"])
  120. }
  121. // 处理查询结果
  122. //area := util.ObjToString(doc["area"])
  123. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  124. //if !IsInStringArray(area, areas) {
  125. // continue
  126. //}
  127. //projectName := util.ObjToString(doc["projectname"])
  128. //if strings.Contains(projectName, "非政府") {
  129. // continue
  130. //}
  131. //buyerclass := util.ObjToString(doc["buyerclass"])
  132. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  133. // continue
  134. //}
  135. ////存入新表
  136. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  137. //if err != nil {
  138. // log.Println("error", doc["id"])
  139. //}
  140. }
  141. total = total + len(res.Hits.Hits)
  142. scrollID = res.ScrollId
  143. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  144. log.Println("current count:", total)
  145. if err != nil {
  146. if err == io.EOF {
  147. // 滚动到最后一批数据,退出循环
  148. break
  149. }
  150. log.Println("滚动搜索失败:", err, res)
  151. break // 处理错误时退出循环
  152. }
  153. }
  154. // 在循环外调用 ClearScroll
  155. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  156. if err != nil {
  157. log.Printf("清理滚动搜索失败:%s", err)
  158. }
  159. fmt.Println("结束~~~~~~~~~~~~~~~")
  160. }
  161. // getProjectDataFromEs 获取项目 中标成交数据
  162. func getProjectDataFromEs() {
  163. url := "http://172.17.4.184:19908"
  164. //url := "http://127.0.0.1:19908"
  165. username := "jybid"
  166. password := "Top2023_JEB01i@31"
  167. index := "projectset" //索引名称
  168. // 创建 Elasticsearch 客户端
  169. client, err := elastic.NewClient(
  170. elastic.SetURL(url),
  171. elastic.SetBasicAuth(username, password),
  172. elastic.SetSniff(false),
  173. )
  174. if err != nil {
  175. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  176. }
  177. //85 抽取库
  178. Mgo := &mongodb.MongodbSim{
  179. //MongodbAddr: "127.0.0.1:27080",
  180. MongodbAddr: "172.17.4.85:27080",
  181. DbName: "top",
  182. Size: 10,
  183. //Direct: true,
  184. }
  185. Mgo.InitPool()
  186. //MgoB := &mongodb.MongodbSim{
  187. // MongodbAddr: "172.17.189.140:27080",
  188. // //MongodbAddr: "127.0.0.1:27083",
  189. // Size: 10,
  190. // DbName: "qfw",
  191. // UserName: "SJZY_RWbid_ES",
  192. // Password: "SJZY@B4i4D5e6S",
  193. // //Direct: true,
  194. //}
  195. //MgoB.InitPool()
  196. //2023年01-01 2023-10-01,,1-3季度
  197. //2024-1 - 2024-4;1704038400-1711900800
  198. //2023-10-1 2024-1-1;1696089600-1704038400
  199. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  200. rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1711900800).Lt(1719763200) //2024年4-7月
  201. query := elastic.NewBoolQuery().
  202. //Must(areaTermsQuery).
  203. Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  204. Must(rangeQuery)
  205. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  206. //query := elastic.NewBoolQuery().
  207. // //北京,天津,河北,上海,江苏,浙江,安徽
  208. // //Must(elastic.NewTermQuery("area", "北京市")).
  209. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  210. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  211. // Must(rangeQuery)
  212. ctx := context.Background()
  213. //开始滚动搜索
  214. scrollID := ""
  215. scroll := "10m"
  216. searchSource := elastic.NewSearchSource().
  217. Query(query).
  218. Size(10000).
  219. Sort("_doc", true) //升序排序
  220. //Sort("_doc", false) //降序排序
  221. searchService := client.Scroll(index).
  222. Size(10000).
  223. Scroll(scroll).
  224. SearchSource(searchSource)
  225. res, err := searchService.Do(ctx)
  226. if err != nil {
  227. if err == io.EOF {
  228. fmt.Println("没有数据")
  229. } else {
  230. panic(err)
  231. }
  232. }
  233. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  234. fmt.Println("总数是:", res.TotalHits())
  235. total := 0
  236. for len(res.Hits.Hits) > 0 {
  237. for _, hit := range res.Hits.Hits {
  238. var doc map[string]interface{}
  239. err := json.Unmarshal(hit.Source, &doc)
  240. if err != nil {
  241. log.Printf("解析文档失败:%s", err)
  242. continue
  243. }
  244. delete(doc, "filetext")
  245. delete(doc, "detail")
  246. sWinner := util.ObjToString(doc["s_winner"])
  247. winners := strings.Split(sWinner, ",")
  248. for _, v := range winners {
  249. insert := doc
  250. insert["s_winner"] = v
  251. //存入新表
  252. err = Mgo.InsertOrUpdate("top", "wcc_allcity_2024Q2", insert)
  253. if err != nil {
  254. log.Println("error", doc["id"])
  255. }
  256. }
  257. // 处理查询结果
  258. //area := util.ObjToString(doc["area"])
  259. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  260. //if !IsInStringArray(area, areas) {
  261. // continue
  262. //}
  263. //projectName := util.ObjToString(doc["projectname"])
  264. //if strings.Contains(projectName, "非政府") {
  265. // continue
  266. //}
  267. //buyerclass := util.ObjToString(doc["buyerclass"])
  268. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  269. // continue
  270. //}
  271. ////存入新表
  272. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  273. //if err != nil {
  274. // log.Println("error", doc["id"])
  275. //}
  276. }
  277. total = total + len(res.Hits.Hits)
  278. scrollID = res.ScrollId
  279. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  280. log.Println("current count:", total)
  281. if err != nil {
  282. if err == io.EOF {
  283. // 滚动到最后一批数据,退出循环
  284. break
  285. }
  286. log.Println("滚动搜索失败:", err, res)
  287. break // 处理错误时退出循环
  288. }
  289. }
  290. // 在循环外调用 ClearScroll
  291. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  292. if err != nil {
  293. log.Printf("清理滚动搜索失败:%s", err)
  294. }
  295. fmt.Println("结束~~~~~~~~~~~~~~~")
  296. }
  297. // getQyxytData 获取企业数据
  298. func getQyxytData() {
  299. url := "http://172.17.4.184:19908"
  300. //url := "http://127.0.0.1:19908"
  301. username := "jybid"
  302. password := "Top2023_JEB01i@31"
  303. index := "qyxy" //索引名称
  304. // 创建 Elasticsearch 客户端
  305. client, err := elastic.NewClient(
  306. elastic.SetURL(url),
  307. elastic.SetBasicAuth(username, password),
  308. elastic.SetSniff(false),
  309. )
  310. if err != nil {
  311. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  312. }
  313. //85 抽取库
  314. //Mgo := &mongodb.MongodbSim{
  315. // //MongodbAddr: "127.0.0.1:27080",
  316. // MongodbAddr: "172.17.4.85:27080",
  317. // DbName: "top",
  318. // Size: 10,
  319. // //Direct: true,
  320. //}
  321. //Mgo.InitPool()
  322. MgoB := &mongodb.MongodbSim{
  323. MongodbAddr: "172.17.189.140:27080",
  324. //MongodbAddr: "127.0.0.1:27083",
  325. Size: 10,
  326. DbName: "qfw",
  327. UserName: "SJZY_RWbid_ES",
  328. Password: "SJZY@B4i4D5e6S",
  329. //Direct: true,
  330. }
  331. MgoB.InitPool()
  332. //2023年01-01 2023-10-01,,1-3季度
  333. //2024-1 - 2024-4;1704038400-1711900800
  334. //2023-10-1 2024-1-1;1696089600-1704038400
  335. //城市范围
  336. //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
  337. //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  338. //query := elastic.NewBoolQuery().
  339. // Must(areaTermsQuery).
  340. // Must(rangeQuery)
  341. //---------------------------//
  342. query := elastic.NewBoolQuery()
  343. query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  344. query.Must(elastic.NewTermQuery("company_city", "北京市"))
  345. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  346. //query := elastic.NewBoolQuery().
  347. // //北京,天津,河北,上海,江苏,浙江,安徽
  348. // //Must(elastic.NewTermQuery("area", "北京市")).
  349. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  350. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  351. // Must(rangeQuery)
  352. ctx := context.Background()
  353. //开始滚动搜索
  354. scrollID := ""
  355. scroll := "10m"
  356. searchSource := elastic.NewSearchSource().
  357. Query(query).
  358. Size(10000).
  359. Sort("_doc", true) //升序排序
  360. //Sort("_doc", false) //降序排序
  361. searchService := client.Scroll(index).
  362. Size(10000).
  363. Scroll(scroll).
  364. SearchSource(searchSource)
  365. res, err := searchService.Do(ctx)
  366. if err != nil {
  367. if err == io.EOF {
  368. fmt.Println("没有数据")
  369. } else {
  370. panic(err)
  371. }
  372. }
  373. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  374. fmt.Println("总数是:", res.TotalHits())
  375. total := 0
  376. for len(res.Hits.Hits) > 0 {
  377. for _, hit := range res.Hits.Hits {
  378. var doc map[string]interface{}
  379. err := json.Unmarshal(hit.Source, &doc)
  380. if err != nil {
  381. log.Printf("解析文档失败:%s", err)
  382. continue
  383. }
  384. if strings.Contains(util.ObjToString(doc["business_scope"]), "招投标代理") {
  385. //存入新表
  386. insert := map[string]interface{}{
  387. "company_name": doc["company_name"],
  388. "business_scope": doc["business_scope"],
  389. "employee_name": doc["employee_name"],
  390. "company_phone": doc["company_phone"],
  391. }
  392. err = MgoB.InsertOrUpdate("qfw", "wcc_2024_beijing_dailijigou", insert)
  393. if err != nil {
  394. log.Println("error", doc["id"])
  395. }
  396. }
  397. //sWinner := util.ObjToString(doc["s_winner"])
  398. //winners := strings.Split(sWinner, ",")
  399. //for _, v := range winners {
  400. // insert := doc
  401. // insert["s_winner"] = v
  402. // //存入新表
  403. // err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", insert)
  404. // if err != nil {
  405. // log.Println("error", doc["id"])
  406. // }
  407. //}
  408. // 处理查询结果
  409. //area := util.ObjToString(doc["area"])
  410. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  411. //if !IsInStringArray(area, areas) {
  412. // continue
  413. //}
  414. //projectName := util.ObjToString(doc["projectname"])
  415. //if strings.Contains(projectName, "非政府") {
  416. // continue
  417. //}
  418. //buyerclass := util.ObjToString(doc["buyerclass"])
  419. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  420. // continue
  421. //}
  422. ////存入新表
  423. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  424. //if err != nil {
  425. // log.Println("error", doc["id"])
  426. //}
  427. }
  428. total = total + len(res.Hits.Hits)
  429. scrollID = res.ScrollId
  430. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  431. log.Println("current count:", total)
  432. if err != nil {
  433. if err == io.EOF {
  434. // 滚动到最后一批数据,退出循环
  435. break
  436. }
  437. log.Println("滚动搜索失败:", err, res)
  438. break // 处理错误时退出循环
  439. }
  440. }
  441. // 在循环外调用 ClearScroll
  442. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  443. if err != nil {
  444. log.Printf("清理滚动搜索失败:%s", err)
  445. }
  446. fmt.Println("结束~~~~~~~~~~~~~~~")
  447. }
  448. // getQyLimitData 获取qyxy 条件数据
  449. func getQyLimitData() {
  450. //url := "http://172.17.4.184:19908"
  451. url := "http://127.0.0.1:19908"
  452. username := "jybid"
  453. password := "Top2023_JEB01i@31"
  454. index := "qyxy" //索引名称
  455. // 创建 Elasticsearch 客户端
  456. client, err := elastic.NewClient(
  457. elastic.SetURL(url),
  458. elastic.SetBasicAuth(username, password),
  459. elastic.SetSniff(false),
  460. )
  461. if err != nil {
  462. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  463. }
  464. // 构建查询
  465. query := elastic.NewBoolQuery().
  466. Must(elastic.NewMatchQuery("company_area", "河南")).
  467. Must(elastic.NewMatchQuery("company_status", "存续")).
  468. MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))
  469. // 执行查询
  470. searchResult, err := client.Search().Size(50).
  471. Index(index).
  472. Query(query).
  473. Do(context.Background())
  474. if err != nil {
  475. log.Fatalf("Error executing search: %s", err)
  476. }
  477. // 本地数据库
  478. MgoB := &mongodb.MongodbSim{
  479. MongodbAddr: "127.0.0.1:27017",
  480. Size: 10,
  481. DbName: "wcc",
  482. }
  483. MgoB.InitPool()
  484. for _, hit := range searchResult.Hits.Hits {
  485. var doc map[string]interface{}
  486. err := json.Unmarshal(hit.Source, &doc)
  487. if err != nil {
  488. log.Printf("解析文档失败:%s", err)
  489. continue
  490. }
  491. MgoB.SaveByOriID("wcc_henan_0428", doc)
  492. }
  493. }
  494. // getTidb 获取tidb 数据
  495. func getTidb() {
  496. MgoB := &mongodb.MongodbSim{
  497. MongodbAddr: "172.17.189.140:27080",
  498. //MongodbAddr: "127.0.0.1:27083",
  499. Size: 10,
  500. DbName: "qfw",
  501. UserName: "SJZY_RWbid_ES",
  502. Password: "SJZY@B4i4D5e6S",
  503. //Direct: true,
  504. }
  505. MgoB.InitPool()
  506. //tidb
  507. username := "datascbi"
  508. password := "Da#Bi20221111SC"
  509. //host := "127.0.0.1:4001"
  510. host := "172.17.162.25:4000"
  511. database := "global_common_data"
  512. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  513. // 连接到数据库
  514. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  515. if err != nil {
  516. log.Println("Failed to connect to database:", err)
  517. return
  518. }
  519. fmt.Println("Connected to the database!")
  520. defer util.Catch()
  521. sess := MgoB.GetMgoConn()
  522. defer MgoB.DestoryMongoConn(sess)
  523. it := sess.DB("qfw").C("wcc_2024_beijing_dailijigou").Find(nil).Select(nil).Iter()
  524. fmt.Println("taskRun 开始")
  525. count := 0
  526. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  527. if count%10000 == 0 {
  528. log.Println("current:", count)
  529. }
  530. companyName := util.ObjToString(tmp["company_name"])
  531. var baseInfo EnterpriseBaseInfo
  532. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  533. if baseInfo.ID > 0 {
  534. insert := map[string]interface{}{
  535. "company_name": companyName,
  536. "name_id": baseInfo.NameID,
  537. "business_scope": tmp["business_scope"],
  538. }
  539. MgoB.InsertOrUpdate("qfw", "wcc_beijing_daili_bidding", insert)
  540. }
  541. }
  542. log.Println("over")
  543. }
  544. // IsInStringArray 判断数组中是否存在字符串
  545. func IsInStringArray(str string, arr []string) bool {
  546. // 先对字符串数组进行排序
  547. sort.Strings(arr)
  548. // 使用二分查找算法查找字符串
  549. pos := sort.SearchStrings(arr, str)
  550. // 如果找到了则返回 true,否则返回 false
  551. return pos < len(arr) && arr[pos] == str
  552. }