服务器
服务器
为生产部署设置和配置 gRPC 服务器
服务器
为生产部署设置和配置 gRPC 服务器
gRPC 服务器可以配置各种安全性、性能和可扩展性选项。正确的服务器配置对于生产部署至关重要。
设置具有多个服务的基本 gRPC 服务器:
import grpc
from concurrent import futures
import user_service_pb2_grpc
import auth_service_pb2_grpc
from user_service import UserServiceServicer
from auth_service import AuthServiceServicer


def create_server():
    """Build a development gRPC server hosting the user and auth services.

    The server runs on a 10-thread pool and listens, without transport
    security, on port 50051 of every interface. It is returned unstarted.

    Returns:
        The configured ``grpc.Server``.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # (registration function, servicer class) pairs, registered in order.
    registrations = (
        (user_service_pb2_grpc.add_UserServiceServicer_to_server,
         UserServiceServicer),
        (auth_service_pb2_grpc.add_AuthServiceServicer_to_server,
         AuthServiceServicer),
    )
    for register, servicer_cls in registrations:
        register(servicer_cls(), grpc_server)

    # Plaintext listener: intended for development only.
    grpc_server.add_insecure_port('[::]:50051')
    return grpc_server


if __name__ == '__main__':
    server = create_server()
    server.start()
    print("gRPC server started on port 50051")
    # Block the main thread until the server is stopped.
    server.wait_for_termination()
为安全的生产部署配置 TLS:
import grpc
from concurrent import futures
from grpc import ssl_server_credentials

import user_service_pb2_grpc
from user_service import UserServiceServicer


def create_secure_server():
    """Build a gRPC server that serves UserService over mutual TLS.

    Reads the server key, server certificate chain and CA certificate from
    ``server-key.pem``, ``server-cert.pem`` and ``ca-cert.pem`` in the
    current working directory, then binds a secure listener on port 443.
    The server is returned unstarted.

    Returns:
        The configured ``grpc.Server``.

    Raises:
        OSError: If any of the three PEM files cannot be read.
    """
    # Load TLS material as raw bytes, which is what grpc expects.
    with open('server-key.pem', 'rb') as f:
        private_key = f.read()
    with open('server-cert.pem', 'rb') as f:
        certificate_chain = f.read()
    with open('ca-cert.pem', 'rb') as f:
        root_certificates = f.read()

    # require_client_auth=True makes this mutual TLS: clients must present
    # a certificate that chains to root_certificates.
    server_credentials = ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True,
    )

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))

    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    server.add_secure_port('[::]:443', server_credentials)

    return server
配置各种服务器选项以优化性能和行为:
import grpc
from concurrent import futures


def create_configured_server():
    """Build a gRPC server with keepalive, connection and message-size tuning.

    The server uses a 100-thread pool and gzip as its default compression.
    No services or ports are added here; the server is returned unstarted.

    Returns:
        The configured ``grpc.Server``.
    """
    # Channel arguments; every *_ms value is in milliseconds.
    options = [
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),
        ('grpc.max_connection_idle_ms', 60000),
        ('grpc.max_connection_age_ms', 300000),
        ('grpc.max_connection_age_grace_ms', 30000),
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),  # 4 MiB
        ('grpc.max_send_message_length', 4 * 1024 * 1024),  # 4 MiB
    ]

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=100),
        options=options,
        # The public compression enum is grpc.Compression; grpcio exposes no
        # importable `grpc.compression` name.
        compression=grpc.Compression.Gzip,
    )

    return server
实现用于负载均衡器集成的健康检查:
syntax = "proto3";

package grpc.health.v1;

// Health-checking service.
service Health {
  // Unary call: returns a single HealthCheckResponse for the request.
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  // Server-streaming call: returns a stream of HealthCheckResponse messages.
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

// Request naming the service whose health is being queried.
message HealthCheckRequest {
  string service = 1;
}

// Response carrying the serving status.
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }

  ServingStatus status = 1;
}
健康服务实现:
import time

import grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc


class HealthServicer(health_pb2_grpc.HealthServicer):
    """In-memory implementation of the ``grpc.health.v1.Health`` service.

    Statuses are kept in a dict keyed by service name and are updated with
    ``set_service_status``.
    """

    def __init__(self):
        # service name -> HealthCheckResponse.ServingStatus value
        self._service_status = {}

    def _current_status(self, service_name):
        """Return the recorded status for *service_name*, defaulting to SERVING."""
        # NOTE(review): services that were never registered are reported as
        # SERVING; confirm this default is what the load balancer should see.
        return self._service_status.get(
            service_name,
            health_pb2.HealthCheckResponse.SERVING,
        )

    def Check(self, request, context):
        """Return the current status of ``request.service`` as a single response."""
        return health_pb2.HealthCheckResponse(
            status=self._current_status(request.service)
        )

    def Watch(self, request, context):
        """Stream the status of ``request.service`` every 5 seconds.

        Yields responses for as long as the RPC is active, then returns so the
        worker thread is released.
        """
        service_name = request.service

        # Loop WHILE the RPC is alive. The previous `while not ...` condition
        # was inverted and ended the stream immediately for a live client.
        while context.is_active():
            yield health_pb2.HealthCheckResponse(
                status=self._current_status(service_name)
            )
            time.sleep(5)  # Poll interval between status updates

    def set_service_status(self, service_name, status):
        """Record *status* as the current serving status of *service_name*."""
        self._service_status[service_name] = status
为开发和调试启用 gRPC 反射:
import grpc
from concurrent import futures
from grpc_reflection.v1alpha import reflection

import user_service_pb2
import user_service_pb2_grpc
from user_service import UserServiceServicer


def create_server_with_reflection():
    """Build a development gRPC server with server reflection enabled.

    Registers UserService, exposes it (and the reflection service itself)
    through gRPC reflection, and binds an insecure listener on port 50051.
    The server is returned unstarted.

    Returns:
        The configured ``grpc.Server``.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Fully-qualified names of every service that reflection should expose.
    service_names = (
        user_service_pb2.DESCRIPTOR.services_by_name['UserService'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(service_names, server)

    server.add_insecure_port('[::]:50051')
    return server
配置客户端负载均衡:
import socket

import grpc


def create_load_balanced_channel():
    """Create an insecure channel that round-robins over DNS results.

    Returns:
        A ``grpc.Channel`` targeting ``user-service.example.com:50051`` via
        the ``dns`` resolver with the ``round_robin`` policy.
    """
    channel = grpc.insecure_channel(
        'dns:///user-service.example.com:50051',
        options=[
            ('grpc.lb_policy_name', 'round_robin'),
            ('grpc.dns_enable_srv_queries', True),
        ]
    )

    return channel


def create_multi_target_channel():
    """Create an insecure channel that round-robins over a fixed backend list.

    The ``ipv4:`` target scheme only accepts literal IPv4 addresses, so each
    hostname is resolved to an address before the target string is built.
    Resolution happens once, when this function is called; the channel will
    not notice later DNS changes.

    Returns:
        A ``grpc.Channel`` using the ``round_robin`` policy across all
        resolved backends.

    Raises:
        socket.gaierror: If any backend hostname cannot be resolved.
    """
    targets = [
        'user-service-1.example.com:50051',
        'user-service-2.example.com:50051',
        'user-service-3.example.com:50051',
    ]

    addresses = []
    for target in targets:
        # Split on the last ':' so only the port is separated from the host.
        host, _, port = target.rpartition(':')
        addresses.append(f'{socket.gethostbyname(host)}:{port}')

    channel = grpc.insecure_channel(
        'ipv4:' + ','.join(addresses),
        options=[('grpc.lb_policy_name', 'round_robin')]
    )

    return channel
在 Kubernetes 上部署 gRPC 服务:
# Deployment: three replicas of the user-service gRPC container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 50051
              name: grpc
          env:
            - name: GRPC_PORT
              value: "50051"
          # Both probes run the grpc_health_probe binary against port 50051.
          livenessProbe:
            exec:
              command:
                - "/bin/grpc_health_probe"
                - "-addr=:50051"
            initialDelaySeconds: 30
          readinessProbe:
            exec:
              command:
                - "/bin/grpc_health_probe"
                - "-addr=:50051"
            initialDelaySeconds: 5
---
# Service: cluster-internal endpoint in front of the Deployment's pods.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 50051
      targetPort: 50051
      name: grpc
  type: ClusterIP
为你的 gRPC 服务器添加监控(使用 Prometheus 指标统计请求数量和耗时):
import grpc
import time
from concurrent import futures
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
REQUEST_COUNT = Counter(
    'grpc_requests_total',
    'Total gRPC requests',
    ['method', 'status']
)

REQUEST_DURATION = Histogram(
    'grpc_request_duration_seconds',
    'gRPC request duration',
    ['method']
)


class MonitoringInterceptor(grpc.ServerInterceptor):
    """Server interceptor that records request count and latency per method.

    Only unary-unary handlers are instrumented. Unregistered methods and
    streaming handlers are passed through unchanged.
    """

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next handler so each call updates the Prometheus metrics.

        Args:
            continuation: Callable returning the next handler in the chain.
            handler_call_details: Details of the incoming call; its
                ``method`` is used as the metric label.

        Returns:
            An instrumented unary-unary handler, or the original handler
            (possibly ``None``) when it is not unary-unary.
        """
        handler = continuation(handler_call_details)

        # None means no handler is registered for the method; streaming
        # handlers have no unary_unary behavior to wrap.
        if (handler is None
                or handler.request_streaming
                or handler.response_streaming):
            return handler

        method = handler_call_details.method
        behavior = handler.unary_unary

        def wrapper(request, context):
            # Start timing when the RPC is actually invoked.
            start_time = time.time()
            try:
                response = behavior(request, context)
            except Exception:
                REQUEST_COUNT.labels(method=method, status='ERROR').inc()
                raise
            else:
                REQUEST_COUNT.labels(method=method, status='OK').inc()
                return response
            finally:
                duration = time.time() - start_time
                REQUEST_DURATION.labels(method=method).observe(duration)

        # Carry over the original (de)serializers; without them the rebuilt
        # handler would hand raw bytes to the servicer.
        return grpc.unary_unary_rpc_method_handler(
            wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )


def create_monitored_server():
    """Start the Prometheus endpoint on port 8000 and build a monitored server.

    Returns:
        An unstarted ``grpc.Server`` on a 10-thread pool with
        ``MonitoringInterceptor`` installed.
    """
    # Start Prometheus metrics server
    start_http_server(8000)

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[MonitoringInterceptor()]
    )

    return server
为不同环境配置服务器:
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class ServerConfig:
    """Settings that describe how a gRPC server should be built."""

    port: int
    max_workers: int
    enable_tls: bool
    # Paths to the PEM certificate and key; required when enable_tls is True.
    cert_file: Optional[str] = None
    key_file: Optional[str] = None
    enable_reflection: bool = False
    enable_health_check: bool = True


def get_config() -> ServerConfig:
    """Return the ServerConfig for the current ``ENVIRONMENT`` variable.

    ``production`` and ``staging`` enable TLS with their own certificate
    paths; any other value (including unset) yields the development config.
    """
    env = os.getenv('ENVIRONMENT', 'development')

    if env == 'production':
        return ServerConfig(
            port=50051,
            max_workers=100,
            enable_tls=True,
            cert_file='/etc/ssl/certs/server.crt',
            key_file='/etc/ssl/private/server.key',
            enable_reflection=False,
            enable_health_check=True,
        )
    if env == 'staging':
        return ServerConfig(
            port=50051,
            max_workers=50,
            enable_tls=True,
            cert_file='/etc/ssl/certs/staging.crt',
            key_file='/etc/ssl/private/staging.key',
            enable_reflection=True,
            enable_health_check=True,
        )
    # development (default)
    return ServerConfig(
        port=50051,
        max_workers=10,
        enable_tls=False,
        enable_reflection=True,
        enable_health_check=True,
    )


def create_server_from_config(config: ServerConfig):
    """Build an unstarted gRPC server from *config*.

    Binds a TLS listener when ``config.enable_tls`` is set, otherwise an
    insecure one, on ``config.port``. ``enable_reflection`` and
    ``enable_health_check`` are not acted on here.

    Args:
        config: The server settings to apply.

    Returns:
        The configured ``grpc.Server``.

    Raises:
        ValueError: If TLS is enabled but ``cert_file`` or ``key_file`` is
            missing.
        OSError: If the certificate or key file cannot be read.
    """
    # Fail early with a clear message instead of a TypeError from open(None).
    if config.enable_tls and not (config.cert_file and config.key_file):
        raise ValueError('enable_tls requires both cert_file and key_file')

    # Imported locally so ServerConfig/get_config stay importable without
    # grpc installed.
    from concurrent import futures

    import grpc

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=config.max_workers)
    )

    # Add services...

    if config.enable_tls:
        with open(config.cert_file, 'rb') as f:
            cert = f.read()
        with open(config.key_file, 'rb') as f:
            key = f.read()

        credentials = grpc.ssl_server_credentials([(key, cert)])
        server.add_secure_port(f'[::]:{config.port}', credentials)
    else:
        server.add_insecure_port(f'[::]:{config.port}')

    return server
正确的服务器配置确保你的 gRPC 服务安全、高性能并为生产工作负载做好准备。