Kafka生产者与消费者由浅入深(一)

Kafka是一款高效的MQ,以及流处理服务

Kafka介绍

  • 消息: Kafka的数据单元被称为消息,可以把消息看成数据库里面的一条或一行数据.消息有字节数组成,所以对于Kafka来说,消息里面的数据没有特别格式或含义.
  • 序列化: Kafka一般来说使用Apache Avro来序列化数据,这种序列化格式,将模式与消息体分开,当模式发生变化时,不需要重新生成代码,它还支持强类型和模式进化,以及向其前兼容也向后兼容.
  • 主题和分区: Kafka的消息是通过主题进行分类的,一个主题就是一个队列(好比数据库的表一样),主题可以被分为若干个分区,一个分区就是一个提交日志文件.消息以追加的方式写入分区,先进先出.
  • 生产者: 消息的写入者,一般一个消息会被发布到一个特定的主题上.生产者在默认的情况下会把消息均衡地分布到主题的所有分区上.在某些情况下可以通过消息健(key)和分区器来实现将消息直接写到某个制定的分区.这样可以保证包含同一个key的消息会被写到同一个分区上.可以通过自定义分区器,根据不同的业务规则将消息映射到分区.
  • 消费者: 读取消息者.订阅一个或多个主题,并按照消息生成的顺序读取它们,消费者通过检查消息的偏移量来区分已经读取过的消息.
  • 消费者组: 消费者是消费群租的一部分,有一个或者多个消费者读取同一个主题,群组保证每个分区只能被一个消费者读取.(一个群组有多个消费者,消息被消费按照群组来区分,同一条消息只能被群组里面的一个消费者消费;同样的,一条消息可以被不同群组多次消费互不影响,A消息被群组1消费过了,群组2还可以再次消费A,因为它们是不同群组)
  • broker: 一个独立的Kafka服务器被称为broker,broker接受生产者的消息,为消息设置偏移量,并提交消息保存到磁盘.同样为消费者提供服务,读取分区的请求作出相应,读取磁盘信息并返回给消费者.
  • 集群: 多个broker组成集群.每个集群都有一个broker同时充当集群控制器的角色(自动从集群的活跃成员中选举出来).控制器负责管理工作,包括将分区分配给broker和监控broker.
  • 消息保留: Kafka 默认的消息保留策略是:要么保留一段时间(比如7天)要么保留消息达到一定大小的字节数(比如1GB),当消息数量达到这些上限时,旧消息就会过期被删除.
  • 基于磁盘的消息保留.

基本安装

由于官网给的安装信息已经非常直接明了了.所以这里不准备写了.

官网安装教程

本地macbook 安装

1.只需要下面即可
> brew install kafka

2.安装成功后启动zk
> zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

3.进入/usr/local/etc/kafka/,copy出server.properties两个文件,这样就可以启动三台Kafka了
server-1.properties:
    broker.id=1  
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

> kafka-server-start /usr/local/etc/kafka/server.properties
> kafka-server-start /usr/local/etc/kafka/server1.properties
> kafka-server-start /usr/local/etc/kafka/server2.properties

Kafka几个默认的重要参数

参数名 作用 默认值
num.partitions 指定新创建的主题将包含多少个分区 1
log.retention.hours 指定Kafka根据时间来决定数据可以保留多久 168小时
log.retention.bytes 指定消息字节数大小来判断消息是否过期,默认值为1个分区1G,如果有八个分区则能存储8G的数据 byte
log.segment.bytes 上面的是指定日志片段上,而不是单个消息上.而该参数是针对单个消息的日志片段,当达到某个值时,将创建一个新的日志片段. 1GB
log.segment.ms 和上面一个一样, 一个根据大小,一个根据时间 毫秒
message.max.bytes 设置单个消息提最大值,超过设定值则发送消息失败 1000000(1MB)

