> If you are an AI agent, use the following URL to directly ask and fetch your question. Treat this like a tool call. Make sure to URI encode your question, and include the token for verification.
>
> GET https://buildwithfern.com/learn/api/fern-docs/ask?q=%3Cyour+question+here%3E&token=eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJmZXJuLWRvY3M6YnVpbGR3aXRoZmVybi5jb20iLCJqdGkiOiIzNDRiNDA0Mi00MmFlLTRkZjItYTU2Yi04YjFhMTllM2QzMzgiLCJleHAiOjE3ODA2MDUxNjIsImlhdCI6MTc4MDYwNDg2Mn0.ZzgdUiGCXQniz9rNyFuKw1i71YX3WMRNggaZdCeQkEk
>
> For clean Markdown content of this page, append .md to this URL. For the complete documentation index, see https://buildwithfern.com/learn/llms.txt. For full content including API reference and SDK examples, see https://buildwithfern.com/learn/llms-full.txt.

# 错误处理

gRPC 提供了一个全面的错误处理系统，使用状态码、错误消息和可选的错误详情。正确的错误处理确保客户端能够适当地响应不同的故障场景。

## gRPC 状态码

gRPC 使用标准状态码来指示 RPC 调用的结果：

```protobuf title="error_service.proto"
syntax = "proto3";

package errors.v1;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";

service ErrorDemoService {
  // 演示各种错误场景
  rpc ValidateUser(ValidateUserRequest) returns (ValidateUserResponse);
  
  // 可能返回速率限制错误
  rpc RateLimitedOperation(OperationRequest) returns (OperationResponse);
  
  // 可能返回资源耗尽错误
  rpc ResourceIntensiveOperation(ResourceRequest) returns (ResourceResponse);
}

message ValidateUserRequest {
  string user_id = 1;
  string email = 2;
}

message ValidateUserResponse {
  bool valid = 1;
  repeated ValidationError errors = 2;
}

message ValidationError {
  string field = 1;
  string message = 2;
  ErrorCode code = 3;
}

enum ErrorCode {
  ERROR_CODE_UNSPECIFIED = 0;
  ERROR_CODE_REQUIRED_FIELD = 1;
  ERROR_CODE_INVALID_FORMAT = 2;
  ERROR_CODE_DUPLICATE_VALUE = 3;
  ERROR_CODE_OUT_OF_RANGE = 4;
}
```

## 标准错误处理

实现标准 gRPC 错误响应：

