main.go 32 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "gorm.io/driver/mysql"
  8. "gorm.io/gorm"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "strings"
  14. "sync"
  15. "unicode/utf8"
  16. )
  17. func main() {
  18. /**
  19. getProjectData click 是一起使用的,统计获取中标企业信息
  20. */
  21. //getProjectDataFromEs() //1.拉取项目中标成交数据
  22. //click() //2.处理项目数据,写入clickhouse
  23. //click2()
  24. //dealData()
  25. //getProject()
  26. //getQyLimitData()
  27. getBiddingData()
  28. //getQyxytData()
  29. //getTidb()
  30. //getEntInfo() //法人库数据
  31. //getBuyerData()
  32. //mgoBidding()
  33. //log.Println("开启第二轮")
  34. //mgoBidding()
  35. //updateMgoEntInfoBuyer()
  36. //getZhiMa()
  37. //log.Println("over ------------------ over")
  38. //fixProjectPortrait()
  39. //
  40. //ClickhouseData() //gorm 操作Clickhouse;gorm 对Clickhouse的bitmap兼容性不行,放弃
  41. //dealClickhouse() //clickhouse-go 操作
  42. //testUpdateBitmap() //测试环境测试更新Clickhouse bitmap字段
  43. ///-------//
  44. //updateHrefByEs()
  45. log.Println("over ------------------ over")
  46. }
  47. // getBiddingData 获取标讯数据
  48. func getBiddingData() {
  49. //url := "http://172.17.4.184:19908"
  50. url := "http://127.0.0.1:19908"
  51. username := "jybid"
  52. password := "Top2023_JEB01i@31"
  53. index := "bidding" //索引名称
  54. //index := "projectset" //索引名称
  55. // 创建 Elasticsearch 客户端
  56. client, err := elastic.NewClient(
  57. elastic.SetURL(url),
  58. elastic.SetBasicAuth(username, password),
  59. elastic.SetSniff(false),
  60. )
  61. if err != nil {
  62. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  63. }
  64. //85 抽取库
  65. //Mgo := &mongodb.MongodbSim{
  66. // //MongodbAddr: "127.0.0.1:27080",
  67. // MongodbAddr: "172.17.4.85:27080",
  68. // DbName: "top",
  69. // Size: 10,
  70. // //Direct: true,
  71. //}
  72. //Mgo.InitPool()
  73. MgoB := &mongodb.MongodbSim{
  74. //MongodbAddr: "172.17.189.140:27080",
  75. MongodbAddr: "127.0.0.1:27083",
  76. Size: 10,
  77. DbName: "qfw",
  78. UserName: "SJZY_RWbid_ES",
  79. Password: "SJZY@B4i4D5e6S",
  80. Direct: true,
  81. }
  82. MgoB.InitPool()
  83. //2023年01-01 2023-10-01,,1-3季度
  84. //2024-1 - 2024-4;1704038400-1711900800
  85. //2023-10-1 2024-1-1;1696089600-1704038400
  86. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  87. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  88. rangeQuery := elastic.NewRangeQuery("publishtime").Gte("1733760000").Lt("1734451200")
  89. query := elastic.NewBoolQuery().
  90. //Must(rangeQuery).
  91. Must(elastic.NewTermQuery("site", "中国招标与采购网")).Must(rangeQuery)
  92. //query := elastic.NewBoolQuery().
  93. // //北京,天津,河北,上海,江苏,浙江,安徽
  94. // //Must(elastic.NewTermQuery("area", "北京市")).
  95. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  96. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  97. // Must(rangeQuery)
  98. ctx := context.Background()
  99. //开始滚动搜索
  100. scrollID := ""
  101. scroll := "10m"
  102. searchSource := elastic.NewSearchSource().
  103. Query(query).
  104. Size(10000).
  105. Sort("_doc", true) //升序排序
  106. //Sort("_doc", false) //降序排序
  107. searchService := client.Scroll(index).
  108. Size(10000).
  109. Scroll(scroll).
  110. SearchSource(searchSource)
  111. res, err := searchService.Do(ctx)
  112. if err != nil {
  113. if err == io.EOF {
  114. fmt.Println("没有数据")
  115. } else {
  116. panic(err)
  117. }
  118. }
  119. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  120. fmt.Println("总数是:", res.TotalHits())
  121. total := 0
  122. for len(res.Hits.Hits) > 0 {
  123. for _, hit := range res.Hits.Hits {
  124. var doc map[string]interface{}
  125. err := json.Unmarshal(hit.Source, &doc)
  126. if err != nil {
  127. log.Printf("解析文档失败:%s", err)
  128. continue
  129. }
  130. delete(doc, "filetext")
  131. delete(doc, "detail")
  132. //存入新表
  133. err = MgoB.InsertOrUpdate("qfw", "wcc_bidding_test_1218", doc)
  134. if err != nil {
  135. log.Println("error", doc["id"])
  136. }
  137. // 处理查询结果
  138. //area := util.ObjToString(doc["area"])
  139. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  140. //if !IsInStringArray(area, areas) {
  141. // continue
  142. //}
  143. //projectName := util.ObjToString(doc["projectname"])
  144. //if strings.Contains(projectName, "非政府") {
  145. // continue
  146. //}
  147. //buyerclass := util.ObjToString(doc["buyerclass"])
  148. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  149. // continue
  150. //}
  151. ////存入新表
  152. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  153. //if err != nil {
  154. // log.Println("error", doc["id"])
  155. //}
  156. }
  157. total = total + len(res.Hits.Hits)
  158. scrollID = res.ScrollId
  159. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  160. log.Println("current count:", total)
  161. if err != nil {
  162. if err == io.EOF {
  163. // 滚动到最后一批数据,退出循环
  164. break
  165. }
  166. log.Println("滚动搜索失败:", err, res)
  167. break // 处理错误时退出循环
  168. }
  169. }
  170. // 在循环外调用 ClearScroll
  171. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  172. if err != nil {
  173. log.Printf("清理滚动搜索失败:%s", err)
  174. }
  175. fmt.Println("结束~~~~~~~~~~~~~~~")
  176. }
  177. // getProjectDataFromEs 获取项目 中标成交数据
  178. func getProjectDataFromEs() {
  179. url := "http://172.17.4.184:19908"
  180. //url := "http://127.0.0.1:19908"
  181. username := "jybid"
  182. password := "Top2023_JEB01i@31"
  183. index := "projectset" //索引名称
  184. // 创建 Elasticsearch 客户端
  185. client, err := elastic.NewClient(
  186. elastic.SetURL(url),
  187. elastic.SetBasicAuth(username, password),
  188. elastic.SetSniff(false),
  189. )
  190. if err != nil {
  191. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  192. }
  193. //85 抽取库
  194. Mgo := &mongodb.MongodbSim{
  195. //MongodbAddr: "127.0.0.1:27080",
  196. MongodbAddr: "172.17.4.85:27080",
  197. DbName: "top",
  198. Size: 10,
  199. //Direct: true,
  200. }
  201. Mgo.InitPool()
  202. //MgoB := &mongodb.MongodbSim{
  203. // MongodbAddr: "172.17.189.140:27080",
  204. // //MongodbAddr: "127.0.0.1:27083",
  205. // Size: 10,
  206. // DbName: "qfw",
  207. // UserName: "SJZY_RWbid_ES",
  208. // Password: "SJZY@B4i4D5e6S",
  209. // //Direct: true,
  210. //}
  211. //MgoB.InitPool()
  212. //2023年01-01 2023-10-01,,1-3季度
  213. //2024-1 - 2024-4;1704038400-1711900800
  214. //2023-10-1 2024-1-1;1696089600-1704038400
  215. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
  216. rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1711900800).Lt(1719763200) //2024年4-7月
  217. query := elastic.NewBoolQuery().
  218. //Must(areaTermsQuery).
  219. Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  220. Must(rangeQuery)
  221. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  222. //query := elastic.NewBoolQuery().
  223. // //北京,天津,河北,上海,江苏,浙江,安徽
  224. // //Must(elastic.NewTermQuery("area", "北京市")).
  225. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  226. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  227. // Must(rangeQuery)
  228. ctx := context.Background()
  229. //开始滚动搜索
  230. scrollID := ""
  231. scroll := "10m"
  232. searchSource := elastic.NewSearchSource().
  233. Query(query).
  234. Size(10000).
  235. Sort("_doc", true) //升序排序
  236. //Sort("_doc", false) //降序排序
  237. searchService := client.Scroll(index).
  238. Size(10000).
  239. Scroll(scroll).
  240. SearchSource(searchSource)
  241. res, err := searchService.Do(ctx)
  242. if err != nil {
  243. if err == io.EOF {
  244. fmt.Println("没有数据")
  245. } else {
  246. panic(err)
  247. }
  248. }
  249. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  250. fmt.Println("总数是:", res.TotalHits())
  251. total := 0
  252. for len(res.Hits.Hits) > 0 {
  253. for _, hit := range res.Hits.Hits {
  254. var doc map[string]interface{}
  255. err := json.Unmarshal(hit.Source, &doc)
  256. if err != nil {
  257. log.Printf("解析文档失败:%s", err)
  258. continue
  259. }
  260. delete(doc, "filetext")
  261. delete(doc, "detail")
  262. sWinner := util.ObjToString(doc["s_winner"])
  263. winners := strings.Split(sWinner, ",")
  264. for _, v := range winners {
  265. insert := doc
  266. insert["s_winner"] = v
  267. //存入新表
  268. err = Mgo.InsertOrUpdate("top", "wcc_allcity_2024Q2", insert)
  269. if err != nil {
  270. log.Println("error", doc["id"])
  271. }
  272. }
  273. // 处理查询结果
  274. //area := util.ObjToString(doc["area"])
  275. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  276. //if !IsInStringArray(area, areas) {
  277. // continue
  278. //}
  279. //projectName := util.ObjToString(doc["projectname"])
  280. //if strings.Contains(projectName, "非政府") {
  281. // continue
  282. //}
  283. //buyerclass := util.ObjToString(doc["buyerclass"])
  284. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  285. // continue
  286. //}
  287. ////存入新表
  288. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  289. //if err != nil {
  290. // log.Println("error", doc["id"])
  291. //}
  292. }
  293. total = total + len(res.Hits.Hits)
  294. scrollID = res.ScrollId
  295. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  296. log.Println("current count:", total)
  297. if err != nil {
  298. if err == io.EOF {
  299. // 滚动到最后一批数据,退出循环
  300. break
  301. }
  302. log.Println("滚动搜索失败:", err, res)
  303. break // 处理错误时退出循环
  304. }
  305. }
  306. // 在循环外调用 ClearScroll
  307. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  308. if err != nil {
  309. log.Printf("清理滚动搜索失败:%s", err)
  310. }
  311. fmt.Println("结束~~~~~~~~~~~~~~~")
  312. }
  313. // getQyxytData 获取企业数据
  314. func getQyxytData() {
  315. url := "http://172.17.4.184:19908"
  316. //url := "http://127.0.0.1:19908"
  317. username := "jybid"
  318. password := "Top2023_JEB01i@31"
  319. index := "qyxy" //索引名称
  320. // 创建 Elasticsearch 客户端
  321. client, err := elastic.NewClient(
  322. elastic.SetURL(url),
  323. elastic.SetBasicAuth(username, password),
  324. elastic.SetSniff(false),
  325. )
  326. if err != nil {
  327. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  328. }
  329. //85 抽取库
  330. //Mgo := &mongodb.MongodbSim{
  331. // //MongodbAddr: "127.0.0.1:27080",
  332. // MongodbAddr: "172.17.4.85:27080",
  333. // DbName: "top",
  334. // Size: 10,
  335. // //Direct: true,
  336. //}
  337. //Mgo.InitPool()
  338. MgoB := &mongodb.MongodbSim{
  339. MongodbAddr: "172.17.189.140:27080",
  340. //MongodbAddr: "127.0.0.1:27083",
  341. Size: 10,
  342. DbName: "qfw",
  343. UserName: "SJZY_RWbid_ES",
  344. Password: "SJZY@B4i4D5e6S",
  345. //Direct: true,
  346. }
  347. MgoB.InitPool()
  348. //2023年01-01 2023-10-01,,1-3季度
  349. //2024-1 - 2024-4;1704038400-1711900800
  350. //2023-10-1 2024-1-1;1696089600-1704038400
  351. //城市范围
  352. //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
  353. //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  354. //query := elastic.NewBoolQuery().
  355. // Must(areaTermsQuery).
  356. // Must(rangeQuery)
  357. //---------------------------//
  358. query := elastic.NewBoolQuery()
  359. query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  360. query.Must(elastic.NewTermQuery("company_city", "北京市"))
  361. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  362. //query := elastic.NewBoolQuery().
  363. // //北京,天津,河北,上海,江苏,浙江,安徽
  364. // //Must(elastic.NewTermQuery("area", "北京市")).
  365. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  366. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  367. // Must(rangeQuery)
  368. ctx := context.Background()
  369. //开始滚动搜索
  370. scrollID := ""
  371. scroll := "10m"
  372. searchSource := elastic.NewSearchSource().
  373. Query(query).
  374. Size(10000).
  375. Sort("_doc", true) //升序排序
  376. //Sort("_doc", false) //降序排序
  377. searchService := client.Scroll(index).
  378. Size(10000).
  379. Scroll(scroll).
  380. SearchSource(searchSource)
  381. res, err := searchService.Do(ctx)
  382. if err != nil {
  383. if err == io.EOF {
  384. fmt.Println("没有数据")
  385. } else {
  386. panic(err)
  387. }
  388. }
  389. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  390. fmt.Println("总数是:", res.TotalHits())
  391. total := 0
  392. for len(res.Hits.Hits) > 0 {
  393. for _, hit := range res.Hits.Hits {
  394. var doc map[string]interface{}
  395. err := json.Unmarshal(hit.Source, &doc)
  396. if err != nil {
  397. log.Printf("解析文档失败:%s", err)
  398. continue
  399. }
  400. if strings.Contains(util.ObjToString(doc["business_scope"]), "招投标代理") {
  401. //存入新表
  402. insert := map[string]interface{}{
  403. "company_name": doc["company_name"],
  404. "business_scope": doc["business_scope"],
  405. "employee_name": doc["employee_name"],
  406. "company_phone": doc["company_phone"],
  407. }
  408. err = MgoB.InsertOrUpdate("qfw", "wcc_2024_beijing_dailijigou", insert)
  409. if err != nil {
  410. log.Println("error", doc["id"])
  411. }
  412. }
  413. //sWinner := util.ObjToString(doc["s_winner"])
  414. //winners := strings.Split(sWinner, ",")
  415. //for _, v := range winners {
  416. // insert := doc
  417. // insert["s_winner"] = v
  418. // //存入新表
  419. // err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", insert)
  420. // if err != nil {
  421. // log.Println("error", doc["id"])
  422. // }
  423. //}
  424. // 处理查询结果
  425. //area := util.ObjToString(doc["area"])
  426. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  427. //if !IsInStringArray(area, areas) {
  428. // continue
  429. //}
  430. //projectName := util.ObjToString(doc["projectname"])
  431. //if strings.Contains(projectName, "非政府") {
  432. // continue
  433. //}
  434. //buyerclass := util.ObjToString(doc["buyerclass"])
  435. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  436. // continue
  437. //}
  438. ////存入新表
  439. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  440. //if err != nil {
  441. // log.Println("error", doc["id"])
  442. //}
  443. }
  444. total = total + len(res.Hits.Hits)
  445. scrollID = res.ScrollId
  446. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  447. log.Println("current count:", total)
  448. if err != nil {
  449. if err == io.EOF {
  450. // 滚动到最后一批数据,退出循环
  451. break
  452. }
  453. log.Println("滚动搜索失败:", err, res)
  454. break // 处理错误时退出循环
  455. }
  456. }
  457. // 在循环外调用 ClearScroll
  458. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  459. if err != nil {
  460. log.Printf("清理滚动搜索失败:%s", err)
  461. }
  462. fmt.Println("结束~~~~~~~~~~~~~~~")
  463. }
  464. // getQyLimitData 获取qyxy 条件数据
  465. func getQyLimitData() {
  466. //url := "http://172.17.4.184:19908"
  467. url := "http://127.0.0.1:19908"
  468. username := "jybid"
  469. password := "Top2023_JEB01i@31"
  470. index := "qyxy" //索引名称
  471. // 创建 Elasticsearch 客户端
  472. client, err := elastic.NewClient(
  473. elastic.SetURL(url),
  474. elastic.SetBasicAuth(username, password),
  475. elastic.SetSniff(false),
  476. )
  477. if err != nil {
  478. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  479. }
  480. // 构建查询
  481. query := elastic.NewBoolQuery().
  482. Must(elastic.NewMatchQuery("company_area", "河南")).
  483. Must(elastic.NewMatchQuery("company_status", "存续")).
  484. MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))
  485. // 执行查询
  486. searchResult, err := client.Search().Size(50).
  487. Index(index).
  488. Query(query).
  489. Do(context.Background())
  490. if err != nil {
  491. log.Fatalf("Error executing search: %s", err)
  492. }
  493. // 本地数据库
  494. MgoB := &mongodb.MongodbSim{
  495. MongodbAddr: "127.0.0.1:27017",
  496. Size: 10,
  497. DbName: "wcc",
  498. }
  499. MgoB.InitPool()
  500. for _, hit := range searchResult.Hits.Hits {
  501. var doc map[string]interface{}
  502. err := json.Unmarshal(hit.Source, &doc)
  503. if err != nil {
  504. log.Printf("解析文档失败:%s", err)
  505. continue
  506. }
  507. MgoB.SaveByOriID("wcc_henan_0428", doc)
  508. }
  509. }
  510. // getTidb 获取tidb 数据
  511. func getTidb() {
  512. MgoB := &mongodb.MongodbSim{
  513. MongodbAddr: "172.17.189.140:27080",
  514. //MongodbAddr: "127.0.0.1:27083",
  515. Size: 10,
  516. DbName: "qfw",
  517. UserName: "SJZY_RWbid_ES",
  518. Password: "SJZY@B4i4D5e6S",
  519. //Direct: true,
  520. }
  521. MgoB.InitPool()
  522. //tidb
  523. username := "datascbi"
  524. password := "Da#Bi20221111SC"
  525. //host := "127.0.0.1:4001"
  526. host := "172.17.162.25:4000"
  527. database := "global_common_data"
  528. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  529. // 连接到数据库
  530. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  531. if err != nil {
  532. log.Println("Failed to connect to database:", err)
  533. return
  534. }
  535. fmt.Println("Connected to the database!")
  536. defer util.Catch()
  537. sess := MgoB.GetMgoConn()
  538. defer MgoB.DestoryMongoConn(sess)
  539. it := sess.DB("qfw").C("wcc_2024_beijing_dailijigou").Find(nil).Select(nil).Iter()
  540. fmt.Println("taskRun 开始")
  541. count := 0
  542. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  543. if count%10000 == 0 {
  544. log.Println("current:", count)
  545. }
  546. companyName := util.ObjToString(tmp["company_name"])
  547. var baseInfo EnterpriseBaseInfo
  548. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  549. if baseInfo.ID > 0 {
  550. insert := map[string]interface{}{
  551. "company_name": companyName,
  552. "name_id": baseInfo.NameID,
  553. "business_scope": tmp["business_scope"],
  554. }
  555. MgoB.InsertOrUpdate("qfw", "wcc_beijing_daili_bidding", insert)
  556. }
  557. }
  558. log.Println("over")
  559. }
  560. // getEntInfo 获取法人库数据
  561. func getEntInfo() {
  562. url := "http://172.17.4.184:19908"
  563. //url := "http://127.0.0.1:19908"
  564. username := "jybid"
  565. password := "Top2023_JEB01i@31"
  566. index := "ent_info" //索引名称
  567. // 创建 Elasticsearch 客户端
  568. client, err := elastic.NewClient(
  569. elastic.SetURL(url),
  570. elastic.SetBasicAuth(username, password),
  571. elastic.SetSniff(false),
  572. )
  573. if err != nil {
  574. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  575. }
  576. query := elastic.NewBoolQuery().
  577. //北京,天津,河北,上海,江苏,浙江,安徽
  578. //Must(elastic.NewMatchQuery("company_name", "医院")).
  579. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  580. Must(elastic.NewExistsQuery("tag_labels"))
  581. //Must(rangeQuery)
  582. ctx := context.Background()
  583. //开始滚动搜索
  584. scrollID := ""
  585. scroll := "10m"
  586. searchSource := elastic.NewSearchSource().
  587. Query(query).
  588. Size(10000).
  589. Sort("_doc", true) //升序排序
  590. //Sort("_doc", false) //降序排序
  591. searchService := client.Scroll(index).
  592. Size(10000).
  593. Scroll(scroll).
  594. SearchSource(searchSource)
  595. res, err := searchService.Do(ctx)
  596. if err != nil {
  597. if err == io.EOF {
  598. fmt.Println("没有数据")
  599. } else {
  600. panic(err)
  601. }
  602. }
  603. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  604. fmt.Println("总数是:", res.TotalHits())
  605. total := 0
  606. for len(res.Hits.Hits) > 0 {
  607. for _, hit := range res.Hits.Hits {
  608. var doc map[string]interface{}
  609. err = json.Unmarshal(hit.Source, &doc)
  610. if err != nil {
  611. log.Printf("解析文档失败:%s", err)
  612. continue
  613. }
  614. name := util.ObjToString(doc["company_name"])
  615. updateData := make(map[string]interface{})
  616. if tag_labels, ok := doc["tag_labels"].([]interface{}); ok {
  617. updateData["main_label"] = tag_labels[0]
  618. _, err = client.Update().
  619. Index(index).
  620. Id(util.ObjToString(doc["id"])).
  621. Doc(updateData).
  622. Do(context.Background())
  623. if err != nil {
  624. log.Println("更新失败", name, tag_labels, err)
  625. }
  626. }
  627. }
  628. total = total + len(res.Hits.Hits)
  629. scrollID = res.ScrollId
  630. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  631. log.Println("current count:", total)
  632. if err != nil {
  633. if err == io.EOF {
  634. // 滚动到最后一批数据,退出循环
  635. break
  636. }
  637. log.Println("滚动搜索失败:", err, res)
  638. break // 处理错误时退出循环
  639. }
  640. }
  641. // 在循环外调用 ClearScroll
  642. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  643. if err != nil {
  644. log.Printf("清理滚动搜索失败:%s", err)
  645. }
  646. fmt.Println("结束~~~~~~~~~~~~~~~")
  647. }
  648. // getBuyerData 获取采购单位数据
  649. func getBuyerData() {
  650. //key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
  651. //model := "glm-4-air"
  652. url := "http://172.17.4.184:19908"
  653. //url := "http://127.0.0.1:19908"
  654. username := "jybid"
  655. password := "Top2023_JEB01i@31"
  656. index := "buyer" //索引名称
  657. // 创建 Elasticsearch 客户端
  658. client, err := elastic.NewClient(
  659. elastic.SetURL(url),
  660. elastic.SetBasicAuth(username, password),
  661. elastic.SetSniff(false),
  662. )
  663. if err != nil {
  664. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  665. }
  666. MgoB := &mongodb.MongodbSim{
  667. MongodbAddr: "172.17.189.140:27080",
  668. //MongodbAddr: "127.0.0.1:27083",
  669. Size: 10,
  670. DbName: "qfw",
  671. UserName: "SJZY_RWbid_ES",
  672. Password: "SJZY@B4i4D5e6S",
  673. //Direct: true,
  674. }
  675. MgoB.InitPool()
  676. //query := elastic.NewBoolQuery().
  677. // //北京,天津,河北,上海,江苏,浙江,安徽
  678. // Must(elastic.NewMatchQuery("company_name", "医院")).
  679. // //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  680. // Must(elastic.NewTermsQuery("tag_labels", "学校", "教育"))
  681. //Must(rangeQuery)
  682. ctx := context.Background()
  683. //开始滚动搜索
  684. scrollID := ""
  685. scroll := "10m"
  686. searchSource := elastic.NewSearchSource().
  687. //Query(query).
  688. Size(10000).
  689. Sort("_doc", true) //升序排序
  690. //Sort("_doc", false) //降序排序
  691. searchService := client.Scroll(index).
  692. Size(10000).
  693. Scroll(scroll).
  694. SearchSource(searchSource)
  695. res, err := searchService.Do(ctx)
  696. if err != nil {
  697. if err == io.EOF {
  698. fmt.Println("没有数据")
  699. } else {
  700. panic(err)
  701. }
  702. }
  703. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  704. fmt.Println("总数是:", res.TotalHits())
  705. total := 0
  706. for len(res.Hits.Hits) > 0 {
  707. for _, hit := range res.Hits.Hits {
  708. var doc map[string]interface{}
  709. err = json.Unmarshal(hit.Source, &doc)
  710. if err != nil {
  711. log.Printf("解析文档失败:%s", err)
  712. continue
  713. }
  714. //name := util.ObjToString(doc["buyer_name"])
  715. //ra := ZpAI(key, model, name)
  716. //if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  717. // doc["national_top"] = ra["label1"]
  718. // doc["main_label"] = ra["label1"]
  719. //}
  720. //if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  721. // doc["national_sub"] = ra["label2"]
  722. //}
  723. //if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  724. // doc["national_subsub"] = ra["label3"]
  725. //}
  726. MgoB.Save("ent_info_buyer", doc)
  727. //time.Sleep(time.Microsecond)
  728. }
  729. total = total + len(res.Hits.Hits)
  730. scrollID = res.ScrollId
  731. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  732. log.Println("current count:", total)
  733. if err != nil {
  734. if err == io.EOF {
  735. // 滚动到最后一批数据,退出循环
  736. break
  737. }
  738. log.Println("滚动搜索失败:", err, res)
  739. break // 处理错误时退出循环
  740. }
  741. }
  742. // 在循环外调用 ClearScroll
  743. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  744. if err != nil {
  745. log.Printf("清理滚动搜索失败:%s", err)
  746. }
  747. fmt.Println("结束~~~~~~~~~~~~~~~")
  748. }
  749. // mgoBidding mgoBidding 数据
  750. func mgoBidding() {
  751. MgoB := &mongodb.MongodbSim{
  752. MongodbAddr: "172.17.189.140:27080",
  753. //MongodbAddr: "127.0.0.1:27083",
  754. Size: 10,
  755. DbName: "qfw",
  756. UserName: "SJZY_RWbid_ES",
  757. Password: "SJZY@B4i4D5e6S",
  758. //Direct: true,
  759. }
  760. MgoB.InitPool()
  761. sess := MgoB.GetMgoConn()
  762. defer MgoB.DestoryMongoConn(sess)
  763. //181 凭安库
  764. MgoQY := &mongodb.MongodbSim{
  765. MongodbAddr: "172.17.4.181:27001",
  766. //MongodbAddr: "127.0.0.1:27001",
  767. DbName: "mixdata",
  768. Size: 10,
  769. UserName: "",
  770. Password: "",
  771. //Direct: true,
  772. }
  773. MgoQY.InitPool()
  774. where := map[string]interface{}{
  775. "qy_flag": 1,
  776. }
  777. query := sess.DB("qfw").C("ent_info_buyer").Find(where).Select(map[string]interface{}{
  778. "contenthtml": 0}).Iter()
  779. count := 0
  780. key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
  781. model := "glm-4-air"
  782. ch := make(chan bool, 10)
  783. wg := &sync.WaitGroup{}
  784. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  785. if count%100 == 0 {
  786. log.Println("current:", count, tmp["name"])
  787. }
  788. //存在就不再调用大模型
  789. //if _, ok := tmp["national_top"]; ok {
  790. // continue
  791. //}
  792. if utf8.RuneCountInString(util.ObjToString(tmp["name"])) < 4 {
  793. continue
  794. }
  795. ch <- true
  796. wg.Add(1)
  797. go func(tmp map[string]interface{}) {
  798. defer func() {
  799. <-ch
  800. wg.Done()
  801. }()
  802. //
  803. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  804. update := make(map[string]interface{})
  805. name := util.ObjToString(tmp["name"])
  806. where2 := map[string]interface{}{
  807. "company_name": name,
  808. }
  809. data, _ := MgoQY.FindOne("company_base", where2)
  810. businessScope := util.ObjToString((*data)["business_scope"])
  811. ra := ZpAI1(key, model, name, businessScope)
  812. if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  813. //update["national_top"] = ra["label1"]
  814. //update["main_label"] = ra["label1"]
  815. update["label1"] = ra["label1"]
  816. }
  817. if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  818. //update["national_sub"] = ra["label2"]
  819. update["label2"] = ra["label2"]
  820. }
  821. if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  822. //update["national_subsub"] = ra["label3"]
  823. update["label3"] = ra["label3"]
  824. }
  825. if len(update) > 0 {
  826. MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  827. }
  828. }(tmp)
  829. tmp = map[string]interface{}{}
  830. }
  831. wg.Wait()
  832. log.Println("over 22222222222")
  833. //log.Println("开始第二轮迭代")
  834. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  835. // if _, ok := tmp["national_top"]; ok {
  836. // continue
  837. // }
  838. // biddingID := mongodb.BsonIdToSId(tmp["_id"])
  839. // name := util.ObjToString(tmp["name"])
  840. // update := make(map[string]interface{})
  841. // ra := ZpAI(key, model, name)
  842. // if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  843. // update["national_top"] = ra["label1"]
  844. // update["main_label"] = ra["label1"]
  845. // }
  846. // if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  847. // update["national_sub"] = ra["label2"]
  848. // }
  849. // if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  850. // update["national_subsub"] = ra["label3"]
  851. // }
  852. // if count%1000 == 0 {
  853. // log.Println("current", count, name, ra["label1"], ra["label2"])
  854. // }
  855. //
  856. // if len(update) > 0 {
  857. // MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  858. // }
  859. // //time.Sleep(time.Microsecond)
  860. //}
  861. //
  862. //log.Println("开始第3轮迭代")
  863. //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  864. // if _, ok := tmp["national_top"]; ok {
  865. // continue
  866. // }
  867. // biddingID := mongodb.BsonIdToSId(tmp["_id"])
  868. // name := util.ObjToString(tmp["name"])
  869. // update := make(map[string]interface{})
  870. // ra := ZpAI(key, model, name)
  871. // if util.ObjToString(ra["label1"]) != "" && !checkString(util.ObjToString(ra["label1"])) {
  872. // update["national_top"] = ra["label1"]
  873. // update["main_label"] = ra["label1"]
  874. // }
  875. // if util.ObjToString(ra["label2"]) != "" && !checkString(util.ObjToString(ra["label2"])) {
  876. // update["national_sub"] = ra["label2"]
  877. // }
  878. // if util.ObjToString(ra["label3"]) != "" && !checkString(util.ObjToString(ra["label3"])) {
  879. // update["national_subsub"] = ra["label3"]
  880. // }
  881. // if count%1000 == 0 {
  882. // log.Println("current", count, name, ra["label1"], ra["label2"])
  883. // }
  884. //
  885. // if len(update) > 0 {
  886. // MgoB.UpdateById("ent_info_buyer", biddingID, map[string]interface{}{"$set": update})
  887. // }
  888. // //time.Sleep(time.Microsecond)
  889. //}
  890. }
  891. // fixProjectPortrait 修复画像数据重复
  892. func fixProjectPortrait() {
  893. url := "http://172.17.4.184:19908"
  894. //url := "http://127.0.0.1:19908"
  895. username := "jybid"
  896. password := "Top2023_JEB01i@31"
  897. index := "project_portrait" //索引名称
  898. buyerMap := make(map[string]int)
  899. buyerDatas := make(map[string][]map[string]interface{})
  900. // 创建 Elasticsearch 客户端
  901. client, err := elastic.NewClient(
  902. elastic.SetURL(url),
  903. elastic.SetBasicAuth(username, password),
  904. elastic.SetSniff(false),
  905. )
  906. if err != nil {
  907. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  908. }
  909. query := elastic.NewBoolQuery()
  910. //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  911. query.Must(elastic.NewTermQuery("class", "情报_安防"))
  912. ctx := context.Background()
  913. //开始滚动搜索
  914. scrollID := ""
  915. scroll := "10m"
  916. searchSource := elastic.NewSearchSource().
  917. Query(query).
  918. Size(10000).
  919. //Sort("_doc", true) //升序排序
  920. Sort("_doc", false) //降序排序
  921. searchService := client.Scroll(index).
  922. Size(10000).
  923. Scroll(scroll).
  924. SearchSource(searchSource)
  925. res, err := searchService.Do(ctx)
  926. if err != nil {
  927. if err == io.EOF {
  928. fmt.Println("没有数据")
  929. } else {
  930. panic(err)
  931. }
  932. }
  933. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  934. fmt.Println("project_portrait 总数是:", res.TotalHits())
  935. MgoB := &mongodb.MongodbSim{
  936. MongodbAddr: "172.17.189.140:27080",
  937. //MongodbAddr: "127.0.0.1:27083",
  938. Size: 10,
  939. DbName: "qfw",
  940. UserName: "SJZY_RWbid_ES",
  941. Password: "SJZY@B4i4D5e6S",
  942. //Direct: true,
  943. }
  944. MgoB.InitPool()
  945. //wher := map[string]interface{}{
  946. // "_id": mongodb.StringTOBsonId("66faf189bf905908d4a252d6"),
  947. //}
  948. //MgoB.Delete("project_portrait", wher)
  949. //
  950. //return
  951. total := 0
  952. for len(res.Hits.Hits) > 0 {
  953. for _, hit := range res.Hits.Hits {
  954. var doc map[string]interface{}
  955. err := json.Unmarshal(hit.Source, &doc)
  956. if err != nil {
  957. log.Printf("解析文档失败:%s", err)
  958. continue
  959. }
  960. buyerName := util.ObjToString(doc["buyer"])
  961. buyerMap[buyerName]++
  962. buyerArr := buyerDatas[buyerName]
  963. buyerArr = append(buyerArr, doc)
  964. buyerDatas[buyerName] = buyerArr
  965. }
  966. total = total + len(res.Hits.Hits)
  967. scrollID = res.ScrollId
  968. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  969. log.Println("current count:", total)
  970. if err != nil {
  971. if err == io.EOF {
  972. // 滚动到最后一批数据,退出循环
  973. break
  974. }
  975. log.Println("滚动搜索失败:", err, res)
  976. break // 处理错误时退出循环
  977. }
  978. }
  979. // 在循环外调用 ClearScroll
  980. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  981. if err != nil {
  982. log.Printf("清理滚动搜索失败:%s", err)
  983. }
  984. fmt.Println("迭代结束~~~~~~~~~~~~~~~")
  985. su := 0
  986. for k, v := range buyerMap {
  987. su++
  988. if su%1000 == 0 {
  989. log.Println("su", su)
  990. }
  991. if v > 1 {
  992. buyerName := k
  993. buyerArr := buyerDatas[buyerName]
  994. doc := buyerArr[0]
  995. doc["_id"] = mongodb.StringTOBsonId(util.ObjToString(doc["id"]))
  996. MgoB.SaveByOriID("project_portrait_1030_test", doc)
  997. for kk, vv := range buyerArr {
  998. id := util.ObjToString(vv["id"])
  999. where := map[string]interface{}{
  1000. "_id": mongodb.StringTOBsonId(util.ObjToString(doc["id"])),
  1001. }
  1002. MgoB.Delete("project_portrait", where)
  1003. if kk > 0 {
  1004. client.Delete().Index(index).Id(id).Do(context.Background())
  1005. }
  1006. }
  1007. }
  1008. }
  1009. }
  1010. // updateMgoEntInfoBuyer updateMgoEntInfoBuyer
  1011. func updateMgoEntInfoBuyer() {
  1012. MgoB := &mongodb.MongodbSim{
  1013. MongodbAddr: "172.17.189.140:27080",
  1014. //MongodbAddr: "127.0.0.1:27083",
  1015. Size: 10,
  1016. DbName: "qfw",
  1017. UserName: "SJZY_RWbid_ES",
  1018. Password: "SJZY@B4i4D5e6S",
  1019. //Direct: true,
  1020. }
  1021. MgoB.InitPool()
  1022. //181 凭安库
  1023. MgoQY := &mongodb.MongodbSim{
  1024. MongodbAddr: "172.17.4.181:27001",
  1025. //MongodbAddr: "127.0.0.1:27001",
  1026. DbName: "mixdata",
  1027. Size: 10,
  1028. UserName: "",
  1029. Password: "",
  1030. //Direct: true,
  1031. }
  1032. MgoQY.InitPool()
  1033. sess := MgoB.GetMgoConn()
  1034. defer MgoB.DestoryMongoConn(sess)
  1035. query := sess.DB("qfw").C("ent_info_buyer").Find(nil).Select(map[string]interface{}{
  1036. "contenthtml": 0}).Iter()
  1037. count := 0
  1038. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  1039. if count%1000 == 0 {
  1040. log.Println("current:", count, tmp["name"])
  1041. }
  1042. name := util.ObjToString(tmp["name"])
  1043. where := map[string]interface{}{
  1044. "company_name": name,
  1045. }
  1046. id := mongodb.BsonIdToSId(tmp["_id"])
  1047. data, _ := MgoQY.FindOne("company_base", where)
  1048. if data != nil && len(*data) > 0 {
  1049. update := map[string]interface{}{
  1050. "qy_flag": 1,
  1051. "use_flag": (*data)["use_flag"],
  1052. "company_type": (*data)["company_type"],
  1053. "company_status": (*data)["company_status"],
  1054. "credit_no": (*data)["credit_no"],
  1055. "business_scope": (*data)["business_scope"],
  1056. }
  1057. MgoB.UpdateById("ent_info_buyer", id, map[string]interface{}{"$set": update})
  1058. }
  1059. }
  1060. }