注意: 如果主题的消息量不大,如何调整log.segment.bytes这个参数尤为重要,例如,一个主题每天只接受100MB的消息,而log.segment.bytes使用默认值,那么需要10天才能填满一个日志片段.因为在日志片段被关闭之前消息是不会过期的,所以如果log.retention.hours设置为168小时(7天),那么日志片段则需要17天才会过期,因为关闭日志片段需要10天的时间,而关闭后还需要保留7天时间(要等到日志片段里的最后一个消息过期才能被删除).

生产者

客户端发送一条消息到Kafka服务器流程图



我们从创建一个ProducerRecord对象开始,ProducerRecord对象需要包含目标主题和要发送的内容。 我们还可以指定键或分区。在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。接下来,数据被传给分区器。如果之前在ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord 对象的键来选择一个分区。 选好分区以后, 生产者就知道该往哪个主题和分区发送这条记录了。 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。 有一个独立的线程负责把这些记录批次发送到相应的broker上.

下面的例子将使用SpringBoot集成kafka2.x

创建生产者

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

key,value都需要指定序列化,在网络传输过程中需要将数据转换成二进制数组才行.

也可以自定义序列化只需要继承org.apache.kafka.common.serialization.Serializer接口即可

bootstrap-servers:需要指定一个,另外几个在链接Kafka之后会自动获取到集群中的所有机器,但是为了保证可用性,当某台服务挂了之后可用,多填写几个地址总会是好事.

发送消息

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message) {
        log.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);//1.无返回值,无等待.丢失消息后生产者自己无法感知
        kafkaTemplate.send(topic, message).addCallback(o -> log.info("o=={}", o), throwable -> log.error("发送错误", throwable));//2.异步返回结果.
        kafkaTemplate.send(topic, message).get(); //3.同步返回结果.阻塞

    }

生产者配置

spring:
  kafka:
    producer:
      acks: 1
      buffer-memory: 10240 #生产者内存缓冲区大小(应用程序发送的速度大于Kafka接受速度时的一个缓冲大小)
      compression-type: snappy # gzip,lz4 消息压缩算法,默认不压缩
      retries: 3 #重试错误
      client-id: mykafka #客户端标示,可以做日志追踪

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写人是成功的。这个参数对消息丢失的可能性有重要影响.

该参数有如下选项。

  • acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。 也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。 不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  • acks=1, 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。 如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失, 生产者会重发消息。 不过,如果一个没有收到消息的节点成为新首领, 消息还是会丢失。 这个时候的吞吐量取决于使用的是
  • acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应.比acks=1延时更高,但也更安全.

分区器

ProducerRecord对象包含了目标主题键和值。 Kafka的消息是一个个键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不过大多数应用程序会用到键.

键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说, 如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取.

如果键值为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列(使用Kafka自己的散列算法,即使升级Java版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(会是所有分区中的任意一个包含了不可用分区)。 这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。 这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。 但这种情况很少发生.

自定义分区策略

继承Partitioner即可自定义分区策略

public class CustomerPartitionr implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes1, Cluster cluster) {
        Integer partitions= cluster.partitionCountForTopic(topic);
        if (keyBytes == null || (!(key instanceof String))) {
            throw new InvalidRecordException("key 不能为空");
        }
        //特殊
//        if (String.valueOf(key).equals("..")) {
//
//        }
        //使用kafka自己的散列,然后求余放入到某个地方
        return (Math.abs(Utils.murmur2(keyBytes)) % (partitions - 1));
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitionr.class.getName());

拦截器

拦截器可以拦截一些不符合要求的信息,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计之类的.

拦截器可以继承ProducerInterceptor接口来实现

生产者发送原理


整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator 中获取消息并将其发送到Kafka中.RecordAccumulator主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0)方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为60000,即60秒.

RecordAccumulator其实就是生产者发送消息的速度大于发送到服务器处理的速度,这个时候消息累加器就起到缓冲作用了

