Puppet Class: profile::eventlogging::analytics::processor

Defined in:
modules/profile/manifests/eventlogging/analytics/processor.pp

Overview

SPDX-License-Identifier: Apache-2.0

Class profile::eventlogging::analytics::processor

Reads raw events, parses and validates them, and then sends them along for further consumption.

Parameters

kafka_producer_scheme

Chooses the eventlogging URI scheme to use for consumers and producers (inputs vs. outputs). This allows us to try out the different Kafka handlers and Kafka clients that eventlogging supports. The default is kafka://; kafka-confluent:// is also available. eventlogging::processor is the only configured analytics eventlogging Kafka producer, so this only needs to be defined here.
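
For illustration, a minimal sketch (the broker string is invented) of how the scheme becomes the prefix of the producer output URI, which is how eventlogging selects its writer handler:

# Illustrative sketch only; the broker string is invented.
$scheme  = 'kafka-confluent://'    # or the default 'kafka://'
$brokers = 'kafka-test1001:9092'
# eventlogging dispatches on the URI scheme, so this selects the
# confluent kafka writer handler instead of the kafka-python one:
$output  = "${scheme}/${brokers}?topic=eventlogging_{schema}"
# => kafka-confluent:///kafka-test1001:9092?topic=eventlogging_{schema}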

Parameters:

  • client_side_processors (Array[String]) (defaults to: lookup('profile::eventlogging::analytics::processor::client_side_processors', {'default_value' => ['client-side-00', 'client-side-01']}))
  • kafka_consumer_group (String) (defaults to: lookup('profile::eventlogging::analytics::processor::kafka_consumer_group', {'default_value' => 'eventlogging_processor_client_side_00'}))
  • kafka_producer_scheme (String) (defaults to: lookup('profile::eventlogging::analytics::processor::kafka_producer_scheme', {'default_value' => 'kafka://'}))
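
A hypothetical resource-like declaration overriding the defaults (in practice the profile is included by a role, and these values come from the hiera lookups above):

class { 'profile::eventlogging::analytics::processor':
    client_side_processors => ['client-side-00'],
    kafka_consumer_group   => 'eventlogging_processor_client_side_00',
    kafka_producer_scheme  => 'kafka-confluent://',
}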


# File 'modules/profile/manifests/eventlogging/analytics/processor.pp', line 16

class profile::eventlogging::analytics::processor(
    Array[String] $client_side_processors = lookup('profile::eventlogging::analytics::processor::client_side_processors', {'default_value' => ['client-side-00', 'client-side-01']}),
    String $kafka_consumer_group          = lookup('profile::eventlogging::analytics::processor::kafka_consumer_group', {'default_value' => 'eventlogging_processor_client_side_00'}),
    String $kafka_producer_scheme         = lookup('profile::eventlogging::analytics::processor::kafka_producer_scheme', {'default_value' => 'kafka://'}),
){

    include profile::eventlogging::analytics::server

    $kafka_brokers_string = $profile::eventlogging::analytics::server::kafka_config['brokers']['string']

    # client-side-raw URI is defined for DRY purposes in profile::eventlogging::analytics::server.
    $kafka_client_side_raw_uri = $profile::eventlogging::analytics::server::kafka_client_side_raw_uri

    # Read in raw events from Kafka, process them, and send them to
    # the Kafka topic corresponding to their schema.
    $kafka_schema_output_uri  = "${kafka_producer_scheme}/${kafka_brokers_string}?topic=eventlogging_{schema}"

    # Increase the retry count and backoff time for async
    # analytics use.  If Kafka metadata changes, this gives the
    # producer more time to retry.
    $kafka_producer_args = $kafka_producer_scheme ? {
        # args for kafka-confluent handler writer
        'kafka-confluent://' => 'message.send.max.retries=6,retry.backoff.ms=200',
        # args for kafka-python handler writer
        'kafka://'           => 'retries=6&retry_backoff_ms=200'
    }

    # This output URL writes to per schema Kafka topics like eventlogging_<SchemaName>
    # If an event's SchemaName is in the eventlogging_schemas_disabled list (defined in plugins.py),
    # it will not be sent to its topic, but a message will be logged about skipping it.
    $kafka_per_schema_output = "map://${kafka_schema_output_uri}&${kafka_producer_args}&function=eventlogging_schemas_disabled_filter"

    # Incoming format from /beacon/event via varnishkafka eventlogging-client-side
    # is of the format:
    #   %q           - GET query with encoded event
    #   %{recvFrom}s - recvFrom hostname
    #   %{seqId}d    - sequence number
    #   %D           - ISO-8601 timestamp
    #   %{ip}i       - client IP
    #   %u           - userAgent
    $format = '%q %{recvFrom}s %{seqId}d %D %{ip}i %u'
    eventlogging::service::processor { $client_side_processors:
        format         => $format,
        input          => $kafka_client_side_raw_uri,
        sid            => $kafka_consumer_group,
        outputs        => [$kafka_per_schema_output],
        output_invalid => true,
    }
}
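
To make the URI construction concrete, here is a standalone sketch (broker string invented) tracing how the per-schema output URI is assembled under the default kafka:// scheme; it can be run with puppet apply:

# Standalone sketch; values mirror the defaults above, broker invented.
$scheme        = 'kafka://'
$brokers       = 'kafka-test1001:9092'
$producer_args = 'retries=6&retry_backoff_ms=200'   # kafka-python writer args
$schema_uri    = "${scheme}/${brokers}?topic=eventlogging_{schema}"
$output        = "map://${schema_uri}&${producer_args}&function=eventlogging_schemas_disabled_filter"
notice($output)
# => map://kafka:///kafka-test1001:9092?topic=eventlogging_{schema}&retries=6&retry_backoff_ms=200&function=eventlogging_schemas_disabled_filter

Note that both client-side processors share $kafka_consumer_group as their sid, so they join the same Kafka consumer group and split the partitions of the raw client-side input topic between them.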