RabbitMQ

流行的消息中间件

使用

docker安装

1
docker run -d --name rabbitmq3.8 -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management

简介

组成

rabbitMQ基本结构

rabbitMQ组成说明

消息发布流程

rabbitMQ消息发布流程

工作模式

Work queues工作队列

rabbitMQWork-queues模式

一个生产者将消息发给队列,多个消费者共同监听一个队列的消息

消息不能被重复消费

rabbitMQ采用轮询的方式将消息平均发送给消费者

Publish/Subscribe发布订阅

rabbitMQPublish-Subscribe模式

一个生产者把消息发送给交换机,与交换机绑定的有多个队列,每个消费者监听自己的队列

生产者将消息发给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

如果消息发给没有绑定队列的交换机上消息将丢失

Routing路由

rabbitMQRouting模式

交换机类型设置为:direct

一个交换机绑定多个队列,每个队列设置routingkey,并且一个队列可以设置多个routingKey

每个消费者监听自己的队列

生产者将消息发给交换机,发送消息时需要指定routingkey的值,交换机来判断改routingkey的值和哪个队列的routingkey相等,如果相等则将消息转发给该队列

Topics通配符

rabbitMQTopics模式

交换机类型设置为:topic

一个交换机绑定多个队列,交换机根据routingkey的值来匹配队列,匹配时采用通配符方式,匹配成功的将消息转发到指定的队列

#号统配符:匹配一个或多个词,多个词中间用.分割

1
2
3
4
匹配:info.#
info.1
info.1.2
info.1.2.3

\*号统配符:匹配一个词

1
2
3
4
匹配:info.*
info.1
info.2
info.3

Header转发器

header取消了routingkey,使用header中的key/value匹配队列

RPC远程调用

rabbitMQRPC模式

使用MQ实现RPC的异步调用,基于Direct交换机实现

客户端既是生产者也是消费者,像RPC请求队列发送RPC调用消息,同时监听RPC响应队列

服务端监听RPC请求队列的消息,收到消息后执行服务端方法,得到方法返回的结果,服务端将RPC的结果发送到RPC响应队列

SpringBoot使用

pom

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest # 使用虚拟主机的用户名
password: guest # 使用虚拟主机的密码
virtual-host: / # 指定虚拟主机

发送消息类型转json

1
2
3
4
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

定义配置类

DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)

name->交换机名称,durable->是否持久化,autoDelete->是否自动删除,arguments->指定一些参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class DirectRabbitConfig {

@Bean
DirectExchange directExchange(){
// 注册一个 Direct 类型的交换机 默认持久化、非自动删除
return new DirectExchange("directExchange");
}

@Bean
Queue infoQueue(){
// 注册队列
return new Queue("infoMsgQueue");
}

@Bean
Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
// 将队列以 info-msg 为绑定键绑定到交换机
return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
}

// Topic绑定
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}

@Bean
Queue errorQueue(){
return new Queue("errorMsgQueue");
}

@Bean
Binding infoToExchangeBinging2(Queue errorQueue, TopicExchange topicExchange) {
// 将队列以 error-# 统配绑定键绑定到交换机
return BindingBuilder.bind(errorQueue).to(topicExchange).with("error-#");
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class DirectSender {

@Resource
private AmqpTemplate rabbitTemplate;

/**
* 交换机,队列,消息内容,消息唯一id
*/
public void sendInfo() {
String content = "发送到directExchange交换机,指定routing key为info-msg";
// 将消息以info-msg绑定键key发送到directExchange交换机
this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content, new CorrelationData(UUID.randomUUID().toString()));
}

public void sendError() {
String content = "I am Error msg!";
this.rabbitTemplate.convertAndSend("topicExchange", "error.msg", content);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class DirectReceiver1 {

// 监听
@RabbitListener(queues = "infoMsgQueue")
public void process(Message message,String data) {
System.out.println("########### 消费者消费:" + data);
}
}

// 消费者2
@Component
// 监听队列errorMsgQueue
@RabbitListener(queues = "errorMsgQueue")
public class DirectReceiver3 {

@RabbitHandler
public void process(Message message,String data) {
System.out.println("消费error消息:" + data);
}

// 可以接收同一队列不同的消息内容
@RabbitHandler
public void process(Message message,UserInfo data) {
System.out.println("消费error消息:" + data);
}
}

开启发送接收确认模式

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
rabbitmq:
host: 192.168.0.18
port: 5672
username: guest
password: guest
virtual-host: /
# 开启发送端消息抵达队里的确认
publisher-returns: true
# 只要抵达队列,以异步方式优先回调returnConfirm
template:
mandatory: true
# 发送者confirm确认机制
# correlated是消息发送成功后回调confirmCallback方法
# none是不回调(默认)
# simple有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
publisher-confirm-type: correlated
listener:
simple:
# 设置消费端手动 ack
# auto自动回复(默认)
# manual 手动处理
acknowledge-mode: manual
retry:
# 是否支持重试
enabled: true

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Autowired
private RabbitTemplate rabbitTemplate;

// 项目加载完成后执行
@PostConstruct
public void initRabbitTemplate() {
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 消息发送回调
* @param correlationData 当前消息唯一id
* @param ack 消息是否成功收到 只要消息抵达服务器,就是true
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 逻辑
}
});

