kafka管理命令

启动服务

1.kafka依赖于zookeeper,所以先启动zookeeper 进入到kafka目录

bin/zookeeper-server-start.sh config/zookeeper.properties

2.启动kafka

bin/kafka-server-start.sh config/server.properties

分区命令

创建topic

创建
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic my_message_topic

replication-factor:复制系数,就是创建多少个副本.(副本个数不能超出broker数量)

partitions:分区数,将一个主题分成n个分区存储到多个broker中.
可以使用--if-no-exists来指明如果topic已经存在则不重复创建.

--zookeeper localhost:2181 ,默认创建到了zookeeper跟目录来.可以使用--zookeeper localhost:2181/mykafka 这种方式来指定文件夹,当然一旦指定来文件夹,在修改,删除,新增分区都需要指定zk的文件夹.

添加分区

创建好topic之后,需要增加分区来进行扩容或提高单个分区的吞吐量,可以修改主题的分区数.

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 10  

添加--if-exists修改主题不存在时,则忽略错误.

调整基于key的主题来说,是很困难的,因为如果改变了分区的数量,key到分区之间的映射关系也会发生变化.所以基于key的主题来说,一开始就应该设置好分区数量,以免后期调整.

分区数量只能增多,不能减少.如果一定要减少,只能删除topic然后重新创建.

删除主题

如果一个主题不在使用,只要它还存在集群里面,就会占用一定数量的磁盘空间和文件句柄,删除它就可以节约资源.

为了能够删除主题,broker的delete.topic.enable参数必须设置为true,如果设置为false,则删除topic的请求就会被忽略.

删除topic
kafka-topics --delete --zookeeper 127.0.0.1:2181 --topic my-par-test

删除主题是一个不可逆的过程.执行删除动作最好小心哦.

topic信息查看

  1. 查看所有topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

  1. 查看主题详细信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_message_topic

在该命令后面加入--under-replicated-partitions可以列出所有不同步副本的分区.

或者加入--unavailable-partitions参数可以列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是不可用的.

发送消息

生产者,发送消息到kafka,使用topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_message_topic

接收消息

老版本
消费者 消费kafka消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_message_topic --from-beginning
vs
新版本
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_message_topic --from-beginning

消费者群组

Kafka在老版本中保存消费者群组的信息放入zookeeper中,而新版本则放入到了broker中.

老版本查看消费者群组
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
vs
新版本
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka.com:9092 --list

查看消费群组详细信息

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group testGroup
  • group:群组名字
  • topic:在正被读取的主题名字
  • partition:正被读取的分区id
  • current-offset:消费者群组最近提交的偏移量.
  • log-end-offset:当前高水位偏移量,也就是最近一个被读取消息的偏移量.
  • lag:current-offset与log-end-offset之间的差距
  • awner: 消费者群组正在读取该分区的消费者.这个是消费者id

列出被覆盖的配置

kafka-config.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name my-topic

这里只能显示被覆盖的配置,如果没有配置被覆盖则不显示信息

broker配置了一次,消费者自己又配置了一次.这样就会导致配置覆盖.

2019-12-05 13:1184