Kafka分区(五)

主题和分区是Kafka的两个核心概念,生产者消费者都是在主题和分区层面做的操作.主题做为消息的归类,可以再细分为一个或多个分区,分区可以看作对消息的二次归类.

从Kafka 的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。下面将从创建分区开始

Broker分区的管理

在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器,只不过它除了具有一般broker的功能之外,还负责分区首领的选举,整个集群中所有分区和副本的状态管理。(控制器broker和普通broker有相同的功能,只不过多了一些管理功能)

优先副本的选举

分区使用多副本机制来保证可靠性,但只有leader副本对外提供读写服务,而follower副本则只负责在内部同步消息.如果一个任意leader副本不可用,那么就意味着整个分区变得不可用.此时就需要Kafka从follower副本中挑选一个新的leader副本来继续对外提供服务.

如果一个主题的一个分区的leader不可用,该主题的所有分区都变成不可用了.(在没有副本或者副本也是坏的情况下,如果有副本,副本会自动选举为leader.这里说的是分区的leader一直不可用,就是跟随副本也没有起来的情况才会有leader不可用)

例子:总的有三台broker,创建了10个分区,3个副本(不能超过broker数量)



(分区图一)

看图分析:可以看到10个分区,均匀的分布到在3台服务器上

Topic: my-topic-10-3 Partition: 0(分区) Leader:0(0分区的Leader副本在第0broker上) Replicas: 0,1,2(副本集合0,1,2排序的) Isr: 0,1,2

Topic: my-topic-10-3 Partition: 1(分区) Leader:1(1分区的leader副本在第1个broker上) Replicas: 1,2,0(优先副本,排在第一的就是leader) Isr: 1,2,0

Topic: my-topic-10-3 Partition: 3(分区) Leader:0(3分区的leader副本在第0个broker上) Replicas: 0,2,1 Isr: 0,2,1

从上图可以看出,分区0,3,6,9的leader副本都在0broker上.

在0broker服务器上面我看到了如下图所示(在1broker,2broker服务器上看到的也是,因为每个分区都有三个副本,而正好只有三台服务器,10*3=30,分布到3台服务器,就是每台10个)



(分区图二)

看0broker上面的看到了10个分区,其他broker也是10个分区,也都是从0-9这样的分区.从(分区图一)查看topic详情可以看出,在0broker上面,0,3,6,9是leader副本,它们负责读写,在0broker上面的1,2,4,5,7,8副本都是分布在1,2broker上面的leader副本的跟随副本.就像上面分析的一样,0分区的leader副本在0broker上,另外2个副本在1broker,2broker上,所以在1broker上看到的my-topic-10-3-0分区,其实是0broker上面my-topic-10-3-0的一个跟随副本.同样的,在0broker上的1分区,其实是一个跟随副本,而它的leader副本在1broker上面.

针对同一个分区而言,同一个broker节点不可能出现它的多个副本,即Kafka集群的一个broker中最多只能有它的一个副本.

在某个broker宕机后,就会导致该broker上的leader不可用,随即开始leader选举.假如0broker宕机,那么0broker上的0,1,3,6,9分区Leader副本将不可用,这几个分区将会在1,2broker上的跟随副本中选举Leader,如果新选举的Leader都在1broker上面,则会导致1broker,2broker负载一个高,一个低,1broker将承担很大的读写压力.

Kafka为了很好的做到负载均衡,引入了优先副本的概念,所谓优先副本就是指AR集合列表(Replicase列表),在集合列表中排在第一的就是Leader副本,可以看到(分区图一)0分区的优先级为0,1,2而3的优先级为0,2,1,所以0,3分区的Leader分区都在0broker上面.当0broker挂掉的时候,0分区会选择1broker上的0分区做为Leader副本,而3分区的会选择2broker上的3分区做为Leader副本.

优先副本选举并不代表,Kafka在再均衡中就一定能达到均衡的状态(在生产环境中还需考虑broker硬件的问题),本小节主要是讨论优先副本的用途.

优先副本的均衡依赖于创建分区是是否能均衡的分配Leader副本和follower副本.

分区重分配

  1. 当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了, 在节点恢复前,相应的数据也就处于丢失状态:如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他follower副本中。总而言之,这个节点上的分区副本都已经处于功能失效的状态,Kafka并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节点上,如果放任不管,则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性.
  2. 当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。
  3. 当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。

    上图所示,当broker2宕机之后,broker2上面的Leader自动选举之后转移到了0,1broker上了,之后我恢复了broker2,虽然broker2可用了,但是broker2上面没有了Leader分区,这样0,1broker节点就的读写负载就会变高,而broker2则一直处于空闲状态.
    为了解决上面的问题,Kafka提供了kafka-reassign-partitions.sh来手动重新分配,当然Kafka也会在一定条件下触发.
    在我自己电脑上测试的时候,broker2恢复后,确实上面没有leader分区,等我过了几天看的时候如下图:

    leader分区被均匀的分配到了0,1,2broker

