Authentication
gRPC supports various authentication mechanisms to secure your services. Authentication can be configured at the transport level (TLS) and at the application level (credentials, tokens, etc.).
Transport Security (TLS)
gRPC strongly recommends using TLS for production services to ensure encrypted communication. The examples in this section secure the following authentication service:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Empty, which Logout returns.
import "google/protobuf/empty.proto";

// NOTE: RefreshTokenRequest, RefreshTokenResponse, LogoutRequest and User are
// referenced below but are not defined in this excerpt; they must be declared
// in this file or imported for it to compile.

// Authentication service
service AuthService {
  // Authenticate user and return JWT token
  rpc Login(LoginRequest) returns (LoginResponse);

  // Validate and refresh JWT token
  rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse);

  // Logout and invalidate token
  rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
}

// Credentials submitted to Login.
message LoginRequest {
  string email = 1;
  string password = 2;
}

// Tokens and user record returned by a successful Login.
message LoginResponse {
  string access_token = 1;
  string refresh_token = 2;
  // Access-token lifetime; presumably seconds (the Login example below
  // returns 3600 for a one-hour token).
  int64 expires_in = 3;
  User user = 4;
}
Configure TLS in your server:
from concurrent import futures

import grpc
from grpc import ssl_channel_credentials
import auth_service_pb2_grpc


def create_secure_server():
    """Build a TLS-enabled gRPC server that hosts ``AuthService``.

    The function reads three names from the enclosing module scope, none of
    which are defined in this snippet:

    * ``private_key`` and ``certificate_chain`` -- the server's key and
      certificate chain; ``grpc.ssl_server_credentials`` expects each as
      PEM-encoded ``bytes``.
    * ``AuthServiceServicer`` -- the service implementation to register.

    Returns:
        The configured ``grpc.Server``. It has not been started; the caller
        is responsible for calling ``start()`` on it.
    """
    # Load TLS credentials: one (private key, certificate chain) pair.
    server_credentials = grpc.ssl_server_credentials([
        (private_key, certificate_chain),
    ])

    # Handlers run on a pool of 10 worker threads.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
        AuthServiceServicer(), server
    )

    # Listen on the secure port (all interfaces, port 443).
    server.add_secure_port('[::]:443', server_credentials)
    return server
JWT Authentication
Use JWT tokens for stateless authentication:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp used by issued_at / expires_at.
import "google/protobuf/timestamp.proto";

// JWT claims for user authentication
message JWTClaims {
  string user_id = 1;
  string email = 2;
  repeated string roles = 3;
  google.protobuf.Timestamp issued_at = 4;
  google.protobuf.Timestamp expires_at = 5;
}
Implement JWT authentication in your service:
from datetime import datetime, timedelta, timezone

import grpc
import jwt
from grpc import ServicerContext

import auth_service_pb2
import auth_service_pb2_grpc


class AuthServiceServicer(auth_service_pb2_grpc.AuthServiceServicer):
    """Server-side implementation of ``AuthService`` (only ``Login`` shown).

    Relies on names that are not defined in this snippet:
    ``self.validate_credentials``, ``self.generate_refresh_token`` and the
    module-level ``JWT_SECRET`` used to sign tokens.
    """

    # Single source of truth for the access-token lifetime: it drives both
    # the JWT ``exp`` claim and the ``expires_in`` value sent to the client.
    access_token_ttl = timedelta(hours=1)

    def Login(self, request, context):
        """Check the submitted credentials and issue an access/refresh token pair.

        Args:
            request: ``LoginRequest`` carrying ``email`` and ``password``.
            context: The RPC context, used to report authentication failure.

        Returns:
            A populated ``LoginResponse`` on success. On failure the status is
            set to ``UNAUTHENTICATED`` with details ``'Invalid credentials'``
            and an empty ``LoginResponse`` is returned.
        """
        # Validate credentials
        user = self.validate_credentials(request.email, request.password)
        if not user:
            context.set_code(grpc.StatusCode.UNAUTHENTICATED)
            context.set_details('Invalid credentials')
            return auth_service_pb2.LoginResponse()

        # Generate JWT token. A timezone-aware UTC timestamp is used for
        # "exp" instead of the naive (and deprecated) datetime.utcnow().
        payload = {
            'user_id': user.id,
            'email': user.email,
            'roles': user.roles,
            'exp': datetime.now(timezone.utc) + self.access_token_ttl,
        }

        access_token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')

        return auth_service_pb2.LoginResponse(
            access_token=access_token,
            refresh_token=self.generate_refresh_token(user.id),
            expires_in=int(self.access_token_ttl.total_seconds()),
            user=user,
        )
