官网地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

上面是官网的介绍,什么意思呢?

就是原来的简单工作模式

P 生产的数据 都给C ,C是个废物,扛不住那么大的压力。怕C崩溃了,弄C1、C2,他俩轮询争抢,就避免了单个C 过大的压力

什么意思呢,请看实际效果

就是2个服务平分,怕一个服务吃不了那么多压力,分担一下。

如何实现呢?

Maven代码 见 简单模式

https://www.zanglikun.com/613.html

先分析逻辑,再写代码

整体除了 变了个队列名称为 work_queues,其他:

生产者对比简单模式变化,啥也没变,只是循环发送消息

消费者 注释了一些没用的输出,啥也没变。

见代码吧。

生产者


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WordQueuesprovider {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();

        //5 创建队列 Queue  如果没有叫hello_word的队列,会自动创建
        /*
        参数:
            1: queue 队列名称
            2: durable 是否持久化  持久化到erlang自带的数据库中 重启 数据依旧存在
            3: exclusive 是否独占 只允许一个消费者监听这个队列  2 当connection时,是否删除队列 一般为flase
            4: autodelete 是否自动删除 当没有消费者,会自动删除
            5: arguement 参数:如何删除队列的参数
         */
        channel.queueDeclare("work_queques",true,false,false,null);

        //6 发送消息到
        /*
        参数:
            1:exchange 交换机名称。简单模式,会使用默认的
            2:routingKey 路由名称
            3:props 配置信息
            4:body 真实发送的数据
         */

        String Body = "Hello Rabbit MQ";

        for (int i = 0; i < 100; i++) {
            //简单模式 没有交换机,所以 路由 与 队列名称一样
            channel.basicPublish("","work_queques",null,("第"+i+"条"+Body).getBytes());
        }

        //7 释放资源
        channel.close();
        connection.close();

    }
}

消费者(同样的代码 创建2个消费者,目的是 启动,让他俩轮询争)


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WordQueuesconsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //记得刷新Maven  简单模式   没有交换机,但会用到默认的交换机


        //1 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2 设置参数
        factory.setHost("118.31.127.248");  //不设置 就为 127.0.0.0.1
        factory.setPort(5672);              //不设置 就为 5672
        factory.setVirtualHost("/govbuy");  //不设置 就为默认虚拟机 /
        factory.setUsername("zanglikun");   //不设置 就是默认 guest
        factory.setPassword("zanglikun");   //不设置 就是默认 guest

        //3 创建连接 Connection
        Connection connection = factory.newConnection();

        //4 获取channl
        Channel channel = connection.createChannel();


        //5 此方法不需要参数 是添加方法块{} 然后Alt + Inster --> Override Methords 生成的
        Consumer consumer = new DefaultConsumer(channel){
            // 这是一个回调方法 ,当收到消息后,会自动执行该方法。
            /**
             *
             * @param consumerTag 标识
             * @param envelope  获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body  真实的数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //System.out.println("接收时间是:"+System.currentTimeMillis());
                //System.out.println("consumerTag:"+consumerTag);
                //System.out.println("Exchange:"+envelope.getExchange());
                //System.out.println("RoutingKey:"+envelope.getRoutingKey());
                //System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                //System.out.println();
            }
        };


        //6 获取消息
        /*
            参数
                1:queue 队列名称
                2:Auto ack 是否自动确认
                3:callback 回调对象

         */
        channel.basicConsume("work_queques",true,consumer);


        // 消费者 不需要关闭连接,因为需要监听MQ。

    }
}

启动效果 就是 先启动2个 消费者,让他俩等着,在启动生产者,生产数据。

当然了 2个消费者 同时启动,是轮询,如果先启动一个消费者,就启动一个生产者后,在启动另一个消费者,是什么情况呢?

猜想:一开始是消费者1 不停的消费,然后消费者2进入,就开始轮询了。

试试

把生产者的循环 写死

代码图

开始测试

果然是的,是不是巧合呢,再试试10回,确认下。哈哈,我不测试了。有兴趣的老哥,你自己试试。我姑且默认他是这样的。

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