Puppet Class: profile::query_service::streaming_updater

Defined in:
modules/profile/manifests/query_service/streaming_updater.pp

Overview

SPDX-License-Identifier: Apache-2.0

Parameters:

  • username (String) (defaults to: lookup('profile::query_service::username'))
  • kafka_cluster (String) (defaults to: lookup('profile::query_service::streaming_updater::kafka_cluster'))
  • kafka_topic (String) (defaults to: lookup('profile::query_service::streaming_updater::kafka_topic'))
  • logstash_logback_port (Stdlib::Port) (defaults to: lookup('logstash_logback_port'))
  • package_dir (Stdlib::Unixpath) (defaults to: lookup('profile::query_service::package_dir'))
  • data_dir (Stdlib::Unixpath) (defaults to: lookup('profile::query_service::data_dir'))
  • log_dir (Stdlib::Unixpath) (defaults to: lookup('profile::query_service::log_dir'))
  • deploy_name (String) (defaults to: lookup('profile::query_service::deploy_name'))
  • blazegraph_main_ns (String) (defaults to: lookup('profile::query_service::blazegraph_main_ns'))
  • journal (String) (defaults to: lookup('profile::query_service::streaming_updater::journal'))
  • uri_scheme_options (Array[String]) (defaults to: lookup('profile::query_service::uri_scheme_options'))
  • enable_updater (Boolean) (defaults to: lookup('profile::query_service::enable_updater', { 'default_value' => true }))


2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'modules/profile/manifests/query_service/streaming_updater.pp', line 2

class profile::query_service::streaming_updater (
    String $username = lookup('profile::query_service::username'),
    String $kafka_cluster = lookup('profile::query_service::streaming_updater::kafka_cluster'),
    String $kafka_topic = lookup('profile::query_service::streaming_updater::kafka_topic'),
    Stdlib::Port $logstash_logback_port = lookup('logstash_logback_port'),
    Stdlib::Unixpath $package_dir = lookup('profile::query_service::package_dir'),
    Stdlib::Unixpath $data_dir = lookup('profile::query_service::data_dir'),
    Stdlib::Unixpath $log_dir = lookup('profile::query_service::log_dir'),
    String $deploy_name = lookup('profile::query_service::deploy_name'),
    String $blazegraph_main_ns = lookup('profile::query_service::blazegraph_main_ns'),
    String $journal = lookup('profile::query_service::streaming_updater::journal'),
    Array[String] $uri_scheme_options = lookup('profile::query_service::uri_scheme_options'),
    Boolean $enable_updater = lookup('profile::query_service::enable_updater', { 'default_value' => true }),
) {
    require ::profile::query_service::common

    $instance_name = "${deploy_name}-updater"
    $prometheus_agent_path = '/usr/share/java/prometheus/jmx_prometheus_javaagent.jar'
    $prometheus_agent_port = 9101
    $prometheus_agent_config = "/etc/${deploy_name}/${instance_name}-prometheus-jmx.yaml"
    profile::prometheus::jmx_exporter { $instance_name:
        hostname    => $::hostname,
        port        => $prometheus_agent_port,
        config_file => $prometheus_agent_config,
        source      => 'puppet:///modules/profile/query_service/updater-prometheus-jmx.yaml',
        before      => Service[$instance_name],
    }

    $default_jvm_options = ['-XX:+UseNUMA', "-javaagent:${prometheus_agent_path}=${prometheus_agent_port}:${prometheus_agent_config}"]

    $kafka_brokers = kafka_config($kafka_cluster)['brokers']['string']
    $kafka_options = [
        '--brokers', $kafka_brokers,
        '--consumerGroup', $::hostname,
        '--topic', $kafka_topic,
        '--batchSize', '250'
    ]

    class { 'query_service::updater':
        ensure                => stdlib::ensure($enable_updater),
        package_dir           => $package_dir,
        data_dir              => $data_dir,
        log_dir               => $log_dir,
        deploy_name           => $deploy_name,
        username              => $username,
        logstash_logback_port => $logstash_logback_port,
        options               => ['-n', $blazegraph_main_ns, '--'] + $kafka_options,
        extra_jvm_opts        => $uri_scheme_options + $default_jvm_options,
        journal               => $journal,
    }

    class { 'query_service::monitor::updater':
        ensure             => stdlib::ensure($enable_updater),
        username           => $username,
        updater_main_class => 'org.wikidata.query.rdf.updater.consumer.StreamingUpdate',
    }

}