123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
import argparse
import random
import statistics
import string
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import grpc
import matplotlib.pyplot as plt
from tqdm import tqdm

import pb_pb2 as pb
import pb_pb2_grpc as pb_grpc
class ServiceClient:
    """Thin wrapper around the generated ``pb_grpc.ServiceStub``.

    Opens one insecure channel to ``host:port`` and exposes the ``Upload``,
    ``Download`` and ``GetBidDetail`` RPCs as ordinary method calls.
    Call :meth:`close` when the client is no longer needed.
    """

    # Number of payload bytes carried by each ``UploadRequest`` message.
    CHUNK_SIZE = 1024 * 1024

    def __init__(self, host='127.0.0.1', port=50051):
        """Create the channel and the stub.

        Args:
            host: Server host name or IP address.
            port: Server TCP port.
        """
        self.channel = grpc.insecure_channel(
            f'{host}:{port}',
            options=[
                ('grpc.keepalive_time_ms', 10000),  # keepalive ping every 10 s
                ('grpc.keepalive_timeout_ms', 5000),  # ping timeout of 5 s
                ('grpc.keepalive_permit_without_calls', 1),
                ('grpc.http2.max_pings_without_data', 0),
                ('grpc.max_reconnect_backoff_ms', 5000)
            ]
        )
        self.stub = pb_grpc.ServiceStub(self.channel)

    @staticmethod
    def _iter_chunks(data, chunk_size):
        """Yield consecutive ``chunk_size``-sized slices of ``data``.

        Empty ``data`` yields exactly one empty chunk, so that an upload of
        an empty payload still produces one request message.
        """
        if len(data) == 0:
            yield data
            return
        for offset in range(0, len(data), chunk_size):
            yield data[offset:offset + chunk_size]

    def download(self, bucket_id, object_name):
        """Download an object via the server-streaming ``Download`` RPC.

        One ``DownloadRequest`` is sent and the streamed responses are
        concatenated.

        Args:
            bucket_id: Value for the request's ``bucketID`` field.
            object_name: Value for the request's ``objectName`` field.

        Returns:
            The concatenated ``data`` of all responses as ``bytes``, or
            ``None`` as soon as a response carries a non-zero ``errorCode``
            (the error message is printed).
        """
        request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
        received_data = bytearray()
        for response in self.stub.Download(request):
            if response.errorCode != 0:
                print(f"Error: {response.errorMsg}")
                return None
            received_data.extend(response.data)
        return bytes(received_data)

    def upload(self, file_data, bucket_id, object_name, gzip=False):
        """Upload a payload via the client-streaming ``Upload`` RPC.

        The payload is split into ``CHUNK_SIZE`` pieces; every request
        message repeats ``gzip``, ``bucketID`` and ``objectName``.  An empty
        payload is sent as a single message with an empty ``stream`` so the
        server still receives the bucket and object name.

        Args:
            file_data: Bytes to upload.
            bucket_id: Value for the ``bucketID`` field.
            object_name: Value for the ``objectName`` field.
            gzip: Value for the ``gzip`` field of every request.

        Returns:
            ``True`` when the response's ``errorCode`` is zero, otherwise
            ``False`` (the error message is printed).
        """
        def request_generator():
            for chunk in self._iter_chunks(file_data, self.CHUNK_SIZE):
                yield pb.UploadRequest(
                    stream=chunk,
                    gzip=gzip,
                    bucketID=bucket_id,
                    objectName=object_name
                )

        response = self.stub.Upload(request_generator())
        if response.errorCode != 0:
            print(f"Upload error: {response.errorMsg}")
            return False
        return True

    def get_bid_detail(self, bucket_id, object_name):
        """Fetch bid details via the server-streaming ``GetBidDetail`` RPC.

        Args:
            bucket_id: Value for the request's ``bucketID`` field.
            object_name: Value for the request's ``objectName`` field.

        Returns:
            A list with each response's ``data`` decoded as UTF-8, or
            ``None`` when a response carries a non-zero ``errorCode`` (the
            error message is printed).  Returning ``None`` rather than a
            partial list lets callers tell a server-side failure from a
            genuinely empty result, consistent with :meth:`download`.
        """
        request = pb.DownloadRequest(bucketID=bucket_id, objectName=object_name)
        details = []
        for response in self.stub.GetBidDetail(request):
            if response.errorCode != 0:
                print(f"Error: {response.errorMsg}")
                return None
            details.append(response.data.decode('utf-8'))
        return details

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()
class StressTester:
    """Multi-threaded load generator for the gRPC service.

    Each request runs on a thread-pool worker with its own ``ServiceClient``.
    Outcomes are accumulated in ``self.results``, keyed by test type
    (``'upload'``, ``'download'``, ``'bid_detail'``); each entry holds the
    list of successful request durations in seconds (``'times'``) and the
    ``'success'`` / ``'fail'`` counters.
    """

    def __init__(self, host, port, num_threads=10):
        """Store connection settings and initialise empty result buckets.

        Args:
            host: Server host passed to every ``ServiceClient``.
            port: Server port passed to every ``ServiceClient``.
            num_threads: Size of the worker thread pool.
        """
        self.host = host
        self.port = port
        self.num_threads = num_threads
        self.results = {
            'upload': {'times': [], 'success': 0, 'fail': 0},
            'download': {'times': [], 'success': 0, 'fail': 0},
            'bid_detail': {'times': [], 'success': 0, 'fail': 0}
        }
        # Workers update ``results`` concurrently and ``+=`` on a dict entry
        # is not atomic, so every update goes through this lock.
        self._results_lock = threading.Lock()

    def _record(self, test_type, success, elapsed=None):
        """Record one request outcome under the results lock."""
        with self._results_lock:
            bucket = self.results[test_type]
            if success:
                bucket['success'] += 1
                bucket['times'].append(elapsed)
            else:
                bucket['fail'] += 1

    def _generate_random_data(self, size_kb):
        """Return ``size_kb * 1024`` random ASCII letters/digits as bytes."""
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choices(alphabet, k=size_kb * 1024)).encode('utf-8')

    def _test_upload(self, client, size_kb):
        """Upload one random payload and record the outcome.

        Only the ``client.upload`` call is timed; generating the random
        payload is excluded so it does not inflate the measured latency.
        """
        try:
            data = self._generate_random_data(size_kb)
            bucket_id = f"stress_bucket_{random.randint(1, 1000)}"
            object_name = f"stress_object_{random.randint(1, 1000)}.txt"
            start = time.perf_counter()
            success = client.upload(data, bucket_id, object_name)
            elapsed = time.perf_counter() - start
        except Exception as e:
            self._record('upload', False)
            print(f"Upload error: {str(e)}")
            return
        self._record('upload', success, elapsed)

    def _test_download(self, client, size_kb):
        """Upload a random payload, download it again and record the outcome.

        Every request uses its own object name so that concurrent workers do
        not overwrite each other's object, which makes a byte-for-byte
        comparison of the downloaded data meaningful.  Only the
        ``client.download`` call is timed; the preparatory upload is not.
        """
        try:
            data = self._generate_random_data(size_kb)
            bucket_id = "stress_download_bucket"
            object_name = (
                f"stress_download_object_{random.getrandbits(64):016x}.txt"
            )
            if not client.upload(data, bucket_id, object_name):
                self._record('download', False)
                return
            start = time.perf_counter()
            downloaded_data = client.download(bucket_id, object_name)
            elapsed = time.perf_counter() - start
        except Exception as e:
            self._record('download', False)
            print(f"Download error: {str(e)}")
            return
        # ``download`` returns None on a server error, which never equals data.
        self._record('download', downloaded_data == data, elapsed)

    def _test_bid_detail(self, client):
        """Call ``get_bid_detail`` for a fixed object and record the outcome.

        A ``None`` result is counted as a failure, anything else as success.
        """
        try:
            bucket_id = "detail"
            object_name = "67c123333309c0998b619793"
            start = time.perf_counter()
            details = client.get_bid_detail(bucket_id, object_name)
            elapsed = time.perf_counter() - start
        except Exception as e:
            self._record('bid_detail', False)
            print(f"Bid detail error: {str(e)}")
            return
        self._record('bid_detail', details is not None, elapsed)

    def _run_single(self, test_type, size_kb):
        """Run one request of ``test_type`` with a dedicated client.

        The client is created and closed inside the worker so the channel
        stays open for the whole request and is always released afterwards.
        """
        client = ServiceClient(self.host, self.port)
        try:
            if test_type == 'upload':
                self._test_upload(client, size_kb)
            elif test_type == 'download':
                self._test_download(client, size_kb)
            else:
                self._test_bid_detail(client)
        finally:
            client.close()

    def run_test(self, test_type, num_requests=100, size_kb=10):
        """Run a stress test for one operation and print its report.

        Args:
            test_type: ``'upload'``, ``'download'`` or ``'bid_detail'``.
            num_requests: Number of requests to issue.
            size_kb: Payload size in KB (ignored for ``'bid_detail'``).

        Raises:
            ValueError: If ``test_type`` is not a known test type.
        """
        if test_type not in self.results:
            raise ValueError(f"Unknown test type: {test_type!r}")

        print(f"Starting {test_type} stress test with {num_requests} requests...")
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = [
                executor.submit(self._run_single, test_type, size_kb)
                for _ in range(num_requests)
            ]
            # The bar advances as results are collected, i.e. it reflects
            # completed requests rather than merely submitted ones.
            for future in tqdm(futures, desc=f"Running {test_type} tests"):
                future.result()
        self._report_results(test_type)

    def _report_results(self, test_type):
        """Print summary statistics and plot a histogram of response times.

        Percentages are reported as 0 when no request was recorded, and the
        histogram is only drawn when at least one request succeeded.
        """
        results = self.results[test_type]
        times = results['times']
        total = results['success'] + results['fail']

        if results['success'] > 0:
            avg_time = statistics.mean(times)
            min_time = min(times)
            max_time = max(times)
            std_dev = statistics.stdev(times) if len(times) > 1 else 0
        else:
            avg_time = min_time = max_time = std_dev = 0

        # Guard against division by zero when no request was issued.
        success_pct = results['success'] / total * 100 if total else 0.0
        fail_pct = results['fail'] / total * 100 if total else 0.0

        print("\n" + "=" * 50)
        print(f"Stress Test Results for {test_type}:")
        print(f"Total requests: {total}")
        print(f"Successful: {results['success']} ({success_pct:.2f}%)")
        print(f"Failed: {results['fail']} ({fail_pct:.2f}%)")
        print(f"Average time: {avg_time:.4f}s")
        print(f"Min time: {min_time:.4f}s")
        print(f"Max time: {max_time:.4f}s")
        print(f"Standard deviation: {std_dev:.4f}s")
        print("=" * 50 + "\n")

        # Plot histogram of response times
        if results['success'] > 0:
            plt.figure(figsize=(10, 6))
            plt.hist(times, bins=20, edgecolor='black')
            plt.title(f'Response Time Distribution for {test_type}')
            plt.xlabel('Time (seconds)')
            plt.ylabel('Frequency')
            plt.grid(True)
            plt.show()

    def run_all_tests(self, num_requests=100, size_kb=10):
        """Run the upload, download and bid-detail tests in that order."""
        self.run_test('upload', num_requests, size_kb)
        self.run_test('download', num_requests, size_kb)
        self.run_test('bid_detail', num_requests)
