summary.py

# -*- coding: utf-8 -*-
"""
Created on 2023-04-04
---------
@summary: Aggregate heartbeat data and push it to the list collection; currently only list-page crawling is summarized
---------
@author: dzr
"""
from datetime import datetime, time, timedelta

from bson.int64 import Int64
from bson.son import SON
from pymongo import MongoClient

from log import logger

MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001
MONGO_DB = "py_spider"

# mongo
# MONGO_HOST = "172.17.4.87"
# MONGO_PORT = 27080
# MONGO_DB = "py_spider"

client = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = client[MONGO_DB]

# spider data table
data_bak = mongodb["data_bak"]
# heartbeat table
spider_heartbeat = mongodb["spider_heartbeat"]
# py_spiders list
py_spiders_crawl_list = mongodb["crawl_data"]
# competitor-product lists
ybw_list = mongodb["ybw_list"]
zbytb_list = mongodb["zbytb_list"]
# topic spider
zgzb_list = mongodb["zgzb_list"]
# list-page summary table
summary_table_of_list_pages = mongodb["list"]
# summary_table_of_list_pages = mongodb["123qqq"]  # test/debug collection


def save(documents, collection):
    """Save documents to MongoDB in batches of 100"""
    is_list = isinstance(documents, list)
    documents = documents if is_list else [documents]

    count = 0
    data_lst = []
    for item in documents:
        item.pop("_id", None)
        data_lst.append(item)
        count += 1
        if len(data_lst) == 100:
            collection.insert_many(data_lst)
            data_lst.clear()
            logger.info(f"[Summary]{collection.name} - bulk saved {count} documents - done")

    # insert the remaining documents
    if len(data_lst) > 0:
        collection.insert_many(data_lst)
        logger.info(f"[Summary]{collection.name} - bulk saved {count} documents - done")

    return count
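
# Usage sketch for save(): documents may be a single dict or a list of dicts
# (the document fields below are hypothetical):
#
#     docs = [{"site": "demo-site", "count": 1}, {"site": "demo-site", "count": 2}]
#     saved = save(docs, summary_table_of_list_pages)  # bulk-inserts 2 docs, returns 2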


def summary_data(document, runtime, only_count_list_page=False):
    """Summarize and categorize the aggregated data"""
    summary_lst = []
    spider_item = document["spider_item"]
    for item in spider_item:
        spidercode = item["spidercode"]
        site = item["site"]
        data = {
            "business_type": item["business_type"],
            "site": site,
            "channel": item["channel"],
            "spidercode": spidercode,
            "count": document["count"],
            "rel_count": document["rel_count"],
            "runtime": runtime,
            "create_at": Int64(datetime.now().timestamp())
        }
        if len(spider_item) > 1:
            logger.warning(f"[Summary]{spidercode} -> {site} - mapping error")
            data["warning"] = "incorrect spidercode-to-business mapping"
        if only_count_list_page:
            if str(item["business_type"]).endswith("List"):
                summary_lst.append(data)
            continue
        summary_lst.append(data)
    return summary_lst
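
# Shape sketch of the `document` argument expected by summary_data(): one result of
# the $group stages built below (field values here are illustrative only):
#
#     {
#         "_id": "<spider_id / spidercode / channel>",
#         "count": 120,
#         "rel_count": 118,
#         "spider_item": [
#             {"site": "...", "channel": "...", "spidercode": "...", "business_type": "...List"}
#         ]
#     }
#
# When only_count_list_page=True, only items whose business_type ends with "List"
# are kept in the summary.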


def feapder_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize feapder list-page crawl data (previous day's data)"""
    if datestr is None:
        today = datetime.now().date()
        yesterday = today + timedelta(days=-1)
        datestr = yesterday.strftime("%Y-%m-%d")

    pipeline = [
        {"$match": {"runtime": datestr}},
        {
            "$group": {
                "_id": "$spider_id",
                "rel_count": {"$sum": "$rel_count"},
                "count": {"$sum": "$count"},
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "$business_type"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    # The $group stage has a 100 MB memory limit; by default it raises an error when
    # the limit is exceeded. To process large data sets, set allowDiskUse=True so
    # $group may write to temporary files.
    cursor = spider_heartbeat.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, datestr, True))
        save(results, summary_table_of_list_pages)
    finally:
        client.close()
    logger.info("[Summary] feapder data summary finished")


def py_spiders_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize py_spiders list-page crawl data (previous day's data)"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$finished", True]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": 1},  # total collected for the day
                # "rel_count": {"$sum": "$rel_count"},  # detail pages collected for the day (successes only)
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
        {"$sort": SON([("rel_count", -1)])}
    ]
    cursor = py_spiders_crawl_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, runtime))
        save(results, summary_table_of_list_pages)
    finally:
        client.close()
    logger.info("[Summary] py_spiders data summary finished")


def competing_products_crawl_aggregate(collection, datestr=None):
    """Aggregate query for competitor-product crawling"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    publish_time = yesterday.strftime("%Y-%m-%d")
    table_name = collection.name

    pipeline = [
        {
            "$addFields": {
                "rel_count": {
                    "$cond": {
                        "if": {"$ne": ["$count", 0]},
                        "then": 1,
                        "else": 0
                    }
                }
            }
        },
        {"$match": {"publishtime": publish_time}},
        {
            "$group": {
                "_id": "$channel",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": "$rel_count"},  # total whose ES search result is 0
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
    ]
    cursor = collection.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, publish_time))
        save(results, summary_table_of_list_pages)
    finally:
        client.close()
    logger.info(f"[Summary] {table_name} data summary finished")


def competing_products_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize competitor-product list-page crawl data"""
    competing_products_crawl_aggregate(ybw_list, datestr)
    competing_products_crawl_aggregate(zbytb_list, datestr)


def zgzb_crawl_aggregate_of_list_pages(datestr=None):
    """Summarize zgzb_list list-page crawl data (previous day's data)"""
    if datestr is not None:
        today = datetime.fromisoformat(datestr).date()
    else:
        today = datetime.now().date()

    yesterday = today + timedelta(days=-1)
    runtime = yesterday.strftime("%Y-%m-%d")
    start_time = int(datetime.combine(yesterday, time()).timestamp())
    end_time = int(datetime.combine(today, time()).timestamp())

    pipeline = [
        {"$match": {"comeintime": {"$gte": start_time, "$lt": end_time}}},
        {
            "$group": {
                "_id": "$spidercode",
                "count": {"$sum": 1},  # total collected for the day
                "rel_count": {"$sum": 1},  # total collected for the day
                "spider_item": {
                    "$addToSet": {
                        "site": "$site",
                        "channel": "$channel",
                        "spidercode": "$spidercode",
                        "business_type": "List"
                    }
                }
            }
        },
    ]
    cursor = zgzb_list.aggregate(pipeline, allowDiskUse=True)
    try:
        results = []
        for doc in cursor:
            results.extend(summary_data(doc, runtime))
        save(results, summary_table_of_list_pages)
    finally:
        client.close()
    logger.info("[Summary] zgzb_list data summary finished")


if __name__ == '__main__':
    feapder_crawl_aggregate_of_list_pages()
    py_spiders_crawl_aggregate_of_list_pages()
    competing_products_crawl_aggregate_of_list_pages()
    zgzb_crawl_aggregate_of_list_pages()
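
# Ad-hoc run sketch: every aggregate function also accepts an explicit date string
# ("YYYY-MM-DD"). Note that feapder_crawl_aggregate_of_list_pages matches datestr as
# the runtime day itself, while the other functions treat datestr as "today" and
# summarize the previous day, e.g.:
#
#     feapder_crawl_aggregate_of_list_pages("2023-04-03")
#     py_spiders_crawl_aggregate_of_list_pages("2023-04-04")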