52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'modules/role/lib/puppet/parser/functions/kafka_config.rb', line 52
# Builds the configuration hash describing a Kafka cluster.
#
# Cluster definitions are read from the 'kafka_clusters' lookup key (an empty
# hash when the key is absent). If args[0] is itself a key of that hash it is
# used as the cluster name; otherwise the name is resolved by handing all
# arguments to the kafka_cluster_name function. When the resolved name has no
# entry, a one-broker cluster consisting of this node's fqdn (id '1') is used.
#
# Returns a hash with these keys:
#   'name'      - the resolved cluster name
#   'brokers'   - 'hash' (per-broker settings, with 'port' and 'ssl_port'
#                 defaulted to '9092' / '9093' when unset), 'array' (sorted
#                 hostnames), 'string' (sorted, comma-joined host:port),
#                 'ssl_array' / 'ssl_string' (same, using the ssl port),
#                 'graphite' (brace-wrapped, comma-joined names with dots
#                 replaced by underscores and the JMX port appended), 'size'
#   'jmx_port'  - '9999'
#   'zookeeper' - 'name', 'hosts' (sorted keys of the zookeeper cluster's
#                 'hosts' hash), 'chroot' and 'url'
#
# Raises Puppet::ParseError when the cluster's 'zookeeper_cluster_name' has no
# entry with 'hosts' in the 'zookeeper_clusters' lookup key.
newfunction(:kafka_config, :type => :rvalue, :arity => -2) do |args|
  default_port     = '9092'
  default_ssl_port = '9093'
  jmx_port         = '9999'

  fqdn     = lookupvar('::fqdn').to_s
  clusters = call_function('lookup', ['kafka_clusters', { 'default_value' => {} }])

  cluster_name = clusters.key?(args[0]) ? args[0] : function_kafka_cluster_name(args)
  cluster      = clusters[cluster_name] || { 'brokers' => { fqdn => { 'id' => '1' } } }

  zk_cluster_name = cluster['zookeeper_cluster_name']
  zk_clusters     = call_function('lookup', ['zookeeper_clusters'])
  zk_cluster      = zk_clusters[zk_cluster_name]

  # Fail with an explicit message instead of a NoMethodError on nil when the
  # cluster (e.g. the single-broker fallback) names no known zookeeper cluster.
  unless zk_cluster && zk_cluster['hosts']
    raise Puppet::ParseError,
          "kafka_config(): no zookeeper cluster #{zk_cluster_name.inspect} with hosts found in " \
          "'zookeeper_clusters' for Kafka cluster #{cluster_name.inspect}"
  end
  zk_hosts = zk_cluster['hosts'].keys.sort

  # Apply the port defaults on copies so the hashes returned by lookup are
  # never modified in place.
  brokers = cluster['brokers'].each_with_object({}) do |(host, conf), defaulted|
    settings = conf.dup
    settings['ssl_port'] ||= default_ssl_port
    settings['port']     ||= default_port
    defaulted[host] = settings
  end

  plain_endpoints = brokers.map { |host, conf| "#{host}:#{conf['port']}" }.sort
  ssl_endpoints   = brokers.map { |host, conf| "#{host}:#{conf['ssl_port']}" }.sort
  graphite_names  = brokers.keys.map { |host| "#{host.tr('.', '_')}_#{jmx_port}" }.sort
  chroot          = "/kafka/#{cluster_name}"

  {
    'name'      => cluster_name,
    'brokers'   => {
      'hash'       => brokers,
      'array'      => brokers.keys.sort,
      'string'     => plain_endpoints.join(','),
      'ssl_array'  => ssl_endpoints,
      'ssl_string' => ssl_endpoints.join(','),
      'graphite'   => "{#{graphite_names.join(',')}}",
      'size'       => brokers.size
    },
    'jmx_port'  => jmx_port,
    'zookeeper' => {
      'name'   => zk_cluster_name,
      'hosts'  => zk_hosts,
      'chroot' => chroot,
      'url'    => "#{zk_hosts.join(',')}#{chroot}"
    }
  }
end
|