summary.py

# -*- coding: utf-8 -*-
"""
Created on 2023-04-04
---------
@summary: Push summarized heartbeat data to the ``list`` collection; currently only list-page crawls are summarized
---------
@author: dzr
"""
from datetime import datetime, time, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"

client = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = client[MONGO_DB]

# data production collection
data_bak = mongodb["data_bak"]
# heartbeat collection
spider_heartbeat = mongodb["pyspider_heartbeat"]
# crawl task lists (competing products)
ybw_list = mongodb["ybw_list"]
zbytb_list = mongodb["zbytb_list"]
# topic-spider crawl task collection
zgzb_list = mongodb["zgzb_list"]
# list-page summary collection
summary_table = mongodb["list"]


def save(documents, collection, ordered=False):
    """Save documents to MongoDB, inserting in batches of 100."""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    data_lst = []
    for item in documents:
        item.pop("_id", None)
        data_lst.append(item)
        if len(data_lst) == 100:
            ret = collection.insert_many(data_lst, ordered=ordered)
            logger.info(f"MongoDB {collection.name} 保存 {len(ret.inserted_ids)} 条数据")
            data_lst = []

    # flush the final partial batch
    if data_lst:
        collection.insert_many(data_lst, ordered=ordered)

    logger.info(f"MongoDB {collection.name} 保存 {len(documents)} 条数据")
    return len(documents)


def pick_data(items, runtime, only_count_list_page=False):
    """Classify an aggregation result into summary documents."""
    results = []
    spidercode = items["spidercode"]
    site = items["site"]
    data = {
        "business_type": items["business_type"],
        "site": site,
        "channel": items["channel"],
        "spidercode": spidercode,
        "count": items["count"],
        "rel_count": items["rel_count"],
        "runtime": runtime,
        "create_at": Int64(datetime.now().timestamp())
    }
    if only_count_list_page:
        # only keep list-page records (business_type ending with "List")
        if str(items["business_type"]).endswith("List"):
            results.append(data)
    else:
        results.append(data)
    return results
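

# Illustrative shape of one summary document produced by pick_data() and pushed
# to the "list" collection (the values below are placeholders, not real data):
#
#   {
#       "business_type": "XxxList",
#       "site": "<site name>",
#       "channel": "<channel name>",
#       "spidercode": "<spider code>",
#       "count": 100,
#       "rel_count": 90,
#       "runtime": "2023-04-03",
#       "create_at": Int64(1680537600),
#   }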


def feapder_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize feapder list-page crawls (data from the previous day)."""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")

    count = 0
    pipeline = [
        {"$match": {"runtime": datestr}},
        {
            "$group": {
                "_id": "$batch_no",
                "rel_count": {"$sum": "$rel_count"},
                "count": {"$sum": "$count"},
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {"$first": "$business_type"},
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    # The $group stage has a 100 MB memory limit and errors out by default when
    # the limit is exceeded. To handle large data sets, set allowDiskUse=True so
    # $group can spill to temporary files.
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, datestr, True))
        count = save(results, summary_table)
    finally:
        # close the aggregation cursor; the shared client is closed in start_summary()
        cursor.close()

    logger.info(f"feapder - 数据汇总 {count} 条")


def competing_products_crawl_aggregate(collection, datestr=None):
    """Aggregation query for competing-products crawls."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()
    yesterday = today + timedelta(days=-1)

    count = 0
    publish_time = yesterday.strftime("%Y-%m-%d")
    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$count", 0]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"publishtime": publish_time}},
        {
            "$group": {
                "_id": "$channel",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": "$rel_count"},  # documents whose es lookup count is non-zero
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, publish_time))
        count = save(results, summary_table)
    finally:
        cursor.close()
    return count


def competing_products_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize competing-products list-page crawls."""
    count = 0
    count += competing_products_crawl_aggregate(ybw_list, datestr)
    count += competing_products_crawl_aggregate(zbytb_list, datestr)
    logger.info(f"竞品采集 - 数据汇总 {count} 条")


def zgzb_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize 中国招标投标公共服务平台 (zgzb) list-page crawls."""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()
    yesterday = today + timedelta(days=-1)

    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    count = 0
    pipeline = [
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": 1},  # total collected for the day
                "site": {"$first": "$site"},
                "channel": {"$first": "$channel"},
                "spidercode": {"$first": "$spidercode"},
                "business_type": {
                    "$first": {
                        "$cond": {
                            "if": {"$eq": [{"$type": "$business_type"}, "missing"]},
                            "then": "List",
                            "else": "$business_type"
                        }
                    }
                },
            }
        },
    ]
    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(pick_data(doc, runtime))
        count = save(results, summary_table)
    finally:
        cursor.close()

    logger.info(f"中国招标投标公共服务平台 - 数据汇总 {count} 条")


def start_summary():
    try:
        feapder_crawl_aggregate_of_list_pages()
        competing_products_crawl_aggregate_of_list_pages()
        zgzb_crawl_aggregate_of_list_pages()
        logger.info("数据汇总完成")
    finally:
        # close the shared MongoClient once, after all summaries have run
        client.close()


if __name__ == '__main__':
    start_summary()
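

# Usage sketch for backfilling a specific day (the dates below are placeholders):
# feapder_crawl_aggregate_of_list_pages() takes the runtime date to summarize
# directly, while the other two take "today" and summarize the day before it.
#
#   feapder_crawl_aggregate_of_list_pages("2023-04-03")
#   competing_products_crawl_aggregate_of_list_pages("2023-04-04")
#   zgzb_crawl_aggregate_of_list_pages("2023-04-04")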