123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import io
- import re
- from lxml.html import fromstring
- from pymongo import MongoClient
- from pymongo.collection import Collection
- from lxml.html import tostring
# Handles into the local MongoDB server (127.0.0.1:27017).
# Database ``hospital``: ``crawl_error`` receives pages stored by
# ``unknown_element``; the other collections are not used in this part of the
# file (presumably crawl queue / scraped output - confirm against callers).
hospital = MongoClient('127.0.0.1:27017')['hospital']
area_tab: Collection = hospital['area']
crawl_tab: Collection = hospital['list_item']
save_tab: Collection = hospital['data_info']
err_tab: Collection = hospital['crawl_error']

# Database ``region``: ``address`` is the lookup table read by
# ``query_address``. A separate client instance is opened for it.
region = MongoClient('127.0.0.1:27017')['region']
address_tab: Collection = region['address']
# Browser-like HTTP request headers for www.yixue.com.
# NOTE(review): the ``sec-ch-ua`` and ``sec-ch-ua-platform`` values look like
# leftovers of Windows-shell caret escaping from a copied command line rather
# than real client-hint values - confirm before relying on them.
headers = {
    'authority': 'www.yixue.com',
    'cache-control': 'max-age=0',
    'sec-ch-ua': '^\\^',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '^\\^Windows^^',
    'upgrade-insecure-requests': '1',
    'user-agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/97.0.4692.99 Safari/537.36'
    ),
    'accept': (
        'text/html,application/xhtml+xml,application/xml;q=0.9,'
        'image/avif,image/webp,image/apng,*/*;q=0.8,'
        'application/signed-exchange;v=b3;q=0.9'
    ),
    'sec-fetch-site': 'none',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-user': '?1',
    'sec-fetch-dest': 'document',
    'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
def html2element(html):
    """Parse *html* with ``lxml.html.fromstring`` and return the resulting element."""
    element = fromstring(html)
    return element
def element2html(element):
    """Serialise an lxml *element* to markup and return it as ``str``.

    The element is rendered to UTF-8 bytes by ``lxml.html.tostring`` and then
    decoded back to text.
    """
    raw_bytes = tostring(element, encoding='utf-8')
    return raw_bytes.decode('utf-8')
def unknown_element(element, item):
    """Store *item* together with the markup of *element* in ``err_tab``.

    Args:
        element: lxml element whose serialised HTML is saved under the
            ``page_source`` key.
        item: Mapping of fields to store alongside the markup; it is copied,
            not modified.
    """
    markup = element2html(element)
    record = dict(item)
    # Written last so it overrides any ``page_source`` already present in item.
    record['page_source'] = markup
    err_tab.insert_one(record)
def query_address(query=None, projection=None):
    """Read address documents from ``address_tab`` and return them as a list.

    Args:
        query: MongoDB filter document. ``None`` means "no filter" (``{}``).
        projection: MongoDB projection. ``None`` selects the ``province``,
            ``city`` and ``district`` fields.

    Returns:
        list[dict]: The matching documents, each without its ``_id`` field.
    """
    if query is None:
        query = {}
    if projection is None:
        projection = {'province': 1, 'city': 1, 'district': 1}

    results = []
    # The cursor is used as a context manager so it is closed even if
    # iteration fails part-way.
    with address_tab.find(query, projection=projection) as cursor:
        for item in cursor:
            # A caller-supplied projection may already exclude ``_id``
            # (e.g. ``{'_id': 0}``); tolerate that instead of raising KeyError.
            item.pop('_id', None)
            results.append(item)
    return results
def remove_suffix(text):
    """Drop administrative-unit markers from a region name.

    Every occurrence of ``省``, ``自治区`` or ``地区`` is removed, wherever it
    appears in *text* (not only at the end). ``None`` is passed through
    unchanged.
    """
    if text is not None:
        return re.sub('省|自治区|地区', '', text)
    return None
def _query_region(text, items):
    """Find the address entries whose district, city or province equals *text*.

    The levels are tried from most to least specific (district, city,
    province); the first level that yields at least one hit wins.

    Args:
        text: Region name to look for; ``None`` never matches anything.
        items: Iterable of address dicts with optional ``province``, ``city``
            and ``district`` keys.

    Returns:
        A non-empty list of hits, or ``None`` when nothing matched. District
        hits are the original entries, city hits are
        ``{'province': ..., 'city': ...}`` dicts and province hits are
        ``{'province': ...}`` dicts; there is one hit per matching entry, so
        duplicates are possible.
    """
    for level in ('district', 'city', 'province'):
        matches = []
        for entry in items:
            value = entry.get(level)
            if text is None or value is None or text != value:
                continue
            if level == 'district':
                matches.append(entry)
            elif level == 'city':
                matches.append({'province': entry.get('province'), 'city': value})
            else:
                matches.append({'province': value})
        if matches:
            return matches
    return None
