Servers
Set up and configure gRPC servers for production deployments
gRPC servers can be configured with various options for security, performance, and scalability. Proper server configuration is crucial for production deployments.
Basic Server Setup
Set up a basic gRPC server with multiple services:
server.py
"""Basic gRPC server wiring: thread pool, service registration, dev port."""
import grpc
from concurrent import futures

import user_service_pb2_grpc
import auth_service_pb2_grpc
from user_service import UserServiceServicer
from auth_service import AuthServiceServicer


def create_server():
    """Build a server with both servicers registered on an insecure dev port."""
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Register each servicer through its generated add_* helper.
    registrations = (
        (user_service_pb2_grpc.add_UserServiceServicer_to_server,
         UserServiceServicer()),
        (auth_service_pb2_grpc.add_AuthServiceServicer_to_server,
         AuthServiceServicer()),
    )
    for register, servicer in registrations:
        register(servicer, grpc_server)

    # Plaintext transport — development only.
    grpc_server.add_insecure_port('[::]:50051')

    return grpc_server


if __name__ == '__main__':
    server = create_server()
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()
TLS Configuration
Configure TLS for secure production deployments:
secure_server.py
"""TLS-secured gRPC server for production deployments (mutual TLS)."""
import grpc
from grpc import ssl_server_credentials
from concurrent import futures  # fix: used below but was never imported

import user_service_pb2_grpc                 # fix: was never imported
from user_service import UserServiceServicer  # fix: was never imported


def create_secure_server():
    """Create a server that requires client certificates (mTLS) on port 443.

    Returns:
        A grpc.Server bound to a secure port; caller must start() it.

    Raises:
        FileNotFoundError: if any of the PEM files is missing.
    """
    # Load TLS key material from disk.
    with open('server-key.pem', 'rb') as f:
        private_key = f.read()
    with open('server-cert.pem', 'rb') as f:
        certificate_chain = f.read()
    with open('ca-cert.pem', 'rb') as f:
        root_certificates = f.read()

    # Supplying a CA bundle plus require_client_auth=True enables mutual TLS:
    # clients must present a certificate signed by that CA.
    server_credentials = ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True,  # Mutual TLS
    )

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Listen on secure port
    server.add_secure_port('[::]:443', server_credentials)

    return server
Server Options
Configure various server options for performance and behavior:
configured_server.py
"""gRPC server tuned with keepalive, connection-age and message-size options."""
import grpc
from grpc import compression
from concurrent import futures  # fix: used below but was never imported


def create_configured_server():
    """Create a server with production-oriented channel options.

    Returns:
        A grpc.Server with keepalive/HTTP2 ping policy, connection lifetime
        limits, 4 MiB message caps, and gzip compression enabled.
    """
    options = [
        # Keepalive: ping idle connections every 30s; drop after 5s with no ack.
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        # HTTP/2 ping policy for clients.
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),
        # Recycle connections: close after 60s idle or 5 min age (+30s grace),
        # which helps load balancers redistribute long-lived HTTP/2 streams.
        ('grpc.max_connection_idle_ms', 60000),
        ('grpc.max_connection_age_ms', 300000),
        ('grpc.max_connection_age_grace_ms', 30000),
        # Cap messages at 4 MiB in both directions.
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),
        ('grpc.max_send_message_length', 4 * 1024 * 1024),
    ]

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=100),
        options=options,
        compression=compression.Gzip,
    )

    return server
Health Checking
Implement health checking for load balancer integration:
health.proto
// Health checking protocol (grpc.health.v1), as consumed by load balancers
// and probes such as grpc_health_probe.
syntax = "proto3";

package grpc.health.v1;

