流式传输

以 Markdown 格式查看

gRPC 支持四种类型的服务方法：一元调用、服务器流式传输、客户端流式传输和双向流式传输。流式传输适用于高效的实时通信，以及大文件、大批量数据的分块传输。

服务器流式传输

服务器流式传输允许服务器对单个客户端请求发送多个响应:

streaming_service.proto
syntax = "proto3";

package streaming.v1;

// Well-known types referenced below; protoc rejects google.protobuf.Timestamp
// and google.protobuf.Any unless their definitions are imported.
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

service StreamingService {
  // Server streaming: push multiple events to the client.
  rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);

  // Server streaming: download a large file in chunks.
  rpc DownloadFile(DownloadFileRequest) returns (stream FileChunk);

  // Server streaming: real-time notifications.
  rpc SubscribeToNotifications(SubscribeRequest) returns (stream Notification);
}

// NOTE: DownloadFileRequest, FileChunk, SubscribeRequest and Notification are
// not defined in this excerpt; they must be declared elsewhere in the package.

message StreamUserEventsRequest {
  string user_id = 1;
  repeated UserEventType event_types = 2;
  google.protobuf.Timestamp start_time = 3;
}

message UserEvent {
  string id = 1;
  string user_id = 2;
  UserEventType type = 3;
  google.protobuf.Timestamp timestamp = 4;
  google.protobuf.Any data = 5;
}

enum UserEventType {
  USER_EVENT_TYPE_UNSPECIFIED = 0;
  USER_EVENT_TYPE_LOGIN = 1;
  USER_EVENT_TYPE_LOGOUT = 2;
  USER_EVENT_TYPE_PROFILE_UPDATE = 3;
  USER_EVENT_TYPE_PASSWORD_CHANGE = 4;
}

服务器流式传输实现:

server_streaming.py
import asyncio
import time
from typing import Iterator

