
Using XComs in Airflow: Scenario-based examples with code

Suman Kumar Gangopadhyay
Oct 4, 2022


There are many tutorials that already describe how to use XComs in Airflow. However, I found that most of them are too simplistic and do not give a full picture of the various scenarios in which we need XComs. In this blog, I will build an end-to-end ETL pipeline that covers most of the scenarios where XComs are useful.

Spoiler alert: this blog is aimed at Airflow beginners and intermediate users. Experts can skip it or share their views on how to improve it.

Prerequisites

  • An S3 bucket
  • A public dataset (one can be obtained from Kaggle)
  • A working Airflow installation (the Astro CLI can be used to spin one up)
  • A database (PostgreSQL or Snowflake; this blog uses Snowflake)

Scenario 1

Passing information from a custom Python operator to a non-Python operator

We will address a situation where we need to extract some metadata stored in the database (e.g. the type of dataset that the pipeline is ingesting, such as master data or transactional data) and then use this metadata to build an S3 key.
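To make the goal concrete, here is a minimal sketch of turning the looked-up dataset type into an S3 key. The function name `build_s3_key`, the allowed type values and the prefix layout (`raw/<key_type>/<dataset>/<YYYY>/<MM>/<DD>/<file>`) are all hypothetical; the post does not show its actual key format.

```python
from datetime import date

# Hypothetical set of dataset types stored in the metadata table
ALLOWED_KEY_TYPES = {"master", "transactional"}


def build_s3_key(key_type: str, dataset_name: str, load_date: date, file_name: str) -> str:
    """Build an S3 object key from dataset metadata (hypothetical layout)."""
    normalized = key_type.strip().lower()
    if normalized not in ALLOWED_KEY_TYPES:
        raise ValueError(f"unexpected dataset type: {key_type!r}")
    # S3 has no real folders; '/'-separated prefixes simply group related objects
    return "/".join([
        "raw",
        normalized,
        dataset_name,
        load_date.strftime("%Y/%m/%d"),
        file_name,
    ])


print(build_s3_key("Master", "customers", date(2022, 10, 4), "customers.csv"))
# raw/master/customers/2022/10/04/customers.csv
```

Keeping the dataset type near the front of the key means master and transactional files land under separate prefixes, which makes it easy to point different downstream loads at each one.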

We could achieve this by declaring a Snowflake hook at the top level of the DAG file and storing the required information in a variable. However, Airflow executes top-level code every time the scheduler parses the DAG file, so the Snowflake connection would be opened and the query run repeatedly at parse time, not just when the pipeline actually runs; definitely not something we want. To avoid this, we will create two tasks and use XComs to pass information between them.
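The hand-off between the two tasks can be sketched with the task instance's `xcom_push` and `xcom_pull` methods. This is a minimal sketch under stated assumptions: the XCom key `key_type`, the task id `get_key_type` and the template string are hypothetical, and the functions only rely on an object exposing `xcom_push`/`xcom_pull` (such as the `ti` that Airflow passes to a Python callable), so no Airflow import is needed here.

```python
# Hypothetical XCom key. If a callable simply returns a value instead,
# Airflow stores it under the default key "return_value".
KEY_TYPE_XCOM_KEY = "key_type"


def push_key_type(ti, key_type):
    """Producer side: store the looked-up value under a named XCom key."""
    ti.xcom_push(key=KEY_TYPE_XCOM_KEY, value=key_type)


def pull_key_type(ti, producer_task_id):
    """Consumer side for a Python task: read the value back by task id and key."""
    return ti.xcom_pull(task_ids=producer_task_id, key=KEY_TYPE_XCOM_KEY)


# A non-Python operator cannot call pull_key_type(), so the same pull is written
# as a Jinja expression in one of its templated fields. Airflow renders it when
# the task runs, so nothing touches the database while the DAG file is parsed.
S3_KEY_TEMPLATE = (
    "raw/{{ ti.xcom_pull(task_ids='get_key_type', key='key_type') }}"
    "/{{ ds }}/data.csv"
)
```

Which arguments of a given operator accept such templates depends on the operator; its `template_fields` attribute lists them.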

Task 1:

Extract information from the database and push it as an XCom

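```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# `config` and `settings` are the author's own project modules (not shown in the post)

def get_key_type(ti, **kwargs):
    # Build the metadata lookup query; `metadata_key` arrives through the callable's kwargs
    dataset_type_select_sql = "SELECT m_key_type FROM {0}.{1}.{2} WHERE m_key = '{3}' " \
        .format(config.snowflake_db_name,
                config.snowflake_metadata_schema, config.snowflake_metadata_table_name, kwargs['metadata_key'])
    # The hook is created inside the task callable, so it connects only when the task runs
    hook_connection = SnowflakeHook(snowflake_conn_id=settings.snowflake_airflow_conn_id)
    sf_conn = hook_connection.get_conn()
    cur = sf_conn.cursor()
    cur.execute(dataset_type_select_sql)…
```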
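The listing above is cut off after the query executes. Purely as a hedged sketch (not the author's code), a hypothetical helper such as `fetch_key_type` shows one way to finish the lookup: it binds the metadata key as a query parameter instead of formatting it into the SQL string, fetches a single row and returns the value. When a `PythonOperator` callable returns a value, Airflow pushes it as an XCom under the key `return_value` by default, so no explicit `xcom_push` call is required. The cursor is assumed to follow the Python DB-API; the Snowflake connector's default `pyformat` parameter style accepts `%(name)s` placeholders.

```python
def fetch_key_type(cursor, db_name, schema_name, table_name, metadata_key):
    """Return m_key_type for metadata_key (hypothetical helper)."""
    # Identifiers cannot be bound as parameters, so they still come from trusted
    # config; only the metadata key value is passed as a bound parameter.
    sql = (
        f"SELECT m_key_type FROM {db_name}.{schema_name}.{table_name} "
        "WHERE m_key = %(m_key)s"
    )
    cursor.execute(sql, {"m_key": metadata_key})
    row = cursor.fetchone()
    if row is None:
        raise ValueError(f"no metadata row for key {metadata_key!r}")
    # A task callable that returns this value has it pushed as the "return_value" XCom
    return row[0]
```

Inside a callable like `get_key_type`, this would be used as `return fetch_key_type(cur, ...)`, leaving the downstream task to pull the value.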