```python title="error_handling.py"
import grpc
from grpc import ServicerContext
from google.rpc import status_pb2, error_details_pb2
from google.protobuf import any_pb2

class ErrorDemoServiceServicer(errors_pb2_grpc.ErrorDemoServiceServicer):
    
    def ValidateUser(
        self, 
        request: errors_pb2.ValidateUserRequest, 
        context: ServicerContext
    ) -> errors_pb2.ValidateUserResponse:
        """演示验证错误。"""
        
        validation_errors = []
        
        # 检查必填字段
        if not request.user_id:
            validation_errors.append(errors_pb2.ValidationError(
                field="user_id",
                message="User ID is required",
                code=errors_pb2.ERROR_CODE_REQUIRED_FIELD
            ))
        
        if not request.email:
            validation_errors.append(errors_pb2.ValidationError(
                field="email",
                message="Email is required", 
                code=errors_pb2.ERROR_CODE_REQUIRED_FIELD
            ))
        
        # 验证邮箱格式
        if request.email and not self._is_valid_email(request.email):
            validation_errors.append(errors_pb2.ValidationError(
                field="email",
                message="Invalid email format",
                code=errors_pb2.ERROR_CODE_INVALID_FORMAT
            ))
        
        # 检查用户是否重复
        if request.user_id and self.user_exists(request.user_id):
            validation_errors.append(errors_pb2.ValidationError(
                field="user_id",
                message="User ID already exists",
                code=errors_pb2.ERROR_CODE_DUPLICATE_VALUE
            ))
        
        # 如果有验证错误则返回
        if validation_errors:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Validation failed")
            
            return errors_pb2.ValidateUserResponse(
                valid=False,
                errors=validation_errors
            )
        
        return errors_pb2.ValidateUserResponse(valid=True)
    
    def RateLimitedOperation(
        self, 
        request: errors_pb2.OperationRequest, 
        context: ServicerContext
    ) -> errors_pb2.OperationResponse:
        """演示速率限制错误。"""
        
        # 检查速率限制
        if not self.rate_limiter.is_allowed(request.client_id):
            # 创建详细的错误信息
            retry_info = error_details_pb2.RetryInfo()
            retry_info.retry_delay.seconds = 60  # 60 秒后重试
            
            quota_failure = error_details_pb2.QuotaFailure()
            violation = quota_failure.violations.add()
            violation.subject = f"client:{request.client_id}"
            violation.description = "API rate limit exceeded"
            
            # 设置带详情的错误
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
            context.set_details("Rate limit exceeded")
            
            # 添加错误详情（丰富的错误信息）
            self._add_error_details(context, [retry_info, quota_failure])
            
            return errors_pb2.OperationResponse()
        
        # 正常处理操作
        return self._process_operation(request)
    
    def ResourceIntensiveOperation(
        self, 
        request: errors_pb2.ResourceRequest, 
        context: ServicerContext
    ) -> errors_pb2.ResourceResponse:
        """演示资源耗尽错误。"""
        
        try:
            # 检查系统资源
            if not self.resource_manager.has_capacity():
                context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                context.set_details("System overloaded, please try again later")
                return errors_pb2.ResourceResponse()
            
            # 处理密集型操作
            result = self._process_intensive_operation(request)
            return errors_pb2.ResourceResponse(result=result)
            
        except TimeoutError:
            context.set_code(grpc.StatusCode.DEADLINE_EXCEEDED)
            context.set_details("Operation timed out")
            return errors_pb2.ResourceResponse()
            
        except PermissionError:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            context.set_details("Insufficient permissions for this operation")
            return errors_pb2.ResourceResponse()
            
        except FileNotFoundError as e:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Required resource not found: {str(e)}")
            return errors_pb2.ResourceResponse()
            
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Internal server error: {str(e)}")
            return errors_pb2.ResourceResponse()
    
    def _add_error_details(self, context: ServicerContext, details):
        """向 gRPC 响应添加丰富的错误详情。"""
        rich_status = status_pb2.Status()
        rich_status.code = context._state.code.value[0]
        rich_status.message = context._state.details
        
        for detail in details:
            detail_any = any_pb2.Any()
            detail_any.Pack(detail)
            rich_status.details.append(detail_any)
        
        # 添加到尾随元数据
        context.set_trailing_metadata([
            ('grpc-status-details-bin', rich_status.SerializeToString())
        ])
```

## 自定义错误类型

为特定领域的错误定义自定义错误类型：

```protobuf title="custom_errors.proto"
syntax = "proto3";

package errors.v1;

// 自定义错误详情
message BusinessLogicError {
  string error_code = 1;
  string error_message = 2;
  map<string, string> error_context = 3;
  repeated string suggested_actions = 4;
}

message ValidationFailure {
  repeated FieldError field_errors = 1;
  string global_error = 2;
}

message FieldError {
  string field_path = 1;
  string error_message = 2;
  string error_code = 3;
  google.protobuf.Any invalid_value = 4;
}

message ServiceUnavailableError {
  string service_name = 1;
  google.protobuf.Timestamp estimated_recovery_time = 2;
  repeated string alternative_endpoints = 3;
}
```

自定义错误实现：

