2025-10-24
微服务与分布式
0

目录

心跳机制
心跳的启动(健康检查)
服务端健康检查
客户端过期检测
心跳超时处理
总结

上一篇文章中,我们梳理了 当客户端进行服务注册以后,服务端接收到注册服务的请求进行处理 的源码部分,那服务端是如何知晓注册上来的客户端是否还正常呢?接下来本文将揭秘这一切

心跳机制

心跳机制是分布式系统中用于检测节点存活状态的重要机制,通过定期发送心跳包来确认服务实例的健康状况,整体流程如下

image.png

心跳的启动(健康检查)

客户端 通过定时任务定期向 服务端 发送心跳请求,其入口则是在 NamingGrpcClientProxy 构建对象时,初始化了 RpcClient 客户端,同时执行 RpcClient #start 方法

  • 通过异步任务,每间隔5秒进行一次健康检查
  • 处理 ClientDetectionRequest 客户端检测请求,服务端请求而来,客户端响应
java
public final void start() throws NacosException { // ... // 异步任务 clientEventExecutor.submit(() -> { while (true) { try { if (isShutdown()) { break; } // 阻塞队列,每5秒出队 ReconnectContext reconnectContext = reconnectionSignal .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS); if (reconnectContext == null) { // 检查活跃时间是否超过5秒 if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) { // 健康检查 boolean isHealthy = healthCheck(); if (!isHealthy) { if (currentConnection == null) { continue; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", rpcClientConfig.name(), currentConnection.getConnectionId()); RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get(); if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) { break; } boolean statusFLowSuccess = RpcClient.this.rpcClientStatus .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY); if (statusFLowSuccess) { reconnectContext = new ReconnectContext(null, false); } else { continue; } } else { lastActiveTimeStamp = System.currentTimeMillis(); continue; } } else { continue; } } if (reconnectContext.serverInfo != null) { // clear recommend server if server is not in server list. boolean serverExist = false; for (String server : getServerListFactory().getServerList()) { ServerInfo serverInfo = resolveServerInfo(server); if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) { serverExist = true; reconnectContext.serverInfo.serverPort = serverInfo.serverPort; break; } } if (!serverExist) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", rpcClientConfig.name(), reconnectContext.serverInfo.getAddress()); reconnectContext.serverInfo = null; } } reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail); } catch (Throwable throwable) { // Do nothing } } }); // ... // 注册客户端检测请求 registerServerRequestHandler(request -> { // 客户端检测请求 if (request instanceof ClientDetectionRequest) { // 返回客户端检测响应 return new ClientDetectionResponse(); } return null; }); }

我们来看下健康检查的 healthCheck 方法,主要通过 rpc 客户端发送 HealthCheckRequest 健康检查请求到服务端

java
private boolean healthCheck() { // 健康检查请求 HealthCheckRequest healthCheckRequest = new HealthCheckRequest(); if (this.currentConnection == null) { return false; } int reTryTimes = rpcClientConfig.healthCheckRetryTimes(); while (reTryTimes >= 0) { reTryTimes--; try { Response response = this.currentConnection .request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut()); // not only check server is ok, also check connection is register. return response != null && response.isSuccess(); } catch (NacosException e) { // ignore } } return false; }

与 Nacos 1.4 版本的不同

Nacos 1.4 版本中,是在客户端发起心跳,在客户端注册后,如果是临时实例则开启心跳任务,源码示例如下

java
@PostMapping("/instance") public String register(HttpServletRequest request) throws Exception { // 解析请求参数 Instance instance = parseInstance(request); // 核心:ServiceManager处理注册 serviceManager.registerInstance(serviceName, instance); // 启动心跳监听(BeatReactor) beatReactor.addBeatInfo(serviceName, instance); return "ok"; }

在 Nacos 2.0+ 版本开始,客户端构建 RpcClient,并通过异步任务结合心跳进行统计客户端 健康检查

服务端健康检查

服务端的健康检查入口则是在 BaseRpcServer #start 中,可通过 HealthCheckRequest 健康检查请求从源码中查找

java
@PostConstruct public void start() throws Exception { String serverName = getClass().getSimpleName(); Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort()); // 开启服务 startServer(); Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort()); // 添加关机挂钩 Runtime.getRuntime().addShutdownHook(new Thread(() -> { Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName); try { // 停止服务 BaseRpcServer.this.stopServer(); Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName); } catch (Exception e) { Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e); } })); }

核心为 startServer() 方法,主要调用其内部实现 addServices()

java
@Override public void startServer() throws Exception { // ... // 添加服务 addServices(handlerRegistry, serverInterceptor); // ... }

添加服务的源码如下:

java
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) { // 注册一元调用(普通请求-响应模式)的服务方法 // 定义一个一元调用的方法描述符,用于处理单次请求和单次响应 final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder() // 设置方法类型为一元调用(请求-响应) .setType(MethodDescriptor.MethodType.UNARY) // 设置完整的方法名,格式为"服务名/方法名" .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)) // 设置请求参数的序列化器,用于将Payload对象序列化为字节流 .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())) // 设置响应结果的序列化器,用于将字节流反序列化为Payload对象 .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); // 创建一元调用的处理器,当接收到请求时,调用grpcCommonRequestAcceptor处理请求 final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls // 创建异步一元调用处理器,传入处理逻辑:收到请求后调用grpcCommonRequestAcceptor的request方法 .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver)); // 构建一元调用的服务定义,将方法描述符和处理器关联起来 final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME) // 添加方法定义和对应的处理器 .addMethod(unaryPayloadMethod, payloadHandler).build(); // 将构建好的服务注册到处理器注册表中,并应用服务器拦截器 handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor)); // 注册双向流式调用的服务方法 // 创建双向流式调用的处理器,用于处理客户端和服务端之间的双向数据流 final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall( // 创建异步双向流式调用处理器,传入处理逻辑:调用grpcBiStreamRequestAcceptor的requestBiStream方法 (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)); // 定义一个双向流式调用的方法描述符 final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder() // 设置方法类型为双向流式调用 .setType(MethodDescriptor.MethodType.BIDI_STREAMING) // 设置完整的方法名,格式为"服务名/方法名" .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)) // 设置请求参数的序列化器 .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())) // 设置响应结果的序列化器 .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build(); // 构建双向流式调用的服务定义,将方法描述符和处理器关联起来 final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition .builder(REQUEST_BI_STREAM_SERVICE_NAME) // 添加方法定义和对应的处理器 .addMethod(biStreamMethod, biStreamHandler).build(); // 将构建好的服务注册到处理器注册表中,并应用服务器拦截器 handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor)); }

核心源码分析

  • 创建gRPC一元调用处理器: 这段代码使用 ServerCalls.asyncUnaryCall() 创建了一个异步一元调用处理器
  • 处理客户端请求: 当客户端发起gRPC请求时,会执行 lambda 表达式中的逻辑
  • 转发请求处理: 将接收到的请求转发给 grpcCommonRequestAcceptor.request() 方法进行具体处理

接收器处理的请求,都会交给 GrpcRequestAcceptor #request() 处理,核心是刷新服务的活跃时间

java
@Override public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) { // ... // 刷新活跃时间 connectionManager.refreshActiveTime(requestMeta.getConnectionId()); // ... }

再来看下服务端健康检查的入口,没有做任何处理,所以核心逻辑还在上处

java
@Component public class HealthCheckRequestHandler extends RequestHandler<HealthCheckRequest, HealthCheckResponse> { @Override @TpsControl(pointName = "HealthCheck") public HealthCheckResponse handle(HealthCheckRequest request, RequestMeta meta) { return new HealthCheckResponse(); } }

客户端过期检测

有了上处在服务端中,刷新客户端的上次活跃时间,那此时间在何处使用呢?

接下来我们来看下在 ConnectionManager 中更新的 lastActiveTime 属性,查看其 getLastActiveTime() 何处用到。

通过引用找到了 ConnectionManager 里面的 start 方法中,这个方法上有 @PostConstruct,也就是 ConnectionManager Bean 实例对象在被创建完成时,构造方法执行完毕就执行这个方法

image.png

通过上述源码中可以发现,其通过 ScheduledThreadPoolExecutor #scheduleWithFixedDelay 启动了一个固定延迟时间的任务,每隔3秒钟执行一次,其 start 方法内部主要做4步逻辑

  1. 连接过载检测与计算:检查每个连接是否超出IP限制
  2. 标记过期连接和超限连接:标记超出IP限制的连接和超时连接
  3. 发送连接重置请求:发送 ConnectResetRequest 请求
  4. 客户端活跃度检测:发送 ClientDetectionRequest 检测客户端活性
java
@PostConstruct public void start() { // 启动不健康连接驱逐任务,每3秒执行一次,首次延迟1秒执行 RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> { try { // 获取当前总连接数 int totalCount = connections.size(); Loggers.REMOTE_DIGEST.info("Connection check task start"); // 更新监控指标中的长连接数量 MetricsMonitor.getLongConnectionMonitor().set(totalCount); // 获取所有连接的键值对集合,用于后续遍历检查 Set<Map.Entry<String, Connection>> entries = connections.entrySet(); // 统计当前SDK客户端连接数 int currentSdkClientCount = currentSdkClientCount(); // 判断是否处于负载调整状态(loadClient >= 0表示正在调整) boolean isLoaderClient = loadClient >= 0; // 确定当前最大允许连接数:如果是负载调整状态则使用loadClient,否则使用规则中配置的限制数 int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit; // 计算需要驱逐的连接数:如果限制数小于0表示无限制,否则计算超出部分 int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0); // 记录连接检查的统计信息 Loggers.REMOTE_DIGEST .info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}", totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount), currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount); // 创建存储需要驱逐的客户端连接ID的列表 List<String> expelClient = new LinkedList<>(); // 创建按IP统计需要驱逐连接数的映射表 Map<String, AtomicInteger> expelForIp = new HashMap<>(16); // 第一步:根据IP限制规则计算每个IP需要驱逐的连接数 for (Map.Entry<String, Connection> entry : entries) { // 遍历每个连接 Connection client = entry.getValue(); String appName = client.getMetaInfo().getAppName(); String clientIp = client.getMetaInfo().getClientIp(); // 只处理来自SDK的连接,并且该IP尚未计算过驱逐数 if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) { // 获取针对该IP的连接数限制 int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp); // 如果IP没有单独配置限制,则尝试获取应用级别的限制 if (countLimitOfIp < 0) { int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName); countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp; } // 如果仍未获取到限制配置,则使用默认IP连接数限制 if (countLimitOfIp < 0) { countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault(); } // 如果有限制配置并且该IP当前连接数超过了限制,则计算需要驱逐的数量 if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) { AtomicInteger currentCountIp = connectionForClientIp.get(clientIp); if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) { // 将需要驱逐的连接数记录到expelForIp映射中 expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp)); } } } } // 记录超限IP的数量 Loggers.REMOTE_DIGEST .info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size()); // 如果有超限IP,则记录详细信息 if (expelForIp.size() > 0) { Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp); } // 创建存储过期连接ID的集合 Set<String> outDatedConnections = new HashSet<>(); // 获取当前时间戳,用于判断连接是否过期 long now = System.currentTimeMillis(); // 第二步:根据IP限制规则确定要驱逐的具体连接 for (Map.Entry<String, Connection> entry : entries) { Connection client = entry.getValue(); String clientIp = client.getMetaInfo().getClientIp(); AtomicInteger integer = expelForIp.get(clientIp); // 如果该IP需要驱逐连接,且还有未处理的驱逐配额 if (integer != null && integer.intValue() > 0) { // 减少驱逐配额,并将该连接加入驱逐列表 integer.decrementAndGet(); expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; } // 如果不是因IP限制需要驱逐,但连接已过期(超过KEEP_ALIVE_TIME时间未活动) else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { // 将过期连接加入过期连接集合 outDatedConnections.add(client.getMetaInfo().getConnectionId()); } } // 第三步:如果总的驱逐数量仍有剩余,继续选择其他连接进行驱逐 if (expelCount > 0) { for (Map.Entry<String, Connection> entry : entries) { Connection client = entry.getValue(); // 选择那些不在IP限制驱逐范围内的SDK连接进行驱逐 if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo() .isSdkSource() && expelCount > 0) { expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; // 从过期连接列表中移除,避免重复处理 outDatedConnections.remove(client.getMetaInfo().getConnectionId()); } } } // 解析重定向地址,提取服务器IP和端口 String serverIp = null; String serverPort = null; if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) { String[] split = redirectAddress.split(Constants.COLON); serverIp = split[0]; serverPort = split[1]; } // 对所有需要驱逐的客户端发送连接重置请求 for (String expelledClientId : expelClient) { try { Connection connection = getConnection(expelledClientId); if (connection != null) { // 创建连接重置请求,包含推荐的新服务器地址 ConnectResetRequest connectResetRequest = new ConnectResetRequest(); connectResetRequest.setServerIp(serverIp); connectResetRequest.setServerPort(serverPort); // 异步发送重置请求 connection.asyncRequest(connectResetRequest, null); Loggers.REMOTE_DIGEST .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}", expelledClientId, connectResetRequest.getServerIp(), connectResetRequest.getServerPort()); } } catch (ConnectionAlreadyClosedException e) { // 如果连接已经关闭,则直接注销 unregister(expelledClientId); } catch (Exception e) { // 记录驱逐连接时发生的错误 Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}", expelledClientId, e); } } // 第四步:对过期连接进行主动检测,确认是否仍然存活 Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size()); if (CollectionUtils.isNotEmpty(outDatedConnections)) { // 存储检测成功的连接ID集合 Set<String> successConnections = new HashSet<>(); // 使用CountDownLatch等待所有检测请求完成 final CountDownLatch latch = new CountDownLatch(outDatedConnections.size()); // 对每个过期连接发送检测请求 for (String outDateConnectionId : outDatedConnections) { try { Connection connection = getConnection(outDateConnectionId); if (connection != null) { // 创建客户端检测请求 ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest(); // 异步发送检测请求 connection.asyncRequest(clientDetectionRequest, new RequestCallBack() { @Override public Executor getExecutor() { // 不指定特定执行器 return null; } @Override public long getTimeout() { // 设置1秒超时 return 1000L; } @Override public void onResponse(Response response) { // 检测响应处理 latch.countDown(); if (response != null && response.isSuccess()) { // 如果检测成功,更新连接的活跃时间 connection.freshActiveTime(); // 将连接ID加入成功连接集合 successConnections.add(outDateConnectionId); } } @Override public void onException(Throwable e) { // 发生异常时也减少计数器 latch.countDown(); } }); Loggers.REMOTE_DIGEST .info("[{}]send connection active request ", outDateConnectionId); } else { // 连接不存在时减少计数器 latch.countDown(); } } catch (ConnectionAlreadyClosedException e) { // 连接已关闭时减少计数器 latch.countDown(); } catch (Exception e) { // 记录检测过程中发生的错误 Loggers.REMOTE_DIGEST .error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e); latch.countDown(); } } // 等待最多3秒直到所有检测完成 latch.await(3000L, TimeUnit.MILLISECONDS); Loggers.REMOTE_DIGEST .info("Out dated connection check successCount={}", successConnections.size()); // 注销所有检测失败的过期连接 for (String outDateConnectionId : outDatedConnections) { if (!successConnections.contains(outDateConnectionId)) { Loggers.REMOTE_DIGEST .info("[{}]Unregister Out dated connection....", outDateConnectionId); // 注销连接 unregister(outDateConnectionId); } } } // 如果正在进行负载调整,重置相关参数 if (isLoaderClient) { loadClient = -1; redirectAddress = null; } Loggers.REMOTE_DIGEST.info("Connection check task end"); } catch (Throwable e) { // 记录连接检查过程中发生的任何错误 Loggers.REMOTE.error("Error occurs during connection check... ", e); } }, 1000L, 3000L, TimeUnit.MILLISECONDS); }

客户端最核心的就是第4点,客户端发送的心跳请求,由于是异步方式发送,此处通过 CountDownLatch 来控制异步请求的响应处理的等待,流程如下:

  • 对标记为过期的连接发送 ClientDetectionRequest 心跳检测请求
  • 使用异步回调方式等待客户端响应
  • 更新成功响应连接的活跃时间戳
  • 移除无响应的过期连接,释放资源

客户端处理逻辑很简单,就是响应返回成功

image.png

心跳超时处理

当连接心跳没有进入到成功列表时,则进行注销操作

java
public synchronized void unregister(String connectionId) { // 从连接列表中删除 Connection remove = this.connections.remove(connectionId); if (remove != null) { String clientIp = remove.getMetaInfo().clientIp; AtomicInteger atomicInteger = connectionForClientIp.get(clientIp); if (atomicInteger != null) { int count = atomicInteger.decrementAndGet(); if (count <= 0) { // 删除连接的客户端IP connectionForClientIp.remove(clientIp); } } // 关闭连接 remove.close(); Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId); // 通知监听器 clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } }

此处会查找客户端关闭的监听器,关闭后进行发布事件 ClientDisconnectEvent

java
public void notifyClientDisConnected(final Connection connection) { for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { try { // 查找客户端关闭监听器 clientConnectionEventListener.clientDisConnected(connection); } catch (Throwable throwable) { Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable); } } } @Override public boolean clientDisconnected(String clientId) { Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId); // 删除客户端 ConnectionBasedClient client = clients.remove(clientId); if (null == client) { return true; } // 释放客户端 client.release(); // 发布事件 ClientDisconnectEvent NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client))); return true; }

总结

Nacos 中针对注册的服务实例最后活跃时间进行判断,如果超过最大活跃时间 20s,则会进入到待驱逐列表中,服务端会主动通过心跳检测服务来确认服务是否可用的。

核心作用:

  • 健康检测:定期确认服务实例是否存活
  • 故障发现:及时发现并处理宕机或网络异常的节点
  • 自动剔除:将不健康的服务实例从服务列表中移除
  • 状态同步:保持注册中心与客户端之间的状态一致性

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!