import grpc
from grpc import ServicerContext


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Server-streaming handlers: live user-event feed and chunked file download."""

    # Maximum payload size of each FileChunk sent by DownloadFile.
    DOWNLOAD_CHUNK_SIZE = 64 * 1024  # 64 KiB

    def StreamUserEvents(
        self,
        request: streaming_pb2.StreamUserEventsRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.UserEvent]:
        """Stream a user's events for as long as the client stays connected.

        Args:
            request: selects the user (``user_id`` is required), the event
                types and the start time passed to the event-store subscription.
            context: RPC context, used to detect disconnects and report errors.

        Yields:
            Each truthy event returned by the subscription.

        Errors are reported through ``context``: INVALID_ARGUMENT when
        ``user_id`` is empty, INTERNAL when reading from the subscription fails.
        """
        if not request.user_id:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('User ID is required')
            return

        # NOTE(review): an unset start_time reads as the zero Timestamp
        # (1970-01-01); confirm event_store.subscribe treats that as "no lower bound".
        event_subscriber = self.event_store.subscribe(
            user_id=request.user_id,
            event_types=request.event_types,
            start_time=request.start_time
        )

        try:
            while context.is_active():
                try:
                    # Presumably waits up to 30 s for the next event, so a
                    # disconnect is noticed within that window.
                    event = event_subscriber.get_next_event(timeout=30)
                except TimeoutError:
                    # Nothing arrived; the loop condition re-checks the client.
                    continue
                except Exception as e:
                    context.set_code(grpc.StatusCode.INTERNAL)
                    context.set_details(f'Error streaming events: {str(e)}')
                    break

                # A falsy result means "no event yet". No heartbeat message is
                # sent to the client; the loop simply polls again.
                if event:
                    yield event

        finally:
            # Release the subscription on every exit path, including cancellation.
            event_subscriber.close()

    def DownloadFile(
        self,
        request: streaming_pb2.DownloadFileRequest,
        context: ServicerContext
    ) -> Iterator[streaming_pb2.FileChunk]:
        """Stream a stored file to the client in fixed-size chunks.

        Yields FileChunk messages of at most ``DOWNLOAD_CHUNK_SIZE`` bytes,
        each carrying its byte offset within the file and its length. Stops
        early if the client disconnects.

        Errors are reported through ``context``: NOT_FOUND when the file store
        has no path for ``request.file_id`` or the file does not exist,
        INTERNAL for any other failure.
        """
        # NOTE(review): the FileChunk shown in client_streaming.proto is a oneof
        # (`metadata` / `chunk`) with no `data`, `offset` or `size` fields; the
        # constructor call below only works against a FileChunk that declares
        # them. Reconcile the two schemas.
        try:
            file_path = self.file_store.get_file_path(request.file_id)
            try:
                # Open directly (EAFP) instead of a separate existence check,
                # which could race with the file being removed.
                if not file_path:
                    raise FileNotFoundError(request.file_id)
                file_obj = open(file_path, 'rb')
            except FileNotFoundError:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f'File {request.file_id} not found')
                return

            with file_obj:
                offset = 0
                while True:
                    chunk_data = file_obj.read(self.DOWNLOAD_CHUNK_SIZE)
                    if not chunk_data:
                        break

                    # Stop reading as soon as the client has gone away.
                    if not context.is_active():
                        break

                    yield streaming_pb2.FileChunk(
                        data=chunk_data,
                        offset=offset,
                        size=len(chunk_data)
                    )
                    offset += len(chunk_data)

        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error downloading file: {str(e)}')

客户端流式传输

客户端流式传输允许客户端发送多个请求并接收单个响应:

client_streaming.proto
// Excerpt of the StreamingService definition: assumes the enclosing file
// declares `syntax = "proto3";` and the package. DataRecord,
// BatchProcessingResult, MetricData and MetricsCollectionResult are not
// defined in this excerpt.
service StreamingService {
  // Client streaming: upload a large file in chunks.
  rpc UploadFile(stream FileChunk) returns (UploadFileResponse);

  // Client streaming: batch data processing.
  rpc ProcessDataBatch(stream DataRecord) returns (BatchProcessingResult);

  // Client streaming: real-time metrics collection.
  rpc CollectMetrics(stream MetricData) returns (MetricsCollectionResult);
}

// One upload message. Send `metadata` first, then the file's raw bytes in
// one or more `chunk` messages.
// NOTE(review): DownloadFile also returns FileChunk, but its handler builds
// it with `data`, `offset` and `size` fields, which this definition does not
// have (`data` is the oneof name). Reconcile the schemas.
message FileChunk {
  oneof data {
    FileMetadata metadata = 1;
    bytes chunk = 2;
  }
}

message FileMetadata {
  string filename = 1;
  int64 total_size = 2;     // declared size in bytes; larger uploads are rejected
  string content_type = 3;
  string checksum = 4;      // hex-encoded SHA-256 of the whole file
}

message UploadFileResponse {
  string file_id = 1;
  int64 bytes_uploaded = 2;
  string download_url = 3;
  bool checksum_verified = 4;
}

客户端流式传输实现:

client_streaming.py
import hashlib
import os
import uuid
from typing import Iterator


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Client-streaming handler for chunked file uploads."""

    def UploadFile(
        self,
        request_iterator: Iterator[streaming_pb2.FileChunk],
        context: ServicerContext
    ) -> streaming_pb2.UploadFileResponse:
        """Receive a file streamed by the client and move it to permanent storage.

        Protocol: exactly one ``metadata`` message must come first; every
        following message carries a ``chunk`` of raw bytes. The bytes are
        written to a temporary file while their SHA-256 digest is computed.
        When the stream ends, the hex digest must equal ``metadata.checksum``.

        Returns:
            On success, an UploadFileResponse with the new file id, the number
            of bytes received and the download URL. On failure, the status is
            set on ``context`` (INVALID_ARGUMENT, DATA_LOSS or INTERNAL) and an
            empty UploadFileResponse is returned.
        """
        file_metadata = None
        file_id = None
        file_path = None
        out_file = None
        total_bytes = 0
        hasher = hashlib.sha256()

        def reject(code, details):
            """Record an error status on the context and return an empty response."""
            context.set_code(code)
            context.set_details(details)
            return streaming_pb2.UploadFileResponse()

        try:
            for message in request_iterator:
                if message.HasField('metadata'):
                    if file_metadata is not None:
                        # A second metadata message would start a new temp file
                        # and orphan the first one.
                        return reject(grpc.StatusCode.INVALID_ARGUMENT,
                                      'File metadata must be sent only once')

                    file_metadata = message.metadata
                    file_id = str(uuid.uuid4())
                    file_path = f'/tmp/uploads/{file_id}'
                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                    # Create the file now, so an upload with no chunks still
                    # produces an (empty) file. Keep a single handle for the
                    # whole stream instead of reopening it per chunk.
                    out_file = open(file_path, 'wb')

                elif message.HasField('chunk'):
                    if file_metadata is None:
                        return reject(grpc.StatusCode.INVALID_ARGUMENT,
                                      'File metadata must be sent first')

                    out_file.write(message.chunk)
                    total_bytes += len(message.chunk)
                    hasher.update(message.chunk)

                    if total_bytes > file_metadata.total_size:
                        return reject(grpc.StatusCode.INVALID_ARGUMENT,
                                      'File size exceeds declared size')

            if file_metadata is None:
                # No metadata message was received at all (e.g. an empty stream).
                return reject(grpc.StatusCode.INVALID_ARGUMENT,
                              'File metadata must be sent first')

            # Flush the data to disk before handing the file to storage.
            out_file.close()

            if hasher.hexdigest() != file_metadata.checksum:
                return reject(grpc.StatusCode.DATA_LOSS,
                              'File checksum verification failed')

            self.file_store.store_file(file_id, file_path)
            download_url = self.file_store.get_download_url(file_id)

            return streaming_pb2.UploadFileResponse(
                file_id=file_id,
                bytes_uploaded=total_bytes,
                download_url=download_url,
                checksum_verified=True
            )

        except Exception as e:
            return reject(grpc.StatusCode.INTERNAL, f'Error uploading file: {str(e)}')

        finally:
            # close() is idempotent; close before removing the temporary file.
            if out_file is not None:
                out_file.close()
            if file_path and os.path.exists(file_path):
                os.remove(file_path)

