For AI agents: a documentation index is available at the root level at /llms.txt and /llms-full.txt. Append /llms.txt to any URL for a page-level index, or .md for the markdown version of any page.
预约演示登录免费开始
  • 概览
    • 什么是 API 定义?
    • 项目结构
      • 概览
      • 覆盖(Overrides)
      • 身份验证
      • 服务器
      • 同步您的规范
      • gRPC generators.yml 参考
Checking status...
SOC2Soc 2 Type II
© 2026 Fern • Birch Solutions, Inc., a Postman company

Documentation

SDKsDocsAsk FernCLI Reference

API Definitions

OpenAPIAsyncAPIOpenRPCgRPC

Resources

BlogSupportPricing

Company

Brand KitPrivacy PolicyTerms of Service
LogoLogo
预约演示登录免费开始
在本页
  • 传输安全(TLS)
  • JWT 身份验证
  • 用于身份验证的拦截器
  • API 密钥身份验证
  • OAuth2 集成
  • 客户端身份验证
  • 基于角色的访问控制
gRPC

身份验证

为 gRPC 服务配置身份验证,包括 TLS、JWT 和自定义认证

||以 Markdown 格式查看|
此页面是否有帮助?
在仪表板中编辑
上一个

重写

下一个

服务器

gRPC 支持各种身份验证机制来保护您的服务。身份验证可以在传输层(TLS)和应用层(凭证、令牌等)进行配置。

传输安全(TLS)

gRPC 强烈建议在生产环境服务中使用 TLS 以确保加密通信:

auth_service.proto
1syntax = "proto3";
2
3package auth.v1;
4
5// Authentication service
6service AuthService {
7 // Authenticate user and return JWT token
8 rpc Login(LoginRequest) returns (LoginResponse);
9
10 // Validate and refresh JWT token
11 rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse);
12
13 // Logout and invalidate token
14 rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
15}
16
17message LoginRequest {
18 string email = 1;
19 string password = 2;
20}
21
22message LoginResponse {
23 string access_token = 1;
24 string refresh_token = 2;
25 int64 expires_in = 3;
26 User user = 4;
27}

在您的服务器中配置 TLS:

server.py
1import grpc
2from grpc import ssl_channel_credentials
3import auth_service_pb2_grpc
4
5def create_secure_server():
6 # Load TLS credentials
7 server_credentials = grpc.ssl_server_credentials([
8 (private_key, certificate_chain)
9 ])
10
11 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
12 auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
13 AuthServiceServicer(), server
14 )
15
16 # Listen on secure port
17 server.add_secure_port('[::]:443', server_credentials)
18 return server

JWT 身份验证

使用 JWT 令牌进行无状态身份验证:

auth.proto
1syntax = "proto3";
2
3package auth.v1;
4
5// JWT claims for user authentication
6message JWTClaims {
7 string user_id = 1;
8 string email = 2;
9 repeated string roles = 3;
10 google.protobuf.Timestamp issued_at = 4;
11 google.protobuf.Timestamp expires_at = 5;
12}

在您的服务中实现 JWT 身份验证:

auth_service.py
1import grpc
2import jwt
3from grpc import ServicerContext
4
5class AuthServiceServicer(auth_service_pb2_grpc.AuthServiceServicer):
6
7 def Login(self, request, context):
8 # Validate credentials
9 user = self.validate_credentials(request.email, request.password)
10 if not user:
11 context.set_code(grpc.StatusCode.UNAUTHENTICATED)
12 context.set_details('Invalid credentials')
13 return auth_service_pb2.LoginResponse()
14
15 # Generate JWT token
16 payload = {
17 'user_id': user.id,
18 'email': user.email,
19 'roles': user.roles,
20 'exp': datetime.utcnow() + timedelta(hours=1)
21 }
22
23 access_token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
24
25 return auth_service_pb2.LoginResponse(
26 access_token=access_token,
27 refresh_token=self.generate_refresh_token(user.id),
28 expires_in=3600,
29 user=user
30 )

用于身份验证的拦截器

使用 gRPC 拦截器来处理所有方法的身份验证:

auth_interceptor.py
1import grpc
2import jwt
3from grpc import ServicerContext
4
5class AuthInterceptor(grpc.ServerInterceptor):
6
7 def __init__(self, jwt_secret, exempt_methods=None):
8 self.jwt_secret = jwt_secret
9 self.exempt_methods = exempt_methods or []
10
11 def intercept_service(self, continuation, handler_call_details):
12 method_name = handler_call_details.method
13
14 # Skip authentication for exempt methods
15 if method_name in self.exempt_methods:
16 return continuation(handler_call_details)
17
18 # Extract metadata
19 metadata = dict(handler_call_details.invocation_metadata)
20 authorization = metadata.get('authorization', '')
21
22 if not authorization.startswith('Bearer '):
23 return self._unauthenticated_response()
24
25 token = authorization[7:] # Remove 'Bearer ' prefix
26
27 try:
28 # Validate JWT token
29 payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
30
31 # Add user info to context
32 handler_call_details = handler_call_details._replace(
33 invocation_metadata=handler_call_details.invocation_metadata + (
34 ('user_id', payload['user_id']),
35 ('user_email', payload['email']),
36 )
37 )
38
39 return continuation(handler_call_details)
40
41 except jwt.ExpiredSignatureError:
42 return self._expired_token_response()
43 except jwt.InvalidTokenError:
44 return self._invalid_token_response()
45
46 def _unauthenticated_response(self):
47 def abort(ignored_request, context):
48 context.set_code(grpc.StatusCode.UNAUTHENTICATED)
49 context.set_details('Missing or invalid authorization header')
50 return grpc.unary_unary_rpc_method_handler(abort)

