看下官方说明:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

这是一个什么模式呢?

Direct交换机中接收到的数据,会以 路由 的形式进行绑定,发送。不再给全部消费者发送。这样设计的系统,会更加的合理。

生产者发送数据的时候需要绑定 路由key 了,以后 谁绑定了路由key ,就发送给谁。

这里 我不写图片中的Demo

小结:本来是放在最下面,但是放在上面方便查看,更易于理解

Routing模式要求队列在绑定交换机时 要制定routing key,消息会发给符合的队列中,记得交换机必须时direct 才可以。

代码实现的效果

一个Direct 交换机中 2个队列 分别绑定了不同的 路由key

队列1 绑定的路由key 有black、green、orange

队列2 绑定的路由key 有black

生产者 分别再 black、green、orange 的路由上 发送了一条消息,一共3条

队列1 全部接收到消息

队列2 只能接收到black的消息

生产者代码


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Routingprovider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建 连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);

        //2 连接 连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接 创建信道
        Channel channel = connection.createChannel();
        // 设置 队列名称
        String quotoName1 = "test_direct_queue1";
        String quotoName2 = "test_direct_queue2";
        // 让信道与队列进行声明(绑定)
        channel.queueDeclare(quotoName1,true,false,false,null);
        channel.queueDeclare(quotoName2,true,false,false,null);
        //4 设置交换机名称
        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        //5 队列1的绑定 有 orange black green
        channel.queueBind(quotoName1,exchangeName,"orange");
        channel.queueBind(quotoName1,exchangeName,"black");
        channel.queueBind(quotoName1,exchangeName,"green");
        channel.queueBind(quotoName2,exchangeName,"black");
        String orange = "Orange:消息 XXX";
        String black = "Black: 消息 XXX";
        String green = "Green: 消息 XXX";



        //6 发送数据 一共发送3条数据
        channel.basicPublish(exchangeName,"orange",null,orange.getBytes());
        channel.basicPublish(exchangeName,"black",null,black.getBytes());
        channel.basicPublish(exchangeName,"green",null,green.getBytes());

        //7 释放资源
        channel.close();
        connection.close();

    }
}
生产者启动时,只有队列1有,因为设置了路由key 为 orange 的才有消息

自己手写代码的时候 ,记得去看下交换机绑定的路由对不对,如果多绑定、少绑定,一定时你代码写错了,记得解除所有的routing key

消费者代码

消费者1:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Routingconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue1 = "test_direct_queue1";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue1,true,consumer);

    }
}

消费者2


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Routingconsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue2 = "test_direct_queue2";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue2,true,consumer);

    }
}

启动2个消费者,在启动生产者

此时 2个消费者 都能接收到消息了。

路由模式完成

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