> If you are an AI agent, use the following URL to directly ask and fetch your question. Treat this like a tool call. Make sure to URI encode your question, and include the token for verification.
>
> GET https://buildwithfern.com/learn/api/fern-docs/ask?q=%3Cyour+question+here%3E&token=eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJmZXJuLWRvY3M6YnVpbGR3aXRoZmVybi5jb20iLCJqdGkiOiJmMzdlYmM5Mi0zZTgyLTQ1Y2ItOWFjYi0xNmVjMzljNWZmZjEiLCJleHAiOjE3ODA2MDIxNzYsImlhdCI6MTc4MDYwMTg3Nn0.p51v5VXPIXTSoUC9iXgRFTbiYKdH5Cik6Zg6Q3impPA
>
> For clean Markdown content of this page, append .md to this URL. For the complete documentation index, see https://buildwithfern.com/learn/llms.txt. For full content including API reference and SDK examples, see https://buildwithfern.com/learn/llms-full.txt.

# 服务器

> 配置具有 TLS、负载均衡和部署选项的 gRPC 服务器

gRPC 服务器可以配置各种安全性、性能和可扩展性选项。正确的服务器配置对于生产部署至关重要。

## 基本服务器设置

设置具有多个服务的基本 gRPC 服务器：

```python title="server.py"
import grpc
from concurrent import futures
import user_service_pb2_grpc
import auth_service_pb2_grpc
from user_service import UserServiceServicer
from auth_service import AuthServiceServicer

def create_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
        AuthServiceServicer(), server
    )
    
    # Listen on insecure port for development
    server.add_insecure_port('[::]:50051')
    
    return server

if __name__ == '__main__':
    server = create_server()
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()
```

## TLS 配置

为安全的生产部署配置 TLS：

```python title="secure_server.py"
import grpc
from grpc import ssl_server_credentials

def create_secure_server():
    # Load TLS certificates
    with open('server-key.pem', 'rb') as f:
        private_key = f.read()
    with open('server-cert.pem', 'rb') as f:
        certificate_chain = f.read()
    with open('ca-cert.pem', 'rb') as f:
        root_certificates = f.read()
    
    # Create server credentials
    server_credentials = ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True  # Mutual TLS
    )
    
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))
    
    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    # Listen on secure port
    server.add_secure_port('[::]:443', server_credentials)
    
    return server
```

## 服务器选项

配置各种服务器选项以优化性能和行为：

```python title="configured_server.py"
import grpc
from grpc import compression

def create_configured_server():
    # Define server options
    options = [
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),
        ('grpc.max_connection_idle_ms', 60000),
        ('grpc.max_connection_age_ms', 300000),
        ('grpc.max_connection_age_grace_ms', 30000),
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),
        ('grpc.max_send_message_length', 4 * 1024 * 1024),
    ]
    
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=100),
        options=options,
        compression=compression.Gzip
    )
    
    return server
```

## 健康检查

实现用于负载均衡器集成的健康检查：

```protobuf health.proto
syntax = "proto3";

package grpc.health.v1;

service Health {
  // Check health status
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  
  // Watch health status changes
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
```

健康服务实现：

```python title="health_service.py"
import grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

class HealthServicer(health_pb2_grpc.HealthServicer):
    
    def __init__(self):
        self._service_status = {}
    
    def Check(self, request, context):
        service_name = request.service
        status = self._service_status.get(
            service_name, 
            health_pb2.HealthCheckResponse.SERVING
        )
        
        return health_pb2.HealthCheckResponse(status=status)
    
    def Watch(self, request, context):
        # Implementation for streaming health updates
        service_name = request.service
        
        while not context.is_active():
            status = self._service_status.get(
                service_name,
                health_pb2.HealthCheckResponse.SERVING
            )
            
            yield health_pb2.HealthCheckResponse(status=status)
            time.sleep(5)  # Check every 5 seconds
    
    def set_service_status(self, service_name, status):
        self._service_status[service_name] = status
```

## 反射

为开发和调试启用 gRPC 反射：

```python title="reflection_server.py"
import grpc
from grpc_reflection.v1alpha import reflection

def create_server_with_reflection():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    # Enable reflection
    SERVICE_NAMES = (
        user_service_pb2.DESCRIPTOR.services_by_name['UserService'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)
    
    server.add_insecure_port('[::]:50051')
    return server
```