Interceptors for Authentication
Use gRPC interceptors to handle authentication across all methods:
import grpc
import jwt
from grpc import ServicerContext


class AuthInterceptor(grpc.ServerInterceptor):
    """Reject calls that do not carry a valid HS256 JWT bearer token.

    For every non-exempt method the ``authorization`` metadata entry must be
    ``Bearer <jwt>``. On success the verified ``user_id`` and ``user_email``
    are appended to the invocation metadata for downstream interceptors and
    handlers; on failure the call is terminated with ``UNAUTHENTICATED``.

    Exempt methods are passed through with the client's metadata untouched,
    so downstream code must not trust a ``user_id`` entry on those methods.
    """

    # Metadata keys written by this interceptor after verification.
    _IDENTITY_KEYS = ('user_id', 'user_email')

    def __init__(self, jwt_secret, exempt_methods=None):
        """Store the signing secret and the methods that skip authentication.

        Args:
            jwt_secret: Key used to verify token signatures.
            exempt_methods: Optional collection of method names, compared
                against ``handler_call_details.method``, that bypass the check.
        """
        self.jwt_secret = jwt_secret
        self.exempt_methods = exempt_methods or []

    def intercept_service(self, continuation, handler_call_details):
        """Authenticate the call, then hand it to the next interceptor/handler.

        Returns:
            The handler produced by ``continuation``, or a handler that aborts
            the call with ``UNAUTHENTICATED`` when the header is missing or
            the token is expired or invalid.
        """
        method_name = handler_call_details.method

        # Skip authentication for exempt methods
        if method_name in self.exempt_methods:
            return continuation(handler_call_details)

        # Extract metadata
        metadata = dict(handler_call_details.invocation_metadata)
        authorization = metadata.get('authorization', '')

        if not authorization.startswith('Bearer '):
            return self._unauthenticated_response()

        token = authorization[len('Bearer '):]

        try:
            # Validate JWT token
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            # Metadata values must be strings; a token lacking either claim
            # raises KeyError and is treated as invalid below.
            identity = (
                ('user_id', str(payload['user_id'])),
                ('user_email', str(payload['email'])),
            )
        except jwt.ExpiredSignatureError:
            return self._expired_token_response()
        except (jwt.InvalidTokenError, KeyError):
            return self._invalid_token_response()

        # Drop any identity entries the client sent itself so that only the
        # verified values reach downstream code, then add user info.
        client_metadata = tuple(
            entry
            for entry in handler_call_details.invocation_metadata
            if entry[0] not in self._IDENTITY_KEYS
        )
        handler_call_details = handler_call_details._replace(
            invocation_metadata=client_metadata + identity
        )

        return continuation(handler_call_details)

    @staticmethod
    def _abort_handler(details):
        """Return a handler that ends the call with ``UNAUTHENTICATED``."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)

        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthenticated_response(self):
        """Handler for a missing or malformed ``authorization`` header."""
        return self._abort_handler('Missing or invalid authorization header')

    def _expired_token_response(self):
        """Handler for a token whose ``exp`` claim has passed."""
        return self._abort_handler('Token has expired')

    def _invalid_token_response(self):
        """Handler for a token that fails verification or lacks required claims."""
        return self._abort_handler('Invalid token')
API Key Authentication
Implement API key-based authentication:
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp used by ApiKeyResponse.expires_at.
import "google/protobuf/timestamp.proto";

// An API key presented for validation, with the name of the service it is
// being used against.
message ApiKeyRequest {
  string api_key = 1;
  string service_name = 2;
}

// Outcome of validating an API key.
message ApiKeyResponse {
  bool valid = 1;
  string client_id = 2;
  repeated string permissions = 3;
  google.protobuf.Timestamp expires_at = 4;
}
Server implementation:
import grpc


class ApiKeyAuthInterceptor(grpc.ServerInterceptor):
    """Authenticate calls by the API key in the ``x-api-key`` metadata entry.

    ``api_key_store.get(api_key)`` must return an object exposing
    ``is_expired()``, ``client_id`` and an iterable ``permissions`` of
    strings, or a falsy value for an unknown key.
    """

    # Metadata keys written by this interceptor after validation.
    _CLIENT_KEYS = ('client_id', 'permissions')

    def __init__(self, api_key_store):
        """Store the lookup used to resolve API keys."""
        self.api_key_store = api_key_store

    def intercept_service(self, continuation, handler_call_details):
        """Validate the API key, then hand the call to the next handler.

        Returns:
            The handler produced by ``continuation`` with ``client_id`` and a
            comma-joined ``permissions`` entry appended to the metadata, or a
            handler that aborts with ``UNAUTHENTICATED`` when the key is
            missing, unknown or expired.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        api_key = metadata.get('x-api-key', '')

        if not api_key:
            return self._unauthorized_response('API key required')

        # Validate API key
        key_info = self.api_key_store.get(api_key)
        if not key_info or key_info.is_expired():
            return self._unauthorized_response('Invalid or expired API key')

        # Drop any client-supplied copies of the entries written below so
        # only validated values reach downstream code, then add client info.
        client_metadata = tuple(
            entry
            for entry in handler_call_details.invocation_metadata
            if entry[0] not in self._CLIENT_KEYS
        )
        handler_call_details = handler_call_details._replace(
            invocation_metadata=client_metadata + (
                ('client_id', key_info.client_id),
                ('permissions', ','.join(key_info.permissions)),
            )
        )

        return continuation(handler_call_details)

    def _unauthorized_response(self, details):
        """Return a handler that ends the call with ``UNAUTHENTICATED``.

        Args:
            details: Human-readable reason sent to the client.
        """
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)

        return grpc.unary_unary_rpc_method_handler(abort)
