如何限制消费者消费

消费时指定channel.basicQos(0,1,false);

    @RabbitListener(queues = "boot_queue")
    public void getMessage(Message message, Channel channel) throws IOException {
        long qps = 10;
        // prefetchSize(消息大小,0是不限制)、prefechSize(消息数量)、global(true 表示此通道的消费者都适用此策略,false代表仅当前消费者)
        channel.basicQos(0,qps,false);
        System.out.println(message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

如何保证消息投递

消息投递到过程:生产者 -> 交换机 -> 队列

因为上述消息传递过程是2个环节,rabbitmq对上述2个环节提供了2个消息投递确认对机制

  1. confirm 确认模式:消息发送到交换器Exchange后触发回调。每次投递都触发
  2. return 退回模式 :当消息未投递到queue时的反馈。投递成功回调不触发

如果开启上述2种模式报错,可能是 java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate 解决方案如下:

    // 自己添加多例Bean
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setMessageConverter(new SerializerMessageConverter());
        return template;
    }


// 请在Controlle或Service类上添加注解 @Scope(prototype)
@Scope(prototype)

confirm 确认模式

消息发送时,每次都会有一个confirm回调函数,我们在confirm回调函数处理即可

# 开启发送端confirm,高AMQP版本就默认开启,且不允许添加此配置
spring.rabbitmq.publisher-confirms=true
    @RequestMapping("/testSendConfirm")
    public void confirm(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                if (ack){
                    log.info("发送成功");
                }else {
                    log.error("发送失败!需要执行重新发送消息");
                }
            }
        });
        // 发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","这是整合:testSend方法发送的消息");
    }

return退回模式,当消息未投递到queue时的反馈。

# 发送者开启return模式
spring.rabbitmq.publisher-returns=true
    @RequestMapping("/testSendReturn")
    public void returnTest(){
        // 开启交换机处理失败消息 交换机无法将消息进行路由时,会将该消息返回给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
            @Override
            public void returnedMessage(Message message, int errorCode, String errorInfo, String exchange, String routeKey) {
                log.info("Return 正在执行");
            }
        });
        // 发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hello","这是整合:testSend方法发送的消息");
    }

测试confirm 与 return模式,

confirm是消息到交换机的无论成功与失败都触发,如果需要测试发送失败,直接指定假的交换机即可

return是交换机到队列,我们只要指定假的路由,这样交换机到消息就无法发送到队列,就会触发rerurn模式。如果消息发送成功,return回调则不会触发。

如何保证消息消费

手动开启ACK模式

配置文件添加

# 消费者开启手动确认。默认是自动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class RabbitMQListen01 {

    @RabbitListener(queues = "boot_queue")
    public void getMessage(Message message, Channel channel) throws IOException {
        int qps = 1;
        try {
            channel.basicQos(0, qps, false);
            log.info("拿到消息的内容:{}", message); // 这里 不只是输出 单个 发送的信息,而是 全部输出 消息里面的内容数据
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            // 确认ACK执行后,必须当getMessage方法结束后,才能读取下一条
        } catch (Exception e) {
            // 如果有异常,主动拒绝此消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("Error..");
        }
    }
}

每当消费者消费完成是,确定一下即可!

消息幂等性

这个问题还是建议兄弟从发送端的confirm、return机制入手,然后消费端的ACK确认处理,如果消费者遇到问题,可以考虑到死信队列,然后针对死信队列进行监听。

面试管也可能会从重复发送、重复消费处理。重复发送,一般来说无所谓。因为我们真正操作我们的业务是消费者。我们发送消息的时候,指定消息id即可。相同消息,我们记录是否消费过,间接解决消息重复消费问题。

乐观锁:通过DB + version字段实现

比如我不小心发送了2条消息,内容一样,扣除用户Money。我不可能扣2次。

update user_wallect set money = money -1,version = version + 1 where user = 'zhangsan' and version = 1

第二次消费的时候就不会有version = 1的了,也就实现无重复消费,

Redis也可以实现,Redis的exist判断消息的id,就间接实现无重复消费

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