身份验证
身份验证
为 gRPC 服务配置身份验证,包括 TLS、JWT 和自定义认证
身份验证
为 gRPC 服务配置身份验证,包括 TLS、JWT 和自定义认证
gRPC 支持各种身份验证机制来保护您的服务。身份验证可以在传输层(TLS)和应用层(凭证、令牌等)进行配置。
gRPC 强烈建议在生产环境服务中使用 TLS 以确保加密通信。首先定义身份验证服务的 proto 接口:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Empty, which Logout returns.
import "google/protobuf/empty.proto";

// Authentication service
service AuthService {
  // Authenticate user and return JWT token
  rpc Login(LoginRequest) returns (LoginResponse);

  // Validate and refresh JWT token
  rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse);

  // Logout and invalidate token
  rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
}

// NOTE(review): RefreshTokenRequest, RefreshTokenResponse, LogoutRequest and
// User are referenced here but not declared in this snippet; they must be
// defined elsewhere in package auth.v1 for this file to compile.

// Credentials submitted to Login.
message LoginRequest {
  string email = 1;
  string password = 2;
}

// Tokens and user record returned by a successful Login.
message LoginResponse {
  string access_token = 1;
  string refresh_token = 2;
  // Access-token lifetime; presumably in seconds -- confirm against the server.
  int64 expires_in = 3;
  User user = 4;
}
在您的服务器中配置 TLS:
from concurrent import futures

import grpc
from grpc import ssl_channel_credentials
import auth_service_pb2_grpc


def create_secure_server():
    """Create (but do not start) a gRPC server exposing AuthService over TLS.

    The server uses a 10-worker thread pool and listens on port 443 on all
    interfaces. The caller is responsible for calling ``start()`` on the
    returned server.

    NOTE(review): ``private_key``, ``certificate_chain`` and
    ``AuthServiceServicer`` are not defined in this snippet and must be
    provided by the surrounding module. ``grpc.ssl_server_credentials``
    expects the key and the chain as PEM-encoded bytes.

    Returns:
        The configured, not-yet-started ``grpc.Server``.
    """
    # Load TLS credentials
    server_credentials = grpc.ssl_server_credentials([
        (private_key, certificate_chain),
    ])

    # ``futures`` is concurrent.futures; the original snippet used it without
    # importing it, which raised NameError on the first call.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
        AuthServiceServicer(), server
    )

    # Listen on secure port
    server.add_secure_port('[::]:443', server_credentials)
    return server
使用 JWT 令牌进行无状态身份验证:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp used by issued_at / expires_at.
import "google/protobuf/timestamp.proto";

// JWT claims for user authentication
message JWTClaims {
  string user_id = 1;
  string email = 2;
  repeated string roles = 3;
  google.protobuf.Timestamp issued_at = 4;
  google.protobuf.Timestamp expires_at = 5;
}
在您的服务中实现 JWT 身份验证:
from datetime import datetime, timedelta, timezone

import grpc
import jwt
from grpc import ServicerContext

import auth_service_pb2
import auth_service_pb2_grpc


class AuthServiceServicer(auth_service_pb2_grpc.AuthServiceServicer):
    """AuthService implementation that issues HS256-signed JWT access tokens.

    NOTE(review): ``validate_credentials``, ``generate_refresh_token`` and the
    module-level ``JWT_SECRET`` are used below but not defined in this
    snippet; they must be provided by the surrounding module.
    """

    # Single source of truth for the access-token lifetime, so the ``exp``
    # claim and the ``expires_in`` response field cannot drift apart.
    ACCESS_TOKEN_TTL = timedelta(hours=1)

    def Login(self, request, context):
        """Validate the submitted credentials and return a token pair.

        Args:
            request: LoginRequest carrying ``email`` and ``password``.
            context: The gRPC servicer context for this call.

        Returns:
            A populated LoginResponse on success. On failure the status is
            set to UNAUTHENTICATED and an empty LoginResponse is returned.
        """
        # Validate credentials
        user = self.validate_credentials(request.email, request.password)
        if not user:
            context.set_code(grpc.StatusCode.UNAUTHENTICATED)
            context.set_details('Invalid credentials')
            return auth_service_pb2.LoginResponse()

        # Generate JWT token
        payload = {
            'user_id': user.id,
            'email': user.email,
            # Copy into a plain list: if ``user`` is a protobuf message (it is
            # passed as the ``user`` field below), ``roles`` is a repeated
            # container that the JSON encoder cannot serialize.
            'roles': list(user.roles),
            # Timezone-aware UTC instead of the naive, deprecated utcnow().
            'exp': datetime.now(timezone.utc) + self.ACCESS_TOKEN_TTL,
        }

        access_token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')

        return auth_service_pb2.LoginResponse(
            access_token=access_token,
            refresh_token=self.generate_refresh_token(user.id),
            expires_in=int(self.ACCESS_TOKEN_TTL.total_seconds()),
            user=user,
        )
