我们要知道延迟队列 作用是在交换机上与队列无关!
RabbitMQ延迟队列实现方式
- 设置的TTL队列 + 死信交换机。消息先进入TTL消息队列,不要去消费TTL消息队列,一旦TTL消息到期,进入死信队列,业务读取的是死信队列数据 间接实现延迟功能!参考:https://www.zanglikun.com/13220.html#ttl&dlx
- 使用rabbitmq-delayed-message-exchange插件实现延迟功能
本文介绍rabbitmq-delayed-message-exchange插件
需要安装插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
你可以尝试查找一下你RabbitMQ安装在哪里的
whereis rabbitmq
一般会提示在 /usr/lib/rabbitmq /etc/rabbitmq
实际是在/usr/lib/rabbitmq/lib/RabbitMQ的XX版本/plugins
将下载好的文件 放入到
/usr/lib/rabbitmq/lib/RabbitMQ的XX版本/plugins
在RabbitMQ运行的时候开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在RabbitMQ运行的时候关闭插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
查看RabbitMQ使用的插件 有没有刚才添加的 rabbitmq_delayed_message_exchange
rabbitmq-plugins list
[ ] 插件名 插件版本号
只有 中括号有e或E 才算开启了如:
[ e ] 插件名 插件版本号
使用
配置交换机与队列,并在交换机开启Delay模式
使用前:请删除相关交换机、队列。然后请求发消息的方法会自动创建交换机与队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration // 声明为 配置类
public class RabbitMQConfig {
/**
* 设置三部分
* 交换机 Exchange
* 队列 Queue
* 绑定关系 Binding
*/
// 声明 交换机名称
public static final String EXCHANGE_NAME = "boot_topic_exchange";
// 声明 队列名称
public static final String QUEUE_NAME = "boot_queue";
// 1 配置交换机
@Bean("bootExchange") // 设置BeanName 为 bootExchanghe
public Exchange bootExchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE_NAME, true, false);
exchange.setDelayed(true);
return exchange;
}
// 2 Queue 队列
@Bean("bootQueue") // 设置BeanName 为 bootQueue
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.build();
}
// 3 队列 与交换机的绑定
/**
* 我们首先要知道我们绑定的路由key
*
* @param queue 要知道那个队列
* @param exchange 要知道那个交换机
* @return
* @Qualifier注解是 自动装配
*/
@Bean("bootBind") //设置BeanName 为 bootBind
public Binding bootBindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("boot.#")
.noargs();
}
}
测试发送消息,指定延迟时间即可
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/testSendWithDelay")
public void Delay(){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setContentEncoding("UTF-8");
// 延迟6秒发送
message.getMessageProperties().setDelay(6000);
return message;
}
};
// 发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","测试Delay消息",messagePostProcessor);
}
正常消费消息
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author : zanglikun
* @date : 2021/2/2 14:28
* @Version: 1.0
* @Desc : 费劲,没啥好说的
*/
@Slf4j
@Component
public class RabbitMQListen01 {
/**
* 消费端限流思路,开启qps限制,同时开启ack手动确认机制,然乎消费成功,主动ack消息
*
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "boot_queue")
public void getMessage(Message message, Channel channel) throws IOException {
int qps = 1;
try {
/**
* 设置消费者限流,需要去配置文件开启手动ACK确认功能
* prefetchSize 是消息大小,默认是0,我看别人会说RabbitMQ没有实现限定消息大小,
* prefetchCount 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,
* global 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
*/
channel.basicQos(0, qps, false);
log.info("拿到消息的内容:{}", message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
// 消息处理完成,主动ACK确认
Thread.sleep(1500);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// 确认ACK执行后,必须当getMessage方法结束后,才能读取下一条
} catch (Exception e) {
// 如果有异常,主动拒绝此消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error("Error..");
}
}
}
特殊说明: 上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