一段孤独的代码 一段孤独的代码

A Lonely Code

目录
Kafka学习笔记
/      

Kafka学习笔记

Kafka

Kafka 只是消息引擎系统吗?

Apache Kafka 是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform).

起源

Kafka 时 Linkedin 公司内部孵化的项目。

优势

  1. 更容易实现端到端的正确性
  2. 对于 Kafka 流式计算的定位
    1. 官网上明确标识 Kafka Streams 是一个用于搭建实时流处理的客户端库而非一个完整的功能系统

其他功能

可以作为一个分布式存储,但是没有实际生产使用,只是可以使用Jay Kreps

Kafka 部署方案

操作系统

  1. Linux:
    1. I/O 模型的使用
      1. epoll:更高的性能
    2. 数据网络传输效率
      1. Zero Copy:快速数据传输,避免昂贵的内核态数据拷贝
    3. 社区支持度
      1. 社区不承诺修复 WindowsBug

硬件

  1. 磁盘:建议使用机械磁盘
    1. 顺序读写避免机械硬盘随机读写慢的问题
    2. 不可靠又软件机制保证
  2. 磁盘空间:综合考虑多个元素
    1. 新增消息数
    2. 消息留存时间
    3. 平均消息大小
    4. 备份数
    5. 是否启用压缩
  3. 带宽:带宽容易出现瓶颈
    1. 根据使用经验当带宽超过 70% 的阈值就有网络丢包的可能性
    2. 消息总大小除以单台的带宽大小计算总台数

参数配置

Broker

日志

  1. log.dirs:若干个日志文件目录路径,无默认值
  2. log.dir:补充上一个参数用

log.dirs 应该设置到多个物理磁盘上:

  1. 提升读写性能
  2. 实现故障转移

Zookeeper

  1. zookeeper.connect:Zookeeper 连接字符串
  2. listeners:监听器
  3. advertised.listeners:Broker 对外发布的监听器,即对外的监听器
  4. host.name/port:过期参数,不设置

监听器,是一个逗号分割的三元组,每个三元组的格式为 < 协议名称,主机名,端口号 >.这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输,SSL 表示使用 SSL 或者 TSL 加密传输,也可以是自己定义的协议名字比如 CONTROLLER://localhost:9092

如果定义了协议名称,必须要指定 listener.security.protocol.map 参数告诉这个协议底层使用了那种安全协议,比如指定 listener.security.protocol.map=CONTROLLER:PLAINTEXT 表示这个自定义协议底层使用了明文传输

建议在主机名上使用主机名最好不要使用 ip 可能会发生无法连接的问题

Topic

  1. auto.create.topic.enable:是否允许自动创建 topic,建议 false
  2. unclean.leader.election.enable:是否允许 Unclean Leader 选举,false 是不允许落后太多的副本选举,默认是 false
  3. auto.leader.reblance.enable:是否允许定期进行 Leader 选举,true 允许定期重选举 Leader,建议 false

数据留存

  1. log.retention.{hour|minutes|ms}:这个是控制一条数据被保存多久时间,ms 最高,minutes 其次,hours 最低
  2. log.retention.bytes:是指定 Broker 为消息保存的总磁盘容量大小,默认-1,即多大都可以
  3. message.max.bytes:控制 Broker 能够接受的最大消息大小,默认值不到 1MB,生产环境无法使用

Topic

消息保存

  1. retention.ms:该 Topic 消息被保存的时长。默认为 7 天,该值会覆盖 Broker 参数值
  2. retention.bytes:该 Topic 预留多大磁盘空间,默认是-1,即无限
  3. max.message.bytes:Broker 能够接收该 Topic 的最大消息大小

设置方式

  1. 创建 Topic 时设置:bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
  2. 修改 Topic 时设置:bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

JVM 参数

GC

  1. CPU 充足时:建议使用 CMS:-XX:+UseCurrentMarkSweepGC
  2. 否则吞吐量收集器:-XX:+UseParallelGC

如果使用的 JAVA8,那么使用默认的 G1 就好

Kafka

  1. KAFKA_HEAP_OPTS:指定堆大小
  2. KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数
 $> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
 $> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
 $> bin/kafka-server-start.sh config/server.properties

Kafka 分区机制

Kafka 消息组织方式的三级结构:主题-分区-消息

实现高伸缩性,分区被分布于不同的节点,数据读取针对分区粒度,每个节点独立地执行各自分区的读写请求,并且可以通过新的节点机器增加整体系统的吞吐量

分区策略

实现org.apache.kafka.clients.producer.Partitioner接口

自带策略

  1. 轮询策略
  2. 随机策略
  3. 根据 key 保存

消费者读取时会被分配到一个分区,如果想让同一组消息被同一个消费者读取,可以通过将同一组消息存入同一分区来确保读取顺序

无消息丢失配置

