> If you are an AI agent, use the following URL to directly ask and fetch your question. Treat this like a tool call. Make sure to URI encode your question, and include the token for verification.
>
> GET https://buildwithfern.com/learn/api/fern-docs/ask?q=%3Cyour+question+here%3E&token=eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJmZXJuLWRvY3M6YnVpbGR3aXRoZmVybi5jb20iLCJqdGkiOiIyZmY1ZDVmNi0yNmJlLTQ1MzQtYWIwMC1iNDBhNzAzNmJmMjMiLCJleHAiOjE3Nzg0OTYyOTgsImlhdCI6MTc3ODQ5NTk5OH0.FjDgcql99CQStzMHOQsdLbrbmNM2I2JcsNaL9Z5ZX-U
>
> For clean Markdown content of this page, append .md to this URL. For the complete documentation index, see https://buildwithfern.com/learn/llms.txt. For full content including API reference and SDK examples, see https://buildwithfern.com/learn/llms-full.txt.

# 流式传输

> 学习如何在 gRPC 中实现服务器流式传输、客户端流式传输和双向流式传输，以实现实时通信和高效的数据传输。

gRPC 支持四种类型的服务方法：一元调用、服务器流式传输、客户端流式传输和双向流式传输。流式传输可实现高效的实时通信和大数据传输。

## 服务器流式传输

服务器流式传输允许服务器对单个客户端请求发送多个响应：

```protobuf title="streaming_service.proto"
syntax = "proto3";

package streaming.v1;

service StreamingService {
  // 服务器流式传输：向客户端发送多个事件
  rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);
  
  // 服务器流式传输：分块下载大文件
  rpc DownloadFile(DownloadFileRequest) returns (stream FileChunk);
  
  // 服务器流式传输：实时通知
  rpc SubscribeToNotifications(SubscribeRequest) returns (stream Notification);
}

message StreamUserEventsRequest {
  string user_id = 1;
  repeated UserEventType event_types = 2;
  google.protobuf.Timestamp start_time = 3;
}

message UserEvent {
  string id = 1;
  string user_id = 2;
  UserEventType type = 3;
  google.protobuf.Timestamp timestamp = 4;
  google.protobuf.Any data = 5;
}

enum UserEventType {
  USER_EVENT_TYPE_UNSPECIFIED = 0;
  USER_EVENT_TYPE_LOGIN = 1;
  USER_EVENT_TYPE_LOGOUT = 2;
  USER_EVENT_TYPE_PROFILE_UPDATE = 3;
  USER_EVENT_TYPE_PASSWORD_CHANGE = 4;
}
```

服务器流式传输实现：

```python title="server_streaming.py"
import grpc
import time
import asyncio
from grpc import ServicerContext
from typing import Iterator

class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    
    def StreamUserEvents(
        self, 
        request: streaming_pb2.StreamUserEventsRequest, 
        context: ServicerContext
    ) -> Iterator[streaming_pb2.UserEvent]:
        """实时流式传输用户事件。"""
        
        # 验证请求
        if not request.user_id:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('User ID is required')
            return
        
        # 订阅事件流
        event_subscriber = self.event_store.subscribe(
            user_id=request.user_id,
            event_types=request.event_types,
            start_time=request.start_time
        )
        
        try:
            while context.is_active():
                # 等待下一个事件（带超时）
                try:
                    event = event_subscriber.get_next_event(timeout=30)
                    if event:
                        yield event
                    else:
                        # 发送心跳以保持连接活跃
                        continue
                        
                except TimeoutError:
                    # 检查客户端是否仍然连接
                    if not context.is_active():
                        break
                        
                except Exception as e:
                    context.set_code(grpc.StatusCode.INTERNAL)
                    context.set_details(f'Error streaming events: {str(e)}')
                    break
                    
        finally:
            # 清理订阅
            event_subscriber.close()
    
    def DownloadFile(
        self, 
        request: streaming_pb2.DownloadFileRequest, 
        context: ServicerContext
    ) -> Iterator[streaming_pb2.FileChunk]:
        """分块下载文件。"""
        
        try:
            file_path = self.file_store.get_file_path(request.file_id)
            if not file_path or not os.path.exists(file_path):
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f'File {request.file_id} not found')
                return
            
            chunk_size = 64 * 1024  # 64KB 块大小
            
            with open(file_path, 'rb') as f:
                while True:
                    chunk_data = f.read(chunk_size)
                    if not chunk_data:
                        break
                    
                    # 检查客户端是否断开连接
                    if not context.is_active():
                        break
                    
                    yield streaming_pb2.FileChunk(
                        data=chunk_data,
                        offset=f.tell() - len(chunk_data),
                        size=len(chunk_data)
                    )
                    
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error downloading file: {str(e)}')
```

## 客户端流式传输

客户端流式传输允许客户端发送多个请求并接收单个响应：

