基于Spring Boot的RabbitMQ延时队列技术实现

news/2025/2/22 19:23:46

文章目录

    • 基于Spring Boot的RabbitMQ延时队列技术实现
      • 延时队列应用场景
      • 基本概念
      • 实现延时队列
        • 添加依赖
        • 基础配置
        • 配置类设计
        • 消息生产者
        • 消息消费者
      • 两种TTL设置方式
    • 订单超时关闭实例
      • 订单服务
      • 消息处理
    • 延迟消息插件
      • 安装插件
      • 配置延迟交换机

基于Spring Boot的RabbitMQ延时队列技术实现

延时队列应用场景

  • 订单系统:30分钟未支付订单自动取消
1. 用户下单 → 发送延时消息(30分钟TTL)
2. 消息进入普通队列等待
3. 30分钟后消息过期 → 转入死信队列
4. 消费者检查订单状态:
   - 未支付 → 执行关闭操作
   - 已支付 → 忽略
  • 定时通知:预约提醒服务
场景:会议开始前15分钟提醒
1. 创建会议时发送延时消息
2. 消息存活直到会议开始前15分钟
3. 触发通知服务发送提醒
  • 异步重试:失败任务延时重试机制
消息处理失败时:
1. 首次失败 → 延时5秒重试
2. 二次失败 → 延时30秒重试
3. 三次失败 → 进入死信队列人工处理
  • 物流跟踪:预计送达时间状态更新

基本概念

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消息被拒绝且不重新入队:消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息过期:消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 队列达到最大长度:要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)。

在这里插入图片描述


RabbitMQ 本身没有直接的延时队列功能,通常是通过死信队列和**TTL(Time-To-Live)**来实现的。

[生产者] → [普通队列(设置TTL)] → (消息过期)→ [死信队列] → [消费者]

实现延时队列

添加依赖
<!-- amqp 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Mybatis-Plus包 -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.1</version>
</dependency>
<!-- MySQL驱动包 -->
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <scope>runtime</scope>
</dependency>
<!-- lombok包 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
基础配置
server:
  port: 8080
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=false
    username: root
    password: root
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
mybatis-plus:
  type-aliases-package: com.hz.pojo #类型别名所在的包
  #控制台打印sql语句
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case: false # 驼峰映射

死信队列三要素

  1. DLX (Dead-Letter-Exchange):死信转发交换机
  2. DLK (Dead-Letter-Routing-Key):死信路由键
  3. TTL (Time-To-Live):消息存活时间
配置类设计
@Configuration
public class RabbitMQConfig {

    // 业务交换机
    public static final String BUSINESS_EXCHANGE = "business.exchange";
    // 业务队列
    public static final String BUSINESS_QUEUE = "business.queue";
    // 死信交换机
    public static final String DLX_EXCHANGE = "dlx.exchange";
    // 死信队列
    public static final String DLX_QUEUE = "dlx.queue";
    
    // 业务队列路由键
    private static final String BUSINESS_ROUTING_KEY = "business.key";
    // 死信路由键
    private static final String DLX_ROUTING_KEY = "dlx.key";

    // 声明业务交换机(直连型)
    @Bean
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    // 声明死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    // 声明业务队列(绑定死信属性)
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键
        args.put("x-message-ttl", 10000); // 队列统一TTL(单位:毫秒)
        return new Queue(BUSINESS_QUEUE, true, false, false, args);
    }

    // 声明死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE);
    }

    // 绑定业务队列到交换机
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
               .to(businessExchange())
               .with(BUSINESS_ROUTING_KEY);
    }

    // 绑定死信队列到交换机
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
               .to(dlxExchange())
               .with(DLX_ROUTING_KEY);
    }
}
消息生产者
@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送延时消息
     * @param message 消息内容
     * @param ttl 单位:秒
     */
    public void sendDelayMessage(String message, int ttl) {
        // 消息属性设置
        MessagePostProcessor processor = message -> {
            message.getMessageProperties()
                   .setExpiration(String.valueOf(ttl * 1000)); // 消息级别TTL
            return message;
        };

        rabbitTemplate.convertAndSend(
            RabbitMQConfig.BUSINESS_EXCHANGE,
            RabbitMQConfig.BUSINESS_ROUTING_KEY,
            message,
            processor
        );
    }
}
消息消费者
@Component
public class MessageConsumer {
    @Autowired
    private BillService billService;
    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
    public void processDelayMessage(String billCode) {
        System.out.println("收到延时消息:" + billCode);
        billService.closeBill(billCode);
        System.out.println("超时未支付,订单已关闭--------------");
    }
}

两种TTL设置方式

队列级别TTL

args.put("x-message-ttl", 10000);

队列中所有消息统一过期时间;消息实际存活时间 = 队列TTL;性能更优(RabbitMQ统一处理)

消息级别TTL

message.getMessageProperties().setExpiration("5000");

每个消息可以设置不同TTL;实际存活时间取最小值(队列TTL vs 消息TTL);需要逐个处理消息,性能开销较大

订单超时关闭实例

在这里插入图片描述

订单服务