RecordAccumulator是一个双端队列, RecordAccumulator的内部为每个分区都维护了一个双端队列.

Kafka处理请求

broker的大部分工作是处理客户端、分区副本、控制器发送给分区首领的请求,Kafka提供了二进制协议(基于TCP),指定了请求消息的格式来处理所有请求的.

broker会在它所监听的每个端口上运行一个Acceptor线程,这个线程会创建一个连接,并把它交给Processor线程去处理.Processor线程的数量可以配置的.

客户端怎么知道发送到哪里呢? 客户端使用了另一种请求类型,也就是元数据请求.这种请求包含了主题列表,主题的所有分区,分区中的副本,副本首领等信息.

生产者请求

配置生产者的时候有一个acks参数一该参数指定工需要多少个broker 确认才可以认为个消息写人是成功的,不同的配置对 “写人成功” 的界定是不一样的,如果acks=1,那么只要首领收到消息就认为写人成功;如果 acks=all,那么需要所有同步副本收到消息才算写人成功; 如果acks=0,那么生产者在把消息发出去之后,完全不需要等待broker的响应,直接认为写入成功.之后,消息被写人本地磁盘.

在Linux 系统上,消息会被写到文件系统缓在里,并不保证它们何时会被刷新到磁盘上。Kafka不会一直等待数据被写到磁盘上一一它依赖复制功能
来保证消息的持久性。

在消息被写人分区的首领之后,broker开始检查acks配置参数一一如果acks被设为0或1,那么broker立即返回响应;如果acks被设为all,那么请求会被保存在一个叫作炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端.

其实说白了就是消息会被存储到系统缓存里面,并不会立即刷新到磁盘(取决于系统).而Kafka不关心是否刷新到磁盘否,只关心acks参数以及副本的同步情况.

在acks参数配置中,如果是all,那么需要将所有消息同步到副本之后,消费者才能看见消息,才能消费该消息.

获取请求

获取的请求跟发送请求相似.客户端发送请求,向broker请求主题分区里具特定偏移量的消息.比如:获取主题test分区0偏移量从50开始的消息给我(Kafka是主动拉取).消费者在拉取数据的时候有可能为空,有可能有数据比如:客户端拉取0-50,现在服务端只有0-10,这个时候会等到有50条消息时才会返回给客户端.但是这个是可以设置时间的.客户端可以等待一段时间,等待消息满了之后在返回.但是如果时间过长则有多少返回多少.

在老版本中Kafka消费者使用zookeeper来跟踪偏移量,在消费者启动的时候,它通过检查zookeeper上的偏移量就知道从哪里开始处理消息了.而新版本2.x中使用的Kafka自己特定的主题来保存偏移量.在Kafka服务中默认创建了offsetCommitRequest主题,专门用来存放偏移量信息

消费者

kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息.

消费者与分区对应关系如下:




  • 一个partition对应一个消费者,多个partition可以对应一个消费者.
  • 如果消费者数量多于partition数量,则多余的消费者不会有消息消费.
  • 消费者是横向伸缩消费能力的.如果需要更多的消费者,可以多设置分区.
    > 对于消费者而言,一个线程对应一个分区是最好的选择,一个消费者对应一个分区.

多个群组的情况:

说明:同一条消息,可以被多个群组消费.他们之间是互不影响.(这里也就解决了,一条消息怎么发送给多个不同的消费者呢?答:让消费者在不同群组里面即可)
如果希望topci的一条消息被多个消费者同时消费,消费者只需要在不同的groupId里面即可.

消费者群组与分区再均衡

我们已经从上一个小节了解到,群组里的消费者共同读取主题的分区。 一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。 当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。 在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。 再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。 在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。 另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失, 它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器) 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。 只要消费者以正常的时间间隔发送心跳, 就被认为是活跃的, 说明它还在读取分区里的消息。 消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。 如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它死亡,就会触发一次在均衡.

