Streaming

Implement server streaming, client streaming, and bidirectional streaming with gRPC

gRPC supports four types of service methods: unary, server streaming, client streaming, and bidirectional streaming. Streaming enables efficient real-time communication and large data transfers.

Server Streaming

Server streaming allows the server to send multiple responses to a single client request:

streaming_service.proto
syntax = "proto3";

package streaming.v1;

// The well-known types used below (Timestamp, Any) must be imported
// explicitly; protoc does not resolve them otherwise.
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

// Server-streaming RPCs: one request in, a stream of responses out.
// DownloadFileRequest, FileChunk, SubscribeRequest and Notification are
// referenced here but not defined in this listing.
service StreamingService {
  // Server streaming: send multiple events to client
  rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);

  // Server streaming: download large file in chunks
  rpc DownloadFile(DownloadFileRequest) returns (stream FileChunk);

  // Server streaming: real-time notifications
  rpc SubscribeToNotifications(SubscribeRequest) returns (stream Notification);
}

// Selects which user's events to stream, optionally filtered by type and
// start time.
message StreamUserEventsRequest {
  string user_id = 1;
  repeated UserEventType event_types = 2;
  google.protobuf.Timestamp start_time = 3;
}

// A single event delivered on the StreamUserEvents response stream.
message UserEvent {
  string id = 1;
  string user_id = 2;
  UserEventType type = 3;
  google.protobuf.Timestamp timestamp = 4;
  google.protobuf.Any data = 5;
}

enum UserEventType {
  USER_EVENT_TYPE_UNSPECIFIED = 0;
  USER_EVENT_TYPE_LOGIN = 1;
  USER_EVENT_TYPE_LOGOUT = 2;
  USER_EVENT_TYPE_PROFILE_UPDATE = 3;
  USER_EVENT_TYPE_PASSWORD_CHANGE = 4;
}

Server streaming implementation:

server_streaming.py
import asyncio
import os
import time
from typing import Iterator

import grpc
from grpc import ServicerContext

