kafka深入理解及存储(二)

深入Kafka

前面介绍了Kafka的安装以及使用,现在来聊点关于Kafka的存储.

  • Kafka如何进行复制
  • Kafka如何处理生产者和消费者请求;
  • Kafka的存储细节,文件及格式;

集群成员

Kafka使用zookeeper来维护集群成员的信息.每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成.

broker在启动的时候,通过创建临时借点把自己的id注册到zookeeper的/brokers/ids路径下,broker加入或者退出集群,都会收到通知.

如果broker的id相同,则只有一个能注册成功,另外一个注册失败,将启动报错.

broker关闭时,对应的节点会消失,但是id会继续留在其他数据结构中,当重启或者另外一个相同的id加入集群后,将拥有与旧broker相同的分区和主题.

控制器

在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器,只不过它除了具有一般broker的功能之外,还负责分区首领的选举,整个集群中所有分区和副本的状态管理。(控制器broker和普通broker有相同的功能,只不过多了一些管理功能)

broker的创建:集群里第一个启动的broker通过在Zookeeper里创建一个临时节点/controller让自己成为控制器。 其他broker在启动时也会尝试创建这个节点,不过它们会收到一个 “节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。 其他broker在控制器节点上创建Zookeeper watch对象,这样它们就可以收到这个节点的变更通知。 这种方式可以确保集群里一次有且只有一个控制器存在

如果控制器被关闭或者与Zookeeper断开连接,Zookeeper上的临时节点就会消失。集群里的其他broker通过watch对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper里成功创建控制器节点的broker就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建watch对象。毎个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的,数值更大的controller epoch。其他broker在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch的消息,就会忽略它们。

脑裂问题:是指两个节点同时认为自己是当前的控制器,Kafka通过递增controller epoch来避免脑裂问题.当发现两个都认为是自己是控制器的时候,比较epoch的值,最大的将被选择为控制器.因为epoch是自增的,所以不存在两个相等的值.

创建topic时需要分区等操作,需要数据一致性,这个时候就只能有一个broker来操作,不能谁都来操作,会导致数据冲突.所以需要在一群broker里面需要选择一个Leader broker来做这个操作.

优雅关闭Kafka

  1. 使用Kafka-server-stop.sh关闭。
  2. 千万别使用kill -9 pid关闭。
  3. 可以使用kill -s TERM $pid 关闭或者 kill -15 pid的方式关闭 优雅关闭的优点:一个是可以让消息完全同步到磁盘上,在服务下次重新上线时不需要进行日志的恢复操作。
    二是ControllerShutdown在关闭服务前,会对其上的leader副本进行迁移,这样就可以减少分区的不可用时间。

复制

复制功能是Kafka架构的核心功能,可分区,可复制,分布式的.是Kafka安全性,扩展性的重要组成部分.

Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区又有多个副本.这些副本被保存在broker上,每个broker可以保存成百上千的属于不同主题的分区的副本.

副本有两种类型:

首领副本:每个分区都有一个首领副本,为了保持一致性,所有生产者请求和消费者请求都会经过这个副本.

跟随副本:首领以为的副本都是跟随副本.跟随者副本不处理来自客户端的请求.它们唯一的任务就是从首领那里复制消息,当首领崩溃后,其中一个跟随者会成被提升为新首领.(主从同步)

跟随副本的有效性:首领副本在收到一条消息时,跟随副本会请求获取该消息并存储.某些时候跟随副本会出现问题,导致消息和首领副本消息不一致,这个跟随副本就是有问题的.设置跟随副本多久不活跃或者达到同步副本之前的时间:replica.lag.time.max.ms

控制器与复制图总结



从图中可以看出,控制器就是一个broker.所有broker都有读写权限,只是被选为控制器的broker多干了一点活,那就是创建topic时所做的那些事.

broker就是一台服务器或者说起了一个服务.

kafka请求处理

在我的前一章中《Kafka生产者与消费者》中讲解的很清楚了.

物理存储

Kafka 的基本存储单元是分区。分区无法在多个broker间进行再细分,也无法在同一个broker的多个磁盘上进行再细分。 所以,分区的大小受到单个挂载点可用空间的限制.在配置Kafka的时候,管理员指定了一个用于存储分区的目录清单-也就是log.dirs参数的值(不要把它与存放错误日志的目录混淆了,日志目录是配置在log4j.properties 文件里的)。该参数一般会包含每个挂载点的目录.接下来我们会介绍Kafka是如何使用这些目录来存储数据的。

首先,我们要知道数据是如何被分配到集群的broker上以及broker的目录里的。

然后,我们还要知道broker是如何管理这些文件的,特别是如何进行数据保留的。

随后,我们会深入探讨文件和索引格式。

最后,我们会讨论日志压缩及其工作原理。日志压缩是Kafka的一个高级特性,因为有了这个特性,Kafka 可以用来长时间地保存数据。

文件目录布局

