123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- # coding:utf-8
- import time
- from a2s.tools import json_serialize, json_deserialize
- from a2s.a2s_client import a2s_execute
- from docs.config import ReluMongodb
- from util.mogodb_helper import MongoDBInterface
- from pymongo import MongoClient
- from datetime import datetime, timedelta
- from elasticsearch import Elasticsearch
# Score Elasticsearch documents directly and store the results in MongoDB;
# the data is no longer pulled into a separate store before being scored.
ReluClient = MongoDBInterface(ReluMongodb)

# --- Quality-evaluation service settings -----------------------------------
a2s_ip = "172.20.100.235:9090"
# Alternative endpoint kept by the author for reference: "172.17.0.11:9090"
topic = "quality_bid"
# Topic used for local testing: "test_quality_bid"
timeout = 180

# --- Reference timestamps ---------------------------------------------------
# `now` is captured once at import time; `yesterday_8_am` is yesterday at
# 08:00 (naive, local time). In the visible code it is only referenced by the
# disabled alternatives for `current_datetime` noted below.
now = datetime.now()
yesterday = now - timedelta(days=1)
yesterday_8_am = datetime(
    year=yesterday.year,
    month=yesterday.month,
    day=yesterday.day,
    hour=8,
)

# Batch stamp written to the `create_time` field of every stored record
# (epoch seconds identifying the day of the data being scored).
# Disabled alternatives left by the author:
#   int(now.timestamp()) / int(yesterday_8_am.timestamp())
# Values noted by the author for earlier runs: 2025-02-12, 1739289600, 1739894400
current_datetime = 1743004800

# --- Time window of documents to score (epoch seconds, local time) ----------
# Naive datetimes are interpreted as local time by `.timestamp()`.
start_date = int(datetime(2025, 3, 27, 8).timestamp())  # 2025-03-27 08:00:00 (inclusive)
end_date = int(datetime(2025, 3, 27, 12).timestamp())  # 2025-03-27 12:00:00 (exclusive)

# --- Elasticsearch connection ------------------------------------------------
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
es_host = "http://127.0.0.1:19800"
es_username = "jianyuGr"
es_password = "we3g8glKfe#"

# Elasticsearch client using HTTP basic authentication.
es_client = Elasticsearch(
    es_host,
    http_auth=(es_username, es_password),
    retry_on_timeout=True,
)
# Run a quality evaluation for one record.
def start_quality(data: dict, rules_id: int, a2s_ip, topic, timeout, retry=3):
    """Send one record to the evaluation service and return its decoded reply.

    The record and rule ID are wrapped as ``{"data": ..., "rules_id": ...}``,
    serialized with ``json_serialize`` and sent through ``a2s_execute``. The
    call is attempted up to ``retry`` times: an attempt that returns ``None``
    or raises (the exception is printed) moves on to the next attempt.

    Args:
        data: The record to evaluate.
        rules_id: Identifier of the rule set to apply.
        a2s_ip: Address of the evaluation service, passed to ``a2s_execute``.
        topic: Topic name, passed to ``a2s_execute``.
        timeout: Timeout value, passed to ``a2s_execute``.
        retry: Maximum number of attempts.

    Returns:
        The result of ``json_deserialize`` on the first non-``None`` reply,
        or an empty dict when every attempt failed.
    """
    # Original author's note: SSL is not used here, so the channel is insecure.
    payload = json_serialize({"data": data, "rules_id": rules_id})

    for _ in range(retry):
        print("topic", topic)
        try:
            raw_reply = a2s_execute(a2s_ip, topic, timeout, payload)
            if raw_reply is not None:
                return json_deserialize(raw_reply)
        except Exception as exc:
            # Best effort: report the failure and fall through to the next attempt.
            print(exc)

    return {}
# Look up the rule ID.
def get_rule(company, version):
    """Return the rule ID for a company and rule-set version.

    Delegates to ``ReluClient.find_rule_by_company`` using the collection
    named by ``ReluMongodb["col"]`` and returns its result unchanged.
    """
    return ReluClient.find_rule_by_company(ReluMongodb["col"], company, version)
def insert_batch_data_mongo(collection, params):
    """Insert one record, given as positional values, into ``collection``.

    ``params`` is mapped position by position onto a fixed list of field
    names (``mongoid``, ``toptype``, ... ``create_time``) to build a single
    document, which is stored with ``collection.insert_one``. Any exception
    raised while inserting is printed and swallowed.

    Args:
        collection: Object exposing ``insert_one(document)``.
        params: Sequence of values in field order; it may be shorter than the
            field list, but a longer sequence raises ``IndexError``.
    """
    field_names = (
        "mongoid", "toptype", "subtype", "site", "spidercode", "channel",
        "comeintime", "area", "city", "district", "error_type", "create_time",
    )
    record = {field_names[pos]: value for pos, value in enumerate(params)}
    print(f"{record}数据")

    try:
        collection.insert_one(record)
        print(f"{record}数据已成功插入到 MongoDB")
    except Exception as exc:
        # Best effort: a failed insert is reported but does not stop the caller.
        print("插入数据时发生错误:", exc)
