RabbitMQ

摘要:基于 AMQP 协议实现的消息中间件。默认端口是15672


目录

[TOC]

RabbitMQ

RabbitMQ 是最受欢迎的开源消息中间件之一,在全球范围内被广泛应用。

  • 轻量级且易于部署的,能支持多种消息协议(模式)。可以部署在分布式系统中,以满足大规模、高可用的要求。
  • RabbitMQ 使用的还是消息队列这种消息模型,不过引入了一个 exchange 的概念。
  • RabbitMQ 是基于 AMQP 协议实现的消息中间件。

AMQP

AMQPAdvanced Message Queuing Protocol 高级消息队列协议):用于在应用程序之间传递消息的网络协议,通常用于消息队列系统。定义了消息的格式、传递方法和消息队列的行为,允许不同应用程序之间进行异步通信

  • 在RabbitMQ中,AMQP是主要的通信协议,用于生产者将消息发送到队列,消费者从队列中接收消息,以及在消息代理(如RabbitMQ)中进行消息路由和处理。
  • 提供统一消息服务应用层标准(二进制应用层协议),面向消息的中间件设计,兼容 JMS。有跨平台、跨语言特性。

端口

  • AMQP 默认端口是5672
端口   作用
4369   epmd(代表 Erlang端口映射守护进程),erlang发现口
5671   AMQP 默认端口。非加密的,默认情况下消息在网络上传输时不会被加密。
5672   AMQPS(AMQP over SSL/TLS)协议默认端口。提供加密的通信。加密连接需要配置证书等安全设置。
15671   管理监听端口
15672   管理界面ui使用的端口
25672   ( Erlang distribution) server间内部通信口

Spring AMQP 依赖和配置

Spring AMQP 实现:

  • 首先需要在pom.xml中添加 Spring AMQP 的相关依赖;
1
2
3
4
5
<!--Spring AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 然后修改application.yml,添加RabbitMQ的相关配置;
1
2
3
4
5
6
7
8
9
spring:
  rabbitmq:
    host: localhost	 	# rabbitmq的连接地址
    port: 5672  		# rabbitmq的连接端口号
    virtual-host: /mall  #rabbitmq的虚拟host
    username: mall
    password: mall
    publisher-confirms: true #消息发送到交换器确认,如果对异步消息需要回调必须设置为true
    publisher-returns: true  #消息发送到队列确认

消息(队列)模型

RabbitMQ 内部结构

以5种消息模式中的路由模式为例。

标志 中文名 英文名 描述
P 生产者 Producer 消息的发送者,可以将消息发送到交换机
X 交换机 Exchange 接收生产者发送的消息,并根据路由键发送给指定队列
Q 队列 Queue 存储从交换机发来的消息
C 消费者 Consumer 消息的接收者,从队列中获取消息并进行消费
type 交换机类型 type 不同类型的交换机转发消息方式不同
3. fanout 发布/订阅模式 fanout 广播消息给所有绑定交换机的队列
4. direct 路由模式 direct 根据路由键发送消息
5. topic 通配符模式 topic 根据路由键的匹配规则发送消息

如何实现路由?

  1. 消费者(指定 BindingKey)将交换器和队列绑定。
  2. 生产者先指定一个RoutingKey 路由键,然后将消息发送到交换机。
  3. 交换机(根据 exchange type 按照策略)将消息发送到(相同 BindingKey)对应的队列中去。

Broker

Broker(消息中间件的服务节点):可简单地看作一个 RabbitMQ 服务实例(服务器)。

  • 包括 ExchangeQueue

Queue

Queue(消息队列):用来保存消息直到发送给消费者。是消息的容器。

  1. RabbitMQ 中消息只能存储在 队列 中,这和 Kafka 这种消息中间件相反。
    • Kafka 将消息存储在 topic 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识
  2. 一个消息可投入一个或多个队列。
  3. 多个消费者可订阅同一个队列,队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理。而不是每个消费者都收到所有的消息并处理,避免消息被重复消费。
  4. RabbitMQ 不支持队列层面的广播消费

消息队列的重要属性:

  • 持久性:broker重启前都有效。
  • 自动删除:在所有消费者停止使用之后自动删除。
  • 惰性:没有主动声明队列,调用会导致异常。
  • 排他性:一旦启用,声明它的消费者才能使用。

Exchange

