Authentication

Configure authentication for gRPC services including TLS, JWT, and custom auth

gRPC supports various authentication mechanisms to secure your services. Authentication can be configured at the transport level (TLS) and at the application level (credentials, tokens, etc.).

Transport Security (TLS)

gRPC strongly recommends using TLS for production services to ensure encrypted communication:

auth_service.proto
1syntax = "proto3";
2
3package auth.v1;
4
5// Authentication service
6service AuthService {
7 // Authenticate user and return JWT token
8 rpc Login(LoginRequest) returns (LoginResponse);
9
10 // Validate and refresh JWT token
11 rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse);
12
13 // Logout and invalidate token
14 rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
15}
16
17message LoginRequest {
18 string email = 1;
19 string password = 2;
20}
21
22message LoginResponse {
23 string access_token = 1;
24 string refresh_token = 2;
25 int64 expires_in = 3;
26 User user = 4;
27}

Configure TLS in your server:

server.py
1import grpc
2from grpc import ssl_channel_credentials
3import auth_service_pb2_grpc
4
5def create_secure_server():
6 # Load TLS credentials
7 server_credentials = grpc.ssl_server_credentials([
8 (private_key, certificate_chain)
9 ])
10
11 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
12 auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
13 AuthServiceServicer(), server
14 )
15
16 # Listen on secure port
17 server.add_secure_port('[::]:443', server_credentials)
18 return server

JWT Authentication

Use JWT tokens for stateless authentication:

auth.proto
1syntax = "proto3";
2
3package auth.v1;
4
5// JWT claims for user authentication
6message JWTClaims {
7 string user_id = 1;
8 string email = 2;
9 repeated string roles = 3;
10 google.protobuf.Timestamp issued_at = 4;
11 google.protobuf.Timestamp expires_at = 5;
12}

Implement JWT authentication in your service:

auth_service.py
1import grpc
2import jwt
3from grpc import ServicerContext
4
5class AuthServiceServicer(auth_service_pb2_grpc.AuthServiceServicer):
6
7 def Login(self, request, context):
8 # Validate credentials
9 user = self.validate_credentials(request.email, request.password)
10 if not user:
11 context.set_code(grpc.StatusCode.UNAUTHENTICATED)
12 context.set_details('Invalid credentials')
13 return auth_service_pb2.LoginResponse()
14
15 # Generate JWT token
16 payload = {
17 'user_id': user.id,
18 'email': user.email,
19 'roles': user.roles,
20 'exp': datetime.utcnow() + timedelta(hours=1)
21 }
22
23 access_token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
24
25 return auth_service_pb2.LoginResponse(
26 access_token=access_token,
27 refresh_token=self.generate_refresh_token(user.id),
28 expires_in=3600,
29 user=user
30 )

Interceptors for Authentication

Use gRPC interceptors to handle authentication across all methods:

auth_interceptor.py
1import grpc
2import jwt
3from grpc import ServicerContext
4
5class AuthInterceptor(grpc.ServerInterceptor):
6
7 def __init__(self, jwt_secret, exempt_methods=None):
8 self.jwt_secret = jwt_secret
9 self.exempt_methods = exempt_methods or []
10
11 def intercept_service(self, continuation, handler_call_details):
12 method_name = handler_call_details.method
13
14 # Skip authentication for exempt methods
15 if method_name in self.exempt_methods:
16 return continuation(handler_call_details)
17
18 # Extract metadata
19 metadata = dict(handler_call_details.invocation_metadata)
20 authorization = metadata.get('authorization', '')
21
22 if not authorization.startswith('Bearer '):
23 return self._unauthenticated_response()
24
25 token = authorization[7:] # Remove 'Bearer ' prefix
26
27 try:
28 # Validate JWT token
29 payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
30
31 # Add user info to context
32 handler_call_details = handler_call_details._replace(
33 invocation_metadata=handler_call_details.invocation_metadata + (
34 ('user_id', payload['user_id']),
35 ('user_email', payload['email']),
36 )
37 )
38
39 return continuation(handler_call_details)
40
41 except jwt.ExpiredSignatureError:
42 return self._expired_token_response()
43 except jwt.InvalidTokenError:
44 return self._invalid_token_response()
45
46 def _unauthenticated_response(self):
47 def abort(ignored_request, context):
48 context.set_code(grpc.StatusCode.UNAUTHENTICATED)
49 context.set_details('Missing or invalid authorization header')
50 return grpc.unary_unary_rpc_method_handler(abort)

API Key Authentication

Implement API key-based authentication:

api_key.proto
1syntax = "proto3";
2
3package auth.v1;
4
5message ApiKeyRequest {
6 string api_key = 1;
7 string service_name = 2;
8}
9
10message ApiKeyResponse {
11 bool valid = 1;
12 string client_id = 2;
13 repeated string permissions = 3;
14 google.protobuf.Timestamp expires_at = 4;
15}

Server implementation:

