官网:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

官网说:在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。

这句话什么意思?

就是生产者 直接将消息 发送至X 交换机,交换机路由分发给不同的队列

常用交换机类型:

Fanout:广播,将消息交给所有绑定到交换机上的队列

Direct:定向,将消息发送到指定的 RoutingKey 的 队列

Topic:通配符,将消息交给符合routing pattern 的队列

Headers:参数匹配 (不讲解)

交换机:只负责发送消息,并不存储消息,因此,如果没有队列与交换机绑定。或者没有符合的路由规则的队列,那么消息就会消失。

一旦交换机写死,那么就只能按照次交换机的类型来发送

要学会一个英语单词

Declare : 声明

下面代码最终实现效果如:一个生产者,生产的2条数据,2个消费者都可以接收

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubprovider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接工程对象 记得是RabbitMQ包下的
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/govbuy");

        //2 用工厂对象创建连接
        Connection connection = connectionFactory.newConnection();

        //3 创建队列
        Channel channel = connection.createChannel();

        //4 创建交换机
        /**     参数说明
         *         String exchange,  交换机名称
         *         BuiltinExchangeType type,  交换机的类型 枚举类型
         *              DIRECT("direct") 定向,
         *              FANOUT("fanout") 扇形广播,发送消息到每个与其绑定的队列,
         *              TOPIC("topic") 通配符,
         *              HEADERS("headers") 参数匹配;
         *         boolean durable, 是否持久化
         *         boolean autoDelete,  是否自动删除
         *         boolean internal,  内部使用 一般为false
         *         Map<String, Object> arguments  参数列表 设为null
         */
        String exchangename = "test_fanout";
        channel.exchangeDeclare(exchangename, BuiltinExchangeType.FANOUT,true,false,false,null);

        //5 创建队列
        //创建队列名称
        String queque1 = "test_fanout_queue1";
        String queque2 = "test_fanout_queue2";

        channel.queueDeclare(queque1,true,false,false,null);
        channel.queueDeclare(queque2,true,false,false,null);

        //6 绑定队列、交换机
        /**
         *  相关参数
         *  String queue,  队列名称
         *  String exchange,  交换机名称
         *  String routingKey,  路由key,绑定规则 如果交换机类型为fanout 那么路由key为"" ,为什么呢?因为广播,需要发送到与之绑定的所有队列
         *  Map<String, Object> arguments
         *
         */
        channel.queueBind(queque1,exchangename,"");
        channel.queueBind(queque2,exchangename,"");  // 绑定完成后,没有发送消息钱,就可以登录 15672 在交换机看到这2个队列了

        //声明一下数据
        String body = "张三,调用了findAll方法";
        //7 发送消息
        channel.basicPublish(exchangename,"",null,body.getBytes());

        //8 释放资源
        channel.close();
        connection.close();

    }
}
生产者 执行后,会看到交换机绑定的2个队列,有了2条数据了。

消费者代码

消费者1 和 2 的区别时 绑定的队列名称不一样,看清楚,其他地方都一样)

消费者1 代码


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接工厂对象 并设置相应参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 创建连接
        Connection connection = connectionFactory.newConnection();

        //3 创建队列
        Channel channel = connection.createChannel();

        //4 设置队列相关信息
        //创建队列名称
        String queque1 = "test_fanout_queue1";
        String queque2 = "test_fanout_queue2";

        //5 创建 Counsumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                System.out.println("拿到数据,执行相关业务逻辑");
            }
        };
        channel.basicConsume(queque1,true,consumer);

    }
}

消费者2 代码


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubconsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接工厂对象 并设置相应参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 创建连接
        Connection connection = connectionFactory.newConnection();

        //3 创建队列
        Channel channel = connection.createChannel();

        //4 设置队列相关信息
        //创建队列名称
        String queque1 = "test_fanout_queue1";
        String queque2 = "test_fanout_queue2";

        //5 创建 Counsumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                System.out.println("拿到数据,执行相关业务逻辑");
            }
        };
        channel.basicConsume(queque2,true,consumer);

    }
}

最终效果

先开启2个消费者,然后 开启生产者,发现2个消费者都接收到数据了。

启动生产者 消费者 1 2 显示相应的结果

发表评论

您的电子邮箱地址不会被公开。