OAuth2 Integration
Integrate with OAuth2 providers:
import grpc
import requests
from google.auth import exceptions as google_auth_exceptions
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token


class OAuth2Interceptor(grpc.ServerInterceptor):
    """Authenticate calls with a Google-issued OAuth2 ID token.

    The ``authorization`` metadata entry must be ``Bearer <id_token>``. The
    token is verified against ``google_client_id`` as its audience; verified
    identity fields are then appended to the invocation metadata.
    """

    # Metadata keys written by this interceptor after verification.
    _IDENTITY_KEYS = ('user_id', 'user_email', 'user_name')

    def __init__(self, google_client_id):
        """Store the OAuth2 client ID that tokens must have been issued for."""
        self.google_client_id = google_client_id

    def intercept_service(self, continuation, handler_call_details):
        """Verify the bearer token, then hand the call to the next handler.

        Returns:
            The handler produced by ``continuation`` with ``user_id``,
            ``user_email`` and ``user_name`` appended to the metadata, or a
            handler that aborts with ``UNAUTHENTICATED`` when the header is
            missing or the token fails verification.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        auth_header = metadata.get('authorization', '')

        if not auth_header.startswith('Bearer '):
            return self._unauthorized_response()

        token = auth_header[len('Bearer '):]

        try:
            # Verify Google ID token
            idinfo = id_token.verify_oauth2_token(
                token, google_requests.Request(), self.google_client_id
            )
        except (ValueError, google_auth_exceptions.GoogleAuthError):
            # ValueError: verification failed; GoogleAuthError: bad issuer.
            return self._invalid_token_response()

        # "email" and "name" are only present when the matching scopes were
        # granted, so both fall back to an empty string.
        identity = (
            ('user_id', idinfo['sub']),
            ('user_email', idinfo.get('email', '')),
            ('user_name', idinfo.get('name', '')),
        )

        # Drop client-supplied copies of the identity entries so only the
        # verified values reach downstream code, then add user info.
        client_metadata = tuple(
            entry
            for entry in handler_call_details.invocation_metadata
            if entry[0] not in self._IDENTITY_KEYS
        )
        handler_call_details = handler_call_details._replace(
            invocation_metadata=client_metadata + identity
        )

        return continuation(handler_call_details)

    @staticmethod
    def _abort_handler(details):
        """Return a handler that ends the call with ``UNAUTHENTICATED``."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)

        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthorized_response(self):
        """Handler for a missing or malformed ``authorization`` header."""
        return self._abort_handler('Missing or invalid authorization header')

    def _invalid_token_response(self):
        """Handler for an ID token that fails verification."""
        return self._abort_handler('Invalid token')
Client-side Authentication
Configure authentication on the client side:
import grpc


class _ApiKeyAuthPlugin(grpc.AuthMetadataPlugin):
    """Attach a fixed ``x-api-key`` metadata entry to every outgoing call."""

    def __init__(self, api_key):
        self._api_key = api_key

    def __call__(self, context, callback):
        # callback(metadata, error): no error, one metadata entry.
        callback((('x-api-key', self._api_key),), None)


