> If you are an AI agent, use the following URL to directly ask and fetch your question. Treat this like a tool call. Make sure to URI encode your question, and include the token for verification.
>
> GET https://buildwithfern.com/learn/api/fern-docs/ask?q=%3Cyour+question+here%3E&token=eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJmZXJuLWRvY3M6YnVpbGR3aXRoZmVybi5jb20iLCJqdGkiOiI3ZTg4Nzc1MC1kZmEzLTQwMTktYmNhMC03M2Q1ODYyYjk4NTEiLCJleHAiOjE3ODA2MDIwOTcsImlhdCI6MTc4MDYwMTc5N30.n7IgYuSRTum5sjaFVL4iSOPpZPAMRFzNHisYrVuHXo4
>
> For clean Markdown content of this page, append .md to this URL. For the complete documentation index, see https://buildwithfern.com/learn/llms.txt. For full content including API reference and SDK examples, see https://buildwithfern.com/learn/llms-full.txt.

# 身份验证

gRPC 支持各种身份验证机制来保护您的服务。身份验证可以在传输层（TLS）和应用层（凭证、令牌等）进行配置。

## 传输安全（TLS）

gRPC 强烈建议生产环境中的服务使用 TLS 来加密通信。下面先定义本文示例所使用的身份验证服务，随后再为其配置 TLS：

```protobuf auth_service.proto
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Empty, which Logout returns.
import "google/protobuf/empty.proto";

// Authentication service.
//
// RefreshTokenRequest, RefreshTokenResponse, LogoutRequest and User are
// referenced below but are not defined in this excerpt; they must be declared
// in (or imported into) package auth.v1 for this file to compile.
service AuthService {
  // Authenticate user and return JWT token
  rpc Login(LoginRequest) returns (LoginResponse);

  // Validate and refresh JWT token
  rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse);

  // Logout and invalidate token
  rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
}

// Credentials submitted to Login.
message LoginRequest {
  string email = 1;
  string password = 2;
}

// Tokens and user record returned by Login.
message LoginResponse {
  string access_token = 1;
  string refresh_token = 2;
  // Presumably the access-token lifetime in seconds -- TODO confirm.
  int64 expires_in = 3;
  User user = 4;
}
```

在您的服务器中配置 TLS：

```python title="server.py"
import grpc
from grpc import ssl_channel_credentials
import auth_service_pb2_grpc

def create_secure_server(address='[::]:443', max_workers=10):
    """Build a gRPC server for ``AuthService`` that listens on a TLS port only.

    Args:
        address: ``host:port`` the secure listener binds to. Defaults to the
            original hard-coded ``'[::]:443'``.
        max_workers: Number of threads in the pool that runs RPC handlers.

    Returns:
        The configured server. It is not started here; the caller is
        responsible for calling ``start()``.
    """
    # Imported locally: the snippet's import list does not bring in
    # ``concurrent.futures``, so the bare ``futures`` name was undefined.
    from concurrent import futures

    # NOTE(review): ``private_key`` and ``certificate_chain`` are not defined
    # in this snippet. ``grpc.ssl_server_credentials`` expects each pair to be
    # (PEM-encoded private key, PEM-encoded certificate chain) -- load them
    # before calling this function.
    server_credentials = grpc.ssl_server_credentials([
        (private_key, certificate_chain),
    ])

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
        AuthServiceServicer(), server
    )

    # Only a secure port is registered, so plaintext connections are refused.
    server.add_secure_port(address, server_credentials)
    return server
```

## JWT 身份验证

使用 JWT 令牌进行无状态身份验证：

```protobuf auth.proto
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp (issued_at / expires_at).
import "google/protobuf/timestamp.proto";

// JWT claims for user authentication
message JWTClaims {
  string user_id = 1;
  string email = 2;
  repeated string roles = 3;
  google.protobuf.Timestamp issued_at = 4;
  google.protobuf.Timestamp expires_at = 5;
}
```

在您的服务中实现 JWT 身份验证：

