# -*- coding: utf-8 -*-
"""
Created on 2023-02-21
---------
@summary: data upload (redis -> mongo), sync service for collected data
---------
@author: dzr
"""
import ast
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Dict

import redis
from bson import int64
from elasticsearch import Elasticsearch
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from pymongo import MongoClient
from redis._compat import unicode, long, basestring  # py2/py3 shims (redis-py 3.x)
from redis.connection import Encoder as RedisEncoder
from redis.exceptions import DataError

from log import logger


class Encoder(RedisEncoder):
    """Redis encoder that also accepts list/dict/tuple by storing their repr()."""

    def encode(self, value):
        "Return a bytestring or bytes-like representation of the value"
        if isinstance(value, (bytes, memoryview)):
            return value
        # NOTE: upstream redis-py raises DataError for bool (a subclass of
        # int); that branch is deliberately disabled here so bools fall
        # through to the int branch and round-trip as "True"/"False".
        # elif isinstance(value, bool):
        #     raise DataError(
        #         "Invalid input of type: 'bool'. Convert to a "
        #         "bytes, string, int or float first."
        #     )
        elif isinstance(value, float):
            value = repr(value).encode()
        elif isinstance(value, (int, long)):
            # python 2 repr() on longs is '123L', so use str() instead
            value = str(value).encode()
        elif isinstance(value, (list, dict, tuple)):
            # deviation from upstream: containers are stored as repr() text
            # and turned back into objects by literal_eval() further down
            value = unicode(value)
        elif not isinstance(value, basestring):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value
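

# A minimal sketch (illustrative only, never invoked): how the patched Encoder
# serialises containers. This is what lets dict items be RPUSHed below and
# recovered by literal_eval() on the way back out.
def _demo_encoder():
    enc = Encoder(encoding="utf-8", encoding_errors="strict", decode_responses=False)
    assert enc.encode({"a": 1}) == b"{'a': 1}"  # dict -> repr() text -> bytes
    assert enc.encode([1, "b"]) == b"[1, 'b']"  # lists are handled the same way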


# redis
redis.connection.Encoder = Encoder  # monkey-patch the module-wide encoder
REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10
pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDISDB_USER_PASS,
    db=REDIS_DB
)
# NOTE: decode_responses is ignored when a connection_pool is supplied (it is
# a pool option), so replies arrive as bytes; see key.decode() in main()
rcli = redis.StrictRedis(connection_pool=pool, decode_responses=True)
redis_prefix = "savemongo"

# mongo
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"
mcli = MongoClient(MONGO_HOST, MONGO_PORT)
mongodb = mcli[MONGO_DB]

# es
ES_HOST = "172.17.145.178"
ES_PORT = 9200
ES_INDEX = "biddingall"
try:
    ecli = Elasticsearch([{"host": ES_HOST, "port": ES_PORT}])
except ConnectionRefusedError as e:
    logger.error(f"es connection refused: {e}")
    ecli = None


def err_msg(worker):
    err = worker.exception()
    if err:
        logger.exception("[Send]worker err: {}".format(err))
    return worker


def literal_eval(node_or_string):
    """Deserialise an item stored as its repr() text."""
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        if "malformed node or string" in e.args[0]:
            # payloads may embed ObjectId(...)/Code(...)/datetime(...) calls,
            # which ast.literal_eval rejects; import those names into local
            # scope and fall back to eval() (trusted internal data only)
            from bson import Code, ObjectId  # noqa: F401
            import datetime  # noqa: F401
            return eval(node_or_string)
        else:
            raise
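

# A minimal sketch (illustrative only, never invoked): plain reprs round-trip
# through ast.literal_eval, while payloads embedding ObjectId(...) take the
# eval() fallback. The hex id below is a made-up example value.
def _demo_literal_eval():
    from bson import ObjectId
    assert literal_eval("{'a': 1}") == {"a": 1}
    item = literal_eval("{'_id': ObjectId('5f1d2e3a4b5c6d7e8f9a0b1c')}")
    assert isinstance(item["_id"], ObjectId)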


def date2ts(date_str):
    """Convert a date string to a unix timestamp (seconds)."""
    if ":" in date_str:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d %H:%M:%S")))
    else:
        ts = int(time.mktime(time.strptime(date_str, "%Y-%m-%d")))
    return ts
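

# A minimal sketch (illustrative only, never invoked): both accepted formats
# resolve to the same midnight timestamp.
def _demo_date2ts():
    assert date2ts("2023-02-21 00:00:00") == date2ts("2023-02-21")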


def es_query(title, publish_time):
    """
    Query elasticsearch for near-duplicate announcements.

    :param title: announcement title
    :param publish_time: publish time string
    :return: number of hits
    """
    if not ecli:
        return 0  # if the es service is down, let data through unchecked
    publish_time = date2ts(publish_time)
    stime = publish_time - 432000  # 5 days back
    etime = publish_time + 432000  # 5 days forward
    # match the exact title phrase within the publish-time window
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {"publishtime": {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    total = int(result["hits"]["total"]["value"])
    return total
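

# A minimal sketch (illustrative only, never invoked): zero hits inside the
# ±5-day window means the record is treated as new. The title and time are
# made-up example values.
def _demo_es_query():
    if es_query("Example tender announcement", "2023-02-21 10:00:00") == 0:
        print("no duplicate found, safe to insert")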


def rpush(name, values, is_redis_cluster=False):
    """Push ``values`` onto the tail of list ``name``."""
    if isinstance(values, list):
        pipe = rcli.pipeline()
        if not is_redis_cluster:
            pipe.multi()
        for value in values:
            pipe.rpush(name, value)
        pipe.execute()
    else:
        return rcli.rpush(name, values)
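

# A minimal sketch (illustrative only, never invoked): dict items can be
# pushed as-is because the patched Encoder serialises them via repr().
# "savemongo:demo_table" is a made-up key name.
def _demo_rpush():
    rpush("savemongo:demo_table", [{"title": "t1"}, {"title": "t2"}])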


def handle_big_document(item):
    """Shrink an oversized document to fit MongoDB's 16 MB BSON limit."""
    if "contenthtml" in item:
        # strip attributes from <img ...> tags (typically huge base64 "src"
        # payloads), keeping a bare placeholder tag
        item["contenthtml"] = re.sub("<img[^>]*>", "<img>", item["contenthtml"])


def insert_one(table, item: Dict):
    """Insert a single document into MongoDB."""
    table = "".join(table.split(f"{redis_prefix}:"))  # strip the redis key prefix
    if item is not None:
        item.pop("_id", "")
        if item.get("comeintime"):
            item["comeintime"] = int64.Int64(item["comeintime"])
        try:
            title = item.get("title")
            result = mongodb[table].insert_one(item)
            logger.info(f"[Send]{table}-{str(result.inserted_id)}-{title}--insert ok")
        except Exception as e:
            if "BSON document too large" in ''.join(e.args):
                handle_big_document(item)  # MongoDB caps BSON documents at 16 MB
            # requeue under the prefixed key so the next run picks it up again
            rpush(f"{redis_prefix}:{table}", item)
            logger.error(f"[Send]{table}--insert failed, requeued: {''.join(e.args)}")


def delay_push_to_db(table_name, data, delay_time=43200):
    """
    Third-party data must be held back before entering the spider production
    DB; requeue until the delay has elapsed, then insert.

    @param table_name: table name (redis key)
    @param data: the delayed item
    @param delay_time: delay length, in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    time_diff = int(time.time()) - data.get("comeintime")
    if time_diff <= delay_time:
        rpush(table_name, data)
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--delayed requeue")
    else:
        logger.info(f"[Send]{site}-{title}-waited {time_diff}s--delayed insert")
        insert_one(table_name, data)
    return True


def es_retrieval_push_to_db(table_name, data):
    """
    Deduplicate against es (the last ~3 months of incremental data), then
    push to the spider production DB.

    @param table_name: table name (redis key)
    @param data: item to deduplicate
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:
        # es retrieval needs both title and publish time;
        # otherwise the record is discarded as junk data
        return False
    count = es_query(title.strip(), pt)
    if count == 0:
        insert_one(table_name, data)
    logger.info(f"[Send]{site}-{title}-found {count} hit(s)--es check")
    return True


def mixture_process_push_to_db(table_name, data, delay_time=43200):
    """
    Combined check (delay + es dedup), then push to the spider production DB.

    @param table_name: table name (redis key)
    @param data: item to deduplicate
    @param delay_time: delay length, in seconds
    @return:
    """
    site = data.get("item").get("site")
    title = data.get("item").get("title")
    pt = data.get("item").get("publishtime")
    if not title or not pt:
        # es retrieval needs both title and publish time;
        # otherwise the record is discarded as junk data
        return False
    is_continue = False
    time_diff = int(time.time()) - data.get("comeintime")
    count = es_query(title.strip(), pt)
    if count == 0:
        if time_diff <= delay_time:
            rpush(table_name, data)
        else:
            insert_one(table_name, data)
        is_continue = True
    msg = "still polling" if is_continue else "dropped duplicate"
    logger.info(f"[Send]{site}-{title}-{msg}--combined check")
    return True
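

# A minimal sketch (illustrative only, never invoked): the envelope shape the
# strategy functions expect: routing flags and comeintime at the top level,
# the scraped record nested under "item". All field values are made up.
def _demo_routing():
    data = {
        "is_delay": True,
        "if_es": True,
        "comeintime": int(time.time()),
        "item": {"site": "demo", "title": "t", "publishtime": "2023-02-21"},
    }
    mixture_process_push_to_db("savemongo:demo_mgp_list", data)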


def sync_data(table: str):
    """
    Drain one redis list and store its items.

    @param table: redis key (doubles as the target table name)
    @return:
    """
    redis_key = table
    total = rcli.llen(redis_key)
    logger.info(f"[Send]sync table: {table}, items to push: {total}")
    for _ in range(total):
        obj = rcli.lpop(redis_key)
        if obj is None:  # list drained early (e.g. popped by another worker)
            logger.warning(f"[Send]{table} empty pop: {obj}")
            continue
        try:
            item = literal_eval(obj)
            if not table.endswith(("mgp_list", "bidding")):
                insert_one(table, item)
            else:
                is_delay = item.get("is_delay")  # delayed push
                is_es_retrieval = item.get("if_es")  # es dedup check
                if is_delay and is_es_retrieval:
                    mixture_process_push_to_db(table, item)
                elif is_delay and not is_es_retrieval:
                    delay_push_to_db(table, item)
                elif not is_delay and is_es_retrieval:
                    es_retrieval_push_to_db(table, item)
                else:
                    insert_one(table, item)
        except Exception as e:
            rpush(table, obj)
            logger.error(f"[Send]{table}--push failed, reason: {''.join(e.args)}")


@func_set_timeout(60 * 20)  # watchdog: abort a run after 20 minutes
def main():
    logger.info("[Send]data sync started")
    with ThreadPoolExecutor() as threadPool:
        futures = []
        # NOTE: KEYS blocks redis while it scans; fine at this key count,
        # scan_iter() would be the safer choice on larger deployments
        for key in rcli.keys(f"{redis_prefix}:*"):
            table = key.decode()  # bytes -> str (decode_responses is inactive)
            f = threadPool.submit(sync_data, table)
            f.add_done_callback(err_msg)
            futures.append(f)
        wait(futures)
    logger.info("[Send]data sync finished")


if __name__ == '__main__':
    try:
        main()
    except FunctionTimedOut:
        logger.warning("[Send]data sync timed out")