Using XCOMs in Airflow — Scenario based examples with code

Suman Kumar Gangopadhyay
5 min read · Oct 4, 2022

There are many tutorials that already describe how to use XCOMs in Airflow; however, I found that most of them are too simplistic and do not give a full picture of the various scenarios in which we need to use XCOMs. In this blog, I will walk through an end-to-end ETL pipeline that should cover most of the scenarios where we want to use XCOMs.

Spoiler alert: this blog is targeted at Airflow beginners and intermediate users; experts can skip it or share their suggestions for improvement.

Pre-requisites

  • An S3 bucket
  • A public dataset (can be obtained from Kaggle)
  • A working Airflow installation (the astro CLI can be leveraged for spinning one up)
  • A database (PostgreSQL or Snowflake; this blog uses Snowflake)

Scenario 1

Passing information from a custom Python operator to a non-Python operator

We will attempt to address a situation where we need to extract some metadata stored in the database (e.g., whether the dataset the pipeline is ingesting is master data or transactional data) and then use this metadata to build an S3 key.

We can certainly achieve this by declaring a Snowflake hook and fetching the required information into a variable at the top level of the DAG file. However, this approach would mean that the Snowflake connection and the variable are instantiated every time the Airflow scheduler parses the DAG; definitely not something we want. So to avoid this, we will create two tasks and use XCOMs to pass information between them.
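To make the contrast concrete, here is a minimal sketch of the top-level approach we want to avoid (the connection id and table name are placeholders, not the ones used in this pipeline):

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Anti-pattern sketch: this code lives at module level, so the scheduler opens a
# Snowflake connection and runs the query every time it parses the DAG file.
hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")  # placeholder connection id
s3_dataset_type = hook.get_first("SELECT m_key_type FROM metadata_table")[0]  # placeholder table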

Task 1 :

Extract information from the database and push this information as XCOM

from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def get_key_type(ti, **kwargs):
    # Look up the dataset type (master or transactional) for the given metadata key
    dataset_type_select_sql = "SELECT m_key_type FROM {0}.{1}.{2} WHERE m_key = '{3}' " \
        .format(config.snowflake_db_name, config.snowflake_metadata_schema,
                config.snowflake_metadata_table_name, kwargs['metadata_key'])
    hook_connection = SnowflakeHook(snowflake_conn_id=settings.snowflake_airflow_conn_id)
    sf_conn = hook_connection.get_conn()
    cur = sf_conn.cursor()
    cur.execute(dataset_type_select_sql)
    df = cur.fetch_pandas_all()
    s3_dataset_type = df.values[0][0]
    # Push the value so that downstream tasks can pull it by key
    ti.xcom_push(key='s3_dataset_type', value=s3_dataset_type)

get_key_type_from_snowflake = PythonOperator(
    task_id="get_key_type_from_snowflake",
    provide_context=True,
    python_callable=get_key_type,
    op_kwargs={'metadata_key': metadata_key}
)

The custom Python operator pushes the required information under the XCOM key s3_dataset_type; this key will be used in the downstream task. Focus on the line

ti.xcom_push(key='s3_dataset_type', value=s3_dataset_type)
Pushed XCOM key-value pair
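As a side note (not used in this pipeline), if a task only needs to hand over a single value, the callable can simply return it; the PythonOperator then pushes the return value automatically under the default XCOM key return_value (do_xcom_push is True by default). A hypothetical sketch:

def get_key_type_returning(ti, **kwargs):
    # hypothetical alternative to get_key_type shown above
    s3_dataset_type = "master"  # placeholder for the value fetched from Snowflake
    # Returning the value pushes it as an XCOM under the key 'return_value'
    return s3_dataset_type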

Task 2 :

Use the XCOM from the upstream task as an argument of the downstream task

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

file_sniffer_task = S3KeySensor(
    task_id='look_for_bundesliga_match_scores_file_in_s3',
    bucket_name=config.s3_bucket_name,
    # The dataset type pulled from XCOM becomes part of the S3 key
    bucket_key="/".join([config.base_path_raw,
                         "{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'],key='s3_dataset_type')[0] }}",
                         'bundesliga_match_scores', 'bundesliga_match_scores']),
    wildcard_match=True,
    aws_conn_id=settings.aws_s3_airflow_conn_id,
    soft_fail=True,
    timeout=180,
    poke_interval=30,
    mode='reschedule',
    retries=2
)

The S3 key sensor pulls the information pushed by the custom Python operator. Focus on the line

"{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'],key='s3_dataset_type')[0] }}",

See how the XCOM key of the previous task is being pulled in this task’s bucket_key argument.

Pulled XCOM value
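A small aside: because we are pulling from a single task, passing the task id as a plain string (instead of a one-element list) returns the value directly, so the trailing [0] is not needed. The following template would be equivalent, assuming the same task and key:

"{{ ti.xcom_pull(task_ids='get_key_type_from_snowflake', key='s3_dataset_type') }}"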

Scenario 2

Passing information from one custom Python operator to another custom Python operator

Another common scenario, slightly different from the previous one, is where information is passed between two custom Python operators.

Task 1 :

Push the returned value of a user defined function as XCOM

This task will call a custom class to generate a suffix based on the DAG run's execution date. This suffix will then be passed as an XCOM to the downstream task.

def generate_s3_partition_metadata(ti, **kwargs):
    # Build a date-based S3 partition suffix from the execution date (ds)
    part = Partitioner()
    file_destination_suffix = part.generate_s3_partition_suffix(kwargs.get('ds'))
    ti.xcom_push(key='file_destination_suffix', value=file_destination_suffix)

generate_s3_partition = PythonOperator(
    task_id="generate_s3_partition",
    provide_context=True,
    python_callable=generate_s3_partition_metadata
)
the suffix pushed as XCOM
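The Partitioner class comes from the project codebase and is not shown in this post. Purely for illustration, a minimal sketch of what its generate_s3_partition_suffix method might look like (hypothetical; the actual implementation may differ):

from datetime import datetime

class Partitioner:
    def generate_s3_partition_suffix(self, ds: str) -> str:
        # ds is the execution date in 'YYYY-MM-DD' form; turn it into a
        # year/month/day style prefix for the S3 destination path
        dt = datetime.strptime(ds, "%Y-%m-%d")
        return f"{dt.year}/{dt.month:02d}/{dt.day:02d}/"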

Task 2 :

Use the XCOM to build a dynamic SQL

This task will build an UPDATE SQL statement based on the XCOM value passed by the upstream task.

def generate_metadata_updt_sql(ti, **kwargs):
    # Pull the partition suffix pushed by the upstream task
    s3_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
    metadata_update_sql = "UPDATE {0}.{1}.{2} SET m_value = m_key_type||'/'||'{4}'||'{3}' WHERE m_key = '{4}' " \
        .format(config.snowflake_db_name, config.snowflake_metadata_schema,
                config.snowflake_metadata_table_name, s3_suffix, kwargs['metadata_key'])
    # Push the generated SQL so it can be executed downstream
    ti.xcom_push(key='metadata_update_sql', value=metadata_update_sql)

generate_metadata_update_sql = PythonOperator(
    task_id='generate_metadata_update_sql',
    provide_context=True,
    python_callable=generate_metadata_updt_sql,
    op_kwargs={'metadata_key': metadata_key}
)

Focus on the line

s3_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
dynamic SQL using the XCOM value from upstream task

The screenshot above shows the dynamic SQL as an XCOM value, since it is itself pushed downstream as an XCOM; however, we can clearly see how the XCOM value from the upstream task is being used here (highlighted in the screenshot).
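Although it is not shown in this post, a downstream SnowflakeOperator could consume this XCOM directly through its templated sql argument. A sketch, assuming the same task id and key as above (the task name update_metadata is hypothetical):

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

update_metadata = SnowflakeOperator(
    task_id="update_metadata",
    snowflake_conn_id=settings.snowflake_airflow_conn_id,
    # sql is a templated field, so the XCOM is resolved at run time
    sql="{{ ti.xcom_pull(task_ids=['generate_metadata_update_sql'], key='metadata_update_sql')[0] }}"
)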

Scenario 3

Passing information from a non-Python operator to a custom Python operator

Task 1 :

Pass the default XCOM value of the upstream task

In Airflow there are some tasks which generate default XCOMs that can be used downstream. The S3ListOperator is one such example: it implicitly pushes the list of matching S3 keys as an XCOM under the default key return_value.

from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

list_of_S3_files_to_copy = S3ListOperator(
    task_id="list_of_S3_files_to_copy",
    bucket=config.s3_bucket_name,
    # Reuse the dataset type pushed in Scenario 1 to build the prefix
    prefix="/".join([config.base_path_raw,
                     "{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'],"
                     "key='s3_dataset_type')[0] }}",
                     'bundesliga_match_scores', 'bundesliga_match_scores'])
)

Note : This task also uses the XCOM value of the key s3_dataset_type from an upstream task

S3 List of keys

Task 2 :

Use this XCOM downstream in a custom Python operator

import boto3

def copy_files(ti, **kwargs):
    # Pull the implicit 'return_value' XCOM pushed by the S3ListOperator
    list_of_files_to_copy = ti.xcom_pull(task_ids=['list_of_S3_files_to_copy'], key='return_value')[0]
    # Pull the partition suffix pushed in Scenario 2
    s3_partition_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
    s3_resource = boto3.resource('s3')
    for file_to_copy in list_of_files_to_copy:
        destination = "".join([config.base_path_raw, '/', s3_partition_suffix, file_to_copy.split('/')[-1]])
        origin = "/".join([config.s3_bucket_name, file_to_copy])
        s3_resource.Object(config.s3_bucket_name, destination).copy_from(CopySource=origin)

copy_files_within_s3 = PythonOperator(
    task_id="copy_files_within_s3",
    python_callable=copy_files
)

This task copies the files from one location to another within S3. The origin and destination are derived using the XCOM values passed from the upstream tasks.

Note : You can see that this task is using the XCOM values of two upstream tasks
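For completeness, the tasks from the three scenarios could be wired together roughly like this; this ordering is only my sketch of the dependencies implied above, and the exact chaining in the full codebase may differ:

# Scenario 1: fetch the dataset type, then wait for the file in S3
get_key_type_from_snowflake >> file_sniffer_task
# Scenario 2: generate the partition suffix, then build the UPDATE SQL
file_sniffer_task >> generate_s3_partition >> generate_metadata_update_sql
# Scenario 3: list the S3 keys and copy them to the partitioned destination
get_key_type_from_snowflake >> list_of_S3_files_to_copy
[list_of_S3_files_to_copy, generate_s3_partition] >> copy_files_within_s3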

Conclusion

Using XCOMs between tasks keeps the DAG clean and avoids the unintended side effects of instantiating variables within the DAG file. The three scenarios discussed above cover most of the common use cases; however, if you want to use a template-based approach to instantiate DAGs, you will need to use an f-string and also escape the double curly brackets, as shown below:

f"{{ '{{{{' }} ti.xcom_pull(task_ids=['dynamic_task_group_{name_from_templated_field}.dynamic_task_{name_from_templated_field}'], key='name_of_xcom_key')[0] {{ '}}}}' }}",

The complete codebase can be found here.

I hope this blog will help you speed up your development process. Do let me know if you come across any other scenarios.

