Puppet Class: profile::cache::kafka::statsv

Defined in:
modules/profile/manifests/cache/kafka/statsv.pp

Overview

Class profile::cache::kafka::statsv

Sets up a varnishkafka logging endpoint for collecting application level metrics. We are calling this system statsv, as it is similar to statsd, but uses varnish as its logging endpoint.

Parameters

cache_cluster

Used in when naming varnishkafka metrics. Default: lookup('cache::cluster')

kafka_cluster_name

The name of the kafka cluster to use from the kafka_clusters hiera variable. Since only one statsd instance is active at any given time, you should probably set this explicitly to a fully qualified kafka cluster name (with DC suffix) that is located in the same DC as the active statsd instance.

monitoring_enabled

True if the varnishkafka instance should be monitored. Default: false

Parameters:

  • cache_cluster (String) (defaults to: lookup('cache::cluster'))
  • kafka_cluster_name (String) (defaults to: lookup('profile::cache::kafka::statsv::kafka_cluster_name'))
  • monitoring_enabled (Boolean) (defaults to: lookup('profile::cache::kafka::statsv::monitoring_enabled', {default_value => false}))
  • ssl_enabled (Boolean) (defaults to: lookup('profile::cache::kafka::statsv::ssl_enabled', {'default_value' => false}))
  • statsd (String) (defaults to: lookup('statsd'))


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'modules/profile/manifests/cache/kafka/statsv.pp', line 23

class profile::cache::kafka::statsv(
    String $cache_cluster       = lookup('cache::cluster'),
    String $kafka_cluster_name  = lookup('profile::cache::kafka::statsv::kafka_cluster_name'),
    Boolean $monitoring_enabled = lookup('profile::cache::kafka::statsv::monitoring_enabled', {default_value => false}),
    Boolean $ssl_enabled        = lookup('profile::cache::kafka::statsv::ssl_enabled', {'default_value' => false}),
    String $statsd              = lookup('statsd')
)
{
    $kafka_config  = kafka_config($kafka_cluster_name)

    if $ssl_enabled {
        $kafka_brokers = $kafka_config['brokers']['ssl_array']

        # Include this class to get key and certificate for varnishkafka
        # to produce to Kafka over SSL/TLS.
        require ::profile::cache::kafka::certificate
        $ssl_ca_location          = $::profile::cache::kafka::certificate::ssl_ca_location
        $ssl_key_password         = $::profile::cache::kafka::certificate::ssl_key_password
        $ssl_key_location         = $::profile::cache::kafka::certificate::ssl_key_location
        $ssl_certificate_location = $::profile::cache::kafka::certificate::ssl_certificate_location
        $ssl_cipher_suites        = $::profile::cache::kafka::certificate::ssl_cipher_suites
        $ssl_curves_list          = $::profile::cache::kafka::certificate::ssl_curves_list
        $ssl_sigalgs_list         = $::profile::cache::kafka::certificate::ssl_sigalgs_list
    }
    else {
        $kafka_brokers = $kafka_config['brokers']['array']

        $ssl_ca_location          = undef
        $ssl_key_password         = undef
        $ssl_key_location         = undef
        $ssl_certificate_location = undef
        $ssl_cipher_suites        = undef
        $ssl_curves_list          = undef
        $ssl_sigalgs_list         = undef
    }

    $format  = "%{fake_tag0@hostname?${::fqdn}}x %{%FT%T@dt}t %{X-Client-IP@ip}o %{@uri_path}U %{@uri_query}q %{User-Agent@user_agent}i"

    varnishkafka::instance { 'statsv':
        brokers                     => $kafka_brokers,
        format                      => $format,
        format_type                 => 'json',
        topic                       => 'statsv',
        varnish_name                => 'frontend',
        varnish_svc_name            => 'varnish-frontend',
        # Only log webrequests to /beacon/statsv
        varnish_opts                => { 'q' => 'ReqURL ~ "^/beacon/statsv\?"' },
        # -1 means all brokers in the ISR must ACK this request.
        topic_request_required_acks => '-1',
        # TLS/SSL config
        ssl_enabled                 => $ssl_enabled,
        ssl_ca_location             => $ssl_ca_location,
        ssl_key_password            => $ssl_key_password,
        ssl_key_location            => $ssl_key_location,
        ssl_certificate_location    => $ssl_certificate_location,
        ssl_cipher_suites           => $ssl_cipher_suites,
        ssl_curves_list             => $ssl_curves_list,
        ssl_sigalgs_list            => $ssl_sigalgs_list,
    }

    # Make sure varnishes are configured and started for the first time
    # before the instances as well, or they fail to start initially...
    Service <| tag == 'varnish_instance' |> -> Varnishkafka::Instance['statsv']

    if $monitoring_enabled {
        # Aggregated alarms for delivery errors are defined in icinga::monitor::analytics

        # Generate icinga alert if varnishkafka is not running.
        nrpe::monitor_service { 'varnishkafka-statsv':
            description   => 'statsv Varnishkafka log producer',
            nrpe_command  => "/usr/lib/nagios/plugins/check_procs -c 1:1 -a '/usr/bin/varnishkafka -S /etc/varnishkafka/statsv.conf'",
            contact_group => 'admins,analytics',
            require       => Class['::varnishkafka'],
            notes_url     => 'https://wikitech.wikimedia.org/wiki/Analytics/Systems/Varnishkafka',
        }

        # Sets up Logster to read from the Varnishkafka instance stats JSON file
        # and report metrics to statsd.
        varnishkafka::monitor::statsd { 'statsv':
            ensure                 => 'absent',
            graphite_metric_prefix => "varnishkafka.${::hostname}.statsv.${cache_cluster}",
            statsd_host_port       => $statsd,
        }
    }
}