```python title="auth_service.py"
import grpc
import jwt
from grpc import ServicerContext

class AuthServiceServicer(auth_service_pb2_grpc.AuthServiceServicer):
    """Servicer for ``AuthService`` that issues HS256-signed JWT access tokens.

    NOTE(review): ``validate_credentials``, ``generate_refresh_token``,
    ``JWT_SECRET`` and ``auth_service_pb2`` are used below but are not defined
    in this snippet; they must be provided elsewhere.
    """

    # Access-token lifetime in seconds. Used for both the JWT ``exp`` claim and
    # the ``expires_in`` response field so the two cannot drift apart.
    ACCESS_TOKEN_TTL_SECONDS = 3600

    def Login(self, request, context):
        """Check the submitted credentials and return a token pair.

        Args:
            request: ``LoginRequest`` carrying ``email`` and ``password``.
            context: The RPC context, used to report a failed login.

        Returns:
            A populated ``LoginResponse`` on success. On failure the RPC status
            is set to ``UNAUTHENTICATED`` and an empty ``LoginResponse`` is
            returned.
        """
        # Imported locally: the snippet's import list does not include datetime.
        from datetime import datetime, timedelta, timezone

        # Validate credentials
        user = self.validate_credentials(request.email, request.password)
        if not user:
            context.set_code(grpc.StatusCode.UNAUTHENTICATED)
            context.set_details('Invalid credentials')
            return auth_service_pb2.LoginResponse()

        # Use a timezone-aware UTC timestamp; ``datetime.utcnow()`` returns a
        # naive value and is deprecated since Python 3.12.
        expires_at = datetime.now(timezone.utc) + timedelta(
            seconds=self.ACCESS_TOKEN_TTL_SECONDS
        )

        # Generate JWT token
        payload = {
            'user_id': user.id,
            'email': user.email,
            'roles': user.roles,
            'exp': expires_at,
        }
        access_token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')

        return auth_service_pb2.LoginResponse(
            access_token=access_token,
            refresh_token=self.generate_refresh_token(user.id),
            expires_in=self.ACCESS_TOKEN_TTL_SECONDS,
            user=user,
        )
```

## 用于身份验证的拦截器

使用 gRPC 拦截器来处理所有方法的身份验证：

```python title="auth_interceptor.py"
import grpc
import jwt
from grpc import ServicerContext

class AuthInterceptor(grpc.ServerInterceptor):
    """Reject RPCs that do not carry a valid HS256-signed ``Bearer`` JWT.

    Methods listed in ``exempt_methods`` bypass the token check. For every
    other method the token is read from the ``authorization`` metadata entry.
    On success the token's ``user_id`` and ``email`` claims are appended to the
    call details as ``user_id`` / ``user_email`` before the call continues.

    NOTE(review): the appended entries live on ``handler_call_details`` only,
    so they reach interceptors later in the chain; confirm separately how the
    servicer itself is expected to obtain the caller's identity.
    """

    # Metadata keys this interceptor writes. Values a client sends under these
    # keys are dropped so that identity can only originate from a verified token.
    _IDENTITY_KEYS = ('user_id', 'user_email')

    def __init__(self, jwt_secret, exempt_methods=None):
        """Store the verification secret and the methods that skip the check.

        Args:
            jwt_secret: Key used to verify HS256 signatures.
            exempt_methods: Method names, compared against
                ``handler_call_details.method``, that need no token.
        """
        self.jwt_secret = jwt_secret
        self.exempt_methods = exempt_methods or []

    def intercept_service(self, continuation, handler_call_details):
        """Authenticate the call, then continue or return a rejecting handler.

        Returns:
            The handler produced by ``continuation`` when the method is exempt
            or the token is valid; otherwise a handler that aborts the RPC with
            ``UNAUTHENTICATED``.
        """
        # Strip identity keys supplied by the client before anything downstream
        # can mistake them for verified values.
        sanitized_metadata = tuple(
            entry
            for entry in (handler_call_details.invocation_metadata or ())
            if entry[0] not in self._IDENTITY_KEYS
        )

        # Skip authentication for exempt methods
        if handler_call_details.method in self.exempt_methods:
            return continuation(
                handler_call_details._replace(
                    invocation_metadata=sanitized_metadata
                )
            )

        authorization = dict(sanitized_metadata).get('authorization', '')
        if not authorization.startswith('Bearer '):
            return self._unauthenticated_response()

        token = authorization[len('Bearer '):]

        # Keep the try body limited to token handling so that errors raised
        # further down the interceptor chain are not reported as token errors.
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            identity = (
                ('user_id', payload['user_id']),
                ('user_email', payload['email']),
            )
        except jwt.ExpiredSignatureError:
            return self._expired_token_response()
        except (jwt.InvalidTokenError, KeyError):
            # KeyError: the signature is valid but a required claim is missing.
            return self._invalid_token_response()

        # Add user info to the call details
        return continuation(
            handler_call_details._replace(
                invocation_metadata=sanitized_metadata + identity
            )
        )

    def _unauthenticated_response(self):
        """Return a handler rejecting a missing or malformed header."""
        return self._rejection_handler('Missing or invalid authorization header')

    def _expired_token_response(self):
        """Return a handler rejecting an expired token."""
        return self._rejection_handler('Token has expired')

    def _invalid_token_response(self):
        """Return a handler rejecting a token that failed verification."""
        return self._rejection_handler('Invalid token')

    @staticmethod
    def _rejection_handler(details):
        """Build a unary-unary handler that aborts with ``UNAUTHENTICATED``."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)
```

