123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-02-21
- ---------
- @summary: 数据上传(redis到mongo),采集数据同步服务
- ---------
- @author: dzr
- """
- import ast
- import time
- from concurrent.futures import ThreadPoolExecutor, wait
- from typing import Dict
- import redis
- from bson import int64
- from elasticsearch import Elasticsearch
- from func_timeout import func_set_timeout
- from func_timeout.exceptions import FunctionTimedOut
- from pymongo import MongoClient
- from redis._compat import unicode, long, basestring
- from redis.connection import Encoder as RedisEncoder
- from redis.exceptions import DataError
- from log import logger
# --- MongoDB: destination of the synced records ---
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"
mcli = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
mongodb = mcli.get_database(MONGO_DB)
# redis
class Encoder(RedisEncoder):
    """redis-py ``Encoder`` variant used by this sync service.

    Differences from the stock encoder it subclasses:

    * ``list`` / ``dict`` / ``tuple`` values are stored as their ``str()``
      text instead of being rejected (``sync_data`` parses them back with
      ``literal_eval``);
    * ``bool`` is not rejected: being an ``int`` subclass it takes the
      integer branch and is stored as ``b"True"`` / ``b"False"``.
    """

    def encode(self, value):
        """Return *value* as bytes; bytes-like input is returned unchanged.

        :raises DataError: for any type other than bytes/memoryview, float,
            int, list/dict/tuple or string.
        """
        if isinstance(value, (bytes, memoryview)):
            return value

        if isinstance(value, float):
            return repr(value).encode()

        if isinstance(value, (int, long)):
            # str(), not repr(): on python 2 repr() of a long ends with 'L'.
            return str(value).encode()

        if isinstance(value, (list, dict, tuple)):
            text = unicode(value)
        elif isinstance(value, basestring):
            text = value
        else:
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % type(value).__name__
            )

        if isinstance(text, unicode):
            return text.encode(self.encoding, self.encoding_errors)
        return text
# NOTE(review): the Redis password is hard-coded in source; consider loading
# it from the environment or a config file instead.
REDIS_HOST = "172.17.4.232"
REDIS_PORT = 7361
REDISDB_USER_PASS = "k5ZJR5KV4q7DRZ92DQ"
REDIS_DB = 10
# Swap in the custom Encoder above so redis values are encoded with it
# (containers stored via str()). Presumably this only affects connections
# created after this line -- verify against the installed redis-py version.
redis.connection.Encoder = Encoder
pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDISDB_USER_PASS,
    db=REDIS_DB,
    # decode_responses must live on the pool: when a connection_pool is
    # handed to the client, the client's own connection kwargs (including
    # decode_responses) are ignored, and responses would come back as bytes.
    decode_responses=True,
)
rcli = redis.StrictRedis(connection_pool=pool)
# --- Elasticsearch: duplicate check for mgp_list records ---
ES_HOST = '172.17.145.178'
ES_PORT = 9800
ES_INDEX = 'biddingall'
ecli = Elasticsearch(hosts=[{"host": ES_HOST, "port": ES_PORT}])
# Delay interval in seconds (12 hours): mgp_list records flagged is_delay are
# re-queued until this long has passed since their comeintime.
DELAY = 12 * 60 * 60
def literal_eval(node_or_string):
    """Parse a record read back from the Redis queue.

    ``ast.literal_eval`` is tried first. If it rejects the input as a
    malformed node (e.g. the text contains calls such as ``ObjectId(...)``),
    the input is evaluated with ``eval`` while ``Code`` and ``ObjectId``
    from ``bson`` are in scope. Any other error propagates unchanged.

    SECURITY: the fallback is a real ``eval`` and will execute arbitrary
    code found in the queue -- only use it on data from trusted producers.

    :param node_or_string: text (or node) to parse
    :return: the parsed Python object
    """
    try:
        return ast.literal_eval(node_or_string)
    except ValueError as e:
        # str(e) instead of e.args[0]: an arg-less ValueError must not turn
        # into an IndexError here.
        if 'malformed node or string' not in str(e):
            raise
        # Imported locally so eval() can resolve these names from its scope.
        from bson import Code, ObjectId  # noqa: F401
        return eval(node_or_string)
def date2ts(date_str):
    """Convert a date string to an integer Unix timestamp (local time).

    Strings containing ``:`` are parsed as ``%Y-%m-%d %H:%M:%S``; anything
    else as ``%Y-%m-%d``. Conversion uses ``time.mktime``, i.e. the host's
    local time zone.
    """
    fmt = "%Y-%m-%d %H:%M:%S" if ":" in date_str else "%Y-%m-%d"
    return int(time.mktime(time.strptime(date_str, fmt)))
def es_query(title, publish_time):
    """Count ES documents with the same title published near *publish_time*.

    Runs a phrase match on the ``title`` field, restricted to documents whose
    ``publishtime`` lies within five days (432000 s) either side of
    *publish_time*.

    :param title: title to search for (phrase match)
    :param publish_time: date string in a format accepted by ``date2ts``
    :return: number of matching hits
    """
    publish_time = date2ts(publish_time)
    window = 432000  # 5 days in seconds, applied on both sides
    stime = publish_time - window
    etime = publish_time + window
    # Match by title phrase within the publish-time range.
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": title,
                            "type": "phrase",
                            "fields": ["title"]
                        }
                    },
                    {"range": {'publishtime': {"from": stime, "to": etime}}}
                ]
            }
        }
    }
    result = ecli.search(body=query, index=ES_INDEX, request_timeout=100)
    total = result['hits']['total']
    # ES < 7 reports an integer; ES >= 7 reports {"value": n, "relation": ...}.
    if isinstance(total, dict):
        total = total['value']
    return int(total)
def get_redis_key(table, prefix="savemongo:"):
    """Return the Redis list key that buffers records for *table*."""
    key = prefix + table
    return key
def rpush(name, values, is_redis_cluster=False):
    """Append to the tail of the Redis list *name*.

    * If ``values`` is a ``list``, each element is RPUSHed through one
      pipeline (with an explicit MULTI unless ``is_redis_cluster`` is set),
      and ``None`` is returned.
    * Anything else is pushed as a single element, and the result of
      ``rcli.rpush`` is returned.
    """
    if not isinstance(values, list):
        return rcli.rpush(name, values)

    batch = rcli.pipeline()
    if not is_redis_cluster:
        batch.multi()
    for element in values:
        batch.rpush(name, element)
    batch.execute()
def insert_one(table, item: Dict):
    """Insert a single record into the MongoDB collection *table*.

    Any ``_id`` already on the record is discarded so a new one is generated
    on insert. A truthy ``comeintime`` is stored as a BSON ``Int64``. If the
    insert fails, the record is pushed back onto the table's Redis list for
    a later retry and the failure is logged.

    :param table: target collection name
    :param item: record to insert (mutated in place); ``None`` is ignored
    """
    if item is None:
        return
    item.pop('_id', '')
    if item.get("comeintime"):
        item['comeintime'] = int64.Int64(item['comeintime'])
    title = item.get('title')
    try:
        result = mongodb[table].insert_one(item)
    except Exception as e:
        rpush(get_redis_key(table), item)
        # str() every arg: exception args are not guaranteed to be strings.
        # A TypeError here would escape to the caller, which would then
        # re-queue the record a second time.
        reason = ''.join(map(str, e.args))
        logger.error(table + f"--推送失败,原因:{reason}")
    else:
        # Logged outside the try so a logging error cannot re-queue a record
        # that was already inserted.
        logger.info(f'{table}-{str(result.inserted_id)}-{title}--上传成功')
def sync_data(table):
    """Drain the Redis buffer for *table* into MongoDB.

    Pops at most as many records as the list held when the pass started.
    A record that fails to parse or process is pushed back onto the list
    and the error is logged.

    ``mgp_list`` records are wrappers whose ``item`` key holds the actual
    record; see ``_sync_mgp_item`` for their delay / ES-dedup handling.

    :param table: collection name; it also selects the Redis key via
        ``get_redis_key``.
    """
    redis_key = get_redis_key(table)
    total = rcli.llen(redis_key)
    logger.info(f"同步表名:{table},推送总数:{total}")
    for _ in range(total):
        obj = rcli.lpop(redis_key)
        if obj is None:
            logger.warning(f'{table} 错误数据:{obj}')
            continue
        try:
            item = literal_eval(obj)
            if table != 'mgp_list':
                insert_one(table, item)
            else:
                _sync_mgp_item(redis_key, table, item)
        except Exception:
            # Put the raw record back for the next pass, and record why
            # instead of failing silently.
            rpush(redis_key, obj)
            logger.exception(f"{table}--数据处理失败,已放回队列")


def _sync_mgp_item(redis_key, table, item):
    """Apply the delay / ES-dedup flow to one parsed ``mgp_list`` wrapper.

    * ``is_delay``: the wrapper is re-queued until more than ``DELAY``
      seconds have passed since its ``comeintime``. After that it continues
      with the normal flow below.
    * ``if_es``: the wrapper is inserted only if it has a title and
      ``es_query`` finds no matching document.
    * otherwise: inserted directly.
    """
    title = item.get("item").get("title")
    # Delayed-push flow.
    if item.get("is_delay"):
        site = item.get("item").get("site")
        t_diff = int(time.time()) - item.get("comeintime")
        if t_diff <= DELAY:
            rpush(redis_key, item)
            logger.info(f"{site}-{title}-等待{t_diff}秒--延时入库")
            return
        # Delay elapsed: fall through to the normal flow instead of
        # dropping the record.
    # Elasticsearch duplicate-check flow.
    if item.get("if_es"):
        pt = item.get("item").get("publishtime")
        if title is not None and es_query(title.strip(), pt) == 0:
            insert_one(table, item)
    else:
        insert_one(table, item)
def err_msg(worker):
    """``add_done_callback`` hook: log a finished future's exception, if any.

    :param worker: a completed ``concurrent.futures.Future``
    :return: the same future, unchanged
    """
    failure = worker.exception()
    if not failure:
        return worker
    logger.exception(f"worker err: {failure}")
    return worker
@func_set_timeout(60 * 20)
def main():
    """Run one sync pass over every buffered table.

    Each table is submitted to a thread pool as a separate ``sync_data``
    task. Task exceptions are logged through ``err_msg``. The decorator
    caps the whole pass at 20 minutes.
    """
    logger.info("数据同步开始")
    tables = ["mgp_list", "data_bak", "spider_heartbeat", "njpc_list", "data_njpc", "listdata_err"]
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(sync_data, name) for name in tables]
        for fut in futures:
            fut.add_done_callback(err_msg)
        wait(futures)
    logger.info("数据同步结束")
if __name__ == '__main__':
    # Single sync pass; exceeding main()'s time limit is logged, not raised.
    try:
        main()
    except FunctionTimedOut:
        logger.warning("数据同步超时")
|