"""gRPC storage-service client with an interactive mode and a stress tester."""

import argparse
import random
import statistics
import string
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import grpc
import matplotlib.pyplot as plt
from tqdm import tqdm

import pb_pb2 as pb
import pb_pb2_grpc as pb_grpc


class ServiceClient:
    """Thin wrapper around the generated ``ServiceStub``.

    Can be used as a context manager; the channel is closed on exit.
    """

    def __init__(self, host='127.0.0.1', port=50051):
        """Open an insecure channel to ``host:port`` and build the stub."""
        self.channel = grpc.insecure_channel(
            f'{host}:{port}',
            options=[
                ('grpc.keepalive_time_ms', 10000),  # 10-second heartbeat
                ('grpc.keepalive_timeout_ms', 5000),  # 5-second timeout
                ('grpc.keepalive_permit_without_calls', 1),
                ('grpc.http2.max_pings_without_data', 0),
                ('grpc.max_reconnect_backoff_ms', 5000),
            ],
        )
        self.stub = pb_grpc.ServiceStub(self.channel)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def download(self, bucket_id, object_name):
        """Download an object (single request, streamed responses).

        Returns:
            The concatenated ``data`` of every response as ``bytes``, or
            ``None`` if any response carries a non-zero ``errorCode``.
        """
        request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
        received_data = bytearray()
        for response in self.stub.Download(request):
            if response.errorCode != 0:
                print(f"Error: {response.errorMsg}")
                return None
            received_data.extend(response.data)
        return bytes(received_data)

    def upload(self, file_data, bucket_id, object_name, gzip=False,
               chunk_size=1024 * 1024):
        """Upload ``file_data`` (streamed requests, single response).

        Args:
            file_data: Bytes to send.
            bucket_id: Target bucket identifier.
            object_name: Target object name.
            gzip: Value forwarded in every request's ``gzip`` field.
            chunk_size: Maximum payload bytes per request (default 1 MiB).

        Returns:
            ``True`` when the response ``errorCode`` is 0, else ``False``.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        def request_generator():
            # max(..., 1) guarantees one (empty) message for empty input, so
            # the bucket/object names are still transmitted.
            for offset in range(0, max(len(file_data), 1), chunk_size):
                yield pb.UploadRequest(
                    stream=file_data[offset:offset + chunk_size],
                    gzip=gzip,
                    bucketID=bucket_id,
                    objectName=object_name,
                )

        response = self.stub.Upload(request_generator())
        if response.errorCode != 0:
            print(f"Upload error: {response.errorMsg}")
            return False
        return True

    def get_bid_detail(self, bucket_id, object_name):
        """Fetch bid details (single request, streamed responses).

        Returns:
            A list with each response's ``data`` decoded as UTF-8, or ``None``
            if any response carries a non-zero ``errorCode`` (same convention
            as :meth:`download`, so callers can tell failure from success).
        """
        request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
        details = []
        for response in self.stub.GetBidDetail(request):
            if response.errorCode != 0:
                print(f"Error: {response.errorMsg}")
                return None
            details.append(response.data.decode('utf-8'))
        return details

    def close(self):
        """Close the underlying channel."""
        self.channel.close()


class StressTester:
    """Runs concurrent upload / download / bid-detail requests and reports
    success counts and latency statistics."""

    def __init__(self, host, port, num_threads=10):
        self.host = host
        self.port = port
        self.num_threads = num_threads
        self.results = {
            'upload': {'times': [], 'success': 0, 'fail': 0},
            'download': {'times': [], 'success': 0, 'fail': 0},
            'bid_detail': {'times': [], 'success': 0, 'fail': 0},
        }
        # Worker threads update self.results concurrently.
        self._results_lock = threading.Lock()

    def _record(self, test_type, success, elapsed=None):
        """Record one outcome for ``test_type`` under the results lock."""
        with self._results_lock:
            stats = self.results[test_type]
            if success:
                stats['success'] += 1
                stats['times'].append(elapsed)
            else:
                stats['fail'] += 1

    def _generate_random_data(self, size_kb):
        """Generate random data of specified size in KB"""
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choices(alphabet, k=size_kb * 1024)).encode('utf-8')

    def _test_upload(self, client, size_kb):
        """Upload one random payload and record the outcome.

        Only the ``client.upload`` call is timed; payload generation is
        excluded so it does not inflate the measured latency.
        """
        try:
            data = self._generate_random_data(size_kb)
            bucket_id = f"stress_bucket_{random.randint(1, 1000)}"
            object_name = f"stress_object_{random.randint(1, 1000)}.txt"
            start_time = time.perf_counter()
            success = client.upload(data, bucket_id, object_name)
            elapsed = time.perf_counter() - start_time
        except Exception as e:  # worker boundary: count as failure, keep going
            self._record('upload', False)
            print(f"Upload error: {str(e)}")
            return
        self._record('upload', success, elapsed)

    def _test_download(self, client, size_kb):
        """Upload a payload, download it back, and record the outcome.

        Only the ``client.download`` call is timed. Success requires a
        non-``None`` result whose length equals the uploaded payload's.
        """
        try:
            data = self._generate_random_data(size_kb)
            bucket_id = "stress_download_bucket"
            object_name = "stress_download_object.txt"
            # The object must exist before it can be downloaded.
            if not client.upload(data, bucket_id, object_name):
                self._record('download', False)
                return
            start_time = time.perf_counter()
            downloaded_data = client.download(bucket_id, object_name)
            elapsed = time.perf_counter() - start_time
        except Exception as e:  # worker boundary: count as failure, keep going
            self._record('download', False)
            print(f"Download error: {str(e)}")
            return
        success = downloaded_data is not None and len(downloaded_data) == len(data)
        self._record('download', success, elapsed)

    def _test_bid_detail(self, client):
        """Call ``get_bid_detail`` for a fixed object and record the outcome."""
        try:
            bucket_id = "detail"
            object_name = "67c123333309c0998b619793"
            start_time = time.perf_counter()
            details = client.get_bid_detail(bucket_id, object_name)
            elapsed = time.perf_counter() - start_time
        except Exception as e:  # worker boundary: count as failure, keep going
            self._record('bid_detail', False)
            print(f"Bid detail error: {str(e)}")
            return
        self._record('bid_detail', details is not None, elapsed)

    def _run_one(self, worker, *args):
        """Run ``worker`` with a dedicated client that is closed afterwards.

        The client is created and closed inside the worker thread so the
        channel stays open for the whole duration of the request.
        """
        client = ServiceClient(self.host, self.port)
        try:
            worker(client, *args)
        finally:
            client.close()

    def run_test(self, test_type, num_requests=100, size_kb=10):
        """Run ``num_requests`` requests of ``test_type`` and print a report.

        Args:
            test_type: One of ``'upload'``, ``'download'``, ``'bid_detail'``.
            num_requests: Number of requests to submit.
            size_kb: Payload size in KB (ignored for ``'bid_detail'``).

        Raises:
            ValueError: If ``test_type`` is not a known test.
        """
        workers = {
            'upload': (self._test_upload, (size_kb,)),
            'download': (self._test_download, (size_kb,)),
            'bid_detail': (self._test_bid_detail, ()),
        }
        if test_type not in workers:
            raise ValueError(f"Unknown test type: {test_type!r}")
        worker, worker_args = workers[test_type]

        print(f"Starting {test_type} stress test with {num_requests} requests...")
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = [
                executor.submit(self._run_one, worker, *worker_args)
                for _ in range(num_requests)
            ]
            # The bar advances as requests finish (in submission order).
            for future in tqdm(futures, desc=f"Running {test_type} tests"):
                future.result()

        self._report_results(test_type)

    def _report_results(self, test_type):
        """Print summary statistics for ``test_type`` and, if any request
        succeeded, show a histogram of the recorded times."""
        results = self.results[test_type]
        times = results['times']
        total = results['success'] + results['fail']

        if times:
            avg_time = statistics.mean(times)
            min_time = min(times)
            max_time = max(times)
            # stdev needs at least two samples.
            std_dev = statistics.stdev(times) if len(times) > 1 else 0
        else:
            avg_time = min_time = max_time = std_dev = 0

        def percent(count):
            # Avoid ZeroDivisionError when nothing was executed.
            return count / total * 100 if total else 0.0

        print("\n" + "=" * 50)
        print(f"Stress Test Results for {test_type}:")
        print(f"Total requests: {total}")
        print(f"Successful: {results['success']} ({percent(results['success']):.2f}%)")
        print(f"Failed: {results['fail']} ({percent(results['fail']):.2f}%)")
        print(f"Average time: {avg_time:.4f}s")
        print(f"Min time: {min_time:.4f}s")
        print(f"Max time: {max_time:.4f}s")
        print(f"Standard deviation: {std_dev:.4f}s")
        print("=" * 50 + "\n")

        # Plot histogram of response times
        if times:
            plt.figure(figsize=(10, 6))
            plt.hist(times, bins=20, edgecolor='black')
            plt.title(f'Response Time Distribution for {test_type}')
            plt.xlabel('Time (seconds)')
            plt.ylabel('Frequency')
            plt.grid(True)
            plt.show()

    def run_all_tests(self, num_requests=100, size_kb=10):
        """Run the upload, download and bid-detail tests in that order."""
        self.run_test('upload', num_requests, size_kb)
        self.run_test('download', num_requests, size_kb)
        self.run_test('bid_detail', num_requests)


def _interactive_upload(client):
    """Prompt for a file and destination, then upload it."""
    file_path = input("Enter file path: ")
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")
    gzip = input("Use gzip compression? (y/n): ").lower() == 'y'

    with open(file_path, 'rb') as f:
        data = f.read()

    start_time = time.perf_counter()
    success = client.upload(data, bucket_id, object_name, gzip)
    elapsed = time.perf_counter() - start_time
    if success:
        print(f"Upload successful in {elapsed:.2f} seconds")
    else:
        print("Upload failed")


def _interactive_download(client):
    """Prompt for an object, download it and optionally save it to disk."""
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")

    start_time = time.perf_counter()
    data = client.download(bucket_id, object_name)
    elapsed = time.perf_counter() - start_time
    if data is None:
        print("Download failed")
        return

    print(f"Downloaded {len(data)} bytes in {elapsed:.2f} seconds")
    if input("Save to file? (y/n): ").lower() == 'y':
        file_path = input("Enter save path: ")
        with open(file_path, 'wb') as f:
            f.write(data)
        print("File saved successfully")


def _interactive_bid_detail(client):
    """Prompt for an object and print up to the first five bid details."""
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")

    start_time = time.perf_counter()
    details = client.get_bid_detail(bucket_id, object_name)
    elapsed = time.perf_counter() - start_time
    if not details:
        print("Failed to get bid details")
        return

    print(f"Got {len(details)} bid details in {elapsed:.2f} seconds")
    for i, detail in enumerate(details[:5]):  # Show first 5 details
        print(f"{i + 1}. {detail}")
    if len(details) > 5:
        print(f"... and {len(details) - 5} more")


def _run_interactive(client):
    """Menu loop; returns when the user chooses Exit."""
    actions = {
        '1': _interactive_upload,
        '2': _interactive_download,
        '3': _interactive_bid_detail,
    }
    while True:
        print("\nOptions:")
        print("1. Upload file")
        print("2. Download file")
        print("3. Get bid details")
        print("4. Exit")

        choice = input("Enter your choice: ")
        if choice == '4':
            break
        action = actions.get(choice)
        if action is None:
            print("Invalid choice")
            continue
        try:
            action(client)
        except Exception as e:
            # Top-level interactive boundary: report the failure (RPC error,
            # unreadable file, ...) and return to the menu.
            print(f"Error: {str(e)}")


def main():
    """Parse arguments, then run a stress test or the interactive client."""
    parser = argparse.ArgumentParser(description='gRPC Client with Stress Testing')
    parser.add_argument('--host', default='localhost', help='Server host')
    parser.add_argument('--port', type=int, default=50051, help='Server port')
    parser.add_argument('--test', choices=['upload', 'download', 'bid_detail', 'all'],
                        help='Type of test to run')
    parser.add_argument('--requests', type=int, default=100, help='Number of requests')
    parser.add_argument('--size', type=int, default=10,
                        help='Data size in KB (for upload/download)')
    parser.add_argument('--threads', type=int, default=10,
                        help='Number of concurrent threads')

    args = parser.parse_args()

    if args.test:
        tester = StressTester(args.host, args.port, args.threads)
        if args.test == 'all':
            tester.run_all_tests(args.requests, args.size)
        else:
            tester.run_test(args.test, args.requests, args.size)
        return

    # Interactive mode
    client = ServiceClient(args.host, args.port)
    try:
        _run_interactive(client)
    finally:
        client.close()


if __name__ == '__main__':
    main()