错误处理
错误处理
使用 gRPC 状态码和自定义错误详情实现健壮的错误处理
错误处理
使用 gRPC 状态码和自定义错误详情实现健壮的错误处理
gRPC 提供了一套完整的错误处理机制，由状态码、错误消息和可选的错误详情组成。正确的错误处理能确保客户端对不同的故障场景做出恰当的响应。
gRPC 使用标准状态码来指示 RPC 调用的结果。下面的服务定义用于演示几种常见的错误场景：
syntax = "proto3";

package errors.v1;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";

// Demo service whose RPCs exercise different gRPC error scenarios.
service ErrorDemoService {
  // Demonstrates various error scenarios.
  rpc ValidateUser(ValidateUserRequest) returns (ValidateUserResponse);

  // May return a rate-limit error.
  rpc RateLimitedOperation(OperationRequest) returns (OperationResponse);

  // May return a resource-exhausted error.
  rpc ResourceIntensiveOperation(ResourceRequest) returns (ResourceResponse);
}

// Input for ValidateUser.
message ValidateUserRequest {
  string user_id = 1;
  string email = 2;
}

// Result of ValidateUser: an overall flag plus one entry per failed check.
message ValidateUserResponse {
  bool valid = 1;
  repeated ValidationError errors = 2;
}

// A single validation problem attached to one request field.
message ValidationError {
  string field = 1;
  string message = 2;
  ErrorCode code = 3;
}

// Machine-readable reason for a ValidationError.
enum ErrorCode {
  ERROR_CODE_UNSPECIFIED = 0;
  ERROR_CODE_REQUIRED_FIELD = 1;
  ERROR_CODE_INVALID_FORMAT = 2;
  ERROR_CODE_DUPLICATE_VALUE = 3;
  ERROR_CODE_OUT_OF_RANGE = 4;
}

// The messages below are referenced by the service definition; without a
// declaration protoc rejects the file because the types are undefined.

// Input for RateLimitedOperation.
message OperationRequest {
  // NOTE(review): the Python servicer reads `client_id` to key its rate
  // limiter; the string type is an assumption - confirm.
  string client_id = 1;
}

// Output of RateLimitedOperation (no fields are used by the demo servicer).
message OperationResponse {}

// Input for ResourceIntensiveOperation (no fields are read by the demo
// servicer).
message ResourceRequest {}

// Output of ResourceIntensiveOperation.
message ResourceResponse {
  // NOTE(review): the Python servicer sets `result`; its type is not visible
  // in the documentation and is assumed to be a string - confirm.
  string result = 1;
}
实现标准 gRPC 错误响应:
import logging

import grpc
from grpc import ServicerContext
from google.rpc import status_pb2, error_details_pb2
from google.protobuf import any_pb2

# NOTE(review): errors_pb2 / errors_pb2_grpc are used below but are not
# imported in this snippet. They are presumably the protoc-generated modules
# for the errors.v1 definitions - import them according to your project layout.

_LOGGER = logging.getLogger(__name__)


