123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
# -*- coding: utf-8 -*-
"""
Created on 2025-05-08
---------
@summary: 云南省政府采购网 采购信息 (Yunnan Provincial Government Procurement Network: procurement notices)
---------
@author: lzz
"""
import base64
import json
import time
import uuid
from collections import namedtuple

import execjs
import feapder
from feapder.utils.tools import get_today_of_day

from items.spider_item import MgpListItem
from get_yn_cookies import get_ck
from untils.get_imgcode import get_code_det
from untils.tools import get_proxy
# JavaScript executed through execjs; it needs a Node.js runtime with the
# ``crypto-js`` package installed.  JSON.stringify is kept on the JS side so
# the plaintext is serialised exactly as before (e.g. how numbers are printed).
_CAPTCHA_AES_JS = '''
var CryptoJS = require("crypto-js")
function aesEcb(plain, keyWord){
    var key = CryptoJS.enc.Utf8.parse(keyWord);
    var srcs = CryptoJS.enc.Utf8.parse(plain);
    var encrypted = CryptoJS.AES.encrypt(srcs, key, {mode:CryptoJS.mode.ECB,padding: CryptoJS.pad.Pkcs7});
    return encrypted.toString();
}
function encryptPoints(word, keyWord){
    return aesEcb(JSON.stringify(word), keyWord);
}
function encryptVerification(token, word, keyWord){
    return aesEcb(token + "---" + JSON.stringify(word), keyWord);
}
'''