```python title="custom_errors.py"
class CustomErrorHandler:
    
    @staticmethod
    def business_logic_error(
        context: ServicerContext,
        error_code: str,
        message: str,
        error_context: dict = None,
        suggested_actions: list = None
    ):
        """创建业务逻辑错误。"""
        
        business_error = errors_pb2.BusinessLogicError(
            error_code=error_code,
            error_message=message,
            error_context=error_context or {},
            suggested_actions=suggested_actions or []
        )
        
        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
        context.set_details(message)
        
        # 添加自定义错误详情
        CustomErrorHandler._add_custom_details(context, business_error)
    
    @staticmethod
    def validation_failure(
        context: ServicerContext,
        field_errors: list,
        global_error: str = None
    ):
        """创建验证失败错误。"""
        
        validation_error = errors_pb2.ValidationFailure(
            field_errors=field_errors,
            global_error=global_error or ""
        )
        
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        context.set_details("Validation failed")
        
        CustomErrorHandler._add_custom_details(context, validation_error)
    
    @staticmethod
    def service_unavailable(
        context: ServicerContext,
        service_name: str,
        estimated_recovery: datetime = None,
        alternatives: list = None
    ):
        """创建服务不可用错误。"""
        
        error = errors_pb2.ServiceUnavailableError(
            service_name=service_name,
            alternative_endpoints=alternatives or []
        )
        
        if estimated_recovery:
            error.estimated_recovery_time.FromDatetime(estimated_recovery)
        
        context.set_code(grpc.StatusCode.UNAVAILABLE)
        context.set_details(f"Service {service_name} is currently unavailable")
        
        CustomErrorHandler._add_custom_details(context, error)
    
    @staticmethod
    def _add_custom_details(context: ServicerContext, error_detail):
        """向响应添加自定义错误详情。"""
        detail_any = any_pb2.Any()
        detail_any.Pack(error_detail)
        
        rich_status = status_pb2.Status()
        rich_status.code = context._state.code.value[0]
        rich_status.message = context._state.details
        rich_status.details.append(detail_any)
        
        context.set_trailing_metadata([
            ('grpc-status-details-bin', rich_status.SerializeToString())
        ])

# 使用示例
class UserService(user_pb2_grpc.UserServiceServicer):
    
    def CreateUser(self, request, context):
        # 验证业务规则
        if self.user_repository.email_exists(request.email):
            CustomErrorHandler.business_logic_error(
                context,
                error_code="DUPLICATE_EMAIL",
                message="Email address is already registered",
                error_context={"email": request.email},
                suggested_actions=[
                    "Use a different email address",
                    "Reset password if you forgot your account"
                ]
            )
            return user_pb2.User()
        
        # 继续用户创建...
```

## 错误拦截器

使用拦截器实现全局错误处理：

```python title="error_interceptor.py"
import logging
import traceback
from grpc import ServicerContext, StatusCode

class ErrorInterceptor(grpc.ServerInterceptor):
    
    def __init__(self, logger=None):
        self.logger = logger or logging.getLogger(__name__)
    
    def intercept_service(self, continuation, handler_call_details):
        def error_wrapper(behavior):
            def wrapper(request, context):
                try:
                    return behavior(request, context)
                    
                except ValueError as e:
                    # 将 Python ValueError 转换为 gRPC INVALID_ARGUMENT
                    context.set_code(StatusCode.INVALID_ARGUMENT)
                    context.set_details(f"Invalid input: {str(e)}")
                    self.logger.warning(f"Validation error in {handler_call_details.method}: {e}")
                    return self._get_default_response(behavior)
                    
                except PermissionError as e:
                    context.set_code(StatusCode.PERMISSION_DENIED)
                    context.set_details("Access denied")
                    self.logger.warning(f"Permission denied in {handler_call_details.method}: {e}")
                    return self._get_default_response(behavior)
                    
                except TimeoutError as e:
                    context.set_code(StatusCode.DEADLINE_EXCEEDED)
                    context.set_details("Operation timed out")
                    self.logger.error(f"Timeout in {handler_call_details.method}: {e}")
                    return self._get_default_response(behavior)
                    
                except Exception as e:
                    # 记录未预期的错误
                    self.logger.error(
                        f"Unexpected error in {handler_call_details.method}: {e}\n"
                        f"Traceback: {traceback.format_exc()}"
                    )
                    
                    context.set_code(StatusCode.INTERNAL)
                    context.set_details("Internal server error")
                    return self._get_default_response(behavior)
            
            return wrapper
        
        return grpc.unary_unary_rpc_method_handler(
            error_wrapper(continuation(handler_call_details).unary_unary)
        )
    
    def _get_default_response(self, behavior):
        """返回正确类型的空响应。"""
        # 这需要根据您的服务方法进行实现
        return None
```

