RabbitMQ如何解决各种情况下丢数据的问题

JAVA学习网 2018-11-19 06:00:04

 

spring.rabbitmq.listener.direct.acknowledge-mode=manual
 如果在配置文件中没有设置以上这个ACK确认,那么消费者每次重启都会收到这个消息。
 可以结合confirm使用,处理生产者和消费者丢数据的问题。
 生产者丢数据:
confirm会发送一个Ack给生产者(包含消息的唯一ID),如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。这个解决生产者丢失消息的问题。

消费者丢数据:

如果在配置文件中设置了ACK确认,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,
然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

 

 
 
如何解决各种丢数据的问题:

1.生产者丢数据

生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事
物(channel.txCommit())。

然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦
消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个N
ack消息给你,你可以进行重试操作。

下面演示一下confirm模式:

//测试确认后回调
@Service
public class HelloSender1 implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send() {
        String context = "你好现在是 " + new Date() +"";
        System.out.println("HelloSender发送内容 : " + context);
        this.rabbitTemplate.setConfirmCallback(this);
        //exchange,queue 都正确,confirm被回调, ack=true
        //this.rabbitTemplate.convertAndSend("exchange","topic.message", context);

        //exchange 错误,queue 正确,confirm被回调, ack=false
        //this.rabbitTemplate.convertAndSend("fasss","topic.message", context);

        //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
        //this.rabbitTemplate.convertAndSend("exchange","", context);

        //exchange 错误,queue 错误,confirm被回调, ack=false
        this.rabbitTemplate.convertAndSend("fasss","fass", context);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
    }

}

2.消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘
之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢
失(整个集群都挂掉)
    /**
     * 第二个参数:queue的持久化是通过durable=true来实现的。
     * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
   1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
   2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
   3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。 * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 *
@param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自动删除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }

 

        MessageProperties properties=new MessageProperties();
        properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
        properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
        Message message=new Message("hello".getBytes(),properties);
        this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

 

3.消费者丢数据

启用手动确认模式可以解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让
消息不断重试。 ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如
果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。 ③不确认模式,acknowledge
="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

 

 

 

阅读(1643) 评论(0)