Kafka中的消息是以主题为基本单位归类的,各个主题在逻辑上互相独立.每个主题又可以分为一个或多个分区.一个分区对应一个日志(Log),为防止Log文件过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个大文件被分割成多个小文件.

这里创建了一个主题my-topic-10-3,10个分区,3个副本.下图展示了一个broker下的分区情况

为了便于消息的饿检索,每个LogSegment中的文件(.log文件)都对应着2个索引文件:偏移量索引文件(.index文件)和时间戳索引文件(.timeindex文件).每个LogSegment都有一个基准偏移量baseOffset,用来表示当前LogSegment中的第一条消息的offset.偏移量是一个65位的长整型数.日志文件和两个索引文件都是根据基准偏移量命名的.名称固定位20位数字,没有达到的位则用0填充.比如第一个LogSegment的基准偏移量位0,则对应的日志文件位00000000000000000000.log;如果一个文件名为00000000000000000010.log,说明该LogSegment的基准偏移量位10.

分区分配

在创建主题时,Kafka首先会决定如何在broker间分配分区。假设你有6个broker,打算创建一个包含10个分区的主题,并且复制系数为3。那么Kafka总的就会有30个分区副本,它们可以被分配给6个broker。在进行分区分配时,我们要达到如下的目标。

  • 在broker间平均地分布分区副本。对于我们的例子来说, 就是要保证每个broker可以分到5个副本。
  • 确保每个分区的每个副本分布在不同的broker上。假设分区0的首领副本在broker2上,那么可以把跟随者副本放在broker3和broker4上,但不能放在broker2上,也不能两个都放在broker 3上.
  • 如果为broker指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的 broker上。这样做是为了保证整个机架的不可用不会导致整体的分区不可用。
    为了实现这个目标,我们先随机选择一个broker(假设是4),然后使用轮询的方式给毎个broker分配分区来确定首领分区的位置。于是,首领分区0会在broker4上,首领分区1会在broker5上,首领分区2会在broker0上(只有6个broker),并以此类推。然后我们从分区首领开始,依次分配跟随者副本.
    > 分区的时候Kafka会尽可能的将主分区放在不同的broker上,而每个主分区的副本也同样会放到与主分分区不同的broker上.这样就能达到当某台broker挂掉的时候,数据不丢失.

文件管理

保留数据是Kafka的一个基本特性,Kafka不会直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,Kafka管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。

因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以Kafka把分区分成若干个片段。 默认情况下,每个片段包含1GB或一周的数据,以较小的那个为准。在 broker往分区写人数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。当前正在写入数据的片段叫作活跃片段。 活动片段永远不会被删除,所以如果你要保留数据1天,但片段里包含了5天的数据,那么这些数据会被保留5天,因为在片段被关闭之前这些数据无法被删除。如果你要保留数据一周,而且每天使用一个新片段, 那么你就会看到,每天在使用一个新片段的同时会删除一个最老的片段-一所以大部分时间该分区会有7个片段存在。

注意:broker会为分区里的每个片段打开一个文件句柄,哪怕片段是不活跃的。 这样会导致打开过多的文件句柄,所以操作系统必须根据实际情况做一些调优。

文件格式

我们把Kafka的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的,因为使用了相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压和再压缩。

除了键,值和偏移量外,消息里还包含了消息大小, 校验和,消息格式版本号,压缩算法(Snappy,GZip或LZ4)和时间戳(在0.10.0版本里引人的)。时间戳可以是生产者发送消息的时间,也可以是消息到达broker的时间,这个是可配置的。

日志索引

消费者可以从Kafka的任意为可以偏移量位置开始读取消息.为了帮助broker更快的定位到指定的偏移量,Kafka为每个分区维护了一个索引.索引把偏移量映射到片段文件和偏移量在文件里的位置.

索引也被分成片段,所以在删除消息时,也要删除相应的索引.Kafka不维护索引的校验和,所以索引出现损坏,Kafka会通过重新读取消息并录制偏移量和位置来重新生成索引.

Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项目.每当写入一定量(log.index.interval.bytes=4KB默认值)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小改值,对应地可以增加或缩小索引项的密度.

日志分段文件文件切分包含以下几个条件,满足其一即可.

1)当前日志文件大小超过了broker端参数log.segment.bytes配置的值.默认为1GB.

2)当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大雨log.roll.ms或.hours配置的值.默认为hours=168,即7天.

3)偏移量文件或者时间戳索引文件的大小达到了log.index.size.max.bytes配置的值,默认为10MB.

4)追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE.

如果有必要,管理员可以手动删除索引,这样是绝对安全的,Kafka会自动更新生成这些索引.

日志删除

Kafka的日志管理中会有一个专门的日志删除任务来周期的检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数来配置(log.retention.check.interval.ms)默认值为300000,即5分钟.当前日志分段的保留策略有3种:基于时间,日志大小,日志起始偏移量的保留策略.

2019-12-01 12:11135