使用 gRPC 拦截器来处理所有方法的身份验证:
import grpc
import jwt
from grpc import ServicerContext


class AuthInterceptor(grpc.ServerInterceptor):
    """Server interceptor that requires a valid HS256 JWT on every call.

    Calls whose fully-qualified method name is listed in ``exempt_methods``
    pass through untouched. For every other call the ``authorization``
    metadata entry must be ``Bearer <token>``; the decoded ``user_id`` and
    ``email`` claims are appended to the call details as ``user_id`` and
    ``user_email`` metadata for interceptors further down the chain.

    NOTE(review): the rewritten metadata travels with ``handler_call_details``
    to later interceptors; it is presumably not what a servicer reads from
    ``context.invocation_metadata()`` -- confirm before relying on it there.
    """

    # Keys this interceptor injects. Client-supplied entries with the same
    # keys are dropped on authenticated calls so they cannot be spoofed.
    _IDENTITY_KEYS = ('user_id', 'user_email')

    def __init__(self, jwt_secret, exempt_methods=None):
        """Store the verification secret and the list of unauthenticated methods.

        Args:
            jwt_secret: Key used to verify HS256 token signatures.
            exempt_methods: Method names that skip authentication.
        """
        self.jwt_secret = jwt_secret
        self.exempt_methods = exempt_methods or []

    def intercept_service(self, continuation, handler_call_details):
        """Authenticate the call, or return a handler that rejects it.

        Args:
            continuation: Invokes the next interceptor or the real handler.
            handler_call_details: Method name and invocation metadata.

        Returns:
            The continuation's handler when the call is exempt or carries a
            valid token; otherwise a handler that aborts with UNAUTHENTICATED.
        """
        method_name = handler_call_details.method

        # Skip authentication for exempt methods
        if method_name in self.exempt_methods:
            return continuation(handler_call_details)

        # Extract metadata
        metadata = dict(handler_call_details.invocation_metadata)
        authorization = metadata.get('authorization', '')

        if not authorization.startswith('Bearer '):
            return self._unauthenticated_response()

        token = authorization[len('Bearer '):]

        try:
            # Validate JWT token
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            identity = (
                ('user_id', payload['user_id']),
                ('user_email', payload['email']),
            )
        except jwt.ExpiredSignatureError:
            return self._expired_token_response()
        except (jwt.InvalidTokenError, KeyError):
            # KeyError: the signature is valid but a required claim is
            # missing; treat that as an invalid token, not a server error.
            return self._invalid_token_response()

        # Add user info to context, replacing any client-sent identity keys.
        forwarded = tuple(
            item for item in handler_call_details.invocation_metadata
            if item[0] not in self._IDENTITY_KEYS
        )
        handler_call_details = handler_call_details._replace(
            invocation_metadata=forwarded + identity
        )
        return continuation(handler_call_details)

    @staticmethod
    def _abort_handler(details):
        """Build a handler that terminates the RPC with UNAUTHENTICATED."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthenticated_response(self):
        """Reject a call with no usable ``Bearer`` authorization header."""
        return self._abort_handler('Missing or invalid authorization header')

    def _expired_token_response(self):
        """Reject a call whose token is past its ``exp`` claim."""
        return self._abort_handler('Token has expired')

    def _invalid_token_response(self):
        """Reject a call whose token fails verification or lacks claims."""
        return self._abort_handler('Invalid token')
实现基于 API 密钥的身份验证:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp used by ApiKeyResponse.expires_at.
import "google/protobuf/timestamp.proto";

// An API key presented for validation, with the service it is used against.
message ApiKeyRequest {
  string api_key = 1;
  string service_name = 2;
}

// Result of validating an API key.
message ApiKeyResponse {
  bool valid = 1;
  string client_id = 2;
  repeated string permissions = 3;
  google.protobuf.Timestamp expires_at = 4;
}
服务器实现:
class ApiKeyAuthInterceptor(grpc.ServerInterceptor):
    """Server interceptor that authenticates calls by the ``x-api-key`` header.

    The key is looked up in ``api_key_store``; on success the key's
    ``client_id`` and comma-joined ``permissions`` are appended to the call
    details as metadata for interceptors further down the chain.
    """

    # Keys this interceptor injects. Client-supplied entries with the same
    # keys are dropped so they cannot be spoofed.
    _INJECTED_KEYS = ('client_id', 'permissions')

    def __init__(self, api_key_store):
        """Store the key registry.

        Args:
            api_key_store: Object whose ``get(api_key)`` returns key info
                (with ``is_expired()``, ``client_id`` and ``permissions``)
                or a falsy value for unknown keys.
        """
        self.api_key_store = api_key_store

    def intercept_service(self, continuation, handler_call_details):
        """Validate the API key, or return a handler that rejects the call.

        Args:
            continuation: Invokes the next interceptor or the real handler.
            handler_call_details: Method name and invocation metadata.

        Returns:
            The continuation's handler for a valid key; otherwise a handler
            that aborts with UNAUTHENTICATED.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        api_key = metadata.get('x-api-key', '')

        if not api_key:
            return self._unauthorized_response('API key required')

        # Validate API key
        key_info = self.api_key_store.get(api_key)
        if not key_info or key_info.is_expired():
            return self._unauthorized_response('Invalid or expired API key')

        # Add client info to context, replacing any client-sent copies.
        forwarded = tuple(
            item for item in handler_call_details.invocation_metadata
            if item[0] not in self._INJECTED_KEYS
        )
        handler_call_details = handler_call_details._replace(
            invocation_metadata=forwarded + (
                ('client_id', key_info.client_id),
                ('permissions', ','.join(key_info.permissions)),
            )
        )

        return continuation(handler_call_details)

    def _unauthorized_response(self, details):
        """Build a handler that aborts the RPC with UNAUTHENTICATED.

        Args:
            details: Human-readable reason sent back to the client.
        """
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)
与 OAuth2 提供商集成:
import grpc
import requests
from google.auth import exceptions as google_auth_exceptions
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests


