# Kafka Java Client Configuration Parameters
## Producer Configuration (ProducerConfig)

### bootstrap.servers (Kafka address list)
A list of host/port pairs used to establish the initial connection to the Kafka cluster. The client will make use of all servers in the cluster regardless of how many are listed here; this list only determines the initial hosts used to discover the full set of brokers. Configure more than one address so that a single unavailable server does not prevent discovery. Format: `host1:port1,host2:port2,...`. No default; this setting is required.

### key.serializer (key serialization)
The serializer class for record keys; must implement the `org.apache.kafka.common.serialization.Serializer` interface.

### value.serializer (value serialization)
The serializer class for record values; must implement the `org.apache.kafka.common.serialization.Serializer` interface.

### connections.max.idle.ms (idle connection close time)
Close idle connections after the number of milliseconds specified by this config. Default: 300000.

### partitioner.class (partitioner)
Determines which topic partition a record is sent to; must implement the `org.apache.kafka.clients.producer.Partitioner` interface:

```java
public interface Partitioner extends Configurable, Closeable {
    // Compute the partition for the given record
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    void close();

    // Notifies the partitioner that a new batch is about to be created
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
```

Default: `DefaultPartitioner`.

### interceptor.classes (interceptors)
Interceptors run before a record is sent and again when it is acknowledged, and can be used to modify records in batches, collect statistics, or attach extra information. They must implement the `org.apache.kafka.clients.producer.ProducerInterceptor` interface:

```java
public interface ProducerInterceptor<K, V> extends Configurable {
    // Called before the record is sent
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    // Called when the record has been acknowledged, or when the send failed
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    void close();
}
```
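A minimal producer configuration covering the required settings above might look like the following sketch. The broker addresses are placeholders, and the configuration is built with plain `java.util.Properties` so the example stays dependency-free; in real code these properties would be passed to a `KafkaProducer` from the kafka-clients library.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the minimal required producer configuration.
    // Broker addresses here are illustrative placeholders.
    public static Properties buildBaseConfig() {
        Properties props = new Properties();
        // Required: initial hosts used to discover the full cluster
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        // Required: serializers for record keys and values
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildBaseConfig().getProperty("bootstrap.servers"));
    }
}
```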
### acks (replica acknowledgements)
Specifies how many replicas must have persisted a record before the leader reports success.

- `acks=0`: the producer does not wait for any server response; a record is considered sent as soon as it is added to the send buffer. There is no guarantee of delivery, and the `retries` setting has no effect because the client never learns of any failure. The offset returned for each record is always -1.
- `acks=1`: the leader returns success as soon as it writes the record locally, without confirming how many replicas have copied it. If the leader fails after writing but before the followers replicate, the record is lost.
- `acks=all`: the leader waits until all in-sync replicas have replicated the record; as long as at least one in-sync replica remains alive, the record will not be lost. Equivalent to `acks=-1`.

Default: 1. Valid values: {all, -1, 0, 1}.

### client.id
An ID string used to trace the source of requests; server-side request logs include the client ID along with the request's IP address and port. Default: `"producer-"` followed by a sequence number.

### max.in.flight.requests.per.connection (max in-flight requests)
The maximum number of unacknowledged requests per connection before the client blocks. If this is set greater than 1 and retries are enabled, a failed and retried send risks reordering messages. Default: 5; minimum: 1.

### retries (retry count)
When the server hits a transient, self-recovering error, a value greater than 0 lets the producer automatically resend failed records. If `max.in.flight.requests.per.connection` is not 1, retries may reorder records within a partition: if the first batch fails and is retried while the second batch succeeds, the second batch's records end up ahead of the first batch's. If `delivery.timeout.ms` is configured, the total time spent retrying cannot exceed it, and requests past that deadline fail. Rather than tuning this setting directly, it is recommended to control retry behavior through `delivery.timeout.ms`.

### retry.backoff.ms (retry interval)
The time to wait between retries of a failed request, to avoid tight retry loops. Default: 100.

### delivery.timeout.ms (delivery timeout)
An upper bound on the total time to report success or failure for a send, including retries. The value should be greater than the sum of `request.timeout.ms` and `linger.ms`. Default: 120000.

### linger.ms (send delay)
The producer groups unsent records into batches to reduce the number of requests. Rather than sending each record immediately, it waits up to this delay, or until the batch reaches its size limit, before sending. The behavior is similar in spirit to Nagle's algorithm in TCP. Default: 0.

### request.timeout.ms (request timeout)
The maximum time the client waits for a response to a request. If the timeout elapses first, the client resends the request if retries remain, or fails the request once retries are exhausted. This value should be larger than `replica.lag.time.max.ms` (a broker configuration) to reduce message duplication caused by unnecessary retries. Default: 30000.

### batch.size (batch size)
The producer attempts to combine multiple records bound for the same partition into a single batched request, which improves performance on both the client and the server. Too small a batch increases the request count and reduces throughput; too large a batch wastes memory and can make requests take long enough to time out, so choose a reasonable buffer size. Default: 16384 bytes.

### send.buffer.bytes
The size of the TCP send buffer (SO_SNDBUF). If set to -1, the OS default is used. Default: 131072; minimum: -1.

### receive.buffer.bytes
The size of the TCP receive buffer (SO_RCVBUF). If set to -1, the OS default is used. Default: 32768; minimum: -1.

### max.request.size (max request size)
The maximum size of a request in bytes. This limits the number of record batches sent in a single request, avoiding oversized requests, and also caps the size of an uncompressed batch. Default: 1048576 bytes.

### reconnect.backoff.ms (reconnect interval)
How long to wait after a failed connection attempt before reconnecting, to avoid tight reconnect loops. Default: 50; minimum: 0.

### reconnect.backoff.max.ms (max reconnect interval)
The maximum backoff when reconnecting to a broker that has repeatedly failed to connect; the backoff grows exponentially with each consecutive failure, up to this value. Default: 1000.

### max.block.ms
Controls how long `KafkaProducer.send()` and `KafkaProducer.partitionsFor()` will block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.
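The constraint that `delivery.timeout.ms` should exceed the sum of `request.timeout.ms` and `linger.ms` can be checked up front before building a configuration. A minimal sketch (the class and method names are my own, not a kafka-clients API):

```java
public class TimeoutCheck {
    // Mirrors the documented constraint: the delivery timeout must leave
    // room for at least one full request plus the linger delay.
    public static boolean isValid(long deliveryTimeoutMs,
                                  long requestTimeoutMs,
                                  long lingerMs) {
        return deliveryTimeoutMs >= requestTimeoutMs + lingerMs;
    }

    public static void main(String[] args) {
        // Defaults: delivery.timeout.ms=120000, request.timeout.ms=30000, linger.ms=0
        System.out.println(isValid(120000, 30000, 0)); // prints "true"
    }
}
```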
### buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are produced faster than they can be delivered, the producer blocks for up to `max.block.ms` and then throws an exception. This setting roughly corresponds to the total memory the producer will use, but it is not a hard bound: some additional memory is used for compression (if enabled) and for maintaining in-flight requests. Default: 33554432.

### compression.type (compression)
The compression type for data sent by the producer. Valid values: none, gzip, snappy, lz4, zstd. Default: none.

### metadata.max.age.ms (metadata refresh interval)
After this period, metadata is forcibly refreshed even if no partition leadership changes have been observed, so that new brokers or partitions are discovered proactively. Default: 300000.

### metadata.max.idle.ms (metadata idle expiry)
Controls how long the producer caches metadata for an idle topic. If more than this time has elapsed since the topic was last produced to, the topic's metadata is forgotten and the next access forces a metadata fetch request. Default: 300000.

### metrics.sample.window.ms / metrics.num.samples / metrics.recording.level / metric.reporters

### enable.idempotence
When set to 'true', the producer ensures that exactly one copy of each message is written to the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message. Note that enabling idempotence requires `max.in.flight.requests.per.connection` to be less than or equal to 5, `retries` to be greater than 0, and `acks` to be 'all'. If these values are not explicitly set by the user, suitable values will be chosen; if incompatible values are set, a `ConfigException` will be thrown. Default: false.
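The compatibility rules for `enable.idempotence` above can be expressed as a quick pre-flight check. This is a sketch under the stated rules; the class and method names are my own, not a kafka-clients API:

```java
public class IdempotenceCheck {
    // Mirrors the documented requirements for enable.idempotence:
    // max.in.flight.requests.per.connection <= 5, retries > 0, acks = all/-1.
    public static boolean compatible(int maxInFlight, int retries, String acks) {
        return maxInFlight <= 5
                && retries > 0
                && ("all".equals(acks) || "-1".equals(acks));
    }

    public static void main(String[] args) {
        System.out.println(compatible(5, Integer.MAX_VALUE, "all")); // prints "true"
        System.out.println(compatible(1, 3, "1"));                   // prints "false"
    }
}
```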
### transaction.timeout.ms
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting on the broker, the request will fail with an `InvalidTransactionTimeout` error. Default: 60000.

### transactional.id
The TransactionalId to use for transactional delivery. This enables reliability semantics that span multiple producer sessions, since it allows the client to guarantee that transactions using the same TransactionalId have completed before any new transactions start. If no TransactionalId is provided, the producer is limited to idempotent delivery. Note that `enable.idempotence` must be enabled if a TransactionalId is configured. Default: null, which means transactions cannot be used. Note that, by default, transactions require a cluster of at least three brokers, which is the recommended setting for production; for development you can change this by adjusting the broker setting `transaction.state.log.replication.factor`.

### security.providers
A list of configurable creator classes, each returning a provider implementing security algorithms. These classes must implement the `org.apache.kafka.common.security.auth.SecurityProviderCreator` interface.

### client.dns.lookup (client DNS lookup)
Controls how the client uses DNS lookups. With `use_all_dns_ips`, when a hostname resolves to multiple IP addresses, the client tries all of them before failing the connection; this suits standalone or public servers. With `resolve_canonical_bootstrap_servers_only`, each entry is resolved and expanded into a list of canonical names. Valid values: DEFAULT("default"), USE_ALL_DNS_IPS("use_all_dns_ips"), RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only"). Default: DEFAULT.
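Putting the transactional settings together, a configuration sketch for a transactional producer might look like the following. The `transactional.id` value and broker addresses are illustrative placeholders; actually running transactions additionally requires calling `KafkaProducer.initTransactions()` and a cluster sized per `transaction.state.log.replication.factor`, which this dependency-free sketch omits.

```java
import java.util.Properties;

public class TransactionalConfigSketch {
    // Build a producer configuration that enables transactions.
    // "order-service-tx-1" is an illustrative transactional.id placeholder.
    public static Properties buildTransactionalConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // A non-null transactional.id enables cross-session transactional delivery
        props.put("transactional.id", "order-service-tx-1");
        // Transactions require idempotence to be enabled
        props.put("enable.idempotence", "true");
        // The coordinator aborts transactions with no status update after this timeout
        props.put("transaction.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildTransactionalConfig().getProperty("transactional.id"));
    }
}
```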