api_key_auth.py
1class ApiKeyAuthInterceptor(grpc.ServerInterceptor):
2
3 def __init__(self, api_key_store):
4 self.api_key_store = api_key_store
5
6 def intercept_service(self, continuation, handler_call_details):
7 metadata = dict(handler_call_details.invocation_metadata)
8 api_key = metadata.get('x-api-key', '')
9
10 if not api_key:
11 return self._unauthorized_response('API key required')
12
13 # Validate API key
14 key_info = self.api_key_store.get(api_key)
15 if not key_info or key_info.is_expired():
16 return self._unauthorized_response('Invalid or expired API key')
17
18 # Add client info to context
19 handler_call_details = handler_call_details._replace(
20 invocation_metadata=handler_call_details.invocation_metadata + (
21 ('client_id', key_info.client_id),
22 ('permissions', ','.join(key_info.permissions)),
23 )
24 )
25
26 return continuation(handler_call_details)

OAuth2 Integration

Integrate with OAuth2 providers:

oauth2_auth.py
1import requests
2from google.oauth2 import id_token
3from google.auth.transport import requests as google_requests
4
5class OAuth2Interceptor(grpc.ServerInterceptor):
6
7 def __init__(self, google_client_id):
8 self.google_client_id = google_client_id
9
10 def intercept_service(self, continuation, handler_call_details):
11 metadata = dict(handler_call_details.invocation_metadata)
12 auth_header = metadata.get('authorization', '')
13
14 if not auth_header.startswith('Bearer '):
15 return self._unauthorized_response()
16
17 token = auth_header[7:]
18
19 try:
20 # Verify Google ID token
21 idinfo = id_token.verify_oauth2_token(
22 token, google_requests.Request(), self.google_client_id
23 )
24
25 # Add user info to context
26 handler_call_details = handler_call_details._replace(
27 invocation_metadata=handler_call_details.invocation_metadata + (
28 ('user_id', idinfo['sub']),
29 ('user_email', idinfo['email']),
30 ('user_name', idinfo.get('name', '')),
31 )
32 )
33
34 return continuation(handler_call_details)
35
36 except ValueError:
37 return self._invalid_token_response()

Client-side Authentication

Configure authentication on the client side:

client.py
1import grpc
2
3# TLS with JWT
4def create_authenticated_channel(server_address, jwt_token):
5 credentials = grpc.ssl_channel_credentials()
6 channel = grpc.secure_channel(server_address, credentials)
7
8 # Add JWT token to all requests
9 def jwt_interceptor(continuation, client_call_details):
10 metadata = list(client_call_details.metadata or [])
11 metadata.append(('authorization', f'Bearer {jwt_token}'))
12
13 client_call_details = client_call_details._replace(metadata=metadata)
14 return continuation(client_call_details)
15
16 intercepted_channel = grpc.intercept_channel(channel, jwt_interceptor)
17 return intercepted_channel
18
19# API Key authentication
20def create_api_key_channel(server_address, api_key):
21 credentials = grpc.ssl_channel_credentials()
22 channel = grpc.secure_channel(server_address, credentials)
23
24 def api_key_interceptor(continuation, client_call_details):
25 metadata = list(client_call_details.metadata or [])
26 metadata.append(('x-api-key', api_key))
27
28 client_call_details = client_call_details._replace(metadata=metadata)
29 return continuation(client_call_details)
30
31 intercepted_channel = grpc.intercept_channel(channel, api_key_interceptor)
32 return intercepted_channel

Role-Based Access Control

Implement RBAC for fine-grained permissions:

rbac.proto
1syntax = "proto3";
2
3package auth.v1;
4
5message Permission {
6 string resource = 1;
7 string action = 2;
8}
9
10message Role {
11 string name = 1;
12 repeated Permission permissions = 2;
13}
14
15message UserRoles {
16 string user_id = 1;
17 repeated string role_names = 2;
18}

RBAC interceptor implementation:

rbac_interceptor.py
1class RBACInterceptor(grpc.ServerInterceptor):
2
3 def __init__(self, permission_store):
4 self.permission_store = permission_store
5
6 def intercept_service(self, continuation, handler_call_details):
7 # Get user info from context (added by auth interceptor)
8 metadata = dict(handler_call_details.invocation_metadata)
9 user_id = metadata.get('user_id')
10
11 if not user_id:
12 return self._unauthorized_response()
13
14 # Check permissions for the method
15 method_name = handler_call_details.method
16 required_permission = self._get_required_permission(method_name)
17
18 if required_permission and not self._has_permission(user_id, required_permission):
19 return self._forbidden_response()
20
21 return continuation(handler_call_details)
22
23 def _has_permission(self, user_id, permission):
24 user_roles = self.permission_store.get_user_roles(user_id)
25 for role in user_roles:
26 if permission in role.permissions:
27 return True
28 return False

gRPC’s flexible authentication system allows you to implement secure, scalable authentication patterns that work across different environments and use cases.