官方说的很好理解,我说下我的看法。
- *(星号)可以代替一个单词。
- #(井号)可以替代零个或多个单词。
通配符模式 实际是对路由的拓展。
使用 * 和 # 来代替指定的路由key,大大提高了 路由的灵活性
下面要实现
在交换机类型为 Topic 中 有2个队列
队列1 的路由key是 *.orange
队列2 的路由key是 black.#
测试 往 路由key 为a.orange、black.a.b 分别发送一条消息
生产者代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Topicsprovider {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建 连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
//2 连接 连接
Connection connection = connectionFactory.newConnection();
//3 通过连接 创建信道
Channel channel = connection.createChannel();
// 设置 队列名称
String quotoName1 = "test_topic_queue1";
String quotoName2 = "test_topic_queue2";
// 让信道与队列进行声明(绑定)
channel.queueDeclare(quotoName1,true,false,false,null);
channel.queueDeclare(quotoName2,true,false,false,null);
//4 设置交换机名称
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//5 队列1的绑定 有 orange black green
channel.queueBind(quotoName1,exchangeName,"*.orange");
channel.queueBind(quotoName2,exchangeName,"black.#");
String orange = "Orange:消息 XXX";
String black = "Black: 消息 XXX";
//6 发送数据 一共发送3条数据
channel.basicPublish(exchangeName,"a.orange",null,orange.getBytes());
channel.basicPublish(exchangeName,"black.a.b",null,black.getBytes());
//7 释放资源
channel.close();
connection.close();
}
}
消费者代码
消费者1
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Topicsconsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置相关参数
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 建立连接
Connection connection = connectionFactory.newConnection();
//3 创建信道
Channel channel = connection.createChannel();
// 声明队列名称
String queue1 = "test_topic_queue1";
//6 创建接收回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//接收消息
channel.basicConsume(queue1,true,consumer);
}
}
消费者2
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Topicsconsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置相关参数
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 建立连接
Connection connection = connectionFactory.newConnection();
//3 创建信道
Channel channel = connection.createChannel();
// 声明队列名称
String queue1 = "test_topic_queue2";
//6 创建接收回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//接收消息
channel.basicConsume(queue1,true,consumer);
}
}
特殊说明: 上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤
评论(0)