首先 简单模式 看下 官网介绍

官网:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

RabbitMQ是消息代理:它接受并转发消息。您可以将其视为邮局:将您要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给收件人。以此类推,此模式为:RabbitMQ是一个邮箱,一个邮局和一个邮递员

官方说:如果接收不到消息 可能是:默认情况下,它至少需要200 MB的可用空间。

其他没什么了。

好了 ,我们开始我们缕一缕 我们的逻辑了

首先 简单模式 分为3个角色 一个生产者、一个消费者、一个消息中间件

不就是 生产者生产消息,通过AMQP协议 发送到 MQ ,然后消费者 从MQ 也通过AMQP协议 获取消息。

发送消息

  • 创建连接工厂对象
  • 对工厂对象设置一些参数
  • 用工厂创建连接
  • 通过连接获取队列 (指定一些队列的属性)
  • 通过队列进行发送消息
  • 释放资源

接收消息

  • 创建连接工厂对象
  • 对工厂对象设置一些参数
  • 用工厂对象创建连接
  • 通过连接获取队列(指定一些队列属性)
  • 创建消费者,可以写回调函数 选择是否进行需要回调函数
  • 获取消息
  • 不需要释放资源

中间件所做的内容

RabbitMQ:提供交换机(可理解成数据库/govbuy),并提供好可以连接的账号、密码,并可远程登录,这里是通过可视化界面 做的 。

开始看代码吧

首先 还是Maven的代码

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class provider {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();

        //5 创建队列 Queue  如果没有叫hello_world的队列,会自动创建
        /*
        参数:
            1: queue 队列名称
            2: durable 是否持久化  持久化到erlang自带的数据库中 重启 数据依旧存在
            3: exclusive 是否独占 只允许一个消费者监听这个队列  2 当connection时,是否删除队列 一般为flase
            4: autodelete 是否自动删除 当没有消费者,会自动删除
            5: arguement 参数:如何删除队列的参数
         */
        channel.queueDeclare("hello_world",true,false,false,null);

        //6 发送消息到
        /*
        参数:
            1:exchange 交换机名称。简单模式,会使用默认的
            2:routingKey 路由名称
            3:props 配置信息
            4:body 真实发送的数据
         */

        String Body = "Hello Rabbit MQ";


        //简单模式 没有交换机,所以 路由 与 队列名称一样

        channel.basicPublish("","hello_world",null,Body.getBytes());
        System.out.println("发送时间是:"+System.currentTimeMillis());
        //7 释放资源
        channel.close();
        connection.close();

    }
}

消费者代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();


        //5 此方法不需要参数 是添加方法块{} 然后Alt + Inster --> Override Methords 生成的
        Consumer consumer = new DefaultConsumer(channel){
            // 这是一个回调方法 ,当收到消息后,会自动执行该方法。
            /**
             *
             * @param consumerTag 标识
             * @param envelope  获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body  真实的数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收时间是:"+System.currentTimeMillis());
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println();
            }
        };


        //6 获取消息
        /*
            参数
                1:queue 队列名称
                2:Auto ack 是否自动确认
                3:callback 回调对象

         */
        channel.basicConsume("hello_world",true,consumer);


        // 消费者 不需要关闭连接,因为需要监听MQ。

    }
}

先启动生产者,因为生产者 创建了队列了 hello_world 的队列

不然 消费者,会爆出 在虚拟机中 没有发现 队列 的异常

强调一点:消费者 不需要关闭 连接、释放资源。

发表评论

您的电子邮箱地址不会被公开。