目录消费者组管理kafka-consumer-groups.sh1.查看消费者列表--list2.查看消费者组详情--describe3.删除消费者组--delete4.重置消费组的偏移量...
目录
消费者组管理 kafka-consumer-groups.sh1. 查看消费者列表--list
2. 查看消费者组详情--describe
3. 删除消费者组--delete
4. 重置消费组的偏移量 --reset-offsets
5. 删除偏移量delete-offsets
消费者组管理 kafka-consumer-groups.sh
1. 查看消费者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

先调用MetadataRequest拿到所有在线Broker列表 再给每个Broker发送ListGroupsRequest请求获取 消费者组数据
2. 查看消费者组详情--describe
DescribeGroupsRequest
查看消费组详javascript情--group 或 --all-groups
查看指定消费组详情--group sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
查看所有消费组详情--all-groups sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups 查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息

查询消费者成员信息--members
所有消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查询消费者状态信息--state
所有消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootswww.cppcns.comtrap-server xxxx:9090
指定消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 删除消费者组--delete
DeleteGroupsRequest
删除消费组--delete
删除指定消费组--group sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
删除所有消费组--all-groups sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常
Error: Deletion of some consumer groups failed: * Group 'test2_consumer_group' could not be deleted due to: Java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. 重置消费组的偏移量javascript --reset-offsets
能够执行成功的一个前提是 消费组这会是不可用状态;
下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果; 等你想真正执行的时候请换成参数--excute ;
下面示例 重置模式都是 --to-earliest 重置到最早的;
请根据需要参考下面 相关重置Offset的模式 换成其他模式;
重置指定消费组的偏移量 --group
重置指定消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定消费组的指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
重置所有消费组的偏移量 --all-group
重置所有消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有消费组中指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
--reset-offsets 后面需要接重置的模式
相关重置Offset的模式
--to-offset 例子
--to-offset 3465

--from-file着重讲解一下
上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;
先配置cvs文档 格式为: Topic:分区号: 重置目标偏移量
test2,0,100 test2,1,200 test2,2,300
执行命令
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. 删除偏移量delete-offsets
能够执行成功的一个前提是 消费组这会是不可用状态;
偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2
相关可选参数
以上就是kafka运维consumer-groups.sh消费者组管理的详细内容,更多关于kafka运维consumer groups sh的资料请关注我们其它相关文章!










