《kafka API使用方法》

《kafka API使用方法》


title: 《kafka API使用方法》
date: 2022-6-7 21:20:41
description: Spark (HA) Spark (Yarn)

阅读全文 1、一个正常的生产逻辑需要具备以下几个步骤 >>> (1)、配置生产者客户端参数及创建相应的生产者实例; (2)、构建待发送的消息; (3)、发送消息; (4)、关闭生产者实例。

2、生产者API采用默认分区方式将消息散列的发送到各个分区中。

3、什么时候会发生重复写入:producer的重试机制中,检测到一条数据的发送失败,而实际上已经发送成功,只是因为服务端响应超时。

4、什么时候会发生数据写入的丢失:ack参数的配置。

5、ack模式:取值0,1,all

0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是销量最高。
1代表producer往集群发送数据只要leader成功写入消息就可以发送下一条,只确保leader接收成功。
-1或all代表producer往集群发送数据需要所有的ISR Follow都完成从Leader的同步才会发送下一条,确保leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

6、Properties props = new Properties(); ->配置生产者客户端参数

7、props.put(“bootstrap.servers”, “node1:9092,node2:9092,node3:9092”);->设置 kafka 集群的地址

8、props.put(“retries”, 3); ->失败重试次数

9、props.put(“batch.size”, 10); ->数据发送的批次大小

10、props.put(“linger.ms”, 10000); ->消息在缓冲区保留的时间,超过设置的值就会被提交到服务端。

11、props.put(“max.request.size”,10); ->数据发送请求的最大缓存数

12、props.put(“buffer.memory”, 10240); ->整个 Producer 用到总内存的大小,如果缓冲区满了会提交数据到服务端//buffer.memory 要大于 batch.size,否则会报申请内存不足的错误,降低阻塞的可能性。

13、props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); ->key-value序列化器

14、props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); ->字符串最好

15、在 Kafka 生产者客户端 KatkaProducer 中有3个参数是必填的 -> bootstrap.servers、key.serializer、value.serializer

16、生产者api参数发送方式(发后即忘):

发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。在大多数情况下,这种发送方式没有问题; 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性最差。
ack->作用在broker
Future send = producer.send(rcd);->也是异步

17、生产者api参数发送方式(同步发送):

 producer.send(rcd).get(); ->一旦调用get方法,就会阻塞
 Future future = Callable.run()->有返回值,future.get()
 Runnable.run()->无返回值

18、生产者api参数发送方式(异步发送):

回调函数会在 producer 收到ack时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果Exception不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

19、幂等性:生产者将一条数据多次重复写入的情况下,broker端依然只有一条。

20、在IJ中新建Maven项目,配置pom.xml,重新加载Maven项目
17
18

21、新建ProducerDemo类、ProducerCallbackDemo类
19
20
21
22

22、生产者原理
23

(1)、一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
(2)、在主线程中由 kafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
(3)、Sender 线程负责从 RecordAccumulator 获取消息并将其发送到 Kafka 中;
(4)、RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
(5)、RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即 32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。
(6)、主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中, RecordAccumulator 内部为每个分区都维护了一个双端队列,即 Deque。消息写入缓存时,追加到双端队列的尾部。
(7)、Sender 读取消息时,从双端队列的头部读取。
(8)、注意:ProducerBatch 是指一个消息批次;与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
(9)、当topic中的分区数增多的情况下,recordaccumulator中的分区数就会增多。当topic的数量增多的情况下,recordaccumulator中的分区数也会增多。
(10)、ProducerBatch 大小和 batch.size 参数也有着密切的关系。
(11)、当一条消息(ProducerRecord) 流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以写入,如果不可以则需要创建一个新的 Producer Batch。
(12)、在新建ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。
(13)、Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List< ProducerBatch>的形式,其中 Node 表示 Kafka 集群 broker 节点。
(14)、对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区。
(15)、而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。
(16)、在转换成<Node, List>的形式之后,Sender 会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个 Node了,这里的 Request 是 Kafka 各种协议请求。
(17)、请求在从sender线程发往 Kafka 之前还会保存到InFlightRequests中,InFlightRequests 保存对象的具体形式为 Map<Nodeld, Deque>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。
(18)、与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接) 最多缓存的请求数。
(19)、这个配置参数为max.in.flight.request.per.Connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。
(20)、通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继其发送请求会增大请求超时的可能。

23、重要的生产者参数