Kafka 对"已提交"的消息(committed message)做有限度的持久化保证

将消息保存在多个 Broker 中,只要有一个存活,就保证不丢失

Kafka Producer 是异步发送的,调用即返回,不确保成功,尽量使用带回调通知的 API

Kafka Consumer 先消费,后位移,确保消息不丢失,但是可能出现重复消费

消息压缩

启用压缩

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("acks", "all");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  // 开启 GZIP 压缩
  props.put("compression.type", "gzip");
 
  Producer<String, String> producer = new KafkaProducer<>(props);

注意可能会因为 Broker 指定了不同的压缩算法造成预料之外的压缩/解压缩操作,表现威 Broker 端 CPU 飙升

可能由于消息版本格式问题(v1,v2)对消息进行解压缩和重新压缩(注意:会丢失 ZeroCopy 特性)

解压缩

Producer 端压缩、Broker 端保持、Consumer 段解压缩

Broker 消息校验会导致解压缩,造成额外的 CPU 消耗

生产者 TCP 连接管理

Kafka 所有连接都基于 TCP

bootstrap.servers不要填写所有的 Broker 地址,Producer 启动时会连接到所有的设置的 Broker,并获取全部的集群信息。

关闭

Producer 参数connections.max.idle.ms参数默认威九分钟,空闲九分钟后关闭,注意该情况下会发生出现 CLAOSE_WAIT 情况,因为属于被动关闭

如何交付可靠性和精确处理一次

Kafka 默认提供至少一次的可靠性交付,通过重试确保消息从 Producer 到 Broker 端,可能造成消息重复

Broker 通过幂等性(Idempotence)和事务(Transaction)确保消息精确一次

