For AI agents: a documentation index is available at the root level at /llms.txt and /llms-full.txt. Append /llms.txt to any URL for a page-level index, or .md for the markdown version of any page.
预约演示 | 登录 | 免费开始
  • 概览
    • 什么是 API 定义?
    • 项目结构
      • 概览
      • 覆盖(Overrides)
      • 身份验证
      • 服务器
      • 同步您的规范
      • gRPC generators.yml 参考
Checking status...
SOC2Soc 2 Type II
© 2026 Fern • Birch Solutions, Inc., a Postman company

Documentation

SDKs | Docs | Ask Fern | CLI Reference

API Definitions

OpenAPI | AsyncAPI | OpenRPC | gRPC

Resources

Blog | Support | Pricing

Company

Brand Kit | Privacy Policy | Terms of Service
LogoLogo
预约演示 | 登录 | 免费开始
在本页
  • 基本服务器设置
  • TLS 配置
  • 服务器选项
  • 健康检查
  • 反射
  • 负载均衡
  • Kubernetes 部署
  • 监控和可观测性
  • 环境特定配置
gRPC

服务器

为生产部署设置和配置 gRPC 服务器

以 Markdown 格式查看
此页面是否有帮助?
在仪表板中编辑
上一个

身份验证

下一个

同步你的 gRPC 规范

gRPC 服务器可以配置各种安全性、性能和可扩展性选项。正确的服务器配置对于生产部署至关重要。

基本服务器设置

设置具有多个服务的基本 gRPC 服务器:

