RabbitMQ使用手册
什么是RabbitMQ
RabbitMQ是采用Erlang语言实现AMQP的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。
RabbitMQ架构图

RabbitMQ的核心概念
Broker(消息中间件的服务节点)
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
Producer(生产者)和Consumer(消费者)
- 生产者:消息发送者
- 消费者:消费接收者
消息的组成
消息由两部分组成:
- 消息头(label):消息头由可选属性组成,包括routing-key(路由键)、priority(优先级)、delivery-mode(是否需要持久性存储)等。
- 消息体(payload)
Connection(连接)
producer/consumer和brocker之间通过TCP连接通信
Channel(信道)
Channel是Connection内部建立的逻辑连接,用于减少TCP连接的开销。
Exchange(交换机)
在RabbitMQ中,消息先发送到Exchange交换机中,再根据转发策略,发送到消息队列上。交换器只负责转发消息,不具备存储消息的能力。
Queue(消息队列)
消息队列用来保存消息直到发送给消费者,一个消息可投入到一个或多个队列中,多个消费者也可以订阅一个队列。
RabbitMQ队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。因此RabbitMQ不支持队列层面的消息广播。
Rounting Key(路由键)
生产者将消息发送到交换机时会携带一个key,来指定路由规则
Binding Key(绑定键)
在绑定Exchange和Queue时,会指定一个BindngKey。生产者发送消息到Exchange时会携带一个Rounting Key,根据Exchange Tpye将Rounting Key与Binding Key进行匹配。若匹配成功,则将消息分发到队列或交换器。
Vhost(虚拟主机)
每一个RabbitMQ服务器可以开设多个Vhost,每一个Vhost相当于是一个小型的RabbitMQ服务器,拥有独立的权限、Exchange、队列和绑定。
Exchange Type(交换器类型)
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP规范里还提到两种 Exchange Type,分别为 system 与 自定义)
fanout类型
fanout类型Exchange会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作。
用法
fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
direct类型
direct 类型Exchange会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
用法
direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
topic
topic类型的Exchange在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
- Routing Key、BindingKey是由"."号分隔的字符串,如"com.rabbitmq.clinet"
- BindingKey中存在两个字符"*"和"#",“*”表示一个单词,"#"表示0个或多个单词
headers
head类型的Exchange使用发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对。该类型性能较差,一般不使用。
六种基本消息模式
基本消息模型
生产者声明队列,将队列名称为rounting key的消息发送到默认交换器。消费者监听该队列。
生产者
public class Producer {
private final static String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获得rabbitmq连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取信道
Channel channel = connection.createChannel();
/**
* 声明一个队列
* queue:队列名称
* durable:是否持久化
* exclusive:是否独占。独占表示队列只允许在该连接中访问,如果connection关闭,则队列消失,一般用于创建临时连接。
* autoDelete:队列不使用时自动删除
* arguments:拓展参数,比如失效时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "basic queue hello world";
//发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("send " + message + " to default exchange");
channel.close();
connection.close();
}
}消费者
public class Consumer {
private final static String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取rabbitmq连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取信道
Channel channel = connection.createChannel();
//继承默认消费者,创建一个自定义消息处理方法的消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("consumer 接受到消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}work消息模型
生产者声明队列,将队列名称为rounting key的消息发送到默认交换器。多个消费者监听队列,消费消息后手动确认。
生产者
public class Producer {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String message = "work_queue message" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
channel.close();
connection.close();
}
}消费者
public class Consumer {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1 消费消息:" + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 消费消息:" + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
}
}发布/订阅消息模型(交换机类型:fanout)
生产者声明fanout类型交换器,将消息发送到该交换器中。多个消息者,每个消费者声明自己的消息队列,将消息队列绑定到交换器上,并监听消息队列。
生产者
public class Producer {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
for (int i = 0; i < 50; i++) {
String message = "fanout_exchange message " + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
}
channel.close();
connection.close();
}
}消费者
public class Consumer {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("fanout_queue_1", false, false, false, null);
channel.queueBind("fanout_queue_1", EXCHANGE_NAME, "");
channel.basicConsume("fanout_queue_1", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 消费消息:" + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
channel.queueDeclare("fanout_queue_2", false, false, false, null);
channel.queueBind("fanout_queue_2", EXCHANGE_NAME, "");
channel.basicConsume("fanout_queue_2", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 消费消息:" + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
}
}Routing路由模型(交换机类型:direct)
生产者声明direct类型交换机,声明消息队列并配置绑定,发送消息时带上routing key用于与绑定键匹配。多个消费者,监听消息队列,处理消息。
生产者
public class Producer {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String[] QUEUE_NAMES = {"direct_1_queue", "direct_2_queue"};
private static final String[] ROUTING_KEYS = {"ONE", "TWO", "THREE", "FOUR"};
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (String queue_name : QUEUE_NAMES
) {
channel.queueDeclare(queue_name, false, false, false, null);
}
channel.queueBind(QUEUE_NAMES[0], EXCHANGE_NAME, ROUTING_KEYS[0]);
channel.queueBind(QUEUE_NAMES[0], EXCHANGE_NAME, ROUTING_KEYS[1]);
channel.queueBind(QUEUE_NAMES[1], EXCHANGE_NAME, ROUTING_KEYS[2]);
channel.queueBind(QUEUE_NAMES[1], EXCHANGE_NAME, ROUTING_KEYS[3]);
Random random = new Random();
for (int i = 0; i < ROUTING_KEYS.length; i++) {
int r = random.nextInt(4);
String message = ROUTING_KEYS[r] + "的message";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEYS[r], null, message.getBytes());
}
channel.close();
connection.close();
}
}消费者
public class Consumer {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String[] QUEUE_NAMES = {"direct_1_queue", "direct_2_queue"};
private static final String[] ROUNTING_KEYS = {"ONE", "TWO", "THREE", "FOUR"};
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
for (int i = 0; i < 2; i++) {
final int j = i;
channel.basicConsume(QUEUE_NAMES[i], false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者" + j + "消费队列" + QUEUE_NAMES[j] + "的消息: " + message);
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
}Topics通配符模型(叫环节类型:topic)
生产者声明direct类型交换机,声明消息队列并配置绑定,发送消息时带上routing key用于与通配符绑定键匹配。多个消费者,监听消息队列,处理消息。
生产者
public class Producer {
private final static String EXCHANGE_NAME = "topic_exchange";
private final static String QUEUE_NAME = "topic_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.rabbit.*");
channel.basicPublish(EXCHANGE_NAME, "a.rabbit.b", null, "a.rabbit.b message".getBytes());
channel.basicPublish(EXCHANGE_NAME, "a.rabbit.b.c", null, "a.rabbit.b.c message".getBytes());
channel.close();
connection.close();
}
}消费者
public class Consumer {
private final static String EXCHANGE_NAME = "topic_exchange";
private final static String QUEUE_NAME = "topic_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者消费消息: " + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
}
}RPC

- 客户端启动时创建一个匿名独享的回调队列并监听。
- 服务端创建一个请求队列并监听。
- 在RPC请求中,客户端向服务端监听的请求队列发送消息,消息中设置了两个属性,一是设置回调队列reply_to属性,二是设置唯一的correlation_id属性。
- 服务端监听请求队列。当有请求出现时,服务端获取消息并开始执行工作。工作执行完毕后,服务端将带有执行结果的消息发送到reply_to属性指定的队列,消息上也带上correlation_id属性。
- 客户端等待回调队列的消息,当有消息时,他会检查correlation_id属性,并进行匹配,将结果返回到应用。
服务端
public class Server {
private final static String SERVER_QUEUE = "server_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(SERVER_QUEUE, false, false, false, null);
channel.basicConsume(SERVER_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("处理客户端请求:" + new String(body));
String correlationId = properties.getCorrelationId();
String replyTo = properties.getReplyTo();
AMQP.BasicProperties proper = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
getChannel().basicPublish("", replyTo, proper, "请求已处理".getBytes());
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
}
}客户端
public class Client {
private final static String SERVER_QUEUE = "server_queue";
private final static String REPLY_TO_QUEUE = "reply_to_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String correlationId = "client";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(REPLY_TO_QUEUE).build();
channel.queueDeclare(REPLY_TO_QUEUE, false, true, true, null);
channel.basicConsume(REPLY_TO_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (correlationId.equals(properties.getCorrelationId())) {
System.out.println("接受服务端返回的处理结果:" + new String(body));
}
getChannel().basicAck(envelope.getDeliveryTag(), false);
try {
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
});
String message = "hello world";
channel.basicPublish("", SERVER_QUEUE, properties, message.getBytes());
}
}消息确认
消息确认分为两种模式:
- 自动确认:当消费者获取消息后,自动确认。这种模式下若消费者消费时出现异常,消息会丢失掉。
- 手动确认:当消息者获取消息后,消息从ready状态转为unacked状态。手工确认后,状态转为已消费。若消费者在没有进行确认前链接被关闭,则状态回滚到ready。
//手工确认
getChannel().basicAck(envelope.getDeliveryTag(), false);消息重回队列
//不确认消息,实现消息重回队列
getChannel().basicNack(envelope.getDeliveryTag(), false, true);PrefetchCount
PrefetchCount用于设置消费者同时能处理多少条消息,仅在手动ack下才生效。
channel.basicQos(1);生存时间TTL
TTL全称Time to Live,中文为生存时间。在RabbitMQ中,可以指定TTL指定消息或队列的生存时间。如果队列或消息的生存时间超过TTL,则它们会立即失效。
指定一个消息的TTL
使用expiration()设置消息的失效时间,单位为毫秒。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();
channel.basicPublish("", SERVER_QUEUE, properties, message.getBytes());声明一个队列,并设置发送到该队列的消息的TTL
使用queueArguements.put("x-message-ttl", 1000);设置队列消息的失效时间,单位为毫秒。
Map<String, Object> queueArguements = new HashMap<String, Object>();
queueArguements.put("x-message-ttl", 1000);
channel.queueDeclare("ttl_queueu", false, false, true, queueArguements);声明一个队列,并设置队列的TTL
使用queueArguements.put("x-expires", 1000);设置队列的失效时间,单位为毫秒。
Map<String, Object> queueArguements = new HashMap<String, Object>();
queueArguements.put("x-expires", 1000);
channel.queueDeclare("ttl_queueu", false, false, false, queueArguements);死信
什么是死信
消息被拒绝访问时就会变称死信,具体包括以下场景:
- Rabbitmq服务器访问nack信号时
- 消息的TTL过期时
- 消息队列达到最大长度
- 消息不能入队
什么是死信队列
死信队列就是存储死信的消息队列,在死信队列中,有且只有死信,不会存在其余类型的消息。
死信队列的使用
使用x-dead-letter-exchange给队列设置死信转发机制。
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("dead_letter_exchange", BuiltinExchangeType.DIRECT);
//定义死信交换器、死信队列、绑定
channel.queueDeclare("dead_letter_queue", false, false, false, null);
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
Map<String, Object> queueArguements = new HashMap<String, Object>();
//定义死信被转法到哪个交换机
queueArguements.put("x-dead-letter-exchange", "dead_letter_exchange");
//定义死信转发到交换机时携带routing key
queueArguements.put("x-dead-letter-routing-key", "dead_letter_routing_key");
queueArguements.put("x-message-ttl", 1000);
channel.queueDeclare("ttl_queue", false, false, false, queueArguements);
channel.basicPublish("", "ttl_queue", null, "hello world".getBytes());
//关闭连接
channel.close();
connection.close();
}上面的代码,只是设置了普通队列产生死信时的转发机制,我们还需要定义上面指定的死信转换机,死信队列以及绑定。
Java使用RabbitMQ
- 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>- 获取rabbitmq连接和信道,定义队列绑定到默认Exchange,生产者向Exchange发送消息,消费者监听队列并处理消息,最后关闭信道和连接。
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//定义队列
/**
* queue 队列名称
* durable 是否持久化
* exclusive 是否独占。若设置独占,仅当前连接可以访问这个队列。当连接关闭时,关闭队列。
* autodelete 是否自动删除,当队列不在使用时,自动删除。
* arguments 其他队列参数,如队列消失时间
*/
channel.queueDeclare("demo_queue", true, false, false, null);
//生产者向消息队列发送消息
/**
* exchange 交换机名称。为空,则为默认交换机
* routingKey 路由键
* props 其他参数列表,消息头
* body 消息体
*/
channel.basicPublish("", "demo_queue", null, "hello world".getBytes());
System.out.println("发送消息: " + "hello world");
//消费者监听队列
/**
* queue 队列名称
* autoAck 是否自动确认
* callback 传入处理消息的消费者
*/
channel.basicConsume("demo_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受信息: " + new String(body));
//手工确认消息
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
});
//关闭连接
channel.close();
connection.close();
}SpringBoot集成RabbitMQ
- 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>- 配置rabbitmq服务器属性
##rabbitmq配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456定义配置类在配置类中定义队列、交换机、绑定
定义交换机
交换机有以下常用属性:
- type:交换机类型
- name:交换机名称
- durable:是否持久化
- autoDelete:是否自动删除
@Bean("direct_exchange")
public AMQP.Exchange directExchange() {
return ExchangeBuilder.directExchange("direct_exchange").durable(true).build();
}定义队列
队列有以下常用属性:
- name:队列名称
- durable:是否持久化
- exclusive:是否独占
- autoDelete:是否自动删除
- arguments:其他队列参数
@Bean("direct_queue")
public Queue directQueue() {
return new Queue("direct_queue", false, false, false, null);
}定义绑定
使用BindingBuilder建造一个Bingding
@Bean
public Binding bindingDirectToDirect(@Qualifier("direct_queue") Queue queue, @Qualifier("direct_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("direct").noargs();
}消费者监听消息队列
使用@RabbitListener注解方法监听消息队列,具体作用有三个:
- 使用
queues属性传入队列名以监听消息队列。 - 使用
queuesToDeclare传入@Queue定义消息队列,同时监听消息队列。 - 使用
@QueueBinding定义消息队列@Queue,定义交换器@Exchange,定义绑定键key,监听消息队列。
@Component
public class RabbitReceiveHandler {
@RabbitListener(queues = "direct_queue")
public void rec_direct_queue1(String msg) {
System.out.println("rec_direct_queue1" + msg);
}
@RabbitListener(queuesToDeclare = @Queue(name = "direct_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public void rec_direct_queue2(String msg) {
System.out.println("rec_direct_queue2" + msg);
}
@RabbitListener(bindings =
@QueueBinding(value =
@Queue(name = "direct_queue", durable = "true", exclusive = "false", autoDelete = "false"),
exchange = @Exchange(name = "direct_exchange", type = ExchangeTypes.DIRECT),
key = "direct"
))
public void rec_direct_queue3(String msg) {
System.out.println("rec_direct_queue3" + msg);
}
}生产者发送消息
从容器中获得RabbitTemplate对象,使用RabbitTemplate向交换机发送信息
@SpringBootTest
public class RabbitmqTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendText() {
for (int i = 0; i < 999999; i++) {
rabbitTemplate.convertAndSend("direct_exchange", "direct", "hello world");
}
}
}消息监听容器
什么是消息监听容器
可以提供对消息队列、消费、消费者、消息签收模式进行控制,以及消息的全方位监听的一系列工具的统称。
基本使用
在Spring-AMQP中,消息容器共有5种
- MessageListenerContainer接口
- AbstractMessageListenerContainer
- DirectMessageListenerContainer
- DirectReplyToMessageListenerContainer
- SimpleMessageListenerContainer
实际工作中使用最多的一种消息容器是SimpleMessageListenerContainer
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.addQueueNames("direct_queue");
container.setMaxConcurrentConsumers(10);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);使用
@RabbitListener实质上为我们创建了一个SimpleMessageListenerContainer。
消息适配器
消息适配器类MessageListenerAdapter实现了MessageListener接口,可以对消息消费接口与具体的消费方法进行适配。适配器有两种处理模式:
- 在MessageListenerAdapter类初始化时传入一个实现了ChannelAwareMessageListener接口或MessageListener接口的代理类,处理消息时调用代理类的onMessage方法。
- 给MessageListenerAdapter对象传入一个代理类,指定代理类的代理方法,处理消息时使用反射执行代理方法。
//创建消息监听适配器
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(new Object() {
void m(String message) {
System.out.println("消费消息:" + message);
}
});
messageListenerAdapter.setDefaultListenerMethod("m");
container.setMessageListener(messageListenerAdapter);消息转换器
MessageConverter接口提供了将Message转换成对象和将对象转换成Message的功能。
//创建消息转换器
MessageConverter messageConverter = new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return null;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return "转换后的消息" + new String(message.getBody());
}
};
messageListenerAdapter.setMessageConverter(messageConverter);RabbitTemplate提供了默认消息转换器SimpleMessageConverter类,支持转换字节数组对象、字符串对象、可序列化对象为字节数组。
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}