# -*- coding: utf-8 -*-
"""
Created on 2023-02-21
---------
@summary: 数据上传(redis到mongo),采集数据同步服务

Pops serialized spider items from redis lists (one list per mongo collection,
key prefix ``savemongo:``), parses them back into dicts, and inserts them into
MongoDB. ``mgp_list`` items additionally support a delayed-insert flow and an
Elasticsearch duplicate check before insertion. Failed items are requeued.
---------
@author: dzr
"""
import ast
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Dict

import redis
from bson import int64
from elasticsearch import Elasticsearch
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from pymongo import MongoClient
from redis._compat import unicode, long, basestring
from redis.connection import Encoder as RedisEncoder
from redis.exceptions import DataError

from log import logger

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"
mcli = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = mcli[MONGO_DB]


# redis
class Encoder(RedisEncoder):
    """Redis value encoder that additionally accepts container types.

    The stock redis-py ``Encoder`` rejects list/dict/tuple values; spider
    items are pushed as plain dicts, so containers are serialized via
    ``unicode()`` (i.e. their ``repr``) and parsed back with
    :func:`literal_eval` on the consumer side.
    """

    def encode(self, value):
        """Return a bytestring or bytes-like representation of the value."""
        if isinstance(value, (bytes, memoryview)):
            return value
        elif isinstance(value, float):
            value = repr(value).encode()
        elif isinstance(value, (int, long)):
            # python 2 repr() on longs is '123L', so use str() instead
            value = str(value).encode()
        elif isinstance(value, (list, dict, tuple)):
            # containers are stored as their repr and restored on read
            value = unicode(value)
        elif not isinstance(value, basestring):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value


REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10

redis.connection.Encoder = Encoder
# NOTE: decode_responses must be set on the pool -- passing it to StrictRedis
# is ignored when an explicit connection_pool is supplied, which previously
# left responses as bytes.
pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDISDB_USER_PASS,
    db=REDIS_DB,
    decode_responses=True,
)
rcli = redis.StrictRedis(connection_pool=pool)

# es
ES_HOST = '172.17.145.178'
ES_PORT = 9800
ES_INDEX = 'biddingall'
ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])

# 延时间隔 -- seconds an "is_delay" item keeps cycling before it is inserted
DELAY = 43200


def literal_eval(node_or_string):
    """Parse a python-literal string (queue payload) back into an object.

    Falls back to ``eval`` for payloads containing non-literal constructors
    such as ``ObjectId(...)`` / ``Code(...)`` that ``ast.literal_eval``
    cannot handle.

    :param node_or_string: str/bytes repr of a python object
    :return: the reconstructed object
    :raises ValueError: for malformed input that is not the known
        "malformed node or string" case

    SECURITY NOTE: the ``eval`` fallback executes arbitrary expressions; it
    is only acceptable because the redis queue is internal. Do not expose it
    to untrusted writers.
    """
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        if 'malformed node or string' in e.args[0]:
            # imported into local scope so eval() can resolve these names
            from bson import Code, ObjectId  # noqa: F401
            return eval(node_or_string)
        raise


def date2ts(date_str):
    """日期转时间戳 -- convert a date string to a unix timestamp (int).

    Accepts either ``%Y-%m-%d %H:%M:%S`` (detected by the presence of a
    colon) or a bare ``%Y-%m-%d``.
    """
    if ":" in date_str:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
    else:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
    return ts


