Puppet Class: confluent::kafka::broker

Defined in:
modules/confluent/manifests/kafka/broker.pp

Overview

Class confluent::kafka::broker

Sets up a Kafka Broker and ensures that it is running.

Parameters:

enabled

Boolean. If true, systemd::service will be passed ensure => present, otherwise ensure => absent. Default: true.

brokers

Hash of Kafka Broker configs keyed by the fqdn of each Kafka broker node. This Hash should be of the form: { 'hostA' => { 'id' => 1, 'rack' => 'A' }, 'hostB' => { 'id' => 2, 'rack' => 'B' }, … } Default: { $::fqdn => { 'id' => 1, 'port' => 9092 } }

'rack' is optional, but will be used for broker.rack awareness.
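
For illustration, a two-node declaration (the hostnames and broker ids here are made-up placeholders) might pass the hash like this:

class { '::confluent::kafka::broker':
    brokers => {
        'kafka1001.example.org' => { 'id' => 1001, 'rack' => 'A' },
        'kafka1002.example.org' => { 'id' => 1002, 'rack' => 'B' },
    },
}

Each broker looks up its own entry by $::fqdn, so the same hash can be shared unmodified by every node in the cluster.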

listeners

Array of URIs Kafka will listen on. Default: ['PLAINTEXT://:9092']

security_inter_broker_protocol

Security protocol used to communicate between brokers. Default: undef

ssl_keystore_location

The location of the key store file. Default: undef

ssl_keystore_password

The store password for the key store file. Default: undef

ssl_key_password

The password of the private key in the key store file. Default: undef

ssl_truststore_location

The location of the trust store file. Default: undef

ssl_truststore_password

The password for the trust store file. Default: undef

ssl_client_auth

Configures the Kafka broker to request client authentication. Must be one of 'none', 'requested', or 'required'. Default: undef

ssl_enabled_protocols

Comma separated string of enabled SSL protocols that will be accepted from clients, e.g. TLSv1.2,TLSv1.1,TLSv1. Default: undef

ssl_cipher_suites

Comma separated string of cipher suites that will be accepted from clients. Default: undef
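
As a hedged sketch of how the SSL parameters fit together (the keystore paths, hiera keys, and SSL-only listener below are illustrative assumptions, not values from this module):

$keystore_password   = lookup('profile::kafka::keystore_password')    # hypothetical hiera key
$truststore_password = lookup('profile::kafka::truststore_password')  # hypothetical hiera key

class { '::confluent::kafka::broker':
    listeners                      => ['SSL://:9093'],
    security_inter_broker_protocol => 'SSL',
    ssl_keystore_location          => '/etc/kafka/ssl/kafka.keystore.jks',
    ssl_keystore_password          => $keystore_password,
    ssl_key_password               => $keystore_password,
    ssl_truststore_location        => '/etc/kafka/ssl/truststore.jks',
    ssl_truststore_password        => $truststore_password,
    ssl_client_auth                => 'required',
    ssl_enabled_protocols          => 'TLSv1.2',
}

Note that when any of the password parameters are set, the class renders /etc/kafka/server.properties with mode 0440 instead of 0444 (see the class body below).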

log_dirs

Array of directories in which the broker will store its received message data. Default: ['/var/spool/kafka']

zookeeper_connect

Zookeeper URI list and chroot on which Kafka will store its metadata. To use multiple hosts and a chroot, do something like

zk1:2181,zk2:2181,zk3:2181/kafka/chroot

Default: localhost:2181
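
For example, pointing brokers at a three-node ZooKeeper ensemble under a chroot (hostnames and chroot path are illustrative):

class { '::confluent::kafka::broker':
    zookeeper_connect => 'zk1001.example.org:2181,zk1002.example.org:2181,zk1003.example.org:2181/kafka/mycluster',
}

The chroot path only needs to be appended once, after the last host:port pair.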

zookeeper_connection_timeout_ms

The max time that the client waits to establish a connection to ZooKeeper. If not set, the value of zookeeper.session.timeout.ms is used. Default: undef

zookeeper_session_timeout_ms

ZooKeeper session timeout. Default: undef (6000)

auto_create_topics_enable

If autocreation of topics is allowed. Default: true

auto_leader_rebalance_enable

If leaders should be auto rebalanced. Default: true

num_partitions

The default number of partitions per topic. Default: 1

default_replication_factor

The default replication factor for automatically created topics. Default: 1

delete_topic_enable

Enables topic deletion. Default: true

min_insync_replicas

When producing with acks=all, this specifies the number of replicas that must be in a partition's ISR; if fewer than this are present, the produce request will fail. Default: 1
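
A hedged sketch of topic-default tuning with these parameters (the values are illustrative, not recommendations):

class { '::confluent::kafka::broker':
    num_partitions             => 3,
    default_replication_factor => 3,
    min_insync_replicas        => 2,
    delete_topic_enable        => true,
}

With a replication factor of 3 and min_insync_replicas of 2, an acks=all produce request still succeeds while any single replica is down.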

replica_lag_time_max_ms

If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR. Default: undef (10000)

num_recovery_threads_per_data_dir

The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. Default: undef (1)

replica_socket_timeout_ms

The socket timeout for network requests to the leader for replicating data. Default: undef (30000)

replica_socket_receive_buffer_bytes

The socket receive buffer for network requests to the leader for replicating data. Default: undef (65536)

num_replica_fetchers

Number of threads used to replicate messages from leaders. Default: 1

replica_fetch_max_bytes

The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. Default: undef (1048576)

max_incremental_fetch_session_cache_slots

The maximum number of incremental fetch sessions that we will maintain. Default: undef (1000).

num_network_threads

The number of threads handling network requests. Default: undef (3)

num_io_threads

The number of threads doing disk I/O. Default: size($log_dirs)

socket_send_buffer_bytes

The byte size of the send buffer (SO_SNDBUF) used by the socket server. Default: 1048576

socket_receive_buffer_bytes

The byte size of the receive buffer (SO_RCVBUF) used by the socket server. Default: 1048576

socket_request_max_bytes

The maximum size of a request that the socket server will accept. Default: undef (104857600)

log_message_timestamp_type

Default message timestamp type for a topic if it is not set on that topic. CreateTime means the timestamp will only be set if the producer provides it. LogAppendTime means that the broker will set it to the time it receives the message. You can override this setting per topic. Default: CreateTime

log_flush_interval_messages

The number of messages accumulated on a log partition before messages are flushed to disk. Default: 10000

log_flush_interval_ms

The maximum amount of time a message can sit in a log before we force a flush. Default: 1000 (1 second)

log_retention_hours

The number of hours to keep a log file before deleting it. Default: 168 (1 week)

log_retention_bytes

The maximum size of the log before deleting it. Default: undef

log_segment_bytes

The maximum size of a log segment file. When this size is reached a new log segment will be created. Default: undef (512MB)

log_retention_check_interval_ms

The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion. Default: undef (300000)

log_cleanup_policy

Designates the retention policy to use on old log segments. 'delete' will discard old segments when their retention time or size limit has been reached. 'compact' will enable log compaction. Default: undef (delete)
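
As an illustrative sketch of retention tuning (the sizes and times are arbitrary examples):

class { '::confluent::kafka::broker':
    log_retention_hours => 336,            # keep segments for two weeks
    log_retention_bytes => 107374182400,   # or until a partition reaches ~100 GB
    log_cleanup_policy  => 'delete',
}

Whichever limit is reached first makes a segment eligible for deletion.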

offsets_retention_minutes

Log retention window in minutes for offsets topic. Default: 10080 (1 week)

log_max_backup_index

Number of (256 MB) log files to keep in /var/log/kafka. Default: 4

inter_broker_protocol_version

Specify which version of the inter-broker protocol will be used. This is typically bumped after all brokers have been upgraded to a new version. Default: undef

group_initial_rebalance_delay

The time, in milliseconds, that the `GroupCoordinator` will delay the initial consumer rebalance. Default: undef

log_message_format_version

Specify the message format version the broker will use to append messages to the logs. By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller than or equal to the specified version. Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don't understand. Default: undef

nofiles_ulimit

The broker process' number of open files ulimit. Default: 8192

java_opts

Extra Java options. Default: undef

classpath

Extra classpath entries. Default: undef

jmx_port

Port on which to expose JMX metrics. Default: 9999

heap_opts

Heap options to pass to JVM on startup. Default: undef

jvm_performance_opts

Value to use for KAFKA_JVM_PERFORMANCE_OPTS in /etc/default/kafka. This controls GC settings. Default: undef.
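
A hedged sketch of JVM and process tuning via these parameters (the values shown are arbitrary examples, not recommendations):

class { '::confluent::kafka::broker':
    heap_opts            => '-Xms2g -Xmx2g',
    jvm_performance_opts => '-XX:+UseG1GC -XX:MaxGCPauseMillis=20',
    nofiles_ulimit       => 65536,
    jmx_port             => 9999,
}

These are rendered into /etc/default/kafka by the default_template (jvm_performance_opts as KAFKA_JVM_PERFORMANCE_OPTS, and heap_opts presumably as KAFKA_HEAP_OPTS) and picked up by kafka-run-class.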

server_properties_template

Default: 'confluent/kafka/server.properties.erb'

default_template

Default: 'confluent/kafka/kafka.default.erb'

log4j_properties_template

Default: 'confluent/kafka/log4j.properties.erb'

message_max_bytes

The maximum message size allowed. Default: 1048576

allow_everyone_if_no_acl_found

If true, only the topics that have ACLs set on them are secured; resources without ACLs are accessible to everyone. Default: true

super_users

List of super user CNs. If configuring SSL, this should at least include the cluster's SSL principal so the cluster can operate.

authorizer_class_name

Sets up the ACL authorization provider specified by this parameter. It also sets up more verbose log4j logging for ACL authorization events. Default: undef

authorizer_log_level

Default: INFO
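
As an illustrative sketch of enabling ACL authorization (the authorizer class shown is Kafka's SimpleAclAuthorizer from this module's era; the principal names are placeholders, and the exact super_users entry format depends on the server.properties template):

class { '::confluent::kafka::broker':
    authorizer_class_name          => 'kafka.security.auth.SimpleAclAuthorizer',
    allow_everyone_if_no_acl_found => true,
    super_users                    => ['User:CN=kafka1001.example.org', 'User:CN=kafka1002.example.org'],
    authorizer_log_level           => 'DEBUG',
}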

Parameters:

  • enabled (Any) (defaults to: true)
  • brokers (Any) (defaults to: { "${::fqdn}" => { 'id' => 1, 'port' => 9092, }, })
  • listeners (Any) (defaults to: ['PLAINTEXT://:9092'])
  • security_inter_broker_protocol (Any) (defaults to: undef)
  • ssl_keystore_location (Any) (defaults to: undef)
  • ssl_keystore_password (Any) (defaults to: undef)
  • ssl_key_password (Any) (defaults to: undef)
  • ssl_truststore_location (Any) (defaults to: undef)
  • ssl_truststore_password (Any) (defaults to: undef)
  • ssl_client_auth (Any) (defaults to: undef)
  • ssl_enabled_protocols (Any) (defaults to: undef)
  • ssl_cipher_suites (Any) (defaults to: undef)
  • log_dirs (Any) (defaults to: ['/var/spool/kafka'])
  • zookeeper_connect (Any) (defaults to: 'localhost:2181')
  • zookeeper_connection_timeout_ms (Any) (defaults to: undef)
  • zookeeper_session_timeout_ms (Any) (defaults to: undef)
  • auto_create_topics_enable (Any) (defaults to: true)
  • auto_leader_rebalance_enable (Any) (defaults to: true)
  • num_partitions (Any) (defaults to: 1)
  • default_replication_factor (Any) (defaults to: 1)
  • delete_topic_enable (Any) (defaults to: true)
  • offsets_topic_replication_factor (Any) (defaults to: undef)
  • min_insync_replicas (Any) (defaults to: 1)
  • replica_lag_time_max_ms (Any) (defaults to: undef)
  • num_recovery_threads_per_data_dir (Any) (defaults to: undef)
  • replica_socket_timeout_ms (Any) (defaults to: undef)
  • replica_socket_receive_buffer_bytes (Any) (defaults to: undef)
  • num_replica_fetchers (Any) (defaults to: 1)
  • replica_fetch_max_bytes (Any) (defaults to: undef)
  • max_incremental_fetch_session_cache_slots (Any) (defaults to: undef)
  • num_network_threads (Any) (defaults to: undef)
  • num_io_threads (Any) (defaults to: size($log_dirs))
  • socket_send_buffer_bytes (Any) (defaults to: 1048576)
  • socket_receive_buffer_bytes (Any) (defaults to: 1048576)
  • socket_request_max_bytes (Any) (defaults to: undef)
  • log_message_timestamp_type (Any) (defaults to: 'CreateTime')
  • log_flush_interval_messages (Any) (defaults to: undef)
  • log_flush_interval_ms (Any) (defaults to: undef)
  • log_retention_hours (Any) (defaults to: 168)
  • log_retention_bytes (Any) (defaults to: undef)
  • log_segment_bytes (Any) (defaults to: undef)
  • log_retention_check_interval_ms (Any) (defaults to: undef)
  • log_cleanup_policy (Any) (defaults to: undef)
  • offsets_retention_minutes (Any) (defaults to: 10080)
  • inter_broker_protocol_version (Any) (defaults to: undef)
  • group_initial_rebalance_delay (Any) (defaults to: undef)
  • log_message_format_version (Any) (defaults to: undef)
  • nofiles_ulimit (Any) (defaults to: 8192)
  • java_opts (Any) (defaults to: undef)
  • classpath (Any) (defaults to: undef)
  • jmx_port (Any) (defaults to: 9999)
  • heap_opts (Any) (defaults to: undef)
  • log_max_backup_index (Any) (defaults to: 4)
  • jvm_performance_opts (Any) (defaults to: undef)
  • server_properties_template (Any) (defaults to: 'confluent/kafka/server.properties.erb')
  • default_template (Any) (defaults to: 'confluent/kafka/kafka.default.erb')
  • log4j_properties_template (Any) (defaults to: 'confluent/kafka/log4j.properties.erb')
  • message_max_bytes (Any) (defaults to: 1048576)
  • allow_everyone_if_no_acl_found (Any) (defaults to: true)
  • super_users (Any) (defaults to: undef)
  • authorizer_class_name (Any) (defaults to: undef)
  • authorizer_log_level (Any) (defaults to: 'INFO')


# File 'modules/confluent/manifests/kafka/broker.pp', line 247

class confluent::kafka::broker(
    $enabled                             = true,
    $brokers                             = {
        "${::fqdn}" => {
            'id'   => 1,
            'port' => 9092,
        },
    },
    $listeners                                 = ['PLAINTEXT://:9092'],

    $security_inter_broker_protocol            = undef,

    $ssl_keystore_location                     = undef,
    $ssl_keystore_password                     = undef,
    $ssl_key_password                          = undef,
    $ssl_truststore_location                   = undef,
    $ssl_truststore_password                   = undef,
    $ssl_client_auth                           = undef,
    $ssl_enabled_protocols                     = undef,
    $ssl_cipher_suites                         = undef,

    $log_dirs                                  = ['/var/spool/kafka'],

    $zookeeper_connect                         = 'localhost:2181',
    $zookeeper_connection_timeout_ms           = undef,
    $zookeeper_session_timeout_ms              = undef,

    $auto_create_topics_enable                 = true,
    $auto_leader_rebalance_enable              = true,

    $num_partitions                            = 1,
    $default_replication_factor                = 1,
    $delete_topic_enable                       = true,
    $offsets_topic_replication_factor          = undef,
    $min_insync_replicas                       = 1,
    $replica_lag_time_max_ms                   = undef,
    $num_recovery_threads_per_data_dir         = undef,
    $replica_socket_timeout_ms                 = undef,
    $replica_socket_receive_buffer_bytes       = undef,
    $num_replica_fetchers                      = 1,
    $replica_fetch_max_bytes                   = undef,
    $max_incremental_fetch_session_cache_slots = undef,

    $num_network_threads                       = undef,
    $num_io_threads                            = size($log_dirs),
    $socket_send_buffer_bytes                  = 1048576,
    $socket_receive_buffer_bytes               = 1048576,
    $socket_request_max_bytes                  = undef,

    $log_message_timestamp_type                = 'CreateTime',
    $log_flush_interval_messages               = undef,
    $log_flush_interval_ms                     = undef,

    $log_retention_hours                       = 168,     # 1 week
    $log_retention_bytes                       = undef,
    $log_segment_bytes                         = undef,

    $log_retention_check_interval_ms           = undef,
    $log_cleanup_policy                        = undef,

    $offsets_retention_minutes                 = 10080,   # 1 week

    $inter_broker_protocol_version             = undef,
    $group_initial_rebalance_delay             = undef,
    $log_message_format_version                = undef,
    $nofiles_ulimit                            = 8192,
    $java_opts                                 = undef,
    $classpath                                 = undef,
    $jmx_port                                  = 9999,
    $heap_opts                                 = undef,
    $log_max_backup_index                      = 4,
    $jvm_performance_opts                      = undef,

    $server_properties_template                = 'confluent/kafka/server.properties.erb',
    $default_template                          = 'confluent/kafka/kafka.default.erb',
    $log4j_properties_template                 = 'confluent/kafka/log4j.properties.erb',

    $message_max_bytes                         = 1048576,

    $allow_everyone_if_no_acl_found            = true,
    $super_users                               = undef,
    $authorizer_class_name                     = undef,
    $authorizer_log_level                      = 'INFO',
) {
    # confluent::kafka::common installs the kafka package
    # and a handy wrapper script.
    require ::confluent::kafka::common

    # Get this broker's id out of the $kafka::brokers
    # configuration hash.
    $id = $brokers[$::fqdn]['id']

    # If this broker's rack is set, use it for broker.rack
    $rack = $brokers[$::fqdn]['rack']

    # The default Kafka port for KAFKA_BOOTSTRAP_SERVERS will be the
    # first port specified in the $listeners array.  This is used
    # in the kafka shell wrapper to automatically provide a broker list.
    $default_port = inline_template("<%= Array(@listeners)[0].split(':')[-1] %>")

    # Local variable for rendering in templates.
    $java_home = $::confluent::kafka::common::java_home

    # This is the message data directory,
    # not to be confused with /var/log/kafka
    # which contains daemon process logs.
    file { $log_dirs:
        ensure => 'directory',
        owner  => 'kafka',
        group  => 'kafka',
        mode   => '0755',
    }

    # If any passwords, set proper readability.
    if $ssl_key_password or $ssl_keystore_password or $ssl_truststore_password {
        $server_properties_mode = '0440'
    }
    else {
        $server_properties_mode = '0444'
    }
    # Render out Kafka Broker config files.
    file { '/etc/kafka/server.properties':
        content => template($server_properties_template),
        group   => 'kafka',
        mode    => $server_properties_mode,
    }

    # log4j configuration for Kafka daemon
    # process logs in /var/log/kafka.
    file { '/etc/kafka/log4j.properties':
        content => template($log4j_properties_template),
    }

    # Environment variables that are passed to kafka-run-class.
    file { '/etc/default/kafka':
        content => template($default_template),
    }

    # Environment variables used by the /usr/local/bin/kafka wrapper script
    # installed by confluent::kafka::common.  This makes it easier
    # to use the kafka wrapper script on Kafka brokers.
    file { '/etc/profile.d/kafka.sh':
        content => template('confluent/kafka/kafka-profile.sh.erb'),
    }

    $service_ensure = $enabled ? {
        false   => 'absent',
        default => 'present',
    }

    # Start the Kafka server.
    # We don't want to subscribe to the config files here.
    # It is better to manually restart Kafka when
    # the config files change.
    systemd::service { 'kafka':
        ensure  => $service_ensure,
        content => systemd_template('kafka'),
        require => [
            File[$log_dirs],
            File['/etc/kafka/server.properties'],
            File['/etc/kafka/log4j.properties'],
            File['/etc/default/kafka'],
        ],
    }
}
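
Putting the pieces together, a minimal wrapper profile might declare the class like this (the profile name, hostnames, ids, and data directory are assumptions for illustration):

class profile::kafka::broker {
    class { '::confluent::kafka::broker':
        brokers           => {
            'kafka1001.example.org' => { 'id' => 1001 },
            'kafka1002.example.org' => { 'id' => 1002 },
            'kafka1003.example.org' => { 'id' => 1003 },
        },
        zookeeper_connect => 'zk1001.example.org:2181/kafka/main',
        log_dirs          => ['/srv/kafka/data'],
        heap_opts         => '-Xms2g -Xmx2g',
    }
}

Because the class deliberately does not subscribe the systemd service to its config files, a change to server.properties or /etc/default/kafka still requires a manual restart of the kafka unit.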