service Health {
  // Check health status
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  // Watch health status changes
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  // Fully-qualified service name to query. NOTE(review): in the standard
  // health protocol an empty string requests overall server health — confirm
  // the server implementation honors that convention.
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    // Used by Watch when the requested service is not registered.
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
Health service implementation:
health_service.py
"""Health service implementation backing grpc.health.v1."""
import time  # fix: time.sleep was used but time was never imported

import grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc


class HealthServicer(health_pb2_grpc.HealthServicer):
    """In-memory health registry; unknown services default to SERVING."""

    def __init__(self):
        # Maps service name -> HealthCheckResponse.ServingStatus value.
        self._service_status = {}

    def Check(self, request, context):
        """Unary check: report the current status of the requested service."""
        status = self._service_status.get(
            request.service,
            health_pb2.HealthCheckResponse.SERVING,
        )
        return health_pb2.HealthCheckResponse(status=status)

    def Watch(self, request, context):
        """Stream the service's status every 5 seconds while the RPC is alive.

        Fix: the loop condition was inverted (`while not context.is_active()`),
        which made the stream exit immediately instead of emitting updates.
        """
        service_name = request.service

        while context.is_active():
            status = self._service_status.get(
                service_name,
                health_pb2.HealthCheckResponse.SERVING,
            )
            yield health_pb2.HealthCheckResponse(status=status)
            time.sleep(5)  # Check every 5 seconds

    def set_service_status(self, service_name, status):
        """Record the status that Check/Watch report for service_name."""
        self._service_status[service_name] = status
Reflection
Enable gRPC reflection for development and debugging:
reflection_server.py
"""gRPC server with server reflection enabled (for grpcurl / debugging)."""
import grpc
from grpc_reflection.v1alpha import reflection
from concurrent import futures  # fix: used below but was never imported

import user_service_pb2        # fix: needed for DESCRIPTOR lookup, not imported
import user_service_pb2_grpc   # fix: was never imported
from user_service import UserServiceServicer  # fix: was never imported


def create_server_with_reflection():
    """Create a development server exposing reflection for the user service.

    Returns:
        A grpc.Server bound to an insecure port; caller must start() it.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Advertise both the user service and the reflection service itself.
    SERVICE_NAMES = (
        user_service_pb2.DESCRIPTOR.services_by_name['UserService'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    server.add_insecure_port('[::]:50051')
    return server
Load Balancing
Configure client-side load balancing:
load_balanced_client.py
"""Client-side load-balancing channel helpers."""
import grpc


def create_load_balanced_channel():
    """DNS-based round robin: resolve all records and spread RPCs across them.

    Returns:
        An insecure grpc.Channel using the dns:/// resolver.
    """
    channel = grpc.insecure_channel(
        'dns:///user-service.example.com:50051',
        options=[
            ('grpc.lb_policy_name', 'round_robin'),
            ('grpc.dns_enable_srv_queries', True),
        ],
    )

    return channel


def create_multi_target_channel():
    """Round robin over a fixed list of backend addresses.

    Fix: the ipv4: resolver accepts only literal IP addresses; the previous
    hostname list ('user-service-N.example.com') would fail to resolve under
    that scheme. Use IP literals with ipv4:, or prefer dns:/// when you only
    have hostnames.
    """
    targets = [
        '10.0.0.10:50051',
        '10.0.0.11:50051',
        '10.0.0.12:50051',
    ]

    # ipv4: takes a comma-separated list of literal address:port pairs.
    channel = grpc.insecure_channel(
        'ipv4:' + ','.join(targets),
        options=[('grpc.lb_policy_name', 'round_robin')],
    )

    return channel
Kubernetes Deployment
Deploy gRPC services on Kubernetes:
grpc-service.yaml
# Deployment: three replicas of the gRPC user service, each exposing port 50051.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - containerPort: 50051
          name: grpc
        env:
        - name: GRPC_PORT
          value: "50051"
        # Both probes shell out to grpc_health_probe, which must be present in
        # the container image at /bin/grpc_health_probe.
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 30
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 5
---
# Service fronting the deployment's pods on the gRPC port.
# NOTE(review): a ClusterIP service pins each client's long-lived HTTP/2
# connection to a single pod; for client-side round_robin over dns:/// a
# headless service (clusterIP: None) is the usual choice — confirm against
# the load-balancing strategy used by the clients.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - port: 50051
    targetPort: 50051
    name: grpc
  type: ClusterIP
Monitoring and Observability
Add monitoring and tracing to your gRPC server:
monitored_server.py
"""gRPC server with Prometheus request metrics via a server interceptor."""
import grpc
import time
from concurrent import futures  # fix: used below but was never imported
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
REQUEST_COUNT = Counter(
    'grpc_requests_total',
    'Total gRPC requests',
    ['method', 'status']
)

REQUEST_DURATION = Histogram(
    'grpc_request_duration_seconds',
    'gRPC request duration',
    ['method']
)


class MonitoringInterceptor(grpc.ServerInterceptor):
    """Counts and times unary-unary RPCs; other RPC kinds pass through.

    Fixes over the previous version:
    - the start time is taken per call, not once when the handler is resolved;
    - the original handler's request_deserializer/response_serializer are
      preserved (dropping them broke message encoding);
    - streaming/unknown handlers are returned untouched instead of being
      force-wrapped as unary-unary.
    """

    def intercept_service(self, continuation, handler_call_details):
        handler = continuation(handler_call_details)
        if handler is None or handler.unary_unary is None:
            # Unknown method or a streaming RPC: don't wrap it.
            return handler

        method = handler_call_details.method
        inner = handler.unary_unary

        def monitored(request, context):
            # Measured per invocation so durations are accurate.
            start_time = time.time()
            try:
                response = inner(request, context)
                REQUEST_COUNT.labels(method=method, status='OK').inc()
                return response
            except Exception:
                REQUEST_COUNT.labels(method=method, status='ERROR').inc()
                raise
            finally:
                REQUEST_DURATION.labels(method=method).observe(
                    time.time() - start_time
                )

        return grpc.unary_unary_rpc_method_handler(
            monitored,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )


def create_monitored_server():
    """Start the Prometheus exporter on :8000 and build an intercepted server."""
    # Start Prometheus metrics server
    start_http_server(8000)

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[MonitoringInterceptor()]
    )

    return server
Environment-specific Configuration
Configure servers for different environments:
config.py
"""Environment-driven server configuration (development / staging / production)."""
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class ServerConfig:
    """Per-environment server knobs."""
    port: int
    max_workers: int
    enable_tls: bool
    # fix: fields defaulting to None were annotated plain `str`
    cert_file: Optional[str] = None
    key_file: Optional[str] = None
    enable_reflection: bool = False
    enable_health_check: bool = True


def get_config() -> ServerConfig:
    """Return the config selected by the ENVIRONMENT variable.

    Unrecognized or missing values fall back to the development profile.
    """
    env = os.getenv('ENVIRONMENT', 'development')

    if env == 'production':
        return ServerConfig(
            port=50051,
            max_workers=100,
            enable_tls=True,
            cert_file='/etc/ssl/certs/server.crt',
            key_file='/etc/ssl/private/server.key',
            enable_reflection=False,  # never expose reflection in production
            enable_health_check=True,
        )
    if env == 'staging':
        return ServerConfig(
            port=50051,
            max_workers=50,
            enable_tls=True,
            cert_file='/etc/ssl/certs/staging.crt',
            key_file='/etc/ssl/private/staging.key',
            enable_reflection=True,
            enable_health_check=True,
        )
    # development
    return ServerConfig(
        port=50051,
        max_workers=10,
        enable_tls=False,
        enable_reflection=True,
        enable_health_check=True,
    )


def create_server_from_config(config: ServerConfig):
    """Build a grpc.Server from config, binding a secure or insecure port.

    Raises:
        FileNotFoundError: if TLS is enabled and cert/key files are missing.
    """
    # fix: grpc and futures were used here but never imported anywhere in
    # this module; imported locally so the pure-config helpers above remain
    # usable without grpc installed.
    import grpc
    from concurrent import futures

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=config.max_workers))

    # Add services...

    if config.enable_tls:
        # Configure TLS
        with open(config.cert_file, 'rb') as f:
            cert = f.read()
        with open(config.key_file, 'rb') as f:
            key = f.read()

        credentials = grpc.ssl_server_credentials([(key, cert)])
        server.add_secure_port(f'[::]:{config.port}', credentials)
    else:
        server.add_insecure_port(f'[::]:{config.port}')

    return server
Proper server configuration ensures your gRPC services are secure, performant, and ready for production workloads.