Puppet Class: profile::airflow

Defined in:
modules/profile/manifests/airflow.pp

Overview

SPDX-License-Identifier: Apache-2.0

Class profile::airflow

Creates airflow::instance resources for each of $airflow_instances

Parameters

airflow_instances

Hash of airflow::instance parameters keyed by name. This will be passed directly to the create_resources function. E.g.

myinstanceA:
    service_user: ...
    ...
myinstanceB:
    service_user: ...
    connections:
      analytics-mariadb:
        conn_type: mysql
        host: ...
        ...
    ...
airflow_instances_secrets

Any sensitive parameters that you don't want to put directly into puppet hiera airflow_instances should be defined in puppet private hiera in this variable. This is the exact same structure as $airflow_instances and will be merged on top of it before being passed to create_resources. The params here should be keyed by the same instance names as in $airflow_instances. E.g.

myinstanceA:
    db_password: SECRET
myinstanceB:
    db_password: SECRET
    connections:
      analytics-mariadb:
        login: myuser
        password: SECRET

Default: {}

*use_wmf_defaults

If true, defaults for WMF Data Platform Engineering airflow deployments will be merged into each of the provided instance's params. This reduces Hiera boilerplate needed with conventions we use to deploy Airflow. Default: true

Notably:

  • A scap::target for airflow-dags/$title is declared. This target is expected to be for the data-engineering/airflow-dags repository, and contains the instance specific dags_folder inside at $title/dags.

  • dags_folder will default to /srv/deployment/airflow-dags/$title/$title/dags

  • plugins_folder will default to /srv/deployment/airflow-dags/$title/wmf_airflow_common/plugins

  • Ferm will default to only allowing $ANALYTICS_NETWORKS to the airflow instance services.

  • Common Airflow connections are configured.

airflow_database_host_default

Hostname used in the default sql_alchemy_conn when use_wmf_defaults is true. Default: an-coord1001.eqiad.wmnet

airflow_version

Airflow version used to separate deprecated config options. Default: 2.1.4. (TO DO: Set default to 2.5.0 once we upgrade everywhere.)

Parameters:

  • airflow_instances (Hash) (defaults to: lookup('profile::airflow::instances'))
  • airflow_instances_secrets (Hash) (defaults to: lookup('profile::airflow::instances_secrets', { 'default_value' => {} }))
  • use_wmf_defaults (Boolean) (defaults to: lookup('profile::airflow::use_wmf_defaults', { 'default_value' => true }))
  • airflow_database_host_default (String) (defaults to: lookup('profile::airflow::database_host_default', { 'default_value' => 'an-coord1001.eqiad.wmnet' }))
  • airflow_version (Optional[String]) (defaults to: lookup('profile::airflow::airflow_version', { 'default_value' => '2.7.3-py3.10-20231127' }))
  • renew_skein_certificate (Boolean) (defaults to: lookup('profile::airflow::renew_skein_certificate', { 'default_value' => true }))
  • statsd_exporter_mappings (Array) (defaults to: lookup('profile::airflow::statsd_exporter_default_mappings', { 'default_value' => [] }))


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'modules/profile/manifests/airflow.pp', line 64

