我们要知道延迟队列 作用是在交换机上与队列无关!

RabbitMQ延迟队列实现方式

  1. 设置的TTL队列 + 死信交换机。消息先进入TTL消息队列,不要去消费TTL消息队列,一旦TTL消息到期,进入死信队列,业务读取的是死信队列数据 间接实现延迟功能!参考:https://www.zanglikun.com/13220.html#ttl&dlx
  2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能

本文介绍rabbitmq-delayed-message-exchange插件

需要安装插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

你可以尝试查找一下你RabbitMQ安装在哪里的

whereis rabbitmq
一般会提示在 /usr/lib/rabbitmq /etc/rabbitmq
实际是在/usr/lib/rabbitmq/lib/RabbitMQ的XX版本/plugins

将下载好的文件 放入到

/usr/lib/rabbitmq/lib/RabbitMQ的XX版本/plugins

在RabbitMQ运行的时候开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在RabbitMQ运行的时候关闭插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

查看RabbitMQ使用的插件 有没有刚才添加的 rabbitmq_delayed_message_exchange

rabbitmq-plugins list

[   ] 插件名 插件版本号
只有 中括号有e或E 才算开启了如:
[ e ] 插件名 插件版本号

使用

配置交换机与队列,并在交换机开启Delay模式

使用前:请删除相关交换机、队列。然后请求发消息的方法会自动创建交换机与队列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration // 声明为 配置类
public class RabbitMQConfig {
    /**
     * 设置三部分
     * 交换机 Exchange
     * 队列 Queue
     * 绑定关系 Binding
     */
    // 声明 交换机名称
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    // 声明 队列名称
    public static final String QUEUE_NAME = "boot_queue";

    // 1 配置交换机
    @Bean("bootExchange") // 设置BeanName 为 bootExchanghe
    public Exchange bootExchange() {
        TopicExchange exchange = new TopicExchange(EXCHANGE_NAME, true, false);
        exchange.setDelayed(true);
        return exchange;
    }

    // 2 Queue 队列
    @Bean("bootQueue") // 设置BeanName 为 bootQueue
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .build();
    }

    // 3 队列 与交换机的绑定
    /**
     * 我们首先要知道我们绑定的路由key
     *
     * @param queue    要知道那个队列
     * @param exchange 要知道那个交换机
     * @return
     * @Qualifier注解是 自动装配
     */
    @Bean("bootBind") //设置BeanName 为 bootBind
    public Binding bootBindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("boot.#")
                .noargs();
    }
}

测试发送消息,指定延迟时间即可

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/testSendWithDelay")
    public void Delay(){
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setContentEncoding("UTF-8");
                // 延迟6秒发送
                message.getMessageProperties().setDelay(6000);
                return message;
            }
        };
        // 发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","测试Delay消息",messagePostProcessor);
    }

正常消费消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author : zanglikun
 * @date : 2021/2/2 14:28
 * @Version: 1.0
 * @Desc : 费劲,没啥好说的
 */
@Slf4j
@Component
public class RabbitMQListen01 {

    /**
     * 消费端限流思路,开启qps限制,同时开启ack手动确认机制,然乎消费成功,主动ack消息
     *
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "boot_queue")
    public void getMessage(Message message, Channel channel) throws IOException {
        int qps = 1;
        try {
            /**
             * 设置消费者限流,需要去配置文件开启手动ACK确认功能
             * prefetchSize 是消息大小,默认是0,我看别人会说RabbitMQ没有实现限定消息大小,
             * prefetchCount 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,
             * global 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
             */
            channel.basicQos(0, qps, false);
            log.info("拿到消息的内容:{}", message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
            // 消息处理完成,主动ACK确认
            Thread.sleep(1500);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            // 确认ACK执行后,必须当getMessage方法结束后,才能读取下一条
        } catch (Exception e) {
            // 如果有异常,主动拒绝此消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("Error..");
        }
    }
}
特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