October 30, 2020

This tutorial is adapted from the Web Age course Workflow Management with Apache Airflow.

1.1 A Traditional ETL Approach

  • Normally, an ETL job would involve the following steps:
    • Create a script automating such activities as downloading a dataset from the Internet, transforming it, and inserting the resulting data into a database
    • Schedule a job to run the script daily, every other hour, etc., using some existing scheduling systems, like cron
  • In most cases, existing scheduling systems won’t let you do the following (while Airflow will):
    • Automatically retry failed processing (missing data, data in the wrong format, a back-end database that is down, etc.) at either the job level or the individual task level
    • Scale on demand
    • React to a data arrival event (Airflow does it with sensors)
    • … and more …
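To make the gap concrete, the cron-style approach above might look like the stand-alone script below. This is a hedged sketch: the dataset URL, column names, and table name are all hypothetical, and SQLite stands in for the target database.

```python
import csv
import io
import sqlite3
import urllib.request


def extract(url):
    """Download a CSV dataset from the Internet and return its rows as dicts."""
    with urllib.request.urlopen(url) as resp:
        text = resp.read().decode("utf-8")
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Drop malformed rows and normalize the (hypothetical) name column."""
    return [
        {"name": r["name"].strip().title(), "amount": float(r["amount"])}
        for r in rows
        if r.get("name") and r.get("amount")
    ]


def load(rows, db_path="etl.db"):
    """Insert the transformed rows into a database table."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS sales (name TEXT, amount REAL)")
    conn.executemany(
        "INSERT INTO sales (name, amount) VALUES (:name, :amount)", rows
    )
    conn.commit()
    conn.close()


# A cron entry such as `0 */2 * * * python3 /opt/etl/etl_job.py` would then
# call extract -> transform -> load every other hour. Note what is missing:
# no retries, no backfills, no alerting -- the gaps Airflow is designed to fill.
```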

1.2 Apache Airflow Defined

Airflow (https://airflow.apache.org/) is a configuration-as-code open-source solution for workflow automation. It is purely Python-based; there is no XML, YAML, etc. An Airflow workflow is defined as a DAG (Directed Acyclic Graph) coded in Python as a sequence of Tasks. Airflow was originally developed at Airbnb in 2014 and became a top-level Apache Software Foundation project in January 2019. It offers developers a way to programmatically author, schedule, and monitor highly configurable, complex workflows. It is commonly used as an ETL tool or as a replacement for cron.
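As a minimal sketch of such a configuration-as-code fragment (using the classic 1.10-era import paths current at the time of writing; the dag_id, schedule, and bash commands are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A workflow is a DAG object; Tasks are Operator instances bound to it.
dag = DAG(
    dag_id="hello_etl",                  # illustrative name
    start_date=datetime(2020, 10, 1),
    schedule_interval="0 */2 * * *",     # every other hour, cron syntax
)

extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# The >> operator declares execution order: extract runs before load.
extract >> load
```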

1.3 Airflow Core Components

  • Scheduler
    • Sends tasks defined in the scheduled DAG for execution
  • Executor
    • There are several kinds of Executors, specific to the processing domain; the default one is called SequentialExecutor
  • Web server (Airflow’s Web UI)
    • A Flask app with role-based access control (RBAC)
  • Metadata database
    • The default DB engine is SQLite; in production: MySQL, PostgreSQL, etc.

1.4 The Component Collaboration Diagram

Source: https://www.astronomer.io/guides/airflow-components/

1.5 Workflow Building Blocks and Concepts

  • DAG
    • Defines the workflow tasks and their order of execution/dependencies
    • Specifies error/failure processing and re-try procedures.
  • Operator (Worker)
    • Defines a task’s code to be executed
    • Maintains state in environment variables.
  • Task
    • Specific job to be done by an Operator (Worker).
  • Connection
    • External system access configuration details (access points, passwords, keys, and other credentials).
  • Hook
    • Abstracts external system interfaces.
  • XCom (Cross-Communication facility)
    • Pub/Sub-like messaging model for inter-task communication, where one operator acts as an XCom sender (message publisher), and other(s) are designated to receive the message by the sender’s id and message key.
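To make the XCom model concrete, here is a toy, in-memory sketch of its semantics (not Airflow’s actual implementation, which stores XComs in the metadata database): messages are addressed by the sender’s task id and a message key.

```python
# Toy XCom store: messages are keyed by (sender task_id, message key).
xcom_store = {}


def xcom_push(task_id, key, value):
    """Publish a value under the sender's id and a message key."""
    xcom_store[(task_id, key)] = value


def xcom_pull(task_ids, key):
    """Receive a value by the sender's id and message key (None if absent)."""
    return xcom_store.get((task_ids, key))


# One operator publishes...
xcom_push("extract", "row_count", 42)
# ...and a downstream operator pulls it by the sender's id and the key.
assert xcom_pull(task_ids="extract", key="row_count") == 42
```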

1.6 Airflow CLI

  • Provided through the airflow command-line tool, which supports a wide range of commands for managing workflows.
  • Syntax:
	airflow <command> <arguments>, e.g.
	airflow clear dag_1 -s 2020-09-14 -e 2020-09-17

Notes:

An abbreviated list of airflow CLI commands:
checkdb           Check if the database can be reached.
clear             Clear a set of task instances, as if they never ran
config            Show current application configuration
connections       List/Add/Delete connections
create_user       Create an account for the Web UI (FAB-based)
dag_state         Get the status of a dag run
delete_dag        Delete all DB records related to the specified DAG
delete_user       Delete an account for the Web UI
flower            Start a Celery Flower
info              Show information about current Airflow and environment
initdb            Initialize the metadata database
kerberos          Start a kerberos ticket renewer
list_dags         List all the DAGs
list_tasks        List the tasks within a DAG
list_users        List accounts for the Web UI
next_execution    Get the next execution datetime of a DAG.
pause             Pause a DAG
pool              CRUD operations on pools
resetdb           Burn down and rebuild the metadata database
run               Run a single task instance
scheduler         Start a scheduler instance
serve_logs        Serve logs generated by workers
shell             Runs a shell to access the database
show_dag          Displays DAG's tasks with their dependencies
task_state        Get the status of a task instance
test              Test a task instance. This will run a task without checking for dependencies or recording its state in the database
trigger_dag       Trigger a DAG run
unpause           Resume a paused DAG
variables         CRUD operations on variables
version           Show the version
webserver         Start an Airflow webserver instance
worker            Start a Celery worker node

1.7 Main Configuration File

  • $AIRFLOW_HOME/airflow.cfg controls various aspects of the Airflow runtime and its integrations.
  • A few file extracts:
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/wasadmin/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/wasadmin/airflow/logs

...

# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log

...
# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor
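Since airflow.cfg is a standard INI-style file, one quick way to inspect a setting programmatically is Python’s stdlib configparser. A sketch, using a fragment that mirrors the extract above rather than the real file:

```python
import configparser

# A fragment mirroring the airflow.cfg extract above.
cfg_text = """
[core]
dags_folder = /home/wasadmin/airflow/dags
executor = SequentialExecutor
"""

config = configparser.ConfigParser()
config.read_string(cfg_text)
# For the real file: config.read("/home/wasadmin/airflow/airflow.cfg")

print(config["core"]["dags_folder"])   # /home/wasadmin/airflow/dags
print(config["core"]["executor"])      # SequentialExecutor
```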
 

1.8 Extending Airflow

Notes:

Extra packages that can be installed alongside the core Airflow packages:

aws
azure
celery
cloudant
crypto
devel
devel_hadoop
druid
gcp
github_enterprise
google_auth
hashicorp
hdfs
hive
jdbc
kerberos
kubernetes
ldap
mssql
mysql
oracle
password
postgres
presto
qds
rabbitmq
redis
samba
slack
ssh
vertica

See the details of the installation at https://airflow.apache.org/docs/stable/installation.html#extra-packages

1.9 Jinja Templates

  • Airflow DAGs can use the Jinja templating engine (jinja2) to declare a dictionary of variables, e.g.:
dict(foo='bar')
  • The variable can later be referenced through a Jinja template as follows:
 {{ foo }}  # gets evaluated to 'bar'
  • The main use of Jinja templates is to inject Airflow variables and macros into code that is evaluated at runtime.

  • Airflow uses Jinja templates internally as well, for example, when generating unique log file names:

log_filename_template = \
	{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log 
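That log filename template can be rendered with the jinja2 package directly. A sketch: SimpleNamespace stands in for the real TaskInstance object ("ti") that Airflow supplies at runtime, and the dag/task ids are illustrative.

```python
from types import SimpleNamespace

from jinja2 import Template

template = Template(
    "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
)

# Stand-in for the TaskInstance ("ti") Airflow would inject at runtime.
ti = SimpleNamespace(dag_id="dag_1", task_id="extract")

name = template.render(ti=ti, ts="2020-08-30T00:00:00+00:00", try_number=1)
print(name)  # dag_1/extract/2020-08-30T00:00:00+00:00/1.log
```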

Notes:

According to https://github.com/pallets/jinja and https://jinja.palletsprojects.com/en/master/,

Jinja is a fast, expressive, and extensible templating engine. Special placeholders in the template allow writing code similar to Python syntax. Then the template is passed data to render the final document.

Jinja is the default template engine used in Flask, a micro (with no dependencies) web framework written in Python.

1.10 Variables and Macros

  • Airflow provisions a set of variables and macros [https://airflow.apache.org/docs/stable/macros-ref] that are automatically injected into the DAG Run’s execution context and made available through Jinja templates using double curly braces: {{ }}
  • Reserved variables examples:

    • The variable ds gets evaluated to the execution date as YYYY-MM-DD string, which you can use as {{ ds }}
    • The variable ts represents a timestamp in ISO format, e.g. 2020-08-30T00:00:00+00:00
      • ts_nodash removes dashes and the time zone portion, e.g. 20200830T000000
  • Note: You can also define system-wide variables using the Web UI
  • Macros give you access to programmatic objects, e.g. macros.random.
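The relationship between ds, ts, and ts_nodash can be reproduced with the stdlib. This is a sketch of the formatting only, not Airflow’s own code:

```python
from datetime import datetime, timezone

# An execution date as Airflow would supply it to a DAG Run.
execution_date = datetime(2020, 8, 30, tzinfo=timezone.utc)

ds = execution_date.strftime("%Y-%m-%d")              # 2020-08-30
ts = execution_date.isoformat()                       # 2020-08-30T00:00:00+00:00
ts_nodash = execution_date.strftime("%Y%m%dT%H%M%S")  # 20200830T000000

print(ds, ts, ts_nodash)
```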

1.11 Summary

  • In this tutorial, we discussed the following topics related to Apache Airflow:
    • Concepts
    • Main components
    • CLI
    • Extending Airflow with sub-packages
    • Jinja templates and variables