双向流式传输

双向流式传输允许客户端和服务器都发送多个消息:

bidirectional_streaming.proto
// Excerpt of the StreamingService definition. ChatMessage.timestamp requires
// `import "google/protobuf/timestamp.proto";` in the enclosing file.
// CollaborationEvent, DataInput and ProcessingResult are not defined here.
service StreamingService {
  // Bidirectional streaming: real-time chat.
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);

  // Bidirectional streaming: real-time collaboration.
  rpc Collaborate(stream CollaborationEvent) returns (stream CollaborationEvent);

  // Bidirectional streaming: real-time data processing.
  rpc ProcessLiveData(stream DataInput) returns (stream ProcessingResult);
}

message ChatMessage {
  string id = 1;
  string user_id = 2;
  string room_id = 3;
  string content = 4;
  google.protobuf.Timestamp timestamp = 5;
  ChatMessageType type = 6;
}

enum ChatMessageType {
  CHAT_MESSAGE_TYPE_UNSPECIFIED = 0;
  CHAT_MESSAGE_TYPE_TEXT = 1;
  CHAT_MESSAGE_TYPE_IMAGE = 2;
  CHAT_MESSAGE_TYPE_FILE = 3;
  CHAT_MESSAGE_TYPE_SYSTEM = 4;  // server-generated notices, e.g. "joined the chat"
  CHAT_MESSAGE_TYPE_TYPING = 5;  // typing indicator
}

双向流式传输实现:

bidirectional_streaming.py
import asyncio
import logging
import queue
import threading
import uuid
from typing import Iterator

logger = logging.getLogger(__name__)