class ErrorDemoServiceServicer(errors_pb2_grpc.ErrorDemoServiceServicer):
    """Servicer demonstrating how to report different gRPC error conditions.

    The methods rely on collaborators that are not defined in this snippet:
    ``self._is_valid_email``, ``self.user_exists``, ``self.rate_limiter``,
    ``self.resource_manager``, ``self._process_operation`` and
    ``self._process_intensive_operation``.
    """

    def ValidateUser(
        self,
        request: errors_pb2.ValidateUserRequest,
        context: ServicerContext
    ) -> errors_pb2.ValidateUserResponse:
        """Demonstrate validation errors.

        Collects one ``ValidationError`` per failed check. When any check
        fails, the RPC status is set to ``INVALID_ARGUMENT`` and the errors
        are returned in the response body with ``valid=False``.
        """

        def field_error(field, message, code):
            return errors_pb2.ValidationError(
                field=field, message=message, code=code
            )

        validation_errors = []

        # Required-field checks.
        if not request.user_id:
            validation_errors.append(field_error(
                "user_id",
                "User ID is required",
                errors_pb2.ERROR_CODE_REQUIRED_FIELD,
            ))

        if not request.email:
            validation_errors.append(field_error(
                "email",
                "Email is required",
                errors_pb2.ERROR_CODE_REQUIRED_FIELD,
            ))
        # The format is only checked when an email was actually supplied.
        elif not self._is_valid_email(request.email):
            validation_errors.append(field_error(
                "email",
                "Invalid email format",
                errors_pb2.ERROR_CODE_INVALID_FORMAT,
            ))

        # Duplicate check, skipped when no user_id was supplied.
        if request.user_id and self.user_exists(request.user_id):
            validation_errors.append(field_error(
                "user_id",
                "User ID already exists",
                errors_pb2.ERROR_CODE_DUPLICATE_VALUE,
            ))

        if not validation_errors:
            return errors_pb2.ValidateUserResponse(valid=True)

        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        context.set_details("Validation failed")
        return errors_pb2.ValidateUserResponse(
            valid=False,
            errors=validation_errors
        )

    def RateLimitedOperation(
        self,
        request: errors_pb2.OperationRequest,
        context: ServicerContext
    ) -> errors_pb2.OperationResponse:
        """Demonstrate rate-limit errors.

        When the rate limiter rejects ``request.client_id`` the RPC fails with
        ``RESOURCE_EXHAUSTED`` and carries ``RetryInfo`` and ``QuotaFailure``
        details in the trailing metadata; otherwise the request is processed.
        """
        if self.rate_limiter.is_allowed(request.client_id):
            return self._process_operation(request)

        # Build the structured error details.
        retry_info = error_details_pb2.RetryInfo()
        retry_info.retry_delay.seconds = 60  # retry after 60 seconds

        quota_failure = error_details_pb2.QuotaFailure()
        violation = quota_failure.violations.add()
        violation.subject = f"client:{request.client_id}"
        violation.description = "API rate limit exceeded"

        code = grpc.StatusCode.RESOURCE_EXHAUSTED
        message = "Rate limit exceeded"
        context.set_code(code)
        context.set_details(message)

        # Pass code/message explicitly so the rich status is built from the
        # same values without reading them back from the context.
        self._add_error_details(
            context, [retry_info, quota_failure], code=code, message=message
        )

        return errors_pb2.OperationResponse()

    def ResourceIntensiveOperation(
        self,
        request: errors_pb2.ResourceRequest,
        context: ServicerContext
    ) -> errors_pb2.ResourceResponse:
        """Demonstrate resource-exhausted errors and exception mapping.

        Maps ``TimeoutError``, ``PermissionError`` and ``FileNotFoundError``
        to the matching gRPC status codes. Any other exception is logged and
        reported as ``INTERNAL`` with a generic message.
        """
        try:
            # Check system capacity before doing the expensive work.
            if not self.resource_manager.has_capacity():
                context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                context.set_details("System overloaded, please try again later")
                return errors_pb2.ResourceResponse()

            result = self._process_intensive_operation(request)
            return errors_pb2.ResourceResponse(result=result)

        except TimeoutError:
            context.set_code(grpc.StatusCode.DEADLINE_EXCEEDED)
            context.set_details("Operation timed out")
            return errors_pb2.ResourceResponse()

        except PermissionError:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            context.set_details("Insufficient permissions for this operation")
            return errors_pb2.ResourceResponse()

        except FileNotFoundError as exc:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Required resource not found: {exc}")
            return errors_pb2.ResourceResponse()

        except Exception:
            # Keep the exception text in the server log only; echoing it to
            # the client could expose internal details.
            _LOGGER.exception("Unhandled error in ResourceIntensiveOperation")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("Internal server error")
            return errors_pb2.ResourceResponse()

    def _add_error_details(self, context: ServicerContext, details,
                           code=None, message=None):
        """Attach rich error details to the gRPC response.

        Packs each message in ``details`` into a ``google.rpc.Status`` and
        sends it as the ``grpc-status-details-bin`` trailing metadata entry.

        Args:
            context: The servicer context of the current RPC.
            details: Iterable of protobuf messages to pack as error details.
            code: The ``grpc.StatusCode`` set on the context. When omitted it
                is read back from the context's private state.
            message: The details string set on the context. When omitted it
                is read back from the context's private state.

        Raises:
            ValueError: If no status code was passed and none has been set on
                the context.
        """
        # Fallback for callers using the two-argument form. ``_state`` is a
        # private attribute of the grpc server context, so prefer passing
        # ``code`` and ``message`` explicitly.
        if code is None:
            code = context._state.code
        if message is None:
            message = context._state.details
        if code is None:
            raise ValueError(
                "a status code must be set before adding error details"
            )
        if isinstance(message, bytes):
            message = message.decode("utf-8")

        rich_status = status_pb2.Status(
            code=code.value[0],  # StatusCode values are (int, str) tuples
            message=message or "",
        )
        for detail in details:
            rich_status.details.add().Pack(detail)

        # Rich status travels in the trailing metadata.
        context.set_trailing_metadata([
            ('grpc-status-details-bin', rich_status.SerializeToString())
        ])