class OAuth2Interceptor(grpc.ServerInterceptor):
    """Server interceptor that authenticates calls with a Google ID token.

    The ``authorization`` metadata entry must be ``Bearer <id token>``. The
    token is verified against ``google_client_id`` as audience, and the
    ``sub``, ``email`` and ``name`` claims are appended to the call details
    as ``user_id``, ``user_email`` and ``user_name`` metadata.
    """

    def __init__(self, google_client_id):
        """Store the expected audience and create the verification transport.

        Args:
            google_client_id: OAuth2 client ID the token must be issued for.
        """
        self.google_client_id = google_client_id
        # One transport for the interceptor's lifetime instead of a new
        # Request object on every RPC.
        self._transport_request = google_requests.Request()

    def intercept_service(self, continuation, handler_call_details):
        """Verify the ID token, or return a handler that rejects the call.

        Args:
            continuation: Invokes the next interceptor or the real handler.
            handler_call_details: Method name and invocation metadata.

        Returns:
            The continuation's handler for a valid token; otherwise a handler
            that aborts with UNAUTHENTICATED.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        auth_header = metadata.get('authorization', '')

        if not auth_header.startswith('Bearer '):
            return self._unauthorized_response()

        token = auth_header[len('Bearer '):]

        try:
            # Verify Google ID token
            idinfo = id_token.verify_oauth2_token(
                token, self._transport_request, self.google_client_id
            )
            identity = (
                ('user_id', idinfo['sub']),
                # ``email`` is only present when the email scope was granted.
                ('user_email', idinfo.get('email', '')),
                ('user_name', idinfo.get('name', '')),
            )
        except (ValueError, KeyError, google_auth_exceptions.GoogleAuthError):
            # ValueError: verification failed. GoogleAuthError: bad issuer.
            # KeyError: the token carries no ``sub`` claim.
            return self._invalid_token_response()

        # Add user info to context
        handler_call_details = handler_call_details._replace(
            invocation_metadata=(
                handler_call_details.invocation_metadata + identity
            )
        )
        return continuation(handler_call_details)

    @staticmethod
    def _abort_handler(details):
        """Build a handler that terminates the RPC with UNAUTHENTICATED."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthorized_response(self):
        """Reject a call with no usable ``Bearer`` authorization header."""
        return self._abort_handler('Missing or invalid authorization header')

    def _invalid_token_response(self):
        """Reject a call whose ID token fails verification."""
        return self._abort_handler('Invalid token')
在客户端配置身份验证:
import grpc


