bidd.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "go.mongodb.org/mongo-driver/bson"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. // exportBidding exportBidding
  20. func exportBidding() {
  21. MgoB = &mongodb.MongodbSim{
  22. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  23. //MongodbAddr: "127.0.0.1:27083",
  24. Size: 10,
  25. DbName: "qfw",
  26. UserName: "SJZY_RWbid_ES",
  27. Password: "SJZY@B4i4D5e6S",
  28. //Direct: true,
  29. }
  30. MgoB.InitPool()
  31. sess := MgoB.GetMgoConn()
  32. defer MgoB.DestoryMongoConn(sess)
  33. where := map[string]interface{}{
  34. "subtype": "开标记录",
  35. }
  36. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "channel": 1, "detail": 1, "comeintime": 1, "extracttype": 1, "infoformat": 1}).Iter()
  37. count := 0
  38. ch := make(chan bool, 20)
  39. wg := &sync.WaitGroup{}
  40. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  41. if count%10000 == 0 {
  42. log.Println("current:", count)
  43. }
  44. ch <- true
  45. wg.Add(1)
  46. go func(tmp map[string]interface{}) {
  47. defer func() {
  48. <-ch
  49. wg.Done()
  50. }()
  51. MgoB.SaveByOriID("wcc_bidding_kaibiao", tmp)
  52. }(tmp)
  53. tmp = make(map[string]interface{})
  54. }
  55. wg.Wait()
  56. log.Println("处理完毕")
  57. }
  58. func getBidding0311() {
  59. // 3. 构造 MongoDB 查询
  60. filter := bson.M{
  61. "$and": []bson.M{
  62. {
  63. "$or": []bson.M{
  64. {"subtype": "合同"},
  65. {"toptype": "结果"},
  66. {"toptype": "招标"},
  67. },
  68. },
  69. {"publishtime": bson.M{"$gte": 1714492800, "$lt": 1719763200}},
  70. },
  71. }
  72. //where := map[string]interface{}{
  73. // "publishtime": map[string]interface{}{
  74. // "$gte": 1704038400,
  75. // "$lte": 1735660800,
  76. // },
  77. //}
  78. sess := MgoB.GetMgoConn()
  79. defer MgoB.DestoryMongoConn(sess)
  80. query := sess.DB("qfw").C("bidding").Find(filter).Select(map[string]interface{}{"buyer": 1, "toptype": 1, "subtype": 1}).Iter()
  81. count := 0
  82. ch := make(chan bool, 5)
  83. wg := &sync.WaitGroup{}
  84. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  85. if count%1000 == 0 {
  86. log.Println("current:", count, tmp["_id"])
  87. }
  88. if util.IntAll(tmp["extracttype"]) == -1 {
  89. continue
  90. }
  91. ch <- true
  92. wg.Add(1)
  93. go func(tmp map[string]interface{}) {
  94. defer func() {
  95. <-ch
  96. wg.Done()
  97. }()
  98. MgoB.SaveByOriID("wcc_bidding_20250311_1148", tmp)
  99. }(tmp)
  100. //if util.ObjToString(tmp["toptype"]) == "招标" || util.ObjToString(tmp["toptype"]) == "结果" || util.ObjToString(tmp["subtype"]) == "合同" {
  101. // MgoB.SaveByOriID("wcc_bidding_20250311_1148-2", tmp)
  102. //}
  103. tmp = make(map[string]interface{})
  104. }
  105. wg.Wait()
  106. }
  107. // getBidding2 获取bidding 1.3日无二级分类数据
  108. func getBidding2() {
  109. //2024-1-3日数据
  110. where := map[string]interface{}{
  111. "comeintime": map[string]interface{}{
  112. "$gte": 1704211200,
  113. "$lte": 1704297600,
  114. },
  115. "subtype": map[string]interface{}{
  116. "$exists": 1,
  117. },
  118. }
  119. sess := MgoB.GetMgoConn()
  120. defer MgoB.DestoryMongoConn(sess)
  121. //texts := make([]string, 0)
  122. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "subtype": 1, "href": 1, "detail": 1, "channel": 1}).Iter()
  123. count := 0
  124. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  125. if count%1000 == 0 {
  126. log.Println("current:", count)
  127. }
  128. if util.IntAll(tmp["extracttype"]) == -1 {
  129. continue
  130. }
  131. id := mongodb.BsonIdToSId(tmp["_id"])
  132. tmp["jyhref"] = GetJyURLByID(id)
  133. MgoB.SaveByOriID("wcc_bidding_20240103_subtype_exists", tmp)
  134. tmp = make(map[string]interface{})
  135. }
  136. log.Println("over")
  137. }
  138. // callAi 调用大模型
  139. func callAi() {
  140. sess := MgoB.GetMgoConn()
  141. defer MgoB.DestoryMongoConn(sess)
  142. //where := map[string]interface{}{
  143. // "subtype_a": map[string]interface{}{
  144. // "$exists": 0,
  145. // },
  146. //}
  147. query := sess.DB("qfw_data").C("wcc_bidding_20240103_subtype_exists").Find(nil).Select(nil).Iter()
  148. count := 0
  149. //ch := make(chan bool, 1)
  150. //wg := &sync.WaitGroup{}
  151. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  152. if count%1000 == 0 {
  153. log.Println("-------- current:", count, tmp["_id"], " ---------")
  154. }
  155. //ch <- true
  156. //wg.Add(1)
  157. //go func(tmp map[string]interface{}) {
  158. // defer func() {
  159. // <-ch
  160. // wg.Done()
  161. // }()
  162. id := mongodb.BsonIdToSId(tmp["_id"])
  163. title := util.ObjToString(tmp["title"])
  164. detail := util.ObjToString(tmp["detail"])
  165. data := map[string]interface{}{
  166. "title": title,
  167. "detail": detail,
  168. }
  169. reqData := map[string]interface{}{
  170. "texts": []interface{}{data},
  171. }
  172. now := time.Now()
  173. res := send(reqData)
  174. log.Println(time.Since(now).Seconds(), tmp["_id"])
  175. subtype := res["result"].([]interface{})
  176. result := subtype[0]
  177. types := strings.Split(util.ObjToString(result), "-")
  178. update := make(map[string]interface{})
  179. if len(types) == 2 {
  180. update["toptype_ai"] = types[0]
  181. update["subtype_ai"] = types[1]
  182. //没有内容
  183. if detail == "" {
  184. update["data_type"] = 1
  185. } else {
  186. update["data_type"] = 0
  187. }
  188. MgoB.UpdateById("wcc_bidding_20240103_subtype_exists", id, map[string]interface{}{"$set": update})
  189. }
  190. //}(tmp)
  191. tmp = make(map[string]interface{})
  192. }
  193. //wg.Wait()
  194. log.Println("over")
  195. }
  196. // getBidding 调用分类大模型
  197. func getBidding() {
  198. //2024-1-3日数据
  199. where := map[string]interface{}{
  200. "comeintime": map[string]interface{}{
  201. "$gte": 1704211200,
  202. "$lte": 1704297600,
  203. },
  204. "subtype": map[string]interface{}{
  205. "$exists": 0,
  206. },
  207. }
  208. sess := MgoB.GetMgoConn()
  209. defer MgoB.DestoryMongoConn(sess)
  210. //texts := make([]string, 0)
  211. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "href": 1, "detail": 1}).Iter()
  212. count := 0
  213. ch := make(chan bool, 10)
  214. wg := &sync.WaitGroup{}
  215. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  216. if count%1000 == 0 {
  217. log.Println("current:", count)
  218. }
  219. if util.IntAll(tmp["extracttype"]) == -1 {
  220. continue
  221. }
  222. ch <- true
  223. wg.Add(1)
  224. go func(tmp map[string]interface{}) {
  225. defer func() {
  226. <-ch
  227. wg.Done()
  228. }()
  229. id := mongodb.BsonIdToSId(tmp["_id"])
  230. title := util.ObjToString(tmp["title"])
  231. detail := util.ObjToString(tmp["detail"])
  232. tmp["bidding_id"] = id
  233. data := map[string]interface{}{
  234. "title": title,
  235. "detail": detail,
  236. }
  237. reqData := map[string]interface{}{
  238. "texts": []interface{}{data},
  239. }
  240. res := SendAi(reqData)
  241. subtype := res["result"].([]interface{})
  242. result := subtype[0]
  243. types := strings.Split(util.ObjToString(result), "-")
  244. if len(types) == 2 {
  245. tmp["new_toptype"] = types[0]
  246. tmp["new_subtype"] = types[1]
  247. }
  248. tmp["jyhref"] = GetJyURLByID(id)
  249. //没有内容
  250. if detail == "" {
  251. tmp["data_type"] = 1
  252. } else {
  253. tmp["data_type"] = 0
  254. }
  255. MgoB.Save("wcc_20240103-2", tmp)
  256. }(tmp)
  257. tmp = make(map[string]interface{})
  258. }
  259. wg.Wait()
  260. log.Println("over")
  261. }
  262. func send(data map[string]interface{}) (res map[string]interface{}) {
  263. url := "http://192.168.3.109:16688"
  264. jsonData, err := json.Marshal(data)
  265. if err != nil {
  266. fmt.Println("JSON marshal error:", err)
  267. return
  268. }
  269. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  270. if err != nil {
  271. fmt.Println("Request error:", err)
  272. return
  273. }
  274. req.Header.Set("Content-Type", "application/json")
  275. client := &http.Client{}
  276. resp, err := client.Do(req)
  277. if err != nil {
  278. fmt.Println("Request error:", err)
  279. return
  280. }
  281. defer resp.Body.Close()
  282. err = json.NewDecoder(resp.Body).Decode(&res)
  283. if err != nil {
  284. fmt.Println("Response decoding error:", err)
  285. return
  286. }
  287. return
  288. }
  289. // SendAi 调用大模型招标分类
  290. func SendAi(data map[string]interface{}) (res map[string]interface{}) {
  291. // 设置 2 秒的超时
  292. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  293. defer cancel()
  294. url := "http://192.168.3.109:16688"
  295. jsonData, err := json.Marshal(data)
  296. if err != nil {
  297. fmt.Println("JSON marshal error:", err)
  298. return
  299. }
  300. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  301. if err != nil {
  302. fmt.Println("Request error:", err)
  303. return
  304. }
  305. req.Header.Set("Content-Type", "application/json")
  306. // 将请求与上下文关联
  307. req = req.WithContext(ctx)
  308. client := &http.Client{}
  309. resp, err := client.Do(req)
  310. if err != nil {
  311. // 使用 errors.Is 检查错误是否是超时错误
  312. if errors.Is(err, context.DeadlineExceeded) {
  313. fmt.Println("Request timed out")
  314. return
  315. }
  316. fmt.Println("Request error:", err)
  317. return
  318. }
  319. defer resp.Body.Close()
  320. err = json.NewDecoder(resp.Body).Decode(&res)
  321. if err != nil {
  322. fmt.Println("Response decoding error:", err)
  323. return
  324. }
  325. return
  326. }
  327. // getCount 北京中科闻歌科技股份有限公司 对指定数据源网站内的中标公告的数据量的查询需求
  328. func getCount() {
  329. //startTime := int64(1672502400) //2023-01-01
  330. //endTime := int64(1680278400) //2023-4-01
  331. //endTime2 := int64(1688140800) //2023-7-01
  332. //endTime3 := int64(1696089600) //2023-10-01
  333. endTime4 := int64(1704038400) //2024-1-01
  334. endTime5 := int64(1714492800) //2024-5-01
  335. where := map[string]interface{}{
  336. "_id": map[string]interface{}{
  337. "$gt": mongodb.StringTOBsonId(strconv.FormatInt(endTime4, 16) + "0000000000000000"),
  338. "$lte": mongodb.StringTOBsonId(strconv.FormatInt(endTime5, 16) + "0000000000000000"),
  339. },
  340. }
  341. sess := MgoB.GetMgoConn()
  342. defer MgoB.DestoryMongoConn(sess)
  343. log.Println(where)
  344. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"site": 1, "subtype": 1, "attach_text": 1}).Iter()
  345. count := 0
  346. count1 := 0 //中国南方电网 标讯数据
  347. count2 := 0 //中国南方电网 标讯成交数据
  348. count3 := 0 //中国南方电网 标讯有附件数据
  349. count4 := 0 //中国南方电网 标讯成交有附件数据
  350. count5 := 0 //国家电网公司电子商务平台 标讯数据
  351. count6 := 0 //国家电网公司电子商务平台 标讯成交数据
  352. count7 := 0 //国家电网公司电子商务平台 标讯有附件数据
  353. count8 := 0 //国家电网公司电子商务平台 标讯成交有附件数据
  354. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  355. if count%10000 == 0 {
  356. log.Println("current:", count, count1, count2, count3, count4, count5, count6, count7, count8)
  357. }
  358. site := util.ObjToString(tmp["site"])
  359. subtype := util.ObjToString(tmp["subtype"])
  360. attachText, _ := tmp["attach_text"].(map[string]interface{})
  361. if site == "中国南方电网" {
  362. count1++
  363. if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" {
  364. count2++
  365. if attachText != nil {
  366. count4++
  367. }
  368. }
  369. if attachText != nil {
  370. count3++
  371. }
  372. }
  373. if site == "国家电网公司电子商务平台" {
  374. count5++
  375. if subtype == "中标" || subtype == "单一" || subtype == "成交" || subtype == "合同" {
  376. count6++
  377. if attachText != nil {
  378. count8++
  379. }
  380. }
  381. if attachText != nil {
  382. count7++
  383. }
  384. }
  385. //if util.ObjToString(tmp["site"]) != "中国南方电网" {
  386. // count1++
  387. // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" {
  388. // count2++
  389. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  390. // count4++
  391. // }
  392. // }
  393. // // 附件
  394. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  395. // count3++
  396. // }
  397. //
  398. //}
  399. //
  400. //if util.ObjToString(tmp["site"]) != "国家电网公司电子商务平台" {
  401. // count5++
  402. // if util.ObjToString(tmp["subtype"]) == "中标" || util.ObjToString(tmp["subtype"]) == "单一" || util.ObjToString(tmp["subtype"]) == "成交" || util.ObjToString(tmp["subtype"]) == "合同" {
  403. // count6++
  404. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  405. // count8++
  406. // }
  407. // }
  408. // //右附件
  409. // if attchMap, ok := tmp["attach_text"].(map[string]interface{}); attchMap != nil && ok {
  410. // count7++
  411. // }
  412. //}
  413. tmp = make(map[string]interface{})
  414. }
  415. log.Println("count1", count1)
  416. log.Println("count2", count2)
  417. log.Println("count3", count3)
  418. log.Println("count4", count4)
  419. log.Println("count5", count5)
  420. log.Println("count6", count6)
  421. log.Println("count7", count7)
  422. log.Println("count8", count8)
  423. log.Println("over")
  424. }
  425. func deleteEs() {
  426. Mgo := &mongodb.MongodbSim{
  427. MongodbAddr: "172.17.189.140:27080",
  428. //MongodbAddr: "127.0.0.1:27083",
  429. Size: 10,
  430. DbName: "qfw",
  431. UserName: "SJZY_RWbid_ES",
  432. Password: "SJZY@B4i4D5e6S",
  433. //Direct: true,
  434. }
  435. Mgo.InitPool()
  436. sess := Mgo.GetMgoConn()
  437. defer Mgo.DestoryMongoConn(sess)
  438. //es
  439. Es := &elastic.Elastic{
  440. //S_esurl: "http://127.0.0.1:19908",
  441. S_esurl: "http://172.17.4.184:19908",
  442. I_size: 5,
  443. Username: "jybid",
  444. Password: "Top2023_JEB01i@31",
  445. }
  446. Es.InitElasticSize()
  447. //es 新集群
  448. EsNew := &elastic.Elastic{
  449. //S_esurl: "http://127.0.0.1:19905",
  450. S_esurl: "http://172.17.4.184:19905",
  451. I_size: 5,
  452. Username: "jybid",
  453. Password: "Top2023_JEB01i@31",
  454. }
  455. EsNew.InitElasticSize()
  456. where := map[string]interface{}{
  457. "_id": map[string]interface{}{
  458. "$gt": mongodb.StringTOBsonId("673458dbb25c3e1deb2fef58"),
  459. "$lte": mongodb.StringTOBsonId("6734638db25c3e1deb303821"),
  460. },
  461. "extracttype": -1,
  462. }
  463. query := sess.DB("qfw").C("bidding").Find(&where).Select(nil).Sort("-_id").Iter()
  464. count := 0
  465. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  466. if count%100 == 0 {
  467. log.Println("current:", count)
  468. }
  469. id := mongodb.BsonIdToSId(tmp["_id"])
  470. err := Es.DeleteByID("bidding", id)
  471. if err != nil {
  472. log.Println("es bidding", id)
  473. }
  474. err = EsNew.DeleteByID("bidding", id)
  475. if err != nil {
  476. log.Println("es new bidding", id)
  477. }
  478. err = EsNew.DeleteByID("bidding_year", id)
  479. if err != nil {
  480. log.Println("es new bidding_year", id)
  481. }
  482. err = EsNew.DeleteByID("bidding_free", id)
  483. if err != nil {
  484. log.Println("es new bidding_free", id)
  485. }
  486. }
  487. }