# NOTE(review): streaming_pb2 / streaming_pb2_grpc are presumably the modules
# generated by protoc from the service definition; they are not imported in
# this listing, so import them under whatever names your build produces.


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Server-streaming handlers for StreamingService.

    The handlers read ``self.event_store`` and ``self.file_store``. Neither is
    assigned in this listing, so they are assumed to be attached elsewhere.
    """

    def StreamUserEvents(
        self,
        request: streaming_pb2.StreamUserEventsRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.UserEvent]:
        """Stream user events in real-time.

        Yields events from the event store subscription for as long as the
        RPC context is active. An empty ``user_id`` ends the stream with
        INVALID_ARGUMENT; an unexpected subscriber error ends it with
        INTERNAL. The subscription is always closed on exit.
        """

        # Validate request
        if not request.user_id:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('User ID is required')
            return

        # Subscribe to event stream
        event_subscriber = self.event_store.subscribe(
            user_id=request.user_id,
            event_types=request.event_types,
            start_time=request.start_time
        )

        try:
            while context.is_active():
                # Wait for the next event, but wake up periodically so a
                # disconnected client is noticed by the loop condition.
                try:
                    event = event_subscriber.get_next_event(timeout=30)
                except TimeoutError:
                    continue
                except Exception as e:
                    context.set_code(grpc.StatusCode.INTERNAL)
                    context.set_details(f'Error streaming events: {str(e)}')
                    break

                # A falsy result means nothing to deliver this round; no
                # heartbeat message is sent, the loop simply polls again.
                if event:
                    yield event

        finally:
            # Clean up subscription
            event_subscriber.close()

    def DownloadFile(
        self,
        request: streaming_pb2.DownloadFileRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.FileChunk]:
        """Download file in chunks.

        Looks up the file path for ``request.file_id`` and yields its content
        in 64 KB pieces until the file is exhausted or the client goes away.
        A missing file ends the stream with NOT_FOUND; any other failure ends
        it with INTERNAL.
        """

        try:
            file_path = self.file_store.get_file_path(request.file_id)
            if not file_path or not os.path.exists(file_path):
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f'File {request.file_id} not found')
                return

            chunk_size = 64 * 1024  # 64KB chunks

            with open(file_path, 'rb') as f:
                # Stop reading as soon as the client disconnects.
                while context.is_active():
                    chunk_data = f.read(chunk_size)
                    if not chunk_data:
                        break

                    # FileChunk carries payload bytes in the `chunk` member of
                    # its `data` oneof; `data` itself is not a settable field.
                    yield streaming_pb2.FileChunk(chunk=chunk_data)

        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error downloading file: {str(e)}')

Client Streaming

Client streaming allows the client to send multiple requests and receive a single response:

client_streaming.proto
// Client-streaming RPCs: a stream of requests in, one response out.
// DataRecord, BatchProcessingResult, MetricData and MetricsCollectionResult
// are referenced here but not defined in this listing.
service StreamingService {
  // Client streaming: upload large file in chunks
  rpc UploadFile(stream FileChunk) returns (UploadFileResponse);

  // Client streaming: batch data processing
  rpc ProcessDataBatch(stream DataRecord) returns (BatchProcessingResult);

  // Client streaming: real-time metrics collection
  rpc CollectMetrics(stream MetricData) returns (MetricsCollectionResult);
}

// One message on a file stream: either the file's metadata or a piece of its
// content, never both.
message FileChunk {
  oneof data {
    FileMetadata metadata = 1;
    bytes chunk = 2;
  }
}

// Describes the file being transferred.
message FileMetadata {
  string filename = 1;
  int64 total_size = 2;
  string content_type = 3;
  string checksum = 4;
}

// Result returned once the upload stream has been fully consumed.
message UploadFileResponse {
  string file_id = 1;
  int64 bytes_uploaded = 2;
  string download_url = 3;
  bool checksum_verified = 4;
}

Client streaming implementation:

client_streaming.py
import hashlib
import os
import uuid
from typing import Iterator

import grpc
from grpc import ServicerContext


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Client-streaming handlers for StreamingService.

    ``UploadFile`` reads ``self.file_store``, which is not assigned in this
    listing and is assumed to be attached elsewhere.
    """

    def UploadFile(
        self,
        request_iterator: Iterator[streaming_pb2.FileChunk],
        context: ServicerContext
    ) -> streaming_pb2.UploadFileResponse:
        """Upload file from client stream.

        Expects exactly one ``metadata`` message before any ``chunk``
        messages. Content is written to a temporary file while a SHA-256
        digest is accumulated; after the stream ends the digest is compared
        with ``metadata.checksum`` and the file is handed to the file store.

        On failure an empty response is returned with the status set to:
        INVALID_ARGUMENT (metadata missing, repeated or sent late, or more
        bytes than declared), DATA_LOSS (checksum mismatch) or INTERNAL (any
        other error). The temporary file is removed in every case.
        """

        file_metadata = None
        file_id = None
        file_path = None
        temp_file = None
        total_bytes = 0
        hasher = hashlib.sha256()

        def reject(code, details):
            """Set the RPC status and build the empty failure response."""
            context.set_code(code)
            context.set_details(details)
            return streaming_pb2.UploadFileResponse()

        try:
            for chunk in request_iterator:
                if chunk.HasField('metadata'):
                    # A second metadata message would orphan the first
                    # temporary file, so it is refused.
                    if file_metadata is not None:
                        return reject(
                            grpc.StatusCode.INVALID_ARGUMENT,
                            'File metadata must be sent only once'
                        )

                    file_metadata = chunk.metadata

                    # Create temporary file and keep it open for the whole
                    # upload instead of reopening it for every chunk.
                    file_id = str(uuid.uuid4())
                    file_path = f'/tmp/uploads/{file_id}'
                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                    temp_file = open(file_path, 'wb')

                elif chunk.HasField('chunk'):
                    # Subsequent chunks contain file data
                    if file_metadata is None:
                        return reject(
                            grpc.StatusCode.INVALID_ARGUMENT,
                            'File metadata must be sent first'
                        )

                    # Enforce the declared size before touching the disk so
                    # oversize data is never written.
                    total_bytes += len(chunk.chunk)
                    if total_bytes > file_metadata.total_size:
                        return reject(
                            grpc.StatusCode.INVALID_ARGUMENT,
                            'File size exceeds declared size'
                        )

                    temp_file.write(chunk.chunk)
                    hasher.update(chunk.chunk)

            # A stream that never carried metadata is a client error, not an
            # internal one.
            if file_metadata is None:
                return reject(
                    grpc.StatusCode.INVALID_ARGUMENT,
                    'File metadata must be sent first'
                )

            # Flush everything to disk before the file store reads the file.
            temp_file.close()

            # Verify checksum
            if hasher.hexdigest() != file_metadata.checksum:
                return reject(
                    grpc.StatusCode.DATA_LOSS,
                    'File checksum verification failed'
                )

            # Move file to permanent storage
            self.file_store.store_file(file_id, file_path)
            download_url = self.file_store.get_download_url(file_id)

            return streaming_pb2.UploadFileResponse(
                file_id=file_id,
                bytes_uploaded=total_bytes,
                download_url=download_url,
                checksum_verified=True
            )

        except Exception as e:
            return reject(
                grpc.StatusCode.INTERNAL,
                f'Error uploading file: {str(e)}'
            )

        finally:
            # Clean up temporary file
            if temp_file is not None and not temp_file.closed:
                temp_file.close()
            if file_path and os.path.exists(file_path):
                os.remove(file_path)

Bidirectional Streaming

