流式传输
流式传输
gRPC 支持四种类型的服务方法:一元调用、服务器流式传输、客户端流式传输和双向流式传输。流式传输可实现高效的实时通信和大数据传输。
服务器流式传输
服务器流式传输允许服务器对单个客户端请求发送多个响应:
streaming_service.proto
syntax = "proto3";

package streaming.v1;

// Required by the google.protobuf.Any and google.protobuf.Timestamp fields
// used in the messages below; without these imports protoc cannot resolve them.
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

// Server-streaming RPCs: one request in, a stream of responses out.
// NOTE: DownloadFileRequest, FileChunk, SubscribeRequest and Notification are
// referenced here but are not defined in this excerpt.
service StreamingService {
  // Server streaming: send multiple events to the client.
  rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);

  // Server streaming: download a large file in chunks.
  rpc DownloadFile(DownloadFileRequest) returns (stream FileChunk);

  // Server streaming: real-time notifications.
  rpc SubscribeToNotifications(SubscribeRequest) returns (stream Notification);
}

// Request selecting which user's events to stream.
message StreamUserEventsRequest {
  string user_id = 1;
  repeated UserEventType event_types = 2;
  google.protobuf.Timestamp start_time = 3;
}

// A single event delivered on the StreamUserEvents response stream.
message UserEvent {
  string id = 1;
  string user_id = 2;
  UserEventType type = 3;
  google.protobuf.Timestamp timestamp = 4;
  google.protobuf.Any data = 5;
}

// Kinds of user events that can be requested and delivered.
enum UserEventType {
  USER_EVENT_TYPE_UNSPECIFIED = 0;
  USER_EVENT_TYPE_LOGIN = 1;
  USER_EVENT_TYPE_LOGOUT = 2;
  USER_EVENT_TYPE_PROFILE_UPDATE = 3;
  USER_EVENT_TYPE_PASSWORD_CHANGE = 4;
}
服务器流式传输实现:
server_streaming.py
import asyncio
import os
import time
from typing import Iterator

import grpc
from grpc import ServicerContext


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Server-streaming handlers for StreamingService.

    The handlers read ``self.event_store`` and ``self.file_store``; neither is
    assigned in this snippet, so they must be provided by the surrounding code.
    """

    def StreamUserEvents(
        self,
        request: streaming_pb2.StreamUserEventsRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.UserEvent]:
        """Stream a user's events until the client goes away.

        Args:
            request: Carries ``user_id`` (required), ``event_types`` and
                ``start_time``, all forwarded to ``event_store.subscribe``.
            context: gRPC call context, used to report status codes and to
                detect that the call is no longer active.

        Yields:
            Each truthy event returned by the subscriber.

        An empty ``user_id`` ends the stream with INVALID_ARGUMENT; any
        unexpected error while waiting for an event ends it with INTERNAL.
        """
        # Validate the request before subscribing to anything.
        if not request.user_id:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('User ID is required')
            return

        # Subscribe to the event stream.
        event_subscriber = self.event_store.subscribe(
            user_id=request.user_id,
            event_types=request.event_types,
            start_time=request.start_time
        )

        try:
            while context.is_active():
                try:
                    # Wait for the next event; the timeout bounds how long we
                    # go without re-checking whether the client is still there.
                    event = event_subscriber.get_next_event(timeout=30)
                except TimeoutError:
                    # Nothing arrived; the loop condition re-checks the client.
                    continue
                except Exception as e:
                    context.set_code(grpc.StatusCode.INTERNAL)
                    context.set_details(f'Error streaming events: {str(e)}')
                    break

                # A falsy result means there is nothing to send; no heartbeat
                # message is emitted, we simply poll again.
                if event:
                    yield event
        finally:
            # Always release the subscription, including on client disconnect.
            event_subscriber.close()

    def DownloadFile(
        self,
        request: streaming_pb2.DownloadFileRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.FileChunk]:
        """Stream a stored file to the client in 64 KiB chunks.

        Args:
            request: Carries the ``file_id`` to look up in ``file_store``.
            context: gRPC call context, used to report status codes and to
                detect that the call is no longer active.

        Yields:
            One ``FileChunk`` per block read, with its byte offset and size.

        A missing file ends the stream with NOT_FOUND; any other failure ends
        it with INTERNAL.
        """
        try:
            file_path = self.file_store.get_file_path(request.file_id)
            if not file_path or not os.path.exists(file_path):
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f'File {request.file_id} not found')
                return

            chunk_size = 64 * 1024  # 64 KiB per chunk

            with open(file_path, 'rb') as f:
                while True:
                    chunk_data = f.read(chunk_size)
                    if not chunk_data:
                        break

                    # Stop reading as soon as the client has disconnected.
                    if not context.is_active():
                        break

                    # NOTE(review): the FileChunk message shown in
                    # client_streaming.proto exposes `metadata`/`chunk` inside
                    # a oneof named `data`; confirm these keyword names
                    # against the generated message actually used here.
                    yield streaming_pb2.FileChunk(
                        data=chunk_data,
                        # tell() is already past this chunk, so step back.
                        offset=f.tell() - len(chunk_data),
                        size=len(chunk_data)
                    )

        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error downloading file: {str(e)}')
客户端流式传输
客户端流式传输允许客户端发送多个请求并接收单个响应:
client_streaming.proto
// Client-streaming RPCs: a stream of requests in, one response out.
// NOTE: DataRecord, BatchProcessingResult, MetricData and
// MetricsCollectionResult are referenced but not defined in this excerpt.
service StreamingService {
  // Client streaming: upload a large file in chunks.
  rpc UploadFile(stream FileChunk) returns (UploadFileResponse);

  // Client streaming: batch data processing.
  rpc ProcessDataBatch(stream DataRecord) returns (BatchProcessingResult);

  // Client streaming: real-time metrics collection.
  rpc CollectMetrics(stream MetricData) returns (MetricsCollectionResult);
}

// One message of an upload stream. The oneof means each message carries
// either file metadata or a block of file bytes, never both.
message FileChunk {
  oneof data {
    FileMetadata metadata = 1;
    bytes chunk = 2;
  }
}

// Descriptive information about the file being uploaded.
message FileMetadata {
  string filename = 1;
  int64 total_size = 2;
  string content_type = 3;
  string checksum = 4;
}

// Single response returned once the upload stream has been consumed.
message UploadFileResponse {
  string file_id = 1;
  int64 bytes_uploaded = 2;
  string download_url = 3;
  bool checksum_verified = 4;
}
客户端流式传输实现:
client_streaming.py
import hashlib
import os
import uuid
from typing import Iterator

import grpc
from grpc import ServicerContext


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Client-streaming handlers for StreamingService.

    ``UploadFile`` reads ``self.file_store``, which is not assigned in this
    snippet and must be provided by the surrounding code.
    """

    def UploadFile(
        self,
        request_iterator: Iterator[streaming_pb2.FileChunk],
        context: ServicerContext
    ) -> streaming_pb2.UploadFileResponse:
        """Receive a file from a client stream and hand it to the file store.

        The stream must start with exactly one ``metadata`` message followed
        by ``chunk`` messages. Data is spooled to a temporary file and hashed
        with SHA-256 as it arrives.

        Args:
            request_iterator: Incoming ``FileChunk`` messages.
            context: gRPC call context used to report status codes.

        Returns:
            A populated ``UploadFileResponse`` on success. On failure an empty
            response is returned and the status is set on ``context``:
            INVALID_ARGUMENT for a malformed stream or oversized upload,
            DATA_LOSS for a checksum mismatch, INTERNAL for anything else.
        """
        file_metadata = None
        file_id = None
        file_path = None
        total_bytes = 0
        hasher = hashlib.sha256()

        try:
            for chunk in request_iterator:
                if chunk.HasField('metadata'):
                    # A second metadata message would switch temp files
                    # mid-upload and orphan the first one, so reject it.
                    if file_metadata is not None:
                        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                        context.set_details('File metadata must be sent only once')
                        return streaming_pb2.UploadFileResponse()

                    file_metadata = chunk.metadata

                    # Create the temporary file up front so that a zero-byte
                    # upload still has a file to store.
                    file_id = str(uuid.uuid4())
                    file_path = f'/tmp/uploads/{file_id}'
                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                    with open(file_path, 'wb'):
                        pass

                elif chunk.HasField('chunk'):
                    if file_metadata is None:
                        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                        context.set_details('File metadata must be sent first')
                        return streaming_pb2.UploadFileResponse()

                    # Enforce the declared size before writing, so excess
                    # data never reaches the disk.
                    if total_bytes + len(chunk.chunk) > file_metadata.total_size:
                        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                        context.set_details('File size exceeds declared size')
                        return streaming_pb2.UploadFileResponse()

                    with open(file_path, 'ab') as f:
                        f.write(chunk.chunk)

                    total_bytes += len(chunk.chunk)
                    hasher.update(chunk.chunk)

            # An empty stream, or one without metadata, is a client error
            # rather than an internal failure.
            if file_metadata is None:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details('File metadata must be sent first')
                return streaming_pb2.UploadFileResponse()

            # Verify the checksum declared in the metadata.
            computed_checksum = hasher.hexdigest()
            if computed_checksum != file_metadata.checksum:
                context.set_code(grpc.StatusCode.DATA_LOSS)
                context.set_details('File checksum verification failed')
                return streaming_pb2.UploadFileResponse()

            # Hand the file over to permanent storage.
            self.file_store.store_file(file_id, file_path)
            download_url = self.file_store.get_download_url(file_id)

            return streaming_pb2.UploadFileResponse(
                file_id=file_id,
                bytes_uploaded=total_bytes,
                download_url=download_url,
                checksum_verified=True
            )

        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error uploading file: {str(e)}')
            return streaming_pb2.UploadFileResponse()

        finally:
            # Remove the temporary file if it is still there, on every path.
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
双向流式传输
双向流式传输允许客户端和服务器都发送多个消息:
bidirectional_streaming.proto
// Bidirectional-streaming RPCs: both sides send a stream of messages.
// NOTE: CollaborationEvent, DataInput and ProcessingResult are referenced but
// not defined in this excerpt.
service StreamingService {
  // Bidirectional streaming: real-time chat.
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);

  // Bidirectional streaming: real-time collaboration.
  rpc Collaborate(stream CollaborationEvent) returns (stream CollaborationEvent);

  // Bidirectional streaming: real-time data processing.
  rpc ProcessLiveData(stream DataInput) returns (stream ProcessingResult);
}

