Puppet Class: confluent::kafka::broker

Defined in:
modules/confluent/manifests/kafka/broker.pp

Overview

Class confluent::kafka::broker

Sets up a Kafka Broker and ensures that it is running.

Parameters:

[enabled] Boolean. If true, systemd::service will be passed ensure => present, otherwise ensure => absent. Default: true.

[brokers] Hash of Kafka broker configs, keyed by the FQDN of each Kafka broker node. This hash should be of the form:

{ 'hostA' => { 'id' => 1, 'rack' => 'A' }, 'hostB' => { 'id' => 2, 'rack' => 'B' }, ... }
Default: { $facts['networking']['fqdn'] => { 'id' => 1, 'port' => 9092 } }

'rack' is optional. If set, it is used as this broker's broker.rack value for rack awareness.
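For illustration, a declaration for a hypothetical three-node cluster might look like the sketch below. The host names, broker ids, and rack labels are placeholders, not values from this module. Because each node looks up its own entry with $facts['networking']['fqdn'], the same hash can be passed to every broker, and every broker's FQDN needs to be a key in it.

```puppet
# Hypothetical cluster: host names, ids, and racks are placeholders.
# The same hash is passed on every broker; each node picks out its own
# entry by $facts['networking']['fqdn'].
class { 'confluent::kafka::broker':
    brokers => {
        'kafka1.example.org' => { 'id' => 1, 'rack' => 'A' },
        'kafka2.example.org' => { 'id' => 2, 'rack' => 'B' },
        'kafka3.example.org' => { 'id' => 3, 'rack' => 'C' },
    },
}
```

On kafka2.example.org, for example, the class resolves $id to 2 and $rack to 'B'.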

[listeners] Array of URIs Kafka will listen on. Default: ['PLAINTEXT://:9092']

[security_inter_broker_protocol] Security protocol used to communicate between brokers. Default: undef

[ssl_keystore_location] The location of the key store file. Default: undef

[ssl_keystore_password] The store password for the key store file. Default: undef

[ssl_key_password] The password of the private key in the key store file. Default: undef

[ssl_truststore_location] The location of the trust store file. Default: undef

[ssl_truststore_password] The password for the trust store file. Default: undef

[ssl_client_auth] Configures the Kafka broker to request client authentication. Must be one of 'none', 'requested', or 'required'. Default: undef

[ssl_enabled_protocols] Comma-separated string of SSL/TLS protocols that will be accepted from clients, e.g. TLSv1.2,TLSv1.1,TLSv1. Default: undef

[ssl_cipher_suites] Comma separated string of cipher suites that will be accepted from clients. Default: undef
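A sketch of an SSL-enabled broker, assuming hypothetical keystore and truststore paths and placeholder passwords. Listener order matters in this class: the port of the first entry in $listeners becomes the default port used by the kafka wrapper script. Setting any of the three password parameters also makes the class render /etc/kafka/server.properties with mode 0440 instead of 0444.

```puppet
# Hypothetical SSL setup; paths and passwords are placeholders.
class { 'confluent::kafka::broker':
    # The first listener's port (9092) becomes the wrapper script's default port.
    listeners                      => ['PLAINTEXT://:9092', 'SSL://:9093'],
    security_inter_broker_protocol => 'SSL',
    ssl_keystore_location          => '/etc/kafka/ssl/server.keystore.jks',
    ssl_keystore_password          => 'keystore-secret',
    ssl_key_password               => 'key-secret',
    ssl_truststore_location        => '/etc/kafka/ssl/truststore.jks',
    ssl_truststore_password        => 'truststore-secret',
    ssl_client_auth                => 'required',
    ssl_enabled_protocols          => 'TLSv1.2',
}
```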

[log_dirs] Array of directories in which the broker will store its received message data. Default: ['/var/spool/kafka']

[zookeeper_connect] ZooKeeper connection string (host list plus optional chroot) under which Kafka stores its metadata. To use multiple hosts and a chroot, use something like zk1:2181,zk2:2181,zk3:2181/kafka/chroot. Default: localhost:2181

[zookeeper_connection_timeout_ms] The maximum time the client waits to establish a connection to ZooKeeper. If not set, the value of zookeeper.session.timeout.ms is used. Default: undef

[zookeeper_session_timeout_ms] ZooKeeper session timeout. Default: undef (6000)
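For example, pointing the broker at a hypothetical three-node ZooKeeper ensemble under a chroot could look like this sketch; the host names and chroot path are placeholders.

```puppet
class { 'confluent::kafka::broker':
    # Hosts are comma-separated; the chroot is appended once, after the last host.
    zookeeper_connect            => 'zk1.example.org:2181,zk2.example.org:2181,zk3.example.org:2181/kafka/main',
    zookeeper_session_timeout_ms => 6000,
    # zookeeper_connection_timeout_ms is left unset, so Kafka falls back
    # to zookeeper.session.timeout.ms.
}
```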

[auto_create_topics_enable] Whether automatic creation of topics is allowed. Default: true

[auto_leader_rebalance_enable] Whether partition leadership should be automatically rebalanced. Default: true

[num_partitions] The default number of partitions per topic. Default: 1

[default_replication_factor] The default replication factor for automatically created topics. Default: 1

[delete_topic_enable] Enables topic deletion. Default: true

[min_insync_replicas] When a producer uses acks=all, this specifies the minimum number of replicas that must be in a partition's ISR. If fewer than this are in sync, the produce request fails. Default: 1

[replica_lag_time_max_ms] If a follower hasn't sent any fetch requests for this window of time, the leader removes the follower from the ISR. Default: undef (10000)

[num_recovery_threads_per_data_dir] The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. Default: undef (1)

[replica_socket_timeout_ms] The socket timeout for network requests to the leader for replicating data. Default: undef (30000)

[replica_socket_receive_buffer_bytes] The socket receive buffer for network requests to the leader for replicating data. Default: undef (65536)

[num_replica_fetchers] Number of threads used to replicate messages from leaders. Default: 1

[replica_fetch_max_bytes] The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. Default: undef (1048576)

[max_incremental_fetch_session_cache_slots] The maximum number of incremental fetch sessions the broker will maintain. Default: undef (1000)

[num_network_threads] The number of threads handling network requests. Default: undef (3)

[num_io_threads] The number of threads doing disk I/O. Default: size($log_dirs)
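Because num_io_threads defaults to size($log_dirs), adding data directories also raises the number of I/O threads unless it is set explicitly. A sketch with two hypothetical data directories:

```puppet
class { 'confluent::kafka::broker':
    # Each entry is managed as a directory owned by kafka:kafka with mode 0755.
    log_dirs => ['/srv/kafka/data1', '/srv/kafka/data2'],
    # num_io_threads is not passed, so it defaults to size($log_dirs), i.e. 2.
}
```

Puppet does not create missing parent directories for a file resource, so /srv/kafka must already exist or be managed elsewhere.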

[socket_send_buffer_bytes] The byte size of the send buffer (SO_SNDBUF) used by the socket server. Default: 1048576

[socket_receive_buffer_bytes] The byte size of the receive buffer (SO_RCVBUF) used by the socket server. Default: 1048576

[socket_request_max_bytes] The maximum size of a request that the socket server will accept. Default: undef (104857600)

[log_message_timestamp_type] Default message timestamp type for a topic if it is not set on that topic. CreateTime means the broker keeps the timestamp set by the producer. LogAppendTime means that the broker sets it to the time it receives the message. You can override this setting per topic. Default: CreateTime

[log_flush_interval_messages] The number of messages accumulated on a log partition before messages are flushed to disk. Default: undef

[log_flush_interval_ms] The maximum amount of time a message can sit in a log before a flush is forced. Default: undef

[log_retention_hours] The number of hours to keep a log file before deleting it. Default: 168 (1 week)

[log_retention_bytes] The maximum size of the log before deleting it. Default: undef

[log_segment_bytes] The maximum size of a log segment file. When this size is reached, a new log segment is created. Default: undef (512MB)

[log_retention_check_interval_ms] The frequency, in milliseconds, at which the log cleaner checks whether any log segment is eligible for deletion. Default: undef (300000)

[log_cleanup_policy] The retention policy to use on old log segments. 'delete' discards old segments when their retention time or size limit has been reached; 'compact' enables log compaction. Default: undef (delete)
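A sketch of tighter retention limits, with values chosen only for illustration. In Kafka, log.retention.bytes applies per partition, and under the 'delete' policy a segment becomes eligible for deletion once either the time limit or the size limit is exceeded.

```puppet
class { 'confluent::kafka::broker':
    log_retention_hours => 72,              # 3 days instead of the 168-hour default
    log_retention_bytes => 107374182400,    # 100 GiB (100 * 1024^3) per partition
    log_cleanup_policy  => 'delete',
}
```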

[offsets_retention_minutes] Log retention window in minutes for offsets topic. Default: 10080 (1 week)

[log_max_backup_index] Number of (256 MB) log files to keep in /var/log/kafka. Default: 4

[inter_broker_protocol_version] The version of the inter-broker protocol to use. This is typically bumped after all brokers have been upgraded to a new version. Default: undef

[group_initial_rebalance_delay] The time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. Default: undef

[log_message_format_version] The message format version the broker will use to append messages to the logs. By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller than or equal to the specified version. Setting this value incorrectly will cause consumers with older versions to break, as they will receive messages in a format they don't understand. Default: undef

[nofiles_ulimit] The broker process' number of open files ulimit. Default: 8192

[java_opts] Extra Java options. Default: undef

[classpath] Extra classpath entries. Default: undef

[jmx_port] Port on which to expose JMX metrics. Default: 9999

[heap_opts] Heap options to pass to JVM on startup. Default: undef

[jvm_performance_opts] Value to use for KAFKA_JVM_PERFORMANCE_OPTS in /etc/default/kafka. This controls GC settings. Default: undef.
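A sketch of JVM tuning; the heap size, GC flags, and ulimit below are illustrative assumptions, not recommendations from this module. jvm_performance_opts is written to /etc/default/kafka as KAFKA_JVM_PERFORMANCE_OPTS.

```puppet
class { 'confluent::kafka::broker':
    heap_opts            => '-Xms4g -Xmx4g',
    # Rendered into /etc/default/kafka as KAFKA_JVM_PERFORMANCE_OPTS.
    jvm_performance_opts => '-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20',
    nofiles_ulimit       => 65536,
}
```

The kafka systemd service does not subscribe to its config files, so changes like these take effect only after Kafka is restarted manually.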

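[server_properties_template] ERB template used to render /etc/kafka/server.properties. Default: 'confluent/kafka/server.properties.erb'

[super_user_client_properties_template] ERB template used to render the super-user client properties file at $super_user_client_credentials_path. Default: 'confluent/kafka/super-user-client.properties.erb'

[super_user_client_credentials_path] Path of the super-user client properties file. It is written only when super_users and ssl_keystore_location are both set. Default: '/etc/kafka/super-user-client.properties'

[default_template] ERB template used to render /etc/default/kafka, the environment variables passed to kafka-run-class. Default: 'confluent/kafka/kafka.default.erb'

[log4j_properties_template] ERB template used to render /etc/kafka/log4j.properties. Default: 'confluent/kafka/log4j.properties.erb'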

[message_max_bytes] The maximum message size allowed. Default: 1048576

[allow_everyone_if_no_acl_found] If true, a resource with no ACLs is accessible to everyone, so only resources that have ACLs set are secured. If false, access to such resources is denied to everyone except super users. Default: true

[super_users] List of super user CNs. If configuring SSL, this should include at least the cluster's SSL principal so the cluster can operate. Default: undef

[authorizer_class_name] Enables ACL authorization using the specified authorizer class. It also sets up more verbose log4j logging for ACL authorization events. Default: undef

[authorizer_log_level] Log level used for the ACL authorization log4j logging. Default: INFO
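A sketch of enabling ACL authorization on an SSL cluster. The authorizer is Kafka's built-in kafka.security.auth.SimpleAclAuthorizer (Kafka 2.4 and later provide kafka.security.authorizer.AclAuthorizer instead). The super user entry is a hypothetical certificate CN, and the exact string format expected in super_users depends on the server.properties template, which is not shown here. Because super_users and ssl_keystore_location are both set, the class also writes the super-user client credentials file to super_user_client_credentials_path.

```puppet
class { 'confluent::kafka::broker':
    listeners                      => ['SSL://:9093'],
    security_inter_broker_protocol => 'SSL',
    ssl_keystore_location          => '/etc/kafka/ssl/server.keystore.jks',  # placeholder path
    ssl_keystore_password          => 'keystore-secret',                     # placeholder
    authorizer_class_name          => 'kafka.security.auth.SimpleAclAuthorizer',
    # Deny access to resources that have no ACLs instead of allowing everyone.
    allow_everyone_if_no_acl_found => false,
    # Hypothetical broker certificate CN; check the template for the expected format.
    super_users                    => ['kafka-broker'],
}
```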

Parameters:

  • enabled (Any) (defaults to: true)
  • brokers (Any) (defaults to: { $facts['networking']['fqdn'] => { 'id' => 1, 'port' => 9092, }, })
  • listeners (Any) (defaults to: ['PLAINTEXT://:9092'])
  • security_inter_broker_protocol (Any) (defaults to: undef)
  • ssl_keystore_location (Any) (defaults to: undef)
  • ssl_keystore_password (Any) (defaults to: undef)
  • ssl_key_password (Any) (defaults to: undef)
  • ssl_truststore_location (Any) (defaults to: undef)
  • ssl_truststore_password (Any) (defaults to: undef)
  • ssl_client_auth (Any) (defaults to: undef)
  • ssl_enabled_protocols (Any) (defaults to: undef)
  • ssl_cipher_suites (Any) (defaults to: undef)
  • ssl_port (Any) (defaults to: 9093)
  • log_dirs (Any) (defaults to: ['/var/spool/kafka'])
  • zookeeper_connect (Any) (defaults to: 'localhost:2181')
  • zookeeper_connection_timeout_ms (Any) (defaults to: undef)
  • zookeeper_session_timeout_ms (Any) (defaults to: undef)
  • auto_create_topics_enable (Any) (defaults to: true)
  • auto_leader_rebalance_enable (Any) (defaults to: true)
  • num_partitions (Any) (defaults to: 1)
  • default_replication_factor (Any) (defaults to: 1)
  • delete_topic_enable (Any) (defaults to: true)
  • offsets_topic_replication_factor (Any) (defaults to: undef)
  • min_insync_replicas (Any) (defaults to: 1)
  • replica_lag_time_max_ms (Any) (defaults to: undef)
  • num_recovery_threads_per_data_dir (Any) (defaults to: undef)
  • replica_socket_timeout_ms (Any) (defaults to: undef)
  • replica_socket_receive_buffer_bytes (Any) (defaults to: undef)
  • num_replica_fetchers (Any) (defaults to: 1)
  • replica_fetch_max_bytes (Any) (defaults to: undef)
  • max_incremental_fetch_session_cache_slots (Any) (defaults to: undef)
  • num_network_threads (Any) (defaults to: undef)
  • num_io_threads (Any) (defaults to: size($log_dirs))
  • socket_send_buffer_bytes (Any) (defaults to: 1048576)
  • socket_receive_buffer_bytes (Any) (defaults to: 1048576)
  • socket_request_max_bytes (Any) (defaults to: undef)
  • log_message_timestamp_type (Any) (defaults to: 'CreateTime')
  • log_flush_interval_messages (Any) (defaults to: undef)
  • log_flush_interval_ms (Any) (defaults to: undef)
  • log_retention_hours (Any) (defaults to: 168)
  • log_retention_bytes (Any) (defaults to: undef)
  • log_segment_bytes (Any) (defaults to: undef)
  • log_retention_check_interval_ms (Any) (defaults to: undef)
  • log_cleanup_policy (Any) (defaults to: undef)
  • offsets_retention_minutes (Any) (defaults to: 10080)
  • inter_broker_protocol_version (Any) (defaults to: undef)
  • group_initial_rebalance_delay (Any) (defaults to: undef)
  • log_message_format_version (Any) (defaults to: undef)
  • nofiles_ulimit (Any) (defaults to: 8192)
  • java_opts (Any) (defaults to: undef)
  • classpath (Any) (defaults to: undef)
  • jmx_port (Any) (defaults to: 9999)
  • heap_opts (Any) (defaults to: undef)
  • log_max_backup_index (Any) (defaults to: 4)
  • jvm_performance_opts (Any) (defaults to: undef)
  • server_properties_template (Any) (defaults to: 'confluent/kafka/server.properties.erb')
  • super_user_client_properties_template (Any) (defaults to: 'confluent/kafka/super-user-client.properties.erb')
  • super_user_client_credentials_path (Any) (defaults to: '/etc/kafka/super-user-client.properties')
  • default_template (Any) (defaults to: 'confluent/kafka/kafka.default.erb')
  • log4j_properties_template (Any) (defaults to: 'confluent/kafka/log4j.properties.erb')
  • message_max_bytes (Any) (defaults to: 1048576)
  • allow_everyone_if_no_acl_found (Any) (defaults to: true)
  • super_users (Any) (defaults to: undef)
  • authorizer_class_name (Any) (defaults to: undef)
  • authorizer_log_level (Any) (defaults to: 'INFO')
  • use_modern_jvm_default_opts (Boolean) (defaults to: false)


# File 'modules/confluent/manifests/kafka/broker.pp', line 250

class confluent::kafka::broker(
    $enabled                             = true,
    $brokers                             = {
        $facts['networking']['fqdn'] => {
            'id'   => 1,
            'port' => 9092,
        },
    },
    $listeners                                 = ['PLAINTEXT://:9092'],

    $security_inter_broker_protocol            = undef,

    $ssl_keystore_location                     = undef,
    $ssl_keystore_password                     = undef,
    $ssl_key_password                          = undef,
    $ssl_truststore_location                   = undef,
    $ssl_truststore_password                   = undef,
    $ssl_client_auth                           = undef,
    $ssl_enabled_protocols                     = undef,
    $ssl_cipher_suites                         = undef,
    $ssl_port                                  = 9093,

    $log_dirs                                  = ['/var/spool/kafka'],

    $zookeeper_connect                         = 'localhost:2181',
    $zookeeper_connection_timeout_ms           = undef,
    $zookeeper_session_timeout_ms              = undef,

    $auto_create_topics_enable                 = true,
    $auto_leader_rebalance_enable              = true,

    $num_partitions                            = 1,
    $default_replication_factor                = 1,
    $delete_topic_enable                       = true,
    $offsets_topic_replication_factor          = undef,
    $min_insync_replicas                       = 1,
    $replica_lag_time_max_ms                   = undef,
    $num_recovery_threads_per_data_dir         = undef,
    $replica_socket_timeout_ms                 = undef,
    $replica_socket_receive_buffer_bytes       = undef,
    $num_replica_fetchers                      = 1,
    $replica_fetch_max_bytes                   = undef,
    $max_incremental_fetch_session_cache_slots = undef,

    $num_network_threads                       = undef,
    $num_io_threads                            = size($log_dirs),
    $socket_send_buffer_bytes                  = 1048576,
    $socket_receive_buffer_bytes               = 1048576,
    $socket_request_max_bytes                  = undef,

    $log_message_timestamp_type                = 'CreateTime',
    $log_flush_interval_messages               = undef,
    $log_flush_interval_ms                     = undef,

    $log_retention_hours                       = 168,     # 1 week
    $log_retention_bytes                       = undef,
    $log_segment_bytes                         = undef,

    $log_retention_check_interval_ms           = undef,
    $log_cleanup_policy                        = undef,

    $offsets_retention_minutes                 = 10080,   # 1 week

    $inter_broker_protocol_version             = undef,
    $group_initial_rebalance_delay             = undef,
    $log_message_format_version                = undef,
    $nofiles_ulimit                            = 8192,
    $java_opts                                 = undef,
    $classpath                                 = undef,
    $jmx_port                                  = 9999,
    $heap_opts                                 = undef,
    $log_max_backup_index                      = 4,
    $jvm_performance_opts                      = undef,

    $server_properties_template                = 'confluent/kafka/server.properties.erb',
    $super_user_client_properties_template     = 'confluent/kafka/super-user-client.properties.erb',
    $super_user_client_credentials_path        = '/etc/kafka/super-user-client.properties',
    $default_template                          = 'confluent/kafka/kafka.default.erb',
    $log4j_properties_template                 = 'confluent/kafka/log4j.properties.erb',

    $message_max_bytes                         = 1048576,

    $allow_everyone_if_no_acl_found            = true,
    $super_users                               = undef,
    $authorizer_class_name                     = undef,
    $authorizer_log_level                      = 'INFO',
    Boolean $use_modern_jvm_default_opts       = false,
) {
    # confluent::kafka::common installs the kafka package
    # and a handy wrapper script.
    require ::confluent::kafka::common

    # Get this broker's id out of the $brokers
    # configuration hash.
    $id = $brokers[$facts['networking']['fqdn']]['id']

    # If this broker's rack is set, use it for broker.rack
    $rack = $brokers[$facts['networking']['fqdn']]['rack']

    # The default Kafka port for KAFKA_BOOTSTRAP_SERVERS is the port
    # of the first entry in the $listeners array.  This is used
    # in the kafka shell wrapper to automatically provide the broker list.
    $default_port = inline_template("<%= Array(@listeners)[0].split(':')[-1] %>")

    # Local variable for rendering in templates.
    $java_home = $::confluent::kafka::common::java_home

    # This is the message data directory,
    # not to be confused with /var/log/kafka
    # which contains daemon process logs.
    file { $log_dirs:
        ensure => 'directory',
        owner  => 'kafka',
        group  => 'kafka',
        mode   => '0755',
    }

    # If any SSL passwords are set, make the config files readable
    # only by their owner and the kafka group (0440 instead of 0444).
    if $ssl_key_password or $ssl_keystore_password or $ssl_truststore_password {
        $server_properties_mode = '0440'
    }
    else {
        $server_properties_mode = '0444'
    }
    # Render out Kafka Broker config files.
    file { '/etc/kafka/server.properties':
        content => template($server_properties_template),
        group   => 'kafka',
        mode    => $server_properties_mode,
    }

    if $super_users and $ssl_keystore_location and $super_user_client_credentials_path {
        # Credentials to use when an action to modify the cluster
        # is needed. We want to limit the use of the ANONYMOUS
        # user as much as possible.
        file { $super_user_client_credentials_path:
            content => template($super_user_client_properties_template),
            group   => 'kafka',
            mode    => $server_properties_mode,
        }
    }

    # log4j configuration for Kafka daemon
    # process logs in /var/log/kafka.
    file { '/etc/kafka/log4j.properties':
        content => template($log4j_properties_template),
    }

    # Environment variables that are passed to kafka-run-class.
    file { '/etc/default/kafka':
        content => template($default_template),
    }

    # Environment variables used by the /usr/local/bin/kafka wrapper script
    # installed by confluent::kafka::common.  This makes it easier
    # to use the kafka wrapper script on Kafka brokers.
    file { '/etc/profile.d/kafka.sh':
        content => template('confluent/kafka/kafka-profile.sh.erb'),
    }

    $service_ensure = $enabled ? {
        false   => 'absent',
        default => 'present',
    }

    # Start the Kafka server.
    # We don't want to subscribe to the config files here.
    # It is better to restart Kafka manually when
    # the config files change.
    systemd::service { 'kafka':
        ensure  => $service_ensure,
        content => systemd_template('kafka'),
        require => [
            File[$log_dirs],
            File['/etc/kafka/server.properties'],
            File['/etc/kafka/log4j.properties'],
            File['/etc/default/kafka'],
        ],
    }
}