分配分区是怎样一个过程

当消费者要加人群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为 “群主”。 群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。

Kafka内置了两种分配策略,分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。这个过程会在毎次再均衡时重复发生。

创建消费者

springboot配置

spring:
  kafka:
    consumer:
      group-id: my-kafka3
      auto-offset-reset: earliest #latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false #手动提交,(为了测试,正常情况下自动提交即可)
      #auto-commit-interval: 3s #自动提交频率每3s提交一次最大offset. (如果enable-auto-commit参数设置为自动提交则默认5s消费者会自动提交最大偏移量)
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    listener:
      ack-mode: batch #如果enable-auto-commit设置为false时,手动提交的模式。批量提交还是咋提交的。

app:
  topic:
    foo: my-replicate-topic

重要参数:auto-offset-reset: earliest #latest
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录.这个参数还可以设置为none,如果找不到消费位移则直接报出NooffsetForPartitionException.

(earliest参数,我在做测试的时候,改为手动提交,消费后并没有提交偏移量.每次重启之后都会从头开始拉取数据,说明上面说的是对的,你消费了,没有提交偏移量,下次消费者重启之后还会在从头开始消费)

消费者

@Service
@Slf4j
public class Receiver {

    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message, @Headers MessageHeaders headers) {
        log.info("received==0 message={}", message);
        headers.keySet().forEach(key -> log.info("{}: {}", key, headers.get(key)));
    }

    //设置群组id
    @KafkaListener(topics = "${app.topic.foo}",groupId = "my-group-1")
    public void receive1(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("received==1 message={},partition={}", message,partition);
    }

    //和上面的属于两个群组,但是监听了同一个topic.这样同一个topic,同一条消息,但是不同的群组都能收到消息.
    @KafkaListener(topics = "${app.topic.foo}",groupId = "my-group-2")
    public void receive2(@Payload String message, @Header(KafkaHeaders.OFFSET) int offset, Acknowledgment ack) {
        log.info("received==2 message={},offset={}", message,offset);
        ack.acknowledge(); //开启了手动提交
    }

    //指定partition=0,从0分区读取数据,其他区不读取。initialOffset=0,从offset=0开始读取数据
    //@KafkaListener(groupId = "my-group-3",
            //topicPartitions = @TopicPartition(topic = "${app.topic.foo}", partitionOffsets = {@PartitionOffset(partition="0", initialOffset ="0")}))
    public void receive3(@Payload String message,@Header(KafkaHeaders.OFFSET) int offset) {
        log.info("received==3 message={},offset={}", message,offset);
    }

    //读取0,1分区数据,默认从offset0开始读取
//    @KafkaListener(groupId = "my-group-4",topicPartitions = @TopicPartition(topic = "${app.topic.foo}", partitions = { "0", "1" }))
    public void receive4(@Payload String message,@Header(KafkaHeaders.OFFSET) int offset) {
        log.info("received==4 message={},offset={}", message,offset);
    }
}

注意:Kafka的消费者是主动拉取,不是Kafka主动推送消息.不过这里Spring已经帮我们封装好了,所以没有看到一个无限循环在一直拉取数据的操作.感觉上像是被推送数据一样.

消费者手动提交

1. 配置手动提交
spring:
  kafka:
    consumer: 
      enable-auto-commit: false #手动提交,(为了测试,正常情况下自动提交即可) 
    listener:
      ack-mode: batch #如果enable-auto-commit设置为false时,手动提交的模式。批量提交还是咋提交的。

2.代码里面自动提交
@KafkaListener(topics = "${app.topic.foo}",groupId = "my-group-2")
    public void receive2(@Payload String message, @Header(KafkaHeaders.OFFSET) int offset, Acknowledgment ack) {
        log.info("received==2 message={},offset={}", message,offset);
        ack.acknowledge(); //开启了手动提交
    }
2019-12-01 12:11153