Puppet Function: kafka_config

Defined in:
modules/role/lib/puppet/parser/functions/kafka_config.rb
Function type:
Ruby 3.x API

Overview

kafka_config() ⇒ Any

Returns:

  • (Any)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'modules/role/lib/puppet/parser/functions/kafka_config.rb', line 50

# Builds the configuration hash describing a Kafka cluster.
#
# The cluster name is args[0] when that is a key of the 'kafka_clusters'
# hiera hash; otherwise it is whatever the kafka_cluster_name function
# returns for the same arguments.
#
# Returns a hash with the keys 'name', 'brokers', 'jmx_port', 'zookeeper'
# and 'api_version' (nil when the cluster does not set one).
#
# Raises Puppet::ParseError when the cluster's zookeeper cluster cannot be
# resolved to a 'hosts' entry in the 'zookeeper_clusters' hiera data.
newfunction(:kafka_config, :type => :rvalue, :arity => -2) do |args|
  fqdn = lookupvar('::fqdn').to_s
  clusters = call_function(:hiera, ['kafka_clusters', {}])
  cluster_name = clusters.key?(args[0]) ? args[0] : function_kafka_cluster_name(args)

  # When the resolved name has no entry in hiera, fall back to a
  # single-broker cluster made of this node only.
  cluster = clusters[cluster_name] || {
    'brokers' => {
      fqdn => { 'id' => '1' }
    }
  }
  brokers = cluster['brokers']

  # Get this Kafka cluster's zookeeper cluster name from the cluster config.
  zk_cluster_name = cluster['zookeeper_cluster_name']

  # Lookup all zookeeper clusters config
  zk_clusters = call_function(:hiera, ['zookeeper_clusters'])

  # Fail with an explicit message instead of a NoMethodError on nil when the
  # zookeeper cluster (or its host list) is missing, e.g. for the fallback
  # cluster above, which carries no 'zookeeper_cluster_name'.
  zk_cluster = zk_clusters[zk_cluster_name]
  if zk_cluster.nil? || zk_cluster['hosts'].nil?
    raise Puppet::ParseError,
          "kafka_config(): no zookeeper hosts found for zookeeper cluster " \
          "#{zk_cluster_name.inspect} (kafka cluster #{cluster_name.inspect}); " \
          "check 'zookeeper_cluster_name' and the 'zookeeper_clusters' hiera data"
  end

  # These are the zookeeper hosts for this kafka cluster.
  zk_hosts = zk_cluster['hosts'].keys.sort

  default_port = 9092
  default_ssl_port = 9093
  jmx_port = '9999'

  # Sorted host:ssl_port pairs; computed once and shared by the array and
  # string representations below.
  ssl_brokers = brokers.map { |host, conf| "#{host}:#{conf['ssl_port'] || default_ssl_port}" }.sort

  {
    'name'        => cluster_name,
    'brokers'     => {
      'hash'       => brokers,
      # array of broker hostnames without port.  TODO: change this to use host:port?
      'array'      => brokers.keys.sort,
      # string list of comma-separated host:port broker
      'string'     => brokers.map { |host, conf| "#{host}:#{conf['port'] || default_port}" }.sort.join(','),

      # array host:ssl_port brokers
      'ssl_array'  => ssl_brokers,
      # string list of comma-separated host:ssl_port brokers
      'ssl_string' => ssl_brokers.join(','),

      # list of comma-separated host_9999 broker pairs used as graphite wildcards
      'graphite'   => "{#{brokers.keys.map { |b| "#{b.tr '.', '_'}_#{jmx_port}" }.sort.join(',')}}",
      'size'       => brokers.keys.size
    },
    'jmx_port'    => jmx_port,
    'zookeeper'   => {
      'name'   => zk_cluster_name,
      'hosts'  => zk_hosts,
      'chroot' => "/kafka/#{cluster_name}",
      'url'    => "#{zk_hosts.join(',')}/kafka/#{cluster_name}"
    },
    # nil when the cluster config does not define an api_version.
    'api_version' => cluster['api_version']
  }
end