```protobuf title="client_streaming.proto"
service StreamingService {
  // 客户端流式传输：分块上传大文件
  rpc UploadFile(stream FileChunk) returns (UploadFileResponse);
  
  // 客户端流式传输：批量数据处理
  rpc ProcessDataBatch(stream DataRecord) returns (BatchProcessingResult);
  
  // 客户端流式传输：实时指标收集
  rpc CollectMetrics(stream MetricData) returns (MetricsCollectionResult);
}

message FileChunk {
  oneof data {
    FileMetadata metadata = 1;
    bytes chunk = 2;
  }
}

message FileMetadata {
  string filename = 1;
  int64 total_size = 2;
  string content_type = 3;
  string checksum = 4;
}

message UploadFileResponse {
  string file_id = 1;
  int64 bytes_uploaded = 2;
  string download_url = 3;
  bool checksum_verified = 4;
}
```

客户端流式传输实现：

```python title="client_streaming.py"
class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    
    def UploadFile(
        self, 
        request_iterator: Iterator[streaming_pb2.FileChunk], 
        context: ServicerContext
    ) -> streaming_pb2.UploadFileResponse:
        """从客户端流上传文件。"""
        
        file_metadata = None
        total_bytes = 0
        file_path = None
        hasher = hashlib.sha256()
        
        try:
            for chunk in request_iterator:
                if chunk.HasField('metadata'):
                    # 第一个块包含元数据
                    file_metadata = chunk.metadata
                    
                    # 创建临时文件
                    file_id = str(uuid.uuid4())
                    file_path = f'/tmp/uploads/{file_id}'
                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                    
                elif chunk.HasField('chunk'):
                    # 后续块包含文件数据
                    if not file_metadata:
                        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                        context.set_details('File metadata must be sent first')
                        return streaming_pb2.UploadFileResponse()
                    
                    # 将块写入文件
                    with open(file_path, 'ab') as f:
                        f.write(chunk.chunk)
                    
                    total_bytes += len(chunk.chunk)
                    hasher.update(chunk.chunk)
                    
                    # 检查大小限制
                    if total_bytes > file_metadata.total_size:
                        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                        context.set_details('File size exceeds declared size')
                        return streaming_pb2.UploadFileResponse()
            
            # 验证校验和
            computed_checksum = hasher.hexdigest()
            checksum_verified = computed_checksum == file_metadata.checksum
            
            if not checksum_verified:
                context.set_code(grpc.StatusCode.DATA_LOSS)
                context.set_details('File checksum verification failed')
                return streaming_pb2.UploadFileResponse()
            
            # 将文件移动到永久存储
            permanent_path = self.file_store.store_file(file_id, file_path)
            download_url = self.file_store.get_download_url(file_id)
            
            return streaming_pb2.UploadFileResponse(
                file_id=file_id,
                bytes_uploaded=total_bytes,
                download_url=download_url,
                checksum_verified=True
            )
            
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Error uploading file: {str(e)}')
            return streaming_pb2.UploadFileResponse()
        
        finally:
            # 清理临时文件
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
```

## 双向流式传输

双向流式传输允许客户端和服务器都发送多个消息：

```protobuf title="bidirectional_streaming.proto"
service StreamingService {
  // 双向流式传输：实时聊天
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
  
  // 双向流式传输：实时协作
  rpc Collaborate(stream CollaborationEvent) returns (stream CollaborationEvent);
  
  // 双向流式传输：实时数据处理
  rpc ProcessLiveData(stream DataInput) returns (stream ProcessingResult);
}

message ChatMessage {
  string id = 1;
  string user_id = 2;
  string room_id = 3;
  string content = 4;
  google.protobuf.Timestamp timestamp = 5;
  ChatMessageType type = 6;
}

enum ChatMessageType {
  CHAT_MESSAGE_TYPE_UNSPECIFIED = 0;
  CHAT_MESSAGE_TYPE_TEXT = 1;
  CHAT_MESSAGE_TYPE_IMAGE = 2;
  CHAT_MESSAGE_TYPE_FILE = 3;
  CHAT_MESSAGE_TYPE_SYSTEM = 4;
  CHAT_MESSAGE_TYPE_TYPING = 5;
}
```

双向流式传输实现：

