RocketMQ使用手册
什么是RocketMQ?
Rocket是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式的特点。它是一个使用java开发的分布式消息系统,由阿里巴巴团体开发,2016年贡献给Apache。
RocketMQ基本概念
主题(Topic)
Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。需保证集群内全局唯一。
消息队列(MessageQueue)
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
消息(Message)
消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
订阅关系(Subscription)
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
RocketMQ架构图

如图为RockerMQ官方的架构图,包含4个角色:
- Broker(消息队列服务器):主要负责消息的存储、投递和查询以及服务高可用保证。
- Producer(生产者):生产消息
- Consumer(消费者):消费消息
- NameServer(命名服务):作为注册中心,主要提供两个功能:Broker管理和路由信息管理。Broker向NameServer注册信息,生产者和消费者从NameServer获取Broker信息。
从图中可以看到RocketMQ架构具有以下特点:
- Broker采用了分布式和主从结构的部署。保证服务高可用、高性能,保证消息的可靠性。
- NameServer采用分布式架构部署,并且去中心化,每个Broker和所有NameServer保持长链接,并定期向NameServer发送心跳。
- Producer向Broker发送信息前,需向NameServer获取Brocker路由信息,通过轮询的方法向每个队列发送消息,以达到负载均衡。
- Consumer从NameServer获取Broker的路由信息后,向Broker发送pull请求来获取消息数据。
RocketMQ中的消息模型
在RocketMQ中是按照主题模型实现的,具体架构如上图,包含3个角色:
- Producer Group(生产者组):代表一类生产者,将多个生产者合在一起,他们一般生产相同主题的消息。
- Consumer Group(消费者组):代表一类消费者,将多个消费者合在一起,他们一般消费相同主题的消息
- Topic(主题):代表一个类型的消息。
从图中可以看到一下几点:
- 一个主题中可以有多个队列。
- 生产者组生产消息时可以指定主题中的某个队列发送消息。
- 一个队列只会被消费者组中的一个消费者消费。
- 每个消费者组在每个队列上维护一个消费位置(offset)。
为什么一个主题中需要维护多个队列?
提供多个队列给消费者组消费,可以提供并发能力。
消息(Message)
消息属性
- Topic主题:消息所属的业务主题
- Tags标签:消息的标签,用于消费者获取消息时过滤消息。
- keys关键词:rocketmq会对keys建立一个索引表,通过key可以直接检索到消息。
- DelayTimeLevel延时时间:设置消息发送后的延时可见时间,发送延迟消息时使用。
- MessageGroup消息组:发送顺序消息时,定义消息组。相同消息组的消息会被发送到同一队列中。消息组建议采用订单id、用户id作为消息组。
消息类型
消息类型的分类如下:
- 普通消息(Normal):消息本身无特殊语义,消息间无关联关系
- 顺序消息(FIFO):RocketMQ中通过消息分组MessageGroup标记一组特定消息的先后顺序。
- 定时/延时消息(Delay):指定延时时间后,消息才对消费者可见。定时消息传入一个时间戳。延时消息是将当前时间加上延时时间转换成时间戳传入。传入小于当前时间的定时消息时,消息会被立刻被投递。
- 事务消息(Transaction):RocketMQ支持分布式事务消息,支持应用数据库更新和消息调用事务的一致性
主题支持的消息类型在主题创建时定义,每个主题只支持一个消息类型。
分布式事务消息
RocketMQ使用事务消息加事务反查机制来解决事务问题。
发送half消息,在事务提交之前,对于消费者来说,这个消息是不可见的。在第二次提交后,事务消息才能被消费者看见。
事务反查机制是指,RocketMQ Server在收到half消息后的一段时间内,因网络异常或其他原因获取不到生产者的二次提交或回滚请求的话,会向生产者发送事务反查请求。生产者会查询事务状态并重新向MQ Server发送提交或回滚事务请求。
特性
- 事务异常检查间隔:默认值:60秒。
- 半事务消息第一次回查时间:默认值:取值等于事务异常检查间隔* 最大限制:不超过1小时。
- 半事务消息最大超时时长:默认值:4小时。不支持自定义修改。
消息流量控制
Apache RocketMQ 的消息流控触发条件如下:
- 存储压力大:消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
- 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
当系统触发消息流控时,生产者客户端会收到系统返回的限流错误和异常:
- reply-code:530
- reply-text:TOO_MANY_REQUESTS
生产者(Producer)
消息发送重试
客户端在发送消息到服务端时,若出现异常,客户端SDK支持消息发送重试以达到最终成功发送消息的效果。
需要注意的是,对于事务消息,只进行透明重试,网络超时或异常等场景不会进行重试。
消息发送重试次数 默认值:3次。
消息重试分为:同步重试和异步重试。
- 同步重试会一直阻塞客户端,直到成功或达到重试次数后抛出异常。
- 异步重试不会阻塞客户端,调用结果通过成功事件或是失败事件返回。
除服务端返回流控错误场景,其他条件触发重试后,均会立即进行重试。在流控错误场景下,生产者客户端按照重试策略进行延迟重试。
消费者(Consumer)
消费者分类
RocketMQ的客户端SDK提供了3个类型的消费者
- PushConsumer:一种高度封装的消费者类型
- SimpleConsumer:支持业务自定义流程的消费者类型
- PullConsumer
PushConsumer
一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。
特性
- PushConsumer消费并发度:默认值为20个线程。
- PushConsumer重试间隔时长:默认值:
- 非顺序性投递:间隔时间阶梯变化,具体取值,请参见PushConsumer消费重试策略。
- 顺序性投递:3000毫秒。
适用场景
PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:
- 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
- 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。
SimpleConsumer
一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
特性
- SimpleConsumer最大不可见时间为必填值,最小10秒,最大12小时。
适用场景
SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:
- 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
- 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
- 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。
PullConsumer
消息过滤功能
Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
消费模式
Consumer可以以两种模式启用:**广播模式(Broadcast)**和集群模式(Cluster)。
- 广播模式下,消息发送给同一个消费组中的所有消费者。
- 集群模式下,消息只会发送给一个消费者,该为默认模式。
顺序消费
RocketMQ支持两种顺序消费模式:普通顺序和严格顺序。
- 普通顺序:是指消费者通过同一队列收到的消息是有序的,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在重启情况下不会保证消息顺序性。
- 严格顺序:消费者收到的所有消息均是有顺序的。严格顺序即使在重启的情况下也能保证消息的顺序性。
消费者负载均衡
根据消费者类型的不同,消费者负载均衡策略分为以下两种模式:
- 消息粒度负载均衡
- 队列粒度负载均衡
消息粒度负载均衡
消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。
PushConsumer和SimpleConsumer默认使用消息粒度负载均衡策略,且仅能使用此策略。
队列粒度负载均衡
队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每一个消费者消费一个队列的消息。
PullConsumer默认使用队列粒度负债均衡策略。
消费进度管理
RocketMQ通过消费位点(Offset)管理消费进度。如果是启动了新的消费者分组,消费位点为队列的最大消费位点。如果是启动原有的消费者分组,会使用服务端保存的消费进度获得消费位点。
消息堆积
当生产者生产消息太快或消费者消费消息太慢,都会出现消息堆积的情况,最快速的解决办法就是增加消费者实例,同时增加主题的队列数量。
回溯消费
RocketMQ是使用消费位点去获取消息的,消息被消费后并没有被丢弃掉。假如consumer出现了异常,也可以通过调整消费位点去实现回溯消费。
消费重试
消费重试次数默认为16次。
PushConsumer消费重试策略