Exchange(交换器):用来接收生产者发送的消息、(根据配置的策略)将这些消息路由到对应的(一个或多个)队列。如果路由不到,会返回给生产者、或直接丢弃 。

  • 可将交换器理解成一个由绑定构成的路由表,绑定是多对多的关系。

4 种消息模式

这4种消息模式是构建基于RabbitMQ的消息应用的基础。

这些消息模式基于 Java API 的实现,也可以通过 Spring AMQP 来实现。

Exchange 有 4 种 Exchange Types,对应着不同的路由转发策略

  1. fanout (扇出)发布/订阅模式:该交换器收到的信息会被发送到所有与该交换器绑定的队列中。不需做任何判断,所以速度最快。常用来广播消息。
  2. direct 直连路由模式(默认):把消息路由到那些 BindingkeyRoutingKey 完全匹配的 Queue 中。
    1. 常用在处理有优先级的任务,根据任务的优先级把消息发送到对应队列,这样可指派更多的资源去处理高优先级的队列。
    2. 例如,如果发送消息的路由键(RoutingKey)为 log 时,两个消息队列都会收到消息,如果路由键为 debug ,exchange 只会把消息发送到消息队列1中。
  3. topic 主题通配符模式:在 direct 基础之上(做了扩展),引入了模糊匹配机制。使用通配符。
  4. headers:不推荐,不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。性能很差,也不实用,基本上不用。

简单模式

简单模式:是最简单的消息模式。包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

模式示意图:

Spring AMQP 实现:

  • 添加简单模式相关Java配置,创建一个名为simple.hello的队列、一个生产者和一个消费者;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class SimpleRabbitConfig {

	@Bean
	public Queue hello() {
		return new Queue("simple.hello");
	}

	@Bean
	public SimpleSender simpleSender(){
		return new SimpleSender();
	}

	@Bean
	public SimpleReceiver simpleReceiver(){
		return new SimpleReceiver();
	}

}
  • 生产者通过send方法向队列simple.hello中发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SimpleSender {

	private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

	@Autowired
	private RabbitTemplate template;

	private static final String queueName = "simple.hello";

	public void send() {
		String message = "Hello World!";
		this.template.convertAndSend(queueName, message);
		LOGGER.info(" [x] Sent '{}'", message);
	}

}
  • 消费者从队列simple.hello中获取消息;