server.py
1import grpc
2from concurrent import futures
3import user_service_pb2_grpc
4import auth_service_pb2_grpc
5from user_service import UserServiceServicer
6from auth_service import AuthServiceServicer
7
def create_server():
    """Build a development gRPC server exposing the user and auth services.

    Returns:
        The object returned by ``grpc.server``, bound to ``[::]:50051``
        without TLS. The server is not started here.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    # Each entry pairs a generated registration helper with the servicer
    # class it expects; servicers are instantiated in listing order.
    registrations = (
        (user_service_pb2_grpc.add_UserServiceServicer_to_server,
         UserServiceServicer),
        (auth_service_pb2_grpc.add_AuthServiceServicer_to_server,
         AuthServiceServicer),
    )
    for register, servicer_cls in registrations:
        register(servicer_cls(), grpc_server)

    # Plaintext listener, intended for development.
    grpc_server.add_insecure_port('[::]:50051')
    return grpc_server
23
if __name__ == '__main__':
    # Script entry point: build the server, start serving, then block the
    # main thread until the server terminates.
    server = create_server()
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

TLS 配置

为安全的生产部署配置 TLS:

secure_server.py
1import grpc
2from grpc import ssl_server_credentials
3
def create_secure_server():
    """Build a gRPC server that listens on port 443 with mutual TLS.

    Reads ``server-key.pem``, ``server-cert.pem`` and ``ca-cert.pem`` using
    paths relative to the current working directory.

    Returns:
        The configured server; it is not started here.

    Raises:
        OSError: If any of the PEM files cannot be opened or read.
    """
    # Imported locally: this snippet's header only imports ``grpc`` and
    # ``ssl_server_credentials``, so these names would otherwise be undefined.
    from concurrent import futures

    import user_service_pb2_grpc
    from user_service import UserServiceServicer

    def read_pem(path):
        """Return the raw bytes of the PEM file at *path*."""
        with open(path, 'rb') as pem_file:
            return pem_file.read()

    # Load TLS material.
    private_key = read_pem('server-key.pem')
    certificate_chain = read_pem('server-cert.pem')
    root_certificates = read_pem('ca-cert.pem')

    # Requiring client auth together with a root CA bundle enables mutual TLS.
    server_credentials = ssl_server_credentials(
        [(private_key, certificate_chain)],
        root_certificates=root_certificates,
        require_client_auth=True,
    )

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Listen on secure port
    server.add_secure_port('[::]:443', server_credentials)

    return server

服务器选项

配置各种服务器选项以优化性能和行为:

configured_server.py
1import grpc
2from grpc import compression
3
def create_configured_server():
    """Build a gRPC server with keepalive, connection-age and size options.

    Returns:
        A server created with a 100-worker thread pool, the channel options
        listed below, and gzip as the default compression. No services or
        ports are added here.
    """
    # Imported locally: the snippet header does not import ``futures``.
    from concurrent import futures

    # Define server options
    options = [
        # Keepalive pings.
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        # HTTP/2 ping policy.
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),
        # Connection lifetime management.
        ('grpc.max_connection_idle_ms', 60000),
        ('grpc.max_connection_age_ms', 300000),
        ('grpc.max_connection_age_grace_ms', 30000),
        # Message size limits (4 MiB each way).
        ('grpc.max_receive_message_length', 4 * 1024 * 1024),
        ('grpc.max_send_message_length', 4 * 1024 * 1024),
    ]

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=100),
        options=options,
        # ``grpc.Compression`` is the public enum; using it avoids relying on
        # a ``grpc.compression`` module.
        compression=grpc.Compression.Gzip,
    )

    return server

健康检查

实现用于负载均衡器集成的健康检查:

health.proto
syntax = "proto3";

package grpc.health.v1;

// Health-checking service exposed to load balancers and probes.
service Health {
  // Unary call: returns the current status of the named service.
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  // Server-streaming call: returns a stream of status messages.
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

// Identifies the service whose health is being queried.
message HealthCheckRequest {
  string service = 1;
}

// Carries the serving status of the queried service.
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

健康服务实现:

health_service.py
1import grpc
2from grpc_health.v1 import health_pb2
3from grpc_health.v1 import health_pb2_grpc
4
class HealthServicer(health_pb2_grpc.HealthServicer):
    """In-memory health service backed by a name -> status dictionary."""

    def __init__(self):
        # Maps a service name to the status set via set_service_status().
        self._service_status = {}

    def _current_status(self, service_name):
        """Return the recorded status for *service_name*.

        Services that were never registered are reported as SERVING.
        NOTE(review): the health-checking protocol describes a different
        answer for unknown services; confirm that this default is intended.
        """
        return self._service_status.get(
            service_name,
            health_pb2.HealthCheckResponse.SERVING
        )

    def Check(self, request, context):
        """Return a single response with the status of ``request.service``."""
        status = self._current_status(request.service)
        return health_pb2.HealthCheckResponse(status=status)

    def Watch(self, request, context):
        """Yield the status of ``request.service`` every 5 seconds.

        The stream continues for as long as ``context.is_active()`` is true.
        The original loop was negated, so it ended immediately for a live RPC.
        """
        # Imported locally: the snippet header does not import ``time``.
        import time

        service_name = request.service

        while context.is_active():
            yield health_pb2.HealthCheckResponse(
                status=self._current_status(service_name)
            )
            time.sleep(5)  # Check every 5 seconds

    def set_service_status(self, service_name, status):
        """Record *status* as the current status of *service_name*."""
        self._service_status[service_name] = status

反射

为开发和调试启用 gRPC 反射:

reflection_server.py
1import grpc
2from grpc_reflection.v1alpha import reflection
3
def create_server_with_reflection():
    """Build an insecure development server with gRPC reflection enabled.

    Returns:
        A server on ``[::]:50051`` with the user service registered and
        reflection enabled for it; the server is not started here.
    """
    # Imported locally: the snippet header only imports ``grpc`` and
    # ``reflection``, so these names would otherwise be undefined.
    from concurrent import futures

    import user_service_pb2
    import user_service_pb2_grpc
    from user_service import UserServiceServicer

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Add services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )

    # Reflection advertises the listed fully-qualified service names,
    # including the reflection service itself.
    service_names = (
        user_service_pb2.DESCRIPTOR.services_by_name['UserService'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(service_names, server)

    server.add_insecure_port('[::]:50051')
    return server

负载均衡

配置客户端负载均衡:

load_balanced_client.py
1import grpc
2
def create_load_balanced_channel():
    """Open an insecure channel to the user service with round-robin policy.

    Returns:
        The channel created for ``dns:///user-service.example.com:50051``.
    """
    # DNS-based load balancing: target and options are named before the call.
    target = 'dns:///user-service.example.com:50051'
    lb_options = [
        ('grpc.lb_policy_name', 'round_robin'),
        ('grpc.dns_enable_srv_queries', True),
    ]
    return grpc.insecure_channel(target, options=lb_options)
14
# Using a load balancer with multiple targets
def create_multi_target_channel(targets=None):
    """Open an insecure round-robin channel over a list of backends.

    Args:
        targets: Optional iterable of ``host:port`` strings. When omitted,
            the three example ``user-service-N.example.com:50051`` backends
            are used.

    Returns:
        The channel created for the comma-joined ``ipv4:///`` target.

    Raises:
        ValueError: If *targets* is provided but empty.
    """
    if targets is None:
        targets = [
            'user-service-1.example.com:50051',
            'user-service-2.example.com:50051',
            'user-service-3.example.com:50051',
        ]
    else:
        targets = list(targets)
        if not targets:
            raise ValueError('at least one target is required')

    # NOTE(review): the gRPC naming documentation describes the ``ipv4:``
    # scheme as taking literal addresses. Confirm whether hostnames resolve
    # here, or pass IP addresses / use a ``dns:///`` target instead.
    channel = grpc.insecure_channel(
        f'ipv4:///{",".join(targets)}',
        options=[('grpc.lb_policy_name', 'round_robin')]
    )

    return channel

Kubernetes 部署

在 Kubernetes 上部署 gRPC 服务:

grpc-service.yaml
# Deployment: three replicas of the user-service container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 50051
              name: grpc
          env:
            - name: GRPC_PORT
              value: "50051"
          # Both probes run the grpc_health_probe binary inside the
          # container against the gRPC port.
          livenessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 30
          readinessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 5
---
# Service: cluster-internal endpoint in front of the Deployment's pods.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 50051
      targetPort: 50051
      name: grpc
  type: ClusterIP

监控和可观测性

为你的 gRPC 服务器添加监控(下面的示例通过拦截器导出 Prometheus 指标;分布式跟踪需另行配置):

monitored_server.py
1import grpc
2import time
3from prometheus_client import Counter, Histogram, start_http_server
4
# Prometheus metrics

# Request counter, labelled by method and by status.
REQUEST_COUNT = Counter(
    'grpc_requests_total',
    'Total gRPC requests',
    ['method', 'status'],
)

# Request duration histogram, labelled by method.
REQUEST_DURATION = Histogram(
    'grpc_request_duration_seconds',
    'gRPC request duration',
    ['method'],
)
17
class MonitoringInterceptor(grpc.ServerInterceptor):
    """Server interceptor that records Prometheus metrics for unary RPCs."""

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next unary-unary handler with count and latency metrics.

        Args:
            continuation: Callable returning the next handler in the chain
                for ``handler_call_details``.
            handler_call_details: Call details; ``.method`` is used as the
                ``method`` metric label.

        Returns:
            The next handler unchanged when it is ``None`` or involves
            streaming; otherwise a unary-unary handler that calls the
            original behavior and records metrics around it.
        """
        method = handler_call_details.method
        handler = continuation(handler_call_details)

        # Only unary-unary handlers are instrumented. Wrapping a streaming
        # or missing handler as unary-unary would break the call.
        if (
            handler is None
            or handler.request_streaming
            or handler.response_streaming
        ):
            return handler

        behavior = handler.unary_unary

        def wrapper(request, context):
            # Start timing at invocation so the duration covers only the
            # handler's own execution.
            start_time = time.perf_counter()
            try:
                response = behavior(request, context)
            except Exception:
                REQUEST_COUNT.labels(method=method, status='ERROR').inc()
                raise
            else:
                REQUEST_COUNT.labels(method=method, status='OK').inc()
                return response
            finally:
                duration = time.perf_counter() - start_time
                REQUEST_DURATION.labels(method=method).observe(duration)

        # Carry over the original (de)serializers; without them the wrapped
        # behavior would be handed raw bytes and its response would not be
        # serialized.
        return grpc.unary_unary_rpc_method_handler(
            wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
42
def create_monitored_server(metrics_port=8000):
    """Start the Prometheus endpoint and build an instrumented gRPC server.

    Args:
        metrics_port: Port passed to ``start_http_server``. Defaults to
            8000, the value that was previously hard-coded.

    Returns:
        A server with ``MonitoringInterceptor`` installed. No services or
        ports are added and the server is not started here.
    """
    # Imported locally: the snippet header does not import ``futures``.
    from concurrent import futures

    # Start Prometheus metrics server
    start_http_server(metrics_port)

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[MonitoringInterceptor()]
    )

    return server