def query_region(text, address):
    """Resolve the region named at the start of *text* against *address*.

    The leading part of *text* is split by a regular expression into two
    optional pieces: the text up to the last ``省``/``自治区``/``市`` and the
    following text up to the last ``市``/``区``/``县``/``州``/``盟``. Both pieces
    are passed through ``remove_suffix`` and then looked up with
    ``_query_region`` (second piece first, then the first piece).

    Args:
        text: Title such as ``'北京市宣武区医院列表'``.
        address: Address entries as accepted by ``_query_region``.

    Returns:
        ``{}`` when no region text could be extracted; ``None`` when the
        lookup finds nothing, or finds several candidates none of which
        agrees with the first piece; otherwise a dict describing the region.
    """
    matched = re.match('(.*(?:省|自治区|市)){0,1}(.*(?:市|区|县|州|盟)){0,1}', text)
    # Extract the province-level and city-level pieces.
    prov, city = (remove_suffix(piece) for piece in matched.groups())
    if prov is None and city is None:
        return {}

    # Look the pieces up, preferring the more specific one.
    candidates = _query_region(city, address) or _query_region(prov, address)
    if candidates is None:
        return None
    if len(candidates) <= 1:
        return candidates[0]

    if prov is None:
        # Several hits and nothing to disambiguate with: report only the
        # province/city of the first hit.
        first = candidates[0]
        return {'province': first.get('province'), 'city': first.get('city')}

    # Several hits: keep the one whose city or province equals the first piece.
    for candidate in candidates:
        if prov == candidate.get('city') or prov == candidate['province']:
            return candidate
    return None
# Splits a hospital title into its name and an optional trailing alias.
#   group 1 - the name: everything up to the last recognised ending that is
#             followed either by the end of the text or by the alias.
#   group 2 - the alias including its surrounding parentheses (ASCII or
#             full-width); one level of nested parentheses is allowed inside.
# The parentheses that delimit the alias are matched as literal characters.
# Previously they were unescaped, which produced three capture groups, an
# empty alternative that swallowed the whole text, and a ValueError in the
# two-value unpacking done by the helpers below.
res_name = re.compile(
    '(.*(?:院|区|部|所|馆|科|局|诊|病|场|康|站|点|社|字|室|会|瘤|大|矿|腔|堂|岗|合|〗|校|办'
    '|号|坊|医|房|贵|光|吾|门诊|体检|中心|公司|机构|集团|美容|整形|部队|保健|基地|服务))?'
    '([((](?:[^(())]|[((][^(())]*[))])*[))])?$'
)


def hospital_alias(text: str):
    """Return the alias(es) given in trailing parentheses after a hospital name.

    Aliases separated by ``、`` are returned joined with ``,``.

    Args:
        text: Hospital title, e.g. ``'中国人民解放军第306医院(三零六医院、三0六医院)'``.

    Returns:
        str: The comma-separated aliases, or ``''`` when *text* has no
        trailing parenthesised part or does not match ``res_name``.
    """
    matched = res_name.match(text)
    if matched is None:
        return ''
    alias = matched.group(2)
    if alias is None:
        return ''
    # Strip the surrounding parentheses before splitting the alias list.
    return ','.join(alias[1:-1].split('、'))


def hospital_name(text: str):
    """Return the hospital name without the trailing parenthesised alias.

    Args:
        text: Hospital title, optionally followed by an alias in parentheses.

    Returns:
        The name part, or ``None`` when *text* does not end in one of the
        endings recognised by ``res_name``.
    """
    matched = res_name.match(text)
    if matched is None:
        return None
    return matched.group(1)
def hospital_main_department(text: str):
    """Normalise a colon-prefixed, ``、``-separated department list.

    Empty entries (caused by leading, trailing or doubled ``、``) are dropped
    and the remaining department names are joined with ``、`` again.

    Args:
        text: Raw field value such as ``':、心脏科、神经外科'``. The leading
            colon may be ASCII (``:``) or full-width (``:``).

    Returns:
        str: The cleaned list, e.g. ``'心脏科、神经外科'``; ``''`` when *text*
        does not start with a colon or names no department.
    """
    matched = re.match('[::](.*)$', text)
    if matched is None:
        return ''
    # The previous implementation built this value and then discarded it
    # through an inverted ``is None`` test, so it always returned ''.
    departments = [part for part in matched.group(1).split('、') if part]
    return '、'.join(departments)
if __name__ == '__main__':
    # Manual smoke test: resolve one list-page title against the stored
    # address table and print the outcome.
    #
    # Sample inputs kept for ad-hoc experiments.
    #   hospital_main_department:
    #     ':特需门诊、银屑病、白癜风科、、痤疮门诊、灰指甲专科'
    #     ':、、、、、、、、消化内科、心血管内科、眼科、产科'
    #     ':、心脏科、神经外科'
    #   hospital_name / hospital_alias:
    #     '北京鸿慈童康'
    #     '上海精神卫生康复医院二部'
    #     '海湾镇燎原卫生院'
    #     '张家港时代港口医院有限公司'
    #     '北京玉之光医疗整形美容国际连锁机构(玉之光(北京)国际医疗美容整形机构)'
    #     '中国人民解放军第306医院(三零六医院、三0六医院)'
    #     '上海浦东新区迎博社区卫生服务站'
    #     '上海徐剑炜整形美容'
    #   query_region:
    #     '上海市宝山区医院列表'
    #     '北京市宣武区医院列表'
    #     '北京市延庆县医院列表'
    #     '甘孜藏族自治州医院列表'
    #     '湖北省神农架林区医院列表'
    #     '永州市医院列表'
    #     '德宏傣族景颇族自治州医院列表'
    #     '云南省丽江地区医院列表'  # 1
    #     '延边朝鲜族自治州医院列表'
    #     '兴安盟医院列表'
    #     '新疆维吾尔自治区喀什地区医院列表'
    sample_title = '石河子市医院列表'
    print(query_region(sample_title, query_address()))
|