Defined Type: logstash::input::kafka

Defined in:
modules/logstash/manifests/input/kafka.pp

Overview

Define: logstash::input::kafka

Configure logstash to collect input from a Kafka topic.

If $security_protocol == SSL, this will install the Kafka truststore.jks file at /etc/logstash/kafka_$cluster_name.truststore.jks from the Puppet private secrets module. This assumes that the Kafka truststore is available via the function secret(“certificates/kafka_$kafka_cluster_name_full_broker/truststore.jks”). This should be the correct path to the cergen created truststore for the specified Kafka cluster.

Parameters:

kafka_cluster_name

Kafka cluster name. Either non datacenter prefixed cluster name, or the full cluster name key in the kafka_clusters hiera variable.

topic

Kafka topic. Default: $title.

topics_pattern

Kafka topic pattern. Default: None. Supersedes $topic if set.

group_id

Kafka consumer group id. Default: None (use logstash implemented default of “logstash”)

security_protocol

Security protocol to use, which can be either of PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL must be set to SSL for ssl_truststore* configs to be set see www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-security_protocol

ssl_truststore_location

jks truststore location value. Default: none. Requires $security_protocol = 'SSL'

ssl_truststore_password

jks truststore password value. Default: none. Requires $security_protocol = 'SSL'

manage_truststore

Enables puppet to manage the deployed truststore file. Default: true

priority

Configuration loading priority. Default: '10'.

tags

Array of tags to be added to the logs. Default: [$title].

consumer_threads

number of logstash consumer threads.

type

Log type to be passed to Logstash. Default: none.

codec

Codec to decode input. Default 'plain'.

plugin_id

Name associated with Logstash metrics

ensure

Whether the config should exist. Default: present.

Sample usage:

logstash::input::kafka { 'some_topic':
    kafka_cluster_name => 'logging-eqiad'
}

Parameters:

  • kafka_cluster_name (String)
  • topic (String) (defaults to: $title)
  • topics_pattern (Optional[String]) (defaults to: undef)
  • group_id (Optional[String]) (defaults to: undef)
  • security_protocol (Optional[Enum['PLAINTEXT','SSL','SASL_PLAINTEXT','SASL_SSL']]) (defaults to: undef)
  • ssl_truststore_location (Optional[String]) (defaults to: undef)
  • ssl_truststore_password (Optional[String]) (defaults to: undef)
  • ssl_endpoint_identification_algorithm (Optional[String]) (defaults to: undef)
  • manage_truststore (Boolean) (defaults to: true)
  • priority (Any) (defaults to: 10)
  • tags (Any) (defaults to: [$title])
  • consumer_threads (Integer) (defaults to: 1)
  • type (Optional[String]) (defaults to: undef)
  • codec (String) (defaults to: 'plain')
  • plugin_id (Any) (defaults to: "input/kafka/${title}")
  • ensure (Any) (defaults to: present)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'modules/logstash/manifests/input/kafka.pp', line 71

define logstash::input::kafka(
    String $kafka_cluster_name,
    String $topic                                                                    = $title,
    Optional[String] $topics_pattern                                                 = undef,
    Optional[String] $group_id                                                       = undef,
    Optional[Enum['PLAINTEXT','SSL','SASL_PLAINTEXT','SASL_SSL']] $security_protocol = undef,
    Optional[String] $ssl_truststore_location                                        = undef,
    Optional[String] $ssl_truststore_password                                        = undef,
    Optional[String] $ssl_endpoint_identification_algorithm                          = undef,
    Boolean $manage_truststore                                                       = true,

    $priority                                                                        = 10,
    $tags                                                                            = [$title],
    Integer $consumer_threads                                                        = 1,
    Optional[String] $type                                                           = undef,
    String $codec                                                                    = 'plain',
    $plugin_id                                                                       = "input/kafka/${title}",
    $ensure                                                                          = present,
) {
    $logstash_conf_title = "input-kafka-${title}"

    $kafka_config = kafka_config($kafka_cluster_name)
    $kafka_cluster_name_full = $kafka_config['name']

    if ($security_protocol == 'SSL') {
        if !$ssl_truststore_password {
            fail('Must provide $ssl_truststore_password if using logstash::input::kafka with $security_protocol=SSL')
        }

        $bootstrap_servers = $kafka_config['brokers']['ssl_string']
        $_ssl_truststore_location = $ssl_truststore_location ? {
            undef   => "/etc/logstash/kafka_${kafka_cluster_name_full}.truststore.jks",
            default => $ssl_truststore_location,
        }

        if $manage_truststore {
            if !defined(File[$_ssl_truststore_location]){
                file { $_ssl_truststore_location:
                    content => secret("certificates/kafka_${kafka_cluster_name_full}_broker/truststore.jks"),
                    owner   => 'logstash',
                    group   => 'logstash',
                    mode    => '0440',
                }
            }
            # If using SSL, the Kafka input logstash conf
            # should depend on File $ssl_truststore_location.
            Logstash::Conf[$logstash_conf_title] { require => File[$_ssl_truststore_location] }
        }
    }
    else {
        $bootstrap_servers = $kafka_config['brokers']['string']
    }

    logstash::conf { $logstash_conf_title:
        ensure   => $ensure,
        content  => template('logstash/input/kafka.erb'),
        priority => $priority,
    }
}