Bidirectional streaming allows both client and server to send multiple messages:

bidirectional_streaming.proto
// ChatMessage.timestamp uses the Timestamp well-known type, which must be
// imported explicitly.
import "google/protobuf/timestamp.proto";

// Bidirectional RPCs: both sides send a stream of messages.
// CollaborationEvent, DataInput and ProcessingResult are referenced here but
// not defined in this listing.
service StreamingService {
  // Bidirectional streaming: real-time chat
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);

  // Bidirectional streaming: real-time collaboration
  rpc Collaborate(stream CollaborationEvent) returns (stream CollaborationEvent);

  // Bidirectional streaming: live data processing
  rpc ProcessLiveData(stream DataInput) returns (stream ProcessingResult);
}

// A single chat message, sent in either direction on the Chat stream.
message ChatMessage {
  string id = 1;
  string user_id = 2;
  string room_id = 3;
  string content = 4;
  google.protobuf.Timestamp timestamp = 5;
  ChatMessageType type = 6;
}

enum ChatMessageType {
  CHAT_MESSAGE_TYPE_UNSPECIFIED = 0;
  CHAT_MESSAGE_TYPE_TEXT = 1;
  CHAT_MESSAGE_TYPE_IMAGE = 2;
  CHAT_MESSAGE_TYPE_FILE = 3;
  CHAT_MESSAGE_TYPE_SYSTEM = 4;
  CHAT_MESSAGE_TYPE_TYPING = 5;
}

Bidirectional streaming implementation:

bidirectional_streaming.py
import asyncio
import queue
import threading
import uuid
from typing import Iterator