环境特定配置

为不同环境配置服务器:

config.py
import os
from concurrent import futures
from dataclasses import dataclass
from typing import Optional

import grpc
3
@dataclass
class ServerConfig:
    """Settings used to build a gRPC server for one environment."""

    # TCP port the server listens on.
    port: int
    # Size of the thread pool handed to the server.
    max_workers: int
    # Whether the listening port is secured with TLS.
    enable_tls: bool
    # Path to the PEM certificate; None when not configured.
    cert_file: Optional[str] = None
    # Path to the PEM private key; None when not configured.
    key_file: Optional[str] = None
    # Feature flag for gRPC reflection.
    enable_reflection: bool = False
    # Feature flag for the health-check service.
    enable_health_check: bool = True
13
def get_config() -> ServerConfig:
    """Select the server settings for the current environment.

    The environment name is read from the ``ENVIRONMENT`` variable and
    defaults to ``development``. Any value other than ``production`` or
    ``staging`` yields the development settings.

    Returns:
        A new ``ServerConfig`` for the selected environment.
    """
    environment = os.getenv('ENVIRONMENT', 'development')

    # Fallback profile: plaintext, small pool, reflection on.
    development = dict(
        port=50051,
        max_workers=10,
        enable_tls=False,
        enable_reflection=True,
        enable_health_check=True,
    )
    profiles = {
        'production': dict(
            port=50051,
            max_workers=100,
            enable_tls=True,
            cert_file='/etc/ssl/certs/server.crt',
            key_file='/etc/ssl/private/server.key',
            enable_reflection=False,
            enable_health_check=True,
        ),
        'staging': dict(
            port=50051,
            max_workers=50,
            enable_tls=True,
            cert_file='/etc/ssl/certs/staging.crt',
            key_file='/etc/ssl/private/staging.key',
            enable_reflection=True,
            enable_health_check=True,
        ),
    }
    return ServerConfig(**profiles.get(environment, development))