分区重分配的本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终目的.

重新分区

我们写错了分区数,或者增加了broker,都可以修改topic的分区个数的.《这里我个人很少用到,就不做说明了,这里只是说明一下,可以修改分区个数》

客户端分配分区

这里指的是分区与客户端(生产者,消费者都属于客户端,Kafka本身是服务端)的关系策略,并不是初始创建分区策略哦。

Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与主题之间的分区分配策略。默认情况下使用RangeAssignor,除此之外Kafka还提供了两种策略:RoundRobinAssignor和StickyAssignor。可以设置多个策略,逗号分隔即可。


图中有2个客户端,都订阅了topic1,topic两个主题.而两个主题分别有3个分区. 目前来看,2个客户端分配比较均匀,每个客户端都承担了3个分区的任务.但是如果分区增加2个他们还会怎么均匀的分担任务吗?看下面的客户端分配分区策略吧.

RangAssignor分配策略

RangAssignor分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能分配给所有的消费者。RangAssignor 会将消费者组内所有订阅这个主题的消费者按照名字的字典序排序,然后为每个消费者划分固定的分区范围。
n=分区数/消费者数量,m=分区数%消费者数量。那么前m个消费者每个分配n+1个分区,后面的(消费者-m)个消费者每隔分配n个分区。

假设:2个消费者C1,C2,订阅了2个主题A,B,每个主题有4个分区。

所有分区可以标示为:A0,A1,A2,A3,B0,B1,B2,B3;

最终分配结果为:

消费者C1 : A0,A1,B0,B1

消费者C2 : A2,A3,B2,B3

分配很均匀,但是如果每个主题只有3个分区。

消费者C1 : A0,A1,B0,B1

消费者C2 : A2,B2

分配逻辑:先给C1消费者分配A主题,A主题有3个分区,先给C1,分配A0,A1 然后给C2分配的时候只剩下A2了,那就分配A2给C2吧。B分区一样的分配,最终就变成上面的结果了。

可以看到再RangAssignor分配策略下,C1消费者承担了很大压力,他需要负责4个分区的读写,而C2只需要负责2个分区的读写

RoundRobinAssignor分配策略

RoundRobinAssignor策略是按照消费组内所有消费者及消费者订阅的所有主题分区按照字典序排序,然后轮询的方式逐个将分区以此分配给每个消费者。

比如:消费者C0,C1 都订阅了A,B两个主题,A,B两个主题都有3个分区。

C0: A0

C1: A1

C0: A2

C1: B0

以此分配下去;最终分配结果为:

C0: A0,A2,B1
C1: A1,B0,B2; 结果很均匀

但是:消费者C0,C1,C2 主题以分区为:A(A0分区),B(B0,B1),D(D0,D1,D2三个分区);

C0订阅了A;C1订阅了A,B; C2订阅了A,B,D主题。

分配规则如下:

C0: A0;

C1: B0;

C2: B1;

C0:没有需要分配的了。A已经订阅,分区已经分配完成。

C1: 没有需要分配的了,订阅了A,B,分区已经分配完成,订阅结束。

C2:订阅了A,B已经分配完成。接着分配D主题。

C2:D0,D1,D2;

最终结果:

C0:A0;

C1:B0;

C2:B1,D0,D1,D2;

C1完全可以再分担一下B1分区的任务,但是现在全部落在了消费者上。

StickyAssignor分配策略

StickyAssignor分配策略就是

1.分区的分配要尽可能的均匀;

2.分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。

消费者协调器和组协调器

消费者分区再均衡原理

消费者组分成多个子集,每个消费者组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费者组的组建。

触发在均衡的操作:

  • 有新的消费者加入消费组
  • 有消费者宕机下限,消费者并不一定需要真正下线,例如遇到GC时间长,网络延迟导致消费者长时间没有发送心跳到Kafka服务器,Kafka就会认为消费者已经下线。
  • 消费者 主动退出消费组,比如:取消某主题的订阅
  • 消费组所对应的节点发生了变更。
  • 消费组内所订阅的任意主题活着主题的分区数量发生变化。

说白了就是消费者有数量变化或者分区增加都会导致在均衡。

如何选择合适的分区数

Kafka生产者压力测试

kafka-producer-perf-test --topic my-topic-1-3 --num-records 10000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1

结果:
10000 records sent, 2872.737719 records/sec (2.81 MB/sec), 1265.46 ms avg latency, 2967.00 ms max latency, 673 ms 50th, 2944 ms 95th, 2964 ms 99th, 2967 ms 99.9th.

示例中在使用kafka-producer-perf-test.sh脚本时用了多个参数

  • topic:用来指定生产者发送消息的目标主题
  • num-records:用来指定发送消息的总条数
  • record-size 用来设置每条消息的字节数
  • producer-props 参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔,与producer-props 参数对应的还有-一个producer.config参数,它用来指定生产者的配置文件
  • throughput用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时, 当发送的吞吐量大于该值时就会被阻塞一段时间。
  • print-metrics:打印更多详细信息,测试指标