(1)、max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即 1MB 一般情况下,这个默认值就可以满足大多数的应用场景了。
这个参数还涉及一些其它参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的异常;比如将broker 端的message.max.bytes 参数配置为 10,而max.request.size 参数配置为20,那么当发送一条大小为15B的消息时,生产者客户端就会报出异常。
(2)、compression.type
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为”gzip”,”snappy” 和 “lz4”。(服务端也有压缩参数,先解压,再压缩);对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;没有必要,不需要压缩。
(3)、retries 和 retry.backoff.ms
retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用来说,顺序性非常重要,比如 MySQL binlog 的传输,如果出现错误就会造成非常严重的后果;MySQL binlog–>mysql插入数据–>操作结果体会在表中–>mysql为了提高可靠性会把操作记录在日志中–>为了以后的主从同步(mysql集群,主表,子表)–>读写分离–>binlog(mysql自己设计的格式,二进制形式)。
如果将 acks 参数配置为非零值,并且max.flight.requests.per.connection 参数配置为大于1的值,那可能会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一次的消息写入成功,那么这两个批次的消息就出现了错序。
一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection 配置为 1 ,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。–>吞吐量降低
(4)、batch.size
每个Batch 要存放 batch.size 大小的数据后,才可以发送出去。比如说 batch.size 默认值是 16KB,那么里面凑够 16KB 的数据才会发送。
理论上来说,提升 batch.size 的大小,可以允许更多的数据缓冲在里面,那么一次 Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是 batch.size 也不能过大,要是数据老是缓冲在 Batch 里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。
(5)、linger.ms(和batchsize有联系)
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入ProducerBatch 时间,默认值为0。生产者客户端会在ProducerBatch 填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
(6)、enable.idempotence
是否开启幂等性功能,详见后续原理加强;幂等性,就是一个操作重复做,每次的结果都一样,x1=1,x1=1,x*1=1;在 kafka 中,就是生产者生产的一条消息,如果多次重复发送,在服务器中的结果还是只有一条。kafka很难实现幂等性,如果重复发,kafka肯定有多条消息–>需要有机制判断曾经是否发送过–>各种手段判断–>事务管理的概念–>加入幂等性,吞吐量会急剧下降。
(7)、partitioner.classes
用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner –>用hashcode分
自定义 partitioner 需要实现 org.apache.kafka.clients.producer.Partitioner 接口 –>可以通过partitioner接口自己实现分区器

四、消费者API
1、一个正常的消费逻辑需要具备以下几个步骤:

(1)、配置消费者客户端参数
(2)、创建相应的消费者实例
(3)、订阅主题
(4)、拉取消息并消费
(5)、提交消费位移 offset
(6)、关闭消费者实例

2、subscribe重载方法:

(1)、前面两种通过集合的方式订阅一到多个topic

public void subscribe(Collection topics,ConsumerRebalanceListener listener)
public void subscribe(Collection topics)
(2)、后两种主要是采用正则的方式订阅一到多个topic
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
(3)、正则方式订阅主题(只要是tpc_数字的形式,三位数字以内)
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。利用正则表达式订阅主题,可实现动态订阅;

3、assign订阅主题

消费者不仅可以通过 KafkaConsumer.subscribe()方法订阅主题,还可直接订阅某些主题的指定分区; 在KafkaConsumer 中提供了assign()方法来实现这些功能,此方法的具体定义如下:public void assign(Collection<TopicPartition> partitions) ;
这个方法只接受参数 partitions,用来指定需要订阅的分区集合。
示例如下: 
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1",0),new TopicPartition(“tpc_2”,1))); 

4、subscribe与assign的区别

(1)、通过 subscribe()方法订阅主题具有消费者自动再均衡功能;
在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
(2)、assign() 方法订阅分区时,是不具备消费者自动均衡的功能的; 
其实这一点从 assign()方法参数可以看出端倪,两种类型subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

5、取消订阅

(1)、可以使用KafkaConsumer中的unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过subscribe( Collection)方式实现的订阅; 
(2)、也可以取消通过subscribe(Pattem)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。
(3)、如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与 unsubscribe()方法相同,如下示例中三行代码的效果相同:
consumer.unsubscribe(); 
consumer.subscribe(new ArrayList<String>()) ; 
consumer.assign(new ArrayList<TopicPartition>());

6、消息的消费模式

Kafka中的消费是基于拉取模式的。消息的消费一般有两种模式:推送模式和拉取模式。
推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll()方法,poll()方法返回的是所订阅的主题(分区)上的一组消息。对于poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空如果订阅的所有分区中都没有可供消费的消息,那么 poll()方法返回为空的消息集; poll ()方法具体定义如下: 
public ConsumerRecords<K, V> poll(final Duration timeout) 
超时时间参数 timeout,用来控制poll()方法的阻塞时间, 在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据,则为了提高吞吐率,可以把timeout设置为Long.MAX_VALUE;
消费者消费到的每条消息的类型为 ConsumerRecord
topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号
offset 表示消息在所属分区的偏移量
timestamp 表示时间戳,与此对应的timestampType表示时间戳的类型
timestampType 有两种类型CreateTime和LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳
headers 表示消息的头部内容
key value 分别表示消息的键和消息的值,一般业务应用要读取的就是value
serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小,如果 key 为空, 则 serializedKeySize 值为-1,同样,如果value 为空,则serializedValueSize的值也会为-1
checksum 是 CRC32 的校验值

