@@ -14,32 +14,21 @@ from elasticsearch import Elasticsearch
ReluClient = MongoDBInterface(ReluMongodb)

# Evaluation service configuration
-a2s_ip = "192.168.3.240:9090"
-# a2s_ip = "172.17.0.11:9090"
+a2s_ip = "172.20.100.235:9090"
topic = "quality_bid"
# Topic used for local testing
# topic = "test_quality_bid"
timeout = 180

-# Get the current time
-now = datetime.now()
-current_datetime = now.strftime("%Y-%m-%d %H:%M:%S")
-# Get today's date
-today = datetime.today()
-# Get yesterday's date
-yesterday = today - timedelta(days=1)
-# Get 00:00 of yesterday
-yesterday_midnight = datetime(yesterday.year, yesterday.month, yesterday.day)
-# Get 00:00 of today
-today_midnight = datetime(today.year, today.month, today.day)
-# Convert to Unix timestamps
-start_date = int(yesterday_midnight.timestamp())
-end_date = int(today_midnight.timestamp())
-
-# ES connection settings
-es_host = "http://127.0.0.1:19800"
-es_username = "jianyuGr"
-es_password = "we3g8glKfe#"
+# # ES connection settings
+# es_host = "http://127.0.0.1:19800"
+# es_username = "jianyuGr"
+# es_password = "we3g8glKfe#"
+
+# Production ES
+es_host = "http://172.17.4.184:19908"
+es_username = "qyfw_es_2"
+es_password = "Khfdals33#"

# Initialize the Elasticsearch client
es_client = Elasticsearch(es_host,http_auth=(es_username, es_password),retry_on_timeout=True) # use basic authentication
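# Note: the es_client line above authenticates with http_auth. On elasticsearch-py 8.x
# that keyword is deprecated in favour of basic_auth, so a hedged, equivalent construction
# (assuming an 8.x client) would be:
# es_client = Elasticsearch(es_host, basic_auth=(es_username, es_password), retry_on_timeout=True)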
@@ -86,7 +75,7 @@ def insert_batch_data(conn, params):
"""
Perform the batch data insert
"""
- query = """INSERT IGNORE INTO bid_analysis (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, spider_modified_time)
+ query = """INSERT IGNORE INTO bid_analysis (mongoid,toptype,subtype, site, spidercode, channel,comeintime, area, city, district, score, error_type, create_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s,%s)"""
MysqlUtil.insert_data(conn, query, params)

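# Note: INSERT IGNORE silently skips rows that would violate a unique or primary key
# (presumably on mongoid here, an assumption about the bid_analysis schema), so
# re-running a batch after an interruption does not duplicate rows already written.
# A hedged alternative with the same no-op-on-conflict effect would be:
# query = "INSERT INTO bid_analysis (...) VALUES (...) ON DUPLICATE KEY UPDATE mongoid = mongoid"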
@@ -112,7 +101,7 @@ def get_last_processed_id():
"""
# Here the checkpoint ID is assumed to be read from a file; it could also come from a database, Redis, or similar storage
try:
- with open('docs/last_processed_id.txt', 'r') as f:
+ with open('docs/last_processed_id_mysql.txt', 'r') as f:
last_id = f.read().strip()
if last_id:
return last_id
@@ -125,22 +114,31 @@ def save_last_processed_id(last_id):
"""
Save the largest ID processed so far, used for resuming
"""
- with open('docs/last_processed_id.txt', 'w') as f:
+ with open('docs/last_processed_id_mysql.txt', 'w') as f:
f.write(str(last_id))

+def clear_last_processed_id():
+ """
+ Clear the last_processed_id_mysql.txt checkpoint file
+ """
+ open('docs/last_processed_id_mysql.txt', 'w').close()
def batch_load_data():
"""
Batch data quality check
"""
+ # Get today's date as a string
+ today_date = datetime.now().strftime("%Y-%m-%d")
+ # Timestamp for 08:00:00 today
+ start_date = int(datetime.strptime(f"{today_date} 08:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
+ # Timestamp for 12:00:00 today
+ end_date = int(datetime.strptime(f"{today_date} 12:00:00", "%Y-%m-%d %H:%M:%S").timestamp())
+
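# A hedged, equivalent way to build the same 08:00-12:00 window without the string
# round-trip (assuming local time is intended, as in the lines above):
# start_date = int(datetime.now().replace(hour=8, minute=0, second=0, microsecond=0).timestamp())
# end_date = int(datetime.now().replace(hour=12, minute=0, second=0, microsecond=0).timestamp())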
# Rule lookup, keyed by the required fields: company name (user ID) and version
rules_id = get_rule("北京剑鱼信息技术有限公司", "v1.2")
print(rules_id)

# Initialize MySQL
- # conn = MysqlUtil.connect_to_mysql(host='192.168.3.14', port='4000', user='DataScBi', password='DT#Sc20221123Ht',database='quality')
- conn = MysqlUtil.connect_to_mysql(host='192.168.3.217', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')
- # Initialize the spider code repository
- collection = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
+ conn = MysqlUtil.connect_to_mysql(host='172.20.45.129', port='4000', user='root', password='=PDT49#80Z!RVv52_z',database='quality')

# Get the last processed ID; if there is none, start from the beginning
last_processed_id = get_last_processed_id()
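# Note on the checkpoint flow (a sketch of the intent, inferred from the functions above):
# get_last_processed_id() restores the _id last written by save_last_processed_id(), so an
# interrupted run resumes after the last committed batch, while clear_last_processed_id()
# (called once all hits are drained) resets the file so the next run starts from the beginning.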
@@ -174,7 +172,7 @@ def batch_load_data():
try:
# Use the scroll API to fetch the data in batches
- response = es_client.search(index="bidding", body=es_query, size=100)
+ response = es_client.search(index="bidding", body=es_query)
hits = response['hits']['hits']

while hits:
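# Note: despite the "scroll API" comment, the loop below pages with search_after. Dropping
# size=100 from es_client.search() means Elasticsearch falls back to its default of 10 hits
# per page; if 100-per-batch is still wanted, a hedged fix is to carry the size in the query
# body instead, e.g.:
# es_query["size"] = 100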
@@ -211,29 +209,11 @@ def batch_load_data():
district = item.get("district", "")
score = item.get("score", "")
error_type_data = json.dumps(data)
- spider_modified_time = current_datetime
-
- info = collection.find_one({"code": spidercode})
- if info:
- spider_modified_time = info.get("modifytime", "")
- spider_modified_time = datetime.fromtimestamp(spider_modified_time)
+ create_time = today_date

- params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,spider_modified_time)
+ params = (item["_id"], toptype, subtype, site, spidercode,channel, comeintime, area, city, district, score, error_type_data,create_time)
insert_batch_data(conn, params)

- # Iterate over the error-reason dict and extract the values from non-empty sub-dicts
- for key, value in data.items():
- error_ids = []
- if isinstance(value, dict) and value:
- cleaned_key = key[:-3] if key.endswith('_qa') else key # strip the '_qa' suffix
- for sub_key, sub_value in value.items():
- error_id = find_error_id(conn, cleaned_key, sub_value)
- if error_id:
- error_ids.append(error_id)
- print(f" {sub_key}: {sub_value}")
-
- # Insert the error IDs into the cleaned_key_error field
- insert_dynamic_error_field(conn, cleaned_key, error_ids, item["_id"])
print("------一条数据结束------")
# Save the largest ID processed in the current batch
if max_id:
@@ -245,12 +225,13 @@ def batch_load_data():
# Fetch the next batch of data
search_after = hits[-1]["_id"] # use the _id of the last record in this batch as the starting point for the next batch
es_query["search_after"] = [search_after] # keep the _id type consistent
- response = es_client.search(index="bidding", body=es_query, size="100")
+ response = es_client.search(index="bidding", body=es_query)
hits = response['hits']['hits']

# If there is no more data, break out of the loop
if not hits:
print("没有更多数据,结束批次处理")
+ clear_last_processed_id()
break
print("数据处理完成")
except Exception as e: