Streaming
gRPC supports four types of service methods: unary, server streaming, client streaming, and bidirectional streaming. Streaming enables efficient real-time communication and large data transfers.
Server Streaming
Server streaming allows the server to send multiple responses to a single client request:
1 syntax = "proto3"; 2 3 package streaming.v1; 4 5 service StreamingService { 6 // Server streaming: send multiple events to client 7 rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent); 8 9 // Server streaming: download large file in chunks 10 rpc DownloadFile(DownloadFileRequest) returns (stream FileChunk); 11 12 // Server streaming: real-time notifications 13 rpc SubscribeToNotifications(SubscribeRequest) returns (stream Notification); 14 } 15 16 message StreamUserEventsRequest { 17 string user_id = 1; 18 repeated UserEventType event_types = 2; 19 google.protobuf.Timestamp start_time = 3; 20 } 21 22 message UserEvent { 23 string id = 1; 24 string user_id = 2; 25 UserEventType type = 3; 26 google.protobuf.Timestamp timestamp = 4; 27 google.protobuf.Any data = 5; 28 } 29 30 enum UserEventType { 31 USER_EVENT_TYPE_UNSPECIFIED = 0; 32 USER_EVENT_TYPE_LOGIN = 1; 33 USER_EVENT_TYPE_LOGOUT = 2; 34 USER_EVENT_TYPE_PROFILE_UPDATE = 3; 35 USER_EVENT_TYPE_PASSWORD_CHANGE = 4; 36 }
Server streaming implementation:
1 import grpc 2 import time 3 import asyncio 4 from grpc import ServicerContext 5 from typing import Iterator 6 7 class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer): 8 9 def StreamUserEvents( 10 self, 11 request: streaming_pb2.StreamUserEventsRequest, 12 context: ServicerContext 13 ) -> Iterator[streaming_pb2.UserEvent]: 14 """Stream user events in real-time.""" 15 16 # Validate request 17 if not request.user_id: 18 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 19 context.set_details('User ID is required') 20 return 21 22 # Subscribe to event stream 23 event_subscriber = self.event_store.subscribe( 24 user_id=request.user_id, 25 event_types=request.event_types, 26 start_time=request.start_time 27 ) 28 29 try: 30 while context.is_active(): 31 # Wait for next event (with timeout) 32 try: 33 event = event_subscriber.get_next_event(timeout=30) 34 if event: 35 yield event 36 else: 37 # Send heartbeat to keep connection alive 38 continue 39 40 except TimeoutError: 41 # Check if client is still connected 42 if not context.is_active(): 43 break 44 45 except Exception as e: 46 context.set_code(grpc.StatusCode.INTERNAL) 47 context.set_details(f'Error streaming events: {str(e)}') 48 break 49 50 finally: 51 # Clean up subscription 52 event_subscriber.close() 53 54 def DownloadFile( 55 self, 56 request: streaming_pb2.DownloadFileRequest, 57 context: ServicerContext 58 ) -> Iterator[streaming_pb2.FileChunk]: 59 """Download file in chunks.""" 60 61 try: 62 file_path = self.file_store.get_file_path(request.file_id) 63 if not file_path or not os.path.exists(file_path): 64 context.set_code(grpc.StatusCode.NOT_FOUND) 65 context.set_details(f'File {request.file_id} not found') 66 return 67 68 chunk_size = 64 * 1024 # 64KB chunks 69 70 with open(file_path, 'rb') as f: 71 while True: 72 chunk_data = f.read(chunk_size) 73 if not chunk_data: 74 break 75 76 # Check if client disconnected 77 if not context.is_active(): 78 break 79 80 yield streaming_pb2.FileChunk( 81 data=chunk_data, 82 offset=f.tell() - len(chunk_data), 83 size=len(chunk_data) 84 ) 85 86 except Exception as e: 87 context.set_code(grpc.StatusCode.INTERNAL) 88 context.set_details(f'Error downloading file: {str(e)}')
Client Streaming
Client streaming allows the client to send multiple requests and receive a single response:
1 service StreamingService { 2 // Client streaming: upload large file in chunks 3 rpc UploadFile(stream FileChunk) returns (UploadFileResponse); 4 5 // Client streaming: batch data processing 6 rpc ProcessDataBatch(stream DataRecord) returns (BatchProcessingResult); 7 8 // Client streaming: real-time metrics collection 9 rpc CollectMetrics(stream MetricData) returns (MetricsCollectionResult); 10 } 11 12 message FileChunk { 13 oneof data { 14 FileMetadata metadata = 1; 15 bytes chunk = 2; 16 } 17 } 18 19 message FileMetadata { 20 string filename = 1; 21 int64 total_size = 2; 22 string content_type = 3; 23 string checksum = 4; 24 } 25 26 message UploadFileResponse { 27 string file_id = 1; 28 int64 bytes_uploaded = 2; 29 string download_url = 3; 30 bool checksum_verified = 4; 31 }
Client streaming implementation:
1 class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer): 2 3 def UploadFile( 4 self, 5 request_iterator: Iterator[streaming_pb2.FileChunk], 6 context: ServicerContext 7 ) -> streaming_pb2.UploadFileResponse: 8 """Upload file from client stream.""" 9 10 file_metadata = None 11 total_bytes = 0 12 file_path = None 13 hasher = hashlib.sha256() 14 15 try: 16 for chunk in request_iterator: 17 if chunk.HasField('metadata'): 18 # First chunk contains metadata 19 file_metadata = chunk.metadata 20 21 # Create temporary file 22 file_id = str(uuid.uuid4()) 23 file_path = f'/tmp/uploads/{file_id}' 24 os.makedirs(os.path.dirname(file_path), exist_ok=True) 25 26 elif chunk.HasField('chunk'): 27 # Subsequent chunks contain file data 28 if not file_metadata: 29 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 30 context.set_details('File metadata must be sent first') 31 return streaming_pb2.UploadFileResponse() 32 33 # Write chunk to file 34 with open(file_path, 'ab') as f: 35 f.write(chunk.chunk) 36 37 total_bytes += len(chunk.chunk) 38 hasher.update(chunk.chunk) 39 40 # Check size limit 41 if total_bytes > file_metadata.total_size: 42 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 43 context.set_details('File size exceeds declared size') 44 return streaming_pb2.UploadFileResponse() 45 46 # Verify checksum 47 computed_checksum = hasher.hexdigest() 48 checksum_verified = computed_checksum == file_metadata.checksum 49 50 if not checksum_verified: 51 context.set_code(grpc.StatusCode.DATA_LOSS) 52 context.set_details('File checksum verification failed') 53 return streaming_pb2.UploadFileResponse() 54 55 # Move file to permanent storage 56 permanent_path = self.file_store.store_file(file_id, file_path) 57 download_url = self.file_store.get_download_url(file_id) 58 59 return streaming_pb2.UploadFileResponse( 60 file_id=file_id, 61 bytes_uploaded=total_bytes, 62 download_url=download_url, 63 checksum_verified=True 64 ) 65 66 except Exception as e: 67 context.set_code(grpc.StatusCode.INTERNAL) 68 context.set_details(f'Error uploading file: {str(e)}') 69 return streaming_pb2.UploadFileResponse() 70 71 finally: 72 # Clean up temporary file 73 if file_path and os.path.exists(file_path): 74 os.remove(file_path)
Bidirectional Streaming
Bidirectional streaming allows both client and server to send multiple messages:
1 service StreamingService { 2 // Bidirectional streaming: real-time chat 3 rpc Chat(stream ChatMessage) returns (stream ChatMessage); 4 5 // Bidirectional streaming: real-time collaboration 6 rpc Collaborate(stream CollaborationEvent) returns (stream CollaborationEvent); 7 8 // Bidirectional streaming: live data processing 9 rpc ProcessLiveData(stream DataInput) returns (stream ProcessingResult); 10 } 11 12 message ChatMessage { 13 string id = 1; 14 string user_id = 2; 15 string room_id = 3; 16 string content = 4; 17 google.protobuf.Timestamp timestamp = 5; 18 ChatMessageType type = 6; 19 } 20 21 enum ChatMessageType { 22 CHAT_MESSAGE_TYPE_UNSPECIFIED = 0; 23 CHAT_MESSAGE_TYPE_TEXT = 1; 24 CHAT_MESSAGE_TYPE_IMAGE = 2; 25 CHAT_MESSAGE_TYPE_FILE = 3; 26 CHAT_MESSAGE_TYPE_SYSTEM = 4; 27 CHAT_MESSAGE_TYPE_TYPING = 5; 28 }
Bidirectional streaming implementation:
1 import asyncio 2 import queue 3 import threading 4 5 class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer): 6 7 def Chat( 8 self, 9 request_iterator: Iterator[streaming_pb2.ChatMessage], 10 context: ServicerContext 11 ) -> Iterator[streaming_pb2.ChatMessage]: 12 """Bidirectional chat streaming.""" 13 14 # Message queue for outgoing messages 15 outgoing_queue = queue.Queue() 16 17 # Track user session 18 user_session = None 19 20 def handle_incoming_messages(): 21 """Process incoming messages from client.""" 22 nonlocal user_session 23 24 try: 25 for message in request_iterator: 26 if not user_session: 27 # First message establishes session 28 user_session = self.chat_service.join_room( 29 user_id=message.user_id, 30 room_id=message.room_id 31 ) 32 33 # Send welcome message 34 welcome_msg = streaming_pb2.ChatMessage( 35 id=str(uuid.uuid4()), 36 user_id="system", 37 room_id=message.room_id, 38 content=f"User {message.user_id} joined the chat", 39 timestamp=google.protobuf.timestamp_pb2.Timestamp(), 40 type=streaming_pb2.CHAT_MESSAGE_TYPE_SYSTEM 41 ) 42 outgoing_queue.put(welcome_msg) 43 44 # Process message 45 if message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TEXT: 46 # Broadcast to other users in room 47 self.chat_service.broadcast_message(message) 48 49 elif message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TYPING: 50 # Send typing indicator to other users 51 self.chat_service.broadcast_typing(message) 52 53 except Exception as e: 54 print(f"Error handling incoming messages: {e}") 55 finally: 56 # Clean up session 57 if user_session: 58 self.chat_service.leave_room(user_session) 59 60 # Start background thread for incoming messages 61 incoming_thread = threading.Thread(target=handle_incoming_messages) 62 incoming_thread.daemon = True 63 incoming_thread.start() 64 65 # Subscribe to room messages 66 message_subscriber = None 67 if user_session: 68 message_subscriber = self.chat_service.subscribe_to_room( 69 user_session.room_id, 70 exclude_user=user_session.user_id 71 ) 72 73 try: 74 while context.is_active(): 75 # Check for outgoing messages from queue 76 try: 77 message = outgoing_queue.get(timeout=1) 78 yield message 79 except queue.Empty: 80 pass 81 82 # Check for messages from other users 83 if message_subscriber: 84 try: 85 room_message = message_subscriber.get_message(timeout=1) 86 if room_message: 87 yield room_message 88 except TimeoutError: 89 pass 90 91 finally: 92 # Clean up 93 if message_subscriber: 94 message_subscriber.close() 95 if user_session: 96 self.chat_service.leave_room(user_session)
Streaming Best Practices
Flow Control
Implement proper flow control to prevent overwhelming clients:
1 def StreamData(self, request, context): 2 """Stream with flow control.""" 3 4 # Use a bounded queue to control memory usage 5 data_queue = queue.Queue(maxsize=100) 6 7 def data_producer(): 8 """Background thread that produces data.""" 9 for item in self.data_source.get_items(): 10 try: 11 data_queue.put(item, timeout=5) 12 except queue.Full: 13 # Apply backpressure 14 print("Client is too slow, dropping data") 15 break 16 17 producer_thread = threading.Thread(target=data_producer) 18 producer_thread.start() 19 20 try: 21 while context.is_active(): 22 try: 23 item = data_queue.get(timeout=30) 24 yield item 25 except queue.Empty: 26 # Send heartbeat or check client connection 27 if not context.is_active(): 28 break 29 finally: 30 producer_thread.join(timeout=1)
Error Handling in Streams
Handle errors gracefully in streaming operations:
1 def StreamWithErrorHandling(self, request, context): 2 """Stream with robust error handling.""" 3 4 try: 5 for item in self.get_stream_data(request): 6 if not context.is_active(): 7 break 8 9 try: 10 # Process item 11 processed_item = self.process_item(item) 12 yield processed_item 13 14 except ProcessingError as e: 15 # Send error as part of response 16 error_response = create_error_response(e) 17 yield error_response 18 19 except Exception as e: 20 # Critical error - abort stream 21 context.set_code(grpc.StatusCode.INTERNAL) 22 context.set_details(f'Processing failed: {str(e)}') 23 break 24 25 except Exception as e: 26 context.set_code(grpc.StatusCode.INTERNAL) 27 context.set_details(f'Stream failed: {str(e)}')
Client-side Streaming
Handle streaming on the client side:
1 import grpc 2 3 def stream_chat_client(): 4 """Example bidirectional streaming client.""" 5 6 channel = grpc.insecure_channel('localhost:50051') 7 stub = streaming_pb2_grpc.StreamingServiceStub(channel) 8 9 def message_generator(): 10 """Generate outgoing messages.""" 11 # Send initial message 12 yield streaming_pb2.ChatMessage( 13 user_id="user123", 14 room_id="general", 15 content="Hello, world!", 16 type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT 17 ) 18 19 # Keep connection alive and send periodic messages 20 while True: 21 user_input = input("Enter message: ") 22 if user_input.lower() == 'quit': 23 break 24 25 yield streaming_pb2.ChatMessage( 26 user_id="user123", 27 room_id="general", 28 content=user_input, 29 type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT 30 ) 31 32 # Start bidirectional stream 33 responses = stub.Chat(message_generator()) 34 35 try: 36 for response in responses: 37 print(f"Received: {response.content}") 38 except grpc.RpcError as e: 39 print(f"RPC failed: {e}")
Streaming in gRPC enables powerful real-time applications while maintaining the benefits of strongly-typed contracts and efficient binary protocols.