## API 密钥身份验证

实现基于 API 密钥的身份验证：

```protobuf api_key.proto
syntax = "proto3";

package auth.v1;

// Required for google.protobuf.Timestamp (ApiKeyResponse.expires_at).
import "google/protobuf/timestamp.proto";

// An API key presented for validation, with the name of the service involved.
// NOTE(review): whether service_name is the caller or the target is not shown
// in this excerpt -- confirm.
message ApiKeyRequest {
  string api_key = 1;
  string service_name = 2;
}

// Outcome of validating an API key.
message ApiKeyResponse {
  bool valid = 1;
  string client_id = 2;
  repeated string permissions = 3;
  google.protobuf.Timestamp expires_at = 4;
}
```

服务器实现：

```python title="api_key_auth.py"
class ApiKeyAuthInterceptor(grpc.ServerInterceptor):
    """Authenticate RPCs by the API key sent in the ``x-api-key`` metadata entry.

    The key is looked up in ``api_key_store``. On success the key's
    ``client_id`` and comma-joined ``permissions`` are appended to the call
    details before the call continues.
    """

    # Metadata keys this interceptor writes. Values a client sends under these
    # keys are dropped so they can only originate from the key store.
    _CLIENT_KEYS = ('client_id', 'permissions')

    def __init__(self, api_key_store):
        """Store the key lookup.

        Args:
            api_key_store: Object whose ``get(api_key)`` returns a falsy value
                for unknown keys, or a record exposing ``is_expired()``,
                ``client_id`` and ``permissions``.
        """
        self.api_key_store = api_key_store

    def intercept_service(self, continuation, handler_call_details):
        """Validate the API key, then continue or return a rejecting handler.

        Returns:
            The handler produced by ``continuation`` for a known, unexpired
            key; otherwise a handler that aborts with ``UNAUTHENTICATED``.
        """
        client_metadata = handler_call_details.invocation_metadata or ()
        api_key = dict(client_metadata).get('x-api-key', '')

        if not api_key:
            return self._unauthorized_response('API key required')

        # Validate API key
        key_info = self.api_key_store.get(api_key)
        if not key_info or key_info.is_expired():
            return self._unauthorized_response('Invalid or expired API key')

        # Replace, rather than append to, anything the client sent under the
        # keys written below.
        sanitized_metadata = tuple(
            entry for entry in client_metadata
            if entry[0] not in self._CLIENT_KEYS
        )
        client_info = (
            ('client_id', key_info.client_id),
            ('permissions', ','.join(key_info.permissions)),
        )

        return continuation(
            handler_call_details._replace(
                invocation_metadata=sanitized_metadata + client_info
            )
        )

    @staticmethod
    def _unauthorized_response(details):
        """Build a unary-unary handler that aborts with ``UNAUTHENTICATED``.

        Args:
            details: Human-readable reason sent to the client.
        """
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)
```

## OAuth2 集成

与 OAuth2 提供商集成：