1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }

}
  • 在controller中添加测试接口,调用该接口开始发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    @ApiOperation("简单模式")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult simpleTest() {
        for(int i=0;i<10;i++){
            simpleSender.send();
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • 运行后结果如下,可以发现生产者往队列中发送消息,消费者从队列中获取消息并消费。

工作队列模式

工作模式:指向多个互相竞争的消费者发送消息的模式。包含一个生产者、多个消费者和一个队列。

  • 两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

Spring AMQP 实现:

  • 添加工作模式相关Java配置,创建一个名为work.hello的队列、一个生产者和两个消费者;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class WorkRabbitConfig {

    // 消息队列
    @Bean
    public Queue workQueue() {
        return new Queue("work.hello");
    }

    // 例化两个 WorkReceiever 作为消费者
    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }
    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    // 生产者
    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }
}
  • 生产者通过send方法向队列work.hello中发送消息,消息中包含一定数量的.号;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "work.hello";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3+1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index+1);
        String message = builder.toString();
        
        template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}
  • 两个消费者从队列work.hello中获取消息,名称分别为instance 1instance 2,消息中包含.号越多,耗时越长;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 消息队列
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) {
        
        StopWatch watch = new StopWatch();
        
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • 在controller中添加测试接口,调用该接口开始发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private WorkSender workSender;

    @ApiOperation("工作模式")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        for(int i=0;i<10;i++){
            workSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • 运行后结果如下,可以发现生产者往队列中发送包含不同数量.号的消息,instance 1instance 2消费者互相竞争,分别消费了一部分消息。

fanout 发布/订阅模式

发布/订阅模式:指同时向多个消费者发送消息的模式(类似广播的形式)。包含一个生产者、多个消费者、多个队列和一个交换机

  • 两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费相同消息

Spring AMQP实现:

  • 添加发布/订阅模式相关Java配置,创建一个名为exchange.fanout的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class FanoutRabbitConfig {

    // 交换器
    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("exchange.fanout");
    }

    // 匿名队列
    @Bean
    public Queue fanoutQueue1() {
        return new AnonymousQueue();
    }
    @Bean
    public Queue fanoutQueue2() {
        return new AnonymousQueue();
    }

    // 绑定、订阅
    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanout);
    }
    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanout);
    }

    // 内部有两个消费者
    @Bean
    public FanoutReceiver fanoutReceiver() {
        return new FanoutReceiver();
    }

    @Bean
    public FanoutSender fanoutSender() {
        return new FanoutSender();
    }

}
  • 生产者通过send方法向交换机exchange.fanout中发送消息,消息中包含一定数量的.号;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FanoutSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);
    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.fanout";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        
        template.convertAndSend(exchangeName, "", message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}
  • 消费者从绑定的匿名队列中获取消息,消息中包含.号越多,耗时越长。由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class FanoutReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

    // 消费者监听队列
    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }
    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • 在controller中添加测试接口,调用该接口开始发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private FanoutSender fanoutSender;

    @ApiOperation("发布/订阅模式")
    @RequestMapping(value = "/fanout", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult fanoutTest() {
        for(int i=0;i<10;i++){
            fanoutSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • 运行后结果如下,可以发现生产者往队列中发送包含不同数量.号的消息,instance 1instance 2同时获取并消费了相同消息

MQTT 协议实现即时通讯

有时候项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时需要通知前端支付成功。

RabbitMQ可以很方便的实现即时通讯功能,如果没有特殊的业务需求,甚至可以不写后端代码。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的轻量级通讯协议,该协议构建于TCP/IP协议上。

  • 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称 Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:
    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

direct 直连路由模式

路由模式:根据路由键选择性给多个消费者发送消息。

  • 多个消费者、多个队列,每个队列通过routing key全文匹配。
  • 消费者绑定唯一队列,队列绑定到交换机上时需要指定路由key,仅消费指定路由key的消息。
  • 生产者发送消息到交换机并且指定路由键,交换机通过路由键转发到对应多个队列,队列绑定的消费者接收并消费消息。

应用场景:

  • 在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,
  • 只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。

路由模式(Routing)

Spring AMQP实现:

  • 添加路由模式相关Java配置,创建一个名为exchange.direct的交换机、一个生产者、两个消费者和两个匿名队列,
  • 队列通过路由键都绑定到交换机,队列1的路由键为orangeblack队列2的路由键为greenblack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
public class DirectRabbitConfig {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("exchange.direct");
    }

    @Bean
    public Queue directQueue1() {
        return new AnonymousQueue();
    }
    @Bean
    public Queue directQueue2() {
        return new AnonymousQueue();
    }

    // 绑定键1
    @Bean
    public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("orange");
    }
    @Bean
    public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("black");
    }

    // 绑定键2
    @Bean
    public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("green");
    }
    @Bean
    public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("black");
    }

    @Bean
    public DirectReceiver receiver() {
        return new DirectReceiver();
    }

    @Bean
    public DirectSender directSender() {
        return new DirectSender();
    }
}
  • 生产者通过send方法向交换机exchange.direct中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DirectSender {

	@Autowired
	private RabbitTemplate template;
	private static final String exchangeName = "exchange.direct";
	private final String[] keys = {"orange", "black", "green"};

	private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

	public void send(int index) {
		StringBuilder builder = new StringBuilder("Hello to ");
		int limitIndex = index % 3;
		String key = keys[limitIndex];
		builder.append(key).append(' ');
		builder.append(index+1);
		String message = builder.toString();
        
		template.convertAndSend(exchangeName, key, message);
		LOGGER.info(" [x] Sent '{}'", message);
	}
}
  • 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DirectReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String in){
        receive(in, 1);
    }
    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String in){
        receive(in, 2);
    }

    private void receive(String in, int receiver){
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    private void doWork(String in){
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • 在controller中添加测试接口,调用该接口开始发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private DirectSender directSender;

    @ApiOperation("路由模式")
    @RequestMapping(value = "/direct", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult directTest() {
        for(int i=0;i<10;i++){
            directSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • 运行后结果如下,可以发现生产者往队列中发送包含不同路由键的消息,instance 1获取到了orangeblack消息,instance 2获取到了greenblack消息。

topic 主题通配符模式

通配符模式:根据路由键匹配规则选择性给多个消费者发送消息。包含一个生产者、两个消费者、两个队列和一个交换机。

  • 两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,

  • 生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。

应用场景:

iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

特殊匹配符号:

  • *:只能匹配一个单词;
  • #:可以匹配零个或多个单词。

Spring AMQP实现:

  • 添加通配符模式相关Java配置,创建一个名为exchange.topic的交换机、一个生产者、两个消费者和两个匿名队列,匹配*.orange.**.*.rabbit发送到队列1,匹配lazy.#发送到队列2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class TopicRabbitConfig {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("exchange.topic");
    }

    @Bean
    public Queue topicQueue1() {
        return new AnonymousQueue();
    }
    @Bean
    public Queue topicQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
    }
    @Bean
    public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
    }

    @Bean
    public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
    }
    @Bean
    public TopicReceiver topicReceiver() {
        return new TopicReceiver();
    }

    @Bean
    public TopicSender topicSender() {
        return new TopicSender();
    }

}
  • 生产者通过send方法向交换机exchange.topic中发送消息,消息中包含不同的路由键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TopicSender {

	@Autowired
	private RabbitTemplate template;
	private static final String exchangeName = "exchange.topic";
	private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);

	private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

	public void send(int index) {
		StringBuilder builder = new StringBuilder("Hello to ");
		int limitIndex = index%keys.length;
		String key = keys[limitIndex];
		builder.append(key).append(' ');
		builder.append(index+1);
		String message = builder.toString();
        
		template.convertAndSend(exchangeName, key, message);
		LOGGER.info(" [x] Sent '{}'",message);
		System.out.println(" [x] Sent '" + message + "'");
	}

}
  • 消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TopicReceiver {

	private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

	@RabbitListener(queues = "#{topicQueue1.name}")
	public void receive1(String in){
		receive(in, 1);
	}
	@RabbitListener(queues = "#{topicQueue2.name}")
	public void receive2(String in){
		receive(in, 2);
	}

	public void receive(String in, int receiver){
		StopWatch watch = new StopWatch();
		watch.start();
		LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        
		doWork(in);
		watch.stop();
		LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
	}

	private void doWork(String in){
		for (char ch : in.toCharArray()) {
			if (ch == '.') {
				ThreadUtil.sleep(1000);
			}
		}
	}
}
  • 在controller中添加测试接口,调用该接口开始发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private TopicSender topicSender;

    @ApiOperation("通配符模式")
    @RequestMapping(value = "/topic", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult topicTest() {
        for(int i=0;i<10;i++){
            topicSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • 运行后结果如下,可以发现生产者往队列中发送包含不同路由键的消息,instance 1instance 2分别获取到了匹配的消息。

header 首部

忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则。

根据应用程序消息的特定属性进行匹配。

RPC

远程过程调用(RPC)

远程过程调用:在远程计算机上运行功能并等待结果。

应用场景:

需要等待接口返回数据,如订单支付。

Publisher Confirms

发布者确认:与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布

在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景:

对于消息可靠性要求较高,比如钱包扣款。

延迟消息

学习本文需要对RabbitMQ有所了解,还不了解的朋友可以看下:《花了3天总结的RabbitMQ实用技巧,有点东西!》

RabbitMQ实现延迟消息的方式有两种,实现原理:

  • 死信队列:如果消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性可以实现延迟消息。具体参考《mall整合RabbitMQ实现延迟消息》

  • 延迟插件:通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。

由于死信队列方式需要创建两个交换机(死信队列交换机 + 处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。

死信队列

整合RabbitMQ实现延迟消息,以发送延迟消息取消超时订单为例。

业务场景说明

用于解决用户下单以后,订单超时如何取消订单的问题。

  1. 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  2. 生成订单,获取订单的id;
  3. 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  4. 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  5. 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

添加消息队列的枚举配置类

用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.macro.mall.tiny.dto;

/**
 * 消息队列枚举配置
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

添加RabbitMQ的配置类

用于配置交换机、队列、队列与交换机的绑定关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.macro.mall.tiny.config;

/**
 * 消息队列配置
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 订单消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)	//
                .build();
    }

    /**
     * 订单延迟队列队列所绑定的交换机
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单实际消费队列
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /**
     * 订单延迟队列(死信队列)
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())  //到期后转发的交换机
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())  //到期后转发的路由键
                .build();
    }

    /**
     * 将订单队列绑定到交换机
     */
    @Bean
    Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /**
     * 将订单延迟队列绑定到交换机
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }

}
管理页面

在RabbitMQ管理页面可以看到以下交换机和队列

交换机及队列说明
  • mall.order.direct取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。
  • mall.order.direct.ttl订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel取消订单消息消费队列)。

添加延迟消息的发送者

用于向订单延迟消息队列(mall.order.cancel.ttl)里发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.macro.mall.tiny.component;

/**
 * 取消订单消息的发出者
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}

添加取消订单消息的接收者

用于从取消订单的消息队列(mall.order.cancel)里接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.macro.mall.tiny.component;

/**
 * 取消订单消息的处理者
 */
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}

添加OmsPortalOrderService接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.macro.mall.tiny.service;

/**
 * 前台订单管理Service
 */
public interface OmsPortalOrderService {

    /**
     * 根据提交信息生成订单
     */
    @Transactional
    CommonResult generateOrder(OrderParam orderParam);

    /**
     * 取消单个超时订单
     */
    @Transactional
    void cancelOrder(Long orderId);
}

添加OmsPortalOrderService的实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.macro.mall.tiny.service.impl;

/**
 * 前台订单管理Service
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 执行一系类下单操作,具体参考mall项目
        LOGGER.info("process generateOrder");
        //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下单成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 执行一系类取消订单操作,具体参考mall项目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //获取订单超时时间,假设为60分钟
        long delayTimes = 30 * 1000;
        //发送延迟消息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }
}

添加 OmsPortalOrderController 定义接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.macro.mall.tiny.controller;

/**
 * 订单管理Controller
 */
@Controller
@Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
    @Autowired
    private OmsPortalOrderService portalOrderService;

    @ApiOperation("根据购物车信息生成订单")
    @RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
    @ResponseBody
    public Object generateOrder(@RequestBody OrderParam orderParam) {
        return portalOrderService.generateOrder(orderParam);
    }
}

进行接口测试

调用下单接口

  • 注意:已经将延迟消息时间设置为30秒

项目源码地址

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-08

使用插件实现延迟消息

插件安装

首先需要下载并安装RabbitMQ的延迟插件。

  • 去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/community-plugins.html

  • 直接搜索rabbitmq_delayed_message_exchange即可找到要下载的插件,下载和RabbitMQ配套的版本,不要弄错;

  • 将插件文件复制到RabbitMQ安装目录的plugins目录下;

  • 进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件;

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 启用插件成功后就可以看到如下信息,之后重新启动RabbitMQ服务即可。

实现延迟消息

接下来需要在SpringBoot中实现延迟消息功能,这次依然沿用商品下单的场景。比如说有个用户下单了,60分钟不支付订单,订单就会被取消,这就是一个典型的延迟消息使用场景。

  • 创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * 消息队列配置
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 订单延迟插件消息队列所绑定的交换机
     */
    @Bean
    CustomExchange  orderPluginDirect() {
        //创建一个自定义交换机,可以发送延迟消息
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
    }

    /**
     * 订单延迟插件队列
     */
    @Bean
    public Queue orderPluginQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
    }

    /**
     * 将订单延迟插件队列绑定到交换机
     */
    @Bean
    public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
        return BindingBuilder
                .bind(orderPluginQueue)
                .to(orderPluginDirect)
                .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                .noargs();
    }

}
  • 创建一个取消订单消息的发出者,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * 取消订单消息的发出者
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setHeader("x-delay",delayTimes);
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}
  • 创建一个取消订单消息的接收者,用于处理订单延迟插件队列中的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * 取消订单消息的处理者
 */
@Component
@RabbitListener(queues = "mall.order.cancel.plugin")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}
  • 然后在订单业务实现类中添加如下逻辑,当下单成功之前,往消息队列中发送一个取消订单的延迟消息,这样如果订单没有被支付的话,就能取消订单了;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 前台订单管理Service
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 执行一系类下单操作,具体参考mall项目
        LOGGER.info("process generateOrder");
        //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下单成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 执行一系类取消订单操作,具体参考mall项目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //获取订单超时时间,假设为60分钟(测试用的30秒)
        long delayTimes = 30 * 1000;
        //发送延迟消息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • 启动项目后,在Swagger中调用下单接口;

  • 调用完成后查看控制台日志可以发现,从消息发送和消息接收处理正好相差了30s,即设置的延迟时间。

1
2
3
4
2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

Web管理界面

rabbitmq_management

0%