我们在日常开发中,需要思考 消息丢失、消息重复 等场景,我们应如何进行处理

Kafka中的消息会不会丢失或重复消费?

从两个方面分析入手:生产者发送消息消息者消费消息

# 消息发送

Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。

# ACK机制

Kafka通过配置request.required.acks属性来确认消息的生产:

  • 0: 表示不进行消息接收是否成功的确认;
  • 1: 表示当Leader接收成功时确认;
  • -1: 表示Leader和Follower都接收成功时确认;

综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:

  1. acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
  2. acks=1,同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;

# 消息消费

Kafka消息消费有两个consumer接口,Low-level API和High-level API:

  • Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
  • High-level API:封装了对parition和offset的管理,使用简单;

如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;

解决办法:

  • 针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
  • 针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

# 如何保证精准一次性

精准一次性 = 至少一次性 + 幂等性

# 至少一次性

# 幂等性

另一种说法是:如何保证消息不被重复消费?

通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据处理结果是一致的

幂等性 通常是通过业务来保证的

列举几个业务场景:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

  • 比如更新数据库操作时,使用乐观锁机制,类似update x set xx where version <=

  • 如果你所面临的场景更为复杂,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看