# client.py - gRPC client with stress testing
import argparse
import random
import statistics
import string
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import grpc
import matplotlib.pyplot as plt
from tqdm import tqdm

import pb_pb2 as pb
import pb_pb2_grpc as pb_grpc
  12. class ServiceClient:
  13. def __init__(self, host='127.0.0.1', port=50051):
  14. self.channel = grpc.insecure_channel(
  15. f'{host}:{port}',
  16. options=[
  17. ('grpc.keepalive_time_ms', 10000), # 10秒心跳
  18. ('grpc.keepalive_timeout_ms', 5000), # 5秒超时
  19. ('grpc.keepalive_permit_without_calls', 1),
  20. ('grpc.http2.max_pings_without_data', 0),
  21. ('grpc.max_reconnect_backoff_ms', 5000)
  22. ]
  23. )
  24. self.stub = pb_grpc.ServiceStub(self.channel)
  25. def download(self, bucket_id, object_name):
  26. """Client-side streaming download"""
  27. request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
  28. responses = self.stub.Download(request)
  29. received_data = bytearray()
  30. for response in responses:
  31. if response.errorCode != 0:
  32. print(f"Error: {response.errorMsg}")
  33. return None
  34. received_data.extend(response.data)
  35. return bytes(received_data)
  36. def upload(self, file_data, bucket_id, object_name, gzip=False):
  37. """Server-side streaming upload"""
  38. def request_generator():
  39. # Split data into chunks (simulate streaming)
  40. chunk_size = 1024 * 1024 # 1MB chunks
  41. for i in range(0, len(file_data), chunk_size):
  42. yield pb.UploadRequest(
  43. stream=file_data[i:i + chunk_size],
  44. gzip=gzip,
  45. bucketID=bucket_id,
  46. objectName=object_name
  47. )
  48. response = self.stub.Upload(request_generator())
  49. if response.errorCode != 0:
  50. print(f"Upload error: {response.errorMsg}")
  51. return False
  52. return True
  53. def get_bid_detail(self, bucket_id, object_name):
  54. """Client-side streaming for bid details"""
  55. request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
  56. responses = self.stub.GetBidDetail(request)
  57. details = []
  58. for response in responses:
  59. if response.errorCode != 0:
  60. print(f"Error: {response.errorMsg}")
  61. break
  62. details.append(response.data.decode('utf-8'))
  63. return details
  64. def close(self):
  65. self.channel.close()
  66. class StressTester:
  67. def __init__(self, host, port, num_threads=10):
  68. self.host = host
  69. self.port = port
  70. self.num_threads = num_threads
  71. self.results = {
  72. 'upload': {'times': [], 'success': 0, 'fail': 0},
  73. 'download': {'times': [], 'success': 0, 'fail': 0},
  74. 'bid_detail': {'times': [], 'success': 0, 'fail': 0}
  75. }
  76. def _generate_random_data(self, size_kb):
  77. """Generate random data of specified size in KB"""
  78. return ''.join(random.choices(string.ascii_letters + string.digits, k=size_kb * 1024)).encode('utf-8')
  79. def _test_upload(self, client, size_kb):
  80. """Test upload operation"""
  81. start_time = time.time()
  82. try:
  83. data = self._generate_random_data(size_kb)
  84. bucket_id = f"stress_bucket_{random.randint(1, 1000)}"
  85. object_name = f"stress_object_{random.randint(1, 1000)}.txt"
  86. success = client.upload(data, bucket_id, object_name)
  87. elapsed = time.time() - start_time
  88. if success:
  89. self.results['upload']['success'] += 1
  90. self.results['upload']['times'].append(elapsed)
  91. else:
  92. self.results['upload']['fail'] += 1
  93. except Exception as e:
  94. self.results['upload']['fail'] += 1
  95. print(f"Upload error: {str(e)}")
  96. def _test_download(self, client, size_kb):
  97. """Test download operation"""
  98. start_time = time.time()
  99. try:
  100. # First upload a file to download
  101. data = self._generate_random_data(size_kb)
  102. bucket_id = "stress_download_bucket"
  103. object_name = "stress_download_object.txt"
  104. # Upload then immediately download
  105. if client.upload(data, bucket_id, object_name):
  106. downloaded_data = client.download(bucket_id, object_name)
  107. elapsed = time.time() - start_time
  108. if downloaded_data is not None and len(downloaded_data) == len(data):
  109. self.results['download']['success'] += 1
  110. self.results['download']['times'].append(elapsed)
  111. else:
  112. self.results['download']['fail'] += 1
  113. else:
  114. self.results['download']['fail'] += 1
  115. except Exception as e:
  116. self.results['download']['fail'] += 1
  117. print(f"Download error: {str(e)}")
  118. def _test_bid_detail(self, client):
  119. """Test get_bid_detail operation"""
  120. start_time = time.time()
  121. try:
  122. bucket_id = "detail"
  123. object_name = "67c123333309c0998b619793"
  124. details = client.get_bid_detail(bucket_id, object_name)
  125. elapsed = time.time() - start_time
  126. if details is not None:
  127. self.results['bid_detail']['success'] += 1
  128. self.results['bid_detail']['times'].append(elapsed)
  129. else:
  130. self.results['bid_detail']['fail'] += 1
  131. except Exception as e:
  132. self.results['bid_detail']['fail'] += 1
  133. print(f"Bid detail error: {str(e)}")
  134. def run_test(self, test_type, num_requests=100, size_kb=10):
  135. """Run stress test for specified operation"""
  136. print(f"Starting {test_type} stress test with {num_requests} requests...")
  137. with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
  138. futures = []
  139. for _ in tqdm(range(num_requests), desc=f"Running {test_type} tests"):
  140. client = ServiceClient(self.host, self.port)
  141. if test_type == 'upload':
  142. futures.append(executor.submit(self._test_upload, client, size_kb))
  143. elif test_type == 'download':
  144. futures.append(executor.submit(self._test_download, client, size_kb))
  145. elif test_type == 'bid_detail':
  146. futures.append(executor.submit(self._test_bid_detail, client))
  147. # Close client after test
  148. client.close()
  149. # Wait for all tests to complete
  150. for future in futures:
  151. future.result()
  152. self._report_results(test_type)
  153. def _report_results(self, test_type):
  154. """Generate and display test results"""
  155. results = self.results[test_type]
  156. total = results['success'] + results['fail']
  157. if results['success'] > 0:
  158. avg_time = statistics.mean(results['times'])
  159. min_time = min(results['times'])
  160. max_time = max(results['times'])
  161. std_dev = statistics.stdev(results['times']) if len(results['times']) > 1 else 0
  162. else:
  163. avg_time = min_time = max_time = std_dev = 0
  164. print("\n" + "=" * 50)
  165. print(f"Stress Test Results for {test_type}:")
  166. print(f"Total requests: {total}")
  167. print(f"Successful: {results['success']} ({results['success'] / total * 100:.2f}%)")
  168. print(f"Failed: {results['fail']} ({results['fail'] / total * 100:.2f}%)")
  169. print(f"Average time: {avg_time:.4f}s")
  170. print(f"Min time: {min_time:.4f}s")
  171. print(f"Max time: {max_time:.4f}s")
  172. print(f"Standard deviation: {std_dev:.4f}s")
  173. print("=" * 50 + "\n")
  174. # Plot histogram of response times
  175. if results['success'] > 0:
  176. plt.figure(figsize=(10, 6))
  177. plt.hist(results['times'], bins=20, edgecolor='black')
  178. plt.title(f'Response Time Distribution for {test_type}')
  179. plt.xlabel('Time (seconds)')
  180. plt.ylabel('Frequency')
  181. plt.grid(True)
  182. plt.show()
  183. def run_all_tests(self, num_requests=100, size_kb=10):
  184. """Run all available tests"""
  185. self.run_test('upload', num_requests, size_kb)
  186. self.run_test('download', num_requests, size_kb)
  187. self.run_test('bid_detail', num_requests)
  188. def main():
  189. parser = argparse.ArgumentParser(description='gRPC Client with Stress Testing')
  190. parser.add_argument('--host', default='localhost', help='Server host')
  191. parser.add_argument('--port', type=int, default=50051, help='Server port')
  192. parser.add_argument('--test', choices=['upload', 'download', 'bid_detail', 'all'],
  193. help='Type of test to run')
  194. parser.add_argument('--requests', type=int, default=100, help='Number of requests')
  195. parser.add_argument('--size', type=int, default=10, help='Data size in KB (for upload/download)')
  196. parser.add_argument('--threads', type=int, default=10, help='Number of concurrent threads')
  197. args = parser.parse_args()
  198. if args.test:
  199. tester = StressTester(args.host, args.port, args.threads)
  200. if args.test == 'all':
  201. tester.run_all_tests(args.requests, args.size)
  202. else:
  203. tester.run_test(args.test, args.requests, args.size)
  204. else:
  205. # Interactive mode
  206. client = ServiceClient(args.host, args.port)
  207. try:
  208. while True:
  209. print("\nOptions:")
  210. print("1. Upload file")
  211. print("2. Download file")
  212. print("3. Get bid details")
  213. print("4. Exit")
  214. choice = input("Enter your choice: ")
  215. if choice == '1':
  216. file_path = input("Enter file path: ")
  217. bucket_id = input("Enter bucket ID: ")
  218. object_name = input("Enter object name: ")
  219. gzip = input("Use gzip compression? (y/n): ").lower() == 'y'
  220. try:
  221. with open(file_path, 'rb') as f:
  222. data = f.read()
  223. start_time = time.time()
  224. success = client.upload(data, bucket_id, object_name, gzip)
  225. elapsed = time.time() - start_time
  226. if success:
  227. print(f"Upload successful in {elapsed:.2f} seconds")
  228. else:
  229. print("Upload failed")
  230. except Exception as e:
  231. print(f"Error: {str(e)}")
  232. elif choice == '2':
  233. bucket_id = input("Enter bucket ID: ")
  234. object_name = input("Enter object name: ")
  235. start_time = time.time()
  236. data = client.download(bucket_id, object_name)
  237. elapsed = time.time() - start_time
  238. if data is not None:
  239. print(f"Downloaded {len(data)} bytes in {elapsed:.2f} seconds")
  240. save = input("Save to file? (y/n): ").lower() == 'y'
  241. if save:
  242. file_path = input("Enter save path: ")
  243. with open(file_path, 'wb') as f:
  244. f.write(data)
  245. print("File saved successfully")
  246. else:
  247. print("Download failed")
  248. elif choice == '3':
  249. bucket_id = input("Enter bucket ID: ")
  250. object_name = input("Enter object name: ")
  251. start_time = time.time()
  252. details = client.get_bid_detail(bucket_id, object_name)
  253. elapsed = time.time() - start_time
  254. if details:
  255. print(f"Got {len(details)} bid details in {elapsed:.2f} seconds")
  256. for i, detail in enumerate(details[:5]): # Show first 5 details
  257. print(f"{i + 1}. {detail}")
  258. if len(details) > 5:
  259. print(f"... and {len(details) - 5} more")
  260. else:
  261. print("Failed to get bid details")
  262. elif choice == '4':
  263. break
  264. else:
  265. print("Invalid choice")
  266. finally:
  267. client.close()
  268. if __name__ == '__main__':
  269. main()