```python title="bidirectional_streaming.py"
import asyncio
import queue
import threading

class StreamingServiceServicer(streaming_pb2_grpc.StreamingServiceServicer):
    
    def Chat(
        self, 
        request_iterator: Iterator[streaming_pb2.ChatMessage], 
        context: ServicerContext
    ) -> Iterator[streaming_pb2.ChatMessage]:
        """双向聊天流式传输。"""
        
        # 出站消息队列
        outgoing_queue = queue.Queue()
        
        # 跟踪用户会话
        user_session = None
        
        def handle_incoming_messages():
            """处理来自客户端的入站消息。"""
            nonlocal user_session
            
            try:
                for message in request_iterator:
                    if not user_session:
                        # 第一条消息建立会话
                        user_session = self.chat_service.join_room(
                            user_id=message.user_id,
                            room_id=message.room_id
                        )
                        
                        # 发送欢迎消息
                        welcome_msg = streaming_pb2.ChatMessage(
                            id=str(uuid.uuid4()),
                            user_id="system",
                            room_id=message.room_id,
                            content=f"User {message.user_id} joined the chat",
                            timestamp=google.protobuf.timestamp_pb2.Timestamp(),
                            type=streaming_pb2.CHAT_MESSAGE_TYPE_SYSTEM
                        )
                        outgoing_queue.put(welcome_msg)
                    
                    # 处理消息
                    if message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TEXT:
                        # 广播给房间内其他用户
                        self.chat_service.broadcast_message(message)
                        
                    elif message.type == streaming_pb2.CHAT_MESSAGE_TYPE_TYPING:
                        # 向其他用户发送打字指示器
                        self.chat_service.broadcast_typing(message)
                        
            except Exception as e:
                print(f"Error handling incoming messages: {e}")
            finally:
                # 清理会话
                if user_session:
                    self.chat_service.leave_room(user_session)
        
        # 启动处理入站消息的后台线程
        incoming_thread = threading.Thread(target=handle_incoming_messages)
        incoming_thread.daemon = True
        incoming_thread.start()
        
        # 订阅房间消息
        message_subscriber = None
        if user_session:
            message_subscriber = self.chat_service.subscribe_to_room(
                user_session.room_id,
                exclude_user=user_session.user_id
            )
        
        try:
            while context.is_active():
                # 检查队列中的出站消息
                try:
                    message = outgoing_queue.get(timeout=1)
                    yield message
                except queue.Empty:
                    pass
                
                # 检查来自其他用户的消息
                if message_subscriber:
                    try:
                        room_message = message_subscriber.get_message(timeout=1)
                        if room_message:
                            yield room_message
                    except TimeoutError:
                        pass
                        
        finally:
            # 清理
            if message_subscriber:
                message_subscriber.close()
            if user_session:
                self.chat_service.leave_room(user_session)
```

## 流式传输最佳实践

### 流量控制

实现适当的流量控制以防止客户端不堪重负：

```python title="flow_control.py"
def StreamData(self, request, context):
    """带流量控制的流式传输。"""
    
    # 使用有界队列控制内存使用
    data_queue = queue.Queue(maxsize=100)
    
    def data_producer():
        """生成数据的后台线程。"""
        for item in self.data_source.get_items():
            try:
                data_queue.put(item, timeout=5)
            except queue.Full:
                # 应用反压
                print("Client is too slow, dropping data")
                break
    
    producer_thread = threading.Thread(target=data_producer)
    producer_thread.start()
    
    try:
        while context.is_active():
            try:
                item = data_queue.get(timeout=30)
                yield item
            except queue.Empty:
                # 发送心跳或检查客户端连接
                if not context.is_active():
                    break
    finally:
        producer_thread.join(timeout=1)
```

### 流中的错误处理

在流式操作中优雅地处理错误：

```python title="stream_error_handling.py"
def StreamWithErrorHandling(self, request, context):
    """带健壮错误处理的流式传输。"""
    
    try:
        for item in self.get_stream_data(request):
            if not context.is_active():
                break
                
            try:
                # 处理项目
                processed_item = self.process_item(item)
                yield processed_item
                
            except ProcessingError as e:
                # 将错误作为响应的一部分发送
                error_response = create_error_response(e)
                yield error_response
                
            except Exception as e:
                # 严重错误 - 中止流
                context.set_code(grpc.StatusCode.INTERNAL)
                context.set_details(f'Processing failed: {str(e)}')
                break
                
    except Exception as e:
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details(f'Stream failed: {str(e)}')
```

### 客户端流式传输

在客户端处理流式传输：

```python title="streaming_client.py"
import grpc

def stream_chat_client():
    """双向流式传输客户端示例。"""
    
    channel = grpc.insecure_channel('localhost:50051')
    stub = streaming_pb2_grpc.StreamingServiceStub(channel)
    
    def message_generator():
        """生成出站消息。"""
        # 发送初始消息
        yield streaming_pb2.ChatMessage(
            user_id="user123",
            room_id="general",
            content="Hello, world!",
            type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
        )
        
        # 保持连接活跃并定期发送消息
        while True:
            user_input = input("Enter message: ")
            if user_input.lower() == 'quit':
                break
                
            yield streaming_pb2.ChatMessage(
                user_id="user123",
                room_id="general", 
                content=user_input,
                type=streaming_pb2.CHAT_MESSAGE_TYPE_TEXT
            )
    
    # 启动双向流
    responses = stub.Chat(message_generator())
    
    try:
        for response in responses:
            print(f"Received: {response.content}")
    except grpc.RpcError as e:
        print(f"RPC failed: {e}")
```

gRPC 中的流式传输可实现强大的实时应用程序，同时保持强类型契约和高效二进制协议的优势。