```python title="oauth2_auth.py"
import requests
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests

class OAuth2Interceptor(grpc.ServerInterceptor):
    """Authenticate RPCs with a Google ID token sent as a ``Bearer`` credential.

    The token is verified against ``google_client_id``. On success the
    ``sub``, ``email`` and ``name`` claims are appended to the call details as
    ``user_id`` / ``user_email`` / ``user_name`` before the call continues.
    """

    # Metadata keys this interceptor writes. Values a client sends under these
    # keys are dropped so that identity can only originate from a verified token.
    _IDENTITY_KEYS = ('user_id', 'user_email', 'user_name')

    def __init__(self, google_client_id):
        """Store the OAuth2 client ID that tokens are verified against."""
        self.google_client_id = google_client_id

    def intercept_service(self, continuation, handler_call_details):
        """Verify the ID token, then continue or return a rejecting handler.

        Returns:
            The handler produced by ``continuation`` when the token verifies;
            otherwise a handler that aborts with ``UNAUTHENTICATED``.

        Raises:
            google.auth.exceptions.TransportError: Propagated unchanged when
                the verification request itself fails, so that a transport
                problem is not reported to the client as a bad token.
        """
        # Imported locally: the snippet's import list does not include it.
        from google.auth import exceptions as google_auth_exceptions

        sanitized_metadata = tuple(
            entry
            for entry in (handler_call_details.invocation_metadata or ())
            if entry[0] not in self._IDENTITY_KEYS
        )
        auth_header = dict(sanitized_metadata).get('authorization', '')

        if not auth_header.startswith('Bearer '):
            return self._unauthorized_response()

        token = auth_header[len('Bearer '):]

        try:
            # Verify Google ID token
            idinfo = id_token.verify_oauth2_token(
                token, google_requests.Request(), self.google_client_id
            )
            identity = (
                ('user_id', idinfo['sub']),
                # ``email`` and ``name`` are optional claims; default to ''.
                ('user_email', idinfo.get('email', '')),
                ('user_name', idinfo.get('name', '')),
            )
        except google_auth_exceptions.TransportError:
            raise
        except (ValueError, KeyError, google_auth_exceptions.GoogleAuthError):
            # ValueError: verification failed. GoogleAuthError: wrong issuer.
            # KeyError: the token lacks a ``sub`` claim.
            return self._invalid_token_response()

        # Add user info to the call details
        return continuation(
            handler_call_details._replace(
                invocation_metadata=sanitized_metadata + identity
            )
        )

    def _unauthorized_response(self):
        """Return a handler rejecting a missing or malformed header."""
        return self._rejection_handler('Missing or invalid authorization header')

    def _invalid_token_response(self):
        """Return a handler rejecting a token that failed verification."""
        return self._rejection_handler('Invalid token')

    @staticmethod
    def _rejection_handler(details):
        """Build a unary-unary handler that aborts with ``UNAUTHENTICATED``."""
        def abort(ignored_request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
        return grpc.unary_unary_rpc_method_handler(abort)
```

## 客户端身份验证

在客户端配置身份验证：

```python title="client.py"
import grpc

# TLS with JWT
def create_authenticated_channel(server_address, jwt_token):
    """Open a TLS channel that sends ``jwt_token`` as a Bearer token on every call.

    The token is attached through call credentials rather than a channel
    interceptor: ``grpc.intercept_channel`` only accepts instances of the
    ``grpc.*ClientInterceptor`` classes, so passing a plain function to it
    fails with ``TypeError``.

    Args:
        server_address: Target passed to ``grpc.secure_channel``.
        jwt_token: Token sent as ``authorization: Bearer <jwt_token>``.

    Returns:
        A secure channel whose RPCs all carry the token.
    """
    channel_credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        # Adds the ``authorization: Bearer ...`` metadata entry to each call.
        grpc.access_token_call_credentials(jwt_token),
    )
    return grpc.secure_channel(server_address, channel_credentials)

# API Key authentication
def create_api_key_channel(server_address, api_key):
    """Open a TLS channel that sends ``api_key`` as ``x-api-key`` on every call.

    The key is attached through call credentials rather than a channel
    interceptor: ``grpc.intercept_channel`` only accepts instances of the
    ``grpc.*ClientInterceptor`` classes, so passing a plain function to it
    fails with ``TypeError``.

    Args:
        server_address: Target passed to ``grpc.secure_channel``.
        api_key: Value sent in the ``x-api-key`` metadata entry.

    Returns:
        A secure channel whose RPCs all carry the API key.
    """
    class _ApiKeyPlugin(grpc.AuthMetadataPlugin):
        """Metadata plugin that supplies the ``x-api-key`` entry for each call."""

        def __call__(self, context, callback):
            # Second argument is the error slot; None signals success.
            callback((('x-api-key', api_key),), None)

    channel_credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(_ApiKeyPlugin(), name='api-key'),
    )
    return grpc.secure_channel(server_address, channel_credentials)
```

