上一篇文章中,我们梳理了 当客户端进行服务注册以后,服务端接收到注册服务的请求进行处理 的源码部分,那服务端是如何知晓注册上来的客户端是否还正常呢?接下来本文将揭秘这一切
心跳机制是分布式系统中用于检测节点存活状态的重要机制,通过定期发送心跳包来确认服务实例的健康状况,整体流程如下

客户端 通过定时任务定期向 服务端 发送心跳请求,其入口则是在 NamingGrpcClientProxy 构建对象时,初始化了 RpcClient 客户端,同时执行 RpcClient #start 方法
javapublic final void start() throws NacosException {
// ...
// 异步任务
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
// 阻塞队列,每5秒出队
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// 检查活跃时间是否超过5秒
if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
// 健康检查
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Server healthy check fail, currentConnection = {}",
rpcClientConfig.name(), currentConnection.getConnectionId());
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
} else {
continue;
}
}
if (reconnectContext.serverInfo != null) {
// clear recommend server if server is not in server list.
boolean serverExist = false;
for (String server : getServerListFactory().getServerList()) {
ServerInfo serverInfo = resolveServerInfo(server);
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
break;
}
}
if (!serverExist) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Recommend server is not in server list, ignore recommend server {}",
rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
reconnectContext.serverInfo = null;
}
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});
// ...
// 注册客户端检测请求
registerServerRequestHandler(request -> {
// 客户端检测请求
if (request instanceof ClientDetectionRequest) {
// 返回客户端检测响应
return new ClientDetectionResponse();
}
return null;
});
}
我们来看下健康检查的 healthCheck 方法,主要通过 rpc 客户端发送 HealthCheckRequest 健康检查请求到服务端
javaprivate boolean healthCheck() {
// 健康检查请求
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
if (this.currentConnection == null) {
return false;
}
int reTryTimes = rpcClientConfig.healthCheckRetryTimes();
while (reTryTimes >= 0) {
reTryTimes--;
try {
Response response = this.currentConnection
.request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut());
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
}
}
return false;
}
与 Nacos 1.4 版本的不同
Nacos 1.4 版本中,是在客户端发起心跳,在客户端注册后,如果是临时实例则开启心跳任务,源码示例如下
java@PostMapping("/instance")
public String register(HttpServletRequest request) throws Exception {
// 解析请求参数
Instance instance = parseInstance(request);
// 核心:ServiceManager处理注册
serviceManager.registerInstance(serviceName, instance);
// 启动心跳监听(BeatReactor)
beatReactor.addBeatInfo(serviceName, instance);
return "ok";
}
在 Nacos 2.0+ 版本开始,客户端构建 RpcClient,并通过异步任务结合心跳进行统计客户端 健康检查。
服务端的健康检查入口则是在 BaseRpcServer #start 中,可通过 HealthCheckRequest 健康检查请求从源码中查找
java@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
// 开启服务
startServer();
Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
// 添加关机挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
// 停止服务
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
核心为 startServer() 方法,主要调用其内部实现 addServices()
java@Override
public void startServer() throws Exception {
// ...
// 添加服务
addServices(handlerRegistry, serverInterceptor);
// ...
}
添加服务的源码如下:
javaprivate void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// 注册一元调用(普通请求-响应模式)的服务方法
// 定义一个一元调用的方法描述符,用于处理单次请求和单次响应
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
// 设置方法类型为一元调用(请求-响应)
.setType(MethodDescriptor.MethodType.UNARY)
// 设置完整的方法名,格式为"服务名/方法名"
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
// 设置请求参数的序列化器,用于将Payload对象序列化为字节流
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
// 设置响应结果的序列化器,用于将字节流反序列化为Payload对象
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// 创建一元调用的处理器,当接收到请求时,调用grpcCommonRequestAcceptor处理请求
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
// 创建异步一元调用处理器,传入处理逻辑:收到请求后调用grpcCommonRequestAcceptor的request方法
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
// 构建一元调用的服务定义,将方法描述符和处理器关联起来
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
// 添加方法定义和对应的处理器
.addMethod(unaryPayloadMethod, payloadHandler).build();
// 将构建好的服务注册到处理器注册表中,并应用服务器拦截器
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// 注册双向流式调用的服务方法
// 创建双向流式调用的处理器,用于处理客户端和服务端之间的双向数据流
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
// 创建异步双向流式调用处理器,传入处理逻辑:调用grpcBiStreamRequestAcceptor的requestBiStream方法
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
// 定义一个双向流式调用的方法描述符
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
// 设置方法类型为双向流式调用
.setType(MethodDescriptor.MethodType.BIDI_STREAMING)
// 设置完整的方法名,格式为"服务名/方法名"
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
// 设置请求参数的序列化器
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
// 设置响应结果的序列化器
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// 构建双向流式调用的服务定义,将方法描述符和处理器关联起来
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(REQUEST_BI_STREAM_SERVICE_NAME)
// 添加方法定义和对应的处理器
.addMethod(biStreamMethod, biStreamHandler).build();
// 将构建好的服务注册到处理器注册表中,并应用服务器拦截器
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
核心源码分析
ServerCalls.asyncUnaryCall() 创建了一个异步一元调用处理器grpcCommonRequestAcceptor.request() 方法进行具体处理接收器处理的请求,都会交给 GrpcRequestAcceptor #request() 处理,核心是刷新服务的活跃时间
java @Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
// ...
// 刷新活跃时间
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// ...
}
再来看下服务端健康检查的入口,没有做任何处理,所以核心逻辑还在上处
java@Component
public class HealthCheckRequestHandler extends RequestHandler<HealthCheckRequest, HealthCheckResponse> {
@Override
@TpsControl(pointName = "HealthCheck")
public HealthCheckResponse handle(HealthCheckRequest request, RequestMeta meta) {
return new HealthCheckResponse();
}
}
有了上处在服务端中,刷新客户端的上次活跃时间,那此时间在何处使用呢?
接下来我们来看下在 ConnectionManager 中更新的 lastActiveTime 属性,查看其 getLastActiveTime() 何处用到。
通过引用找到了 ConnectionManager 里面的 start 方法中,这个方法上有 @PostConstruct,也就是 ConnectionManager Bean 实例对象在被创建完成时,构造方法执行完毕就执行这个方法