为特定领域的错误定义自定义错误类型:
syntax = "proto3";

package errors.v1;

// Required for the google.protobuf.Any and google.protobuf.Timestamp fields
// below; without these imports protoc reports the types as undefined.
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

// Custom error details.

// A violated business rule, with context and hints for the caller.
message BusinessLogicError {
  string error_code = 1;
  string error_message = 2;
  map<string, string> error_context = 3;
  repeated string suggested_actions = 4;
}

// A failed validation: per-field errors plus an optional global error.
message ValidationFailure {
  repeated FieldError field_errors = 1;
  string global_error = 2;
}

// A validation problem for a single field.
message FieldError {
  string field_path = 1;
  string error_message = 2;
  string error_code = 3;
  google.protobuf.Any invalid_value = 4;
}

// Details about an unavailable service.
message ServiceUnavailableError {
  string service_name = 1;
  google.protobuf.Timestamp estimated_recovery_time = 2;
  repeated string alternative_endpoints = 3;
}
自定义错误实现:
from datetime import datetime
from typing import Optional

# NOTE(review): grpc, ServicerContext, status_pb2, any_pb2, errors_pb2,
# user_pb2 and user_pb2_grpc are used below but not imported in this snippet;
# they are expected to be in scope already.


class CustomErrorHandler:
    """Helpers that fail an RPC with a status code plus a custom detail message.

    Each public helper sets the status code and details string on the context
    and attaches the custom detail as ``grpc-status-details-bin`` trailing
    metadata. The helpers do not return a response; the caller still has to
    return one.
    """

    @staticmethod
    def business_logic_error(
        context: ServicerContext,
        error_code: str,
        message: str,
        error_context: Optional[dict] = None,
        suggested_actions: Optional[list] = None
    ):
        """Report a business-logic error as ``FAILED_PRECONDITION``.

        Args:
            context: The servicer context of the current RPC.
            error_code: Application-level error code.
            message: Error message, also used as the RPC details string.
            error_context: Optional key/value context; defaults to empty.
            suggested_actions: Optional hints for the caller; defaults to
                empty.
        """
        business_error = errors_pb2.BusinessLogicError(
            error_code=error_code,
            error_message=message,
            error_context=error_context or {},
            suggested_actions=suggested_actions or []
        )

        code = grpc.StatusCode.FAILED_PRECONDITION
        context.set_code(code)
        context.set_details(message)

        CustomErrorHandler._add_custom_details(
            context, business_error, code=code, message=message
        )

    @staticmethod
    def validation_failure(
        context: ServicerContext,
        field_errors: list,
        global_error: Optional[str] = None
    ):
        """Report a validation failure as ``INVALID_ARGUMENT``.

        Args:
            context: The servicer context of the current RPC.
            field_errors: ``FieldError`` messages, one per invalid field.
            global_error: Optional error not tied to a field; defaults to an
                empty string.
        """
        validation_error = errors_pb2.ValidationFailure(
            field_errors=field_errors,
            global_error=global_error or ""
        )

        code = grpc.StatusCode.INVALID_ARGUMENT
        message = "Validation failed"
        context.set_code(code)
        context.set_details(message)

        CustomErrorHandler._add_custom_details(
            context, validation_error, code=code, message=message
        )

    @staticmethod
    def service_unavailable(
        context: ServicerContext,
        service_name: str,
        estimated_recovery: Optional[datetime] = None,
        alternatives: Optional[list] = None
    ):
        """Report an unavailable service as ``UNAVAILABLE``.

        Args:
            context: The servicer context of the current RPC.
            service_name: Name of the unavailable service.
            estimated_recovery: Optional expected recovery time, stored in the
                detail's timestamp field via ``FromDatetime``.
            alternatives: Optional alternative endpoints; defaults to empty.
        """
        error = errors_pb2.ServiceUnavailableError(
            service_name=service_name,
            alternative_endpoints=alternatives or []
        )

        if estimated_recovery is not None:
            error.estimated_recovery_time.FromDatetime(estimated_recovery)

        code = grpc.StatusCode.UNAVAILABLE
        message = f"Service {service_name} is currently unavailable"
        context.set_code(code)
        context.set_details(message)

        CustomErrorHandler._add_custom_details(
            context, error, code=code, message=message
        )

    @staticmethod
    def _add_custom_details(context: ServicerContext, error_detail,
                            code=None, message=None):
        """Attach a custom error detail to the response.

        Packs ``error_detail`` into a ``google.rpc.Status`` and sends it as
        the ``grpc-status-details-bin`` trailing metadata entry.

        Args:
            context: The servicer context of the current RPC.
            error_detail: Protobuf message to pack as the error detail.
            code: The ``grpc.StatusCode`` set on the context. When omitted it
                is read back from the context's private state.
            message: The details string set on the context. When omitted it
                is read back from the context's private state.

        Raises:
            ValueError: If no status code was passed and none has been set on
                the context.
        """
        # Fallback for callers using the two-argument form. ``_state`` is a
        # private attribute of the grpc server context, so prefer passing
        # ``code`` and ``message`` explicitly.
        if code is None:
            code = context._state.code
        if message is None:
            message = context._state.details
        if code is None:
            raise ValueError(
                "a status code must be set before adding error details"
            )
        if isinstance(message, bytes):
            message = message.decode("utf-8")

        rich_status = status_pb2.Status(
            code=code.value[0],  # StatusCode values are (int, str) tuples
            message=message or "",
        )
        rich_status.details.add().Pack(error_detail)

        context.set_trailing_metadata([
            ('grpc-status-details-bin', rich_status.SerializeToString())
        ])


