main.go 34 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "gorm.io/driver/mysql"
  8. "gorm.io/gorm"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "strings"
  14. "sync"
  15. "unicode/utf8"
  16. )
  17. var (
  18. MgoB *mongodb.MongodbSim
  19. )
  20. func InitMgo() {
  21. MgoB = &mongodb.MongodbSim{
  22. //MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  23. MongodbAddr: "127.0.0.1:27083",
  24. Size: 10,
  25. DbName: "qfw",
  26. UserName: "SJZY_RWbid_ES",
  27. Password: "SJZY@B4i4D5e6S",
  28. Direct: true,
  29. }
  30. MgoB.InitPool()
  31. }
  32. func main() {
  33. InitMgo()
  34. getBidding2()
  35. //InitMgo()
  36. //getCountProjectWinner3()
  37. //CountProjectWinner()
  38. //getBiddingLimitData()
  39. //getQyxyNationToFiles()
  40. //exportQyxy() //导出企业数据
  41. //dealXlsx()
  42. //getQyxyNation() //导出 国标行业分类,注册资金靠前的企业
  43. //getQyxyNation()
  44. //InitMgo()
  45. //getDataFromFile()
  46. //updateXlsxDa()
  47. return
  48. /**
  49. getProjectData click 是一起使用的,统计获取中标企业信息
  50. */
  51. //getProjectDataFromEs() //1.拉取项目中标成交数据
  52. //click() //2.处理项目数据,写入clickhouse
  53. //click2()
  54. //dealData()
  55. //getProject()
  56. //getQyLimitData()
  57. //getBiddingData()
  58. //getQyxytData()
  59. //getTidb()
  60. //getEntInfo() //法人库数据
  61. //getBuyerData()
  62. //mgoBidding()
  63. //log.Println("开启第二轮")
  64. //mgoBidding()
  65. //updateMgoEntInfoBuyer()
  66. //getZhiMa()
  67. //log.Println("over ------------------ over")
  68. //fixProjectPortrait()
  69. //
  70. //ClickhouseData() //gorm 操作Clickhouse;gorm 对Clickhouse的bitmap兼容性不行,放弃
  71. //dealClickhouse() //clickhouse-go 操作
  72. //testUpdateBitmap() //测试环境测试更新Clickhouse bitmap字段
  73. ///-------//
  74. //updateHrefByEs()
  75. //getGD() // 获取广东企业数据
  76. //
  77. //getBidding2()
  78. //--------------//
  79. //dealYJG() // 处理姚静歌需求,处理项目数据到Clickhouse
  80. log.Println("over ------------------ over")
  81. }
  82. // dealYJG 处理姚静歌 、韩鸿飞 之前的需要,处理企业数据到Clickhouse
  83. func dealYJG() {
  84. getProjectDataFromEs()
  85. }
  86. // getBiddingData 获取标讯数据
  87. func getBiddingData() {
  88. url := "http://172.17.4.184:19908"
  89. //url := "http://127.0.0.1:19908"
  90. username := "jybid"
  91. password := "Top2023_JEB01i@31"
  92. index := "bidding" //索引名称
  93. //index := "projectset" //索引名称
  94. // 创建 Elasticsearch 客户端
  95. client, err := elastic.NewClient(
  96. elastic.SetURL(url),
  97. elastic.SetBasicAuth(username, password),
  98. elastic.SetSniff(false),
  99. )
  100. if err != nil {
  101. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  102. }
  103. //85 抽取库
  104. //Mgo := &mongodb.MongodbSim{
  105. // //MongodbAddr: "127.0.0.1:27080",
  106. // MongodbAddr: "172.17.4.85:27080",
  107. // DbName: "top",
  108. // Size: 10,
  109. // //Direct: true,
  110. //}
  111. //Mgo.InitPool()
  112. MgoB := &mongodb.MongodbSim{
  113. //MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  114. MongodbAddr: "127.0.0.1:27083",
  115. Size: 10,
  116. DbName: "qfw",
  117. UserName: "SJZY_RWbid_ES",
  118. Password: "SJZY@B4i4D5e6S",
  119. //Direct: true,
  120. }
  121. MgoB.InitPool()
  122. //2023年01-01 2023-10-01,,1-3季度
  123. //2024-1 - 2024-4;1704038400-1711900800
  124. //2023-10-1 2024-1-1;1696089600-1704038400
  125. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  126. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  127. //rangeQuery := elastic.NewRangeQuery("publishtime").Gte("1640966400")
  128. //query := elastic.NewBoolQuery().
  129. // Must(rangeQuery).
  130. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同"))
  131. //Must(elastic.NewTermQuery("site", "中国招标与采购网")).Must(rangeQuery)
  132. //query := elastic.NewBoolQuery().
  133. // //北京,天津,河北,上海,江苏,浙江,安徽
  134. // //Must(elastic.NewTermQuery("area", "北京市")).sassss
  135. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  136. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  137. // Must(rangeQuery)
  138. //
  139. rangeQuery := elastic.NewRangeQuery("publishtime").Gte("1640966400")
  140. termsQuery := elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")
  141. // 综合所有条件
  142. query := elastic.NewBoolQuery().
  143. Must(rangeQuery).
  144. Must(termsQuery)
  145. ctx := context.Background()
  146. //开始滚动搜索
  147. scrollID := ""
  148. scroll := "10m"
  149. searchSource := elastic.NewSearchSource().
  150. Query(query).
  151. Size(10000).
  152. Sort("_doc", true) //升序排序
  153. //Sort("_doc", false) //降序排序
  154. searchService := client.Scroll(index).
  155. Size(10000).
  156. Scroll(scroll).
  157. SearchSource(searchSource)
  158. res, err := searchService.Do(ctx)
  159. if err != nil {
  160. if err == io.EOF {
  161. fmt.Println("没有数据")
  162. } else {
  163. panic(err)
  164. }
  165. }
  166. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  167. fmt.Println("总数是:", res.TotalHits())
  168. total := 0
  169. for len(res.Hits.Hits) > 0 {
  170. for _, hit := range res.Hits.Hits {
  171. var doc map[string]interface{}
  172. err := json.Unmarshal(hit.Source, &doc)
  173. if err != nil {
  174. log.Printf("解析文档失败:%s", err)
  175. continue
  176. }
  177. delete(doc, "filetext")
  178. delete(doc, "detail")
  179. purchasing := util.ObjToString(doc["purchasing"])
  180. if strings.Contains(purchasing, "新华三") || strings.Contains(purchasing, "华三") || strings.Contains(purchasing, "H3C") || strings.Contains(purchasing, "h3c") {
  181. //存入新表
  182. err = MgoB.InsertOrUpdate("qfw", "wcc_bidding_test_250219", doc)
  183. if err != nil {
  184. log.Println("error", doc["id"])
  185. }
  186. }
  187. // 处理查询结果
  188. //area := util.ObjToString(doc["area"])
  189. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  190. //if !IsInStringArray(area, areas) {
  191. // continue
  192. //}
  193. //projectName := util.ObjToString(doc["projectname"])
  194. //if strings.Contains(projectName, "非政府") {
  195. // continue
  196. //}
  197. //buyerclass := util.ObjToString(doc["buyerclass"])
  198. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  199. // continue
  200. //}
  201. ////存入新表
  202. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  203. //if err != nil {
  204. // log.Println("error", doc["id"])
  205. //}
  206. }
  207. total = total + len(res.Hits.Hits)
  208. scrollID = res.ScrollId
  209. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  210. log.Println("current count:", total)
  211. if err != nil {
  212. if err == io.EOF {
  213. // 滚动到最后一批数据,退出循环
  214. break
  215. }
  216. log.Println("滚动搜索失败:", err, res)
  217. break // 处理错误时退出循环
  218. }
  219. }
  220. // 在循环外调用 ClearScroll
  221. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  222. if err != nil {
  223. log.Printf("清理滚动搜索失败:%s", err)
  224. }
  225. fmt.Println("结束~~~~~~~~~~~~~~~")
  226. }
  227. // getProjectDataFromEs 获取项目 中标成交数据
  228. func getProjectDataFromEs() {
  229. //url := "http://172.17.4.184:19908"
  230. url := "http://127.0.0.1:19908"
  231. username := "jybid"
  232. password := "Top2023_JEB01i@31"
  233. index := "projectset" //索引名称
  234. // 创建 Elasticsearch 客户端
  235. client, err := elastic.NewClient(
  236. elastic.SetURL(url),
  237. elastic.SetBasicAuth(username, password),
  238. elastic.SetSniff(false),
  239. )
  240. if err != nil {
  241. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  242. }
  243. //85 抽取库
  244. Mgo := &mongodb.MongodbSim{
  245. MongodbAddr: "127.0.0.1:27080",
  246. //MongodbAddr: "172.17.4.85:27080",
  247. DbName: "top",
  248. Size: 10,
  249. Direct: true,
  250. }
  251. Mgo.InitPool()
  252. //MgoB := &mongodb.MongodbSim{
  253. // MongodbAddr: "172.17.189.140:27080",
  254. // //MongodbAddr: "127.0.0.1:27083",
  255. // Size: 10,
  256. // DbName: "qfw",
  257. // UserName: "SJZY_RWbid_ES",
  258. // Password: "SJZY@B4i4D5e6S",
  259. // //Direct: true,
  260. //}
  261. //MgoB.InitPool()
  262. //2023年01-01 2023-10-01,,1-3季度
  263. //2024-1 - 2024-4;1704038400-1711900800
  264. //2023-10-1 2024-1-1;1696089600-1704038400
  265. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  266. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1711900800).Lt(1719763200) //2024年4-7月
  267. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1735660800).Lt(1743436800) //2025年1-3月;25年第一季度数据
  268. rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1743436800).Lt(1751299200) //2025年4.1-7.1 ;25年第二季度数据
  269. query := elastic.NewBoolQuery().
  270. //Must(areaTermsQuery).
  271. Must(elastic.NewTermsQuery("bidstatus", "中标", "单一", "成交", "合同")).
  272. Must(rangeQuery)
  273. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  274. //query := elastic.NewBoolQuery().
  275. // //北京,天津,河北,上海,江苏,浙江,安徽
  276. // //Must(elastic.NewTermQuery("area", "北京市")).
  277. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  278. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  279. // Must(rangeQuery)
  280. ctx := context.Background()
  281. //开始滚动搜索
  282. scrollID := ""
  283. scroll := "10m"
  284. searchSource := elastic.NewSearchSource().
  285. Query(query).
  286. Size(10000).
  287. Sort("_doc", true) //升序排序
  288. //Sort("_doc", false) //降序排序
  289. searchService := client.Scroll(index).
  290. Size(10000).
  291. Scroll(scroll).
  292. SearchSource(searchSource)
  293. res, err := searchService.Do(ctx)
  294. if err != nil {
  295. if err == io.EOF {
  296. fmt.Println("没有数据")
  297. } else {
  298. panic(err)
  299. }
  300. }
  301. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  302. fmt.Println("总数是:", res.TotalHits())
  303. total := 0
  304. for len(res.Hits.Hits) > 0 {
  305. for _, hit := range res.Hits.Hits {
  306. var doc map[string]interface{}
  307. err := json.Unmarshal(hit.Source, &doc)
  308. if err != nil {
  309. log.Printf("解析文档失败:%s", err)
  310. continue
  311. }
  312. delete(doc, "filetext")
  313. delete(doc, "detail")
  314. sWinner := util.ObjToString(doc["s_winner"])
  315. winners := strings.Split(sWinner, ",")
  316. for _, v := range winners {
  317. insert := doc
  318. insert["s_winner"] = v
  319. //存入新表
  320. err = Mgo.InsertOrUpdate("top", "wcc_allcity_2025Q2", insert)
  321. if err != nil {
  322. log.Println("error", doc["id"])
  323. }
  324. }
  325. // 处理查询结果
  326. //area := util.ObjToString(doc["area"])
  327. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  328. //if !IsInStringArray(area, areas) {
  329. // continue
  330. //}
  331. //projectName := util.ObjToString(doc["projectname"])
  332. //if strings.Contains(projectName, "非政府") {
  333. // continue
  334. //}
  335. //buyerclass := util.ObjToString(doc["buyerclass"])
  336. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  337. // continue
  338. //}
  339. ////存入新表
  340. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  341. //if err != nil {
  342. // log.Println("error", doc["id"])
  343. //}
  344. }
  345. total = total + len(res.Hits.Hits)
  346. scrollID = res.ScrollId
  347. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  348. log.Println("current count:", total)
  349. if err != nil {
  350. if err == io.EOF {
  351. // 滚动到最后一批数据,退出循环
  352. break
  353. }
  354. log.Println("滚动搜索失败:", err, res)
  355. break // 处理错误时退出循环
  356. }
  357. }
  358. // 在循环外调用 ClearScroll
  359. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  360. if err != nil {
  361. log.Printf("清理滚动搜索失败:%s", err)
  362. }
  363. fmt.Println("结束~~~~~~~~~~~~~~~")
  364. }
  365. // getQyxytData 获取企业数据
  366. func getQyxytData() {
  367. //url := "http://172.17.4.184:19908"
  368. url := "http://127.0.0.1:19908"
  369. username := "jybid"
  370. password := "Top2023_JEB01i@31"
  371. index := "qyxy" //索引名称
  372. // 创建 Elasticsearch 客户端
  373. client, err := elastic.NewClient(
  374. elastic.SetURL(url),
  375. elastic.SetBasicAuth(username, password),
  376. elastic.SetSniff(false),
  377. )
  378. if err != nil {
  379. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  380. }
  381. //85 抽取库
  382. //Mgo := &mongodb.MongodbSim{
  383. // //MongodbAddr: "127.0.0.1:27080",
  384. // MongodbAddr: "172.17.4.85:27080",
  385. // DbName: "top",
  386. // Size: 10,
  387. // //Direct: true,
  388. //}
  389. //Mgo.InitPool()
  390. //MgoB := &mongodb.MongodbSim{
  391. // MongodbAddr: "172.17.189.140:27080",
  392. // //MongodbAddr: "127.0.0.1:27083",
  393. // Size: 10,
  394. // DbName: "qfw",
  395. // UserName: "SJZY_RWbid_ES",
  396. // Password: "SJZY@B4i4D5e6S",
  397. // //Direct: true,
  398. //}
  399. //MgoB.InitPool()
  400. //2023年01-01 2023-10-01,,1-3季度
  401. //2024-1 - 2024-4;1704038400-1711900800
  402. //2023-10-1 2024-1-1;1696089600-1704038400
  403. //城市范围
  404. //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
  405. //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  406. //query := elastic.NewBoolQuery().
  407. // Must(areaTermsQuery).
  408. // Must(rangeQuery)
  409. //---------------------------//
  410. query := elastic.NewBoolQuery()
  411. query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  412. query.Must(elastic.NewTermQuery("company_city", "北京市"))
  413. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  414. //query := elastic.NewBoolQuery().
  415. // //北京,天津,河北,上海,江苏,浙江,安徽
  416. // //Must(elastic.NewTermQuery("area", "北京市")).
  417. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  418. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  419. // Must(rangeQuery)
  420. ctx := context.Background()
  421. //开始滚动搜索
  422. scrollID := ""
  423. scroll := "10m"
  424. searchSource := elastic.NewSearchSource().
  425. Query(query).
  426. Size(10000).
  427. Sort("_doc", true) //升序排序
  428. //Sort("_doc", false) //降序排序
  429. searchService := client.Scroll(index).
  430. Size(10000).
  431. Scroll(scroll).
  432. SearchSource(searchSource)
  433. res, err := searchService.Do(ctx)
  434. if err != nil {
  435. if err == io.EOF {
  436. fmt.Println("没有数据")
  437. } else {
  438. panic(err)
  439. }
  440. }
  441. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  442. fmt.Println("总数是:", res.TotalHits())
  443. total := 0
  444. for len(res.Hits.Hits) > 0 {
  445. for _, hit := range res.Hits.Hits {
  446. var doc map[string]interface{}
  447. err := json.Unmarshal(hit.Source, &doc)
  448. if err != nil {
  449. log.Printf("解析文档失败:%s", err)
  450. continue
  451. }
  452. if strings.Contains(util.ObjToString(doc["business_scope"]), "招投标代理") {
  453. //存入新表
  454. insert := map[string]interface{}{
  455. "company_name": doc["company_name"],
  456. "business_scope": doc["business_scope"],
  457. "employee_name": doc["employee_name"],
  458. "company_phone": doc["company_phone"],
  459. }
  460. err = MgoB.InsertOrUpdate("qfw", "wcc_2024_beijing_dailijigou", insert)
  461. if err != nil {
  462. log.Println("error", doc["id"])
  463. }
  464. }
  465. //sWinner := util.ObjToString(doc["s_winner"])
  466. //winners := strings.Split(sWinner, ",")
  467. //for _, v := range winners {
  468. // insert := doc
  469. // insert["s_winner"] = v
  470. // //存入新表
  471. // err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", insert)
  472. // if err != nil {
  473. // log.Println("error", doc["id"])
  474. // }
  475. //}
  476. // 处理查询结果
  477. //area := util.ObjToString(doc["area"])
  478. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  479. //if !IsInStringArray(area, areas) {
  480. // continue
  481. //}
  482. //projectName := util.ObjToString(doc["projectname"])
  483. //if strings.Contains(projectName, "非政府") {
  484. // continue
  485. //}
  486. //buyerclass := util.ObjToString(doc["buyerclass"])
  487. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  488. // continue
  489. //}
  490. ////存入新表
  491. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  492. //if err != nil {
  493. // log.Println("error", doc["id"])
  494. //}
  495. }
  496. total = total + len(res.Hits.Hits)
  497. scrollID = res.ScrollId
  498. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  499. log.Println("current count:", total)
  500. if err != nil {
  501. if err == io.EOF {
  502. // 滚动到最后一批数据,退出循环
  503. break
  504. }
  505. log.Println("滚动搜索失败:", err, res)
  506. break // 处理错误时退出循环
  507. }
  508. }
  509. // 在循环外调用 ClearScroll
  510. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  511. if err != nil {
  512. log.Printf("清理滚动搜索失败:%s", err)
  513. }
  514. fmt.Println("结束~~~~~~~~~~~~~~~")
  515. }
  516. // getQyLimitData 获取qyxy 条件数据
  517. func getQyLimitData() {
  518. //url := "http://172.17.4.184:19908"
  519. url := "http://127.0.0.1:19908"
  520. username := "jybid"
  521. password := "Top2023_JEB01i@31"
  522. index := "qyxy" //索引名称
  523. // 创建 Elasticsearch 客户端
  524. client, err := elastic.NewClient(
  525. elastic.SetURL(url),
  526. elastic.SetBasicAuth(username, password),
  527. elastic.SetSniff(false),
  528. )
  529. if err != nil {
  530. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  531. }
  532. // 构建查询
  533. query := elastic.NewBoolQuery().
  534. Must(elastic.NewMatchQuery("company_area", "河南")).
  535. Must(elastic.NewMatchQuery("company_status", "存续")).
  536. MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))
  537. // 执行查询
  538. searchResult, err := client.Search().Size(50).
  539. Index(index).
  540. Query(query).
  541. Do(context.Background())
  542. if err != nil {
  543. log.Fatalf("Error executing search: %s", err)
  544. }
  545. // 本地数据库
  546. MgoB := &mongodb.MongodbSim{
  547. MongodbAddr: "127.0.0.1:27017",
  548. Size: 10,
  549. DbName: "wcc",
  550. }
  551. MgoB.InitPool()
  552. for _, hit := range searchResult.Hits.Hits {
  553. var doc map[string]interface{}
  554. err := json.Unmarshal(hit.Source, &doc)
  555. if err != nil {
  556. log.Printf("解析文档失败:%s", err)
  557. continue
  558. }
  559. MgoB.SaveByOriID("wcc_henan_0428", doc)
  560. }
  561. }
  562. // getTidb 获取tidb 数据
  563. func getTidb() {
  564. MgoB := &mongodb.MongodbSim{
  565. MongodbAddr: "172.17.189.140:27080",
  566. //MongodbAddr: "127.0.0.1:27083",
  567. Size: 10,
  568. DbName: "qfw",
  569. UserName: "SJZY_RWbid_ES",
  570. Password: "SJZY@B4i4D5e6S",
  571. //Direct: true,
  572. }
  573. MgoB.InitPool()
  574. //tidb
  575. username := "datascbi"
  576. password := "Da#Bi20221111SC"
  577. //host := "127.0.0.1:4001"
  578. host := "172.17.162.25:4000"
  579. database := "global_common_data"
  580. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  581. // 连接到数据库
  582. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  583. if err != nil {
  584. log.Println("Failed to connect to database:", err)
  585. return
  586. }
  587. fmt.Println("Connected to the database!")
  588. defer util.Catch()
  589. sess := MgoB.GetMgoConn()
  590. defer MgoB.DestoryMongoConn(sess)
  591. it := sess.DB("qfw").C("wcc_2024_beijing_dailijigou").Find(nil).Select(nil).Iter()
  592. fmt.Println("taskRun 开始")
  593. count := 0
  594. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  595. if count%10000 == 0 {
  596. log.Println("current:", count)
  597. }
  598. companyName := util.ObjToString(tmp["company_name"])
  599. var baseInfo EnterpriseBaseInfo
  600. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  601. if baseInfo.ID > 0 {
  602. insert := map[string]interface{}{
  603. "company_name": companyName,
  604. "name_id": baseInfo.NameID,
  605. "business_scope": tmp["business_scope"],
  606. }
  607. MgoB.InsertOrUpdate("qfw", "wcc_beijing_daili_bidding", insert)
  608. }
  609. }
  610. log.Println("over")
  611. }
  612. // getEntInfo 获取法人库数据
  613. func getEntInfo() {
  614. url := "http://172.17.4.184:19908"
  615. //url := "http://127.0.0.1:19908"
  616. username := "jybid"
  617. password := "Top2023_JEB01i@31"
  618. index := "ent_info" //索引名称
  619. // 创建 Elasticsearch 客户端
  620. client, err := elastic.NewClient(
  621. elastic.SetURL(url),
  622. elastic.SetBasicAuth(username, password),
  623. elastic.SetSniff(false),
  624. )
  625. if err != nil {
  626. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  627. }
  628. query := elastic.NewBoolQuery().
  629. //北京,天津,河北,上海,江苏,浙江,安徽
  630. //Must(elastic.NewMatchQuery("company_name", "医院")).
  631. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  632. Must(elastic.NewExistsQuery("tag_labels"))
  633. //Must(rangeQuery)
  634. ctx := context.Background()
  635. //开始滚动搜索
  636. scrollID := ""
  637. scroll := "10m"
  638. searchSource := elastic.NewSearchSource().
  639. Query(query).
  640. Size(10000).
  641. Sort("_doc", true) //升序排序
  642. //Sort("_doc", false) //降序排序
  643. searchService := client.Scroll(index).
  644. Size(10000).
  645. Scroll(scroll).
  646. SearchSource(searchSource)
  647. res, err := searchService.Do(ctx)
  648. if err != nil {
  649. if err == io.EOF {
  650. fmt.Println("没有数据")
  651. } else {
  652. panic(err)
  653. }
  654. }
  655. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  656. fmt.Println("总数是:", res.TotalHits())
  657. total := 0
  658. for len(res.Hits.Hits) > 0 {
  659. for _, hit := range res.Hits.Hits {
  660. var doc map[string]interface{}
  661. err = json.Unmarshal(hit.Source, &doc)
  662. if err != nil {
  663. log.Printf("解析文档失败:%s", err)
  664. continue
  665. }
  666. name := util.ObjToString(doc["company_name"])
  667. updateData := make(map[string]interface{})
  668. if tag_labels, ok := doc["tag_labels"].([]interface{}); ok {
  669. updateData["main_label"] = tag_labels[0]
  670. _, err = client.Update().
  671. Index(index).
  672. Id(util.ObjToString(doc["id"])).
  673. Doc(updateData).
  674. Do(context.Background())
  675. if err != nil {
  676. log.Println("更新失败", name, tag_labels, err)
  677. }
  678. }
  679. }
  680. total = total + len(res.Hits.Hits)
  681. scrollID = res.ScrollId
  682. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  683. log.Println("current count:", total)
  684. if err != nil {
  685. if err == io.EOF {
  686. // 滚动到最后一批数据,退出循环
  687. break
  688. }
  689. log.Println("滚动搜索失败:", err, res)
  690. break // 处理错误时退出循环
  691. }
  692. }
  693. // 在循环外调用 ClearScroll
  694. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  695. if err != nil {
  696. log.Printf("清理滚动搜索失败:%s", err)
  697. }
  698. fmt.Println("结束~~~~~~~~~~~~~~~")
  699. }
  700. // getBuyerData 获取采购单位数据
  701. func getBuyerData() {
  702. //key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
  703. //model := "glm-4-air"
  704. url := "http://172.17.4.184:19908"
  705. //url := "http://127.0.0.1:19908"
  706. username := "jybid"
  707. password := "Top2023_JEB01i@31"
  708. index := "buyer" //索引名称
  709. // 创建 Elasticsearch 客户端
  710. client, err := elastic.NewClient(
  711. elastic.SetURL(url),
  712. elastic.SetBasicAuth(username, password),
  713. elastic.SetSniff(false),
  714. )
  715. if err != nil {
  716. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  717. }
  718. MgoB := &mongodb.MongodbSim{
  719. MongodbAddr: "172.17.189.140:27080",
  720. //MongodbAddr: "127.0.0.1:27083",
  721. Size: 10,
  722. DbName: "qfw",
  723. UserName: "SJZY_RWbid_ES",
  724. Password: "SJZY@B4i4D5e6S",
  725. //Direct: true,
  726. }
  727. MgoB.InitPool()
  728. //query := elastic.NewBoolQuery().
  729. // //北京,天津,河北,上海,江苏,浙江,安徽
  730. // Must(elastic.NewMatchQuery("company_name", "医院")).
  731. // //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  732. // Must(elastic.NewTermsQuery("tag_labels", "学校", "教育"))
  733. //Must(rangeQuery)
  734. ctx := context.Background()
  735. //开始滚动搜索
  736. scrollID := ""
  737. scroll := "10m"
  738. searchSource := elastic.NewSearchSource().
  739. //Query(query).
  740. Size(10000).
  741. Sort("_doc", true) //升序排序
  742. //Sort("_doc", false) //降序排序
  743. searchService := client.Scroll(index).
  744. Size(10000).
  745. Scroll(scroll).
  746. SearchSource(searchSource)
  747. res, err := searchService.Do(ctx)
  748. if err != nil {
  749. if err == io.EOF {
  750. fmt.Println("没有数据")
  751. } else {
  752. panic(err)
  753. }
  754. }
  755. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  756. fmt.Println("总数是:", res.TotalHits())
  757. total := 0
  758. for len(res.Hits.Hits) > 0 {
  759. for _, hit := range res.Hits.Hits {
  760. var doc map[string]interface{}
  761. err = json.Unmarshal(hit.Source, &doc)
  762. if err != nil {
  763. log.Printf("解析文档失败:%s", err)
  764. continue
  765. }
  766. //name := util.ObjToString(doc["buyer_name"])
  767. //ra := ZpAI(key, model, name)
  768. //if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  769. // doc["national_top"] = ra["label1"]
  770. // doc["main_label"] = ra["label1"]
  771. //}
  772. //if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  773. // doc["national_sub"] = ra["label2"]
  774. //}
  775. //if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  776. // doc["national_subsub"] = ra["label3"]
  777. //}
  778. MgoB.Save("ent_info_buyer", doc)
  779. //time.Sleep(time.Microsecond)
  780. }
  781. total = total + len(res.Hits.Hits)
  782. scrollID = res.ScrollId
  783. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  784. log.Println("current count:", total)
  785. if err != nil {
  786. if err == io.EOF {
  787. // 滚动到最后一批数据,退出循环
  788. break
  789. }
  790. log.Println("滚动搜索失败:", err, res)
  791. break // 处理错误时退出循环
  792. }
  793. }
  794. // 在循环外调用 ClearScroll
  795. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  796. if err != nil {
  797. log.Printf("清理滚动搜索失败:%s", err)
  798. }
  799. fmt.Println("结束~~~~~~~~~~~~~~~")
  800. }
  801. // mgoBidding mgoBidding 数据
  802. func mgoBidding() {
  803. MgoB := &mongodb.MongodbSim{
  804. MongodbAddr: "172.17.189.140:27080",
  805. //MongodbAddr: "127.0.0.1:27083",
  806. Size: 10,
  807. DbName: "qfw",
  808. UserName: "SJZY_RWbid_ES",
  809. Password: "SJZY@B4i4D5e6S",
  810. //Direct: true,
  811. }
  812. MgoB.InitPool()
  813. sess := MgoB.GetMgoConn()
  814. defer MgoB.DestoryMongoConn(sess)
  815. //181 凭安库
  816. MgoQY := &mongodb.MongodbSim{
  817. MongodbAddr: "172.17.4.181:27001",
  818. //MongodbAddr: "127.0.0.1:27001",
  819. DbName: "mixdata",
  820. Size: 10,
  821. UserName: "",
  822. Password: "",
  823. //Direct: true,
  824. }
  825. MgoQY.InitPool()
  826. where := map[string]interface{}{
  827. "qy_flag": 1,
  828. }
  829. query := sess.DB("qfw").C("ent_info_buyer").Find(where).Select(map[string]interface{}{
  830. "contenthtml": 0}).Iter()
  831. count := 0
  832. key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
  833. model := "glm-4-air"
  834. ch := make(chan bool, 10)
  835. wg := &sync.WaitGroup{}
  836. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  837. if count%100 == 0 {
  838. log.Println("current:", count, tmp["name"])
  839. }
  840. //存在就不再调用大模型
  841. //if _, ok := tmp["national_top"]; ok {
  842. // continue
  843. //}
  844. if utf8.RuneCountInString(util.ObjToString(tmp["name"])) < 4 {
  845. continue
  846. }
  847. ch <- true
  848. wg.Add(1)
  849. go func(tmp map[string]interface{}) {
  850. defer func() {
  851. <-ch
  852. wg.Done()
  853. }()
  854. //
  855. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  856. update := make(map[string]interface{})
  857. name := util.ObjToString(tmp["name"])
  858. where2 := map[string]interface{}{
  859. "company_name": name,
  860. }
  861. data, _ := MgoQY.FindOne("company_base", where2)
  862. businessScope := util.ObjToString((*data)["business_scope"])
  863. ra := ZpAI1(key, model, name, businessScope)
  864. if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  865. //update["national_top"] = ra["label1"]
  866. //update["main_label"] = ra["label1"]
  867. update["label1"] = ra["label1"]
  868. }
  869. if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  870. //update["national_sub"] = ra["label2"]
  871. update["label2"] = ra["label2"]
  872. }
  873. if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  874. //update["national_subsub"] = ra["label3"]
  875. update["label3"] = ra["label3"]
  876. }
  877. if len(update) > 0 {
  878. MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  879. }
  880. }(tmp)
  881. tmp = map[string]interface{}{}
  882. }
  883. wg.Wait()
  884. log.Println("over 22222222222")
  885. //log.Println("开始第二轮迭代")
  886. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  887. // if _, ok := tmp["national_top"]; ok {
  888. // continue
  889. // }
  890. // biddingID := mongodb.BsonIdToSId(tmp["_id"])
  891. // name := util.ObjToString(tmp["name"])
  892. // update := make(map[string]interface{})
  893. // ra := ZpAI(key, model, name)
  894. // if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  895. // update["national_top"] = ra["label1"]
  896. // update["main_label"] = ra["label1"]
  897. // }
  898. // if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  899. // update["national_sub"] = ra["label2"]
  900. // }
  901. // if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  902. // update["national_subsub"] = ra["label3"]
  903. // }
  904. // if count%1000 == 0 {
  905. // log.Println("current", count, name, ra["label1"], ra["label2"])
  906. // }
  907. //
  908. // if len(update) > 0 {
  909. // MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  910. // }
  911. // //time.Sleep(time.Microsecond)
  912. //}
  913. //
  914. //log.Println("开始第3轮迭代")
  915. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  916. // if _, ok := tmp["national_top"]; ok {
  917. // continue
  918. // }
  919. // biddingID := mongodb.BsonIdToSId(tmp["_id"])
  920. // name := util.ObjToString(tmp["name"])
  921. // update := make(map[string]interface{})
  922. // ra := ZpAI(key, model, name)
  923. // if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  924. // update["national_top"] = ra["label1"]
  925. // update["main_label"] = ra["label1"]
  926. // }
  927. // if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  928. // update["national_sub"] = ra["label2"]
  929. // }
  930. // if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  931. // update["national_subsub"] = ra["label3"]
  932. // }
  933. // if count%1000 == 0 {
  934. // log.Println("current", count, name, ra["label1"], ra["label2"])
  935. // }
  936. //
  937. // if len(update) > 0 {
  938. // MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  939. // }
  940. // //time.Sleep(time.Microsecond)
  941. //}
  942. }
  943. // fixProjectPortrait 修复画像数据重复
  944. func fixProjectPortrait() {
  945. url := "http://172.17.4.184:19908"
  946. //url := "http://127.0.0.1:19908"
  947. username := "jybid"
  948. password := "Top2023_JEB01i@31"
  949. index := "project_portrait" //索引名称
  950. buyerMap := make(map[string]int)
  951. buyerDatas := make(map[string][]map[string]interface{})
  952. // 创建 Elasticsearch 客户端
  953. client, err := elastic.NewClient(
  954. elastic.SetURL(url),
  955. elastic.SetBasicAuth(username, password),
  956. elastic.SetSniff(false),
  957. )
  958. if err != nil {
  959. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  960. }
  961. query := elastic.NewBoolQuery()
  962. //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  963. query.Must(elastic.NewTermQuery("class", "情报_安防"))
  964. ctx := context.Background()
  965. //开始滚动搜索
  966. scrollID := ""
  967. scroll := "10m"
  968. searchSource := elastic.NewSearchSource().
  969. Query(query).
  970. Size(10000).
  971. //Sort("_doc", true) //升序排序
  972. Sort("_doc", false) //降序排序
  973. searchService := client.Scroll(index).
  974. Size(10000).
  975. Scroll(scroll).
  976. SearchSource(searchSource)
  977. res, err := searchService.Do(ctx)
  978. if err != nil {
  979. if err == io.EOF {
  980. fmt.Println("没有数据")
  981. } else {
  982. panic(err)
  983. }
  984. }
  985. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  986. fmt.Println("project_portrait 总数是:", res.TotalHits())
  987. MgoB := &mongodb.MongodbSim{
  988. MongodbAddr: "172.17.189.140:27080",
  989. //MongodbAddr: "127.0.0.1:27083",
  990. Size: 10,
  991. DbName: "qfw",
  992. UserName: "SJZY_RWbid_ES",
  993. Password: "SJZY@B4i4D5e6S",
  994. //Direct: true,
  995. }
  996. MgoB.InitPool()
  997. //wher := map[string]interface{}{
  998. // "_id": mongodb.StringTOBsonId("66faf189bf905908d4a252d6"),
  999. //}
  1000. //MgoB.Delete("project_portrait", wher)
  1001. //
  1002. //return
  1003. total := 0
  1004. for len(res.Hits.Hits) > 0 {
  1005. for _, hit := range res.Hits.Hits {
  1006. var doc map[string]interface{}
  1007. err := json.Unmarshal(hit.Source, &doc)
  1008. if err != nil {
  1009. log.Printf("解析文档失败:%s", err)
  1010. continue
  1011. }
  1012. buyerName := util.ObjToString(doc["buyer"])
  1013. buyerMap[buyerName]++
  1014. buyerArr := buyerDatas[buyerName]
  1015. buyerArr = append(buyerArr, doc)
  1016. buyerDatas[buyerName] = buyerArr
  1017. }
  1018. total = total + len(res.Hits.Hits)
  1019. scrollID = res.ScrollId
  1020. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  1021. log.Println("current count:", total)
  1022. if err != nil {
  1023. if err == io.EOF {
  1024. // 滚动到最后一批数据,退出循环
  1025. break
  1026. }
  1027. log.Println("滚动搜索失败:", err, res)
  1028. break // 处理错误时退出循环
  1029. }
  1030. }
  1031. // 在循环外调用 ClearScroll
  1032. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  1033. if err != nil {
  1034. log.Printf("清理滚动搜索失败:%s", err)
  1035. }
  1036. fmt.Println("迭代结束~~~~~~~~~~~~~~~")
  1037. su := 0
  1038. for k, v := range buyerMap {
  1039. su++
  1040. if su%1000 == 0 {
  1041. log.Println("su", su)
  1042. }
  1043. if v > 1 {
  1044. buyerName := k
  1045. buyerArr := buyerDatas[buyerName]
  1046. doc := buyerArr[0]
  1047. doc["_id"] = mongodb.StringTOBsonId(util.ObjToString(doc["id"]))
  1048. MgoB.SaveByOriID("project_portrait_1030_test", doc)
  1049. for kk, vv := range buyerArr {
  1050. id := util.ObjToString(vv["id"])
  1051. where := map[string]interface{}{
  1052. "_id": mongodb.StringTOBsonId(util.ObjToString(doc["id"])),
  1053. }
  1054. MgoB.Delete("project_portrait", where)
  1055. if kk > 0 {
  1056. client.Delete().Index(index).Id(id).Do(context.Background())
  1057. }
  1058. }
  1059. }
  1060. }
  1061. }
  1062. // updateMgoEntInfoBuyer updateMgoEntInfoBuyer
  1063. func updateMgoEntInfoBuyer() {
  1064. MgoB := &mongodb.MongodbSim{
  1065. MongodbAddr: "172.17.189.140:27080",
  1066. //MongodbAddr: "127.0.0.1:27083",
  1067. Size: 10,
  1068. DbName: "qfw",
  1069. UserName: "SJZY_RWbid_ES",
  1070. Password: "SJZY@B4i4D5e6S",
  1071. //Direct: true,
  1072. }
  1073. MgoB.InitPool()
  1074. //181 凭安库
  1075. MgoQY := &mongodb.MongodbSim{
  1076. MongodbAddr: "172.17.4.181:27001",
  1077. //MongodbAddr: "127.0.0.1:27001",
  1078. DbName: "mixdata",
  1079. Size: 10,
  1080. UserName: "",
  1081. Password: "",
  1082. //Direct: true,
  1083. }
  1084. MgoQY.InitPool()
  1085. sess := MgoB.GetMgoConn()
  1086. defer MgoB.DestoryMongoConn(sess)
  1087. query := sess.DB("qfw").C("ent_info_buyer").Find(nil).Select(map[string]interface{}{
  1088. "contenthtml": 0}).Iter()
  1089. count := 0
  1090. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  1091. if count%1000 == 0 {
  1092. log.Println("current:", count, tmp["name"])
  1093. }
  1094. name := util.ObjToString(tmp["name"])
  1095. where := map[string]interface{}{
  1096. "company_name": name,
  1097. }
  1098. id := mongodb.BsonIdToSId(tmp["_id"])
  1099. data, _ := MgoQY.FindOne("company_base", where)
  1100. if data != nil && len(*data) > 0 {
  1101. update := map[string]interface{}{
  1102. "qy_flag": 1,
  1103. "use_flag": (*data)["use_flag"],
  1104. "company_type": (*data)["company_type"],
  1105. "company_status": (*data)["company_status"],
  1106. "credit_no": (*data)["credit_no"],
  1107. "business_scope": (*data)["business_scope"],
  1108. }
  1109. MgoB.UpdateById("ent_info_buyer", id, map[string]interface{}{"$set": update})
  1110. }
  1111. }
  1112. }