通过上述源码中可以发现,其通过 ScheduledThreadPoolExecutor #scheduleWithFixedDelay 启动了一个固定延迟时间的任务,每隔3秒钟执行一次,其 start 方法内部主要做4步逻辑
ConnectResetRequest 请求ClientDetectionRequest 检测客户端活性java@PostConstruct
public void start() {
// 启动不健康连接驱逐任务,每3秒执行一次,首次延迟1秒执行
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
try {
// 获取当前总连接数
int totalCount = connections.size();
Loggers.REMOTE_DIGEST.info("Connection check task start");
// 更新监控指标中的长连接数量
MetricsMonitor.getLongConnectionMonitor().set(totalCount);
// 获取所有连接的键值对集合,用于后续遍历检查
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
// 统计当前SDK客户端连接数
int currentSdkClientCount = currentSdkClientCount();
// 判断是否处于负载调整状态(loadClient >= 0表示正在调整)
boolean isLoaderClient = loadClient >= 0;
// 确定当前最大允许连接数:如果是负载调整状态则使用loadClient,否则使用规则中配置的限制数
int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
// 计算需要驱逐的连接数:如果限制数小于0表示无限制,否则计算超出部分
int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);
// 记录连接检查的统计信息
Loggers.REMOTE_DIGEST
.info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);
// 创建存储需要驱逐的客户端连接ID的列表
List<String> expelClient = new LinkedList<>();
// 创建按IP统计需要驱逐连接数的映射表
Map<String, AtomicInteger> expelForIp = new HashMap<>(16);
// 第一步:根据IP限制规则计算每个IP需要驱逐的连接数
for (Map.Entry<String, Connection> entry : entries) {
// 遍历每个连接
Connection client = entry.getValue();
String appName = client.getMetaInfo().getAppName();
String clientIp = client.getMetaInfo().getClientIp();
// 只处理来自SDK的连接,并且该IP尚未计算过驱逐数
if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
// 获取针对该IP的连接数限制
int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
// 如果IP没有单独配置限制,则尝试获取应用级别的限制
if (countLimitOfIp < 0) {
int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
}
// 如果仍未获取到限制配置,则使用默认IP连接数限制
if (countLimitOfIp < 0) {
countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
}
// 如果有限制配置并且该IP当前连接数超过了限制,则计算需要驱逐的数量
if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
// 将需要驱逐的连接数记录到expelForIp映射中
expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
}
}
}
}
// 记录超限IP的数量
Loggers.REMOTE_DIGEST
.info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());
// 如果有超限IP,则记录详细信息
if (expelForIp.size() > 0) {
Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
}
// 创建存储过期连接ID的集合
Set<String> outDatedConnections = new HashSet<>();
// 获取当前时间戳,用于判断连接是否过期
long now = System.currentTimeMillis();
// 第二步:根据IP限制规则确定要驱逐的具体连接
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String clientIp = client.getMetaInfo().getClientIp();
AtomicInteger integer = expelForIp.get(clientIp);
// 如果该IP需要驱逐连接,且还有未处理的驱逐配额
if (integer != null && integer.intValue() > 0) {
// 减少驱逐配额,并将该连接加入驱逐列表
integer.decrementAndGet();
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
}
// 如果不是因IP限制需要驱逐,但连接已过期(超过KEEP_ALIVE_TIME时间未活动)
else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
// 将过期连接加入过期连接集合
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// 第三步:如果总的驱逐数量仍有剩余,继续选择其他连接进行驱逐
if (expelCount > 0) {
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
// 选择那些不在IP限制驱逐范围内的SDK连接进行驱逐
if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
.isSdkSource() && expelCount > 0) {
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
// 从过期连接列表中移除,避免重复处理
outDatedConnections.remove(client.getMetaInfo().getConnectionId());
}
}
}
// 解析重定向地址,提取服务器IP和端口
String serverIp = null;
String serverPort = null;
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
String[] split = redirectAddress.split(Constants.COLON);
serverIp = split[0];
serverPort = split[1];
}
// 对所有需要驱逐的客户端发送连接重置请求
for (String expelledClientId : expelClient) {
try {
Connection connection = getConnection(expelledClientId);
if (connection != null) {
// 创建连接重置请求,包含推荐的新服务器地址
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp(serverIp);
connectResetRequest.setServerPort(serverPort);
// 异步发送重置请求
connection.asyncRequest(connectResetRequest, null);
Loggers.REMOTE_DIGEST
.info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
expelledClientId, connectResetRequest.getServerIp(),
connectResetRequest.getServerPort());
}
} catch (ConnectionAlreadyClosedException e) {
// 如果连接已经关闭,则直接注销
unregister(expelledClientId);
} catch (Exception e) {
// 记录驱逐连接时发生的错误
Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}", expelledClientId, e);
}
}
// 第四步:对过期连接进行主动检测,确认是否仍然存活
Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
// 存储检测成功的连接ID集合
Set<String> successConnections = new HashSet<>();
// 使用CountDownLatch等待所有检测请求完成
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
// 对每个过期连接发送检测请求
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
// 创建客户端检测请求
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
// 异步发送检测请求
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
// 不指定特定执行器
return null;
}
@Override
public long getTimeout() {
// 设置1秒超时
return 1000L;
}
@Override
public void onResponse(Response response) {
// 检测响应处理
latch.countDown();
if (response != null && response.isSuccess()) {
// 如果检测成功,更新连接的活跃时间
connection.freshActiveTime();
// 将连接ID加入成功连接集合
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
// 发生异常时也减少计数器
latch.countDown();
}
});
Loggers.REMOTE_DIGEST
.info("[{}]send connection active request ", outDateConnectionId);
} else {
// 连接不存在时减少计数器
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
// 连接已关闭时减少计数器
latch.countDown();
} catch (Exception e) {
// 记录检测过程中发生的错误
Loggers.REMOTE_DIGEST
.error("[{}]Error occurs when check client active detection ,error={}",
outDateConnectionId, e);
latch.countDown();
}
}
// 等待最多3秒直到所有检测完成
latch.await(3000L, TimeUnit.MILLISECONDS);
Loggers.REMOTE_DIGEST
.info("Out dated connection check successCount={}", successConnections.size());
// 注销所有检测失败的过期连接
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
Loggers.REMOTE_DIGEST
.info("[{}]Unregister Out dated connection....", outDateConnectionId);
// 注销连接
unregister(outDateConnectionId);
}
}
}
// 如果正在进行负载调整,重置相关参数
if (isLoaderClient) {
loadClient = -1;
redirectAddress = null;
}
Loggers.REMOTE_DIGEST.info("Connection check task end");
} catch (Throwable e) {
// 记录连接检查过程中发生的任何错误
Loggers.REMOTE.error("Error occurs during connection check... ", e);
}
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
客户端最核心的就是第4点,客户端发送的心跳请求,由于是异步方式发送,此处通过 CountDownLatch 来控制异步请求的响应处理的等待,流程如下:
ClientDetectionRequest 心跳检测请求客户端处理逻辑很简单,就是响应返回成功

当连接心跳没有进入到成功列表时,则进行注销操作
javapublic synchronized void unregister(String connectionId) {
// 从连接列表中删除
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
// 删除连接的客户端IP
connectionForClientIp.remove(clientIp);
}
}
// 关闭连接
remove.close();
Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
// 通知监听器
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
此处会查找客户端关闭的监听器,关闭后进行发布事件 ClientDisconnectEvent
javapublic void notifyClientDisConnected(final Connection connection) {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
// 查找客户端关闭监听器
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}
}
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
// 删除客户端
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
// 释放客户端
client.release();
// 发布事件 ClientDisconnectEvent
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
return true;
}
Nacos 中针对注册的服务实例最后活跃时间进行判断,如果超过最大活跃时间 20s,则会进入到待驱逐列表中,服务端会主动通过心跳检测服务来确认服务是否可用的。
核心作用:
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!