Defined Type: profile::analytics::refinery::job::refine_job

Defined in:
modules/profile/manifests/analytics/refinery/job/refine_job.pp

Overview

SPDX-License-Identifier: Apache-2.0

Define profile::analytics::refinery::job::refine_job

Installs a systemd timer to run the Refine Spark job. This is used to import arbitrary JSON data (with schemas) into Hive.

If $refine_monitor_enabled is true, a RefineMonitor job will be scheduled at $monitor_interval to look back over a 48 hour period and verify that all datasets expected to be refined were successfully refined and that no _FAILURE flags exist.

For a description of the Refine $job_config parameters, see: https://github.com/wikimedia/analytics-refinery-source/blob/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/refine/Refine.scala
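
For illustration, a minimal declaration might look like the sketch below. The title, config keys, and values are hypothetical; real keys must match Refine's config parameters documented at the link above.

    profile::analytics::refinery::job::refine_job { 'example_json_data':
        job_config       => {
            # Hypothetical Refine config properties:
            'input_path'  => '/wmf/data/raw/example',
            'output_path' => '/wmf/data/example',
            'database'    => 'example',
        },
        # Check refined data daily; run Refine itself hourly at half past.
        monitor_interval => '*-*-* 04:15:00',
        interval         => '*-*-* *:30:00',
    }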

Properties

job_config

(required)

A hash of job config properties that will be rendered as a .properties file and
given to the Refine job as the --config_file argument.
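
As a sketch (hypothetical keys), a hash such as:

    $job_config = {
        'input_path'  => '/wmf/data/raw/example',
        'output_path' => '/wmf/data/example',
    }

would be rendered into the .properties file approximately as:

    input_path  = /wmf/data/raw/example
    output_path = /wmf/data/example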

monitor_interval

(required)

Systemd time interval for running the RefineMonitor job.

job_name

The Spark job name. Default: refine_${title}

spark_submit

Path to the spark-submit executable that spark_job should use. Default: /usr/bin/spark3-submit

interval

Systemd time interval. Default: '*-*-* *:00:00' (hourly)

monitor_since

If refine_monitor_enabled, RefineMonitor should use the same settings as the main Refine job, but likely with a wider range of data to check. Instead of re-rendering an entirely different job config file, use the same one as the main Refine job, but with CLI flags to override --since to $monitor_since. Default: 48 hours ago.

monitor_until

If refine_monitor_enabled, RefineMonitor should use the same settings as the main Refine job, but likely with a wider range of data to check. Instead of re-rendering an entirely different job config file, use the same one as the main Refine job, but with CLI flags to override --until to $monitor_until. Default: 4 hours ago.
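
Concretely, the monitor run reuses the Refine job's .properties file and appends the overrides on the command line (see the spark_job declaration in the source below); with the defaults this amounts to:

    --config_file /etc/refinery/refine/refine_<title>.properties --since 48 --until 4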

send_mail

If true, service failures will alert: a bad exit code from Refine or RefineMonitor will result in an email sent to Analytics/DSE.

refine_monitor_enabled

If true, a RefineMonitor job will be scheduled using the same job_config as the main Refine job, with --since and --until overridden on the command line. RefineMonitor alerts on missing dataset hours or _FAILURE flags.
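
Since refine_monitor_enabled defaults to $send_mail, a job with mail alerting disabled also skips the monitor unless it is explicitly enabled. A sketch with a hypothetical title and config:

    profile::analytics::refinery::job::refine_job { 'example_no_alerts':
        job_config       => { 'input_path' => '/wmf/data/raw/example' },
        monitor_interval => '*-*-* 04:15:00',
        send_mail        => false,   # also disables RefineMonitor by default
    }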

Parameters:

  • job_config (Any)
  • monitor_interval (Any)
  • job_name (Any) (defaults to: "refine_${title}")
  • refinery_job_jar (Any) (defaults to: undef)
  • job_class (Any) (defaults to: 'org.wikimedia.analytics.refinery.job.refine.Refine')
  • monitor_class (Any) (defaults to: 'org.wikimedia.analytics.refinery.job.refine.RefineMonitor')
  • monitor_since (Any) (defaults to: 48)
  • monitor_until (Any) (defaults to: 4)
  • queue (Any) (defaults to: 'production')
  • spark_submit (Any) (defaults to: '/usr/bin/spark3-submit')
  • spark_executor_memory (Any) (defaults to: '4G')
  • spark_executor_cores (Any) (defaults to: 1)
  • spark_driver_memory (Any) (defaults to: '8G')
  • spark_max_executors (Any) (defaults to: 64)
  • spark_extra_files (Any) (defaults to: undef)
  • spark_extra_opts (Any) (defaults to: '')
  • deploy_mode (Any) (defaults to: 'cluster')
  • user (Any) (defaults to: 'analytics')
  • interval (Any) (defaults to: '*-*-* *:00:00')
  • send_mail (Any) (defaults to: true)
  • refine_monitor_enabled (Any) (defaults to: $send_mail)
  • ensure (Any) (defaults to: 'present')
  • use_keytab (Any) (defaults to: false)


# File 'modules/profile/manifests/analytics/refinery/job/refine_job.pp', line 57

define profile::analytics::refinery::job::refine_job (
    $job_config,
    $monitor_interval,
    $job_name                         = "refine_${title}",
    $refinery_job_jar                 = undef,
    $job_class                        = 'org.wikimedia.analytics.refinery.job.refine.Refine',
    $monitor_class                    = 'org.wikimedia.analytics.refinery.job.refine.RefineMonitor',
    $monitor_since                    = 48,
    $monitor_until                    = 4,
    $queue                            = 'production',
    $spark_submit                     = '/usr/bin/spark3-submit',
    $spark_executor_memory            = '4G',
    $spark_executor_cores             = 1,
    $spark_driver_memory              = '8G',
    $spark_max_executors              = 64,
    $spark_extra_files                = undef,
    $spark_extra_opts                 = '',
    $deploy_mode                      = 'cluster',
    $user                             = 'analytics',
    $interval                         = '*-*-* *:00:00',
    $send_mail                        = true,
    $refine_monitor_enabled           = $send_mail,
    $ensure                           = 'present',
    $use_keytab                       = false,
) {
    require ::profile::analytics::refinery
    $refinery_path = $profile::analytics::refinery::path

    # refine job properties will go in /etc/refinery/refine
    $job_config_dir = "${::profile::analytics::refinery::config_dir}/refine"
    if !defined(File[$job_config_dir]) {
        file { $job_config_dir:
            ensure => 'directory',
        }
    }

    # If $refinery_job_jar not given, use the symlink at artifacts/refinery-job-shaded.jar
    $_refinery_job_jar = $refinery_job_jar ? {
        undef   => "${refinery_path}/artifacts/refinery-job-shaded.jar",
        default => $refinery_job_jar,
    }

    $job_config_file = "${job_config_dir}/${job_name}.properties"
    profile::analytics::refinery::job::config { $job_config_file:
        ensure     => $ensure,
        properties => $job_config,
    }

    # All spark jobs declared here share these parameters.
    Profile::Analytics::Refinery::Job::Spark_job {
        spark_submit       => $spark_submit,
        jar                => $_refinery_job_jar,
        require            => Profile::Analytics::Refinery::Job::Config[$job_config_file],
        user               => $user,
        send_mail          => $send_mail,
        use_keytab         => $use_keytab,
    }


    # We need to load an older CDH's version of these Hive jars in order to use
    # Hive JDBC directly inside of a spark job.  This is a workaround for
    # https://issues.apache.org/jira/browse/SPARK-23890.
    # See also:
    # https://github.com/wikimedia/analytics-refinery/blob/master/artifacts/hive-cdh5.10.0.README
    # https://phabricator.wikimedia.org/T209407
    # Because these older CDH hive jars are no longer deployed throughout the cluster,
    # we need to include them in --files to upload them to the YARN AM/Spark Driver container.
    $driver_extra_hive_jars = "${refinery_path}/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar,${refinery_path}/artifacts/hive-service-1.1.0-cdh5.10.0.jar"
    # We need hadoop-mapreduce-client-common which IS deployed throughout the cluster,
    # as well as the aforementioned CDH 5.10.0 hive jars, which will be uploaded to the
    # Spark Driver's working dir, and should be referenced by relative path.
    $driver_extra_classpath = '/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:hive-jdbc-1.1.0-cdh5.10.0.jar:hive-service-1.1.0-cdh5.10.0.jar'

    # If $spark_extra_files is given, append it with a comma to existing files.
    $_spark_extra_files = $spark_extra_files ? {
        undef   => '',
        default => ",${spark_extra_files}",
    }

    # In 'client' deploy mode the driver runs locally and can read the config file
    # at its absolute local path; in 'cluster' mode the file is shipped to the
    # driver's working dir via --files and must be referenced by its relative name.
    $config_file_path = $deploy_mode ? {
        'client' => $job_config_file,
        default  => "${job_name}.properties",
    }

    # In DataFrameToHive we issue CREATE/ALTER sql statement to Hive if needed.
    # Spark is not aware of this code and by default it retrieves Delegation Tokens
    # only for HDFS/Hive-Metastore/HBase. When the JDBC connection to the Hive Server 2
    # is established (with Kerberos enabled), some credentials will be needed to be able
    # to login as $user. These credentials are explicitly provided as keytab, that is copied
    # (securely) by Yarn to its distributed cache.
    profile::analytics::refinery::job::spark_job { $job_name:
        ensure     => $ensure,
        main_class => $job_class,
        # We use spark's --files option to load the $job_config_file to the Spark job's working HDFS dir.
        # It is then referenced via its relative file name with --config_file $job_name.properties.
        spark_opts => "--files /etc/hive/conf/hive-site.xml,${job_config_file},${driver_extra_hive_jars}${_spark_extra_files} --master yarn --deploy-mode ${deploy_mode} --queue ${queue} --driver-memory ${spark_driver_memory} --executor-memory ${spark_executor_memory} --executor-cores ${spark_executor_cores} --conf spark.driver.extraClassPath=${driver_extra_classpath} --conf spark.dynamicAllocation.maxExecutors=${spark_max_executors} ${spark_extra_opts}",
        job_opts   => "--config_file ${config_file_path}",
        interval   => $interval,
    }
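
    # Illustrative only; the actual command is assembled by spark_job. With the
    # defaults above, the submission looks roughly like the following.
    # (Angle-bracket tokens are placeholders, not literal values.)
    #
    #   /usr/bin/spark3-submit \
    #       --files /etc/hive/conf/hive-site.xml,<job_config_file>,<hive jdbc/service jars> \
    #       --master yarn --deploy-mode cluster --queue production \
    #       --driver-memory 8G --executor-memory 4G --executor-cores 1 \
    #       --conf spark.driver.extraClassPath=<driver_extra_classpath> \
    #       --conf spark.dynamicAllocation.maxExecutors=64 \
    #       --class org.wikimedia.analytics.refinery.job.refine.Refine \
    #       <refinery-job-shaded.jar> --config_file <job_name>.properties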


    # NOTE: RefineMonitor should not be run in YARN,
    # as it only looks in HDFS paths and doesn't crunch any data.

    # Look back over a 48 hour period ending 4 hours ago and ensure that all expected
    # refined datasets for this job are present and no _FAILURE flags exist.
    if $ensure == 'present' and $refine_monitor_enabled {
        $ensure_monitor = 'present'
    }
    else {
        $ensure_monitor = 'absent'
    }
    profile::analytics::refinery::job::spark_job { "monitor_${job_name}":
        ensure     => $ensure_monitor,
        main_class => $monitor_class,
        # Use the same config file as the Refine job, but override the since and until.
        job_opts   => "--config_file ${job_config_file} --since ${monitor_since} --until ${monitor_until}",
        interval   => $monitor_interval,
        send_mail  => $send_mail,
    }

}