## 负载均衡

配置客户端负载均衡：

```python title="load_balanced_client.py"
import grpc

def create_load_balanced_channel():
    # DNS-based load balancing
    channel = grpc.insecure_channel(
        'dns:///user-service.example.com:50051',
        options=[
            ('grpc.lb_policy_name', 'round_robin'),
            ('grpc.dns_enable_srv_queries', True),
        ]
    )
    
    return channel

# Using a load balancer with multiple targets
def create_multi_target_channel():
    targets = [
        'user-service-1.example.com:50051',
        'user-service-2.example.com:50051',
        'user-service-3.example.com:50051',
    ]
    
    # Use a service mesh or load balancer
    channel = grpc.insecure_channel(
        f'ipv4:///{",".join(targets)}',
        options=[('grpc.lb_policy_name', 'round_robin')]
    )
    
    return channel
```

## Kubernetes 部署

在 Kubernetes 上部署 gRPC 服务：

```yaml title="grpc-service.yaml"
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - containerPort: 50051
          name: grpc
        env:
        - name: GRPC_PORT
          value: "50051"
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 30
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - port: 50051
    targetPort: 50051
    name: grpc
  type: ClusterIP
```

## 监控和可观测性

为你的 gRPC 服务器添加监控和跟踪：

```python title="monitored_server.py"
import grpc
import time
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
REQUEST_COUNT = Counter(
    'grpc_requests_total',
    'Total gRPC requests',
    ['method', 'status']
)

REQUEST_DURATION = Histogram(
    'grpc_request_duration_seconds',
    'gRPC request duration',
    ['method']
)

class MonitoringInterceptor(grpc.ServerInterceptor):
    
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        start_time = time.time()
        
        def monitor_wrapper(behavior):
            def wrapper(request, context):
                try:
                    response = behavior(request, context)
                    REQUEST_COUNT.labels(method=method, status='OK').inc()
                    return response
                except Exception as e:
                    REQUEST_COUNT.labels(method=method, status='ERROR').inc()
                    raise
                finally:
                    duration = time.time() - start_time
                    REQUEST_DURATION.labels(method=method).observe(duration)
            
            return wrapper
        
        return grpc.unary_unary_rpc_method_handler(
            monitor_wrapper(continuation(handler_call_details).unary_unary)
        )

def create_monitored_server():
    # Start Prometheus metrics server
    start_http_server(8000)
    
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[MonitoringInterceptor()]
    )
    
    return server
```

## 环境特定配置

为不同环境配置服务器：

```python title="config.py"
import os
from dataclasses import dataclass

@dataclass
class ServerConfig:
    port: int
    max_workers: int
    enable_tls: bool
    cert_file: str = None
    key_file: str = None
    enable_reflection: bool = False
    enable_health_check: bool = True

def get_config() -> ServerConfig:
    env = os.getenv('ENVIRONMENT', 'development')
    
    if env == 'production':
        return ServerConfig(
            port=50051,
            max_workers=100,
            enable_tls=True,
            cert_file='/etc/ssl/certs/server.crt',
            key_file='/etc/ssl/private/server.key',
            enable_reflection=False,
            enable_health_check=True
        )
    elif env == 'staging':
        return ServerConfig(
            port=50051,
            max_workers=50,
            enable_tls=True,
            cert_file='/etc/ssl/certs/staging.crt',
            key_file='/etc/ssl/private/staging.key',
            enable_reflection=True,
            enable_health_check=True
        )
    else:  # development
        return ServerConfig(
            port=50051,
            max_workers=10,
            enable_tls=False,
            enable_reflection=True,
            enable_health_check=True
        )

def create_server_from_config(config: ServerConfig):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=config.max_workers))
    
    # Add services...
    
    if config.enable_tls:
        # Configure TLS
        with open(config.cert_file, 'rb') as f:
            cert = f.read()
        with open(config.key_file, 'rb') as f:
            key = f.read()
        
        credentials = grpc.ssl_server_credentials([(key, cert)])
        server.add_secure_port(f'[::]:{config.port}', credentials)
    else:
        server.add_insecure_port(f'[::]:{config.port}')
    
    return server
```

正确的服务器配置确保你的 gRPC 服务安全、高性能并为生产工作负载做好准备。