# Usage example
class UserService(user_pb2_grpc.UserServiceServicer):
    """Example servicer showing how to use ``CustomErrorHandler``."""

    def CreateUser(self, request, context):
        """Create a user, rejecting e-mail addresses that already exist."""
        # Validate business rules.
        if self.user_repository.email_exists(request.email):
            CustomErrorHandler.business_logic_error(
                context,
                error_code="DUPLICATE_EMAIL",
                message="Email address is already registered",
                error_context={"email": request.email},
                suggested_actions=[
                    "Use a different email address",
                    "Reset password if you forgot your account"
                ]
            )
            return user_pb2.User()

        # Continue with user creation...
使用拦截器实现全局错误处理:
import logging
import traceback

import grpc
from grpc import ServicerContext, StatusCode


class ErrorInterceptor(grpc.ServerInterceptor):
    """Server interceptor that maps Python exceptions to gRPC status codes.

    Only unary-unary handlers are wrapped; every other handler is returned
    unchanged.
    """

    def __init__(self, logger=None):
        """Store ``logger``, falling back to this module's logger."""
        self.logger = logger or logging.getLogger(__name__)

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next unary-unary handler with exception handling.

        Args:
            continuation: Callable returning the next handler in the chain.
            handler_call_details: Details of the intercepted call; its
                ``method`` is used in log messages.

        Returns:
            A unary-unary handler that reuses the original (de)serializers,
            or the original handler (possibly ``None``) when it is not
            unary-unary.
        """
        handler = continuation(handler_call_details)

        # ``None`` means no handler is registered for the method, and
        # streaming handlers have no ``unary_unary`` behavior to wrap.
        if handler is None or handler.unary_unary is None:
            return handler

        behavior = handler.unary_unary
        method = handler_call_details.method

        def wrapper(request, context):
            # NOTE(review): per the gRPC documentation, context.abort()
            # signals termination by raising an exception, so a handler that
            # calls it would end up in the generic branch below - confirm
            # whether that matters for your services.
            try:
                return behavior(request, context)

            except ValueError as exc:
                # Convert Python ValueError to gRPC INVALID_ARGUMENT.
                self.logger.warning(f"Validation error in {method}: {exc}")
                return self._reject(
                    context,
                    StatusCode.INVALID_ARGUMENT,
                    f"Invalid input: {exc}",
                    behavior,
                )

            except PermissionError as exc:
                self.logger.warning(f"Permission denied in {method}: {exc}")
                return self._reject(
                    context,
                    StatusCode.PERMISSION_DENIED,
                    "Access denied",
                    behavior,
                )

            except TimeoutError as exc:
                self.logger.error(f"Timeout in {method}: {exc}")
                return self._reject(
                    context,
                    StatusCode.DEADLINE_EXCEEDED,
                    "Operation timed out",
                    behavior,
                )

            except Exception as exc:
                # Log unexpected errors with the traceback; the client only
                # receives a generic message.
                self.logger.error(
                    f"Unexpected error in {method}: {exc}\n"
                    f"Traceback: {traceback.format_exc()}"
                )
                return self._reject(
                    context,
                    StatusCode.INTERNAL,
                    "Internal server error",
                    behavior,
                )

        # Reuse the original (de)serializers; a handler built without them
        # would pass raw bytes to the behavior and could not serialize its
        # response message.
        return grpc.unary_unary_rpc_method_handler(
            wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

    def _reject(self, context, code, details, behavior):
        """Set ``code`` and ``details`` on ``context``; return the default response."""
        context.set_code(code)
        context.set_details(details)
        return self._get_default_response(behavior)

    def _get_default_response(self, behavior):
        """Return an empty response of the correct type.

        This is a placeholder that returns ``None``; it needs to be
        implemented according to your service methods.
        """
        return None
在客户端处理错误:
import logging

import grpc
from google.rpc import status_pb2, error_details_pb2
from google.protobuf import any_pb2

_LOGGER = logging.getLogger(__name__)

_STATUS_DETAILS_KEY = 'grpc-status-details-bin'


def _parse_rich_details(raw_status):
    """Decode a serialized ``google.rpc.Status`` into a list of plain dicts.

    Only ``RetryInfo`` and ``QuotaFailure`` details are extracted; other
    detail types are skipped.
    """
    status_detail = status_pb2.Status()
    status_detail.ParseFromString(raw_status)

    rich_details = []
    for detail in status_detail.details:
        # Try to unpack the common error types.
        if detail.Is(error_details_pb2.RetryInfo.DESCRIPTOR):
            retry_info = error_details_pb2.RetryInfo()
            detail.Unpack(retry_info)
            rich_details.append({
                'type': 'retry_info',
                'retry_delay_seconds': retry_info.retry_delay.seconds
            })
        elif detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR):
            quota_failure = error_details_pb2.QuotaFailure()
            detail.Unpack(quota_failure)
            rich_details.append({
                'type': 'quota_failure',
                'violations': [
                    {
                        'subject': v.subject,
                        'description': v.description
                    } for v in quota_failure.violations
                ]
            })
    return rich_details


def handle_grpc_errors(stub_method, request):
    """Call ``stub_method(request)`` and convert a gRPC failure into a dict.

    Returns:
        ``(response, None)`` on success, or ``(None, error_info)`` when a
        ``grpc.RpcError`` is raised. ``error_info`` holds ``'code'``,
        ``'details'`` and ``'status'`` (the code's name), plus
        ``'rich_details'`` when the trailing metadata carries a status that
        could be parsed.
    """
    try:
        return stub_method(request), None

    except grpc.RpcError as rpc_error:
        code = rpc_error.code()
        error_info = {
            'code': code,
            'details': rpc_error.details(),
            'status': code.name
        }

        # Guard against missing trailing metadata: dict(None) would raise.
        metadata = dict(rpc_error.trailing_metadata() or ())
        raw_status = metadata.get(_STATUS_DETAILS_KEY)
        if raw_status is not None:
            try:
                error_info['rich_details'] = _parse_rich_details(raw_status)
            except Exception:
                # Rich details are best-effort: the basic error info is still
                # returned, but the parse failure is logged rather than
                # silently dropped.
                _LOGGER.warning(
                    "Could not parse %s trailing metadata",
                    _STATUS_DETAILS_KEY,
                    exc_info=True,
                )

        return None, error_info


# Usage example
def create_user_with_error_handling(stub, user_data):
    """Create a user, printing a message for each handled failure.

    Returns the response on success and ``None`` on any gRPC error.
    """
    request = user_pb2.CreateUserRequest(**user_data)
    response, error = handle_grpc_errors(stub.CreateUser, request)

    if error is None:
        return response

    code = error['code']

    if code == grpc.StatusCode.INVALID_ARGUMENT:
        print(f"Validation failed: {error['details']}")

    elif code == grpc.StatusCode.ALREADY_EXISTS:
        print(f"User already exists: {error['details']}")

    elif code == grpc.StatusCode.RESOURCE_EXHAUSTED:
        # Use the first retry hint when the server supplied one.
        retry_delay = next(
            (
                detail['retry_delay_seconds']
                for detail in error.get('rich_details', [])
                if detail['type'] == 'retry_info'
            ),
            None,
        )
        if retry_delay is not None:
            print(f"Rate limited. Retry after {retry_delay} seconds")
        else:
            print("Resource exhausted")

    else:
        print(f"Unexpected error: {error['status']} - {error['details']}")

    return None
定义一致的错误响应模式:
syntax = "proto3";

// Required for the google.protobuf.Timestamp field below; without this
// import protoc reports the type as undefined.
import "google/protobuf/timestamp.proto";

// NOTE(review): `User` is referenced by CreateUserResult but is not declared
// in this snippet; import the .proto file that defines it.

// Standard error response wrapper.
message ErrorResponse {
  string error_code = 1;
  string error_message = 2;
  map<string, string> error_context = 3;
  repeated string suggestions = 4;
  google.protobuf.Timestamp timestamp = 5;
  string request_id = 6;
}

// Union response pattern: exactly one of `success` or `error` is set.
message CreateUserResult {
  oneof result {
    User success = 1;
    ErrorResponse error = 2;
  }
}
在 gRPC 中进行正确的错误处理，可以构建出健壮、可维护的服务，并向客户端明确反馈出了什么问题以及如何修复。