Puppet Class: profile::airflow

Defined in:
modules/profile/manifests/airflow.pp

Overview

SPDX-License-Identifier: Apache-2.0

Class profile::airflow

Creates an airflow::instance resource for each entry in $airflow_instances.

Parameters

airflow_instances

Hash of airflow::instance parameters keyed by instance name. After the optional WMF defaults and $airflow_instances_secrets are merged in, this Hash is passed to the create_resources function. E.g.

myinstanceA:
    service_user: ...
    ...
myinstanceB:
    service_user: ...
    connections:
      analytics-mariadb:
        conn_type: mysql
        host: ...
        ...
    ...
airflow_instances_secrets

Any sensitive parameters that you don't want to put directly into $airflow_instances in public Puppet Hiera should be defined in this variable in private Puppet Hiera. It has the exact same structure as $airflow_instances and will be deep-merged on top of it before being passed to create_resources. The params here should be keyed by the same instance names as in $airflow_instances. E.g.

myinstanceA:
    db_password: SECRET
myinstanceB:
    db_password: SECRET
    connections:
      analytics-mariadb:
        login: myuser
        password: SECRET

Default: {}

use_wmf_defaults

If true, defaults for WMF Data Engineering Airflow deployments will be merged into each provided instance's params. This reduces the Hiera boilerplate needed to follow the conventions we use to deploy Airflow. Default: true. A minimal Hiera sketch follows the list below.

Notably:

  • A scap::target for airflow-dags/$title is declared. This target is expected to be the data-engineering/airflow-dags repository, which contains the instance-specific dags_folder at $title/dags.

  • dags_folder will default to /srv/deployment/airflow-dags/$title/$title/dags

  • Ferm will default to only allowing $ANALYTICS_NETWORKS to reach the airflow instance services.

  • Common Airflow connections are configured.
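
For example, with use_wmf_defaults enabled, an instance definition in Hiera can be reduced to roughly the following sketch. The instance name and service user are illustrative, db_user and db_password would come from $airflow_instances_secrets, and any of the defaults above can still be overridden per instance:

profile::airflow::instances:
  myinstanceA:
    service_user: analytics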

airflow_database_host_default

Hostname used in the default sql_alchemy_conn when use_wmf_defaults is true. Default: an-coord1001.eqiad.wmnet
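
To point the generated sql_alchemy_conn at a different database host, override the lookup key in Hiera (the hostname below is illustrative):

profile::airflow::database_host_default: an-other-coord.eqiad.wmnet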

Parameters:

  • airflow_instances (Hash) (defaults to: lookup('profile::airflow::instances'))
  • airflow_instances_secrets (Hash) (defaults to: lookup('profile::airflow::instances_secrets', { 'default_value' => {} }))
  • use_wmf_defaults (Boolean) (defaults to: lookup('profile::airflow::use_wmf_defaults', { 'default_value' => true }))
  • airflow_database_host_default (String) (defaults to: lookup('profile::airflow::database_host_default', { 'default_value' => 'an-coord1001.eqiad.wmnet' }))


# File 'modules/profile/manifests/airflow.pp', line 58

class profile::airflow(
    Hash    $airflow_instances             = lookup('profile::airflow::instances'),
    Hash    $airflow_instances_secrets     = lookup('profile::airflow::instances_secrets', { 'default_value' => {} }),
    Boolean $use_wmf_defaults              = lookup('profile::airflow::use_wmf_defaults', { 'default_value' => true }),
    String  $airflow_database_host_default = lookup('profile::airflow::database_host_default', { 'default_value' => 'an-coord1001.eqiad.wmnet' })
) {

    # Need to make sure a few packages exist. Some airflow code
    # (like Hive sensors) depends on these.
    # TODO: consider using conda deps to install these in the airflow conda env instead.
    ensure_packages([
        'sasl2-bin',
        'libsasl2-dev',
        'libsasl2-modules-gssapi-mit',
    ])

    # If use_wmf_defaults, merge in smart per instance wmf defaults.
    $airflow_instances_with_defaults = $use_wmf_defaults ? {
        # Not $use_wmf_defaults, keep $airflow_instances as provided.
        default => $airflow_instances,

        # If $use_wmf_defaults, create a dynamic set of defaults for each instance's params
        # and merge those defaults in to create a new Hash of
        # { instance name => instance_params (with smart defaults) }
        true    => $airflow_instances.reduce({}) |$instances_accumulator, $key_value| {
            $instance_name = $key_value[0]
            $instance_params = $key_value[1]
            $airflow_home = "/srv/airflow-${instance_name}"
            # This is where scap will deploy
            $deployment_dir = "/srv/deployment/airflow-dags/${instance_name}"


            # Used in places where '-', etc. won't work, like database names.
            $instance_name_normalized = regsubst($instance_name, '\W', '_', 'G')


            # scap::targets should use the same $ensure for the airflow::instance.
            $scap_target_ensure = $instance_params['ensure'] ? {
                undef   => 'present',
                default => $instance_params['ensure']
            }

            # We don't want to deep_merge scap_targets, but allow a full override
            # if provided in $instance_params. I.e. If the configured instance
            # explicitly declares scap_targets, ONLY use those scap_targets,
            # don't use our smart scap_target defaults.
            $default_scap_targets = has_key($instance_params, 'scap_targets') ? {
                true    => undef,
                default => {
                    "airflow-dags/${instance_name}" => {
                        'deploy_user' => $instance_params['service_user'],
                        # The service user (that runs airflow) will be managed
                        # by airflow::instance, but the deploy_airflow ssh key
                        # is only used for scap deployments.  scap::target should
                        # not manage the user, but it should manage the ssh key.
                        'manage_user' => false,
                        'manage_ssh_key' => true,
                        # key_name must match a keyholder::agent declared in profile::keyholder::server::agents,
                        # which also must match an ssh keypair added in puppet private repo
                        # in modules/secret/secrets/keyholder.
                        'key_name' => 'deploy_airflow',
                        'ensure' => $scap_target_ensure
                    },
                }
            }


            # Default WMF (analytics cluster, for now) specific instance params.
            $default_wmf_instance_params = {
                'airflow_home' => $airflow_home,
                'ferm_srange' => '$ANALYTICS_NETWORKS',
                'scap_targets' => $default_scap_targets,
                'airflow_config' => {
                    'core' => {
                        'dags_folder' => "/srv/deployment/airflow-dags/${instance_name}/${instance_name}/dags",
                        'security' => 'kerberos',
                        'executor' => 'LocalExecutor',

                        # NOTE: @db_user and @db_password should be provided via
                        # $airflow_instances_secrets as the $db_user and $db_password params.
                        # This ERb template string will be rendered in airflow::instance
                        # with those values.
                        'sql_alchemy_conn' => "mysql://<%= @db_user %>:<%= @db_password %>@${airflow_database_host_default}/airflow_${instance_name_normalized}?ssl_ca=/etc/ssl/certs/Puppet_Internal_CA.pem",

                        # The amount of parallelism as a setting to the executor. This defines
                        # the max number of task instances that should run simultaneously
                        # on this airflow installation.
                        # This value defaults to 32, but we have decided to increase it to
                        # backfill more quickly and to run more concurrent jobs globally. By
                        # doing so, we assume that most tasks are not resource intensive.
                        # You can always restrict this per DAG with max_active_tasks, and per
                        # task with max_active_tis_per_dag. For example, a task using a
                        # significant amount of memory from the cluster should set its
                        # max_active_tis_per_dag to 1.
                        'parallelism' => '64',

                        # The maximum number of task instances allowed to run concurrently in each DAG.
                        # To calculate the number of tasks running concurrently for a DAG, add
                        # up the number of running tasks for all DAG runs of that DAG.
                        # This value has been set lower than the default (16). For example, this
                        # prevents a new DAG with an early start date from stealing all of the
                        # executor slots in the cluster.
                        # Currently, dag_concurrency = 2 * max_active_runs_per_dag
                        # This is configurable at the DAG level with max_active_tasks.
                        # TODO Rename me max_active_tasks_per_dag after Airflow is upgraded to 2.2.0.
                        'dag_concurrency' => '6',

                        # The maximum number of active DAG runs per DAG. The scheduler will not create
                        # more DAG runs if it reaches the limit. This is configurable at the DAG level
                        # with max_active_runs, which defaults to max_active_runs_per_dag.
                        # The goal is to prevent a single DAG from taking all of the resources,
                        # while still keeping some parallelism for faster backfills.
                        'max_active_runs_per_dag' => '3',

                        # The number of retries each task is going to have by default. Can be
                        # overridden at dag or task level.
                        # It is increased from 0 to 5 to give transient problems, like DB
                        # connection issues, a chance to resolve.
                        # It can also be set per task with the `retries` parameter, for example
                        # to prevent a very compute-intensive (large) task from retrying.
                        'default_task_retries' => '5',
                    },

                },
                'connections' => {
                    'fs_local' => {
                        'conn_type' => 'fs',
                        'description' => 'Local filesystem on the Airflow Scheduler node',
                    },
                },
                'environment_extra' => {
                    # data-engineering/airflow-dags has custom airflow hooks that can use
                    # Skein to launch jobs in yarn. Skein will by default write runtime
                    # config files into ~/.skein.  A home dir might not exist for the
                    # user running airflow, so set SKEIN_CONFIG to write into AIRFLOW_HOME/.skein
                    # (/srv/airflow-$instance_name is default airflow home in airflow::instance)
                    'SKEIN_CONFIG' => "${airflow_home}/.skein",
                    # By putting $deployment_dir on the Python path, DAG .py files can
                    # import python modules from there, e.g. import wmf_airflow_common
                    # or import $instance_name/conf/dag_config.py
                    'PYTHONPATH' => $deployment_dir,
                }
            }

            # Merge this instance's params with the smart wmf defaults we just constructed.
            $merged_instance_params = deep_merge($default_wmf_instance_params, $instance_params)

            # $instances_accumulator is just the reduce accumulator.
            # Merge it with the instance_name -> params we just created to
            # build up a new Hash of instances with smart defaults.
            deep_merge(
                $instances_accumulator,
                { $instance_name => $merged_instance_params },
            )
        },
    }

    # Finally, merge any airflow secrets into our airflow instances...
    $_airflow_instances = deep_merge($airflow_instances_with_defaults, $airflow_instances_secrets)

    # and declare the airflow::instances using our final Hash.
    create_resources('airflow::instance', $_airflow_instances)
}
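
For reference, here is a rough sketch of how the public and private Hiera keys fit together for this class. Instance names and values are illustrative; the secrets key lives in the private Hiera repo and is deep-merged on top of the instances Hash before create_resources is called:

# Public Hiera
profile::airflow::instances:
  myinstanceA:
    service_user: analytics

# Private Hiera
profile::airflow::instances_secrets:
  myinstanceA:
    db_user: airflow_admin
    db_password: SECRET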