7、指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer中的seek()方法正好提供了这个功能,让我们可以追前消费或回溯消费。seek()方法的具体定义如下: 
public void seek(TopicPartiton partition,long offset)

8、再均衡监听器

一个消费组中,一旦有消费者的增减发生,会触发消费者组的 rebalance 再均衡;如果 A 消费者消费掉的一批消息还没来得及提交 offset,而它所负责的分区在 rebalance 中转移给了B消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;

9、自动位移提交

Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true。
在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。
重复消费
   >>> 
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

丢失消息
   >>> 
按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:拉取线程不断地拉取消息并存入本地缓存,比如在 BlockingQueue中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第 m 次位移提交的时候,也就是x+6 之前的位移己经确认提交了,处理线程却还正在处理 x+3 的消息;此时如果处理线程发生了异常,待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

10、手动位移提交

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费; 手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。 开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 fals,示例如下
props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false); 
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。
同步提交的方式:对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync()的另一个有参方法,具体定义如下:
public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)
异步提交方式:commitSync()方法相反,异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操。异步提交以便消费者的性能得到一定的增强。

11、其他重要参数

fetch.min.bytes=1B	 -> 一次拉取的最小字节数
fetch.max.bytes=50M -> 一次拉取的最大数据量
fetch.max.wait.ms=500ms ->	 拉取时的最大等待时长
max.partition.fetch.bytes = 1MB -> 每个分区一次拉取的最大数据量
max.poll.records=500-> 一次拉取的最大条数
connections.max.idle.ms=540000ms -> 网络连接的最大闲置时长
request.timeout.ms=30000ms  -> 一次请求等待响应的最大超时时间consumer 等待请求响应的最长时间
metadata.max.age.ms=300000 -> 元数据在限定时间内没有进行更新,则会被强制更新
reconnect.backoff.ms=50ms 	-> 尝试重新连接指定主机之前的退避时间
retry.backoff.ms=100ms -> 尝试重新拉取数据的重试间隔

12、新建ConsumerDemo、ConsumerDemo1、ConsumerTask、ConsumerDemo2、ConsumerSeekOffset类
24
25
26
27
28

五、Topic管理API
1、一般情况下,我们都习惯使用 kafka-topic.sh 本来管理主题,如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。这种调用 API 方式实现管理主要利用 KafkaAdminClient 工具类。

KafkaAdminClient 不仅可以用来管理 broker、配置和 ACL (Access Control List),还可用来管理主题)它提供了以下方法:

创建主题:CreateTopicResult createTopics(Collection new Topics)
删除主题:DeleteTopicsResult deleteTopics(Collectiontopics)
列出所有可用的主题:ListTopicsResult listTopics()
查看主题的信息:DescribeTopicsResult describeTopics(Collection topicNames)

查询配置信息:

DescribeConfigsResult describeConfigs(Collectionresources)
修改配置信息:AlterConfigsResult alterConfigs(Map<ConfigResource,Config> configs)
增加分区:CreatePartitionsResult createPartitions(Map<String,NewPartitions> new Partitions)
构造一个 KafkaAdminClient AdminClient adminClient =
KafkaAdminClient.create(props);

2、列出主题

ListTopicsResult listTopicsResult = adminClient.listTopics();
Set topics = listTopicsResult.names().get();
System.out.println(topics);

3、查看主题信息

DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(Arrays.asList(“tpc_4”, “tpc_3”));
Map<String, TopicDescription> res = describeTopicsResult.all().get();
Set ksets = res.keySet();
for (String k : ksets) {
System.out.println(res.get(k));
}

4、创建主题

// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,”node1:9092,node2:9092,node3:9092”);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);
// 由服务端 controller 自行分配分区及副本所在 broker
NewTopic tpc_3 = new NewTopic(“tpc_3”, 2, (short) 1);
// 手动指定分区及副本的 broker 分配
HashMap<Integer, List> replicaAssignments = new HashMap<>();
// 分区 0,分配到 broker0,broker1
replicaAssignments.put(0,Arrays.asList(0,1));
// 分区1,分配到 broker0,broker2
replicaAssignments.put(0,Arrays.asList(0,1));
NewTopic tpc_4 = new NewTopic(“tpc_4”, replicaAssignments);
CreateTopicsResult result =
adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));
// 从 future 中等待服务端返回
try {
result.all().get();
} catch (Exception e) {
e.printStackTrace();
}
adminClient.close();

5、删除主题

DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(Arrays.asList(“tpc_1”, “tpc_1”));
Map<String, KafkaFuture> values = deleteTopicsResult.values();
System.out.println(values);

6、其他管理

除了进行 topic 管理之外,KafkaAdminClient 也可以进行诸如动态参数管理,分区管理等各类管理操作;

7、新建KafkaAdminDemo、CallableDemo类
29
30
**

**