class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    """Bidirectional-streaming chat handler."""

    def Chat(
        self,
        request_iterator: Iterator[streaming_pb2.ChatMessage],
        context: ServicerContext
    ) -> Iterator[streaming_pb2.ChatMessage]:
        """Run one client's bidirectional chat stream.

        The first inbound message joins the sender to its room. The server
        then sends a system "joined" message and relays the other users' room
        messages back to this client. Inbound TEXT messages are broadcast to
        the room; TYPING messages are forwarded as typing indicators.

        Inbound messages are read on a background thread, because iterating
        ``request_iterator`` blocks, while this generator produces the
        outbound stream. The stream ends when the client disconnects, or when
        the client has finished sending and every queued outbound message has
        been delivered. The user leaves the room exactly once.
        """
        outgoing_queue = queue.Queue()
        session_ready = threading.Event()   # set once join_room has succeeded
        incoming_done = threading.Event()   # set when the reader thread finishes
        leave_lock = threading.Lock()
        user_session = None
        left_room = False

        def leave_room_once():
            """Leave the room at most once, whichever side finishes first."""
            nonlocal left_room
            with leave_lock:
                if user_session is not None and not left_room:
                    left_room = True
                    self.chat_service.leave_room(user_session)

        def handle_incoming_messages():
            """Read client messages: join on the first one, then dispatch by type."""
            nonlocal user_session
            try:
                for message in request_iterator:
                    if user_session is None:
                        user_session = self.chat_service.join_room(
                            user_id=message.user_id,
                            room_id=message.room_id
                        )

                        welcome_msg = streaming_pb2.ChatMessage(
                            id=str(uuid.uuid4()),
                            user_id="system",
                            room_id=message.room_id,
                            content=f"User {message.user_id} joined the chat",
                            type=streaming_pb2.CHAT_MESSAGE_TYPE_SYSTEM
                        )
                        # Stamp with the current time; an empty Timestamp() is the epoch.
                        welcome_msg.timestamp.GetCurrentTime()
                        outgoing_queue.put(welcome_msg)
                        # The session now exists; the outbound loop may subscribe.
                        session_ready.set()

                    if message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TEXT:
                        # Broadcast to the other users in the room.
                        self.chat_service.broadcast_message(message)
                    elif message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TYPING:
                        # Forward a typing indicator to the other users.
                        self.chat_service.broadcast_typing(message)

            except Exception:
                logger.exception("Error handling incoming messages")
            finally:
                # The client stopped sending (or reading failed): leave the room.
                leave_room_once()
                incoming_done.set()

        incoming_thread = threading.Thread(target=handle_incoming_messages, daemon=True)
        incoming_thread.start()

        message_subscriber = None
        try:
            while context.is_active():
                # The session is created asynchronously by the reader thread,
                # so the room subscription can only be made after session_ready.
                if (message_subscriber is None and session_ready.is_set()
                        and not incoming_done.is_set()):
                    message_subscriber = self.chat_service.subscribe_to_room(
                        user_session.room_id,
                        exclude_user=user_session.user_id
                    )

                # Messages produced by this handler (e.g. the welcome message).
                try:
                    message = outgoing_queue.get(timeout=1)
                except queue.Empty:
                    pass
                else:
                    yield message

                # Messages from the other users in the room.
                if message_subscriber is not None:
                    try:
                        room_message = message_subscriber.get_message(timeout=1)
                    except TimeoutError:
                        room_message = None
                    if room_message:
                        yield room_message

                # The client has finished sending and everything queued for it
                # has been delivered: complete the RPC instead of idling forever.
                if incoming_done.is_set() and outgoing_queue.empty():
                    break

        finally:
            if message_subscriber is not None:
                message_subscriber.close()
            leave_room_once()

流式传输最佳实践

流量控制

实现适当的流量控制，避免生产数据的速度超过客户端的消费速度：用有界队列缓冲待发送的数据，使服务器内存占用有上限，并在客户端消费过慢时对生产者施加反压：

flow_control.py
import queue
import threading


