Publisher Confirms发布确认是用于实现可靠发布的RabbitMQ扩展。
我们将使用发布确认来确保已发布的消息已安全到达代理。我们将介绍几种使用publisher确认的策略,并解释其优缺点

首先检查application.yml文件

spring:
  rabbitmq:
    host: 127.0.0.1
    # 之前博客未加端口,此处新增
    port: 5672
    username: guest
    password: guest
    virtualHost: /
1. 单独发布消息
  • 新增配置文件PublishConfirmConfig.java
@Configuration
public class PublishConfirmConfig {

    @Bean("myRabbitConnectionFactory")
    public ConnectionFactory myRabbitConnectionFactory(RabbitProperties rabbitProperties){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate simpleRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
        CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • 新增发送文件PublishConfirmSender.java
@Component
public class PublishConfirmSender {

    private RabbitTemplate simpleRabbitTemplate;

    public PublishConfirmSender(RabbitTemplate simpleRabbitTemplate) {
        this.simpleRabbitTemplate = simpleRabbitTemplate;
    }

    public void oneSender() {
        boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
            simpleRabbitTemplate.convertAndSend("direct", "orange", "orange msg");
            return simpleRabbitTemplate.waitForConfirms(5000);
        });
        if (sendFlag) {
            System.out.println("消息已成功发送");
        }
    }
}

  • 测试发送
@SpringBootTest
public class RabbitTest {
    @Autowired
    private PublishConfirmSender publishConfirmSender;

    @Test
    public void testOneSender() {
        publishConfirmSender.oneSender();
    }
}

1

2. 批量消息发布确认
@Component
public class PublishConfirmSender {

    ............

    public void batchSender() {
        boolean sendFlag = simpleRabbitTemplate.invoke(operations -> {
            for (int i = 0; i < 50; i++) {
                simpleRabbitTemplate.convertAndSend("direct", "orange", "orange " + i + "msg");
                if (i % 10 == 0) {
                    if (simpleRabbitTemplate.waitForConfirms(5000)) {
                        System.out.println(i / 10 + "批次消息已全部成功发送");
                    }
                }
            }
            return simpleRabbitTemplate.waitForConfirms(5000);
        });
        if (sendFlag) {
            System.out.println("消息已全部成功发送");
        }
    }
}

2

3. 发布服务器异步确认
@Component
public class PublishConfirmSender {
	......
    @Bean
    @Primary
    public RabbitTemplate asyncRabbitTemplate(ConnectionFactory myRabbitConnectionFactory) {
        CachingConnectionFactory connectionFactory = (CachingConnectionFactory) myRabbitConnectionFactory;
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}
@Component
public class PublishConfirmSender {
     public void asyncSender() {
        asyncRabbitTemplate.invoke(operations -> {
            for (int i = 0; i < 50; i++) {
                String body = "orange " + i + "msg";
                simpleRabbitTemplate.convertAndSend("direct", "orange", body);
            }
            return null;
        }, (deliveryTag, multiple) -> {
            System.out.format("消息已确认. Sequence number: %d, multiple: %b%n", deliveryTag, multiple);
        }, (deliveryTag, multiple) -> {
            System.err.format("消息未确认. Sequence number: %d, multiple: %b%n",deliveryTag, multiple);
        });
    }
}

3

4. 总结

在某些应用程序中,确保将发布的消息发送给代理是至关重要的。Publisher确认有助于满足此要求的RabbitMQ功能。Publisher确认本质上是异步的,但也可以同步处理它们。没有明确的方法来实现publisher确认,这通常归结为应用程序和整个系统中的约束。典型技术包括

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但很难判断何时出错。
  • 异步处理:最佳性能和资源利用率,在发生错误时进行良好控制,但需要正确处理。

欢迎关注公众号算法小生沈健的技术博客

原文地址:http://www.cnblogs.com/shenjian-online/p/16794929.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性