subsequent.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import threading
  2. import time
  3. import logging
  4. import pymysql
  5. # 配置日志记录
  6. logging.basicConfig(level=logging.DEBUG, filename='debug.log', filemode='w',
  7. format='%(asctime)s - %(levelname)s - %(message)s')
  8. # 数据库连接配置
  9. db_config = {
  10. 'host': '192.168.3.207',
  11. 'port': 14000,
  12. 'user': 'root',
  13. 'password': '=PDT49#80Z!RVv52_z',
  14. 'database': 'jianyu_subjectdb_test'
  15. }
  16. # SQL查询列表
  17. sql_queries = [
  18. # 请确保SQL查询字符串正确,以下为示例
  19. """select count(*) as totalRowCount from (select t0.`层次子字段(1)` as `层次子字段(1)(1)`,t0.职位ID as 职位ID,t0.销售人员 as 销售人员,t0.`入职日期\r\n` as `入职日期\r\n`,t0.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t0.离职日期 as 离职日期,t0.离职日期_年月日 as 离职日期_年月日,t0.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t0.层次子字段 as `层次子字段(2)`,t0.企业ID as 企业ID,t0.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.`更新时间(1)` as `更新时间(1)`,t0.`职位ID(1)` as `职位ID(1)`,t0.部门名称 as 部门名称,t0.`坐席号(1)` as `坐席号(1)`,t0.客成经理姓名 as 客成经理姓名,t0.合力线索归属 as 合力线索归属,t0.公司性质 as 公司性质,t0.联系人 as 联系人,t0.公司核验 as 公司核验,t0.坐席职位ID as 坐席职位ID,t0.`线索ID:UUID` as `线索ID:UUID`,t0.用户表UUID as 用户表UUID,t0.`公司性质,0:非集团公司 1:集团公司` as `公司性质,0:非集团公司 1:集团公司`,t0.`公司核验,0:不在工商库 1:在工商库` as `公司核验,0:不在工商库 1:在工商库`,t0.`线索名称:COMPANY_NAME` as `线索名称:COMPANY_NAME`,t0.`联系人类型:决策人,使用人` as `联系人类型:决策人,使用人`,t0.`线索跟进状态:无意向,已成交......` as `线索跟进状态:无意向,已成交......`,t0.坐席号 as 坐席号,t0.是否加车 as 是否加车,t0.任务状态 as 任务状态,t0.`是否移交;0:未移交 1:已移交` as `是否移交;0:未移交 1:已移交`,t0.最近跟进内容 as 最近跟进内容,t0.开始跟进时间 as 开始跟进时间,t0.最新跟进时间 as 最新跟进时间,t0.线索上限数量 as 线索上限数量,t0.备注 as 备注,t0.冻结时间 as 冻结时间,t0.UID更新说明 as UID更新说明,t0.线索名称 as 线索名称,t0.服务结束时间 as 服务结束时间,t0.同时跟进 as 同时跟进,t0.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t0.ID as ID,t0.是否离职 as 是否离职,t0.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t0.`是否分配:0:未分配,公海;1:已分配,私海` as `是否分配:0:未分配,公海;1:已分配,私海`,t0.客户需求 as 客户需求,t0.意向产品 as 意向产品,t0.客户预算 as 客户预算,CASE WHEN t0.坐席职位ID IS NOT NULL AND t0.坐席职位ID IS NOT NULL THEN 1 WHEN t0.坐席职位ID IS NOT NULL AND t0.坐席职位ID IS NULL AND NULL IS NOT NULL THEN 2 WHEN t0.坐席职位ID IS NULL THEN 3 ELSE 4 END as 排序字段,CASE WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=0 THEN '公海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=1 THEN '私海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-1 THEN '退出公海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-2 THEN '域外' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-3 THEN '冻结池' END as 线索池 from (select t3.`层次子字段(1)` as `层次子字段(1)`,t3.职位ID as 职位ID,t3.姓名 as 销售人员,t3.`入职日期\r\n` as `入职日期\r\n`,t3.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t3.离职日期 as 离职日期,t3.离职日期_年月日 as 离职日期_年月日,t3.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t3.层次子字段 as 层次子字段,t3.企业ID as 企业ID,t3.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t3.更新时间 as `更新时间(1)`,t3.`职位ID(1)` as `职位ID(1)`,t3.部门名称 as 部门名称,t3.坐席号 as `坐席号(1)`,t2.name as 客成经理姓名,null as 合力线索归属,CASE WHEN t0.COMPANY_NATURE=1 THEN '集团公司' WHEN t0.COMPANY_NATURE=0 THEN '非集团公司' ELSE NULL END as 公司性质,t0.name as 联系人,CASE WHEN t0.COMPANY_VERIFICATION=0 THEN '不在工商库' WHEN t0.COMPANY_VERIFICATION=1 THEN '在工商库' END as 公司核验,t0.position_id as 坐席职位ID,cast(t0.id as char) as `线索ID:UUID`,t0.uid as 用户表UUID,t0.COMPANY_NATURE as `公司性质,0:非集团公司 1:集团公司`,t0.COMPANY_VERIFICATION as `公司核验,0:不在工商库 1:在工商库`,t0.cluename as `线索名称:COMPANY_NAME`,t0.contact_type as `联系人类型:决策人,使用人`,t0.trailstatus as `线索跟进状态:无意向,已成交......`,t0.seatNumber as 坐席号,t0.IS_TASK as 是否加车,t0.TASKSTATUS as 任务状态,t0.IS_TRANSFER as `是否移交;0:未移交 1:已移交`,t0.CONTENT as 最近跟进内容,t0.START_TRAIL_TIME as 开始跟进时间,t0.TRAIL_TIME as 最新跟进时间,t3.线索上限数量 as 线索上限数量,t0.REMARK as 备注,t0.FREEZE_TIME as 冻结时间,t0.SHUOMING as UID更新说明,t0.cluename as 线索名称,t2.service_endtime as 服务结束时间,CASE WHEN t0.IS_TRANSFER=1 THEN CASE WHEN date_add(t2.service_endtime,interval -3 month)>=current_timestamp THEN '同时跟进' ELSE NULL END ELSE NULL END as 同时跟进,t3.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t3.ID as ID,t3.是否离职 as 是否离职,t3.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t0.is_assign as `是否分配:0:未分配,公海;1:已分配,私海`,t0.customer_demand as 客户需求,t0.intended_products as 意向产品,t0.customer_budget as 客户预算 from jianyu_subjectdb_test.dwd_f_crm_clue_info t0 left join jianyu_subjectdb_test.dwd_f_userbase_contacts t1 on (t0.uid=t1.baseinfo_id) and (1>2) left join jianyu_subjectdb_test.dwd_f_csm_customer_info t2 on (t0.id=t2.clue_id) left join (select t0.职位ID as 职位ID,t0.姓名 as 姓名,t0.`入职日期\r\n` as `入职日期\r\n`,t0.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t0.离职日期 as 离职日期,t0.离职日期_年月日 as 离职日期_年月日,t0.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t0.层次子字段 as 层次子字段,t0.企业ID as 企业ID,t0.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.更新时间 as 更新时间,t0.线索上限数量 as 线索上限数量,t0.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t0.ID as ID,t0.坐席号 as 坐席号,t0.是否离职 as 是否离职,t0.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t1.position_id as `职位ID(1)`,t1.bi_code as `层次子字段(1)`,t1.DEPT_NAME as 部门名称 from (select t0.position_id as 职位ID,t0.name as 姓名,t0.reportduty_time as `入职日期\r\n`,t0.reportduty_date as `入职日期_年月日\r\n`,t0.resign_time as 离职日期,t0.resign_date as 离职日期_年月日,t0.ROLE_ID as `角色ID,关联维表获取对应权限`,concat('职位id:', ifnull(cast(t0.position_id as char), '')) as 层次子字段,t0.ENT_ID as 企业ID,t0.assign_type as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.UPDATE_TIME as 更新时间,t0.LIMIT_SIZE as 线索上限数量,t0.assign_level as `线索分配等级\n(多值:A类,B类,C类)`,t0.id as ID,t0.seat_number as 坐席号,t0.resign as 是否离职,t0.config as `是否配置(1:配置 0:未配置)`,row_number() over (partition by t0.position_id order by t0.reportduty_time desc) as RN from jianyu_subjectdb_test.dwd_f_crm_personnel_management t0) t0 left join jianyu_subjectdb_test.dwd_d_crm_department_level_succbi t1 on (t0.职位ID=t1.position_id) where (t0.RN=1)) t3 on (t0.position_id=t3.职位ID) where (t0.cluename='泗阳县张三大药房有限公司') and (t0.cluename IS NOT NULL) union all select null as `层次子字段(1)`,NULL as 职位ID,null as 销售人员,NULL as `入职日期\r\n`,NULL as `入职日期_年月日\r\n`,NULL as 离职日期,NULL as 离职日期_年月日,NULL as `角色id,关联维表获取对应权限`,null as 层次子字段,NULL as 企业ID,NULL as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,NULL as `更新时间(1)`,NULL as `职位ID(1)`,null as 部门名称,null as `坐席号(1)`,null as 客成经理姓名,concat('合作渠道:', ifnull(t1.name, '')) as 合力线索归属,NULL as 公司性质,t0.name as 联系人,NULL as 公司核验,NULL as 坐席职位ID,t0.phone as 用户登陆手机号,null as 用户表UUID,NULL as `公司性质,0:非集团公司 1:集团公司`,NULL as `公司核验,0:不在工商库 1:在工商库`,null as `线索名称:COMPANY_NAME`,null as `联系人类型:决策人,使用人`,null as `线索跟进状态:无意向,已成交......`,null as 坐席号,NULL as 是否加车,NULL as 任务状态,NULL as `是否移交;0:未移交 1:已移交`,NULL as 最近跟进内容,NULL as 开始跟进时间,NULL as 最新跟进时间,NULL as 线索上限数量,NULL as 备注,NULL as 冻结时间,null as uid更新说明,t0.company_name as 线索名称,NULL as 服务结束时间,NULL as 同时跟进,null as `线索分配等级\n(多值:A类,B类,C类)`,NULL as ID,NULL as 是否离职,NULL as `是否配置(1:配置 0:未配置)`,NULL as `是否分配:0:未分配,公海;1:已分配,私海`,NULL as 客户需求,NULL as 意向产品,NULL as 客户预算 from jianyu_subjectdb_test.dwd_f_userbase_baseinfo t0 left join jianyu_subjectdb_test.dwd_d_userbase_belongto_rulecode t1 on (t0.BELONG_TO=t1.code) left join jianyu_subjectdb_test.dwd_f_userbase_contacts t2 on (t0.phone=t2.phone) and (1>2) where (t0.company_name='泗阳县张三大药房有限公司') and (t0.STATUS NOT IN (0,2) AND t1.pcode='03')) t0) t0 where (t0.职位ID<>'参数-position_id:jyUserPositionId' AND t0.职位ID IS NOT NULL AND t0.`线索ID:UUID`<>'1381704')"""
  20. ]
  21. # 全局变量来存储每个线程的执行时间和总执行时间
  22. execution_times = []
  23. total_execution_times = []
  24. execution_times_lock = threading.Lock()
  25. def execute_sql_query(thread_id, sql_query):
  26. start_time = time.time()
  27. connection = None
  28. cursor = None
  29. try:
  30. connection = pymysql.connect(**db_config)
  31. cursor = connection.cursor()
  32. cursor.execute(sql_query)
  33. result = cursor.fetchall()
  34. end_time = time.time()
  35. execution_time = end_time - start_time
  36. with execution_times_lock:
  37. execution_times.append(execution_time)
  38. print(f"线程 {thread_id}: 执行时间: {execution_time:.4f} 秒")
  39. print(f"线程 {thread_id}: 检索到 {len(result)} 行")
  40. except Exception as e:
  41. logging.error(f"线程 {thread_id}: 错误: {e}")
  42. print(f"线程 {thread_id}: 错误: {e}")
  43. finally:
  44. if cursor:
  45. cursor.close()
  46. if connection:
  47. connection.close()
  48. def monitor_database():
  49. while True:
  50. try:
  51. with pymysql.connect(**db_config) as connection:
  52. with connection.cursor() as cursor:
  53. cursor.execute("SHOW STATUS LIKE 'Threads_connected';")
  54. connected_threads = cursor.fetchone()
  55. if connected_threads:
  56. print(f"当前连接数: {connected_threads[1]}")
  57. else:
  58. logging.warning("无法获取当前连接数,可能因为查询结果为空。")
  59. cursor.execute("SHOW STATUS LIKE 'Queries';")
  60. queries = cursor.fetchone()
  61. if queries:
  62. print(f"查询数: {queries[1]}")
  63. else:
  64. logging.warning("无法获取查询数,可能因为查询结果为空。")
  65. except Exception as e:
  66. logging.error(f"监控线程错误: {e}")
  67. print(f"监控线程错误: {e}")
  68. time.sleep(2) # 每隔2秒查询一次
  69. def create_and_start_threads(num_threads, sql_queries, duration):
  70. threads = []
  71. for i in range(num_threads):
  72. sql_query = sql_queries[i % len(sql_queries)]
  73. thread = threading.Thread(target=execute_sql_query, args=(i, sql_query))
  74. threads.append(thread)
  75. thread.start()
  76. for thread in threads:
  77. thread.join()
  78. # 创建并启动监控线程
  79. monitor_thread = threading.Thread(target=monitor_database, daemon=True)
  80. monitor_thread.start()
  81. # 阶梯式增加并发用户数
  82. stages = [(20, 300)] # (线程数, 持续时间) 持续时间单位为秒
  83. for num_threads, duration in stages:
  84. print(f"启动 {num_threads} 个线程,持续时间: {duration} 秒")
  85. create_and_start_threads(num_threads, sql_queries, duration)
  86. time.sleep(duration)
  87. # 计算并输出平均执行时间和最大执行时间
  88. if execution_times:
  89. average_execution_time = sum(execution_times) / len(execution_times)
  90. max_execution_time = max(execution_times)
  91. print(f"每个线程执行时间的平均值: {average_execution_time:.4f} 秒")
  92. print(f"线程最大执行时间: {max_execution_time:.4f} 秒")
  93. print("执行完毕.")