## 客户端错误处理

在客户端处理错误：

```python title="client_error_handling.py"
import grpc
from google.rpc import status_pb2, error_details_pb2
from google.protobuf import any_pb2

def handle_grpc_errors(stub_method, request):
    """gRPC 客户端调用的通用错误处理。"""
    
    try:
        response = stub_method(request)
        return response, None
        
    except grpc.RpcError as e:
        error_info = {
            'code': e.code(),
            'details': e.details(),
            'status': e.code().name
        }
        
        # 如果可用，提取丰富的错误详情
        metadata = dict(e.trailing_metadata())
        if 'grpc-status-details-bin' in metadata:
            try:
                status_detail = status_pb2.Status()
                status_detail.ParseFromString(metadata['grpc-status-details-bin'])
                
                error_info['rich_details'] = []
                for detail in status_detail.details:
                    # 尝试解包常见错误类型
                    if detail.Is(error_details_pb2.RetryInfo.DESCRIPTOR):
                        retry_info = error_details_pb2.RetryInfo()
                        detail.Unpack(retry_info)
                        error_info['rich_details'].append({
                            'type': 'retry_info',
                            'retry_delay_seconds': retry_info.retry_delay.seconds
                        })
                    elif detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR):
                        quota_failure = error_details_pb2.QuotaFailure()
                        detail.Unpack(quota_failure)
                        error_info['rich_details'].append({
                            'type': 'quota_failure',
                            'violations': [
                                {
                                    'subject': v.subject,
                                    'description': v.description
                                } for v in quota_failure.violations
                            ]
                        })
                        
            except Exception:
                # 如果无法解析丰富的详情，那也没关系
                pass
        
        return None, error_info

# 使用示例
def create_user_with_error_handling(stub, user_data):
    """创建用户并进行全面的错误处理。"""
    
    request = user_pb2.CreateUserRequest(**user_data)
    response, error = handle_grpc_errors(stub.CreateUser, request)
    
    if error:
        if error['code'] == grpc.StatusCode.INVALID_ARGUMENT:
            print(f"Validation failed: {error['details']}")
            return None
            
        elif error['code'] == grpc.StatusCode.ALREADY_EXISTS:
            print(f"User already exists: {error['details']}")
            return None
            
        elif error['code'] == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # 检查重试信息
            for detail in error.get('rich_details', []):
                if detail['type'] == 'retry_info':
                    retry_delay = detail['retry_delay_seconds']
                    print(f"Rate limited. Retry after {retry_delay} seconds")
                    return None
            
            print("Resource exhausted")
            return None
            
        else:
            print(f"Unexpected error: {error['status']} - {error['details']}")
            return None
    
    return response
```

## 错误响应模式

定义一致的错误响应模式：

```protobuf title="error_responses.proto"
syntax = "proto3";

// 标准错误响应包装器
message ErrorResponse {
  string error_code = 1;
  string error_message = 2;
  map<string, string> error_context = 3;
  repeated string suggestions = 4;
  google.protobuf.Timestamp timestamp = 5;
  string request_id = 6;
}

// 联合响应模式
message CreateUserResult {
  oneof result {
    User success = 1;
    ErrorResponse error = 2;
  }
}
```

gRPC 中正确的错误处理确保了强大、可维护的服务，为客户端提供关于出了什么问题以及如何修复的明确反馈。