# TLS with JWT
def create_authenticated_channel(server_address, jwt_token):
    """Open a TLS channel that sends ``authorization: Bearer <jwt_token>``.

    The original passed a plain function to ``grpc.intercept_channel``, which
    only accepts instances of the gRPC client-interceptor classes, so channel
    creation failed. Per-call credentials attach the same header to every
    request without an interceptor.

    Args:
        server_address: ``host:port`` of the gRPC server.
        jwt_token: Access token to present on every call.

    Returns:
        A secure ``grpc.Channel`` carrying the token on each request.
    """
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        # Emits ``authorization: Bearer <jwt_token>`` on every call.
        grpc.access_token_call_credentials(jwt_token),
    )
    return grpc.secure_channel(server_address, credentials)


class _ApiKeyAuthPlugin(grpc.AuthMetadataPlugin):
    """Metadata plugin that attaches a fixed ``x-api-key`` header."""

    def __init__(self, api_key):
        self._api_key = api_key

    def __call__(self, context, callback):
        # Second argument is the error; None signals success.
        callback((('x-api-key', self._api_key),), None)


# API Key authentication
def create_api_key_channel(server_address, api_key):
    """Open a TLS channel that sends ``x-api-key: <api_key>`` on every call.

    Args:
        server_address: ``host:port`` of the gRPC server.
        api_key: API key to present on every call.

    Returns:
        A secure ``grpc.Channel`` carrying the API key on each request.
    """
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(_ApiKeyAuthPlugin(api_key)),
    )
    return grpc.secure_channel(server_address, credentials)
实现用于细粒度权限控制的 RBAC:
syntax = "proto3";

package auth.v1;

// A single grant: one action on one resource.
message Permission {
  string resource = 1;
  string action = 2;
}

// A named bundle of permissions.
message Role {
  string name = 1;
  repeated Permission permissions = 2;
}

// The roles assigned to one user, referenced by role name.
message UserRoles {
  string user_id = 1;
  repeated string role_names = 2;
}
RBAC 拦截器实现:
class RBACInterceptor(grpc.ServerInterceptor):
    """Server interceptor that enforces role-based permissions per method.

    It reads the ``user_id`` metadata entry (added by an authentication
    interceptor earlier in the chain), looks up the permission required for
    the called method, and rejects the call when none of the user's roles
    grants it.
    """

    def __init__(self, permission_store, method_permissions=None):
        """Store the role registry and the method-to-permission table.

        Args:
            permission_store: Object whose ``get_user_roles(user_id)`` returns
                the user's roles; each role exposes ``permissions``.
            method_permissions: Optional mapping of fully-qualified method
                name to the permission it requires. Methods absent from the
                mapping require no permission.
        """
        self.permission_store = permission_store
        self.method_permissions = dict(method_permissions or {})

    def intercept_service(self, continuation, handler_call_details):
        """Authorize the call, or return a handler that rejects it.

        Args:
            continuation: Invokes the next interceptor or the real handler.
            handler_call_details: Method name and invocation metadata.

        Returns:
            The continuation's handler when the call is allowed; a handler
            aborting with UNAUTHENTICATED when no ``user_id`` is present, or
            with PERMISSION_DENIED when the user lacks the permission.
        """
        # Get user info from context (added by auth interceptor)
        metadata = dict(handler_call_details.invocation_metadata)
        user_id = metadata.get('user_id')

        if not user_id:
            return self._unauthorized_response()

        # Check permissions for the method
        method_name = handler_call_details.method
        required_permission = self._get_required_permission(method_name)

        if required_permission and not self._has_permission(user_id, required_permission):
            return self._forbidden_response()

        return continuation(handler_call_details)

    def _get_required_permission(self, method_name):
        """Return the permission ``method_name`` requires, or None if none."""
        return self.method_permissions.get(method_name)

    def _has_permission(self, user_id, permission):
        """Return True if any of the user's roles grants ``permission``."""
        user_roles = self.permission_store.get_user_roles(user_id)
        return any(permission in role.permissions for role in user_roles)

    @staticmethod
    def _abort_handler(code, details):
        """Build a handler that terminates the RPC with ``code``."""
        def abort(ignored_request, context):
            context.abort(code, details)
        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthorized_response(self):
        """Reject a call that carries no authenticated ``user_id``."""
        return self._abort_handler(
            grpc.StatusCode.UNAUTHENTICATED, 'Authentication required'
        )

    def _forbidden_response(self):
        """Reject a call whose user lacks the required permission."""
        return self._abort_handler(
            grpc.StatusCode.PERMISSION_DENIED, 'Permission denied'
        )
gRPC 的灵活身份验证系统允许您实现安全、可扩展的身份验证模式,这些模式可以在不同的环境和用例中工作。