Servers

Set up and configure gRPC servers for production deployments

gRPC servers can be configured with various options for security, performance, and scalability. Proper server configuration is crucial for production deployments.

Basic Server Setup

Set up a basic gRPC server with multiple services:

server.py
import grpc
from concurrent import futures
import user_service_pb2_grpc
import auth_service_pb2_grpc
from user_service import UserServiceServicer
from auth_service import AuthServiceServicer

def create_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    auth_service_pb2_grpc.add_AuthServiceServicer_to_server(
        AuthServiceServicer(), server
    )

    # Listen on insecure port for development
    server.add_insecure_port('[::]:50051')

    return server

if __name__ == '__main__':
    server = create_server()
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()
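
For production deployments you will usually also want a graceful shutdown path. A minimal sketch building on the create_server() helper above (the 30-second grace period is illustrative): server.stop() rejects new RPCs and returns an event that is set once in-flight RPCs have drained or the grace period expires.

graceful_shutdown.py
import signal

from server import create_server  # the helper defined above

def main():
    server = create_server()
    server.start()

    def handle_sigterm(signum, frame):
        # Stop accepting new RPCs; give in-flight RPCs up to 30 seconds to finish
        stopped_event = server.stop(grace=30)
        stopped_event.wait(30)

    signal.signal(signal.SIGTERM, handle_sigterm)
    server.wait_for_termination()

if __name__ == '__main__':
    main()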

TLS Configuration

Configure TLS for secure production deployments:

secure_server.py
import grpc
from concurrent import futures
from grpc import ssl_server_credentials

import user_service_pb2_grpc
from user_service import UserServiceServicer

def create_secure_server():
    # Load TLS certificates
    with open('server-key.pem', 'rb') as f:
        private_key = f.read()
    with open('server-cert.pem', 'rb') as f:
        certificate_chain = f.read()
    with open('ca-cert.pem', 'rb') as f:
        root_certificates = f.read()

    # Create server credentials
    server_credentials = ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True  # Mutual TLS: clients must present a certificate
    )

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Listen on secure port
    server.add_secure_port('[::]:443', server_credentials)

    return server
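
Because require_client_auth=True enforces mutual TLS, clients must present their own certificate as well. A minimal client-side sketch for comparison (file names and the target address are placeholders):

mtls_client.py
import grpc

def create_mtls_channel():
    # Load the client certificate, its key, and the CA that signed the server certificate
    with open('client-key.pem', 'rb') as f:
        private_key = f.read()
    with open('client-cert.pem', 'rb') as f:
        certificate_chain = f.read()
    with open('ca-cert.pem', 'rb') as f:
        root_certificates = f.read()

    credentials = grpc.ssl_channel_credentials(
        root_certificates=root_certificates,
        private_key=private_key,
        certificate_chain=certificate_chain,
    )
    return grpc.secure_channel('api.example.com:443', credentials)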

Server Options

Configure various server options for performance and behavior:

configured_server.py
import grpc
from concurrent import futures

def create_configured_server():
    # Define server options (channel arguments)
    options = [
        ('grpc.keepalive_time_ms', 30000),                        # send a keepalive ping every 30s
        ('grpc.keepalive_timeout_ms', 5000),                      # wait 5s for the ping ack
        ('grpc.keepalive_permit_without_calls', True),            # ping even without active calls
        ('grpc.http2.max_pings_without_data', 0),                 # unlimited pings without data
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000), # tolerate client pings every 5 min
        ('grpc.max_connection_idle_ms', 60000),                   # close idle connections after 1 min
        ('grpc.max_connection_age_ms', 300000),                   # recycle connections after 5 min
        ('grpc.max_connection_age_grace_ms', 30000),
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),     # 4 MB
        ('grpc.max_send_message_length', 4 * 1024 * 1024),        # 4 MB
    ]

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=100),
        options=options,
        compression=grpc.Compression.Gzip
    )

    return server
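
Keepalive is enforced on both ends of the connection: a client that pings more aggressively than the server's grpc.http2.min_ping_interval_without_data_ms allows can have its connection closed with a GOAWAY ("too_many_pings"). A rough sketch of client-side options kept consistent with the server settings above (values are illustrative, not prescriptive):

keepalive_client.py
import grpc

def create_keepalive_channel():
    options = [
        # Ping no more often than the server's min_ping_interval_without_data_ms (5 min)
        ('grpc.keepalive_time_ms', 300000),
        # Wait 5 seconds for a ping acknowledgement before considering the connection dead
        ('grpc.keepalive_timeout_ms', 5000),
        # Allow keepalive pings even when there are no active calls
        ('grpc.keepalive_permit_without_calls', True),
        # Match the server's 4 MB message limits
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),
        ('grpc.max_send_message_length', 4 * 1024 * 1024),
    ]
    return grpc.insecure_channel('localhost:50051', options=options)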

Health Checking

Implement the standard gRPC health checking protocol (grpc.health.v1) so load balancers and orchestrators can probe service status:

health.proto
syntax = "proto3";

package grpc.health.v1;

service Health {
  // Check health status
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  // Watch health status changes
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

A hand-rolled health service implementation (a ready-made servicer from grpcio-health-checking is shown after this example):

health_service.py
import time

import grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

class HealthServicer(health_pb2_grpc.HealthServicer):

    def __init__(self):
        self._service_status = {}

    def Check(self, request, context):
        service_name = request.service
        status = self._service_status.get(
            service_name,
            health_pb2.HealthCheckResponse.SERVING
        )

        return health_pb2.HealthCheckResponse(status=status)

    def Watch(self, request, context):
        # Stream health updates for as long as the client keeps the RPC open
        service_name = request.service

        while context.is_active():
            status = self._service_status.get(
                service_name,
                health_pb2.HealthCheckResponse.SERVING
            )

            yield health_pb2.HealthCheckResponse(status=status)
            time.sleep(5)  # Re-check every 5 seconds

    def set_service_status(self, service_name, status):
        self._service_status[service_name] = status
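
The grpcio-health-checking package ships the generated code for this protocol together with a thread-safe HealthServicer, so in practice you can register the stock implementation instead of writing your own. A sketch of wiring it into a server ('user.UserService' is a placeholder for your fully qualified service name):

register_health.py
from grpc_health.v1 import health, health_pb2, health_pb2_grpc

def add_health_checking(server):
    # The stock servicer supports both Check and Watch
    health_servicer = health.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)

    # '' is the conventional name for overall server health
    health_servicer.set('', health_pb2.HealthCheckResponse.SERVING)
    health_servicer.set('user.UserService', health_pb2.HealthCheckResponse.SERVING)

    return health_servicer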

Reflection

Enable gRPC server reflection so tools such as grpcurl and Postman can discover your services at runtime; this is intended for development and debugging:

reflection_server.py
import grpc
from concurrent import futures
from grpc_reflection.v1alpha import reflection

import user_service_pb2
import user_service_pb2_grpc
from user_service import UserServiceServicer

def create_server_with_reflection():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Enable reflection
    SERVICE_NAMES = (
        user_service_pb2.DESCRIPTOR.services_by_name['UserService'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    server.add_insecure_port('[::]:50051')
    return server

Load Balancing

Configure client-side load balancing:

load_balanced_client.py
import grpc

def create_load_balanced_channel():
    # DNS-based load balancing: resolve all A/AAAA records and round-robin across them
    channel = grpc.insecure_channel(
        'dns:///user-service.example.com:50051',
        options=[
            ('grpc.lb_policy_name', 'round_robin'),
            ('grpc.dns_enable_srv_queries', True),
        ]
    )

    return channel

# Round-robin over a static list of backend addresses
def create_multi_target_channel():
    # The ipv4: resolver takes literal IP addresses, not hostnames
    targets = [
        '10.0.0.10:50051',
        '10.0.0.11:50051',
        '10.0.0.12:50051',
    ]

    channel = grpc.insecure_channel(
        f'ipv4:{",".join(targets)}',
        options=[('grpc.lb_policy_name', 'round_robin')]
    )

    return channel
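
An alternative to the grpc.lb_policy_name argument is to select the policy through a JSON service config, which is the more current mechanism; a sketch (the target below is a placeholder):

service_config_client.py
import json

import grpc

def create_channel_with_service_config():
    # Request round-robin across all resolved backends via the service config
    service_config = json.dumps({
        "loadBalancingConfig": [{"round_robin": {}}]
    })
    return grpc.insecure_channel(
        'dns:///user-service.example.com:50051',
        options=[('grpc.service_config', service_config)],
    )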

Kubernetes Deployment

Deploy gRPC services on Kubernetes. Note that a ClusterIP Service balances at the connection level, so long-lived gRPC connections tend to stick to a single pod; for per-request distribution, point the client-side round robin shown above at a headless Service (clusterIP: None) or use a service mesh:

grpc-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 50051
              name: grpc
          env:
            - name: GRPC_PORT
              value: "50051"
          livenessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 30
          readinessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 50051
      targetPort: 50051
      name: grpc
  type: ClusterIP

Monitoring and Observability

Add monitoring and tracing to your gRPC server:

monitored_server.py
import time

import grpc
from concurrent import futures
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
REQUEST_COUNT = Counter(
    'grpc_requests_total',
    'Total gRPC requests',
    ['method', 'status']
)

REQUEST_DURATION = Histogram(
    'grpc_request_duration_seconds',
    'gRPC request duration',
    ['method']
)

class MonitoringInterceptor(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        handler = continuation(handler_call_details)

        # Only wrap unary-unary handlers; pass everything else through unchanged
        if handler is None or handler.unary_unary is None:
            return handler

        method = handler_call_details.method
        behavior = handler.unary_unary

        def wrapper(request, context):
            start_time = time.time()
            try:
                response = behavior(request, context)
                REQUEST_COUNT.labels(method=method, status='OK').inc()
                return response
            except Exception:
                REQUEST_COUNT.labels(method=method, status='ERROR').inc()
                raise
            finally:
                duration = time.time() - start_time
                REQUEST_DURATION.labels(method=method).observe(duration)

        # Preserve the original (de)serializers when re-wrapping the handler
        return grpc.unary_unary_rpc_method_handler(
            wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

def create_monitored_server():
    # Expose Prometheus metrics on a separate HTTP port
    start_http_server(8000)

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[MonitoringInterceptor()]
    )

    return server

Environment-specific Configuration

Configure servers for different environments:

config.py
import os
from concurrent import futures
from dataclasses import dataclass
from typing import Optional

import grpc

@dataclass
class ServerConfig:
    port: int
    max_workers: int
    enable_tls: bool
    cert_file: Optional[str] = None
    key_file: Optional[str] = None
    enable_reflection: bool = False
    enable_health_check: bool = True

def get_config() -> ServerConfig:
    env = os.getenv('ENVIRONMENT', 'development')

    if env == 'production':
        return ServerConfig(
            port=50051,
            max_workers=100,
            enable_tls=True,
            cert_file='/etc/ssl/certs/server.crt',
            key_file='/etc/ssl/private/server.key',
            enable_reflection=False,
            enable_health_check=True
        )
    elif env == 'staging':
        return ServerConfig(
            port=50051,
            max_workers=50,
            enable_tls=True,
            cert_file='/etc/ssl/certs/staging.crt',
            key_file='/etc/ssl/private/staging.key',
            enable_reflection=True,
            enable_health_check=True
        )
    else:  # development
        return ServerConfig(
            port=50051,
            max_workers=10,
            enable_tls=False,
            enable_reflection=True,
            enable_health_check=True
        )

def create_server_from_config(config: ServerConfig):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=config.max_workers))

    # Add services...

    if config.enable_tls:
        # Configure TLS
        with open(config.cert_file, 'rb') as f:
            cert = f.read()
        with open(config.key_file, 'rb') as f:
            key = f.read()

        credentials = grpc.ssl_server_credentials([(key, cert)])
        server.add_secure_port(f'[::]:{config.port}', credentials)
    else:
        server.add_insecure_port(f'[::]:{config.port}')

    return server
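
A minimal entry point tying the configuration helpers together might look like this (service registration, health checking, and reflection are elided, matching the sketch above):

main.py
from config import get_config, create_server_from_config

if __name__ == '__main__':
    config = get_config()
    server = create_server_from_config(config)
    server.start()
    print(f"gRPC server started on port {config.port}")
    server.wait_for_termination()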

Proper server configuration ensures your gRPC services are secure, performant, and ready for production workloads.