45
def create_server_from_config(config: ServerConfig):
    """Build a gRPC server from *config*.

    Only ``port``, ``max_workers``, ``enable_tls``, ``cert_file`` and
    ``key_file`` are read here; the reflection and health-check flags are
    not acted on by this function.

    Args:
        config: Settings describing the listening port, pool size and TLS.

    Returns:
        A server bound to ``[::]:<port>``, with TLS when ``enable_tls`` is
        set and plaintext otherwise. The server is not started here.

    Raises:
        ValueError: If TLS is enabled but ``cert_file`` or ``key_file`` is
            missing.
        OSError: If a certificate or key file cannot be read.
    """
    # Validate before allocating the thread pool so a bad configuration
    # fails fast with a clear message instead of a TypeError from open().
    if config.enable_tls and not (config.cert_file and config.key_file):
        raise ValueError('enable_tls requires both cert_file and key_file')

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=config.max_workers)
    )

    # Add services...

    address = f'[::]:{config.port}'
    if config.enable_tls:
        # Configure TLS
        with open(config.cert_file, 'rb') as f:
            cert = f.read()
        with open(config.key_file, 'rb') as f:
            key = f.read()

        credentials = grpc.ssl_server_credentials([(key, cert)])
        server.add_secure_port(address, credentials)
    else:
        server.add_insecure_port(address)

    return server

正确的服务器配置确保你的 gRPC 服务安全、高性能并为生产工作负载做好准备。