// 设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 消息发送给的交换机
* @param routingKey 消息发送的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 逻辑
}
});
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RabbitListener(queues = RabbitMqConfig.ORDER_TIMEOUT_QUEUE, concurrency = "4-10")
public void consumeTimeOutQueue(@Payload String orderId, Channel channel, Message message) throws IOException {
// channel内按顺序自增的,消息唯一id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("消息ID=" + orderId);
if (orderId.equals("1")) {
int i = 1 / 0;
}
// 签收消息,第一个参数是当前消息id,第二个参数是 是否是批量签收模式
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息 第二个操作代表是否批量操作,第三个参数代表 是否重新进入队列
// channel.basicNack(deliveryTag,false,false);
// 拒绝消息,第二个参数代表 是否重新进入队列,和basicNack一样
channel.basicReject(deliveryTag, false);
} else {
System.out.println("消息即将再次返回队列处理...");
channel.basicAck(deliveryTag, false);
// 重新发送消息到队尾,TODO 多次失败后应该手动处理消息,不应该重复发送
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes("《报错后发送的新消息》"));
}
}
}

延迟队列

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 订单队列
*/
public static final String ORDER_QUEUE = "woniu.order.queue";

/**
* 超时订单队列
*/
public static final String ORDER_TIMEOUT_QUEUE = "woniu.order.timeout.queue";

/**
* 订单exchange
*/
public static final String ORDER_EXCHANGE = "woniu.order.exchage";

/**
* 订单队列
*/
@Bean
public Queue orderQueue() {
// 设置超时转发策略 超时后消息会通过x-dead-letter-exchange 转发到x-dead-letter-routing-key绑定的队列中
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange", ORDER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", ORDER_TIMEOUT_QUEUE);
return new Queue(ORDER_QUEUE, true, false, false, arguments);
}

/**
* 超时订单队列
*
* @return
*/
@Bean
public Queue orderTimeoutQueue() {
return new Queue(ORDER_TIMEOUT_QUEUE, true, false, false);
}

/**
* 订单队列绑定exchange
*
* @return
*/
@Bean
public Binding orderQueueBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_QUEUE);
}


/**
* 超时订单队列绑定exchange
*
* @return
*/
@Bean
public Binding orderTimeoutQueueBinding() {
return BindingBuilder.bind(orderTimeoutQueue()).to(orderExchange()).with(ORDER_TIMEOUT_QUEUE);
}

/**
* 订单exchange
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE, true, false, null);
}

使用

1
2
3
4
5
6
7
8
9
10
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrder(String orderId) {
rabbitTemplate.convertAndSend(RabbitMqConfig.ORDER_EXCHANGE, RabbitMqConfig.ORDER_QUEUE, orderId, message -> {
// 设置超时时间为 2s
message.getMessageProperties().setExpiration("2000");
return message;
},new CorrelationData(UUID.randomUUID().toString()));
}

相关文章

数据库连接池

Junit和Spring

Tomcat

Servlet

Request,Response和ServletContext

Cookie和Session

JSP和EL和Jstl

Filter和Listener

Mybatis

SpringCache