@Service
public class BillService {
    @Autowired
    private MessageProducer messageProducer;
    @Resource
    private BillMapper billMapper;
    public void createBill(Bill bill) {
        // 保存订单到数据库
        bill.setIsPayment(1); // 设置初始状态 1:未支付 2:已支付 3:已关闭
        billMapper.insert(bill);
        // 发送延时消息(10s)
        messageProducer.sendDelayMessage(bill.getBillCode(), 10);
    }
    
    public void closeBill(String billCode) {
        Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));
        if (bill != null && bill.getIsPayment() == 1) {
            bill.setIsPayment(3);
            billMapper.updateById(bill);
        }
    }
}

消息处理

@RestController
@RequestMapping("/bill")
public class BillController {
    @Autowired
    private BillService billService;
    @GetMapping("/send")
    public String send(){
        // 创建测试订单
        Bill bill = new Bill();
        bill.setBillCode("BILL2025_999");
        bill.setProductName("可口可乐");
        // 创建账单并发送延时消息
        billService.createBill(bill);

        return "订单创建成功,10秒后未支付将自动关闭。订单号:" + bill.getBillCode();
    }
}

流程:

  1. 访问 localhost:8080/bill/send 创建测试订单

    在这里插入图片描述

  2. 订单初始状态为待支付(1)

    在这里插入图片描述

  3. 消息经过10秒延迟进入死信队列

    在这里插入图片描述

  4. 消费者处理消息时检查订单状态

  5. 若仍为未支付状态,更新为已关闭(3)

    在这里插入图片描述

延迟消息插件

RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange,它允许你发送延迟消息而无需设置消息的 TTL 和死信队列。这个插件提供了一个新的交换机类型 x-delayed-message,可以用来实现消息的延迟投递。

安装插件

可以从 RabbitMQ 的插件页面下载,或者直接使用以下命令进行安装(假设 RabbitMQ 安装在默认位置):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装完成后,重启 RabbitMQ 服务。

配置延迟交换机

@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}

// 发送消息时设置延迟头
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
    msg.getMessageProperties().setHeader("x-delay", 5000);
    return msg;
});

http://www.niftyadmin.cn/n/5862679.html

相关文章

《Keras 3 : 使用迁移学习进行关键点检测》:此文为AI自动翻译

《Keras 3 :使用迁移学习进行关键点检测》 作者:Sayak Paul,由 Muhammad Anas Raza 转换为 Keras 3 创建日期:2021/05/02 最后修改时间:2023/07/19 描述:使用数据增强和迁移学习训练关键点检测器。 (i) 此示例使用 Keras 3 在 Colab 中查看 GitHub 源 关键点检测包…

深入浅出:理解闭包在JavaScript中的应用

什么是闭包 闭包&#xff08;Closure&#xff09;是 JavaScript 中的一个重要概念&#xff0c;也是函数式编程中的核心特性之一。简单来说&#xff0c;闭包是指一个函数能够访问并记住其词法作用域&#xff08;Lexical Scope&#xff09;&#xff0c;即使这个函数在其词法作用…

论文解读 | AAAI'25 Cobra:多模态扩展的大型语言模型,以实现高效推理

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; 点击 阅读原文 观看作者讲解回放&#xff01; 个人信息 作者&#xff1a;赵晗&#xff0c;浙江大学-西湖大学联合培养博士生 内容简介 近年来&#xff0c;在各个领域应用多模态大语言模型&#xff08;MLLMs&…

火语言RPA--Excel插入空行

【组件功能】&#xff1a;在Excel内指定的位置插入空行 配置预览 配置说明 在第n行之前 支持T或# 填写添加插入第n行之前行号。 插入n行 支持T或# 插入多少行。 Sheet页名称 支持T或# Excel表格工作簿名称。 示例 Excel插入空行 描述 在第3行之后插入3行。 配置 输…

数据库-SQLite

目录 1.SQLite介绍 2.SQLite特性 3.SQLite使用 3.1.环境准备 3.2.创建数据库文件 3.3.操作数据库 4.API接口 4.1.封装数据库句柄结构体 4.2.数据库句柄初始化 4.3.连接数据库 4.4.创建表 插入数据 修改数据 删除数据 4.5.执行查询语句 4.6.初始化存储查询结果句柄 …

JavaScript系列(79)--Web Worker 高级应用

Web Worker 高级应用 &#x1f504; Web Worker 为JavaScript提供了真正的多线程能力&#xff0c;让我们能够在后台线程中执行复杂的计算而不阻塞主线程。今天让我们深入探讨Web Worker的高级应用。 Web Worker 概述 &#x1f31f; &#x1f4a1; 小知识&#xff1a;Web Work…

在PyTorch中使用插值法来优化卷积神经网络(CNN)所需硬件资源

插值法其实就是在已知数据点之间估计未知点的值。通过已知的离散数据点,构造一个连续的曲线函数,预测数据点之间的空缺值是什么并且自动填补上去。 适用场景: 在卷积神经网络(CNN)中的应用场景中,经常遇到计算资源有限,比如显存不够或者处理速度慢,需要用插值来降低计…

设计模式之装饰器设计模式/包装设计模式

装饰器设计模式&#xff08;Decorator Pattern&#xff09; 也叫包装设计模式&#xff0c;属于结构型模式&#xff0c;它是作为现有的类的一个包装&#xff0c;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其结构 给对象增加功能&#xff0c;一般两种方式&#…