fetch_detail.py

# -*- coding: utf-8 -*-
"""
Created on 2024-11-11
---------
@summary: Fetch detail pages for pending list records and store them in MongoDB.
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor

from bson import Int64
from pymongo import MongoClient
from pymongo.operations import UpdateOne

import net
import setting
from log import logger
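
# Interfaces of the local helper modules, as inferred from their usage below
# (the real implementations live elsewhere in the project):
#   net.get_proxy()                    -> proxies mapping handed to the HTTP client
#   net.download_detail(url, proxies)  -> dict with 'contenthtml' and 'detail',
#                                         or None when the download fails
#   net.send_wechat_warning(message)   -> pushes an alert message
#   setting                            -> MONGO_HOST, MONGO_PORT, MONGO_DB,
#                                         MONGO_LIST_COLL, MONGO_DATA_COLL
#   log.logger                         -> preconfigured logger instance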


def spider(task):
    """Download one detail page and build the document to be stored."""
    _id = task.pop('_id')
    url = task['href']

    ret = net.download_detail(url, proxies=net.get_proxy())
    if ret is None:
        logger.error(f'detail data|download failed|{url}')
        return _id, None

    logger.info(f'detail data|download ok|{url}')
    data = {
        'site': task['site'],
        'channel': task['channel'],
        'spidercode': task['spidercode'],
        'area': task['area'],
        'city': task['city'],
        'district': task['district'],
        'href': url,
        'title': task['title'],
        's_title': task['title'],
        'contenthtml': ret['contenthtml'],
        'detail': ret['detail'],
        'publishtime': task['publishtime'],
        'l_np_publishtime': Int64(task['l_np_publishtime']),
        'comeintime': Int64(int(time.time())),
        'T': task['T'],
        'infoformat': task['infoformat'],
        'sendflag': task['sendflag'],
        'iscompete': task['iscompete'],
        '_d': task['_d'],
        'publishdept': task['publishdept'],
        'type': task['type'],
        'is_mixed': task['is_mixed'],
    }
    return _id, data


def main():
    client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
    to_lst_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
    to_data_coll = client[setting.MONGO_DB][setting.MONGO_DATA_COLL]

    while True:
        data_count = 0
        fail_count = 0
        updates = []
        inserts = []

        # Pick up to 100 list records that have not been downloaded yet.
        q = {'isdownload': None}
        with to_lst_coll.find(q, limit=100) as cursor:
            with ThreadPoolExecutor(max_workers=4) as executor:
                for _id, result in executor.map(spider, cursor):
                    condition = {'_id': _id}
                    if result is None:
                        item = {'isdownload': 1, 'isfailed': 1}
                        fail_count += 1
                    else:
                        item = {'isdownload': 1, 'isfailed': 0}
                        inserts.append(result)
                        data_count += 1

                    updates.append(UpdateOne(condition, {'$set': item}))

                    # Flush detail documents and status updates in batches of 10.
                    if len(inserts) == 10:
                        to_data_coll.insert_many(inserts, ordered=False)
                        logger.info(f'detail data|saved {len(inserts)} records')
                        inserts = []

                    if len(updates) == 10:
                        to_lst_coll.bulk_write(updates, ordered=False)
                        logger.info(f'detail data|updated status of {len(updates)} records')
                        updates = []

        # Flush whatever is left once the batch of list records is exhausted.
        if inserts:
            to_data_coll.insert_many(inserts, ordered=False)
            logger.info(f'detail data|saved {len(inserts)} records')
        if updates:
            to_lst_coll.bulk_write(updates, ordered=False)
            logger.info(f'detail data|updated status of {len(updates)} records')

        logger.info(f'detail data|round done|{data_count} ok, {fail_count} failed|next run in 10s...')
        time.sleep(10)
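

# Configuration sketch (only the setting names are taken from the code above;
# the values and file layout are assumptions for illustration):
#
#   # setting.py
#   MONGO_HOST = '127.0.0.1'
#   MONGO_PORT = 27017
#   MONGO_DB = 'spider'          # hypothetical database name
#   MONGO_LIST_COLL = 'list'     # hypothetical list (task) collection
#   MONGO_DATA_COLL = 'data'     # hypothetical detail-data collection
#
# With those in place the script is started directly: python fetch_detail.py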


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        net.send_wechat_warning('Detail fetching aborted')
        logger.exception(e)