1. 前言在 Raft KV 系统中每个节点Node都是对等的。一个典型的请求流向是Client-Leader Node-Raft 日志同步-大多数节点确认-应用到状态机 (KV Store)-返回 Client。2. 设计步骤Raft 核心组件包括一致性结点模块RPC 通信日志模块。2.1 日志写日志 → 复制日志 → commit → apply 【leader应用顺序】 细分一下的话就如下 Client ↓ Leader ↓ append log (本地) ↓ 发送 AppendEntries ↓ Followers append log ↓ 多数成功 ↓ Leader commit ↓ Leader apply ↓ 返回客户端成功 ↓ Leader 下次心跳带 commitIndex ↓ Followers apply首先看一下客户端发送一个请求涉及到的大致东西有哪些从后往前看我们需要设计的就是如何写入日志文件以及日志文件的格式该如何设计呢此处我们就弄简单一点儿就好了totalLength inttermlongindexlongcommandLengthintcommandbyte[]整条log entry 长度Raft term日志 index命令长度真正命令这个肯定是变长的totalLength 8 (term) 8 (index) 4 (commandLength) commandLengthpublic class LogEntry { private final long term; private final long index; private final String command; .... }日志存储与管理/** * Description: 日志存储与管理 * Author: txf * Date: 2026/2/9 */ public class LogManager { // 日志文件路径 private static final String LOG_FILE_PATH easy_kv_log.dat; // 内存映射的分段大小128MB可根据内存调整,这里先调整为两兆 private static final int MAPPED_SIZE 2 * 1024 * 1024; // 文件打开模式rw 读写 private static final String FILE_MODE rw; private final File logFile; private RandomAccessFile raf; private FileChannel fileChannel; // 当前映射的内存缓冲区 private MappedByteBuffer currentMappedBuffer; // 当前映射段的起始偏移文件偏移 private long currentMappedOffset 0; // 当前写入的位置相对于文件的总偏移 private long writePosition 0; public LogManager() { this.logFile new File(LOG_FILE_PATH); initFileChannel(); initMappedBuffer(); // 初始化时定位到文件末尾继续追加写 try { this.writePosition fileChannel.size(); } catch (IOException e) { throw new RuntimeException(获取文件大小失败, e); } } /** * 追加写入单条日志核心高性能写入 * param term Raft任期 * param index 日志索引 * param command KV操作命令如PUT key value */ public void appendLogEntry(long term, long index, String command) { // 1. 准备命令字节数组 byte[] commandBytes command.getBytes(StandardCharsets.UTF_8); int commandLength commandBytes.length; // 2. 计算总长度 int totalLength 4 8 8 4 commandLength; // 3. 准备直接缓冲区堆外内存避免拷贝 ByteBuffer directBuffer ByteBuffer.allocateDirect(totalLength); // 按格式写入缓冲区 -- 这里是写入二进制的文件内容我们人类就读不懂了 directBuffer.putInt(totalLength); directBuffer.putLong(term); directBuffer.putLong(index); directBuffer.putInt(commandLength); directBuffer.put(commandBytes); // 这里是写入字符串的 // directBuffer.put((term index command).getBytes(StandardCharsets.UTF_8)); // 翻转缓冲区从写模式转为读模式 directBuffer.flip(); // 4. 写入到内存映射缓冲区核心零拷贝 writeToMappedBuffer(directBuffer); // 5. 更新全局写入位置 writePosition totalLength; } /** * 将缓冲区数据写入内存映射区自动扩容映射段 */ private void writeToMappedBuffer(ByteBuffer buffer) { while (buffer.hasRemaining()) { // 检查当前映射缓冲区是否有足够剩余空间 if (currentMappedBuffer.remaining() buffer.remaining()) { // 先写入当前映射区的剩余空间 int remaining currentMappedBuffer.remaining(); byte[] temp new byte[remaining]; buffer.get(temp); currentMappedBuffer.put(temp); // 强制刷盘将映射内存的数据同步到磁盘可选批量刷盘可提升性能 currentMappedBuffer.force(); // 扩容映射段 initMappedBuffer(); } else { // 直接写入映射缓冲区 currentMappedBuffer.put(buffer); } } } /** * 读取指定索引的日志条目高性能读取 */ public LogEntry readLogEntry(long index) { try { // 使用FileChannel 直接缓冲区读取 ByteBuffer directBuffer ByteBuffer.allocateDirect(1024 * 1024); // 1MB直接缓冲区 long fileOffset 0; long fileSize fileChannel.size(); while (fileOffset fileSize) { // 重置缓冲区 directBuffer.clear(); // 从文件指定偏移读取数据到缓冲区 int readBytes fileChannel.read(directBuffer, fileOffset); if (readBytes -1) break; directBuffer.flip(); // 解析缓冲区中的日志条目 while (directBuffer.hasRemaining()) { // 检查剩余字节是否足够读取固定头部488424字节 if (directBuffer.remaining() 24) break; // 读取固定字段 int totalLength directBuffer.getInt(); long term directBuffer.getLong(); long currentIndex directBuffer.getLong(); int commandLength directBuffer.getInt(); // 检查剩余字节是否足够读取command if (directBuffer.remaining() commandLength) { // 回退缓冲区position下次继续解析 directBuffer.position(directBuffer.position() - 24); break; } // 读取command byte[] commandBytes new byte[commandLength]; directBuffer.get(commandBytes); String command new String(commandBytes, StandardCharsets.UTF_8); // 校验总长度 int actualLength 4 8 8 4 commandLength; if (totalLength ! actualLength) { throw new RuntimeException(日志文件损坏总长度不匹配); } // 找到目标索引则返回 if (currentIndex index) { return new LogEntry(term, index, command); } // 更新文件偏移 fileOffset totalLength; } } } catch (IOException e) { throw new RuntimeException(读取日志条目失败 [index index ], e); } return null; } /** * 加载所有日志条目节点启动时恢复 */ public ListLogEntry loadAllLogEntries() { ListLogEntry logEntries new ArrayList(); try { ByteBuffer directBuffer ByteBuffer.allocateDirect(1024 * 1024); long fileOffset 0; long fileSize fileChannel.size(); while (fileOffset fileSize) { directBuffer.clear(); int readBytes fileChannel.read(directBuffer, fileOffset); if (readBytes -1) break; directBuffer.flip(); while (directBuffer.hasRemaining()) { if (directBuffer.remaining() 24) break; int totalLength directBuffer.getInt(); long term directBuffer.getLong(); long index directBuffer.getLong(); int commandLength directBuffer.getInt(); if (directBuffer.remaining() commandLength) { directBuffer.position(directBuffer.position() - 24); break; } byte[] commandBytes new byte[commandLength]; directBuffer.get(commandBytes); String command new String(commandBytes, StandardCharsets.UTF_8); int actualLength 4 8 8 4 commandLength; if (totalLength ! actualLength) { throw new RuntimeException(日志文件损坏总长度不匹配); } logEntries.add(new LogEntry(term, index, command)); fileOffset totalLength; } } } catch (IOException e) { throw new RuntimeException(加载所有日志条目失败, e); } return logEntries; } /** * 强制刷盘将映射内存的数据同步到磁盘 */ public void forceFlush() { if (currentMappedBuffer ! null) { currentMappedBuffer.force(); // 同步映射内存到磁盘 } try { fileChannel.force(true); // 强制刷盘包含元数据 } catch (IOException e) { throw new RuntimeException(刷盘失败, e); } } /** * 关闭资源必须调用否则会导致文件句柄泄漏 */ public void close() { try { forceFlush(); if (fileChannel ! null) { fileChannel.close(); } if (raf ! null) { raf.close(); } } catch (IOException e) { throw new RuntimeException(关闭资源失败, e); } } /** * 初始化/扩容内存映射缓冲区 */ private void initMappedBuffer() { try { // 计算需要映射的起始位置和大小 long fileSize fileChannel.size(); // 如果当前映射段已写满或首次初始化创建新的映射 if (currentMappedBuffer null || writePosition currentMappedOffset MAPPED_SIZE) { currentMappedOffset (writePosition / MAPPED_SIZE) * MAPPED_SIZE; // 映射文件的指定区间到内存FileChannel.MapMode.READ_WRITE读写模式 currentMappedBuffer fileChannel.map( FileChannel.MapMode.READ_WRITE, currentMappedOffset, MAPPED_SIZE ); // 将缓冲区的position定位到当前写入位置相对于映射段的偏移 currentMappedBuffer.position((int) (writePosition - currentMappedOffset)); } } catch (IOException e) { throw new RuntimeException(初始化内存映射缓冲区失败, e); } } /** * 初始化FileChannel核心高性能通道 */ private void initFileChannel() { try { // 不存在则创建文件 if (!logFile.exists()) { boolean newFile logFile.createNewFile(); if (!newFile) { throw new RuntimeException(创建文件失败); } } this.raf new RandomAccessFile(logFile, FILE_MODE); this.fileChannel raf.getChannel(); } catch (IOException e) { throw new RuntimeException(初始化FileChannel失败, e); } } }2.2 服务端设计2.2.1 消息设计日志设计好了之后接下来看服务端如何设计。我们使用的是netty框架要求有netty基础。然后序列化协议采用的是protobuf读者可以参考这篇文章https://mp.weixin.qq.com/s/kg_-AMHRn_DzFbfBnkK4VQ 这篇文章大致讲解了一下该序列化协议并且也是采用netty整合的。根据proto文件生成类的命令如下也可以用idea的插件自动生成。protoc --proto_pathxxxx目录 --java_outxxx目录 具体的proto文件消息模板如下syntax proto3; option java_outer_classname KvRaftProto; // 生成的外层类名 option java_multiple_files false; // 生成多个独立的Java类而非内部类 // 1. 通用消息封装体核心 // Netty传输时只传这个消息通过type识别具体消息类型 message RaftKvMessage { // 消息类型枚举覆盖所有交互场景 enum MessageType { UNKNOWN 0; // 未知类型兜底 // 客户端 ↔ 节点KV操作 KV_REQUEST 1; // 客户端发起KV请求PUT/GET/DELETE KV_RESPONSE 2; // 节点响应客户端KV请求 // 节点 ↔ 节点Raft共识 VOTE_REQUEST 3; // 选举请求Candidate→Follower VOTE_RESPONSE 4; // 选举响应Follower→Candidate APPEND_ENTRIES_REQUEST 5; // 日志追加/心跳Leader→Follower APPEND_ENTRIES_RESPONSE 6; // 日志追加响应Follower→Leader } MessageType type 1; // 消息类型必传 string node_id 2; // 发送方节点ID用于识别节点 // 具体消息体根据type选择其中一个 KvRequest kv_request 3; KvResponse kv_response 4; VoteRequest vote_request 5; VoteResponse vote_response 6; AppendEntriesRequest append_entries_request 7; AppendEntriesResponse append_entries_response 8; } // 2. 客户端KV操作相关 // 客户端发起的KV请求PUT/GET/DELETE message KvRequest { enum OpType { PUT 0; // 写入/更新 GET 1; // 读取 DELETE 2; // 删除 } OpType op_type 1; // 操作类型必传 string key 2; // KV的key必传 string value 3; // KV的value仅PUT时传 // 可选请求ID用于幂等性防止重复请求 string request_id 4; } // 节点响应客户端的KV结果 message KvResponse { bool success 1; // 操作是否成功 string message 2; // 错误信息/提示失败时必传 string value 3; // 返回的value仅GET成功时传 string request_id 4; // 对应请求的ID幂等性 } // 3. Raft选举相关 // Candidate向Follower发起的投票请求 message VoteRequest { int64 term 1; // Candidate的当前任期必传 string candidate_id 2; // Candidate的节点ID必传 int64 last_log_index 3; // Candidate最后一条日志的索引用于日志一致性检查 int64 last_log_term 4; // Candidate最后一条日志的任期用于日志一致性检查 } // Follower响应Candidate的投票结果 message VoteResponse { int64 term 1; // Follower的当前任期必传用于更新Candidate的任期 bool vote_granted 2; // 是否投赞成票必传 } // 4. Raft日志追加/心跳相关 // 日志条目与你设计的日志格式对齐序列化后可直接写入日志文件 message LogEntry { int64 term 1; // Raft任期对应你日志格式的term int64 index 2; // 日志索引对应你日志格式的index string command 3; // KV操作命令如PUT key1 value1对应你日志格式的command } // Leader向Follower发送的日志追加/心跳请求 message AppendEntriesRequest { int64 term 1; // Leader的当前任期必传 string leader_id 2; // Leader的节点ID必传 int64 prev_log_index 3; // 前一条日志的索引用于日志一致性检查 int64 prev_log_term 4; // 前一条日志的任期用于日志一致性检查 repeated LogEntry entries 5; // 待追加的日志条目心跳时为空 int64 leader_commit 6; // Leader已提交的日志索引Follower据此更新自己的提交索引 } // Follower响应Leader的日志追加结果 message AppendEntriesResponse { int64 term 1; // Follower的当前任期必传用于更新Leader的任期 bool success 2; // 日志追加是否成功必传 int64 match_index 3; // Follower已匹配的日志索引Leader据此更新nextIndex }上面是消息的大致格式。接下来看服务端的节点设计我们从netty服务启动开始往下看在kv-core的app包里面public static void main(String[] args) { int port getPort(args); RaftNettyServer raftNettyServer new RaftNettyServer(port); try { raftNettyServer.start(); } catch (InterruptedException e) { throw new RuntimeException(e); } } private static int getPort(String[] args) { for (String arg : args) { if (arg.startsWith(node.port)) { return Integer.parseInt(arg.substring(10)); } } return 7777; }从java程序启动的命令行读取结点的端口参数我们可以用一台电脑开启多个应用在idea中这样配置就可以了如下图所示从上图中可以看到配置了三个节点然后本项目的jdk是采用的21这个版本。配置好了之后在idea的services里面可以把这些配置一起加进去然后就可以同时启动多个节点了。2.2.2 连接设计RaftNettyServer raftNettyServer new RaftNettyServer(port); ... raftNettyServer.start();从上一节的启动类看出来主要就是new了一个server对象然后调用start方法。我们顺着这两个看就可以了。首先是构造方法public class RaftNettyServer { .... private final int port; private final RaftNode node; public RaftNettyServer(int port) { this.port port; // 这里创建了一个raft结点对象 this.node new RaftNode(port); } } // 这个是RaftNode类 public RaftNode(int port) { this.port port; // 从配置文件中找到自己 this.nodesConfig new NodesConfig(); this.nodeId nodesConfig.findSelf(port); // 需要把自身结点 this.rpcPeers nodesConfig.getNodeList().stream() // node的格式是ip:port .map(node - new RpcPeer(node, node.split(:)[0], Integer.parseInt(node.split(:)[1]), this)) .toList(); this.logManager new LogManager(); this.storage new MemoryStorage(); // 把两个时间先初始化咯 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); this.lastHeartbeatTime System.currentTimeMillis(); log.info(初始化选举超时{}, electionTimeout); // 定时器 scheduler Executors.newScheduledThreadPool(2); scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS); }Raft类是最核心的一个类。上面的构造方法其实很简单。读者自行理解。需要说明一下的就是nodeId的格式【ip:端口】127.0.0.1:8888 // 就是这种字符串的格式还有要说明的就是rpcPeers这个List的构建可以看出来先是从配置文件读取到了集群节点列表然后遍历这个列表创建了对象这个具体是什么意思呢首先看一下Netty的客户端发送请求到服务端服务端处理后在返回给客户端客户端根据响应结果进行逻辑处理。这样一个示意图再看一下下面的连接示意图NettyClient不仅仅是给客户用的集群结点内部互相通信也要用到这个rpcPeers就是集群节点内部通信使用的。如上图所示每个节点都有一个NettyServer启动并监听着端口同时Leader结点需要给所有的Follower结点发送心跳请求此时这个Leader相当于其他两个Follower结点就相当于Client了所以在结点一里面有client的部分在项目归到了rpc的包下也就是上面那个rpcPeers的由来了。在集群启动的时候都是Follower结点只有等到超时了才会开始选主在此之前每个节点都有成为Candidate的可能也就是说每个节点都有向其他结点发送投票请求的可能VoteRequest那么每个节点里面都要有一套Netty Client及处理流程。就如下图所示这样就有一个问题了三个节点我就要六个tcp连接了四个节点就要12个连接十个节点呢就要90个连接这也太多了吧那也确实是的。有一个思路是搞一个中间层叫做routeCenter所有结点连向它然后消息都经过这个路由中心来转发这样连接数就会少很多了。还有一个思路反正NettyClient 一个NettyServer构建出一个Channel理论上我只需要三个tcp连接就可以了啊。如下图所示但是这样的话节点间通信需要转发了代码逻辑就太复杂了况且还有一个问题那就是如何确定这个tcp的连接顺序呢这个可以按照nodeId的字典顺序来嘛。反正就是麻烦就完事儿了。。。综上所述还是最开始的方案最简单直接了反正这就是一个简易的案例不要考虑太多了。如果有一些其他合适的思路欢迎读者给出。节点之间的连接设计就是上面的样子了。接下来看RaftNode的设计。2.2.3 RaftNode设计那就从构造器开始看吧public RaftNode(int port) { this.port port; // 从配置文件中找到自己 this.nodesConfig new NodesConfig(); this.nodeId nodesConfig.findSelf(port); // 需要把自身结点 this.rpcPeers nodesConfig.getNodeList().stream() // node的格式是ip:port .map(node - new RpcPeer(node, node.split(:)[0], Integer.parseInt(node.split(:)[1]), this)) .toList(); this.logManager new LogManager(); this.storage new MemoryStorage(); // 把两个时间先初始化咯 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); this.lastHeartbeatTime System.currentTimeMillis(); log.info(初始化选举超时{}, electionTimeout); // 定时器 scheduler Executors.newScheduledThreadPool(2); scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS); }可以看到都是做一些初始化的工作然后下面是开启了一个定时任务tickprivate void tick() { log.info(检查是否超时{} 状态: {}, nodeId, state); try { if (state ! NodeState.LEADER isTimeout()) { becomeCandidate(); } else if (state NodeState.LEADER) { // sendHeartbeats(); } } catch ( Exception e ) { log.error({} 节点tick定时任务异常, nodeId, e); } } private void becomeCandidate() { log.info({} 选举超时转为 Candidate开始任期: {}, nodeId, currentTerm.get() 1); state NodeState.CANDIDATE; currentTerm.getAndIncrement(); // 任期1 votedFor nodeId; // 给自己投一票 resetElectionTimeout(); // 集群发送投票请求 requestVotes(); } private boolean isTimeout() { return System.currentTimeMillis() - lastHeartbeatTime electionTimeout; } private void resetElectionTimeout() { // 8000ms ~ 12000ms 随机超时避免平票 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); log.info(重置 {} 节点选举时间随机超时{} ms, nodeId, electionTimeout); this.lastHeartbeatTime System.currentTimeMillis(); }主要就是看becomeCandidate这个方法最后的向集群发送投票请求。private void requestVotes() { // 1. 初始化票数自己的一票 AtomicInteger grantedVotes new AtomicInteger(1); long count rpcPeers.stream().filter(peer - !peer.isSelf()).count(); // 不包含自己的结点数 int majority (int) ((count 1) / 2 1); // 总节点数(包含自己)的半数以上 // 2.构造投票消息 KvRaftProto.VoteRequest voteRequest KvRaftProto.VoteRequest.newBuilder() .setTerm(currentTerm.get()) .setCandidateId(nodeId) .setLastLogIndex(logManager.getLastLogIndex()) .setLastLogTerm(logManager.getLastLogTerm()) .build(); // 3. 发送投票请求 // 构建一个对象表示当前投票请求的状态 // String voteId UUID.randomUUID().toString().replaceAll(-, ); // 这里为什么可以用term因为 Raft 规定一个节点在一个 term 内只能投一张票。 // 所以只要 term 匹配这个响应就一定是针对你当前发起的这一轮选举的。 GlobalVoteManager.setVoteState(currentTerm.get(), new VoteState(nodeId, currentTerm.get(), majority)); int countSend 0; for (RpcPeer peer : rpcPeers) { if (!peer.isSelf()) { // 不是自身结点就发送投票请求 // send方法就很简单了请读者自行查看 boolean send peer.send(KvRaftProto.RaftKvMessage.newBuilder() .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST) .setVoteRequest(voteRequest) .build()); if ( send ) countSend; } } log.info(发送投票请求{}已发送给了 {} 个结点.., voteRequest, countSend); }这样投票请求就发送出去了此时结点是作为客户端发送给其他节点的接下来的逻辑就是其他节点接收到voteRequest请求然后做逻辑处理所以就要在server包下面去查看具体逻辑。// 在kv-core的server包下面的KvBusinessHandler.java // 1.如果是投票请求 if ( raftKvMessage.getType() KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST) { log.info(receive vote request.........); KvRaftProto.VoteRequest voteRequest raftKvMessage.getVoteRequest(); // 可以看到交给了node去处理 KvRaftProto.VoteResponse voteResponse node.tackleVoteRequest(voteRequest); ctx.writeAndFlush(KvRaftProto.RaftKvMessage.newBuilder() .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE) .setVoteResponse(voteResponse) .build()); } // 又回到了RaftNode类了 public KvRaftProto.VoteResponse tackleVoteRequest(KvRaftProto.VoteRequest voteRequest) { // 比较任期 if (voteRequest.getTerm() currentTerm.get()) { log.info({} 投票请求任期太小拒绝投票, nodeId); return buildVoteResponse(false, currentTerm.get()); } if ( votedFor ! null !voteRequest.getCandidateId().equals(votedFor) ) { log.info({}已投给其他人拒绝该投票请求, nodeId); return buildVoteResponse(false, currentTerm.get()); } // 再比较日志情况 if ( voteRequest.getLastLogIndex() logManager.getLastLogIndex() voteRequest.getLastLogTerm() logManager.getLastLogTerm() ) { log.info({} 投票请求ok赞成投票, nodeId); currentTerm.set(voteRequest.getTerm()); // 更新自己的任期 votedFor voteRequest.getCandidateId(); // 投票给该节点 return buildVoteResponse(true, voteRequest.getTerm()); } log.info({} 投票请求日志太旧拒绝该投票请求, nodeId); return buildVoteResponse(false, currentTerm.get()); }其他节点收到了拉票请求会返回response给candidate结点candidate结点是作为Client发送的拉票请求收到的响应肯定是在客户端的处理器handler接下来的逻辑就要在rpc包下面的RpcClientHandler去查看了Override protected void channelRead0(ChannelHandlerContext ctx, KvRaftProto.RaftKvMessage msg) { // 2.VOTE_RESPONSE 投票请求回来的响应【投票请求是结点作为客户端发出的应该在客户端的handler处理响应】 if (msg.getType() KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE) { log.info(receive vote response.........); KvRaftProto.VoteResponse voteResponse msg.getVoteResponse(); raftNode.tackleVoteResponse(voteResponse); // 又回到了RaftNode } } // RaftNode.java // 投票结果处理,【投票请求是结点作为客户端发出的要在客户端的handler处理响应】 public synchronized void tackleVoteResponse(KvRaftProto.VoteResponse voteResponse) { long term voteResponse.getTerm(); // 2. 发现更高任期立即降级并更新 // 1. 任期检查对方比我大我立即认输 if (term currentTerm.get()) { stepDown(term); return; } // 2. 状态检查如果我已经不是 Candidate 了比如已经超时重选或收到心跳忽略 if (state ! NodeState.CANDIDATE) return; // 3. 任期匹配检查确保这是对“当前这一轮”选举的回复 // 如果收到的响应任期比当前小说明是之前过期的选举回复直接丢弃 if (term currentTerm.get()) { return; } // 4. 从全局管理器获取当前选举的投票状态 VoteState voteState GlobalVoteManager.getVoteState(term); if (voteState null) { log.error(未找到任期 {} 的投票记录状态, term); return; } // 5. 如果对方投了赞成票 if (voteResponse.getVoteGranted()) { // 增加票数这里 AtomicInteger 在 VoteState 内部保证了线程安全 // 但由于本方法加了 synchronized其实双重保险 int currentVotes voteState.addVote(); int majority voteState.getMajority(); log.info(赞成票当前票数: {}/{}, currentVotes, nodesConfig.getNodeList().size()); // 6. 检查是否达到多数派 if (currentVotes majority) { log.info(节点 {} 获得过半选票 ({})准备晋升为 Leader, nodeId, currentVotes); becomeLeader(); } } else { log.info(拒绝了我的投票请求); } } private synchronized void becomeLeader() { if (state ! NodeState.CANDIDATE) return; if (state NodeState.LEADER) return; this.state NodeState.LEADER; log.info(Node {} 赢得选举即将成为 Leader, Term: {}, nodeId, currentTerm.get()); // 1. 清理上一任期的残留状态 this.votedFor null; // 2. 立即发送第一波心跳宣示主权 (防止其他节点又超时) sendHeartbeats(); // 3. 启动定时心跳任务 (比如每 2 秒一次) if (heartbeatTask ! null) heartbeatTask.cancel(true); heartbeatTask scheduler.scheduleAtFixedRate(this::sendHeartbeats, 0, 1000, TimeUnit.MILLISECONDS); log.info( 节点 {} 正式成为 Term {} 的 Leader , nodeId, currentTerm.get()); }3.启动测试选主把配置好的三个节点启动一下看看结果