main.py

# -*- coding: utf-8 -*-
"""
Created on 2024-10-17
---------
@summary:
---------
"""
import json
import math
import random
import time
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import bson
import requests
from pybloom_live import BloomFilter
from pymongo import MongoClient
from pymongo.operations import UpdateOne

from rgg import net
from rgg.account import account_pool
from rgg.clean_html import cleaner
from rgg.log import logger
from rgg.net import send_wechat_warning

Int64 = bson.Int64


def get_random(min_value=None, max_value=None):
    # define the target range
    min_value = min_value or 60
    max_value = max_value or 600
    # compute the range of the base-10 logarithms
    log_min = math.log(min_value, 10)
    log_max = math.log(max_value, 10)
    # draw a uniform random number between log_min and log_max
    random_log = random.uniform(log_min, log_max)
    # map the random logarithm back into the original range
    result = 10 ** random_log
    # print(f"generated random number ({min_value}-{max_value}): {result:.2f}")
    return float(f'{result:.1f}')
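
# Note: get_random() samples log-uniformly, so shorter values are more likely
# than longer ones. Worked example with the 60-600 defaults:
#   log10(60) ~= 1.778, log10(600) ~= 2.778; a uniform draw of 2.3 maps back to
#   10 ** 2.3 ~= 199.5, so the call returns 199.5.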


def delay(a, b, msg=None):
    secs = get_random(a, b)
    logger.info(f'{msg if msg else "采集"}延时{secs}秒')
    time.sleep(secs)
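
# Usage: delay(45, 540) sleeps for a log-uniformly chosen 45-540 seconds and
# logs the chosen duration before returning.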


def bulk_update_task_status(collection, id_lst, update):
    """
    Bulk-update task status.
    :param pymongo.collection.Collection collection:
    :param id_lst: list of document _id values to update
    :param dict update: fields to $set
    :return:
    """
    count = 0
    update_lst = []
    for id_ in id_lst:
        update['updatetime'] = Int64(int(time.time()))  # refresh the task timestamp
        update_lst.append(UpdateOne({'_id': id_}, {'$set': update}))
        if len(update_lst) == 50:
            results = collection.bulk_write(update_lst, ordered=False)
            update_lst = []
            count += results.modified_count
    if len(update_lst) > 0:
        results = collection.bulk_write(update_lst, ordered=False)
        count += results.modified_count
    return count
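
# Note: bulk_update_task_status() flushes UpdateOne operations in batches of 50
# and returns the total modified_count, which only counts documents whose stored
# values actually changed. Every queued operation references the same `update`
# dict, so all operations in a batch carry the same updatetime.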


def setup_login_info(username):
    logger.debug('加载登录身份信息...')
    file = (Path(__file__).parent / f'rgg/account/{username}.json').absolute()
    with open(file, encoding='utf-8') as rp:
        json_data = json.load(rp)
    net.set_cookies(ck=json_data['cookies'])
    net.set_headers(h=json_data['headers'])
    net.set_proxies(p=json_data['proxies'])
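
# Note: setup_login_info() expects rgg/account/<username>.json to hold at least
# the keys 'cookies', 'headers' and 'proxies'; their exact shape is whatever
# rgg.net's set_cookies/set_headers/set_proxies accept (an assumption here, as
# that module is not shown).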


def launch_filter():
    """Create (or reload) the Bloom filter."""
    logger.debug('创建布隆过滤器...')
    backup = (Path(__file__).parent / 'rgg/backup')
    if not backup.exists():
        backup.mkdir(exist_ok=True)
    file = (backup / 'bloomfilter.f')
    if not file.exists():
        file.touch()  # create the backing file on first run
        bf = BloomFilter(capacity=1000000, error_rate=0.001)  # expect up to 1,000,000 entries, 0.1% error rate
    else:
        if file.stat().st_size == 0:
            bf = BloomFilter(capacity=1000000, error_rate=0.001)
        else:
            bf = BloomFilter.fromfile(file.open('rb'))
    return file, bf
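
# Note: the filter is persisted with pybloom_live's tofile()/fromfile() pair;
# spider() rewrites rgg/backup/bloomfilter.f in its finally block, so hrefs seen
# in earlier runs stay deduplicated across restarts.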


def fetch_list(coll, bf, keyword, page, page_size, begin_time=None, end_time=None):
    count = 0
    dedup_count = 0
    href_lst = []
    params = dict(begin_time=begin_time, end_time=end_time)
    lst = net.download_list(keyword, page, page_size, **params)
    if lst is None:
        return False, lst, count, dedup_count
    lst_data = []
    for item in lst:
        href = item.get('url')
        if href is None:
            # print(f'bad record|{item}')
            continue
        if href in bf:
            logger.debug(f'重复数据|{href}')
            dedup_count += 1
            continue
        title = item['popTitle'] if 'popTitle' in item else item['showTitle']
        publishtime = item['updateTime']
        l_np_publishtime = datetime.strptime(publishtime, '%Y-%m-%d').timestamp()
        addr = str(item['areaName']).split('-')
        area = addr[0] if len(addr) > 0 else ''
        city = addr[1] if len(addr) > 1 else ''
        if '国土' in item.get('progName', ''):
            channel = item['progName']
        else:
            channel = (item['noticeSegmentTypeName'] or item['progName'])
        data = {
            'site': '千里马',
            'channel': channel,
            'spidercode': 'sdxzbiddingsjzypc',  # reserved for manual backfill
            'area': area,
            'city': city,
            'district': '',
            'href': href,
            'title': title,
            'publishtime': publishtime,
            'l_np_publishtime': Int64(l_np_publishtime),
            'comeintime': Int64(int(time.time())),
            'isdownload': False,  # downloaded yet?
            'isfailed': False,  # download failed?
            'keywords': ",".join(item['hiLightKey']),  # search keywords
        }
        lst_data.append(data)
        if len(lst_data) == 50:
            coll.insert_many(lst_data, ordered=False)
            count += len(lst_data)
            lst_data = []
        href_lst.append(href)
    if len(lst_data) > 0:
        coll.insert_many(lst_data, ordered=False)
        count += len(lst_data)
    # record the new hrefs in the Bloom filter
    for href in href_lst:
        bf.add(href)
    return True, lst, count, dedup_count
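
# Note: fetch_list() returns a 4-tuple (ok, raw_list, inserted, duplicates).
# ok is False only when the list request itself fails (lst is None); hrefs are
# added to the Bloom filter only after their rows have been queued for insert.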


def auto_page_turning(coll, bf, keywords, page_size=40, **kwargs):
    page = 1
    page_size = 100 if page_size > 100 else page_size
    while True:
        ret, *args = fetch_list(coll, bf, keywords, page, page_size, **kwargs)
        if ret is False:
            return False
        ret_lst, count, dedup_count = args
        logger.info(f'自动翻页|第{page}页|{keywords}|入库{count}条|重复{dedup_count}条')
        if len(ret_lst) < page_size:
            break
        else:
            page += 1
    return True
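
# Note: paging stops as soon as a page comes back shorter than page_size, which
# treats a short page as the last one; page_size is also capped at 100.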


def get_tasks(collection, max_limit=100):
    now = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_ts = int(now.timestamp())
    second_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    end_ts = int(second_day.timestamp())
    q = {'comeintime': {'$gte': start_ts, '$lt': end_ts}}
    rets = list(collection.find(q))
    rets = rets[:max_limit] if len(rets) > max_limit else rets  # cap the number of records handled per day
    rets = list(filter(lambda x: x['isdownload'] is False, rets))  # keep only records not yet downloaded
    return rets
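
# Note: the query window covers "today" by comeintime (midnight to midnight,
# local time). The max_limit cap is applied before the isdownload filter, so a
# day with many already-downloaded rows can yield fewer than max_limit tasks.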


def fetch_detail(collection, tasks, username):
    results = {'success': [], 'failed': []}
    insert_lst = []
    for task in tasks:
        href = task['href']
        ret = net.download_json(href, referer=False)
        if ret is False:
            logger.error(f'账号失效|{username}')
            return False, results
        if not ret:
            results['failed'].append(task['_id'])
        else:
            data = {
                'site': task['site'],
                'channel': task['channel'],
                'spidercode': task['spidercode'],
                'area': task['area'],
                'city': task['city'],
                'district': task['district'],
                'href': '#',
                'competehref': href,
                'title': task['title'],
                's_title': task['title'],
                'contenthtml': ret['content'],
                'detail': cleaner(ret['content']),
                'publishtime': task['publishtime'],
                'l_np_publishtime': task['l_np_publishtime'],
                'comeintime': Int64(int(time.time())),
                'T': 'bidding',
                'infoformat': 1,
                'sendflag': 'false',
                'iscompete': True,
                '_d': 'comeintime',
                'publishdept': '',
                'type': '',
                'is_mixed': True
            }
            insert_lst.append(data)
            results['success'].append(task['_id'])
            if len(insert_lst) == 50:
                collection.insert_many(insert_lst, ordered=False)
                insert_lst = []
        # delay(45, 540)
        time.sleep(.5)
    if len(insert_lst) > 0:
        collection.insert_many(insert_lst, ordered=False)
    return True, results
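
# Note: fetch_detail() returns (ok, results). ok is False when a request comes
# back as False (treated as an expired account), which aborts the batch early;
# empty/None responses are recorded in results['failed'] and skipped instead.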


def download(lst_coll, detail_coll, bf, keyword, username, **kwargs):
    # bind the list collection with functools.partial
    bulk_update = partial(bulk_update_task_status, lst_coll)
    # data collection
    ret = auto_page_turning(lst_coll, bf, keyword, **kwargs)  # download list pages
    if ret is False:
        return False
    tasks = get_tasks(lst_coll)  # claim collection tasks
    state, detail_ret = fetch_detail(detail_coll, tasks, username)  # download detail pages
    # bulk-update the status of successful tasks
    success_ids = detail_ret['success']
    if bulk_update(success_ids, {'isdownload': True}):
        logger.info(f'批量更新|任务状态|成功|数量{len(success_ids)}条')
    # bulk-update the status of failed tasks
    failed_ids = detail_ret['failed']
    if bulk_update(failed_ids, {'isdownload': True, 'isfailed': True}):
        logger.info(f'批量更新|任务状态|失败|数量{len(failed_ids)}条')
    if state is False:
        return False
    return True


def spider(username, keyword, begin_time=None, end_time=None, **kwargs):
    logger.info('+++ 开始采集 +++')
    if 'begin_time' not in kwargs:
        kwargs['begin_time'] = begin_time
    if 'end_time' not in kwargs:
        kwargs['end_time'] = end_time
    # create the MongoDB connection
    # to_mgo = MongoClient('192.168.3.182', 27080)
    to_mgo = MongoClient('172.17.4.87', 27080)
    lst_coll = to_mgo['py_spider']['zyjc_qlm_list']
    detail_coll = to_mgo['py_spider']['data_bak']
    # create the Bloom filter
    f, bf = launch_filter()
    try:
        ret = download(lst_coll, detail_coll, bf, keyword, username, **kwargs)
        if ret is False:
            # TODO consider stopping the crawl here to avoid getting the account banned
            return False
    except requests.exceptions.ConnectionError:
        send_wechat_warning(f'浙移集成|访问失败|代理访问超时|{net.get_proxies("https")}')
        return
    except (Exception, BaseException) as e:
        logger.exception(f'爬虫异常,原因:{e}')
        return False
    finally:
        logger.debug('保存过滤器')
        bf.tofile(f.open('wb'))  # persist the Bloom filter to disk
    logger.info('+++ 采集结束 +++')


def is_workhour(start, end):
    """
    Is the current time within working hours?
    :param int start: start hour (0-23)
    :param int end: end hour (0-23), inclusive
    :return:
    """
    if end < start:
        raise ValueError('end must not be less than start')
    if start <= datetime.now().hour <= end:
        return True
    return False
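
# Note: both bounds are hour values compared against datetime.now().hour, and
# the range is inclusive, so is_workhour(17, 17) is True only during 17:00-17:59.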


def main():
    username, _ = account_pool.pop(0)
    setup_login_info(username)
    keyword = '甄选公告'
    is_weekday = True
    skip = False
    try:
        while True:
            if skip:
                net.send_wechat_warning('浙移集成|数据采集|异常停止')
            else:
                now = datetime.now()
                try:
                    # run once on Sundays during the 17:00 hour
                    if now.weekday() == 6 and is_workhour(17, 17):
                        if not is_weekday:
                            is_weekday = True
                            sat = (now - timedelta(1)).strftime('%Y-%m-%d')
                            sun = now.strftime('%Y-%m-%d')
                            ret = spider(
                                username=username,
                                begin_time=sat,
                                end_time=sun,
                                keyword=keyword,
                                page_size=40,
                            )
                            if ret is False:
                                skip = True
                    # weekdays, 09:00 - 18:00
                    if 0 <= now.weekday() <= 4 and is_workhour(9, 18):
                        is_weekday = False  # reset the weekend-run flag
                        start = now.strftime('%Y-%m-%d')
                        ret = spider(
                            username=username,
                            begin_time=start,
                            end_time=start,
                            keyword=keyword,
                            page_size=40,
                        )
                        if ret is False:
                            skip = True  # account problem
                except Exception as e:
                    logger.exception(e)
            delay(4200, 7200, "[浙移集成]下轮采集")  # run one round every 70-120 minutes
    except KeyboardInterrupt:
        pass
    finally:
        logger.info('任务结束')


if __name__ == '__main__':
    main()