结果解释:

673 ms 50th :意思是50%的消息使用了673ms;

2944 ms 95th:95%的消息使用了2944ms;

2964 ms 99th:99%的消息使用的时间;

2967 ms 99.9th: 99.9%的消息处理时间.

我在测试本机时用了1万条消息,每条消息1Mb.以上时处理速度时间.2.81 MB/sec, 2872.737719 条/秒.

Kafka消费者压力测试

kafka-consumer-perf-test --topic my-topic-1-3 --messages 10000 --broker-list localhost:9092

结果:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-01-16 14:58:30:866, 2020-01-16 14:58:34:873, 9.7656, 2.4371, 10000, 2495.6326, 2558, 1449, 6.7396, 6901.3112

start.time起始运行时间, end.time结束运行时间,data.consumed.in.MB消费的消息总量(MB单位),MB.sec按字节大小计算的消费吞吐量, data.consumed.in.nMsg消费的消息总数,nMsg.sec按消息个数计算的吞吐量,rebalance.time.ms平衡时间,fetch.time.ms拉取消息的持续时间,fetch.MB.sec每秒拉取消息的字节大小,fetch.nMsg.sec每秒拉取的个数.

是否分区数越多吞吐量就越好?

分区时Kafka中最小的并行操作单元,对生产者而言,每个分区的数据写入时完全并行的;对于消费者而言,Kafka只允许单个分区中的消息被一个消费者线程消费,一个消费组的消费并行度完全依赖于消费的分区数.这样看来分区数越多,吞吐量越大,但是正是这样的吗?

消息吞吐量受硬件,消息大小,消息压缩方式,消息发送方式(同步/异步),消息确认acks,副本因子等因素的影响.

但是随着分区数的增长,相应的吞吐量也长,但是分区数达到某个阀值之后,吞吐量就会下降.

分区数的上线

一个分区,需要在服务器上打开一个文件,对于Linux来说ulimit -n;ulimit -Sn软限制;ulimit -Hn硬性限制;文件描述符.

还有Kafka基于jvm容器,这样分区数太多,就会导致jvm,文件描述符,io等增加,最后导致服务器不可用.所以上线看呢服务器,而不是一味最求最大.

replication与partitions

先看一个简单的创建topic(假如这里有三台服务器的一个Kafka集群)

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic test

replication-factor是复制的意思,在创建topic时我们指定replication-factor该参数的值(必须小于或等于kafka集群数量).

partitions:是分区的意思.一个主题分成几个区存储.

replication partitions topic名称 服务器A 服务器B 服务器C 说明
1 1 my-topic-r1-p1 my-topic-r1-p1-0 只有一个分区,一个文件
3 1 my-topic-r3-p1 my-topic-r3-p1-0 my-topic-r3-p1-0 my-topic-r3-p1-0 只有一个分区,但是复制了3份,3份的数据是一样的
1 3 my-topic-r1-p3 my-topic-r1-p3-0 my-topic-r1-p3-1 my-topic-r1-p3-2 三个分区,同一个文件被拆分为3个文件,数据是不一样的
1 7 my-topic-r1-p7 my-topic-r1-p3-0
my-topic-r1-p3-1
my-topic-r1-p3-2
my-topic-r1-p3-3
my-topic-r1-p3-4
my-topic-r1-p3-5
my-topic-r1-p3-6
7个分区,一个文件被拆成7个文件,数据不一样哦
3 3 my-topic-r3-p3 my-topic-r3-p3-0
my-topic-r3-p3-1
my-topic-r3-p3-2
my-topic-r3-p3-0
my-topic-r3-p3-1
my-topic-r3-p2
my-topic-r3-p3-0
my-topic-r3-p3-1
my-topic-r3-p3-2
这个最好理解了,三个分区,每个分区存放一台服务器,但每个分区被复制了三份分别存储在了服务器A,B,C上面

这个表格一个看也许会晕,慢慢解说就懂了.

partitoons的意思是将一个topic分几个区存储.partitoons=3相当于把一个文件(topic)拆分成三份(topic-0,topic-1,topic-2),分别放到n台服务器上面

replication的意思是一个topic复制几份.replication=2相当于把一个文件(topic)复制出一份来(topic-0(原始),topic-0(copy))

--replication-factor 2 --partitions 2 --topic my-topic的意思就是将my-topic分2个区,创建成功后会有my-topic-0,my-topic-1两个文件,因为有3台服务器所以随机将这2个分区文件分配到了服务器A(my-topic-0),服务器B(my-topic-1),服务器C(空). replication-factor=2,意思是将my-topic分区的分区复制1份出来,服务器A,B都有文件了,这个时候根据kafka的一些分配策略,服务器C(my-topic-0,my-topic-1). 从而my-topic有两个分区0,1.他们都可以读写,同时0,1都有一个备份存在服务器C上面.他们之间是实时同步的.

2019-12-05 12:1185