import datetime
import hashlib
import json
import time
from collections import deque
from functools import wraps

import execjs
import requests
import urllib3

from utils.databases import mongo_table
from utils.log import logger

# Requests below are sent with verify=False; silence the resulting warnings.
urllib3.disable_warnings()

zktest_unexists_name = mongo_table('py_spider', 'zktest_unexists_name')
f_hospital_codes = mongo_table('py_theme', 'f_hospital_codes')

# Module-wide "current" openid. callback_requests() rebinds it whenever it
# rotates to another openid, and query_hospital() reads it back afterwards.
openid = None


class TimerError(IOError):
    """Raised when the crawler is invoked outside the allowed time window."""

    def __init__(self, *args, **kwargs):
        # OSError subclasses that override __init__ without delegating end up
        # with empty ``args`` (and therefore an empty ``str(exc)``).
        super().__init__(*args)
        self.msg = args[0] if args else ''


class CrawlError(Exception):
    """Raised when the remote API answers but reports a failed query."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        self.msg = args[0] if args else ''


class RequestError(Exception):
    """Raised when the HTTP request itself fails."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        self.msg = args[0] if args else ''


def spider_listener(func):
    """Decorator: allow ``func`` to run only Mon-Fri, 09:00 through 18:59.

    The check uses the local clock and is performed once, when the wrapped
    function is entered.

    Raises:
        TimerError: if called outside that window.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Read the clock once so both checks see the same instant.
        now = datetime.datetime.now()
        is_weekday = 0 <= now.weekday() <= 4
        is_open_hours = 9 <= now.hour <= 18
        if is_weekday and is_open_hours:
            return func(*args, **kwargs)
        raise TimerError('小程序接口停止运营')

    return wrapper


def get_cursor():
    """Return a cursor over pending tasks in ``zktest_unexists_name``.

    A task is pending when it has a positive ``count`` and no ``is_crawl``
    field. Only ``_id`` and ``name`` are projected.
    """
    projection = {'_id': 1, 'name': 1}
    q = {
        "$and": [
            {"count": {"$exists": True}},
            {"count": {"$gt": 0}}
        ],
        'is_crawl': {'$exists': False}
    }
    return zktest_unexists_name.find(q, projection=projection)


def get_openid(openid_deque: deque):
    """Return the next openid, rotating it to the back of ``openid_deque``.

    Raises:
        IndexError: if the deque is empty.
    """
    next_openid = openid_deque.popleft()
    openid_deque.append(next_openid)
    return next_openid


def md5_hex(val):
    """Return the lowercase hex MD5 of ``val`` followed by the fixed salt.

    This is used as the ``sign`` form field of the API requests.

    The digest is computed over the low byte of every UTF-16 code unit of
    the salted string (JavaScript's ``255 & charCodeAt(i)``), which is how
    the classic JS ``hex_md5`` routine this function stands in for builds its
    input. For ASCII input, such as the output of :func:`quote`, that is
    simply the ASCII bytes.
    """
    salt = "A523B4A5C52203AA9C2D97F6CB45CB35"
    salted = val + salt
    # UTF-16-LE stores the low byte of each code unit first, so every second
    # byte is exactly ``charCode & 255``.
    low_bytes = salted.encode('utf-16-le', 'surrogatepass')[::2]
    return hashlib.md5(low_bytes).hexdigest()


def quote(data):
    """Return ``encodeURIComponent(JSON.stringify(data))``.

    The expression is evaluated by a JavaScript runtime through execjs, so
    the result is exactly what JavaScript produces for ``data``.
    """
    script = """
    getQuote = function(data){return encodeURIComponent(JSON.stringify(data))}
    """
    ctx = execjs.compile(script)
    return ctx.call('getQuote', data)


def check_response(response, *args):
    """Validate an API response and return its decoded JSON payload.

    Args:
        response: object exposing ``.json()`` (a ``requests`` response).
        *args: optional; ``args[0]`` is the openid used for the request and
            is appended to error messages.

    Returns:
        The decoded JSON payload.

    Raises:
        RequestError: if the body is not valid JSON.
        CrawlError: if the payload reports ``resultType == 'ipError'``, has a
            ``code`` other than ``'0'``, or is empty.
    """
    # Empty string (not None) so the concatenations below never fail when
    # no openid was supplied.
    open_id = f" OpenId:{args[0]}" if args else ""

    try:
        resp_json = response.json()
    except ValueError as e:
        # A non-JSON body (e.g. an HTML error page) is treated as a
        # transport-level failure rather than an unhandled crash.
        raise RequestError(f"响应不是合法JSON, 原因:{e}{open_id}") from e

    logger.debug(json.dumps(resp_json, indent=4, ensure_ascii=False))

    if 'resultType' in resp_json and resp_json['resultType'] == 'ipError':
        raise CrawlError(f"{resp_json.get('resultTypeMemo', '')}{open_id}")

    if 'code' in resp_json and resp_json["code"] != '0':
        raise CrawlError(f"{resp_json.get('msg', '')}{open_id}")

    if len(resp_json) == 0:
        raise CrawlError("详情页请求结果为空" + open_id)

    return resp_json


@spider_listener
def callback_requests(func, *args, **kwargs):
    """Call ``func`` until it succeeds, rotating proxy / openid on failure.

    Keyword arguments consumed here (everything else is forwarded):
        proxy: optional object exposing ``.proxies`` and ``.switch()``.
        openid: openid to start with (required).
        openid_dq: deque of openids to rotate through (required).

    ``func`` is invoked as ``func(*args, openid=..., proxies=..., **kwargs)``.
    On ``RequestError`` the proxy (if any) is switched; on ``CrawlError`` or
    ``AssertionError`` the next openid is taken from ``openid_dq``. Either
    way the call is retried after a 3 second pause, with no retry limit.

    Side effect: rebinds the module-level ``openid`` to the openid in use.
    """
    global openid
    proxy = kwargs.pop('proxy', None)
    openid = kwargs.pop('openid')
    openid_dq = kwargs.pop('openid_dq')

    while True:
        kwargs['openid'] = openid
        logger.debug(f"[当前openid]:{openid}")

        proxies = proxy.proxies if proxy is not None else None
        kwargs['proxies'] = proxies
        logger.debug(f"[当前代理]:{proxies}")

        try:
            return func(*args, **kwargs)
        except RequestError as e:
            logger.error(f"[请求异常]:{e}")
            time.sleep(3)
            if proxy is not None:
                proxy.switch()
        except (CrawlError, AssertionError) as e:
            logger.error(f"[查询异常]:{e}")
            openid = get_openid(openid_dq)
            time.sleep(3)


def get_jgdm(query, proxies, openid):
    """Search the list endpoint for ``query`` and return matching codes.

    Args:
        query: search keyword.
        proxies: ``requests``-style proxies mapping, or None.
        openid: openid sent with the request.

    Returns:
        List of ``encJgdm`` values for every result whose ``jyzt`` is not
        ``'注销'``.

    Raises:
        RequestError: if the HTTP request fails.
        CrawlError: if the response is rejected by :func:`check_response` or
            carries no ``resultType`` field.
    """
    url = "https://ss.cods.org.cn/MiniProService/search/searchRMini"
    headers = {
        "Host": "ss.cods.org.cn",
        "content-type": "application/x-www-form-urlencoded",
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.26(0x18001a2e) NetType/WIFI Language/zh_CN",
        "Referer": "https://servicewechat.com/wxa97584cd2e4d83ad/10/page-frame.html"
    }
    val = {
        "q": query,
        "t": "common",
        "currentPage": 1,
        "xzqh": "",
        "jglx": "B",  # organisation-type filter (original note: public institution)
        "zczj": "",
        "clrq": "",
        "mobile": "",
        "isDeepSearch": False,
        "platform": "weixin",
        "openid": openid
    }
    json_str = quote(val)
    data = {
        "jsonString": json_str,
        "sign": md5_hex(json_str)
    }
    request_params = dict(
        headers=headers,
        data=data,
        verify=False,
        timeout=60,
        proxies=proxies
    )

    try:
        response = requests.post(url, **request_params)
    except requests.RequestException as e:
        raise RequestError(f"'{query}'jgdm请求失败, 原因:{e}")

    resp_json = check_response(response, openid)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    # The 'ipError' case has already been rejected by check_response().
    if 'resultType' not in resp_json:
        raise CrawlError(f"'{query}'jgdm响应缺少resultType字段 OpenId:{openid}")

    documents = resp_json['jginfoList']["documents"]
    results = [item['encJgdm'] for item in documents if item['jyzt'] != '注销']
    logger.info(f"[查询成功]获取{len(results)}条'{query}'相关信息")
    return results


def get_hospital(query, jgdm, proxies, openid):
    """Fetch the detail page for ``jgdm``, upsert it to Mongo and return it.

    Args:
        query: keyword the code was found with (stored as ``search_name``).
        jgdm: encrypted organisation code returned by :func:`get_jgdm`.
        proxies: ``requests``-style proxies mapping, or None.
        openid: openid sent with the request.

    Returns:
        The hospital record dict that was upserted into ``f_hospital_codes``
        (keyed by ``search_name`` + ``credit_no``).

    Raises:
        RequestError: if the HTTP request fails.
        CrawlError: if the response is rejected by :func:`check_response` or
            carries no ``code`` field.
    """
    url = "https://ss.cods.org.cn/MiniProService/detailPage/detail.base"
    headers = {
        "Host": "ss.cods.org.cn",
        # NOTE(review): hard-coded length; requests is expected to replace it
        # with the real body length when preparing the request - confirm.
        "Content-Length": "531",
        "content-type": "application/x-www-form-urlencoded",
        "Accept-Encoding": "gzip,compress,br,deflate",
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.26(0x18001a2e) NetType/WIFI Language/zh_CN",
        "Referer": "https://servicewechat.com/wxa97584cd2e4d83ad/10/page-frame.html",
        "Connection": "keep-alive"
    }
    val = {
        "jgdm": jgdm,
        "keyword": query,
        "platform": "weixin",
        "openid": openid
    }
    json_str = quote(val)
    data = {
        'jsonString': json_str,
        'sign': md5_hex(json_str)
    }
    request_params = dict(
        headers=headers,
        data=data,
        proxies=proxies,
        verify=False,
        timeout=60
    )

    try:
        response = requests.post(url, **request_params)
    except requests.RequestException as e:
        raise RequestError(f"'{jgdm}'医院详情请求失败, 原因:{e}")

    resp_json = check_response(response, openid)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    # A code other than '0' has already been rejected by check_response().
    if 'code' not in resp_json:
        raise CrawlError(f"'{jgdm}'医院详情响应缺少code字段 OpenId:{openid}")

    item = resp_json['document']
    # Former names are optional; tolerate a missing or null list.
    alias_name = ",".join(item.get("jgmchis") or [])

    hospital = {
        "search_name": query,  # search keyword
        "hospital_name": item["jgmc"],  # hospital name
        "alias_name": alias_name,  # former names ("xxx,xxx")
        "credit_no": item["tydm"],  # unified credit code
        "legal_person": item["fddbr"],  # legal representative
        "capital": item["newZczj"],  # registered capital
        "establish_date": item["clrq"],  # date of establishment
        "company_type": "事业单位",  # entity type (organisation type)
        "operation_startdate": item["jyqxz"],  # operating period: from
        "operation_enddate": item["jyqxe"],  # operating period: to
        "business_scope": item["jyfw"],  # business scope
        "authority": item["djbmmc"],  # registration authority (approving body)
        "company_address": item["zcdz"],  # contact (registered) address
        "company_code": item["djh"],  # registration number
        "organization": item["jjlxdm"],  # economic type
        "industry": item["jjhydm"],  # economic sector
    }
    f_hospital_codes.update_one(
        {'search_name': query, 'credit_no': item["tydm"]},
        {'$set': hospital},
        upsert=True
    )
    logger.info(f"[查询成功]获取'{hospital['hospital_name']}'详情数据")
    return hospital


def query_hospital(tasks, proxy, openid_deque):
    """Process every task: search by name, fetch each detail, mark it done.

    Args:
        tasks: list of task dicts with ``_id`` and ``name``; it is consumed
            in place (each task is popped from the front).
        proxy: optional proxy object forwarded to :func:`callback_requests`.
        openid_deque: deque of openids to rotate through.

    For each task the list endpoint is queried, then each returned code's
    detail page is fetched (15 s pause after each), and finally the task
    document gets ``is_crawl=True`` and ``count`` set to the number of codes
    found. There is a 60 s pause between tasks.
    """
    global openid
    while tasks:
        task = tasks.pop(0)
        query = task['name']
        openid = get_openid(openid_deque)
        logger.info(f"[开始查询]{query}")
        params = dict(proxy=proxy, openid=openid, openid_dq=openid_deque)

        # List page
        jgdm_lst = callback_requests(get_jgdm, query, **params)
        time.sleep(3)

        # Detail pages
        for jgdm in jgdm_lst:
            # callback_requests() may have rotated the module-level openid;
            # continue with whichever one is current.
            params['openid'] = openid
            callback_requests(get_hospital, query, jgdm, **params)
            logger.info(f"[保存数据]jgdm:{jgdm}")
            time.sleep(15)

        # Mark the crawl task as done
        zktest_unexists_name.update_one(
            {'_id': task['_id']},
            {
                '$set': {
                    'is_crawl': True,
                    'count': len(jgdm_lst)  # number of public institutions found
                }
            }
        )
        time.sleep(60)


# if __name__ == '__main__':
#     get_hospital('沈阳市儿童医院', '1653fbc6f5c496974321f967286cba59', None, 'o0VVO5Qj5EZzjeaKjCQUhhiYprBw')
#     get_jgdm('沈阳市儿童医院', None, 'o0VVO5Qj5EZzjeaKjCQUhhiYprBw')