计算机领域幂等性含义

  • 在命令式编程语言(比如 C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。
  • 在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的 side effect。

幂等性 Produecer

设定参数 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG 为 true 变为幂等性,通过空间换时间的思路,保存额外字段,确保消息的唯一性。

确保单分区下的消息幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,无法实现多分区的幂等性,只能实现单会话上的幂等性,即重启 Producer 进程后,幂等性丧失。

如果想要实现多分区以及多会话上的消息无重复需要依靠事务来实现

事务

Kafka 事务概念类似数据库事务。保证多条消息原子性的写入到目标分区

事务型 Producer

设置 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG 为 true,和 transactional.id 最好设置个有意义的名字

代码中发送需要修改威

 producer.initTransactions();
 try {
             producer.beginTransaction();
             producer.send(record1);
             producer.send(record2);
             producer.commitTransaction();
 } catch (KafkaException e) {
             producer.abortTransaction();
 }

添加了事务相关 API

读取段 Consumer

设置 isolation.level:

  1. read_uncommitted:默认值,表示 Consumer 能够读取到 Kafka 写入的任何消息
  2. read_committede:表示 Consumer 只能够读取事务性 Producer 成功提交事务写入的消息,和非事务性 Producer 写入的所有消息

消费者组到底是什么

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制,Group Id 用于表示一个组,组内所有消费者一起来消费订阅主题的所有分区,每个分区只能由同一消费者组内的一个 Comsumer 实例来消费

  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

大名鼎鼎/臭名昭著 Rebalance

Rebalance 本质是一种协议,规定一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

触发条件

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  2. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  3. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

发生

当发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。同时 Rebalance 速度过慢,会造成服务群停止。

位移主题

_consumer_offsets,用于保存消费的唯一信息和情况

位移主题 key 保存有 Group ID,主题名,分区名

同时 Kafka 使用专用的后台线程定期巡检等待 Compact 的主题,检查满足条件的可删除数据。线程叫 Log Cleaner,如果出现过多磁盘空间问题,建议检查一下 Log Cleaner 线程状态,通常因为该线程挂掉了。

消费者组重平衡能避免吗

Rebalance 发生时机:

  1. 组成员数量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

几类不必要的 Rebalance

因为未能及时发送心跳,导致 Consumer 被"踢出"Group 而引发的。

设置session.timeout.msheartbeat.interval.ms

  • session.timeout.ms=6s
  • heartbeat.interval.ms=2s
  • 要保证 Consumer 实例在被判定为"dead”之前,能够发送至少 3 轮心跳请求,即 session.timeout.ms>=3*heartbeat.interval.ms

因为 Consumer 消费时间过长导致的

设置max.poll.interval.ms大于消费所需要的时间,避免因为处理时间造成 Rebalance

GC

Consumer 端 GC 引起的长时间停顿,造成 Rebalance

如何提交位移

Consumer 向 Kafka 汇报自己的唯一数据的过程称之为提交位移

位移提交的粒度时分区,即 Consumer 需要为分配给他的每个分区提交各自的位移数据

位移的语义保障由 Consumer 端进行保障,Kafka 只会"无脑"接受位移的提交

提交分手动和自动,手动分同步和异步,应该采用异步提交和回调确认避免阻塞,在服务关闭时通过同步提交确保提交完整

消费者的多线程开发

Consumer 是非线程安全类

多线程方案:

  1. 多个线程,每个线程一个 Consumer
  2. 单个线程单个 Consumer,Worker 线程池负责消费

img

消费者 TCP 连接管理

消费者是在调用 poll 时创建连接的,连接的目标是所消费的分区在的 Broker

创建时先创建个连接获取集群信息,然后再创建连接到协调者的连接,最后创建分区所在的 Broker 的连接

消费者进度如何监控

消费者的滞后程度,被称之为:消费者 Lag 或 Consumer Lag

指的是,消费者落后生产者的程度,Lag 的计算是分区级的,主题的 Lag 要将所有的分区 Lag 叠加起来

如果 Lag 过大会如何:

  • 数据不再 Broker 的系统页缓存,造成数据从磁盘上读取,导致差距被进一步拉大(马太效应)

如何监控

  1. Kafka 命令行:kafka-conumer-groups 脚本
  2. Kafka Java Consumer API 编程
  3. Kafka JMX 监控指标

kafka-conumer-groups

Kafka bin 目录下

$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >

Kafka Java Consumer API

 public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         try (AdminClient client = AdminClient.create(props)) {
             ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
             try {
                 Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
                 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                 try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                     return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                             entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 // 处理中断异常
                 // ...
                 return Collections.emptyMap();
            } catch (ExecutionException e) {
                 // 处理 ExecutionException
                 // ...
                 return Collections.emptyMap();
            } catch (TimeoutException e) {
                 throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }

jmx

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}"

records-lag-max 和 records-lead-min 两个值,lag 即当前位移和最新的消息差距,lead 为当前分区第一条消息和当前消息的差距

如果 lead 接近于 0,就可能存在丢数据的情况,因为这种情况代表数据清理接近于当前位置了

Kafka 副本机制

副本定义:所谓副本(Replica),本质就是一个之恶能追加写消息的提交日志。

副本的定义是在分区下的,每个分区配置有若干个副本,分布于不同的 Broker 上,用于对抗宕机引起的数据不可用

副本追随者会异步拉取领导者的数据

分区领导者宕机时,追随者会重选出领导者,注意:新的领导者可能会丢失数据,因为异步拉取

副本不对外提供任何服务

Kafka 请求如何处理

Kafka 使用 Reactor 模式进行通信和处理,即接收线程负责接收,然后由一个线程分发至处理线程

消费者重平衡流程

发生条件:

  1. 组成员数量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

通知

重平衡通过心跳通知给消费者,会在消费者的心跳返回中封装 REBALANCE_IN_PROGRESS 信息

通过减小heartbeat.interval.ms参数的值,来更快的获取重平衡的消息

Kafka 控制器

控制器重度依赖 Zookeeper,通过创建/controller节点为控制器,控制器负责主题管理,分区重分配,Preferred 领导者选举,集群成员管理,数据服务,存储所有主题信息,所有 Broker 信息,所有涉及运维任务的分区

Kafka 高水位机制

高水位作用:

  1. 定义消息可见性,即用来表示分区下的哪些消息是可以被消费者消费的
  2. 帮助 Kafka 完成副本同步

位移比高水位搞得是未提交消息,比位移低的是已提交消息,同时日志末端位移(Log End Offset,LEO)即副本要写入的小一条数据的位移值,高水位不会大于 LEO

高水位更新

Kafka 主题管理

主题管理工具

kafka-topics脚本,位于 kafka 的 bin 目录下

创建一个主题

 bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

这里创建了一个名字是my_topic_name分区和分区的副本数各是 1 的主题

查询所有主题列表

 bin/kafka-topics.sh --bootstrap-server broker_host:port --list

查询主题的详细数据

 bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic 
 # 如果describe不加具体的主题名,则返回所有的可见主题详细数据

修改分区数

 bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic  --partitions < 新分区数 >
 # 只能添加不能减少

修改主题级别参数

bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name  --alter --add-config max.message.bytes=10485760

变更副本数

bin/kafka-reassign-partitions.sh

修改主题限速

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

主题分区迁移

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic 

Kafka 动态 Broker 参数

设置集群全局值

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
Completed updating default config for brokers in the cluster,

检查设置情况

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:
  unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}

设置 broker 参数

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
Completed updating config for broker: 1.

检查设置情况

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
Configs for broker 1 are:
  unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}

删除参数

# 删除 cluster-wide 范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
Completed updating default config for brokers in the cluster,

# 删除 per-broker 范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
Completed updating config for broker: 1.

重设消费者组位移

Kafka 和传统消息引擎的区别,是 Kafka 的消费者读取消息是可以重演的

重设位移策略

  1. 位移维度
  2. 时间维度