def _interactive_upload(client):
    """Prompt for a file and upload it, reporting the elapsed time."""
    file_path = input("Enter file path: ")
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")
    use_gzip = input("Use gzip compression? (y/n): ").lower() == 'y'
    try:
        with open(file_path, 'rb') as f:
            data = f.read()
        start_time = time.time()
        success = client.upload(data, bucket_id, object_name, use_gzip)
        elapsed = time.time() - start_time
    except Exception as e:
        print(f"Error: {str(e)}")
        return
    if success:
        print(f"Upload successful in {elapsed:.2f} seconds")
    else:
        print("Upload failed")


def _interactive_download(client):
    """Prompt for an object, download it and optionally save it to disk."""
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")
    try:
        start_time = time.time()
        data = client.download(bucket_id, object_name)
        elapsed = time.time() - start_time
    except Exception as e:
        # An RPC failure must not end the interactive session.
        print(f"Error: {str(e)}")
        return
    if data is None:
        print("Download failed")
        return
    print(f"Downloaded {len(data)} bytes in {elapsed:.2f} seconds")
    if input("Save to file? (y/n): ").lower() != 'y':
        return
    file_path = input("Enter save path: ")
    try:
        with open(file_path, 'wb') as f:
            f.write(data)
    except OSError as e:
        print(f"Error: {str(e)}")
        return
    print("File saved successfully")


