1234567891011121314151617181920212223242526272829303132333435 |
- # -*- coding: utf-8 -*-
- """
- Created on 2025-04-03
- ---------
- @summary:
- ---------
- @author: Dzr
- """
- import socket
- from common.log import logger
- def dial_timeout(addr, port, timeout=1):
- state = False
- # 创建一个TCP/IP套接字
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- # 设置连接超时时间
- s.settimeout(timeout)
- try:
- # 尝试连接到指定的地址和端口
- s.connect((addr, port))
- logger.info("成功连接到 {}:{}".format(addr, port))
- state = True
- except socket.timeout:
- # 如果连接超时,打印错误信息
- logger.error("连接超时 {}:{}".format(addr, port))
- except socket.error as e:
- # 其他socket错误处理
- logger.error("连接错误: {}".format(e))
- finally:
- # 关闭套接字
- s.close()
- return state
|