def get_last_processed_id():
    """Return the checkpointed document ID from the previous run, if any.

    Reads ``docs/last_processed_id.txt`` (relative to the current working
    directory) and strips surrounding whitespace.

    Returns:
        The stored ID as a string, or ``None`` when the file does not exist
        or contains only whitespace.
    """
    try:
        with open('docs/last_processed_id.txt', 'r') as checkpoint:
            stored_id = checkpoint.read().strip()
    except FileNotFoundError:
        # No checkpoint yet: the caller starts from the beginning.
        return None
    return stored_id or None
def save_last_processed_id(last_id):
    """Persist ``last_id`` as the resume checkpoint.

    Overwrites ``docs/last_processed_id.txt`` (relative to the current
    working directory) with ``str(last_id)``.
    """
    with open('docs/last_processed_id.txt', 'w') as checkpoint:
        checkpoint.write(str(last_id))
def batch_load_data():
    """Score every ``bidding`` document in the configured time window.

    Flow:
      1. Look up the rule ID and open the MongoDB target collection
         (``data_quality.bid_analysis``).
      2. Page through the Elasticsearch ``bidding`` index, 100 documents per
         request, filtered to ``start_date <= comeintime < end_date`` and
         sorted by ``_id``; paging uses ``search_after``. If a checkpoint
         from a previous run exists, paging resumes after that ID.
      3. Send each document to ``start_quality``. Results whose ``code`` is
         not 200 are skipped; otherwise selected fields plus the returned
         ``data`` (stored as ``error_type``) are written to MongoDB.
      4. After each page, checkpoint the last ``_id`` seen. Skipped documents
         still advance the checkpoint.

    Any exception during paging/scoring is printed, followed by a 10 second
    pause; the MongoDB client is always closed afterwards.
    """
    # Rule lookup is keyed by company name (user ID) and version.
    rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
    print(rules_id)

    # Keep a handle on the client itself. The original stored only the
    # Database object and later called conn.is_connected()/conn.close() on
    # it; pymongo resolves unknown Database attributes to Collection objects,
    # which are not callable, so the cleanup in `finally` always raised and
    # the client was never closed.
    mongo_client = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore")
    coll_user = mongo_client.data_quality["bid_analysis"]

    # Resume point from a previous run; None means start from the beginning.
    last_processed_id = get_last_processed_id()
    print(f"上次处理的 ID: {last_processed_id}")

    es_query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "comeintime": {
                                "gte": start_date,
                                "lt": end_date
                            }
                        }
                    }
                ]
            }
        },
        # Single sort key: search_after values below must match it one-to-one.
        "sort": [
            {"_id": {"order": "asc"}}
        ],
        # Page size. It lives in the body only; the original additionally
        # passed `size` as a keyword (once as the string "100").
        "size": 100
    }
    if last_processed_id:
        es_query["search_after"] = [last_processed_id]

    # Source fields copied verbatim into the stored record, in column order.
    source_fields = (
        "toptype", "subtype", "site", "spidercode", "channel",
        "comeintime", "area", "city", "district",
    )

    try:
        response = es_client.search(index="bidding", body=es_query)
        hits = response['hits']['hits']
        total_hits = response['hits']['total']['value']
        print(f"数据总量: {total_hits}")

        while hits:
            print("---- 批次开始 ----")
            max_id = None
            for hit in hits:
                item = hit["_source"]
                print("------一条数据开始--------")
                max_id = hit["_id"]
                print(f"正在处理数据: {max_id}")
                # The document ID travels with the record to the evaluator.
                item["_id"] = str(hit["_id"])

                result = start_quality(item, rules_id, a2s_ip, topic, timeout)
                print(result)
                if result.get("code") != 200:
                    # Evaluation failed for this record; skip it.
                    continue

                error_type = result.get("data", {})
                params = (
                    item["_id"],
                    *(item.get(name, "") for name in source_fields),
                    error_type,
                    current_datetime,
                )
                insert_batch_data_mongo(coll_user, params)
                print("------一条数据结束------")

            # Checkpoint the last ID of this page so a restart can resume here.
            if max_id:
                save_last_processed_id(max_id)
                print(f"保存当前处理的最大 ID: {max_id}")
            print("---- 当前批次数据处理完成 ----")

            # Fetch the next page, starting after the last document just seen.
            es_query["search_after"] = [hits[-1]["_id"]]
            response = es_client.search(index="bidding", body=es_query)
            hits = response['hits']['hits']
            if not hits:
                print("没有更多数据,结束批次处理")

        print("数据处理完成")
    except Exception as e:
        print(f"错误: {e}")
        time.sleep(10)
    finally:
        # Always release the MongoDB client (the original message said MySQL).
        mongo_client.close()
        print("MongoDB 连接已关闭")
# Script entry point: run the batch scoring job when executed directly.
if __name__ == "__main__":
    batch_load_data()
|