img

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = "test";  // 要重设位移的 Kafka 主题 
try (final KafkaConsumer consumer = 
	new KafkaConsumer<>(consumerProperties)) {
         consumer.subscribe(Collections.singleton(topic));
         consumer.poll(0);
         consumer.seekToBeginning(
	consumer.partitionsFor(topic).stream().map(partitionInfo ->          
	new TopicPartition(topic, partitionInfo.partition()))
	.collect(Collectors.toList()));
} 
consumer.seekToEnd(
	consumer.partitionsFor(topic).stream().map(partitionInfo ->          
	new TopicPartition(topic, partitionInfo.partition()))
	.collect(Collectors.toList()));
consumer.partitionsFor(topic).stream().map(info -> 
	new TopicPartition(topic, info.partition()))
	.forEach(tp -> {
	long committedOffset = consumer.committed(tp).offset();
	consumer.seek(tp, committedOffset);
});
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
	TopicPartition tp = new TopicPartition(topic, info.partition());
	consumer.seek(tp, targetOffset);
}
for (PartitionInfo info : consumer.partitionsFor(topic)) {
         TopicPartition tp = new TopicPartition(topic, info.partition());
	// 假设向前跳 123 条消息
         long targetOffset = consumer.committed(tp).offset() + 123L; 
         consumer.seek(tp, targetOffset);
}
long ts = LocalDateTime.of(
	2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map timeToSearch = 
         consumer.partitionsFor(topic).stream().map(info -> 
	new TopicPartition(topic, info.partition()))
	.collect(Collectors.toMap(Function.identity(), tp -> ts));

for (Map.Entry entry : 
	consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
Map timeToSearch = consumer.partitionsFor(topic).stream()
         .map(info -> new TopicPartition(topic, info.partition()))
         .collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000  * 60));

for (Map.Entry entry : 
         consumer.offsetsForTimes(timeToSearch).entrySet()) {
         consumer.seek(entry.getKey(), entry.getValue().offset());
}
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset  --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by  --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

Kafka 常用工具脚本

img

KafkaAdminClient:Kafka 运维利器


    org.apache.kafka
    kafka-clients
    2.3.0

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);

try (AdminClient client = AdminClient.create(props)) {
         // 执行你要做的操作……
}

//创建主题
String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
         NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
         CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
         result.all().get(10, TimeUnit.SECONDS);
}

//查询位移
String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
         ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
         Map offsets = 
                  result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
         System.out.println(offsets);
}

//查询Broker主题磁盘占用
try (AdminClient client = AdminClient.create(props)) {
         DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定 Broker id
         long size = 0L;
         for (Map logDirInfoMap : ret.all().get().values()) {
                  size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
                           topicPartitionReplicaInfoMap ->
                           topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
                           .mapToLong(Long::longValue).sum();
         }
         System.out.println(size);
}

Kafka 认证机制

img

创建用户

$ cd kafka_2.12-2.3.0/
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]' --entity-type users --entity-name reader
Completed Updating config for entity: user-principal 'reader'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users  --entity-name writer
Configs for user-principal 'writer' are SCRAM-SHA-512=salt=MWt6OGplZHF6YnF5bmEyam9jamRwdWlqZWQ=,stored_key=hR7+vgeCEz61OmnMezsqKQkJwMCAoTTxw2jftYiXCHxDfaaQU7+9/dYBq8bFuTio832mTHk89B4Yh9frj/ampw==,server_key=C0k6J+9/InYRohogXb3HOlG7s84EXAs/iw0jGOnnQAt4jxQODRzeGxNm+18HZFyPn7qF9JmAqgtcU7hgA74zfA==,iterations=4096,SCRAM-SHA-256=salt=MWV0cDFtbXY5Nm5icWloajdnbjljZ3JqeGs=,stored_key=sKjmeZe4sXTAnUTL1CQC7DkMtC+mqKtRY0heEHvRyPk=,server_key=kW7CC3PBj+JRGtCOtIbAMefL8aiL8ZrUgF5tfomsWVA=,iterations=4096

创建 JAAS 文件

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};

注意:

  1. 最后一行和倒数第二行结尾处的分号
  2. jaas 文件中不需要任何空格键

配置 Broker

sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9092

动态增加用户

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=new_writer]' --entity-type users --entity-name new_writer
Completed Updating config for entity: user-principal 'new_writer'.

Kafka 跨集群数据备份:MirrorMaker

如何监控 Kafka

监控维度:

  1. 主机维度
  2. JVM 监控
  3. 集群监控

Kafka 监控框架

  1. JMXTool
  2. KafkaManager
  3. Burrow
  4. JMXTrans+InfluxDB+Grafana
  5. Confluent Control Center

标题:Kafka学习笔记
作者:GunVeda
地址:http://gunveda.top/articles/2020/09/18/1600414744574.html