from google.protobuf import timestamp_pb2
from grpc import ServicerContext


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Bidirectional-streaming handlers for StreamingService.

    ``Chat`` reads ``self.chat_service``, which is not assigned in this
    listing and is assumed to be attached elsewhere.
    """

    def Chat(
        self,
        request_iterator: Iterator[streaming_pb2.ChatMessage],
        context: ServicerContext
    ) -> Iterator[streaming_pb2.ChatMessage]:
        """Bidirectional chat streaming.

        A background thread consumes the client's messages: the first one
        joins the room and queues a system welcome message, and TEXT / TYPING
        messages are handed to the chat service for broadcast. This generator
        meanwhile yields queued messages and messages from other users in the
        room. The stream ends when the RPC context becomes inactive, or once
        the client has finished sending and the outgoing queue is drained.
        """

        # Message queue for outgoing messages
        outgoing_queue = queue.Queue()

        # Set by the reader thread when the client's stream is exhausted.
        incoming_done = threading.Event()

        # Track user session (written by the reader thread, read here)
        user_session = None

        def handle_incoming_messages():
            """Process incoming messages from client."""
            nonlocal user_session

            try:
                for message in request_iterator:
                    if not user_session:
                        # First message establishes session
                        user_session = self.chat_service.join_room(
                            user_id=message.user_id,
                            room_id=message.room_id
                        )

                        # Stamp the welcome message with the current time; a
                        # default-constructed Timestamp is the Unix epoch.
                        joined_at = timestamp_pb2.Timestamp()
                        joined_at.GetCurrentTime()

                        # Send welcome message
                        welcome_msg = streaming_pb2.ChatMessage(
                            id=str(uuid.uuid4()),
                            user_id="system",
                            room_id=message.room_id,
                            content=f"User {message.user_id} joined the chat",
                            timestamp=joined_at,
                            type=streaming_pb2.CHAT_MESSAGE_TYPE_SYSTEM
                        )
                        outgoing_queue.put(welcome_msg)

                    # Process message
                    if message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TEXT:
                        # Broadcast to other users in room
                        self.chat_service.broadcast_message(message)

                    elif message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TYPING:
                        # Send typing indicator to other users
                        self.chat_service.broadcast_typing(message)

            except Exception as e:
                print(f"Error handling incoming messages: {e}")
            finally:
                # Session cleanup happens once, in the generator's `finally`;
                # here we only signal that no more input will arrive.
                incoming_done.set()

        # Start background thread for incoming messages
        incoming_thread = threading.Thread(
            target=handle_incoming_messages,
            daemon=True
        )
        incoming_thread.start()

        message_subscriber = None

        try:
            while context.is_active():
                # The session only exists after the reader thread has seen the
                # first message, so subscribe lazily rather than right after
                # starting the thread (when it is still None).
                if message_subscriber is None and user_session:
                    message_subscriber = self.chat_service.subscribe_to_room(
                        user_session.room_id,
                        exclude_user=user_session.user_id
                    )

                # Check for outgoing messages from queue
                try:
                    message = outgoing_queue.get(timeout=1)
                except queue.Empty:
                    message = None
                if message is not None:
                    yield message

                # Check for messages from other users
                if message_subscriber:
                    try:
                        room_message = message_subscriber.get_message(timeout=1)
                    except TimeoutError:
                        room_message = None
                    if room_message:
                        yield room_message

                # Client finished sending and nothing is left to deliver.
                if incoming_done.is_set() and outgoing_queue.empty():
                    break

        finally:
            # Give the reader a moment to finish so the session it may have
            # created is visible before cleanup.
            incoming_thread.join(timeout=1)

            # Clean up
            if message_subscriber:
                message_subscriber.close()
            if user_session:
                self.chat_service.leave_room(user_session)

Streaming Best Practices

Flow Control

Implement proper flow control to prevent overwhelming clients:

flow_control.py
import queue
import threading


def StreamData(self, request, context):
    """Stream with flow control.

    A background thread pulls items from ``self.data_source.get_items()``
    into a bounded queue while this generator yields them to the client. When
    the queue is full the producer waits, so a slow client throttles the
    producer instead of losing data. The stream ends when the source is
    exhausted or the RPC context becomes inactive; either way the producer is
    told to stop.
    """

    # Use a bounded queue to control memory usage
    data_queue = queue.Queue(maxsize=100)

    # Unique marker the producer enqueues after the last real item.
    end_of_stream = object()

    # Set by the consumer on exit so the producer stops promptly.
    stop_event = threading.Event()

    def offer(item):
        """Enqueue ``item``, waiting for space; False if asked to stop."""
        while not stop_event.is_set():
            try:
                # Short timeout so the stop flag is re-checked regularly.
                data_queue.put(item, timeout=1)
                return True
            except queue.Full:
                # Apply backpressure: wait for the consumer to catch up.
                continue
        return False

    def data_producer():
        """Background thread that produces data."""
        try:
            for item in self.data_source.get_items():
                if not offer(item):
                    return
        finally:
            # Always tell the consumer that nothing more is coming.
            offer(end_of_stream)

    producer_thread = threading.Thread(target=data_producer, daemon=True)
    producer_thread.start()

    try:
        while context.is_active():
            try:
                item = data_queue.get(timeout=30)
            except queue.Empty:
                # Nothing arrived; the loop condition re-checks the client.
                continue

            if item is end_of_stream:
                break
            yield item
    finally:
        stop_event.set()
        producer_thread.join(timeout=1)

Error Handling in Streams

Handle errors gracefully in streaming operations:

stream_error_handling.py
def StreamWithErrorHandling(self, request, context):
    """Stream with robust error handling.

    Yields one processed item per element of ``self.get_stream_data(request)``
    while the RPC context is active. A ``ProcessingError`` on a single item is
    reported in-band as an error response and the stream continues; any other
    exception ends the stream with status INTERNAL.
    """

    try:
        for item in self.get_stream_data(request):
            # Stop producing as soon as the client is gone.
            if not context.is_active():
                break

            try:
                # Process item
                processed_item = self.process_item(item)
                yield processed_item

            except ProcessingError as e:
                # Recoverable: send error as part of response and carry on.
                error_response = create_error_response(e)
                yield error_response

            except Exception as e:
                # Critical error - abort stream
                context.set_code(grpc.StatusCode.INTERNAL)
                context.set_details(f'Processing failed: {str(e)}')
                break

    except Exception as e:
        # Failure while obtaining or iterating the source data itself.
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details(f'Stream failed: {str(e)}')

Client-side Streaming

Handle streaming on the client side:

streaming_client.py
import grpc

# NOTE(review): streaming_pb2 / streaming_pb2_grpc are presumably the modules
# generated by protoc from the service definition; they are not imported in
# this listing.


def stream_chat_client():
    """Example bidirectional streaming client.

    Opens an insecure channel to ``localhost:50051``, sends an initial
    greeting followed by whatever the user types (until ``quit``), and prints
    every message received from the server. The channel is closed when the
    function returns.
    """

    def message_generator():
        """Generate outgoing messages."""
        # Send initial message
        yield streaming_pb2.ChatMessage(
            user_id="user123",
            room_id="general",
            content="Hello, world!",
            type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
        )

        # Keep connection alive and send periodic messages
        while True:
            user_input = input("Enter message: ")
            if user_input.lower() == 'quit':
                # Returning ends the request stream (client half-close).
                break

            yield streaming_pb2.ChatMessage(
                user_id="user123",
                room_id="general",
                content=user_input,
                type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
            )

    # Use the channel as a context manager so it is closed on every exit path.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = streaming_pb2_grpc.StreamingServiceStub(channel)

        # Start bidirectional stream
        responses = stub.Chat(message_generator())

        try:
            for response in responses:
                print(f"Received: {response.content}")
        except grpc.RpcError as e:
            print(f"RPC failed: {e}")

Streaming in gRPC enables powerful real-time applications while maintaining the benefits of strongly-typed contracts and efficient binary protocols.