project.go 14 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/elastic/go-elasticsearch/v7"
  8. "github.com/olivere/elastic/v7"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "io"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "log"
  14. "regexp"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  // workerCount is the number of worker goroutines started by
  // getCountProjectWinner3 to consume tasks concurrently.
  const (
  	workerCount = 5
  )
  // Task is one unit of work sent over the task channel: a single company
  // document whose per-year counts are to be computed.
  type Task struct {
  	// ID is the document's _id rendered as a string; it is the key used when
  	// the result is written back.
  	ID string
  	// CompanyName is the document's company_name value, used as the term to
  	// search for.
  	CompanyName string
  }
  25. func getCountProjectWinner3() {
  26. url := "http://172.17.4.184:19908"
  27. username := "jybid"
  28. password := "Top2023_JEB01i@31"
  29. // 初始化 Elasticsearch 客户端
  30. client, err := elastic.NewClient(
  31. elastic.SetURL(url),
  32. elastic.SetBasicAuth(username, password),
  33. elastic.SetSniff(false),
  34. )
  35. if err != nil {
  36. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  37. }
  38. sess := MgoB.GetMgoConn()
  39. defer MgoB.DestoryMongoConn(sess)
  40. coll := sess.DB("qfw").C("wcc_label_static_0625")
  41. it := coll.Find(nil).Select(nil).Iter()
  42. log.Println("taskRun 开始")
  43. taskCh := make(chan Task, 100)
  44. var wg sync.WaitGroup
  45. // 启动 worker 协程
  46. for i := 0; i < workerCount; i++ {
  47. wg.Add(1)
  48. go func(workerID int) {
  49. defer wg.Done()
  50. ctx := context.Background()
  51. for task := range taskCh {
  52. func() {
  53. defer util.Catch() // 捕获单个任务错误,避免 crash
  54. fields := []string{"buyer", "winner"}
  55. years := []int{2020, 2021, 2022, 2023, 2024}
  56. update := make(map[string]interface{})
  57. for _, role := range fields {
  58. for _, year := range years {
  59. start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
  60. end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
  61. query := elastic.NewBoolQuery().
  62. Must(elastic.NewTermQuery(role, task.CompanyName)).
  63. Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
  64. count, err := client.Count().
  65. Index("bidding").
  66. Query(query).
  67. Do(ctx)
  68. if err != nil {
  69. log.Printf("【Worker %d】 查询失败 [%s - %d]: %v", workerID, role, year, err)
  70. continue
  71. }
  72. key := fmt.Sprintf("%s-%d", role, year)
  73. update[key] = count
  74. }
  75. }
  76. MgoB.UpdateById("wcc_label_static_0625", task.ID, bson.M{"$set": update})
  77. }()
  78. }
  79. }(i)
  80. }
  81. // 主线程读取 MongoDB 数据发送到 task channel
  82. count := 0
  83. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  84. count++
  85. if count%1000 == 0 {
  86. log.Println("current:", count, tmp["company_name"])
  87. }
  88. task := Task{
  89. ID: mongodb.BsonIdToSId(tmp["_id"]),
  90. CompanyName: util.ObjToString(tmp["company_name"]),
  91. }
  92. taskCh <- task
  93. }
  94. close(taskCh) // 所有任务发完
  95. wg.Wait()
  96. log.Println("所有任务处理完成")
  97. }
  98. func getCountProjectWinner() {
  99. url := "http://172.17.4.184:19908"
  100. //url := "http://127.0.0.1:19908"
  101. username := "jybid"
  102. password := "Top2023_JEB01i@31"
  103. //index := "bidding" //索引名称
  104. // 创建 Elasticsearch 客户端
  105. client, err := elastic.NewClient(
  106. elastic.SetURL(url),
  107. elastic.SetBasicAuth(username, password),
  108. elastic.SetSniff(false),
  109. )
  110. if err != nil {
  111. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  112. }
  113. defer util.Catch()
  114. sess := MgoB.GetMgoConn()
  115. defer MgoB.DestoryMongoConn(sess)
  116. it := sess.DB("qfw").C("wcc_label_static_0625").Find(nil).Select(nil).Iter()
  117. log.Println("taskRun 开始")
  118. count := 0
  119. ctx := context.Background()
  120. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  121. if count%1000 == 0 {
  122. log.Println("current:", count, tmp["company_name"])
  123. }
  124. companyName := util.ObjToString(tmp["company_name"])
  125. //companyName := "上海市特种设备监督检验技术研究院"
  126. id := mongodb.BsonIdToSId(tmp["_id"])
  127. fields := []string{"buyer", "winner"}
  128. years := []int{2020, 2021, 2022, 2023, 2024}
  129. update := make(map[string]interface{})
  130. for _, role := range fields {
  131. //fmt.Printf("=== [%s 作为 %s 的数量统计] ===\n", companyName, role)
  132. for _, year := range years {
  133. // 年份范围(秒)
  134. start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
  135. end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
  136. // 构造查询
  137. query := elastic.NewBoolQuery().
  138. Must(elastic.NewTermQuery(role, companyName)).
  139. Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
  140. // 查询并只返回总数
  141. count11, err := client.Count().
  142. Index("bidding").
  143. Query(query).
  144. Do(ctx)
  145. if err != nil {
  146. log.Fatalf("查询 [%s-%d] 失败: %v", role, year, err)
  147. }
  148. ke := fmt.Sprintf("%v-%v", role, year)
  149. update[ke] = count11
  150. //fmt.Printf("年份: %d, 数量: %d\n", year, count11)
  151. }
  152. }
  153. MgoB.UpdateById("wcc_label_static_0625", id, map[string]interface{}{"$set": update})
  154. }
  155. }
  156. func CountProjectWinner2() {
  157. // 连接 Elasticsearch
  158. cfg := elasticsearch.Config{
  159. Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908"
  160. //Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908"
  161. Username: "jybid",
  162. Password: "Top2023_JEB01i@31",
  163. }
  164. es, err := elasticsearch.NewClient(cfg)
  165. if err != nil {
  166. log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
  167. }
  168. // 构造查询 JSON
  169. query := map[string]interface{}{
  170. "track_total_hits": true, // 必须有,确保超过1万条也能拿到真实数量
  171. "size": 100, // 每页条数
  172. "query": map[string]interface{}{
  173. "nested": map[string]interface{}{
  174. "path": "zhima_labels",
  175. "query": map[string]interface{}{
  176. "term": map[string]interface{}{
  177. "zhima_labels.zhima_name": "高新技术企业",
  178. },
  179. },
  180. },
  181. },
  182. }
  183. // 序列化为 JSON
  184. var buf bytes.Buffer
  185. if err := json.NewEncoder(&buf).Encode(query); err != nil {
  186. log.Fatalf("Error encoding query: %s", err)
  187. }
  188. // 执行查询
  189. res, err := es.Search(
  190. es.Search.WithContext(context.Background()),
  191. es.Search.WithIndex("qyxy"), // 替换为你的索引名
  192. es.Search.WithBody(&buf),
  193. es.Search.WithTrackTotalHits(true),
  194. )
  195. if err != nil {
  196. log.Fatalf("Error getting response: %s", err)
  197. }
  198. defer res.Body.Close()
  199. // 🔍 检查返回状态码
  200. if res.IsError() {
  201. bodyBytes, _ := io.ReadAll(res.Body)
  202. log.Fatalf("ES 返回错误: %s\n%s", res.Status(), string(bodyBytes))
  203. }
  204. // ✅ 正常解析 body
  205. var r map[string]interface{}
  206. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  207. log.Fatalf("解析响应出错: %s", err)
  208. }
  209. // 打印总命中数
  210. hits := r["hits"].(map[string]interface{})
  211. total := hits["total"].(map[string]interface{})["value"]
  212. fmt.Printf("命中总数: %v 条\n", total)
  213. // 打印每条结果的 ID 和 _source
  214. for _, hit := range hits["hits"].([]interface{}) {
  215. doc := hit.(map[string]interface{})
  216. id := doc["_id"]
  217. source := doc["_source"]
  218. sourceJSON, _ := json.MarshalIndent(source, "", " ")
  219. fmt.Printf("ID: %s\nSource: %s\n", id, sourceJSON)
  220. }
  221. }
  222. // CountProjectWinner 统计企业中标项目数量
  223. func CountProjectWinner() {
  224. url := "http://172.17.4.184:19908"
  225. //url := "http://127.0.0.1:19908"
  226. username := "jybid"
  227. password := "Top2023_JEB01i@31"
  228. //index := "bidding" //索引名称
  229. index := "qyxy" //索引名称
  230. // 创建 Elasticsearch 客户端
  231. client, err := elastic.NewClient(
  232. elastic.SetURL(url),
  233. elastic.SetBasicAuth(username, password),
  234. elastic.SetSniff(false),
  235. )
  236. if err != nil {
  237. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  238. }
  239. labels := `高新技术企业,小巨人企业,国家级技术创新示范企业,众创空间,国家级科技企业孵化器,瞪羚企业,科技型中小企业,制造业单项冠军示范企业,制造业单项冠军产品生产企业,制造业单项冠军培育企业,国家企业技术中心,专精特新企业,省级技术创新示范企业,技术先进型服务企业,省级企业技术中心`
  240. label_names := strings.Split(labels, ",")
  241. for _, name := range label_names {
  242. // 构造 nested 查询:zhima_labels.zhima_name == 高新技术企业
  243. nestedQuery := elastic.NewNestedQuery(
  244. "zhima_labels", // path 必须是 nested 字段名本身
  245. elastic.NewBoolQuery().Must( // nested 里的子查询,用 Bool 包一下更稳
  246. elastic.NewTermQuery("zhima_labels.zhima_name", name),
  247. ),
  248. )
  249. ctx := context.Background()
  250. //开始滚动搜索
  251. scrollID := ""
  252. scroll := "10m"
  253. searchSource := elastic.NewSearchSource().
  254. Query(nestedQuery).
  255. Size(10000).
  256. Sort("_doc", true) //升序排序
  257. //Sort("_doc", false) //降序排序
  258. searchService := client.Scroll(index).
  259. Size(10000).
  260. Scroll(scroll).
  261. SearchSource(searchSource)
  262. res, err := searchService.Do(ctx)
  263. if err != nil {
  264. if err == io.EOF {
  265. fmt.Println("没有数据")
  266. } else {
  267. panic(err)
  268. }
  269. }
  270. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  271. fmt.Println(name, "总数是:", res.TotalHits())
  272. total := 0
  273. for len(res.Hits.Hits) > 0 {
  274. for _, hit := range res.Hits.Hits {
  275. var doc map[string]interface{}
  276. err := json.Unmarshal(hit.Source, &doc)
  277. if err != nil {
  278. log.Printf("解析文档失败:%s", err)
  279. continue
  280. }
  281. //存入新表
  282. insert := map[string]interface{}{
  283. "company_name": doc["company_name"],
  284. "id": doc["id"],
  285. "label": name,
  286. }
  287. err = MgoB.InsertOrUpdate("qfw", "wcc_label_static_0625", insert)
  288. if err != nil {
  289. log.Println("error", doc["id"])
  290. }
  291. }
  292. total = total + len(res.Hits.Hits)
  293. scrollID = res.ScrollId
  294. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  295. log.Println("current count:", total, name)
  296. if err != nil {
  297. if err == io.EOF {
  298. // 滚动到最后一批数据,退出循环
  299. break
  300. }
  301. log.Println("滚动搜索失败:", err, res)
  302. break // 处理错误时退出循环
  303. }
  304. }
  305. // 在循环外调用 ClearScroll
  306. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  307. if err != nil {
  308. log.Printf("清理滚动搜索失败:%s", err)
  309. }
  310. }
  311. }
  312. // getProject 获取项目数据
  313. func getProject() {
  314. MgoB := &mongodb.MongodbSim{
  315. MongodbAddr: "172.17.189.140:27080",
  316. //MongodbAddr: "127.0.0.1:27083",
  317. Size: 10,
  318. DbName: "qfw",
  319. UserName: "SJZY_RWbid_ES",
  320. Password: "SJZY@B4i4D5e6S",
  321. //Direct: true,
  322. }
  323. MgoB.InitPool()
  324. url := "http://172.17.4.184:19805"
  325. //url := "http://127.0.0.1:19805"
  326. username := "es_all"
  327. password := "TopJkO2E_d1x"
  328. index := "projectset" //索引名称
  329. // 创建 Elasticsearch 客户端
  330. client, err := elastic.NewClient(
  331. elastic.SetURL(url),
  332. elastic.SetBasicAuth(username, password),
  333. elastic.SetSniff(false),
  334. )
  335. if err != nil {
  336. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  337. }
  338. rangeQuery := elastic.NewRangeQuery("pici").Gt("1685548800").Lte("1704038400")
  339. //termQ := elastic.NewTermQuery("multipackage", 0)
  340. //rangeQuery := elastic.NewRangeQuery("id").Gt("657b08556977356f5578cb25").Lte("657b08556977356f5578cb26")
  341. query := elastic.NewBoolQuery().Must(rangeQuery)
  342. ctx := context.Background()
  343. //开始滚动搜索
  344. scrollID := ""
  345. scroll := "1m"
  346. searchSource := elastic.NewSearchSource().
  347. Query(query).
  348. Size(100).
  349. Sort("_doc", true) //升序排序
  350. //Sort("_doc", false) //降序排序
  351. searchService := client.Scroll(index).
  352. Size(100).
  353. Scroll(scroll).
  354. SearchSource(searchSource)
  355. res, err := searchService.Do(ctx)
  356. if err != nil {
  357. if err == io.EOF {
  358. fmt.Println("没有数据")
  359. } else {
  360. panic(err)
  361. }
  362. }
  363. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  364. fmt.Println("总数是:", res.TotalHits())
  365. total := 0
  366. for len(res.Hits.Hits) > 0 {
  367. for _, hit := range res.Hits.Hits {
  368. var doc map[string]interface{}
  369. err := json.Unmarshal(hit.Source, &doc)
  370. if err != nil {
  371. log.Printf("解析文档失败:%s", err)
  372. continue
  373. }
  374. //id := util.ObjToString(doc["id"])
  375. //log.Println(id)
  376. var matchWords = make([]string, 0)
  377. var matchList = make([]interface{}, 0)
  378. if list, ok := doc["list"].([]interface{}); ok {
  379. for _, v := range list {
  380. if da, ok := v.(map[string]interface{}); ok {
  381. if util.ObjToString(da["toptype"]) != "招标" {
  382. continue
  383. }
  384. title := util.ObjToString(da["title"])
  385. // 使用正则表达式进行匹配
  386. matches := GetMatches(title)
  387. if len(matches) > 0 {
  388. matchList = append(matchList, da)
  389. }
  390. matchWords = append(matchWords, matches...)
  391. }
  392. }
  393. }
  394. insert := make(map[string]interface{})
  395. insert["project_id"] = doc["id"]
  396. insert["_id"] = doc["id"]
  397. insert["multipackage"] = doc["multipackage"]
  398. insert["list"] = doc["list"]
  399. insert["projectname"] = doc["projectname"]
  400. insert["sourceinfourl"] = doc["sourceinfourl"]
  401. if len(matchWords) > 0 {
  402. insert["matchList"] = matchList
  403. insert["package_name"] = util.ObjToString(doc["projectname"]) + "-" + strings.Join(matchWords, "、")
  404. insert["type"] = 1
  405. }
  406. MgoB.SaveByOriID("wcc_project_20240304", insert)
  407. }
  408. total = total + len(res.Hits.Hits)
  409. scrollID = res.ScrollId
  410. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  411. log.Println("current count:", total)
  412. if err != nil {
  413. if err == io.EOF {
  414. // 滚动到最后一批数据,退出循环
  415. break
  416. }
  417. log.Printf("滚动搜索失败:%s", err)
  418. break // 处理错误时退出循环
  419. }
  420. }
  421. fmt.Println("结束~~~~~~~~~~~~~~~")
  422. }
  // Patterns used by GetMatches, compiled once at package initialisation instead
  // of on every call.
  var (
  	// packListRe matches "包" followed by one number or by a "、"-separated
  	// list of numbers, e.g. "包2" or "包10、12、14、17、18、19".
  	packListRe = regexp.MustCompile(`包(\d+(?:、\d+)*)`)
  	// numPackRe matches a number followed by "包", e.g. "3包".
  	numPackRe = regexp.MustCompile(`\d+包`)
  )

  // GetMatches extracts package markers from a title.
  //
  // It returns every non-overlapping match of "包<number>[、<number>...]" if there
  // is at least one; otherwise every match of "<number>包"; otherwise nil.
  //
  // Earlier revisions also tried `标包\d+` and `包\d+` as fallbacks. Any title
  // matching either of those necessarily matches the first pattern as well, so
  // those branches could never produce a result and have been removed; results
  // are unchanged. In particular a title containing "标包1" yields "包1".
  // NOTE(review): if "标包1" is the wanted label, that pattern has to be tried
  // before the first one.
  func GetMatches(title string) (res []string) {
  	if res = packListRe.FindAllString(title, -1); len(res) > 0 {
  		return res
  	}
  	return numPackRe.FindAllString(title, -1)
  }