RabbitMq工作模式

编写获取信道工具类

/**
 * @author zjh
 */
public class RabbitMqUtils {

    public static Channel getChannel() throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP 连接RabbitMq的队列
        factory.setHost("localhost);
        // 用户名
        factory.setUsername("zjh");
        // 密码
        factory.setPassword("zjh");

        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        return connection.createChannel();
    }
}

定义一个生产者, 发送大量消息

/**
 * @author zjh
 * 生产者 发送大量消息
 */
public class TaskOne {

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 发送大量消息
     */
    public static void main(String[] args) throws Exception {
        // 获取信道
        Channel channel = RabbitMqUtils.getChannel();

        /*
            生成一个队列
            1.队列名称
            2.队列里面的消息是否持久化,默认情况消息存储在内存中
            3.该队列是否只提供一个消费者进行消费 是否进行消息共享,true可以多个消费者消费
            4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true自动删除 false不自动删除
            5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 从控制台当中接受信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /*
                发送一个消费
                1.发送到哪个交换机
                2.路由的Key是哪个,本次队列的名称
                3.其他的参数信息
                4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}

消费者一

/**
 * @author zjh
 * 这是一个工作线程
 */
public class WorkerOne {

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     *
     */
    public static void main(String[] args) throws Exception {
        // 获取信道
        Channel channel = RabbitMqUtils.getChannel();

        // 消息接收的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));

        // 消息接收被取消时,执行以下内容
        CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑");

        /*
            消费者消费信息
            1.消费哪个队列
            2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
            3.消费者未成功消费的回调
            4.消费者取消消费的回调
         */
        System.out.println("WorkerOne等待接收消息...");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

消费者二

/**
 * @author zjh
 * 这是一个工作线程
 */
public class WorkerTwo {

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     *
     */
    public static void main(String[] args) throws Exception {
        // 获取信道
        Channel channel = RabbitMqUtils.getChannel();

        // 消息接收的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));

        // 消息接收被取消时,执行以下内容
        CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑");

        /*
            消费者消费信息
            1.消费哪个队列
            2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
            3.消费者未成功消费的回调
            4.消费者取消消费的回调
         */
        System.out.println("WorkerTwo等待接收消息...");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

默认是以轮询的方式进行消费的 消费者一消费一次,消费者二消费一次以此类推

设置分发方式

/*
    分发模式通过 channel.basicQos设置
    1. 轮询分发:0        默认值
    2. 不公平分发:1      性能好的执行的多,性能不好的执行少
    3. 预取值 也就是指定消费多少条:> 1    指定某个消费者消费多少条数据
 */
channel.basicQos(2);

原文地址:http://www.cnblogs.com/zjh0420/p/16891557.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性