# -*- coding: utf-8 -*-
"""
Created on 2023-02-21
---------
@summary: data upload (Redis to MongoDB); sync service for scraped data
---------
@author: dzr
"""
import ast
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Dict

import redis
from bson import int64
from elasticsearch import Elasticsearch
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from pymongo import MongoClient
# redis._compat was removed in redis-py 4.x; this module requires redis-py < 4
from redis._compat import unicode, long, basestring
from redis.connection import Encoder as RedisEncoder
from redis.exceptions import DataError

from log import logger

class Encoder(RedisEncoder):
    """Encoder that also accepts list/dict/tuple by storing their repr()."""

    def encode(self, value):
        "Return a bytestring or bytes-like representation of the value"
        if isinstance(value, (bytes, memoryview)):
            return value
        # elif isinstance(value, bool):
        #     # special case bool since it is a subclass of int
        #     raise DataError(
        #         "Invalid input of type: 'bool'. Convert to a "
        #         "bytes, string, int or float first."
        #     )
        elif isinstance(value, float):
            value = repr(value).encode()
        elif isinstance(value, (int, long)):
            # python 2 repr() on longs is '123L', so use str() instead
            value = str(value).encode()
        elif isinstance(value, (list, dict, tuple)):
            # departure from stock redis-py: serialize containers via repr()
            # so whole items can be queued; literal_eval() below reverses this
            value = unicode(value)
        elif not isinstance(value, basestring):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value
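
# A sketch of what the patched Encoder buys us: containers are stored as their
# Python repr() and recovered later by literal_eval(). Hypothetical values,
# doctest-style:
#
#   >>> enc = Encoder(encoding="utf-8", encoding_errors="strict", decode_responses=False)
#   >>> enc.encode({"title": "test", "count": 1})
#   b"{'title': 'test', 'count': 1}"
#
# Stock redis-py raises DataError for a dict; this subclass is what lets
# rpush()/insert_one() below requeue whole items.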

# redis -- route all connections through the container-aware Encoder above
redis.connection.Encoder = Encoder

REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10

pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDISDB_USER_PASS,
    db=REDIS_DB
)
# note: decode_responses is ignored when a connection_pool is supplied (it is
# a per-connection option), so keys/values come back as bytes and are decoded
# manually below
rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
redis_prefix = "savemongo"

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"
mcli = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = mcli[MONGO_DB]

# es
ES_HOST = "172.17.145.178"
ES_PORT = 9200
ES_INDEX = "biddingall"
try:
    ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
except ConnectionRefusedError as e:
    logger.error(f"ES service refused the connection: {e}")
    ecli = None


def err_msg(worker):
    err = worker.exception()
    if err:
        logger.exception("[Send]worker err: {}".format(err))
    return worker


def literal_eval(node_or_string):
    """Deserialize an item queued by the Encoder above."""
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        if "malformed node or string" in e.args[0]:
            # fall back to eval() for payloads that embed constructors such as
            # ObjectId(...); these imports put the names in eval scope
            from bson import Code, ObjectId  # noqa: F401
            import datetime  # noqa: F401
            return eval(node_or_string)
        else:
            raise e
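
# Why the eval() fallback matters: items that round-tripped through Mongo can
# embed ObjectId(...), which ast.literal_eval rejects. Hypothetical payloads:
#
#   >>> literal_eval("{'title': 'test', 'count': 1}")
#   {'title': 'test', 'count': 1}
#   >>> literal_eval("{'_id': ObjectId('5f0c3d2e8b1e4c0001a2b3c4')}")
#   {'_id': ObjectId('5f0c3d2e8b1e4c0001a2b3c4')}
#
# eval() is only acceptable because the queue is fed by our own spiders;
# never point this at untrusted input.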


def date2ts(date_str):
    """Convert a date string to a Unix timestamp."""
    if ":" in date_str:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
    else:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
    return ts
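
# date2ts() uses the server's local timezone via time.mktime. Assuming a
# UTC+8 host, a doctest-style sketch:
#
#   >>> date2ts("2023-02-21")
#   1676908800
#   >>> date2ts("2023-02-21 08:30:00")
#   1676939400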


def es_query(title, publish_time):
    """
    Query ES for potential duplicates.
    :param title: document title
    :param publish_time: publish time, "%Y-%m-%d[ %H:%M:%S]"
    :return: number of hits
    """
    if not ecli:
        return 0  # if the ES service is down, let data through unchecked
    publish_time = date2ts(publish_time)
    stime = publish_time - 432000  # 5 days before
    etime = publish_time + 432000  # 5 days after
    # match on title plus a publish-time window
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {"publishtime": {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    total = int(result["hits"]["total"]["value"])
    return total
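
# The dedup probe in words: phrase-match the title, constrained to a window of
# +/- 5 days around the item's own publish time. Hypothetical call:
#
#   >>> es_query("某某市政府采购公告", "2023-02-21")
#   0    # no hits in the window -> treat the item as new
#
# Returning 0 when ecli is None is deliberate: dedup degrades to
# "insert everything" instead of stalling the pipeline.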


def rpush(name, values, is_redis_cluster=False):
    """Push `values` onto the tail of list `name`."""
    if isinstance(values, list):
        pipe = rcli.pipeline()
        if not is_redis_cluster:
            pipe.multi()
        for value in values:
            pipe.rpush(name, value)
        pipe.execute()
    else:
        return rcli.rpush(name, values)
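
# The MULTI-wrapped pipeline makes a batch push atomic: either every element
# of the list lands on the queue or none do. Hypothetical usage:
#
#   >>> rpush("savemongo:data_bak", [{"title": "a"}, {"title": "b"}])
#   >>> rcli.llen("savemongo:data_bak")
#   2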


def handle_big_document(item):
    # strip inline <img> tags to shrink an oversized document
    if "contenthtml" in item:
        item["contenthtml"] = re.sub("<img[^>]*>", "<br>", item["contenthtml"])


def insert_one(table, item: Dict):
    """Insert a single document into MongoDB."""
    table = "".join(table.split(f"{redis_prefix}:"))  # strip the redis prefix
    if item is not None:
        item.pop("_id", "")
        if item.get("comeintime"):
            item["comeintime"] = int64.Int64(item["comeintime"])
        try:
            title = item.get("title")
            result = mongodb[table].insert_one(item)
            logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--insert ok")
        except Exception as e:
            if "BSON document too large" in ''.join(e.args):
                handle_big_document(item)  # MongoDB caps a BSON document at 16 MB
            # requeue under the scanned prefix so main() picks it up again
            rpush(f"{redis_prefix}:{table}", item)
            logger.error(f"[Send]{table}--push failed: {''.join(e.args)}")


def delay_push_to_db(table_name, data, delay_time=43200):
    """
    Third-party data must be held back before entering the spiders'
    production DB.
    @param table_name: table name
    @param data: the delayed item
    @param delay_time: delay in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    time_diff = int(time.time()) - data.get("comeintime")
    if time_diff <= delay_time:
        rpush(table_name, data)
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--requeued (delay)")
    else:
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--delay elapsed, inserting")
        insert_one(table_name, data)
    return True
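
# Timeline sketch for the default 12 h window (43200 s), hypothetical item:
#
#   comeintime = now -  3600  -> requeued; cycles through later sync runs
#   comeintime = now - 50000  -> window elapsed; written to MongoDB
#
# Because each run pops and re-pushes the item, the effective delay is
# "the first sync run after 12 hours have passed".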


def es_retrieval_push_to_db(table_name, data):
    """
    Dedup against ES (roughly the last 3 months of incremental data), then
    push to the spiders' production DB.
    @param table_name: table name
    @param data: item to check
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:  # ES dedup needs both title and publish time; otherwise drop as junk
        return False
    count = es_query(title.strip(), pt)
    if count == 0:
        insert_one(table_name, data)
    logger.info(f"[Send]{site}-{title}-found {count} hit(s)--ES dedup")
    return True


def mixture_process_push_to_db(table_name, data, delay_time=43200):
    """
    Combined delay + ES dedup, then push to the spiders' production DB.
    @param table_name: table name
    @param data: item to check
    @param delay_time: delay in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:  # ES dedup needs both title and publish time; otherwise drop as junk
        return False
    is_continue = False
    time_diff = int(time.time()) - data.get("comeintime")
    count = es_query(title.strip(), pt)
    if count == 0:
        if time_diff <= delay_time:
            rpush(table_name, data)
        else:
            insert_one(table_name, data)
        is_continue = True
    msg = "keep polling" if is_continue else "dropped as duplicate"
    logger.info(f"[Send]{site}-{title}-{msg}--mixed dedup")
    return True
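
# Decision table for the mixed strategy (12 h window by default):
#
#   ES hits | within 12 h | outcome
#   --------+-------------+---------------------------
#      0    |     yes     | requeue and keep polling
#      0    |     no      | insert into MongoDB
#     >0    |      -      | dropped as duplicate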


def sync_data(table: str):
    """
    Drain one Redis list and persist its items.
    @param table: redis key, e.g. "savemongo:xxx"
    @return:
    """
    redis_key = table
    total = rcli.llen(redis_key)
    logger.info(f"[Send]syncing table: {table}, queued items: {total}")
    for _ in range(total):
        obj = rcli.lpop(redis_key)
        if obj is None:
            logger.warning(f"[Send]{table} bad payload: {obj}")
            continue
        try:
            item = literal_eval(obj)
            if not table.endswith(("mgp_list", "bidding")):
                insert_one(table, item)
            else:
                is_delay = item.get("is_delay")  # delayed push
                is_es_retrieval = item.get("if_es")  # ES dedup
                if is_delay and is_es_retrieval:
                    mixture_process_push_to_db(table, item)
                elif is_delay and not is_es_retrieval:
                    delay_push_to_db(table, item)
                elif not is_delay and is_es_retrieval:
                    es_retrieval_push_to_db(table, item)
                else:
                    insert_one(table, item)
        except Exception as e:
            rpush(table, obj)  # requeue the raw payload for the next run
            logger.error(f"[Send]{table}--push failed: {''.join(e.args)}")
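
# Dispatch sketch: two flags on the payload select the strategy for
# *mgp_list / *bidding tables. Hypothetical payloads:
#
#   {"item": {...}, "comeintime": ..., "is_delay": 1, "if_es": 1}  -> mixed dedup
#   {"item": {...}, "comeintime": ..., "is_delay": 1}              -> delayed insert
#   {"item": {...}, "comeintime": ..., "if_es": 1}                 -> ES dedup only
#   neither flag (or any other table)                              -> direct insert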


@func_set_timeout(60 * 20)  # hard cap: 20 minutes per run
def main():
    logger.info("[Send]data sync started")
    with ThreadPoolExecutor() as threadPool:
        futures = []
        for key in rcli.keys(f"{redis_prefix}:*"):
            table = key.decode()
            f = threadPool.submit(sync_data, table)
            f.add_done_callback(err_msg)
            futures.append(f)
        wait(futures)
    logger.info("[Send]data sync finished")


if __name__ == '__main__':
    try:
        main()
    except FunctionTimedOut:
        logger.warning("[Send]data sync timed out")
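
# Deployment note (assumption, not from the source): the 20-minute
# func_set_timeout suggests the script is launched on a schedule, e.g. cron:
#
#   */30 * * * * /usr/bin/python3 /path/to/this_script.py
#
# Interval and path above are illustrative only.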