SimpleConsumer消费重试策略

重复消费
使用幂等操作防止重复消费。幂等操作的特点是同一个操作任意多次执行所产生的影响均与一次执行的影响相同。幂等操作一般需要结合业务场景来使用。可以使用redis的key/value或数据库插入法的唯一性防止重复消费。
RocketMQ的内部机制
刷盘机制
RocketMQ的刷盘是指将消息持久化到文件中,有两种刷盘机制:同步刷盘和异步刷盘
- 同步刷盘:是指Broker接收到生产者发送的消息后,同步将消息持久化到本地文件,文件成功保存到本地文件后才回复ACK确认消息到生产者。
- 异步刷盘:是指Broker成功接收到生产者发送的消息后,就回复ACK确认消息给生产者。Broker在后台开启一个线程,异步地将消息保存到本地文件。 刷盘机制主要影响消息的可靠性,同步刷盘可以保证消息不丢失,异步刷盘存在一定几率丢失消息。比如在异步机制下,异步刷盘线程运行时Broker突然死机,就会有可能丢失消息。
复制机制
复制机制是指Broker在主从模式下,主节点与从节点的数据同步机制,分为同步复制和异步复制。
- 同步复制:也叫同步双写。只有消息同步双写到主从节点上时才返回写入成功。
- 异步复制:消息写入主节点之后就直接返回写入成功。开启线程,异步复制消息到从节点。 复制机制主要影响服务的可用性。在Rocket中,生产者只能向主节点发送消息。若主节点挂了,生产者将不能再给主节点发送消息,消费者可以切换到从节点继续消费消息。待主节点重启后,才可以重新接收生产者生产的消息。
存储机制
RocketMQ以文件形式存储和使用队列消息,主要由3个角色完成。
- CommitLog:消息主体及元数据的存储主题。单个文件的大写为1G,消息顺序写入文件中,满了则存放在下一个文件中。
- ConsumeQueue:消息消费队列文件,记录了消息在CommitLog文件的位置,相当于消息的索引,目的是提高消息消费的西能你。
- IndexFile:提供一种通过key或时间区间来查询消息的方法。
RocketMQ采用的是混合型的存储结构,所有主题、队列公用同一个日志数据文件。这样做的原因是为了提高数据的写入效率,不分Topic有更大几率将消息数据成批地写入。
部署方式
RocketMQ5.0部署需要启动NameServer、Broker、Proxy组件。
在RocketMQ5.0部署方式分为两种:
- Local模式:Brocker和Proxy是同进程部署
- Cluster模式:Brocker和Proxy分开部署
各组件使用的端口
NameServer
- 9876:NameServer的主服务端口
Broker
- 10911:Broker的主服务端口,用于处理生产者和消费者的请求。配置文件中通过listenPort属性修改。
- 10909:broker的管理端口,用于监控和管理RocketMQ broker服务的状态,如心跳检查等。配置文件中通过fastListenPort属性修改。
- 10912:broker主从同步的端口,用于主broker监听从broker请求。配置文件中通过haListenPort属性修改。
Proxy
- 8081:主服务端口,gprc端口。修改配置文件的grpcServerPort属性可修改端口号。
关联关系
- 生产者从nameserver获取broker的地址及主题信息,生产者发送消息时指定topic,按照namerserver的路由策略自动向broker上的topic发送消息
- 消费者从nameserver获取broker的地址及主题信息,消息费设置消费者组,向broker拉取指定主题的消息。
Java使用RocketMQ
1. 引入依赖
在pom.xml文件中引入RocketMQ的ClientSDK依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>2. 向rocketmq服务器发送消息
发送普通消息
- 创建一个消息类型为normal的主题
sh-4.2$ ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t normal_test_topic
create topic to 172.21.0.5:10911 success.
TopicConfig [topicName=normal_test_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]- 通过客户端sdk编写生产者代码向mq发送消息。
public class NormalProducer {
public static void main(String[] args) throws ClientException, IOException {
//获取客户端服务提供者
final ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
//设置proxy服务器地址
String endpoint = "127.0.0.1:8081";
//配置客户端
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
//创建生产者
Producer producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.build();
//创建消息
Message message = clientServiceProvider.newMessageBuilder()
.setTopic("test_topic")
.setTag("test_tag")
.setKeys("test_key" + new Date().getTime())
.setBody(("hello world" + new Date().getTime()).getBytes())
.build();
//发送消息
producer.send(message);
producer.close();
}
}发送顺序消息
- 在RocketMQ中定义一个消息类型为顺序消息的主题。
sh-4.2$ ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t fifo_test_topic -a +message.type=FIFO
create topic to 172.21.0.5:10911 success.
TopicConfig [topicName=fifo_test_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=FIFO}]- 向主题发送消息。
public class FifoProducer {
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
String endPoint = "127.0.0.1:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endPoint)
.build();
Producer producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.build();
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < 5; j++) {
Message message = clientServiceProvider.newMessageBuilder()
.setTopic("fifo_test_topic")
.setTag("fifo_test_tag")
.setKeys("fifo_test_key" + j + System.currentTimeMillis())
//顺序消息必须设置消息组
.setMessageGroup("fifo_test_group")
.setBody(("hello world" + j + System.currentTimeMillis()).getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt);
}
}
producer.close();
}
}发送延迟消息
- 在RocketMQ中创建消息类型为延迟消息的主题
sh-4.2$ ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t delay_test_topic -a +message.type=DELAY
create topic to 172.21.0.5:10911 success.
TopicConfig [topicName=delay_test_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=DELAY}]- 向主题发送消息
public class DelayProducer {
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
String endPoint = "127.0.0.1:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endPoint)
.build();
Producer producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.build();
Message message = clientServiceProvider.newMessageBuilder()
.setTopic("delay_test_topic")
.setTag("delay_tag")
.setKeys("delay_key" + System.currentTimeMillis())
//延时消息必须设置延时时间
.setDeliveryTimestamp(System.currentTimeMillis() + 10000)
.setBody("hello world".getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt);
producer.close();
}
}发送事务消息
- 在RocketMQ中创建消息类型为事务消息的主题
sh-4.2$ ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t transaction_test_topic -a +message.type=TRANSACTION
create topic to 172.21.0.5:10911 success.
TopicConfig [topicName=transaction_test_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+message.type=TRANSACTION}]- 向主题发送消息
public class TransactionProducer {
public static boolean doLocalTransaction() {
return new Random(System.currentTimeMillis()).nextBoolean();
}
public static int checkTransaction(long id) {
if (System.currentTimeMillis() - id < 10000)
return 1;
else return 2;
}
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
String endPoint = "127.0.0.1:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endPoint)
.build();
Producer producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTransactionChecker(messageView -> {
if (checkTransaction(Long.valueOf(messageView.getProperties().get("id"))) == 1)
return TransactionResolution.UNKNOWN;
else return TransactionResolution.COMMIT;
})
.build();
//开启半事务消息事务
Transaction transaction = producer.beginTransaction();
Message message = clientServiceProvider.newMessageBuilder()
.setTopic("transaction_test_topic")
.setTag("delay_test_tag")
.setKeys("delay_test_key" + System.currentTimeMillis())
.setBody(("hello world" + System.currentTimeMillis()).getBytes())
.addProperty("id", String.valueOf(System.currentTimeMillis()))
.build();
//发送半事务消息
producer.send(message, transaction);
//执行本地事务
if (doLocalTransaction()) {
//提交事务消息
transaction.commit();
} else {
//回滚事务消息
transaction.rollback();
}
producer.close();
}
}3. 从服务器获取消息消费消息
- 在rocketmq服务中配置好消费者分组信息