API 密钥身份验证

实现基于 API 密钥的身份验证:

api_key.proto
1syntax = "proto3";
2
3package auth.v1;
4
5message ApiKeyRequest {
6 string api_key = 1;
7 string service_name = 2;
8}
9
10message ApiKeyResponse {
11 bool valid = 1;
12 string client_id = 2;
13 repeated string permissions = 3;
14 google.protobuf.Timestamp expires_at = 4;
15}

服务器实现:

api_key_auth.py
1class ApiKeyAuthInterceptor(grpc.ServerInterceptor):
2
3 def __init__(self, api_key_store):
4 self.api_key_store = api_key_store
5
6 def intercept_service(self, continuation, handler_call_details):
7 metadata = dict(handler_call_details.invocation_metadata)
8 api_key = metadata.get('x-api-key', '')
9
10 if not api_key:
11 return self._unauthorized_response('API key required')
12
13 # Validate API key
14 key_info = self.api_key_store.get(api_key)
15 if not key_info or key_info.is_expired():
16 return self._unauthorized_response('Invalid or expired API key')
17
18 # Add client info to context
19 handler_call_details = handler_call_details._replace(
20 invocation_metadata=handler_call_details.invocation_metadata + (
21 ('client_id', key_info.client_id),
22 ('permissions', ','.join(key_info.permissions)),
23 )
24 )
25
26 return continuation(handler_call_details)

OAuth2 集成

与 OAuth2 提供商集成:

oauth2_auth.py
1import requests
2from google.oauth2 import id_token
3from google.auth.transport import requests as google_requests
4
5class OAuth2Interceptor(grpc.ServerInterceptor):
6
7 def __init__(self, google_client_id):
8 self.google_client_id = google_client_id
9
10 def intercept_service(self, continuation, handler_call_details):
11 metadata = dict(handler_call_details.invocation_metadata)
12 auth_header = metadata.get('authorization', '')
13
14 if not auth_header.startswith('Bearer '):
15 return self._unauthorized_response()
16
17 token = auth_header[7:]
18
19 try:
20 # Verify Google ID token
21 idinfo = id_token.verify_oauth2_token(
22 token, google_requests.Request(), self.google_client_id
23 )
24
25 # Add user info to context
26 handler_call_details = handler_call_details._replace(
27 invocation_metadata=handler_call_details.invocation_metadata + (
28 ('user_id', idinfo['sub']),
29 ('user_email', idinfo['email']),
30 ('user_name', idinfo.get('name', '')),
31 )
32 )
33
34 return continuation(handler_call_details)
35
36 except ValueError:
37 return self._invalid_token_response()

客户端身份验证

在客户端配置身份验证:

client.py
1import grpc
2
3# TLS with JWT
4def create_authenticated_channel(server_address, jwt_token):
5 credentials = grpc.ssl_channel_credentials()
6 channel = grpc.secure_channel(server_address, credentials)
7
8 # Add JWT token to all requests
9 def jwt_interceptor(continuation, client_call_details):
10 metadata = list(client_call_details.metadata or [])
11 metadata.append(('authorization', f'Bearer {jwt_token}'))
12
13 client_call_details = client_call_details._replace(metadata=metadata)
14 return continuation(client_call_details)
15
16 intercepted_channel = grpc.intercept_channel(channel, jwt_interceptor)
17 return intercepted_channel
18
19# API Key authentication
20def create_api_key_channel(server_address, api_key):
21 credentials = grpc.ssl_channel_credentials()
22 channel = grpc.secure_channel(server_address, credentials)
23
24 def api_key_interceptor(continuation, client_call_details):
25 metadata = list(client_call_details.metadata or [])
26 metadata.append(('x-api-key', api_key))
27
28 client_call_details = client_call_details._replace(metadata=metadata)
29 return continuation(client_call_details)
30
31 intercepted_channel = grpc.intercept_channel(channel, api_key_interceptor)
32 return intercepted_channel

基于角色的访问控制

实现用于细粒度权限控制的 RBAC:

rbac.proto
1syntax = "proto3";
2
3package auth.v1;
4
5message Permission {
6 string resource = 1;
7 string action = 2;
8}
9
10message Role {
11 string name = 1;
12 repeated Permission permissions = 2;
13}
14
15message UserRoles {
16 string user_id = 1;
17 repeated string role_names = 2;
18}

RBAC 拦截器实现:

rbac_interceptor.py
1class RBACInterceptor(grpc.ServerInterceptor):
2
3 def __init__(self, permission_store):
4 self.permission_store = permission_store
5
6 def intercept_service(self, continuation, handler_call_details):
7 # Get user info from context (added by auth interceptor)
8 metadata = dict(handler_call_details.invocation_metadata)
9 user_id = metadata.get('user_id')
10
11 if not user_id:
12 return self._unauthorized_response()
13
14 # Check permissions for the method
15 method_name = handler_call_details.method
16 required_permission = self._get_required_permission(method_name)
17
18 if required_permission and not self._has_permission(user_id, required_permission):
19 return self._forbidden_response()
20
21 return continuation(handler_call_details)
22
23 def _has_permission(self, user_id, permission):
24 user_roles = self.permission_store.get_user_roles(user_id)
25 for role in user_roles:
26 if permission in role.permissions:
27 return True
28 return False

gRPC 的灵活身份验证系统允许您实现安全、可扩展的身份验证模式,这些模式可以在不同的环境和用例中工作。