class profile::airflow(
    Hash $airflow_instances               = lookup('profile::airflow::instances'),
    Hash $airflow_instances_secrets       = lookup('profile::airflow::instances_secrets', { 'default_value' => {} }),
    Boolean $use_wmf_defaults             = lookup('profile::airflow::use_wmf_defaults', { 'default_value' => true }),
    String $airflow_database_host_default = lookup('profile::airflow::database_host_default', { 'default_value' => 'an-coord1001.eqiad.wmnet' }),
    Optional[String] $airflow_version     = lookup('profile::airflow::airflow_version', { 'default_value' => '2.7.3-py3.10-20231127' }),
    Boolean $renew_skein_certificate      = lookup('profile::airflow::renew_skein_certificate', { 'default_value' => true }),
    Array $statsd_exporter_mappings       = lookup('profile::airflow::statsd_exporter_default_mappings', { 'default_value' => [] })
) {

    include profile::mail::default_mail_relay
    class { 'airflow':
        version        => $airflow_version,
        mail_smarthost => $profile::mail::default_mail_relay::smarthosts,
    }

    # Need to make sure a few packages exist. Some airflow code
    # (like Hive sensors) depends on these.
    # TODO: consider using conda deps to install these in the airflow conda env instead.
    ensure_packages([
        'sasl2-bin',
        'libsasl2-dev',
        'libsasl2-modules-gssapi-mit',
    ])
    # If use_wmf_defaults, merge in smart per instance wmf defaults.
    $airflow_instances_with_defaults = $use_wmf_defaults ? {
        # Not $use_wmf_defaults, keep $airflow_instances as provided.
        default => $airflow_instances,

        # If $use_wmf_defaults, create a dynamic set of defaults for each instance's params
        # and merge those defaults in to create a new Hash of
        # { instance name => instance_params (with smart defaults) }
        true    => $airflow_instances.reduce({}) |$instances_accumulator, $key_value| {
            $instance_name = $key_value[0]
            $instance_params = $key_value[1]
            $airflow_home = "/srv/airflow-${instance_name}"
            # This is where scap will deploy
            $deployment_dir = "/srv/deployment/airflow-dags/${instance_name}"


            # Used in places where '-', etc. won't work, like database names.
            $instance_name_normalized = regsubst($instance_name, '\W', '_', 'G')


            # scap::targets should use the same $ensure for the airflow::instance.
            $scap_target_ensure = $instance_params['ensure'] ? {
                undef   => 'present',
                default => $instance_params['ensure']
            }

            # We don't want to deep_merge scap_targets, but allow a full override
            # if provided in $instance_params. I.e. If the configured instance
            # explicitly declares scap_targets, ONLY use those scap_targets,
            # don't use our smart scap_target defaults.
            $default_scap_targets = has_key($instance_params, 'scap_targets') ? {
                true    => undef,
                default => {
                    "airflow-dags/${instance_name}" => {
                        'deploy_user' => $instance_params['service_user'],
                        # The service user (that runs airflow) will be managed
                        # by airflow::instance, but the deploy_airflow ssh key
                        # is only used for scap deployments.  scap::target should
                        # not manage the user, but it should manage the ssh key.
                        'manage_user' => false,
                        'manage_ssh_key' => true,
                        # key_name must match a keyholder::agent declared in profile::keyholder::server::agents,
                        # which also must match an ssh keypair added in puppet private repo
                        # in modules/secret/secrets/keyholder.
                        'key_name' => 'deploy_airflow',
                        'ensure' => $scap_target_ensure
                    },
                }
            }


            # Default WMF (analytics cluster, for now) specific instance params.
            $default_wmf_instance_params = {
                'airflow_home' => $airflow_home,
                'firewall_srange' => 'ANALYTICS_NETWORKS',
                'scap_targets' => $default_scap_targets,
                'airflow_config' => {
                    'core' => {
                        'dags_folder' => "/srv/deployment/airflow-dags/${instance_name}/${instance_name}/dags",
                        'plugins_folder' => "/srv/deployment/airflow-dags/${instance_name}/wmf_airflow_common/plugins",
                        'security' => 'kerberos',
                        'executor' => 'LocalExecutor',

                        # The amount of parallelism as a setting to the executor. This defines
                        # the max number of task instances that should run simultaneously
                        # on this airflow installation.
                        # This value was 32 by default, but we have decided to increase it to
                        # backfill more quickly, and globally run more concurrent jobs. But, by doing
                        # so, we assume that most tasks are low intensive in resources.
                        # You can always set some restriction per dag with max_active_tasks, and per
                        # task with max_active_tis_per_dag. Example: a task using a significative
                        # amount of memory from the cluster should set its max_active_tis_per_dag to 1.
                        # This would be recommanded for intensive
                        # tasks for example.
                        'parallelism' => '64',

                        # The maximum number of active DAG runs per DAG. The scheduler will not create
                        # more DAG runs if it reaches the limit. This is configurable at the DAG level
                        # with max_active_runs, which is defaulted as max_active_runs_per_dag.
                        # The goal is to avoid 1 dag to take all resources. But in the same time, to
                        # keep some parallelism:
                        #   - for faster backfill
                        #   - for faster backfill
                        'max_active_runs_per_dag' => '3',

                        # The number of retries each task is going to have by default. Can be
                        # overridden at dag or task level.
                        # It is increased from 0 to 5 to let a chance for closing minor problems like
                        # db connection problems.
                        # It could be modified per task with the `retries` parameter. For example, to
                        # avoid a very compute-intensive (large) task to retry.
                        'default_task_retries' => '5',
                    },

                },
                'connections' => {
                    'fs_local' => {
                        'conn_type' => 'fs',
                        'description' => 'Local filesystem on the Airflow Scheduler node',
                    },
                },
                'environment_extra' => {
                    # data-engineering/airflow-dags has custom airflow hooks that can use
                    # Skein to launch jobs in yarn. Skein will by default write runtime
                    # config files into ~/.skein.  A home dir might not exist for the
                    # user running airflow, so set SKEIN_CONFIG to write into AIRFLOW_HOME/.skein
                    # (/srv/airflow-$instance_name is default airflow home in airflow::instance)
                    'SKEIN_CONFIG' => "${airflow_home}/.skein",
                    # By putting $deployment_dir on python path, it lets dags .py files
                    # import python modules from there, e.g. import wmf_airflow_common
                    # or import $instance_name/conf/dag_config.py
                    'PYTHONPATH' => $deployment_dir,
                },
                'renew_skein_certificate' => $renew_skein_certificate,
            }

            if $airflow_version != undef and versioncmp($airflow_version, '2.3.0') >= 0 {
                # Set WMF airflow default params for airflow version >= 2.3.0
                $default_wmf_instance_params_versioned = {
                    'airflow_config' => {
                        'core' => {
                            # The maximum number of task instances allowed to run concurrently in each DAG.
                            # To calculate the number of tasks that is running concurrently for a DAG, add
                            # up the number of running tasks for all DAG runs of the DAG.
                            # This value has been set lower than the default (16). An example scenario when
                            # this would be useful is when you want to stop a new dag with an early start
                            # date from stealing all the executor slots in a cluster.
                            # Currently, dag_concurrency = 2 * max_active_runs_per_dag
                            # This is configurable at the DAG level with max_active_tasks.
                            'max_active_tasks_per_dag' => '6',

                        },
                        'database' => {
                            # NOTE: @db_user and @db_password should be provided via
                            # $airflow_instances_secrets as the $db_user and $db_password params.
                            # This ERb template string will be rendered in airflow::instance
                            # with those values.
                            'sql_alchemy_conn' => "postgresql://<%= @db_user %>:<%= @db_password %>@${airflow_database_host_default}/airflow_${instance_name_normalized}?sslmode=require&sslrootcert=/etc/ssl/certs/wmf-ca-certificates.crt",
                        }
                    }
                }
            } else {
                # Set WMF airflow default params for airflow version < 2.3.0
                $default_wmf_instance_params_versioned = {
                    'airflow_config' => {
                        'core' => {
                            # Same as max_active_tasks_per_dag in airflow > 2.3.0
                            'dag_concurrency' => '6',
                            # NOTE: @db_user and @db_password should be provided via
                            # $airflow_instances_secrets as the $db_user and $db_password params.
                            # This ERb template string will be rendered in airflow::instance
                            # with those values.
                            'sql_alchemy_conn' => "mysql://<%= @db_user %>:<%= @db_password %>@${airflow_database_host_default}/airflow_${instance_name_normalized}?ssl_ca=/etc/ssl/certs/wmf-ca-certificates.crt",
                        }
                    }
                }
            }

            # Merge this instance's params with the smart wmf defaults we just constructed.
            $merged_instance_params = deep_merge($default_wmf_instance_params, $default_wmf_instance_params_versioned, $instance_params)

            # $instances_accumulator is just the reduce accumulator.
            # Merge it with the instance_name -> params we just created to
            # build up a new Hash of instances with smart defaults.
            deep_merge(
                $instances_accumulator,
                { $instance_name => $merged_instance_params },
            )
        },
    }

    # If any of the Airflow instances on this machine have statsd_monitoring_enabled, add a statsd_exporter.
    if $airflow_instances.any |$dict| { $dict[1]['statsd_monitoring_enabled'] } {
        class { 'profile::prometheus::statsd_exporter':
          prometheus_instance => 'analytics',
          mappings            => $statsd_exporter_mappings,
        }
    }

    # Finally, merge any airflow secrets into our airflow instances...
    $_airflow_instances = deep_merge($airflow_instances_with_defaults, $airflow_instances_secrets)

    # and declare the airflow::instances using our final Hash.
    create_resources('airflow::instance', $_airflow_instances)
}