- 通过客户端sdk编写消费者代码从mq中拉取消息。
PushConsumer消费者
public class NormalPushConsumer {
public static void main(String[] args) throws ClientException, InterruptedException, IOException {
//获取客户端服务提供者
final ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
//设置proxy服务器地址
String endPoint = "127.0.0.1:8081";
//配置客户端
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endPoint)
.build();
//设置消息标签
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
PushConsumer pushConsumer = clientServiceProvider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup("consumer_test_group")
.setSubscriptionExpressions(Collections.singletonMap("test_topic", filterExpression))
.setMessageListener(messageView -> {
System.out.println("messageView = " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
pushConsumer.close();
}
}SimpleConsumer消费者
public class NormalSimpleConsumer {
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
String endPoint = "127.0.0.1:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endPoint)
.build();
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = clientServiceProvider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup("consumer_test_group")
.setSubscriptionExpressions(Collections.singletonMap("test_topic", filterExpression))
.setAwaitDuration(Duration.ofSeconds(10))
.build();
List<MessageView> receive = simpleConsumer.receive(10, Duration.ofSeconds(10));
for (MessageView messageView :
receive) {
System.out.println("messageView = " + messageView);
simpleConsumer.ack(messageView);
}
simpleConsumer.close();
}
}Springboot集成Rocketmq
使用rocketmq-spring-boot-starter
1. 引入相关依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>2. 在applicaiton.properties文件中添加rocketmq的配置
#rocketmq配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=rocketmq_producer_group
#rocketmq.pull-consumer.group=
#rocketmq.pull-consumer.topic=rocketmq.name-server:配置nameserver地址,如有多个以分号间隔开rocketmq.producer.group:配置生产者组名称。使用生产者时必须设置rocketmq.pull-consumer.group:配置消费者组名称。使用时如没有配置rocketmq.producer.group,则改项必须设置。rocketmq.pull-consumer.topic:配置消费者消费的主题名称。使用时如没有配置rocketmq.producer.group,则改项必须设置。
3. 在Springboot主类中导入RocketMQ的自动配置类
@SpringBootApplication
//自动配置RocketMQ
@ImportAutoConfiguration(RocketMQAutoConfiguration.class)
public class SpringbootstudyApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootstudyApplication.class, args);
}
}4. 编写生产者代码
需要注意的是mq5.0以上版本的sdk要求消息与主题的类型一致,而在4.0的sdk中没有这个限制。
发送普通消息
@Override
public void sendNormalMessage() {
//同步发送普通消息
rocketMQTemplate.syncSend(
"test_topic:normal",
MessageBuilder.withPayload("normal message" + System.currentTimeMillis())
.setHeader(MessageConst.PROPERTY_KEYS, System.currentTimeMillis())
.build());
}发送顺序消息
@Override
public void sendOrderMessage() {
//同步发送顺序消息
rocketMQTemplate.syncSendOrderly("test_topic:order",
MessageBuilder.withPayload("orderly message" + System.currentTimeMillis())
.setHeader(MessageConst.PROPERTY_KEYS, System.currentTimeMillis())
.build(), "order");
}发送延时消息
@Override
public void sendDelayMessage() {
//同步法送延迟消息
rocketMQTemplate.syncSendDelayTimeSeconds("test_topic:delay",
MessageBuilder.withPayload("delay message" + System.currentTimeMillis())
.setHeader(MessageConst.PROPERTY_KEYS, System.currentTimeMillis())
.build(), 10);
}异步发送普通消息
@Override
public void asyncSendNormalMessage() {
//异步发送普通消息
rocketMQTemplate.asyncSend(
"test_topic:asyncNormal",
MessageBuilder
.withPayload("async normal" + System.currentTimeMillis())
.setHeader(MessageConst.PROPERTY_KEYS, System.currentTimeMillis())
.build(),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//发送成功
System.out.println("发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败" + e);
}
}
);
}发送事务消息
生产者提交事务消息
使用rocketMQTemplate.sendMessageInTransaction方法发送事务消息,sendMessageInTransaction方法的第三个参数要求传入一个对象,这个对象会作为参数传入执行本地事务的方法中。
@Override
public void sendTransactionMessage() {
//发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"test_topic:transaction",
MessageBuilder
.withPayload("transaction" + System.currentTimeMillis())
.setHeader(MessageConst.PROPERTY_KEYS, System.currentTimeMillis())
.build(),
new Random(System.currentTimeMillis())
);
}定义事务监听器
实现本地事务监听器接口,实现本地事务执行方法和本地事务执行状态查询方法。
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener
public class RocketMQTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int result = ((Random) arg).nextInt(2);
System.out.println("消息:" + msg + ",执行本地事务的结果为:" + result);
switch (result) {
case 0:
return RocketMQLocalTransactionState.COMMIT;
default:
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
int result = new Random(msg.getHeaders().getId().node()).nextInt(3);
System.out.println("消息:" + msg + ",查询本地事务的结果为:" + result);
switch (result) {
case 0:
return RocketMQLocalTransactionState.COMMIT;
case 1:
return RocketMQLocalTransactionState.ROLLBACK;
default:
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}需要注意的是,在spring-boot2.0后,
sendMessageInTransaction方法取消了事务组参数,@RocketMQTransactionListener也取消了事务组参数,这意味着一个应用只有一个全局的事务监听器。这时sendMessageInTransaction方法的第三个参数就十分重要,因为它传入的对象会传入到方法RocketMQLocalTransactionListener.executeLocalTransaction(Message msg, Object arg)的第二个参数中,通过这个对象,可以执行响应的事务方法,相当于是一个策略模式。
5. 编写消费者代码
消费者实现RocketMQListener接口,并使用@Component和@RocketMQMessageListener注解类。@RocketMQMessageListener注解需要提供消费者组、消费主题参数,可选提供消费标签、消费模式、最大消费线程数等参数。
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "normal_msg_consumer_group", selectorExpression = "normal")
public class RocketMQNormalMsgConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到普通消息:" + message);
}
}