pressure.py 13 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. import time
  4. import logging
  5. import pymysql
  6. # 配置日志记录
  7. logging.basicConfig(level=logging.ERROR, filename='error.log', filemode='w',
  8. format='%(asctime)s - %(levelname)s - %(message)s')
  9. # 数据库连接配置
  10. db_config = {
  11. 'host': '192.168.3.207',
  12. 'port': 14000,
  13. 'user': 'root',
  14. 'password': '=PDT49#80Z!RVv52_z',
  15. 'database': 'jianyu_subjectdb'
  16. }
  17. # SQL查询列表
  18. sql_queries = ["""select count(*) as totalRowCount from (select t0.`层次子字段(1)` as `层次子字段(1)(1)`,t0.职位ID as 职位ID,t0.销售人员 as 销售人员,t0.`入职日期\r\n` as `入职日期\r\n`,t0.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t0.离职日期 as 离职日期,t0.离职日期_年月日 as 离职日期_年月日,t0.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t0.层次子字段 as `层次子字段(2)`,t0.企业ID as 企业ID,t0.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.`更新时间(1)` as `更新时间(1)`,t0.`职位ID(1)` as `职位ID(1)`,t0.部门名称 as 部门名称,t0.`坐席号(1)` as `坐席号(1)`,t0.客成经理姓名 as 客成经理姓名,t0.合力线索归属 as 合力线索归属,t0.公司性质 as 公司性质,t0.联系人 as 联系人,t0.公司核验 as 公司核验,t0.坐席职位ID as 坐席职位ID,t0.`线索ID:UUID` as `线索ID:UUID`,t0.用户表UUID as 用户表UUID,t0.`公司性质,0:非集团公司 1:集团公司` as `公司性质,0:非集团公司 1:集团公司`,t0.`公司核验,0:不在工商库 1:在工商库` as `公司核验,0:不在工商库 1:在工商库`,t0.`线索名称:COMPANY_NAME` as `线索名称:COMPANY_NAME`,t0.`联系人类型:决策人,使用人` as `联系人类型:决策人,使用人`,t0.`线索跟进状态:无意向,已成交......` as `线索跟进状态:无意向,已成交......`,t0.坐席号 as 坐席号,t0.是否加车 as 是否加车,t0.任务状态 as 任务状态,t0.`是否移交;0:未移交 1:已移交` as `是否移交;0:未移交 1:已移交`,t0.最近跟进内容 as 最近跟进内容,t0.开始跟进时间 as 开始跟进时间,t0.最新跟进时间 as 最新跟进时间,t0.线索上限数量 as 线索上限数量,t0.备注 as 备注,t0.冻结时间 as 冻结时间,t0.UID更新说明 as UID更新说明,t0.线索名称 as 线索名称,t0.服务结束时间 as 服务结束时间,t0.同时跟进 as 同时跟进,t0.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t0.ID as ID,t0.是否离职 as 是否离职,t0.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t0.`是否分配:0:未分配,公海;1:已分配,私海` as `是否分配:0:未分配,公海;1:已分配,私海`,t0.客户需求 as 客户需求,t0.意向产品 as 意向产品,t0.客户预算 as 客户预算,CASE WHEN t0.坐席职位ID IS NOT NULL AND t0.坐席职位ID IS NOT NULL THEN 1 WHEN t0.坐席职位ID IS NOT NULL AND t0.坐席职位ID IS NULL AND NULL IS NOT NULL THEN 2 WHEN t0.坐席职位ID IS NULL THEN 3 ELSE 4 END as 排序字段,CASE WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=0 THEN '公海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=1 THEN '私海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-1 THEN '退出公海' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-2 THEN '域外' WHEN t0.`是否分配:0:未分配,公海;1:已分配,私海`=-3 THEN '冻结池' END as 线索池 from (select t3.`层次子字段(1)` as `层次子字段(1)`,t3.职位ID as 职位ID,t3.姓名 as 销售人员,t3.`入职日期\r\n` as `入职日期\r\n`,t3.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t3.离职日期 as 离职日期,t3.离职日期_年月日 as 离职日期_年月日,t3.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t3.层次子字段 as 层次子字段,t3.企业ID as 企业ID,t3.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t3.更新时间 as `更新时间(1)`,t3.`职位ID(1)` as `职位ID(1)`,t3.部门名称 as 部门名称,t3.坐席号 as `坐席号(1)`,t2.name as 客成经理姓名,null as 合力线索归属,CASE WHEN t0.COMPANY_NATURE=1 THEN '集团公司' WHEN t0.COMPANY_NATURE=0 THEN '非集团公司' ELSE NULL END as 公司性质,t0.name as 联系人,CASE WHEN t0.COMPANY_VERIFICATION=0 THEN '不在工商库' WHEN t0.COMPANY_VERIFICATION=1 THEN '在工商库' END as 公司核验,t0.position_id as 坐席职位ID,cast(t0.id as char) as `线索ID:UUID`,t0.uid as 用户表UUID,t0.COMPANY_NATURE as `公司性质,0:非集团公司 1:集团公司`,t0.COMPANY_VERIFICATION as `公司核验,0:不在工商库 1:在工商库`,t0.cluename as `线索名称:COMPANY_NAME`,t0.contact_type as `联系人类型:决策人,使用人`,t0.trailstatus as `线索跟进状态:无意向,已成交......`,t0.seatNumber as 坐席号,t0.IS_TASK as 是否加车,t0.TASKSTATUS as 任务状态,t0.IS_TRANSFER as `是否移交;0:未移交 1:已移交`,t0.CONTENT as 最近跟进内容,t0.START_TRAIL_TIME as 开始跟进时间,t0.TRAIL_TIME as 最新跟进时间,t3.线索上限数量 as 线索上限数量,t0.REMARK as 备注,t0.FREEZE_TIME as 冻结时间,t0.SHUOMING as UID更新说明,t0.cluename as 线索名称,t2.service_endtime as 服务结束时间,CASE WHEN t0.IS_TRANSFER=1 THEN CASE WHEN date_add(t2.service_endtime,interval -3 month)>=current_timestamp THEN '同时跟进' ELSE NULL END ELSE NULL END as 同时跟进,t3.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t3.ID as ID,t3.是否离职 as 是否离职,t3.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t0.is_assign as `是否分配:0:未分配,公海;1:已分配,私海`,t0.customer_demand as 客户需求,t0.intended_products as 意向产品,t0.customer_budget as 客户预算 from jianyu_subjectdb.dwd_f_crm_clue_info t0 left join jianyu_subjectdb.dwd_f_userbase_contacts t1 on (t0.uid=t1.baseinfo_id) and (1>2) left join jianyu_subjectdb.dwd_f_csm_customer_info t2 on (t0.id=t2.clue_id) left join (select t0.职位ID as 职位ID,t0.姓名 as 姓名,t0.`入职日期\r\n` as `入职日期\r\n`,t0.`入职日期_年月日\r\n` as `入职日期_年月日\r\n`,t0.离职日期 as 离职日期,t0.离职日期_年月日 as 离职日期_年月日,t0.`角色ID,关联维表获取对应权限` as `角色ID,关联维表获取对应权限`,t0.层次子字段 as 层次子字段,t0.企业ID as 企业ID,t0.`线索分配\n(1:参与 0:不参与,默认0)\r\n` as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.更新时间 as 更新时间,t0.线索上限数量 as 线索上限数量,t0.`线索分配等级\n(多值:A类,B类,C类)` as `线索分配等级\n(多值:A类,B类,C类)`,t0.ID as ID,t0.坐席号 as 坐席号,t0.是否离职 as 是否离职,t0.`是否配置(1:配置 0:未配置)` as `是否配置(1:配置 0:未配置)`,t1.position_id as `职位ID(1)`,t1.bi_code as `层次子字段(1)`,t1.DEPT_NAME as 部门名称 from (select t0.position_id as 职位ID,t0.name as 姓名,t0.reportduty_time as `入职日期\r\n`,t0.reportduty_date as `入职日期_年月日\r\n`,t0.resign_time as 离职日期,t0.resign_date as 离职日期_年月日,t0.ROLE_ID as `角色ID,关联维表获取对应权限`,concat('职位id:', ifnull(cast(t0.position_id as char), '')) as 层次子字段,t0.ENT_ID as 企业ID,t0.assign_type as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,t0.UPDATE_TIME as 更新时间,t0.LIMIT_SIZE as 线索上限数量,t0.assign_level as `线索分配等级\n(多值:A类,B类,C类)`,t0.id as ID,t0.seat_number as 坐席号,t0.resign as 是否离职,t0.config as `是否配置(1:配置 0:未配置)`,row_number() over (partition by t0.position_id order by t0.reportduty_time desc) as RN from jianyu_subjectdb.dwd_f_crm_personnel_management t0) t0 left join jianyu_subjectdb.dwd_d_crm_department_level_succbi t1 on (t0.职位ID=t1.position_id) where (t0.RN=1)) t3 on (t0.position_id=t3.职位ID) where (t0.cluename='泗阳县张三大药房有限公司') and (t0.cluename IS NOT NULL) union all select null as `层次子字段(1)`,NULL as 职位ID,null as 销售人员,NULL as `入职日期\r\n`,NULL as `入职日期_年月日\r\n`,NULL as 离职日期,NULL as 离职日期_年月日,NULL as `角色id,关联维表获取对应权限`,null as 层次子字段,NULL as 企业ID,NULL as `线索分配\n(1:参与 0:不参与,默认0)\r\n`,NULL as `更新时间(1)`,NULL as `职位ID(1)`,null as 部门名称,null as `坐席号(1)`,null as 客成经理姓名,concat('合作渠道:', ifnull(t1.name, '')) as 合力线索归属,NULL as 公司性质,t0.name as 联系人,NULL as 公司核验,NULL as 坐席职位ID,t0.phone as 用户登陆手机号,null as 用户表UUID,NULL as `公司性质,0:非集团公司 1:集团公司`,NULL as `公司核验,0:不在工商库 1:在工商库`,null as `线索名称:COMPANY_NAME`,null as `联系人类型:决策人,使用人`,null as `线索跟进状态:无意向,已成交......`,null as 坐席号,NULL as 是否加车,NULL as 任务状态,NULL as `是否移交;0:未移交 1:已移交`,NULL as 最近跟进内容,NULL as 开始跟进时间,NULL as 最新跟进时间,NULL as 线索上限数量,NULL as 备注,NULL as 冻结时间,null as uid更新说明,t0.company_name as 线索名称,NULL as 服务结束时间,NULL as 同时跟进,null as `线索分配等级\n(多值:A类,B类,C类)`,NULL as ID,NULL as 是否离职,NULL as `是否配置(1:配置 0:未配置)`,NULL as `是否分配:0:未分配,公海;1:已分配,私海`,NULL as 客户需求,NULL as 意向产品,NULL as 客户预算 from jianyu_subjectdb.dwd_f_userbase_baseinfo t0 left join jianyu_subjectdb.dwd_d_userbase_belongto_rulecode t1 on (t0.BELONG_TO=t1.code) left join jianyu_subjectdb.dwd_f_userbase_contacts t2 on (t0.phone=t2.phone) and (1>2) where (t0.company_name='泗阳县张三大药房有限公司') and (t0.STATUS NOT IN (0,2) AND t1.pcode='03')) t0) t0 where (t0.职位ID<>'参数-position_id:jyUserPositionId' AND t0.职位ID IS NOT NULL AND t0.`线索ID:UUID`<>'1381704')"""]
  19. def execute_sql_queries(thread_id, sql_queries):
  20. start_time = time.time()
  21. connection = None
  22. cursor = None
  23. try:
  24. # 连接数据库
  25. connection = pymysql.connect(**db_config)
  26. cursor = connection.cursor()
  27. # 执行多个SQL查询
  28. for sql_query in sql_queries:
  29. cursor.execute(sql_query)
  30. result = cursor.fetchall()
  31. print(f"线程 {thread_id}: 检索到 {len(result)} 行")
  32. # 记录执行时间
  33. end_time = time.time()
  34. execution_time = end_time - start_time
  35. print(f"线程 {thread_id}: 总执行时间: {execution_time:.4f} 秒")
  36. except Exception as e:
  37. logging.error(f"线程 {thread_id}: 错误: {e}")
  38. print(f"线程 {thread_id}: 错误: {e}")
  39. finally:
  40. if cursor:
  41. cursor.close()
  42. if connection:
  43. connection.close()
  44. def monitor_database():
  45. while True:
  46. try:
  47. connection = pymysql.connect(**db_config)
  48. cursor = connection.cursor()
  49. cursor.execute("SHOW STATUS LIKE 'Threads_connected';")
  50. connected_threads = cursor.fetchone()
  51. if connected_threads:
  52. print(f"当前连接数: {connected_threads[1]}")
  53. else:
  54. print("当前没有连接线程。")
  55. cursor.execute("SHOW STATUS LIKE 'Queries';")
  56. queries = cursor.fetchone()
  57. if queries:
  58. print(f"查询数: {queries[1]}")
  59. else:
  60. print("查询次数未获取。")
  61. cursor.close()
  62. connection.close()
  63. except Exception as e:
  64. logging.error(f"监控线程错误: {e}")
  65. print(f"监控线程错误: {e}")
  66. finally:
  67. time.sleep(1) # 每隔1分钟查询一次
  68. def create_and_start_threads(num_threads, sql_queries):
  69. threads = []
  70. for i in range(num_threads):
  71. thread = threading.Thread(target=execute_sql_queries, args=(i, sql_queries))
  72. threads.append(thread)
  73. thread.start()
  74. for thread in threads:
  75. thread.join()
  76. # 创建并启动监控线程
  77. monitor_thread = threading.Thread(target=monitor_database)
  78. monitor_thread.start()
  79. # 启动线程并发执行
  80. num_threads = 2
  81. print(f"启动 {num_threads} 个线程并发执行SQL查询")
  82. create_and_start_threads(num_threads, sql_queries)
  83. print("执行完毕.")