什么是RabbitMQ的TTL队列?

TTL 与Redis的TTL一样是 time to live

如果消息存放在队列中超时,该消息就会被删除

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期
  • 如果两者都进行了设置,以时间短的为准。

RabbitMQ队列设置TTL

1、代码创建TTL队列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration // 声明为 配置类
public class RabbitMQConfig {
    /**
     * 设置三部分
     * 交换机 Exchange
     * 队列 Queue
     * 绑定关系 Binding
     */
    // 声明 交换机名称
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    // 声明 队列名称
    public static final String QUEUE_NAME = "boot_queue";

    //1 配置交换机
    @Bean("bootExchange") // 设置BeanName 为 bootExchanghe
    public Exchange bootExchange() {
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    //2 Queue 队列 通过参数创建ttl为60秒的队列
    @Bean("bootQueue") // 设置BeanName 为 bootQueue
    public Queue bootQueue() {
        Map<String, Object> props = new HashMap<>();
        // 设置队列中的消息过期时间为60秒
        props.put("x-message-ttl", 60000);
        return new Queue(QUEUE_NAME, true, false, false, props);
    }

    //3 队列 与交换机的绑定
    /**
     * 我们首先要知道我们绑定的路由key
     *
     * @param queue    要知道那个队列
     * @param exchange 要知道那个交换机
     * @return
     * @Qualifier注解是 自动装配
     */
    @Bean("bootBind") //设置BeanName 为 bootBind
    public Binding bootBindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("boot.#")
                .noargs();
    }
}

2、手动创建TTL队列

3、查看是否有TTL队列

4、消息设置TTL

    @RequestMapping("/testSendWithTTL")
    public void ttl(){
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 在此进行TTL设置 单位毫秒。队列有过期时间、消息也有过期时间,同时存在,以最少的ttl为准
                message.getMessageProperties().setExpiration("70000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        // 发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","这是整合:testSend方法发送的消息",messagePostProcessor);
    }

测试方法

不要任何消费者,我们主动发消息,看一下控制台此队列有几条消息,然后等待TTL过期,再次检查还剩余几条消息

注意事项:

如果消息队列中,最前面的消息没有到过期时间,即便后面的队列进入了过期时间,必须等待前面的消息被消费或者过期后,才能被过期。这样可以实现整体消息的高性能。开发中我们不直接选用TTL队列。

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