def es_query(title, publish_time):
    """
    查询es -- count documents matching *title* near *publish_time*.

    Used as a duplicate check: a phrase match on ``title`` within a window of
    publish_time ± 5 days.

    :param title: 标题 (document title, phrase-matched)
    :param publish_time: 发布时间 (date string, see :func:`date2ts`)
    :return: total hit count (int); 0 means no duplicate found
    """
    publish_time = date2ts(publish_time)
    stime = publish_time - 432000  # 往前推5天
    etime = publish_time + 432000
    # 通过发布标题和发布时间范围查询
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {'publishtime': {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    # NOTE(review): assumes an ES version where hits.total is a scalar, not
    # the ES7 {"value": N, "relation": ...} dict -- verify against cluster.
    total = int(result['hits']['total'])
    return total


def get_redis_key(table, prefix="savemongo:"):
    """Return the redis list key for a mongo collection name."""
    return prefix + table


def rpush(name, values, is_redis_cluster=False):
    """将 values 推到列表 name 的尾部 (push value(s) onto the list tail).

    A list of values is pushed one-by-one inside a pipeline (wrapped in
    MULTI unless running against a cluster); a single value is pushed
    directly.
    """
    if isinstance(values, list):
        pipe = rcli.pipeline()
        if not is_redis_cluster:
            pipe.multi()
        for value in values:
            pipe.rpush(name, value)
        pipe.execute()
    else:
        return rcli.rpush(name, values)


def insert_one(table, item: Dict):
    """MongoDB 单条入库 -- insert one item; requeue to redis on failure.

    Strips any stale ``_id`` so mongo assigns a fresh one, and normalizes
    ``comeintime`` to a BSON Int64.
    """
    if item is not None:
        item.pop('_id', '')
        if item.get("comeintime"):
            item['comeintime'] = int64.Int64(item['comeintime'])
        try:
            title = item.get('title')
            result = mongodb[table].insert_one(item)
            logger.info(f'{table}-{str(result.inserted_id)}-{title}--上传成功')
        except Exception as e:
            # push the item back so it is retried on the next sync round
            rpush(get_redis_key(table), item)
            # str(e) instead of ''.join(e.args): exception args may contain
            # non-string values (e.g. error codes), which made the old join
            # itself raise TypeError and kill the worker thread.
            logger.error(table + f"--推送失败,原因:{str(e)}")


def sync_data(table):
    """Drain the redis list for *table* and insert items into mongo.

    ``mgp_list`` items get special handling:
      * ``is_delay`` -- requeued until the item is older than ``DELAY``
        seconds, then inserted on a later round;
      * ``if_es``    -- inserted only when the ES duplicate check finds no
        existing document;
      * otherwise    -- inserted directly.
    Unparseable/failed payloads are pushed back onto the same list.
    """
    redis_key = get_redis_key(table)
    total = rcli.llen(redis_key)
    logger.info(f"同步表名:{table},推送总数:{total}")
    # bounded by the initial length so requeued items are not reprocessed
    # in the same round
    for _ in range(total):
        obj = rcli.lpop(redis_key)
        if obj is None:
            logger.warning(f'{table} 错误数据:{obj}')
            continue

        try:
            item = literal_eval(obj)
            if table != 'mgp_list':
                insert_one(table, item)
            else:
                title = item.get("item").get("title")
                # 延时推送流程
                if item.get("is_delay"):
                    site = item.get("item").get("site")
                    t_diff = int(time.time()) - item.get("comeintime")
                    if t_diff <= DELAY:
                        rpush(redis_key, item)
                        logger.info(f"{site}-{title}-等待{t_diff}秒--延时入库")
                # es检索流程
                elif item.get("if_es"):
                    pt = item.get("item").get("publishtime")
                    # NOTE(review): an if_es item with title=None is silently
                    # dropped here (neither inserted nor requeued) -- confirm
                    # this is intentional.
                    if title is not None and es_query(title.strip(), pt) == 0:
                        insert_one(table, item)
                else:
                    insert_one(table, item)
        except Exception:
            # requeue the raw payload; a permanently bad payload will be
            # retried once per round rather than lost
            rpush(redis_key, obj)


def err_msg(worker):
    """Done-callback: log a worker future's exception, if any."""
    err = worker.exception()
    if err:
        # NOTE(review): logger.exception outside an except block logs
        # "NoneType: None" as the traceback; the message still carries err.
        logger.exception("worker err: {}".format(err))
    return worker


@func_set_timeout(60 * 20)
def main():
    """Sync all known tables concurrently; hard timeout of 20 minutes."""
    logger.info("数据同步开始")
    tables = ["mgp_list", "data_bak", "spider_heartbeat", "njpc_list",
              "data_njpc", "listdata_err"]
    # renamed from `pool` to avoid shadowing the module-level redis
    # connection pool
    with ThreadPoolExecutor() as executor:
        futures = []
        for table in tables:
            f = executor.submit(sync_data, table)
            f.add_done_callback(err_msg)
            futures.append(f)
        wait(futures)
    logger.info("数据同步结束")


if __name__ == '__main__':
    try:
        main()
    except FunctionTimedOut:
        logger.warning("数据同步超时")