bidd.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "go.mongodb.org/mongo-driver/bson"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. func getBidding2233() {
  20. MgoB = &mongodb.MongodbSim{
  21. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  22. //MongodbAddr: "127.0.0.1:27083",
  23. Size: 10,
  24. DbName: "qfw",
  25. UserName: "SJZY_RWbid_ES",
  26. Password: "SJZY@B4i4D5e6S",
  27. //Direct: true,
  28. }
  29. MgoB.InitPool()
  30. sess := MgoB.GetMgoConn()
  31. defer MgoB.DestoryMongoConn(sess)
  32. where := map[string]interface{}{
  33. "comeintime": map[string]interface{}{
  34. "$gt": 1753113600,
  35. "$lt": time.Now().Unix(),
  36. },
  37. }
  38. query := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
  39. count := 0
  40. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  41. if count%10000 == 0 {
  42. log.Println("current:", count)
  43. }
  44. spicode := util.ObjToString(tmp["spidercode"])
  45. if spicode == "sdxzbiddingsjzypc" {
  46. MgoB.SaveByOriID("wcc_sdxzbiddingsjzypc", tmp)
  47. }
  48. }
  49. log.Println("count:", count)
  50. }
  51. // exportBidding exportBidding
  52. func exportBidding() {
  53. MgoB = &mongodb.MongodbSim{
  54. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  55. //MongodbAddr: "127.0.0.1:27083",
  56. Size: 10,
  57. DbName: "qfw",
  58. UserName: "SJZY_RWbid_ES",
  59. Password: "SJZY@B4i4D5e6S",
  60. //Direct: true,
  61. }
  62. MgoB.InitPool()
  63. sess := MgoB.GetMgoConn()
  64. defer MgoB.DestoryMongoConn(sess)
  65. where := map[string]interface{}{
  66. "subtype": "开标记录",
  67. }
  68. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "channel": 1, "detail": 1, "comeintime": 1, "extracttype": 1, "infoformat": 1}).Iter()
  69. count := 0
  70. ch := make(chan bool, 20)
  71. wg := &sync.WaitGroup{}
  72. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  73. if count%10000 == 0 {
  74. log.Println("current:", count)
  75. }
  76. ch <- true
  77. wg.Add(1)
  78. go func(tmp map[string]interface{}) {
  79. defer func() {
  80. <-ch
  81. wg.Done()
  82. }()
  83. MgoB.SaveByOriID("wcc_bidding_kaibiao", tmp)
  84. }(tmp)
  85. tmp = make(map[string]interface{})
  86. }
  87. wg.Wait()
  88. log.Println("处理完毕")
  89. }
  90. func getBidding0311() {
  91. // 3. 构造 MongoDB 查询
  92. filter := bson.M{
  93. "$and": []bson.M{
  94. {
  95. "$or": []bson.M{
  96. {"subtype": "合同"},
  97. {"toptype": "结果"},
  98. {"toptype": "招标"},
  99. },
  100. },
  101. {"publishtime": bson.M{"$gte": 1714492800, "$lt": 1719763200}},
  102. },
  103. }
  104. //where := map[string]interface{}{
  105. // "publishtime": map[string]interface{}{
  106. // "$gte": 1704038400,
  107. // "$lte": 1735660800,
  108. // },
  109. //}
  110. sess := MgoB.GetMgoConn()
  111. defer MgoB.DestoryMongoConn(sess)
  112. query := sess.DB("qfw").C("bidding").Find(filter).Select(map[string]interface{}{"buyer": 1, "toptype": 1, "subtype": 1}).Iter()
  113. count := 0
  114. ch := make(chan bool, 5)
  115. wg := &sync.WaitGroup{}
  116. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  117. if count%1000 == 0 {
  118. log.Println("current:", count, tmp["_id"])
  119. }
  120. if util.IntAll(tmp["extracttype"]) == -1 {
  121. continue
  122. }
  123. ch <- true
  124. wg.Add(1)
  125. go func(tmp map[string]interface{}) {
  126. defer func() {
  127. <-ch
  128. wg.Done()
  129. }()
  130. MgoB.SaveByOriID("wcc_bidding_20250311_1148", tmp)
  131. }(tmp)
  132. //if util.ObjToString(tmp["toptype"]) == "招标" || util.ObjToString(tmp["toptype"]) == "结果" || util.ObjToString(tmp["subtype"]) == "合同" {
  133. // MgoB.SaveByOriID("wcc_bidding_20250311_1148-2", tmp)
  134. //}
  135. tmp = make(map[string]interface{})
  136. }
  137. wg.Wait()
  138. }
  139. // getBidding2 获取bidding 1.3日无二级分类数据
  140. func getBidding2() {
  141. //2024-1-3日数据
  142. where := map[string]interface{}{
  143. "comeintime": map[string]interface{}{
  144. "$gte": 1704211200,
  145. "$lte": 1704297600,
  146. },
  147. "subtype": map[string]interface{}{
  148. "$exists": 1,
  149. },
  150. }
  151. sess := MgoB.GetMgoConn()
  152. defer MgoB.DestoryMongoConn(sess)
  153. //texts := make([]string, 0)
  154. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "subtype": 1, "href": 1, "detail": 1, "channel": 1}).Iter()
  155. count := 0
  156. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  157. if count%1000 == 0 {
  158. log.Println("current:", count)
  159. }
  160. if util.IntAll(tmp["extracttype"]) == -1 {
  161. continue
  162. }
  163. id := mongodb.BsonIdToSId(tmp["_id"])
  164. tmp["jyhref"] = GetJyURLByID(id)
  165. MgoB.SaveByOriID("wcc_bidding_20240103_subtype_exists", tmp)
  166. tmp = make(map[string]interface{})
  167. }
  168. log.Println("over")
  169. }
  170. // callAi 调用大模型
  171. func callAi() {
  172. sess := MgoB.GetMgoConn()
  173. defer MgoB.DestoryMongoConn(sess)
  174. //where := map[string]interface{}{
  175. // "subtype_a": map[string]interface{}{
  176. // "$exists": 0,
  177. // },
  178. //}
  179. query := sess.DB("qfw_data").C("wcc_bidding_20240103_subtype_exists").Find(nil).Select(nil).Iter()
  180. count := 0
  181. //ch := make(chan bool, 1)
  182. //wg := &sync.WaitGroup{}
  183. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  184. if count%1000 == 0 {
  185. log.Println("-------- current:", count, tmp["_id"], " ---------")
  186. }
  187. //ch <- true
  188. //wg.Add(1)
  189. //go func(tmp map[string]interface{}) {
  190. // defer func() {
  191. // <-ch
  192. // wg.Done()
  193. // }()
  194. id := mongodb.BsonIdToSId(tmp["_id"])
  195. title := util.ObjToString(tmp["title"])
  196. detail := util.ObjToString(tmp["detail"])
  197. data := map[string]interface{}{
  198. "title": title,
  199. "detail": detail,
  200. }
  201. reqData := map[string]interface{}{
  202. "texts": []interface{}{data},
  203. }
  204. now := time.Now()
  205. res := send(reqData)
  206. log.Println(time.Since(now).Seconds(), tmp["_id"])
  207. subtype := res["result"].([]interface{})
  208. result := subtype[0]
  209. types := strings.Split(util.ObjToString(result), "-")
  210. update := make(map[string]interface{})
  211. if len(types) == 2 {
  212. update["toptype_ai"] = types[0]
  213. update["subtype_ai"] = types[1]
  214. //没有内容
  215. if detail == "" {
  216. update["data_type"] = 1
  217. } else {
  218. update["data_type"] = 0
  219. }
  220. MgoB.UpdateById("wcc_bidding_20240103_subtype_exists", id, map[string]interface{}{"$set": update})
  221. }
  222. //}(tmp)
  223. tmp = make(map[string]interface{})
  224. }
  225. //wg.Wait()
  226. log.Println("over")
  227. }
  228. // getBidding 调用分类大模型
  229. func getBidding() {
  230. //2024-1-3日数据
  231. where := map[string]interface{}{
  232. "comeintime": map[string]interface{}{
  233. "$gte": 1704211200,
  234. "$lte": 1704297600,
  235. },
  236. "subtype": map[string]interface{}{
  237. "$exists": 0,
  238. },
  239. }
  240. sess := MgoB.GetMgoConn()
  241. defer MgoB.DestoryMongoConn(sess)
  242. //texts := make([]string, 0)
  243. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "href": 1, "detail": 1}).Iter()
  244. count := 0
  245. ch := make(chan bool, 10)
  246. wg := &sync.WaitGroup{}
  247. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  248. if count%1000 == 0 {
  249. log.Println("current:", count)
  250. }
  251. if util.IntAll(tmp["extracttype"]) == -1 {
  252. continue
  253. }
  254. ch <- true
  255. wg.Add(1)
  256. go func(tmp map[string]interface{}) {
  257. defer func() {
  258. <-ch
  259. wg.Done()
  260. }()
  261. id := mongodb.BsonIdToSId(tmp["_id"])
  262. title := util.ObjToString(tmp["title"])
  263. detail := util.ObjToString(tmp["detail"])
  264. tmp["bidding_id"] = id
  265. data := map[string]interface{}{
  266. "title": title,
  267. "detail": detail,
  268. }
  269. reqData := map[string]interface{}{
  270. "texts": []interface{}{data},
  271. }
  272. res := SendAi(reqData)
  273. subtype := res["result"].([]interface{})
  274. result := subtype[0]
  275. types := strings.Split(util.ObjToString(result), "-")
  276. if len(types) == 2 {
  277. tmp["new_toptype"] = types[0]
  278. tmp["new_subtype"] = types[1]
  279. }
  280. tmp["jyhref"] = GetJyURLByID(id)
  281. //没有内容
  282. if detail == "" {
  283. tmp["data_type"] = 1
  284. } else {
  285. tmp["data_type"] = 0
  286. }
  287. MgoB.Save("wcc_20240103-2", tmp)
  288. }(tmp)
  289. tmp = make(map[string]interface{})
  290. }
  291. wg.Wait()
  292. log.Println("over")
  293. }
  294. func send(data map[string]interface{}) (res map[string]interface{}) {
  295. url := "http://192.168.3.109:16688"
  296. jsonData, err := json.Marshal(data)
  297. if err != nil {
  298. fmt.Println("JSON marshal error:", err)
  299. return
  300. }
  301. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  302. if err != nil {
  303. fmt.Println("Request error:", err)
  304. return
  305. }
  306. req.Header.Set("Content-Type", "application/json")
  307. client := &http.Client{}
  308. resp, err := client.Do(req)
  309. if err != nil {
  310. fmt.Println("Request error:", err)
  311. return
  312. }
  313. defer resp.Body.Close()
  314. err = json.NewDecoder(resp.Body).Decode(&res)
  315. if err != nil {
  316. fmt.Println("Response decoding error:", err)
  317. return
  318. }
  319. return
  320. }
  321. // SendAi 调用大模型招标分类
  322. func SendAi(data map[string]interface{}) (res map[string]interface{}) {
  323. // 设置 2 秒的超时
  324. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  325. defer cancel()
  326. url := "http://192.168.3.109:16688"
  327. jsonData, err := json.Marshal(data)
  328. if err != nil {
  329. fmt.Println("JSON marshal error:", err)
  330. return
  331. }
  332. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  333. if err != nil {
  334. fmt.Println("Request error:", err)
  335. return
  336. }
  337. req.Header.Set("Content-Type", "application/json")
  338. // 将请求与上下文关联
  339. req = req.WithContext(ctx)
  340. client := &http.Client{}
  341. resp, err := client.Do(req)
  342. if err != nil {
  343. // 使用 errors.Is 检查错误是否是超时错误
  344. if errors.Is(err, context.DeadlineExceeded) {
  345. fmt.Println("Request timed out")
  346. return
  347. }
  348. fmt.Println("Request error:", err)
  349. return
  350. }
  351. defer resp.Body.Close()
  352. err = json.NewDecoder(resp.Body).Decode(&res)
  353. if err != nil {
  354. fmt.Println("Response decoding error:", err)
  355. return
  356. }
  357. return
  358. }
  359. // getCount 北京中科闻歌科技股份有限公司 对指定数据源网站内的中标公告的数据量的查询需求
  360. func getCount() {
  361. //startTime := int64(1672502400) //2023-01-01
  362. //endTime := int64(1680278400) //2023-4-01
  363. //endTime2 := int64(1688140800) //2023-7-01
  364. //endTime3 := int64(1696089600) //2023-10-01
  365. endTime4 := int64(1704038400) //2024-1-01
  366. endTime5 := int64(1714492800) //2024-5-01
  367. where := map[string]interface{}{
  368. "_id": map[string]interface{}{
  369. "$gt": mongodb.StringTOBsonId(strconv.FormatInt(endTime4, 16) + "0000000000000000"),
  370. "$lte": mongodb.StringTOBsonId(strconv.FormatInt(endTime5, 16) + "0000000000000000"),
  371. },
  372. }
  373. sess := MgoB.GetMgoConn()
  374. defer MgoB.DestoryMongoConn(sess)
  375. log.Println(where)
  376. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"site": 1, "subtype": 1, "attach_text": 1}).Iter()
  377. count := 0
  378. count1 := 0 //中国南方电网 标讯数据
  379. count2 := 0 //中国南方电网 标讯成交数据
  380. count3 := 0 //中国南方电网 标讯有附件数据
  381. count4 := 0 //中国南方电网 标讯成交有附件数据
  382. count5 := 0 //国家电网公司电子商务平台 标讯数据
  383. count6 := 0 //国家电网公司电子商务平台 标讯成交数据
  384. count7 := 0 //国家电网公司电子商务平台 标讯有附件数据
  385. count8 := 0 //国家电网公司电子商务平台 标讯成交有附件数据
  386. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  387. if count%10000 == 0 {
  388. log.Println("current:", count, count1, count2, count3, count4, count5, count6, count7, count8)
  389. }
  390. site := util.ObjToString(tmp["site"])
  391. subtype := util.ObjToString(tmp["subtype"])
  392. attachText, _ := tmp["attach_text"].(map[string]interface{})
  393. if site == "中国南方电网" {
  394. count1++
  395. if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" {
  396. count2++
  397. if attachText != nil {
  398. count4++
  399. }
  400. }
  401. if attachText != nil {
  402. count3++
  403. }
  404. }
  405. if site == "国家电网公司电子商务平台" {
  406. count5++
  407. if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" {
  408. count6++
  409. if attachText != nil {
  410. count8++
  411. }
  412. }
  413. if attachText != nil {
  414. count7++
  415. }
  416. }
  417. //if util.ObjToString(tmp["site"]) != "中国南方电网" {
  418. // count1++
  419. // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" {
  420. // count2++
  421. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  422. // count4++
  423. // }
  424. // }
  425. // // 附件
  426. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  427. // count3++
  428. // }
  429. //
  430. //}
  431. //
  432. //if util.ObjToString(tmp["site"]) != "国家电网公司电子商务平台" {
  433. // count5++
  434. // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" {
  435. // count6++
  436. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  437. // count8++
  438. // }
  439. // }
  440. // //右附件
  441. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  442. // count7++
  443. // }
  444. //}
  445. tmp = make(map[string]interface{})
  446. }
  447. log.Println("count1", count1)
  448. log.Println("count2", count2)
  449. log.Println("count3", count3)
  450. log.Println("count4", count4)
  451. log.Println("count5", count5)
  452. log.Println("count6", count6)
  453. log.Println("count7", count7)
  454. log.Println("count8", count8)
  455. log.Println("over")
  456. }
  457. func deleteEs() {
  458. Mgo := &mongodb.MongodbSim{
  459. MongodbAddr: "172.17.189.140:27080",
  460. //MongodbAddr: "127.0.0.1:27083",
  461. Size: 10,
  462. DbName: "qfw",
  463. UserName: "SJZY_RWbid_ES",
  464. Password: "SJZY@B4i4D5e6S",
  465. //Direct: true,
  466. }
  467. Mgo.InitPool()
  468. sess := Mgo.GetMgoConn()
  469. defer Mgo.DestoryMongoConn(sess)
  470. //es
  471. Es := &elastic.Elastic{
  472. //S_esurl: "http://127.0.0.1:19908",
  473. S_esurl: "http://172.17.4.184:19908",
  474. I_size: 5,
  475. Username: "jybid",
  476. Password: "Top2023_JEB01i@31",
  477. }
  478. Es.InitElasticSize()
  479. //es 新集群
  480. EsNew := &elastic.Elastic{
  481. //S_esurl: "http://127.0.0.1:19905",
  482. S_esurl: "http://172.17.4.184:19905",
  483. I_size: 5,
  484. Username: "jybid",
  485. Password: "Top2023_JEB01i@31",
  486. }
  487. EsNew.InitElasticSize()
  488. where := map[string]interface{}{
  489. "_id": map[string]interface{}{
  490. "$gt": mongodb.StringTOBsonId("673458dbb25c3e1deb2fef58"),
  491. "$lte": mongodb.StringTOBsonId("6734638db25c3e1deb303821"),
  492. },
  493. "extracttype": -1,
  494. }
  495. query := sess.DB("qfw").C("bidding").Find(&where).Select(nil).Sort("-_id").Iter()
  496. count := 0
  497. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  498. if count%100 == 0 {
  499. log.Println("current:", count)
  500. }
  501. id := mongodb.BsonIdToSId(tmp["_id"])
  502. err := Es.DeleteByID("bidding", id)
  503. if err != nil {
  504. log.Println("es bidding", id)
  505. }
  506. err = EsNew.DeleteByID("bidding", id)
  507. if err != nil {
  508. log.Println("es new bidding", id)
  509. }
  510. err = EsNew.DeleteByID("bidding_year", id)
  511. if err != nil {
  512. log.Println("es new bidding_year", id)
  513. }
  514. err = EsNew.DeleteByID("bidding_free", id)
  515. if err != nil {
  516. log.Println("es new bidding_free", id)
  517. }
  518. }
  519. }