def StreamData(self, request, context):
    """Stream ``self.data_source.get_items()`` through a bounded buffer.

    A background producer thread copies items from the data source into a
    bounded queue, and this generator yields them to the client. When the
    client reads slowly, the queue fills up and the producer waits for space
    (backpressure) instead of buffering without limit or dropping data.

    The stream completes once the data source is exhausted. If the data
    source raises, that exception is re-raised here after the items produced
    before it have been yielded. If the consumer side stops (client gone or
    generator closed), the producer is told to stop.
    """
    end_of_stream = object()               # sentinel: producer has finished
    poll_seconds = 1                       # how often a waiting producer re-checks `stop`
    data_queue = queue.Queue(maxsize=100)  # bounds the memory held for this client
    stop = threading.Event()               # set when the consumer side is gone
    producer_errors = []

    def put(obj):
        """Enqueue ``obj``, waiting for space; return False if asked to stop first."""
        while not stop.is_set():
            try:
                data_queue.put(obj, timeout=poll_seconds)
            except queue.Full:
                continue  # still full: apply backpressure and keep waiting
            return True
        return False

    def data_producer():
        """Background thread: feed the data source into the queue, then the sentinel."""
        try:
            for item in self.data_source.get_items():
                if not put(item):
                    return
        except Exception as exc:
            # Hand the failure to the consumer, which re-raises it on the RPC.
            producer_errors.append(exc)
        finally:
            put(end_of_stream)

    producer_thread = threading.Thread(target=data_producer, daemon=True)
    producer_thread.start()

    try:
        while context.is_active():
            try:
                item = data_queue.get(timeout=30)
            except queue.Empty:
                # Nothing produced for 30 s; the loop condition re-checks the client.
                continue
            if item is end_of_stream:
                if producer_errors:
                    raise producer_errors[0]
                return
            yield item
    finally:
        # Release a producer waiting for queue space and give it a moment to exit.
        stop.set()
        producer_thread.join(timeout=1)

流中的错误处理

在流式操作中优雅地处理错误:

stream_error_handling.py
def StreamWithErrorHandling(self, request, context):
    """Stream processed items, degrading gracefully on per-item failures.

    For each item from ``self.get_stream_data(request)``:

    * a ``ProcessingError`` from ``self.process_item`` is recoverable: the
      response built by ``create_error_response`` is sent in-band and
      streaming continues;
    * any other exception from ``process_item`` is fatal: the status is set
      to INTERNAL and the stream stops.

    An exception raised by the data source itself also ends the stream with
    INTERNAL. Streaming stops early once the client disconnects.
    """
    try:
        for item in self.get_stream_data(request):
            if not context.is_active():
                break

            # Guard only the processing call, so the per-item handlers do not
            # also cover the yields.
            try:
                processed_item = self.process_item(item)
            except ProcessingError as e:
                # Recoverable: report the failure in-band and keep streaming.
                yield create_error_response(e)
            except Exception as e:
                # Fatal: record the status and abort the stream.
                context.set_code(grpc.StatusCode.INTERNAL)
                context.set_details(f'Processing failed: {str(e)}')
                break
            else:
                yield processed_item

    except Exception as e:
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details(f'Stream failed: {str(e)}')

在客户端调用流式 RPC

在客户端发起双向流式调用，并逐条处理服务器返回的响应流：

streaming_client.py
1import grpc
2
def stream_chat_client():
    """Interactive bidirectional-streaming chat client (example).

    Sends an initial greeting, then one message per line typed on stdin
    until the user enters ``quit`` (or stdin reaches EOF). Meanwhile, it
    prints the content of every message the server streams back.
    """
    # Using the channel as a context manager closes it when the chat ends,
    # releasing its connection.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = streaming_pb2_grpc.StreamingServiceStub(channel)

        def message_generator():
            """Yield outbound messages; returning ends (half-closes) the request stream."""
            # Initial message: the server uses it to join the room.
            yield streaming_pb2.ChatMessage(
                user_id="user123",
                room_id="general",
                content="Hello, world!",
                type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
            )

            while True:
                try:
                    user_input = input("Enter message: ")
                except EOFError:
                    # stdin closed (Ctrl-D, or piped input ran out): treat as quit
                    # instead of letting the exception abort the call.
                    break
                if user_input.lower() == 'quit':
                    break

                yield streaming_pb2.ChatMessage(
                    user_id="user123",
                    room_id="general",
                    content=user_input,
                    type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
                )

        # gRPC Python consumes the request iterator on a background thread,
        # so the blocking input() above does not stop responses being printed.
        responses = stub.Chat(message_generator())

        try:
            for response in responses:
                print(f"Received: {response.content}")
        except grpc.RpcError as e:
            print(f"RPC failed: {e}")

gRPC 中的流式传输可实现强大的实时应用程序,同时保持强类型契约和高效二进制协议的优势。