def _interactive_bid_detail(client):
    """Prompt for an object and print up to the first five bid details."""
    bucket_id = input("Enter bucket ID: ")
    object_name = input("Enter object name: ")
    try:
        start_time = time.time()
        details = client.get_bid_detail(bucket_id, object_name)
        elapsed = time.time() - start_time
    except Exception as e:
        # An RPC failure must not end the interactive session.
        print(f"Error: {str(e)}")
        return
    if not details:
        print("Failed to get bid details")
        return
    print(f"Got {len(details)} bid details in {elapsed:.2f} seconds")
    for i, detail in enumerate(details[:5]):  # Show first 5 details
        print(f"{i + 1}. {detail}")
    if len(details) > 5:
        print(f"... and {len(details) - 5} more")


def _run_interactive(client):
    """Run the menu loop until the user exits, hits EOF or presses Ctrl-C."""
    actions = {
        '1': _interactive_upload,
        '2': _interactive_download,
        '3': _interactive_bid_detail,
    }
    try:
        while True:
            print("\nOptions:")
            print("1. Upload file")
            print("2. Download file")
            print("3. Get bid details")
            print("4. Exit")
            choice = input("Enter your choice: ")
            if choice == '4':
                break
            action = actions.get(choice)
            if action is None:
                print("Invalid choice")
            else:
                action(client)
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C at any prompt: leave quietly.
        print()