// A chat message; the same type is used in both directions of Chat.
// NOTE(review): google.protobuf.Timestamp needs
// `import "google/protobuf/timestamp.proto";` in the file header, which is
// not visible in this excerpt.
message ChatMessage {
  string id = 1;
  string user_id = 2;
  string room_id = 3;
  string content = 4;
  google.protobuf.Timestamp timestamp = 5;
  ChatMessageType type = 6;
}

// Kinds of chat messages.
enum ChatMessageType {
  CHAT_MESSAGE_TYPE_UNSPECIFIED = 0;
  CHAT_MESSAGE_TYPE_TEXT = 1;
  CHAT_MESSAGE_TYPE_IMAGE = 2;
  CHAT_MESSAGE_TYPE_FILE = 3;
  CHAT_MESSAGE_TYPE_SYSTEM = 4;
  CHAT_MESSAGE_TYPE_TYPING = 5;
}
双向流式传输实现:
bidirectional_streaming.py
import asyncio
import logging
import queue
import threading
import uuid
from typing import Iterator

from google.protobuf import timestamp_pb2
from grpc import ServicerContext

logger = logging.getLogger(__name__)


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Bidirectional-streaming handlers for StreamingService.

    ``Chat`` reads ``self.chat_service``, which is not assigned in this
    snippet and must be provided by the surrounding code.
    """

    def Chat(
        self,
        request_iterator: Iterator[streaming_pb2.ChatMessage],
        context: ServicerContext
    ) -> Iterator[streaming_pb2.ChatMessage]:
        """Run one client's side of a chat room.

        A background thread consumes the client's messages: the first one
        joins the room, and TEXT / TYPING messages are broadcast through
        ``chat_service``. This generator sends the client its queued outbound
        messages plus messages from the room subscription.

        Args:
            request_iterator: Messages sent by the client.
            context: gRPC call context, polled to detect the end of the call.

        Yields:
            The welcome message and messages received from the room.
        """
        # Messages produced by the reader thread for this client.
        outgoing_queue = queue.Queue()

        # Set by the reader thread once the first client message arrives.
        user_session = None

        # The room can be left from either thread; make sure it happens once.
        leave_lock = threading.Lock()
        has_left = False

        def leave_room_once():
            """Leave the room exactly once, whichever thread gets here first."""
            nonlocal has_left
            with leave_lock:
                if not user_session or has_left:
                    return
                has_left = True
            self.chat_service.leave_room(user_session)

        def handle_incoming_messages():
            """Consume inbound client messages until the request stream ends."""
            nonlocal user_session

            try:
                for message in request_iterator:
                    if not user_session:
                        # The first message establishes the session.
                        user_session = self.chat_service.join_room(
                            user_id=message.user_id,
                            room_id=message.room_id
                        )

                        # Stamp the welcome message with the current time; a
                        # default-constructed Timestamp would be the epoch.
                        joined_at = timestamp_pb2.Timestamp()
                        joined_at.GetCurrentTime()
                        welcome_msg = streaming_pb2.ChatMessage(
                            id=str(uuid.uuid4()),
                            user_id="system",
                            room_id=message.room_id,
                            content=f"User {message.user_id} joined the chat",
                            timestamp=joined_at,
                            type=streaming_pb2.CHAT_MESSAGE_TYPE_SYSTEM
                        )
                        outgoing_queue.put(welcome_msg)

                    if message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TEXT:
                        # Broadcast to the other users in the room.
                        self.chat_service.broadcast_message(message)

                    elif message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TYPING:
                        # Send a typing indicator to the other users.
                        self.chat_service.broadcast_typing(message)

            except Exception:
                logger.exception("Error handling incoming messages")
            finally:
                leave_room_once()

        # Read inbound messages on a background thread so this generator can
        # keep sending.
        incoming_thread = threading.Thread(target=handle_incoming_messages)
        incoming_thread.daemon = True
        incoming_thread.start()

        message_subscriber = None

        try:
            while context.is_active():
                # The session only exists after the reader thread has seen the
                # first client message, so subscribe lazily here rather than
                # once right after starting the thread (when it is still None).
                if message_subscriber is None and user_session and not has_left:
                    message_subscriber = self.chat_service.subscribe_to_room(
                        user_session.room_id,
                        exclude_user=user_session.user_id
                    )

                # Outbound messages queued by the reader thread.
                try:
                    message = outgoing_queue.get(timeout=1)
                except queue.Empty:
                    pass
                else:
                    yield message

                # Messages from other users in the room.
                if message_subscriber is not None:
                    try:
                        room_message = message_subscriber.get_message(timeout=1)
                    except TimeoutError:
                        room_message = None
                    if room_message:
                        yield room_message

        finally:
            if message_subscriber is not None:
                message_subscriber.close()
            leave_room_once()
流式传输最佳实践
流量控制
实现适当的流量控制以防止客户端不堪重负:
flow_control.py
def StreamData(self, request, context):
    """Stream items from ``self.data_source`` with bounded buffering.

    A producer thread fills a queue capped at 100 items. When the queue is
    full the producer blocks, so a slow client slows production instead of
    growing memory or losing items.

    Args:
        request: The RPC request (not inspected here).
        context: gRPC call context; ``is_active()`` is polled to detect that
            the client has gone away.

    Yields:
        Every item produced by ``self.data_source.get_items()``, in order,
        until the source is exhausted or the call becomes inactive.
    """
    # The bounded queue is what limits memory use and provides backpressure.
    data_queue = queue.Queue(maxsize=100)

    # Unique marker telling the consumer that the producer has finished.
    end_of_stream = object()

    # Set by the consumer so the producer stops once nobody is listening.
    stop_requested = threading.Event()

    def offer(item):
        """Queue ``item``, waiting for space; False if the consumer is gone."""
        while not stop_requested.is_set():
            try:
                data_queue.put(item, timeout=0.5)
                return True
            except queue.Full:
                # Queue still full: re-check the stop flag and wait again.
                continue
        return False

    def data_producer():
        """Background thread that feeds the queue from the data source."""
        try:
            for item in self.data_source.get_items():
                if not offer(item):
                    return
        finally:
            # Always mark the end so the consumer does not wait forever, even
            # if the data source raised.
            offer(end_of_stream)

    producer_thread = threading.Thread(target=data_producer)
    producer_thread.start()

    try:
        while context.is_active():
            try:
                item = data_queue.get(timeout=30)
            except queue.Empty:
                # Nothing yet; the loop condition re-checks the client.
                continue

            if item is end_of_stream:
                break
            yield item
    finally:
        # Release a producer blocked on a full queue, then wait briefly.
        stop_requested.set()
        producer_thread.join(timeout=1)
流中的错误处理
在流式操作中优雅地处理错误:
stream_error_handling.py
def StreamWithErrorHandling(self, request, context):
    """Stream processed items, separating recoverable and fatal errors.

    Each item from ``self.get_stream_data(request)`` is passed through
    ``self.process_item`` and yielded. A ``ProcessingError`` is reported
    in-band as an error response and the stream continues; any other
    exception sets INTERNAL on ``context`` and ends the stream.
    """
    try:
        source = self.get_stream_data(request)
        for raw_item in source:
            # Stop as soon as the call is no longer active.
            if not context.is_active():
                return

            try:
                result = self.process_item(raw_item)
                yield result
            except ProcessingError as err:
                # Recoverable: send the error as part of the response stream.
                yield create_error_response(err)
            except Exception as err:
                # Fatal: report the failure and abort the stream.
                context.set_code(grpc.StatusCode.INTERNAL)
                context.set_details(f'Processing failed: {str(err)}')
                return
    except Exception as err:
        # Failure while obtaining or iterating the source itself.
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details(f'Stream failed: {str(err)}')
在客户端使用流式传输
在客户端处理流式传输:
streaming_client.py
import grpc


def stream_chat_client():
    """Run an interactive bidirectional-streaming chat client.

    Connects to ``localhost:50051``, sends an initial greeting, then sends
    each line typed by the user until ``quit`` (or end of input) while
    printing every message received from the server.
    """

    def message_generator():
        """Yield the outbound messages for the request stream."""
        # Send the initial message.
        yield streaming_pb2.ChatMessage(
            user_id="user123",
            room_id="general",
            content="Hello, world!",
            type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
        )

        # Keep the stream open and send whatever the user types.
        while True:
            try:
                user_input = input("Enter message: ")
            except EOFError:
                # Closed stdin is treated like typing 'quit'.
                break
            if user_input.lower() == 'quit':
                break

            yield streaming_pb2.ChatMessage(
                user_id="user123",
                room_id="general",
                content=user_input,
                type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
            )

    # The context manager closes the channel when the chat ends, instead of
    # leaking the connection.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = streaming_pb2_grpc.StreamingServiceStub(channel)

        # Start the bidirectional stream.
        responses = stub.Chat(message_generator())

        try:
            for response in responses:
                print(f"Received: {response.content}")
        except grpc.RpcError as e:
            print(f"RPC failed: {e}")
gRPC 中的流式传输可实现强大的实时应用程序,同时保持强类型契约和高效二进制协议的优势。