# -*- coding: utf-8 -*-
"""
Created on 2023-02-21
---------
@summary: Data upload (Redis to MongoDB), crawler data synchronization service
---------
@author: dzr
"""
import ast
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Dict

import redis
from bson import int64
from elasticsearch import Elasticsearch
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from pymongo import MongoClient
from redis._compat import unicode, long, basestring
from redis.connection import Encoder as RedisEncoder
from redis.exceptions import DataError

from log import logger


class Encoder(RedisEncoder):
    def encode(self, value):
        "Return a bytestring or bytes-like representation of the value"
        if isinstance(value, (bytes, memoryview)):
            return value
        # elif isinstance(value, bool):
        #     # special case bool since it is a subclass of int
        #     raise DataError(
        #         "Invalid input of type: 'bool'. Convert to a "
        #         "bytes, string, int or float first."
        #     )
        elif isinstance(value, float):
            value = repr(value).encode()
        elif isinstance(value, (int, long)):
            # python 2 repr() on longs is '123L', so use str() instead
            value = str(value).encode()
        elif isinstance(value, (list, dict, tuple)):
            value = unicode(value)
        elif not isinstance(value, basestring):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value


# redis
redis.connection.Encoder = Encoder

REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10

pool = redis.ConnectionPool(
    host=REDIS_HOST, port=REDIS_PORT, password=REDISDB_USER_PASS, db=REDIS_DB
)
rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
redis_prefix = "savemongo"

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"
mcli = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = mcli[MONGO_DB]

# es
ES_HOST = "172.17.145.178"
ES_PORT = 9200
ES_INDEX = "biddingall"

try:
    ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
except ConnectionRefusedError as e:
    logger.error(f"es service refused the connection, reason: {e}")
    ecli = None


def err_msg(worker):
    err = worker.exception()
    if err:
        logger.exception("[Send]worker err: {}".format(err))
    return worker


def literal_eval(node_or_string):
    """Deserialize data pulled from Redis."""
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        if "malformed node or string" in e.args[0]:
            # names placed in eval()'s scope; payloads may contain
            # ObjectId(...), Code(...) or datetime(...) literals
            from bson import Code, ObjectId  # noqa: F401
            import datetime  # noqa: F401
            return eval(node_or_string)
        else:
            raise e


def date2ts(date_str):
    """Convert a date string to a unix timestamp."""
    if ":" in date_str:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
    else:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
    return ts


def es_query(title, publish_time):
    """
    Query es

    :param title: document title
    :param publish_time: publish time
    :return: number of matching documents
    """
    if not ecli:
        return 0  # if the es service is unavailable, let the data through

    publish_time = date2ts(publish_time)
    stime = publish_time - 432000  # 5 days earlier
    etime = publish_time + 432000

    # query by title and a publish-time range
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {"publishtime": {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    total = int(result["hits"]["total"]["value"])
    return total
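
# Usage sketch for the dedup check (hypothetical values; assumes the ES index
# configured above is reachable). A zero hit count within the +/- 5 day
# publish-time window means the record is treated as new:
#
#   if es_query("Example tender notice", "2023-02-21") == 0:
#       ...  # no duplicate found; safe to insert
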
def rpush(name, values, is_redis_cluster=False):
    """Push "values" onto the tail of the list "name"."""
    if isinstance(values, list):
        pipe = rcli.pipeline()

        if not is_redis_cluster:
            pipe.multi()
        for value in values:
            pipe.rpush(name, value)
        pipe.execute()
    else:
        return rcli.rpush(name, values)


def handle_big_document(item):
    # NOTE: the original regex pattern was corrupted by markup stripping
    # (only "]*>" survived); replacing HTML tags with newlines is a
    # reconstruction of its apparent intent -- shrinking oversized
    # "contenthtml" bodies below MongoDB's document size cap.
    if "contenthtml" in item:
        item["contenthtml"] = re.sub("<[^>]*>", "\n", item["contenthtml"])
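
# Effect of handle_big_document, illustrated with made-up data (and assuming
# the reconstructed tag-stripping regex above):
#
#   doc = {"contenthtml": "<p>notice</p><img src='data:image/png;base64,...'>"}
#   handle_big_document(doc)
#   # doc["contenthtml"] is now "\nnotice\n\n" -- markup gone, text retained
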
", item["contenthtml"]) def insert_one(table, item: Dict): """MongoDB 单条入库""" table = "".join(table.split(f"{redis_prefix}:")) if item is not None: item.pop("_id", "") if item.get("comeintime"): item["comeintime"] = int64.Int64(item["comeintime"]) try: title = item.get("title") result = mongodb[table].insert_one(item) logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--上传成功") except Exception as e: if "BSON document too large" in ''.join(e.args): handle_big_document(item) # MongoDB文档保存要求 BSON 大小限制 16 MB # rpush(get_redis_key(table), item) rpush(table, item) logger.error(f"[Send]{table}--推送失败,原因:{''.join(e.args)}") def delay_push_to_db(table_name, data, delay_time=43200): """ 第三方数据,需延时入库,推送爬虫生产库 @param table_name: 表名 @param data: 延时的数据 @param delay_time: 延时时长,单位:秒 @return: """ site = data.get("item").get("site") title = data.get("item").get("title") time_diff = int(time.time()) - data.get("comeintime") if time_diff <= delay_time: rpush(table_name, data) logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时推送") else: logger.info(f"[Send]{site}-{title}-等待{time_diff}秒--延时入库") insert_one(table_name, data) return True def es_retrieval_push_to_db(table_name, data): """ 通过es(近3月增量数据)进行数据去重,推送爬虫生产库 @param table_name: 表名 @param data: 判重数据 @return: """ site = data.get("item").get("site") title = data.get("item").get("title") pt = data.get("item").get("publishtime") if not title or not pt: # es检索必须提供标题和发布时间,否则数据按照垃圾数据丢弃处理 return False count = es_query(title.strip(), pt) if count == 0: insert_one(table_name, data) logger.info(f"[Send]{site}-{title}-检索到{count}条--ES检索") return True def mixture_process_push_to_db(table_name, data, delay_time=43200): """ 延时 + es检索 混合检索数据,推送爬虫生产库 @param table_name: 表名 @param data: 判重数据 @param delay_time: 延时时长,单位:秒 @return: """ site = data.get("item").get("site") title = data.get("item").get("title") pt = data.get("item").get("publishtime") if not title and not pt: # es检索必须提供标题和发布时间,否则数据按照垃圾数据丢弃处理 return False is_continue = False time_diff = int(time.time()) - data.get("comeintime") count = es_query(title.strip(), pt) if count == 0: if time_diff <= delay_time: rpush(table_name, data) else: insert_one(table_name, data) is_continue = True msg = "保持轮询检索" if is_continue else "删除重复数据" logger.info(f"[Send]{site}-{title}-{msg}--混合检索") return True def sync_data(table: str): """ 保存数据 @param table: @return: """ redis_key = table total = rcli.llen(redis_key) logger.info(f"[Send]同步数据表名:{table},推送总数:{total}") for _ in range(total): obj = rcli.lpop(redis_key) if obj is None: logger.warning(f"[Send]{table} 错误数据:{obj}") continue try: item = literal_eval(obj) if all([not table.endswith(char) for char in ["mgp_list", "bidding"]]): insert_one(table, item) else: is_delay = item.get("is_delay") # 延时推送 is_es_retrieval = item.get("if_es") # es检索 if is_delay and is_es_retrieval: mixture_process_push_to_db(table, item) elif is_delay and not is_es_retrieval: delay_push_to_db(table, item) elif not is_delay and is_es_retrieval: es_retrieval_push_to_db(table, item) else: insert_one(table, item) except Exception as e: rpush(table, obj) logger.error(f"[Send]{table}--推送失败,原因:{''.join(e.args)}") @func_set_timeout(60 * 20) def main(): logger.info("[Send]同步数据开始") with ThreadPoolExecutor() as threadPool: futures = [] for key in rcli.keys(f"{redis_prefix}:*"): table = key.decode() f = threadPool.submit(sync_data, table) f.add_done_callback(err_msg) futures.append(f) wait(futures) logger.info("[Send]同步数据结束") if __name__ == '__main__': try: main() except FunctionTimedOut: logger.warning("[Send]同步数据超时")