Defined Type: profile::analytics::refinery::job::eventlogging_to_druid_job

Defined in:
modules/profile/manifests/analytics/refinery/job/eventlogging_to_druid_job.pp

Overview

Define profile::analytics::refinery::job::eventlogging_to_druid_job

Installs crons to run HiveToDruid Spark jobs on EventLogging datasets. This job loads data from the given EL Hive table to a Druid datasource.

We use Spark's --files option to ship each job's config file to the corresponding working directory on HDFS. The file is then referenced by its relative file name via the --config_file option.
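In spark-submit terms, the mechanism boils down to something like the following. This is a sketch only: the real command line is assembled by the profile::analytics::refinery::job::spark_job resources defined further below, and the angle-bracket paths are placeholders, not actual production paths.

# --files ships the rendered .properties file (and hive-site.xml) into the job's
# working directory on HDFS; HiveToDruid then reads it by its bare file name.
spark-submit \
    --master yarn --deploy-mode cluster \
    --files /etc/hive/conf/hive-site.xml,<config_dir>/eventlogging_to_druid/<job_name>_hourly.properties \
    --class org.wikimedia.analytics.refinery.job.HiveToDruid \
    <refinery_path>/artifacts/refinery-job.jar \
    --config_file <job_name>_hourly.properties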

TEMPORARY HACK

The problem: HiveToDruid does not use RefineTarget to determine which data pieces are available at a given moment and should be loaded to Druid, because RefineTarget does not currently support Druid. Instead, HiveToDruid just assumes that the passed date/time interval is correct and loads it without any check or filter. The interval checking therefore has to be done by puppet (cron), passing a relative number of hours ago as since and until.

Potential issues:

1) If the data pipeline is late for any reason (high load, outage, restarts, etc.), HiveToDruid might not find the input data, or find it incomplete, thus loading corrupt data to Druid for that hour.

2) If the cluster is busy and the HiveToDruid job takes more than 1 hour to launch (waiting), then 'since 6 hours ago' will skip 1 hour (or more) and there will be a hole in the corresponding Druid datasource.

This would cause user confusion and frustration, and give the maintainers lots of work to manually backfill datasources.

The right solution: We should improve RefineTarget to support Druid (task: phabricator.wikimedia.org/T207207). However, this seems to be quite a bit of work, so in the meantime…

The temporary solution:

Issue 1) Make this module install 2 loading jobs for each given datasource: one hourly and one daily. The hourly one will load data as soon as possible, with the potential issues mentioned above. The daily one will load data with a lag of 3-4 days (configurable), to automatically cover up any hourly load issues that happened during that lag. A desirable side-effect of this hack is that Druid hourly data gets compacted into daily segments.

Issue 2) Instead of passing relative time offsets (hours ago), calculate absolute timestamps for since and until using bash, as sketched below. Because bash has to interpret the date commands at run time, the since and until params can not be passed via the config property file.
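For example, with the default hourly offsets (hourly_hours_since = 6, hourly_hours_until = 5), the generated cron command line contains shell substitutions like these, which are evaluated when the job runs rather than when puppet compiles the catalog:

# since: start of the hour 6 hours ago (UTC); until: start of the hour 5 hours ago
--since $(date --date '-6hours' -u +'%Y-%m-%dT%H:00:00') \
--until $(date --date '-5hours' -u +'%Y-%m-%dT%H:00:00')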

Properties

job_config

A hash of job config properties that will be rendered as a properties file and given to the HiveToDruid job as the --config_file argument (see the usage sketch after the parameter list below). Please do not include the following properties: since, until, segment_granularity, reduce_memory, num_shards. The reason: this profile installs 2 jobs for each datasource, hourly and daily, and each of them uses different values for since, until, segment_granularity, reduce_memory and num_shards. Therefore, those options are specified inside this define instead.

daily_shards

Number of shards that the daily segment of this datasource should have. This will usually be 1, except for very big schemas. Default: 1.

job_name

The Spark job name. Default: eventlogging_to_druid_${title}

Parameters:

  • job_config (Any)
  • daily_shards (Any) (defaults to: 1)
  • job_name (Any) (defaults to: "eventlogging_to_druid_${title}")
  • refinery_job_jar (Any) (defaults to: undef)
  • job_class (Any) (defaults to: 'org.wikimedia.analytics.refinery.job.HiveToDruid')
  • queue (Any) (defaults to: 'production')
  • user (Any) (defaults to: 'analytics')
  • hourly_hours_since (Any) (defaults to: 6)
  • hourly_hours_until (Any) (defaults to: 5)
  • daily_days_since (Any) (defaults to: 4)
  • daily_days_until (Any) (defaults to: 3)
  • use_kerberos (Any) (defaults to: false)
  • ensure_hourly (Any) (defaults to: 'present')
  • ensure_daily (Any) (defaults to: 'present')
  • ensure (Any) (defaults to: 'present')
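For illustration, a hypothetical declaration of this defined type could look like the following. The title and the job_config keys are placeholders, not taken from production configuration; consult HiveToDruid for the properties it actually supports.

profile::analytics::refinery::job::eventlogging_to_druid_job { 'navigationtiming':
    # Rendered into the .properties file passed via --config_file.
    # 'dimensions' and 'metrics' are hypothetical example keys.
    job_config   => {
        'dimensions' => 'event_action,event_origin',
        'metrics'    => 'event_duration',
    },
    daily_shards => 1,
}

The title ('navigationtiming' in this sketch) doubles as the source Hive table name in the event database, and the generated jobs will be named eventlogging_to_druid_navigationtiming_hourly and eventlogging_to_druid_navigationtiming_daily unless job_name is overridden.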


# File 'modules/profile/manifests/analytics/refinery/job/eventlogging_to_druid_job.pp', line 69

define profile::analytics::refinery::job::eventlogging_to_druid_job (
    $job_config,
    $daily_shards        = 1,
    $job_name            = "eventlogging_to_druid_${title}",
    $refinery_job_jar    = undef,
    $job_class           = 'org.wikimedia.analytics.refinery.job.HiveToDruid',
    $queue               = 'production',
    $user                = 'analytics',
    $hourly_hours_since  = 6,
    $hourly_hours_until  = 5,
    $daily_days_since    = 4,
    $daily_days_until    = 3,
    $use_kerberos        = false,
    $ensure_hourly       = 'present',
    $ensure_daily        = 'present',
    $ensure              = 'present',
) {
    require ::profile::analytics::refinery
    $refinery_path = $profile::analytics::refinery::path

    # Override specific ensures, in case global ensure is absent.
    $_ensure_hourly = $ensure ? {
        'absent' => 'absent',
        default  => $ensure_hourly
    }
    $_ensure_daily = $ensure ? {
        'absent' => 'absent',
        default  => $ensure_daily
    }

    # If $refinery_job_jar not given, use the symlink at artifacts/refinery-job.jar
    $_refinery_job_jar = $refinery_job_jar ? {
        undef   => "${refinery_path}/artifacts/refinery-job.jar",
        default => $refinery_job_jar,
    }

    # Directory where HiveToDruid config property files will go
    $job_config_dir = "${::profile::analytics::refinery::config_dir}/eventlogging_to_druid"
    if !defined(File[$job_config_dir]) {
        file { $job_config_dir:
            ensure => 'directory',
        }
    }

    # Config options for all jobs, can be overriden by define params
    $default_config = {
        'database'            => 'event',
        'table'               => $title,
        'query_granularity'   => 'minute',
        'hadoop_queue'        => $queue,
        'druid_host'          => 'an-druid1001.eqiad.wmnet',
        'druid_port'          => '8090',
    }

    # Common Spark options for all jobs
    $default_spark_opts = "--master yarn --deploy-mode cluster --queue ${queue} --conf spark.driver.extraClassPath=/usr/lib/hive/lib/hive-jdbc.jar:/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/usr/lib/hive/lib/hive-service.jar"

    # Hourly job
    $hourly_job_config_file = "${job_config_dir}/${job_name}_hourly.properties"
    profile::analytics::refinery::job::config { $hourly_job_config_file:
        ensure     => $_ensure_hourly,
        properties => merge($default_config, $job_config, {
            'segment_granularity' => 'hour',
            'num_shards'          => 1,
            'reduce_memory'       => '4096',
        }),
    }
    profile::analytics::refinery::job::spark_job { "${job_name}_hourly":
        ensure       => $_ensure_hourly,
        jar          => $_refinery_job_jar,
        class        => $job_class,
        spark_opts   => "${default_spark_opts} --files /etc/hive/conf/hive-site.xml,${hourly_job_config_file} --conf spark.dynamicAllocation.maxExecutors=32 --driver-memory 2G",
        job_opts     => "--config_file ${job_name}_hourly.properties --since $(date --date '-${hourly_hours_since}hours' -u +'%Y-%m-%dT%H:00:00') --until $(date --date '-${hourly_hours_until}hours' -u +'%Y-%m-%dT%H:00:00')",
        require      => Profile::Analytics::Refinery::Job::Config[$hourly_job_config_file],
        user         => $user,
        interval     => '*-*-* *:00:00',
        use_kerberos => $use_kerberos,
    }

    # Daily job
    $daily_job_config_file = "${job_config_dir}/${job_name}_daily.properties"
    profile::analytics::refinery::job::config { $daily_job_config_file:
        ensure     => $_ensure_daily,
        properties => merge($default_config, $job_config, {
            'segment_granularity' => 'day',
            'num_shards'          => $daily_shards,
            'reduce_memory'       => '8192',
        }),
    }
    profile::analytics::refinery::job::spark_job { "${job_name}_daily":
        ensure       => $_ensure_daily,
        jar          => $_refinery_job_jar,
        class        => $job_class,
        spark_opts   => "${default_spark_opts} --files /etc/hive/conf/hive-site.xml,${daily_job_config_file} --conf spark.dynamicAllocation.maxExecutors=64 --driver-memory 2G",
        job_opts     => "--config_file ${job_name}_daily.properties --since $(date --date '-${daily_days_since}days' -u +'%Y-%m-%dT00:00:00') --until $(date --date '-${daily_days_until}days' -u +'%Y-%m-%dT00:00:00')",
        require      => Profile::Analytics::Refinery::Job::Config[$daily_job_config_file],
        user         => $user,
        interval     => '*-*-* 00:00:00',
        use_kerberos => $use_kerberos,
    }
}