def _box_centers(boxes):
    """Return ``{"x": ..., "y": ...}`` centre points for a sequence of boxes.

    Each box is indexed as ``box[0]..box[3]``; x is the floor-average of
    items 0 and 2, y of items 1 and 3 (presumably ``x1, y1, x2, y2`` --
    confirm against ``get_code_det``).
    """
    return [
        {"x": (box[0] + box[2]) // 2, "y": (box[1] + box[3]) // 2}
        for box in boxes
    ]


def pass_code(session):
    """Run the "clickWord" captcha flow and return the verification string.

    Steps:
      1. POST to ``captcha.get.svc`` to obtain the image, the words to click,
         the AES key and the captcha token.
      2. Locate the words in the image with ``get_code_det``.
      3. POST the AES-encrypted click points to ``captcha.check.svc``.
      4. Return the AES-encrypted ``token + "---" + points`` string.

    Args:
        session: object exposing ``post(url, headers=..., data=..., timeout=...)``
            whose response provides ``.json()`` (e.g. a ``requests.Session``).

    Returns:
        The string produced by CryptoJS AES (ECB mode, PKCS7 padding) over
        ``token + "---" + JSON.stringify(points)``.

    Raises:
        AttributeError: if the captcha response has no ``data``/``repData``
            object, or the recogniser result has no ``r``/``code`` object.
    """
    # Same layout as the JS generator this replaces: "point-" + a version-4 UUID.
    client_uid = f"point-{uuid.uuid4()}"

    headers = {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Content-Type": "application/json;charset=UTF-8",
        "Origin": "http://www.yngp.com",
        "Pragma": "no-cache",
        "Referer": "http://www.yngp.com/page/procurement/procurementList.html",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    }

    # --- 1. fetch a captcha -------------------------------------------------
    get_payload = {
        "captchaType": "clickWord",
        "clientUid": client_uid,
        "ts": round(time.time() * 1000),  # millisecond timestamp
    }
    res = session.post(
        "http://www.yngp.com/api/captcha/captcha.get.svc",
        headers=headers,
        data=json.dumps(get_payload),
        timeout=60,
    )
    # Parse the body once instead of once per field.
    rep_data = res.json().get("data").get("repData")
    image_content = rep_data.get("originalImageBase64")
    code_list = rep_data.get("wordList")
    key_word = rep_data.get("secretKey")
    token = rep_data.get("token")

    # --- 2. locate the requested words in the image ------------------------
    # NOTE(review): image_code is assumed to map each recognised word to its
    # box -- confirm against get_code_det.
    image_code = get_code_det(base64.b64decode(image_content)).get("r").get("code")
    # A word the recogniser did not find falls back to the entry stored under
    # the empty-string key (which may itself be missing, i.e. None).
    boxes = [image_code.get(target) or image_code.get("") for target in code_list]
    try:
        word = _box_centers(boxes)
    except (TypeError, IndexError, KeyError):
        # At least one box was missing or malformed: best effort, click the
        # first three recognised boxes instead.
        word = _box_centers(list(image_code.values())[:3])

    # --- 3. submit the encrypted click points ------------------------------
    ctx = execjs.compile(_CAPTCHA_AES_JS)
    check_payload = {
        "captchaType": "clickWord",
        "pointJson": ctx.call("encryptPoints", word, key_word),
        "token": token,
        "clientUid": client_uid,
        "ts": round(time.time() * 1000),
    }
    # NOTE(review): the check response is not inspected, so a rejected
    # captcha is not detected here.
    session.post(
        "http://www.yngp.com/api/captcha/captcha.check.svc",
        headers=headers,
        data=json.dumps(check_payload),
        timeout=60,
    )
    # Fixed pause kept from the original implementation (reason not recorded;
    # presumably to look less automated -- confirm).
    time.sleep(5)

    # --- 4. build the verification string ----------------------------------
    return ctx.call("encryptVerification", token, word, key_word)
class Ynszfcgw_New(feapder.BiddingListSpider):
    """List spider for 云南省政府采购网 (www.yngp.com).

    Pages through ``Procurement.gghtMoreList.svc`` for every configured menu
    and yields one ``MgpListItem`` per returned row.  Cookies come from
    ``get_ck`` and proxies from ``get_proxy``; both are replaced whenever a
    request fails or the site answers with its error/block page.
    """

    def start_callback(self):
        """Set up the menus, request headers and mutable crawl state."""
        Menu = namedtuple('Menu', ['channel', 'code', 'noticeType', 'crawl_page'])
        self.site = "云南省政府采购网"
        self.menus = [
            Menu('ppp合作伙伴采购信息', 'yn_ynszfcgw_new_ppphzgbcgxx', '4', 1),
        ]
        self.headers = {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Origin": "http://www.yngp.com",
            "Pragma": "no-cache",
            "Referer": "http://www.yngp.com/page/procurement/procurementList.html",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
            "X-Requested-With": "XMLHttpRequest"
        }
        self.retry = 0        # count of error/block pages seen by parse()
        self.proxy = get_proxy()
        self.cookies = {}     # filled lazily in download_midware()
        self.vcode = ""       # captchaCheckFlag value sent for pages after the first

    def start_requests(self):
        """Yield the first-page request of every menu."""
        for menu in self.menus:
            start_url = "http://www.yngp.com/api/procurement/Procurement.gghtMoreList.svc"
            yield feapder.Request(url=start_url, item=menu._asdict(), proxies=False, page=1)

    def download_midware(self, request):
        """Fill in URL, form data, cookies, proxy and headers before download.

        The request is modified in place; nothing is returned.
        """
        # Up to five attempts to obtain cookies, switching proxy after each
        # failed attempt.  If all fail the request goes out without cookies.
        for _ in range(5):
            if self.cookies:
                break
            self.cookies = get_ck(proxies=self.proxy)
            if not self.cookies:
                self.proxy = get_proxy()

        menu = request.item
        page = request.page
        if page != 1:
            url = f"http://www.yngp.com/api/procurement/Procurement.gghtMoreList.svc?captchaCheckFlag={self.vcode}&p={page}"
        else:
            url = "http://www.yngp.com/api/procurement/Procurement.gghtMoreList.svc?captchaCheckFlag=0&p=1"

        # Only this one spider code restricts the query to today's date.
        if menu.get('code') == "yn_ynszfcgw_new_zfcghtgg":
            tm = get_today_of_day()
        else:
            tm = ""

        request.url = url
        request.data = {
            "current": str(page),
            "rowCount": "10",
            "searchPhrase": "",
            "query_bulletintitle": "",
            "query_startTime": tm,
            "query_endTime": tm,
            "query_type": menu.get('noticeType'),
            "query_code": "",
            "query_gglxdm": "",
            "query_purchaser": "",
            "query_projectid": ""
        }
        request.cookies = self.cookies
        request.proxies = self.proxy
        request.headers = self.headers

    def exception_request(self, request, response):
        """Switch proxy, drop the cookies and re-queue the request.

        Presumably invoked by the framework when a download fails -- confirm
        against ``BiddingListSpider``.
        """
        self.proxy = get_proxy()
        self.cookies = {}
        yield request

    def parse(self, request, response):
        """Turn one list-page response into items plus the next-page request."""
        # Spider-wide cap: self.retry is never reset in this class, so once
        # more than five error/block pages were seen every response is dropped.
        if self.retry > 5:
            return

        if "系统异常,请稍后再试" in response.text or "磐云" in response.text:
            # Error or block page: retry with fresh cookies and a new proxy.
            self.retry += 1
            self.cookies = {}
            self.proxy = get_proxy()
            yield request
            return

        menu = request.item
        payload = response.json
        for info in payload.get('data').get('rows'):
            # Keep only the text after the last ASCII colon of the title.
            title = info.get('bulletintitle').split(":")[-1].strip()
            href_id = info.get('bulletin_id')
            bulletinclassname = info.get('bulletinclassname')

            # The detail page depends on the bulletin class.
            if bulletinclassname == "采购合同公告":
                if "单位采购合同公告" in title:
                    continue  # these contract notices are skipped
                href = f'http://www.yngp.com/ggmxinfo.html?bulletinid={href_id}'
            elif "公共服务项目验收结果公告" in bulletinclassname:
                href = f'http://www.yngp.com/showAcceptanceResultsNoticeInfo.html?bulletinid={href_id}'
            # Fixed: the operands were reversed, so any substring of the
            # literal (including "") selected this branch.
            elif "单一来源审核前公示" in bulletinclassname:
                href = f'http://www.yngp.com/dylyggInfo.html?type=3&bulletin_id={href_id}'
            elif "成交公告" in bulletinclassname:
                href = f'http://www.yngp.com/showZCYBulletinInfo.html?bulletin_id={href_id}'
            else:
                href = f'http://www.yngp.com/showBulletinInfo.html?bulletin_id={href_id}'

            create_time = info.get("finishday")
            district = info.get('districtname')
            # Province-level rows ("省级") carry no city.
            city = "" if district == "省级" else district

            list_item = MgpListItem()
            list_item.href = href                        # link to the notice
            list_item.channel = menu.get("channel")      # channel name from the menu
            list_item.spidercode = menu.get("code")      # spider code from the menu
            list_item.title = title
            list_item.publishtime = create_time          # taken from "finishday"
            list_item.site = self.site
            list_item.area = "云南"                       # province
            list_item.city = city
            # NOTE(review): the first two entries are field names but the third
            # is the publish-time value itself -- confirm this is intended.
            list_item.unique_key = ('href', 'title', create_time)
            list_item.parse = "self.detail_get"          # presumably the detail callback name
            list_item.render_time = 1
            list_item.proxies = False
            list_item.parse_url = href
            yield list_item

        # Ask the base class for the follow-up page; it receives the parsed
        # JSON payload, exactly as before.
        yield self.infinite_pages(request, payload)
if __name__ == "__main__":
    # Script entry point: build the spider with its redis key, then run it.
    spider = Ynszfcgw_New(redis_key="lzz:Ynszfcgw_Jdgl")
    spider.start()
|