def main():
    """Command-line entry point.

    With ``--test`` a ``StressTester`` runs the selected test (or all of
    them); without it an interactive menu for upload, download and
    bid-detail calls is started.  The interactive client is always closed
    on exit.
    """
    parser = argparse.ArgumentParser(description='gRPC Client with Stress Testing')
    parser.add_argument('--host', default='localhost', help='Server host')
    parser.add_argument('--port', type=int, default=50051, help='Server port')
    parser.add_argument('--test', choices=['upload', 'download', 'bid_detail', 'all'],
                        help='Type of test to run')
    parser.add_argument('--requests', type=int, default=100, help='Number of requests')
    parser.add_argument('--size', type=int, default=10, help='Data size in KB (for upload/download)')
    parser.add_argument('--threads', type=int, default=10, help='Number of concurrent threads')
    args = parser.parse_args()

    # ThreadPoolExecutor rejects a non-positive worker count; fail with a
    # usage message instead of a traceback.
    if args.threads < 1:
        parser.error('--threads must be at least 1')

    if args.test:
        tester = StressTester(args.host, args.port, args.threads)
        if args.test == 'all':
            tester.run_all_tests(args.requests, args.size)
        else:
            tester.run_test(args.test, args.requests, args.size)
        return

    # Interactive mode
    client = ServiceClient(args.host, args.port)
    try:
        _run_interactive(client)
    finally:
        client.close()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
|