# TLS with JWT
def create_authenticated_channel(server_address, jwt_token):
    """Open a TLS channel that sends ``authorization: Bearer <jwt_token>``.

    ``grpc.intercept_channel`` only accepts ``grpc.*ClientInterceptor``
    instances, so the token is attached as call credentials composed onto
    the TLS channel credentials instead of through a plain function.

    Args:
        server_address: Target in ``host:port`` form.
        jwt_token: Access token to present on every call.

    Returns:
        A secure ``grpc.Channel`` that adds the bearer token to all requests.
    """
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.access_token_call_credentials(jwt_token),
    )
    return grpc.secure_channel(server_address, credentials)


# API Key authentication
def create_api_key_channel(server_address, api_key):
    """Open a TLS channel that sends ``x-api-key: <api_key>`` on every call.

    Args:
        server_address: Target in ``host:port`` form.
        api_key: API key to present on every call.

    Returns:
        A secure ``grpc.Channel`` that adds the API key to all requests.
    """
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(_ApiKeyAuthPlugin(api_key)),
    )
    return grpc.secure_channel(server_address, credentials)
Role-Based Access Control
Implement RBAC for fine-grained permissions:
syntax = "proto3";

package auth.v1;

// One grantable permission: an action on a resource.
message Permission {
  string resource = 1;
  string action = 2;
}

// A named bundle of permissions.
message Role {
  string name = 1;
  repeated Permission permissions = 2;
}

// The role names held by a single user.
message UserRoles {
  string user_id = 1;
  repeated string role_names = 2;
}
RBAC interceptor implementation:
import grpc


class RBACInterceptor(grpc.ServerInterceptor):
    """Enforce role-based permissions on calls that are already authenticated.

    The caller's identity is read from the ``user_id`` invocation-metadata
    entry, which an earlier authentication interceptor is expected to have
    added. NOTE(review): this value is trusted as-is, so it is only safe when
    that earlier interceptor overwrites any ``user_id`` sent by the client --
    confirm the interceptor order and exempt-method list.
    """

    def __init__(self, permission_store, method_permissions=None):
        """Store the role lookup and the per-method permission requirements.

        Args:
            permission_store: Object whose ``get_user_roles(user_id)`` returns
                an iterable of roles, each exposing ``permissions``.
            method_permissions: Optional mapping from method name (as reported
                by ``handler_call_details.method``) to the permission that
                method requires. Methods absent from the mapping require no
                permission beyond being authenticated.
        """
        self.permission_store = permission_store
        self.method_permissions = dict(method_permissions or {})

    def intercept_service(self, continuation, handler_call_details):
        """Authorize the call, then hand it to the next interceptor/handler.

        Returns:
            The handler produced by ``continuation``; a handler aborting with
            ``UNAUTHENTICATED`` when no ``user_id`` is present; or a handler
            aborting with ``PERMISSION_DENIED`` when the user lacks the
            permission required for the method.
        """
        # Get user info from context (added by auth interceptor)
        metadata = dict(handler_call_details.invocation_metadata)
        user_id = metadata.get('user_id')

        if not user_id:
            return self._unauthorized_response()

        # Check permissions for the method
        method_name = handler_call_details.method
        required_permission = self._get_required_permission(method_name)

        if required_permission and not self._has_permission(user_id, required_permission):
            return self._forbidden_response()

        return continuation(handler_call_details)

    def _get_required_permission(self, method_name):
        """Return the permission ``method_name`` requires, or ``None`` if none."""
        return self.method_permissions.get(method_name)

    def _has_permission(self, user_id, permission):
        """Return True when any of the user's roles contains ``permission``."""
        user_roles = self.permission_store.get_user_roles(user_id)
        return any(permission in role.permissions for role in user_roles)

    @staticmethod
    def _abort_handler(code, details):
        """Return a handler that ends the call with ``code`` and ``details``."""
        def abort(ignored_request, context):
            context.abort(code, details)

        return grpc.unary_unary_rpc_method_handler(abort)

    def _unauthorized_response(self):
        """Handler for a call that carries no authenticated user."""
        return self._abort_handler(
            grpc.StatusCode.UNAUTHENTICATED, 'Authentication required'
        )

    def _forbidden_response(self):
        """Handler for an authenticated user who lacks the required permission."""
        return self._abort_handler(
            grpc.StatusCode.PERMISSION_DENIED, 'Permission denied'
        )
gRPC’s flexible authentication system allows you to implement secure, scalable authentication patterns that work across different environments and use cases.