kafka核心技术与实战
引论
点击获取《kafka核心技术与实战》 点击获取《Kafka多维度系统精讲,从入门到实战开发》Kafka 属于分布式的消息引擎系统,它的主要功能是提供一套完备的消息发布与订阅解决方案。在 Kafka 中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。同时,它也是一个分布式流处理平台(Distributed Streaming Platform)
术语
生产者(Producer)、消费者(Consumer)和服务进程Broker
-
生产者:向主题发布消息的客户端应用程序称为生产者(Producer),生产者程序通常持续不断地向一个或多个主题发送消息
-
消费者:订阅这些主题消息的客户端应用程序就被称为消费者(Consumer)
-
服务进程:Kafka 的服务器端由被称为 Broker 的
服务进程
构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化
。虽然多个
Broker
进程能够运行在同一台机器上,但更常见的做法是将不同的Broker
分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有Broker
进程都挂掉了,其他机器上的Broker
也依然能够对外提供服务。这其实就是 Kafka 提供高可用
的手段之一。
副本Replication
实现高可用的另一个手段就是备份机制(Replication)
。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。Kafka 定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。前者对外提供服务,这里的对外指的是与客户端程序进行交互;而后者只是被动地追随领导者副本而已,不能与外界进行交互。
MySQL的副本是可以对外提供读服务的。但是在 Kafka 中追随者副本不会对外提供服务。
副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。
分区(Partitioning)和消息位移(Offset)
有了副本机制可以保证数据的持久化或消息不丢失,但倘若领导者副本积累了太多的数据以至于单台 Broker 机器都无法容纳,需要
把数据分割成多份保存在不同的 Broker 上。
Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。如你所见,Kafka 的分区编号是从 0 开始的,如果 Topic 有 100 个分区,那么它们的分区号就是从 0 到 99。这些位置号也叫消息位移
可以参考 MySQL 中的 sharding-jdbc 配置,或者redis的高可用-横向扩容
kafka的三层消息结构
- 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
- 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
- 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
- 最后,客户端程序只能与分区的领导者副本进行交互。
Kafka Broker 是如何持久化数据的
总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。
- 因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。
- 在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。
- Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
消费者位移、消费者组和重平衡
- 消费者组(Consumer Group):多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。消费者组中,一个分区只会被一个消费者实例消费。
- 消费者位移(Consumer Offset):表征消费者消费进度,每个消费者都有自己的消费者位移。
- 重平衡(Rebalance):消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
消费者组里面的所有消费者实例不仅“瓜分”订阅主题的数据,而且更酷的是它们还能彼此协助。假设组内某个实例挂掉了,Kafka 能够自动检测到,然后把这个 Failed 实例之前负责的分区转移给其他活着的消费者。这个过程就是 Kafka 中大名鼎鼎的“重平衡”(Rebalance)。嗯,其实既是大名鼎鼎,也是臭名昭著,因为由重平衡引发的消费者问题比比皆是。事实上,目前很多重平衡的 Bug 社区都无力解决。
基本使用
操作系统
kafka是属于jvm系的大数据框架,所以可以跨平台部署。但是以linux
系统性能为佳。
-
I/O 模型的使用:
selector
在linux中是epoll,在windows中的实现机制是select。 -
数据网络传输效率:Kafka 生产和消费的消息都是通过网络传输的,而消息保存在磁盘。故 Kafka 需要在磁盘和网络间进行大量数据传输。Linux 平台实现了这样的零拷贝机制(当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速地数据传输)。
主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动 I/O 和异步 I/O。每种 I/O 模型都有各自典型的使用场景,比如 Java 中 Socket 对象的阻塞模式和非阻塞模式就对应于前两种模型;而 Linux 中的系统调用 select 函数就属于 I/O 多路复用模型;大名鼎鼎的 epoll 系统调用则介于第三种和第四种模型之间;至于第五种模型,其实很少有 Linux 系统支持,反而是 Windows 系统提供了一个叫 IOCP 线程模型属于这一种。😑redis为啥快?它也是epoll。
磁盘
磁盘类型对于kafka的影响不大,所以机械硬盘物美价廉,最适合kafka。因为kafka的使用方式多是顺序读写的,避免了机械硬盘随机读写数据的短板。
关于磁盘选择另一个经常讨论的话题就是到底是否应该使用磁盘阵列(RAID)。使用 RAID 的两个主要优势在于:
- 提供冗余的磁盘存储空间
- 提供负载均衡
不过就 Kafka 而言,一方面 Kafka 自己实现了冗余机制来提供高可靠性;另一方面通过分区的概念,Kafka 也能在软件层面自行实现负载均衡。如此说来 RAID 的优势就没有那么明显了
磁盘容量
规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
带宽
根据实际使用经验,超过 70% 的带宽阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值。这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即47%左右。可以结合你自己机器的使用情况酌情减少此值。
总结一下
重要的参数配置
Broker端参数
针对存储信息的重要参数有以下这么几个
log.dirs
:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。log.dir
:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
这两个参数只要配置
log.dirs
就可以了。在线上生产环境中一定要为log.dirs
配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。
下面说说与 ZooKeeper 相关的设置。ZooKeeper是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。
zookeeper.connect
: Kafka 与 ZooKeeper 相关的最重要的参数当属zookeeper.connect
。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181
。2181 是 ZooKeeper 的默认端口。多集群可以在后面加 chroot。例如zk1:2181,zk2:2181,zk3:2181/kafka1
和zk1:2181,zk2:2181,zk3:2181/kafka2
第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参数:
listeners
:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。advertised.listeners
:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。host.name/port
:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数:
auto.create.topics.enable
:是否允许自动创建 Topic。建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。unclean.leader.election.enable
:是否允许 Unclean Leader 选举。如果设置成 false,则选举保存数据最多的副本为leader。反之,允许从保存数据少的副本中选举leader,这样会丢失更多的数据。auto.leader.rebalance.enable
:是否允许定期进行 Leader 选举。设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!换一次leader成本很高,所以建议设置为false
最后一组参数是数据留存方面的,即:
-
log.retention.{hour|minutes|ms}
:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。虽然 ms 设置有最高的优先级,但是通常情况下我们还是设置 hour 级别的多一些,比如
log.retention.hour=168
表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。 -
log.retention.bytes
:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以。 -
message.max.bytes
:控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。
Topic级别的参数配置
如果同时设置了 Topic 级别参数和全局 Broker 参数,Topic级别参数会覆盖Broker的参数。
保存消息相关的参数:
retention.ms
:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。retention.bytes
:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
JVM参数
设置的方法也很简单,你只需要设置下面这两个环境变量即可:
KAFKA_HEAP_OPTS
:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。
实践及原理剖析
生产者消息分区机制原理
分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。
如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class
。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner
接口。这个接口也很简单,只定义了两个方法:partition()
和close()
,通常你只需要实现最重要的 partition 方法。
1 | int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); |
这里的topic
、key
、keyBytes
、value
和valueBytes
都属于消息数据,cluster
则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class
参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。
比较常见的分区策略也就那么几种:
轮询策略
也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。
这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class
参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
随机策略
也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
1 | List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); |
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
Key-ordering 策略
Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。
代码实现如下
1 | List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); |
生产者压缩算法
何时压缩?
压缩可能发生在 生产者 和 Broker 端。
生产者程序中配置 compression.type
参数即表示启用指定类型的压缩算法,如 GZIP
等。
Broker 端也有这个参数。如果 生产者 和 Broker 端两个参数不一致,那么就会有个消息解压和重新压缩的过程。
其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
- Broker端指定了和 Producer 端不同的压缩算法。一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。
- Broker端发生了消息格式转换。消息格式转换主要是为了兼容老版本的消费者程序。Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
何时解压?
Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
各种压缩算法的对比
下面这张表是 Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。
- 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;
- 在压缩比方面,zstd > LZ4 > GZIP > Snappy;
- 具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。
无消息丢失配置
这块面试很经常会问!
Kafka 到底在什么情况下才能保证消息不丢失呢?
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
-
已提交的消息:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
-
有限度的持久化保证:假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。
“消息丢失”案例
案例 1:生产者程序丢失数据
Producer 程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。我来描述一个场景:你写了一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存,于是大骂:“Kafka 真烂,消息发送居然都能丢失,而且还不告诉我?!”如果你有过这样的经历,那么请先消消气,我们来分析下可能的原因。
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。
这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。
如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
不过,就算不是 Kafka 的“锅”,我们也要解决这个问题吧。实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer 端而非 Broker 端。
你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。
案例 2:消费者程序丢失数据
Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。
比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。
这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。
正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。
同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。
当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。
案例3:多线程场景位移更新但是处理失败
除了上面所说的场景,其实还存在一种比较隐蔽的消息丢失场景。
我们依然以看书为例。假设你花钱从网上租借了一本共有 10 章内容的电子书,该电子书的有效阅读时间是 1 天,过期后该电子书就无法打开,但如果在 1 天之内你完成阅读就退还租金。
为了加快阅读速度,你把书中的 10 个章节分别委托给你的 10 个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这 10 个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这 10 个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。
对于 Kafka 而言,这就好比 Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。
这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。在这里我要提醒你一下,单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
最佳实践
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
不常用但是高级的功能——客户端的拦截器
客户端的拦截器设计思想可以参考spring的拦截器。主要有两类:
-
org.apache.kafka.clients.producer.ProducerInterceptor
的子类(继承)。ProducerInterceptor主要是两个方法:-
onSend
:发送之前执行 -
onAcknowladgement
:该方法会在消息成功提交或发送失败之后被调用,并且比callback先执行。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
-
-
org.apache.kafka.clients.consumer.ConsumerInterceptor
的实现类(接口实现)onConsume
:该方法在消息返回给 Consumer 程序之前调用onCommit
:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
写好代码后,将 全限定名 添加到kafka的配置中。Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
Producer的TCP连接
-
何时创建?
在
KafkaProducer
实例化的同时,创建了KafkaProducer.Sender
实例 sender,然后 sender 与 Broker 进行TCP连接 -
何时关闭?
两种情况下会关闭:
- 用户显式调用了close方法
- 超过配置的最大超时时间
connections.max.idle.ms
(默认值为9分钟)没有数据流过
后续内容可以下载专栏自己看,主要是消费者的生产实践经验。