@@ -5,8 +5,6 @@ from a2s.a2s_client import a2s_execute
 from docs.config import ReluMongodb
 from util.mogodb_helper import MongoDBInterface
 from pymongo import MongoClient
-from util.mysql_tool import MysqlUtil
-import json
 from datetime import datetime, timedelta
 from elasticsearch import Elasticsearch
@@ -14,7 +12,7 @@ from elasticsearch import Elasticsearch
 ReluClient = MongoDBInterface(ReluMongodb)

 # Evaluation service configuration
-a2s_ip = "192.168.3.240:9090"
+a2s_ip = "172.20.100.235:9090"
 # a2s_ip = "172.17.0.11:9090"
 topic = "quality_bid"
 # Topic used for local testing
@@ -30,11 +28,13 @@ yesterday = now - timedelta(days=1)
 # Get yesterday at 8:00 AM
 yesterday_8_am = datetime(yesterday.year, yesterday.month, yesterday.day, 8, 0, 0)
 # Convert to a timestamp (in seconds)
-current_datetime = int(yesterday_8_am.timestamp())
+# current_datetime = int(yesterday_8_am.timestamp())
+# create_time, used as the batch field; its value is the data's date, e.g. 2025-02-12, 1739289600, 1739894400
+current_datetime = int(1740585600)

 # Time range
-start_date = int(datetime(2025, 1, 23, 8, 0, 0).timestamp())  # 2025-01-20 00:00:00
-end_date = int(datetime(2025, 1, 23, 12, 0, 0).timestamp())  # 2025-01-20 23:59:59
+start_date = int(datetime(2025, 2, 27, 8, 0, 0).timestamp())  # 2025-02-27 08:00:00
+end_date = int(datetime(2025, 2, 27, 12, 0, 0).timestamp())  # 2025-02-27 12:00:00

 # ES connection configuration
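Note on the hard-coded epochs above: 1740585600 lines up with 2025-02-27 00:00:00 in UTC+8, while the naive datetime(...) calls for start_date/end_date take the machine's local timezone. A minimal sketch of how these batch epochs can be derived explicitly, assuming UTC+8 (Asia/Shanghai) is the intended zone (the timezone is an assumption, not stated in the code):

    from datetime import datetime, timezone, timedelta

    CST = timezone(timedelta(hours=8))  # assumed offset; matches the values above

    def batch_epoch(year, month, day, hour=0):
        # Epoch seconds for a UTC+8 wall-clock time, used as the batch value.
        return int(datetime(year, month, day, hour, tzinfo=CST).timestamp())

    batch_epoch(2025, 2, 27)      # 1740585600 -> current_datetime
    batch_epoch(2025, 2, 27, 8)   # 1740614400 -> start_date
    batch_epoch(2025, 2, 27, 12)  # 1740628800 -> end_date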
@@ -115,14 +115,9 @@ def batch_load_data():
     print(rules_id)

-    # Initialize the spider code collection
-    collection_spider = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["lua_logs_auditor"]
-    # Initialize the spider config collection
-    collection_config = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["luaconfig"]
-    # Initialize the site collection
-    collection_site = MongoClient(f'mongodb://{"127.0.0.1:27089"}/', unicode_decode_error_handler="ignore", directConnection=True)["editor"]["site"]
+
     # Initialize the MongoDB database
-    conn = MongoClient('192.168.3.149', 27180, unicode_decode_error_handler="ignore").data_quality
+    conn = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore").data_quality
     coll_user = conn["bid_analysis"]

     # Get the last processed ID; if there is none, start from the beginning
@@ -159,7 +154,8 @@ def batch_load_data():
     # Use the scroll API to fetch data in batches
     response = es_client.search(index="bidding", body=es_query, size=100)
     hits = response['hits']['hits']
-
+    total_hits = response['hits']['total']['value']  # total number of matching documents
+    print(f"Total hits: {total_hits}")
     while hits:
         print(f"---- Batch start ----")
         max_id = None
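On the added total_hits line above: in Elasticsearch 7+, hits.total.value is only exact up to 10,000 by default (the "relation" field turns to "gte" beyond that), so the printed total may be capped. A hedged adjustment, assuming the elasticsearch-py client already used here, would be to request an exact count:

    # Ask ES to count all matches exactly; otherwise hits.total.value caps at 10,000.
    es_query["track_total_hits"] = True
    response = es_client.search(index="bidding", body=es_query, size=100)
    total_hits = response["hits"]["total"]["value"]
    relation = response["hits"]["total"]["relation"]  # "eq" when the count is exact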
@@ -191,20 +187,10 @@ def batch_load_data():
             area = item.get("area", "")
             city = item.get("city", "")
             district = item.get("district", "")
-            spider_modified_time = current_datetime
-            spider_important = False
-            site_important = 0
+
             create_time = current_datetime
-            info = collection_spider.find_one({"code": spidercode})
-            if info:
-                spider_modified_time = info.get("modifytime", "")
-            info_config = collection_config.find_one({"code": spidercode})
-            if info_config:
-                spider_important = info_config.get("spiderimportant", "")
-            info_site = collection_site.find_one({"site": site})
-            if info_site:
-                site_important = info_site.get("important", "")
-            params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime, area, city, district, data, spider_modified_time, spider_important, site_important, create_time)
+
+            params = (item["_id"], toptype, subtype, site, spidercode, channel, comeintime, area, city, district, create_time)
             insert_batch_data_mongo(coll_user, params)