依旧如此 看下 官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

官方说的很好理解,我说下我的看法。

  • *(星号)可以代替一个单词。
  • #(井号)可以替代零个或多个单词。

通配符模式 实际是对路由的拓展。

使用 * 和 # 来代替指定的路由key,大大提高了 路由的灵活性

下面要实现

在交换机类型为 Topic 中 有2个队列

队列1 的路由key是 *.orange

队列2 的路由key是 black.#

测试 往 路由key 为a.orange、black.a.b 分别发送一条消息

生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsprovider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建 连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);

        //2 连接 连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接 创建信道
        Channel channel = connection.createChannel();
        // 设置 队列名称
        String quotoName1 = "test_topic_queue1";
        String quotoName2 = "test_topic_queue2";
        // 让信道与队列进行声明(绑定)
        channel.queueDeclare(quotoName1,true,false,false,null);
        channel.queueDeclare(quotoName2,true,false,false,null);
        //4 设置交换机名称
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

        //5 队列1的绑定 有 orange black green
        channel.queueBind(quotoName1,exchangeName,"*.orange");
        channel.queueBind(quotoName2,exchangeName,"black.#");
        String orange = "Orange:消息 XXX";
        String black = "Black: 消息 XXX";

        //6 发送数据 一共发送3条数据
        channel.basicPublish(exchangeName,"a.orange",null,orange.getBytes());
        channel.basicPublish(exchangeName,"black.a.b",null,black.getBytes());

        //7 释放资源
        channel.close();
        connection.close();

    }
}

消费者代码

消费者1


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue1 = "test_topic_queue1";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue1,true,consumer);

    }
}

消费者2


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Topicsconsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置相关参数
        connectionFactory.setHost("118.31.127.248");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zanglikun");
        connectionFactory.setPassword("zanglikun");
        connectionFactory.setVirtualHost("/govbuy");

        //2 建立连接
        Connection connection = connectionFactory.newConnection();

        //3 创建信道
        Channel channel = connection.createChannel();

        // 声明队列名称
        String queue1 = "test_topic_queue2";


        //6 创建接收回调
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };

        //接收消息
        channel.basicConsume(queue1,true,consumer);

    }
}