## 基于角色的访问控制

实现用于细粒度权限控制的 RBAC：

```protobuf rbac.proto
syntax = "proto3";

package auth.v1;

// A single permission: an action on a resource.
message Permission {
  string resource = 1;
  string action = 2;
}

// A named role and the permissions it carries.
message Role {
  string name = 1;
  repeated Permission permissions = 2;
}

// The roles associated with one user, referenced by name.
// Presumably each entry matches a Role.name -- TODO confirm.
message UserRoles {
  string user_id = 1;
  repeated string role_names = 2;
}
```

RBAC 拦截器实现：

```python title="rbac_interceptor.py"
class RBACInterceptor(grpc.ServerInterceptor):
    """Enforce role-based permissions on RPCs that already carry a ``user_id``.

    The ``user_id`` is read from the call's metadata, so this interceptor must
    run after an authentication interceptor that sets it from verified
    credentials and discards any client-supplied value.
    """

    def __init__(self, permission_store, method_permissions=None):
        """Store the role lookup and the per-method permission requirements.

        Args:
            permission_store: Object whose ``get_user_roles(user_id)`` returns
                an iterable of roles, each exposing ``permissions``.
            method_permissions: Optional mapping from method name (as found in
                ``handler_call_details.method``) to the permission it requires.
                Methods missing from the mapping require no permission.
        """
        self.permission_store = permission_store
        self.method_permissions = dict(method_permissions or {})

    def intercept_service(self, continuation, handler_call_details):
        """Authorize the call, then continue or return a rejecting handler.

        Returns:
            The handler produced by ``continuation`` when the call is allowed;
            a handler aborting with ``UNAUTHENTICATED`` when no ``user_id`` is
            present; a handler aborting with ``PERMISSION_DENIED`` when the
            user lacks the required permission.
        """
        # Get user info from the call metadata (added by the auth interceptor)
        metadata = dict(handler_call_details.invocation_metadata)
        user_id = metadata.get('user_id')

        if not user_id:
            return self._unauthorized_response()

        # Check permissions for the method
        required_permission = self._get_required_permission(
            handler_call_details.method
        )
        if required_permission and not self._has_permission(
            user_id, required_permission
        ):
            return self._forbidden_response()

        return continuation(handler_call_details)

    def _get_required_permission(self, method_name):
        """Return the permission ``method_name`` requires, or None if none."""
        return self.method_permissions.get(method_name)

    def _has_permission(self, user_id, permission):
        """Return True if any of the user's roles contains ``permission``."""
        return any(
            permission in role.permissions
            for role in self.permission_store.get_user_roles(user_id)
        )

    def _unauthorized_response(self):
        """Return a handler rejecting a call with no authenticated user."""
        return self._rejection_handler(
            grpc.StatusCode.UNAUTHENTICATED, 'Authentication required'
        )

    def _forbidden_response(self):
        """Return a handler rejecting a call the user is not permitted to make."""
        return self._rejection_handler(
            grpc.StatusCode.PERMISSION_DENIED, 'Permission denied'
        )

    @staticmethod
    def _rejection_handler(code, details):
        """Build a unary-unary handler that aborts with ``code`` and ``details``."""
        def abort(ignored_request, context):
            context.abort(code, details)
        return grpc.unary_unary_rpc_method_handler(abort)
```

gRPC 灵活的身份验证体系让您能够实现安全、可扩展的身份验证模式，并适用于不同的环境和用例。