# 采集详情页.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2024-10-10
  4. ---------
  5. @summary:
  6. ---------
  7. @author: Dzr
  8. """
  9. import json
  10. import time
  11. import bson
  12. from loguru import logger
  13. from pymongo import MongoClient
  14. from pymongo.operations import UpdateOne
  15. import net
  16. from clean_html import cleaner, drop_tree_by_lxml
  17. from pathlib import Path
  18. from login import auto_login, account_pool
  19. Int64 = bson.int64.Int64
  20. def setup_cfg(username):
  21. file = (Path(__file__).parent / f'account/{username}.json').absolute()
  22. with open(file, encoding='utf-8') as rp:
  23. json_data = json.load(rp)
  24. net.set_cookies(ck=json_data['cookies'])
  25. net.set_headers(h=json_data['headers'])
  26. net.set_proxies(p=json_data['proxies'])
  27. def bulk_update(collection, id_lst, update):
  28. """
  29. 批量更新任务状态
  30. :param pymongo.collection.Collection collection:
  31. :param id_lst:
  32. :param dict update:更新条件
  33. :return:
  34. """
  35. count = 0
  36. update_lst = []
  37. for id_ in id_lst:
  38. update['updatetime'] = Int64(int(time.time())) # 更新任务时间
  39. update_lst.append(UpdateOne({'_id': id_}, {'$set': update}))
  40. if len(update_lst) == 50:
  41. results = collection.bulk_write(update_lst, ordered=False)
  42. count += results.modified_count
  43. update_lst = []
  44. if len(update_lst) > 0:
  45. results = collection.bulk_write(update_lst, ordered=False)
  46. count += results.modified_count
  47. return count
  48. def finalize(insert_lst, update_dict, data_coll, lst_coll):
  49. if len(insert_lst) > 0:
  50. data_coll.insert_many(insert_lst, ordered=False)
  51. success_ids = update_dict['success']
  52. if bulk_update(lst_coll, success_ids, {'isdownload': True}):
  53. logger.info(f'批量更新[采集成功{len(success_ids)}条]任务状态')
  54. failed_ids = update_dict['failed']
  55. if bulk_update(lst_coll, failed_ids, {'isdownload': True, 'isfailed': True}):
  56. logger.info(f'批量更新[采集失败{len(failed_ids)}条]任务状态')
  57. def spider(username, password, task_lst, data_coll, lst_coll):
  58. setup_cfg(username)
  59. update_dict = {'success': [], 'failed': []}
  60. insert_lst = []
  61. def handle_task(task, ret):
  62. if len(ret) == 0:
  63. update_dict['failed'].append(task['_id'])
  64. logger.error(f'下载失败|{href}')
  65. else:
  66. html = drop_tree_by_lxml(ret['content'], '//*[contains(text(), "企业信息")]')
  67. insert_lst.append({
  68. 'site': task['site'],
  69. 'channel': task['channel'],
  70. 'spidercode': task['spidercode'],
  71. 'area': task['area'],
  72. 'city': task['city'],
  73. 'district': task['district'],
  74. 'href': '#',
  75. 'competehref': href,
  76. 'title': task['title'],
  77. 's_title': task['title'],
  78. 'contenthtml': html,
  79. 'detail': cleaner(html),
  80. 'publishtime': task['publishtime'],
  81. 'l_np_publishtime': task['l_np_publishtime'],
  82. 'comeintime': Int64(int(time.time())),
  83. 'T': 'bidding',
  84. 'infoformat': 1,
  85. 'sendflag': 'false',
  86. 'repeat': 'true',
  87. 'iscompete': True,
  88. '_d': 'comeintime',
  89. 'publishdept': '',
  90. 'type': '',
  91. 'is_mixed': True
  92. })
  93. update_dict['success'].append(task['_id'])
  94. for task in task_lst:
  95. href = task['href']
  96. ret = net.download_json(href, referer=False)
  97. if isinstance(ret, int) and ret == 429:
  98. auto_login(username, password, proxy=True, headless=False, auto_quit=True, accident_url=href)
  99. setup_cfg(username)
  100. ret = net.download_json(href, referer=False)
  101. if input('退出:0 继续:1\n') == '0':
  102. finalize(insert_lst, update_dict, data_coll, lst_coll)
  103. return False
  104. if ret is False:
  105. logger.error(f'账号失效|{username}')
  106. finalize(insert_lst, update_dict, data_coll, lst_coll)
  107. return False
  108. handle_task(task, ret)
  109. if len(insert_lst) == 50:
  110. data_coll.insert_many(insert_lst, ordered=False)
  111. insert_lst = []
  112. time.sleep(.5)
  113. finalize(insert_lst, update_dict, data_coll, lst_coll)
  114. return True
  115. def main():
  116. logger.info('**** 数据采集开始 ****')
  117. client = MongoClient('192.168.3.182', 27017)
  118. data_coll = client['zjb_poc']['jy_data_bak']
  119. lst_coll = client['zjb_poc']['jy_data_lst']
  120. try:
  121. while True:
  122. if len(account_pool) == 0:
  123. logger.warning('账号数量已不足,请及时补充')
  124. break
  125. # q = {'isdownload': False, 'isuse': {'$in': [4]}}
  126. # q = {'isdownload': False, 'isuse': {'$in': [2, 3]}}
  127. q = {'isdownload': False, 'is_use': 0}
  128. with lst_coll.find(q, limit=100) as cursor:
  129. task_lst = [item for item in cursor]
  130. username, password = account_pool.pop(0)
  131. auto_login(username, password, proxy=True, headless=True, auto_quit=True)
  132. ret = spider(username, password, task_lst, data_coll, lst_coll)
  133. if ret is False:
  134. logger.info('切换账号')
  135. continue
  136. if not lst_coll.count_documents(q):
  137. break
  138. except KeyboardInterrupt:
  139. pass
  140. finally:
  141. logger.info('**** 数据采集结束 ****')
  142. if __name__ == '__main__':
  143. main()