系列导读本篇将深入讲解 RocketMQ 架构设计与生产环境最佳实践。文章目录一、RocketMQ 架构1.1 核心组件1.2 组件说明1.3 与 Kafka 对比二、集群部署2.1 集群架构2.2 Docker 部署2.3 Broker 配置三、生产者最佳实践3.1 配置3.2 发送消息四、消费者最佳实践4.1 消费配置4.2 消费消息五、事务消息5.1 事务消息流程5.2 实现代码总结一、RocketMQ 架构1.1 核心组件┌─────────────────────────────────────────────────────────────┐ │ RocketMQ 架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer ──► NameServer ──► Broker ──► Consumer │ │ │ │ │ │ │ ▼ │ │ │ CommitLog │ │ │ ConsumeQueue │ │ │ │ │ └─── 路由注册 ───► Topic 路由信息 │ │ │ └─────────────────────────────────────────────────────────────┘1.2 组件说明组件说明NameServer路由注册中心轻量级Broker消息服务器存储消息Producer消息生产者Consumer消息消费者1.3 与 Kafka 对比特性RocketMQKafka吞吐量十万级百万级延迟ms级ms级事务消息支持不支持延迟消息支持不支持消息过滤支持不支持二、集群部署2.1 集群架构┌─────────────────────────────────────────────────────────────┐ │ RocketMQ 集群 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ └────────────────┼────────────────┘ │ │ │ │ │ ┌────────────────┼────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Broker-Master│ │ Broker-Master│ │ Broker-Master│ │ │ │ (主) │ │ (主) │ │ (主) │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │Broker-Slave │ │Broker-Slave │ │Broker-Slave │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘2.2 Docker 部署version:3services:namesrv:image:apache/rocketmq:5.1.0container_name:rmqnamesrvports:-9876:9876command:sh mqnamesrvbroker:image:apache/rocketmq:5.1.0container_name:rmqbrokerports:-10911:10911-10909:10909environment:NAMESRV_ADDR:namesrv:9876command:sh mqbroker-c /opt/rocketmq/conf/broker.confvolumes:-./broker.conf:/opt/rocketmq/conf/broker.conf-./logs:/opt/rocketmq/logs-./store:/opt/rocketmq/store2.3 Broker 配置# broker.conf brokerClusterName DefaultCluster brokerName broker-a brokerId 0 namesrvAddr 192.168.1.100:9876;192.168.1.101:9876 defaultTopicQueueNums 4 autoCreateTopicEnable true autoCreateSubscriptionGroup true listenPort 10911 deleteWhen 04 fileReservedTime 48 mapedFileSizeCommitLog 1073741824 mapedFileSizeConsumeQueue 300000 diskMaxUsedSpaceRatio 88 storePathRootDir /opt/rocketmq/store maxMessageSize 65536三、生产者最佳实践3.1 配置ConfigurationpublicclassRocketMQConfig{BeanpublicDefaultMQProducerproducer(){DefaultMQProducerproducernewDefaultMQProducer(order-producer-group);producer.setNamesrvAddr(192.168.1.100:9876;192.168.1.101:9876);producer.setRetryTimesWhenSendFailed(3);producer.setSendMsgTimeout(3000);producer.setMaxMessageSize(4*1024*1024);// 4MBreturnproducer;}}3.2 发送消息ServicepublicclassOrderProducer{AutowiredprivateDefaultMQProducerproducer;// 同步发送publicSendResultsendSync(Orderorder)throwsException{MessagemessagenewMessage(order-topic,order-created,JSON.toJSONString(order).getBytes());returnproducer.send(message);}// 异步发送publicvoidsendAsync(Orderorder){MessagemessagenewMessage(order-topic,order-created,JSON.toJSONString(order).getBytes());producer.send(message,newSendCallback(){OverridepublicvoidonSuccess(SendResultresult){log.info(发送成功: {},result);}OverridepublicvoidonException(Throwablee){log.error(发送失败,e);}});}// 延迟消息publicvoidsendDelay(Orderorder,intdelayLevel){MessagemessagenewMessage(order-topic,JSON.toJSONString(order).getBytes());message.setDelayTimeLevel(delayLevel);// 1-18producer.send(message);}}四、消费者最佳实践4.1 消费配置ConfigurationpublicclassRocketMQConsumerConfig{BeanpublicDefaultMQPushConsumerconsumer(){DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(order-consumer-group);consumer.setNamesrvAddr(192.168.1.100:9876;192.168.1.101:9876);consumer.setConsumeThreadMin(10);consumer.setConsumeThreadMax(20);consumer.setConsumeMessageBatchMaxSize(10);returnconsumer;}}4.2 消费消息ComponentpublicclassOrderConsumer{PostConstructpublicvoidstart()throwsMQClientException{DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(order-consumer-group);consumer.setNamesrvAddr(192.168.1.100:9876);consumer.subscribe(order-topic,order-created);consumer.registerMessageListener(newMessageListenerConcurrently(){OverridepublicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){for(MessageExtmsg:msgs){try{OrderorderJSON.parseObject(newString(msg.getBody()),Order.class);processOrder(order);}catch(Exceptione){log.error(消费失败,e);returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}五、事务消息5.1 事务消息流程1. 发送半消息Half Message 2. 执行本地事务 3. 提交或回滚消息 4. 事务回查机制5.2 实现代码ServicepublicclassOrderTransactionProducer{AutowiredprivateTransactionMQProducerproducer;publicvoidsendTransactionMessage(Orderorder){MessagemessagenewMessage(order-topic,JSON.toJSONString(order).getBytes());producer.sendMessageInTransaction(message,order);}}// 事务监听器ComponentpublicclassOrderTransactionListenerimplementsTransactionListener{AutowiredprivateOrderServiceorderService;OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{Orderorder(Order)arg;orderService.createOrder(order);returnLocalTransactionState.COMMIT_MESSAGE;}catch(Exceptione){returnLocalTransactionState.ROLLBACK_MESSAGE;}}OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){// 事务回查OrderorderJSON.parseObject(newString(msg.getBody()),Order.class);if(orderService.exists(order.getId())){returnLocalTransactionState.COMMIT_MESSAGE;}returnLocalTransactionState.ROLLBACK_MESSAGE;}}总结✅RocketMQ 架构NameServer、Broker✅集群部署主从架构、配置优化✅生产者实践同步、异步、延迟消息✅消费者实践并发消费、批量消费✅事务消息半消息、事务回查下篇预告消息队列选型对比指南作者刘~浪地球系列消息队列三更新时间2026-04-13