Puppet Class: profile::analytics::refinery::job::test::refine
- Defined in:
- modules/profile/manifests/analytics/refinery/job/test/refine.pp
Overview
Class profile::analytics::refinery::job::test::refine
Install systemd timers for Spark Refine jobs. These jobs transform data imported into Hadoop into augmented, Parquet-backed Hive tables.
This version is only for the Hadoop test cluster.
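A minimal usage sketch (the role class name is hypothetical; the Hiera keys are the ones read by the lookup() calls in this class, with illustrative values):

# Hiera, e.g. to disable the timers:
#   profile::analytics::refinery::job::test::refine::ensure_timers: 'absent'
#   profile::analytics::refinery::job::test::refine::use_kerberos_keytab: false
class role::analytics_test_cluster::refine {
    include ::profile::analytics::refinery::job::test::refine
}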
# File 'modules/profile/manifests/analytics/refinery/job/test/refine.pp', line 9
class profile::analytics::refinery::job::test::refine (
    Wmflib::Ensure $ensure_timers       = lookup('profile::analytics::refinery::job::test::refine::ensure_timers', { 'default_value' => 'present' }),
    Boolean $use_kerberos_keytab        = lookup('profile::analytics::refinery::job::test::refine::use_kerberos_keytab', { 'default_value' => true }),
) {
    require ::profile::analytics::refinery
    require ::profile::hive::client

    # Update this when you want to change the version of the refinery job jar
    # being used for the refine jobs.
    $refinery_version = '0.1.16'

    # Use this value as the default refinery_job_jar.
    Profile::Analytics::Refinery::Job::Refine_job {
        refinery_job_jar => "${::profile::analytics::refinery::path}/artifacts/org/wikimedia/analytics/refinery/refinery-job-${refinery_version}.jar",
    }
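    # The capitalized resource reference above sets a Puppet resource default:
    # every profile::analytics::refinery::job::refine_job declared in this
    # class inherits this refinery_job_jar unless it overrides it.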
    # These configs will be used for all refine jobs unless otherwise overridden.
    $default_config = {
        'to_emails'           => 'otto@wikimedia.org',
        'should_email_report' => true,
        'output_database'     => 'event',
        'output_path'         => '/wmf/data/event',
        # Look for data to refine from 26 hours ago to 2 hours ago, giving some
        # time for raw data to be imported in the last hour or two before
        # attempting refinement.
        'since'               => '26',
        'until'               => '2',
        # Until T259924 is fixed, we MUST merge with the Hive schema before
        # reading JSON data.
        'merge_with_hive_schema_before_read' => true,
    }
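    # For example (times illustrative): a refine run starting at
    # 2021-08-05T14:20 would consider hourly partitions whose datetimes fall
    # between 2021-08-04T12:20 (26 hours earlier) and 2021-08-05T12:20
    # (2 hours earlier).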
    # Conventional Hive format path with partition keys (used by Gobblin),
    # i.e. year=yyyy/month=mm/day=dd/hour=hh.
    $hive_hourly_path_regex                = 'year=(\\d+)/month=(\\d+)/day=(\\d+)/hour=(\\d+)'
    $hive_hourly_path_regex_capture_groups = 'year,month,day,hour'

    # Used by Java time formats to find potential hourly paths to refine.
    $hive_input_path_datetime_format = '\'year=\'yyyy/\'month=\'MM/\'day=\'dd/\'hour=\'HH'
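    # For example, the hourly path fragment
    #   year=2021/month=08/day=05/hour=17
    # is matched by $hive_hourly_path_regex (capturing year=2021, month=08,
    # day=05, hour=17), and is what $hive_input_path_datetime_format produces
    # when formatting the Java time 2021-08-05T17:00.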
    # === Event data ===
    # /wmf/data/raw/event -> /wmf/data/event
    # NOTE: refinery::job::test::gobblin only imports limited data into the test
    # cluster, so we don't need to specify any table include or exclude regexes.
    $event_input_path                      = '/wmf/data/raw/event'
    $event_input_path_regex                = "${event_input_path}/(eqiad|codfw)\\.(.+)/${hive_hourly_path_regex}"
    $event_input_path_regex_capture_groups = "datacenter,table,${hive_hourly_path_regex_capture_groups}"
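    # For example (stream name illustrative), the path
    #   /wmf/data/raw/event/eqiad.some_stream/year=2021/month=08/day=05/hour=17
    # matches with datacenter=eqiad, table=some_stream, year=2021, month=08,
    # day=05, hour=17.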
    profile::analytics::refinery::job::refine_job { 'event_test':
        ensure     => $ensure_timers,
        job_config => merge($default_config, {
            input_path                      => $event_input_path,
            input_path_regex                => $event_input_path_regex,
            input_path_regex_capture_groups => $event_input_path_regex_capture_groups,
            input_path_datetime_format      => $hive_input_path_datetime_format,
            transform_functions             => 'org.wikimedia.analytics.refinery.job.refine.event_transforms',
            # Get JSONSchemas from the HTTP schema service.
            # Schema URIs are extracted from the $schema field in each event.
            schema_base_uris                => 'https://schema.discovery.wmnet/repositories/primary/jsonschema,https://schema.discovery.wmnet/repositories/secondary/jsonschema',
        }),
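        # interval and monitor_interval are systemd OnCalendar expressions:
        # the refine job fires hourly at minute 20, and the monitoring check
        # (presumably the companion RefineMonitor job) runs daily at 01:15.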
        interval         => '*-*-* *:20:00',
        monitor_interval => '*-*-* 01:15:00',
        use_keytab       => $use_kerberos_keytab,
    }
    # === EventLogging legacy data ===
    # /wmf/data/raw/eventlogging_legacy -> /wmf/data/event
    # EventLogging legacy events migrated to the Event Platform.
    $eventlogging_legacy_input_path = '/wmf/data/raw/eventlogging_legacy'

    # NOTE: We need to prefix our partition discovery regex with the input_path
    # here, since the eventlogging_(.+) pattern would otherwise match the
    # eventlogging_legacy directory name itself.
    $eventlogging_legacy_input_path_regex                = "${eventlogging_legacy_input_path}/eventlogging_(.+)/${hive_hourly_path_regex}"
    $eventlogging_legacy_input_path_regex_capture_groups = "table,${hive_hourly_path_regex_capture_groups}"
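    # For example (schema name illustrative), the path
    #   /wmf/data/raw/eventlogging_legacy/eventlogging_NavigationTiming/year=2021/month=08/day=05/hour=17
    # matches with table=NavigationTiming; the string eventlogging_legacy
    # itself also matches eventlogging_(.+), hence the input_path prefix.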
    profile::analytics::refinery::job::refine_job { 'eventlogging_legacy_test':
        ensure     => $ensure_timers,
        job_config => merge($default_config, {
            input_path                      => $eventlogging_legacy_input_path,
            input_path_regex                => $eventlogging_legacy_input_path_regex,
            input_path_regex_capture_groups => $eventlogging_legacy_input_path_regex_capture_groups,
            input_path_datetime_format      => $hive_input_path_datetime_format,
            # Since EventLogging legacy data comes from external clients,
            # non-Wikimedia domains and other unwanted domains have always
            # been filtered out.
            transform_functions             => 'org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains,org.wikimedia.analytics.refinery.job.refine.event_transforms',
            # Get JSONSchemas from the HTTP schema service.
            # Schema URIs are extracted from the $schema field in each event.
            schema_base_uris                => 'https://schema.discovery.wmnet/repositories/primary/jsonschema,https://schema.discovery.wmnet/repositories/secondary/jsonschema',
        }),
        interval         => '*-*-* *:25:00',
        monitor_interval => '*-*-* 00:30:00',
        use_keytab       => $use_kerberos_keytab,
    }
}