如何限制消费者消费
消费时指定channel.basicQos(0,1,false);
@RabbitListener(queues = "boot_queue")
public void getMessage(Message message, Channel channel) throws IOException {
long qps = 10;
// prefetchSize(消息大小,0是不限制)、prefechSize(消息数量)、global(true 表示此通道的消费者都适用此策略,false代表仅当前消费者)
channel.basicQos(0,qps,false);
System.out.println(message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
如何保证消息投递
消息投递到过程:生产者 -> 交换机 -> 队列
因为上述消息传递过程是2个环节,rabbitmq对上述2个环节提供了2个消息投递确认对机制
- confirm 确认模式:消息发送到交换器Exchange后触发回调。每次投递都触发
- return 退回模式 :当消息未投递到queue时的反馈。投递成功回调不触发
如果开启上述2种模式报错,可能是 java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate 解决方案如下:
// 自己添加多例Bean
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setMessageConverter(new SerializerMessageConverter());
return template;
}
// 请在Controlle或Service类上添加注解 @Scope(prototype)
@Scope(prototype)
confirm 确认模式
消息发送时,每次都会有一个confirm回调函数,我们在confirm回调函数处理即可
# 开启发送端confirm,高AMQP版本就默认开启,且不允许添加此配置
spring.rabbitmq.publisher-confirms=true
@RequestMapping("/testSendConfirm")
public void confirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack){
log.info("发送成功");
}else {
log.error("发送失败!需要执行重新发送消息");
}
}
});
// 发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","这是整合:testSend方法发送的消息");
}
return退回模式,当消息未投递到queue时的反馈。
# 发送者开启return模式
spring.rabbitmq.publisher-returns=true
@RequestMapping("/testSendReturn")
public void returnTest(){
// 开启交换机处理失败消息 交换机无法将消息进行路由时,会将该消息返回给生产者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message, int errorCode, String errorInfo, String exchange, String routeKey) {
log.info("Return 正在执行");
}
});
// 发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","这是整合:testSend方法发送的消息");
}
测试confirm 与 return模式,
confirm是消息到交换机的无论成功与失败都触发,如果需要测试发送失败,直接指定假的交换机即可
return是交换机到队列,我们只要指定假的路由,这样交换机到消息就无法发送到队列,就会触发rerurn模式。如果消息发送成功,return回调则不会触发。
如何保证消息消费
手动开启ACK模式
配置文件添加
# 消费者开启手动确认。默认是自动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class RabbitMQListen01 {
@RabbitListener(queues = "boot_queue")
public void getMessage(Message message, Channel channel) throws IOException {
int qps = 1;
try {
channel.basicQos(0, qps, false);
log.info("拿到消息的内容:{}", message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// 确认ACK执行后,必须当getMessage方法结束后,才能读取下一条
} catch (Exception e) {
// 如果有异常,主动拒绝此消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error("Error..");
}
}
}
每当消费者消费完成是,确定一下即可!
消息幂等性
这个问题还是建议兄弟从发送端的confirm、return机制入手,然后消费端的ACK确认处理,如果消费者遇到问题,可以考虑到死信队列,然后针对死信队列进行监听。
面试管也可能会从重复发送、重复消费处理。重复发送,一般来说无所谓。因为我们真正操作我们的业务是消费者。我们发送消息的时候,指定消息id即可。相同消息,我们记录是否消费过,间接解决消息重复消费问题。
乐观锁:通过DB + version字段实现
比如我不小心发送了2条消息,内容一样,扣除用户Money。我不可能扣2次。
update user_wallect set money = money -1,version = version + 1 where user = 'zhangsan' and version = 1
第二次消费的时候就不会有version = 1的了,也就实现无重复消费,
Redis也可以实现,Redis的exist判断消息的id,就间接实现无重复消费
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