Using XCOMs in Airflow — Scenario-based examples with code
Many tutorials already describe how to use XCOMs in Airflow; however, I found that most of them are too simplistic and do not give a full picture of the various scenarios in which XCOMs are needed. In this blog, I will walk through an end-to-end ETL pipeline that covers most of the scenarios where we want to use XCOMs.
Spoiler alert: this blog is targeted at Airflow beginners and intermediate users; experts can skip it or share their suggestions for improvement.
Scenario 1
Passing information from a custom python operator to a non-python operator
We will address a situation where we need to extract some metadata stored in the database (e.g. the type of dataset the pipeline is ingesting: master data or transactional data) and then use this metadata to build an S3 key.
We could certainly achieve this by declaring a Snowflake hook and fetching the required information into a variable. However, that approach would mean the Snowflake connection and the variable are instantiated every time the Airflow scheduler parses the DAG, which is definitely not something we want. To avoid this, we will create two tasks and use XCOMs to pass information between them.
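To make the parse-time vs. run-time distinction concrete, here is a minimal sketch in plain Python (the names are illustrative, not from the actual DAG): module-level code runs every time the scheduler imports the DAG file, while code inside a python_callable runs only when the task executes.

```python
# Illustrative sketch (hypothetical names): module-level code runs on every
# DAG parse; code inside a python_callable runs only at task execution.
calls = {"parse_time": 0, "run_time": 0}

def fetch_metadata():
    # Stand-in for the Snowflake lookup done inside the task callable.
    calls["run_time"] += 1
    return "master_data"

# Anti-pattern: this line would run each time the scheduler imports the DAG
# file (think SnowflakeHook(...).get_conn() at module level).
calls["parse_time"] += 1

# Preferred: hand the function itself to the operator; nothing runs at parse time.
deferred = fetch_metadata
assert calls == {"parse_time": 1, "run_time": 0}

result = deferred()  # what Airflow does when the task actually executes
assert result == "master_data" and calls["run_time"] == 1
```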
Task 1 :
Extract information from the database and push this information as XCOM
def get_key_type(ti, **kwargs):
    dataset_type_select_sql = "SELECT m_key_type FROM {0}.{1}.{2} WHERE m_key = '{3}' " \
        .format(config.snowflake_db_name, config.snowflake_metadata_schema,
                config.snowflake_metadata_table_name, kwargs['metadata_key'])
    hook_connection = SnowflakeHook(snowflake_conn_id=settings.snowflake_airflow_conn_id)
    sf_conn = hook_connection.get_conn()
    cur = sf_conn.cursor()
    cur.execute(dataset_type_select_sql)
    df = cur.fetch_pandas_all()
    s3_dataset_type = df.values[0][0]
    ti.xcom_push(key='s3_dataset_type', value=s3_dataset_type)

get_key_type_from_snowflake = PythonOperator(
    task_id="get_key_type_from_snowflake",
    provide_context=True,
    python_callable=get_key_type,
    op_kwargs={'metadata_key': metadata_key}
)
The custom Python operator pushes the required information under the XCOM key s3_dataset_type, which will be used in the downstream task. Focus on the line
ti.xcom_push(key='s3_dataset_type', value=s3_dataset_type)
Task 2 :
Use the XCOM from the upstream task as an argument of the downstream task
file_sniffer_task = S3KeySensor(
    task_id='look_for_bundesliga_match_scores_file_in_s3',
    bucket_name=config.s3_bucket_name,
    bucket_key="/".join([config.base_path_raw,
                         "{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'], key='s3_dataset_type')[0] }}",
                         'bundesliga_match_scores', 'bundesliga_match_scores']),
    wildcard_match=True,
    aws_conn_id=settings.aws_s3_airflow_conn_id,
    soft_fail=True,
    timeout=180,
    poke_interval=30,
    mode='reschedule',
    retries=2
)
The S3 key sensor pulls the information pushed by the custom Python operator. Focus on the line
"{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'], key='s3_dataset_type')[0] }}",
See how the XCOM key of the previous task is pulled into this task's bucket_key argument.
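A side note on the trailing [0]: when task_ids is passed as a list, xcom_pull returns a list of values, one per task. A minimal stand-in (not Airflow's actual implementation) illustrates the behaviour:

```python
# Hypothetical stand-in for TaskInstance.xcom_pull, for illustration only.
_store = {("get_key_type_from_snowflake", "s3_dataset_type"): "master_data"}

def xcom_pull(task_ids, key):
    # A list of task_ids yields a list of values, hence the trailing [0]
    # in the templated bucket_key above.
    if isinstance(task_ids, list):
        return [_store[(t, key)] for t in task_ids]
    return _store[(task_ids, key)]

assert xcom_pull(task_ids=['get_key_type_from_snowflake'], key='s3_dataset_type')[0] == "master_data"
assert xcom_pull(task_ids='get_key_type_from_snowflake', key='s3_dataset_type') == "master_data"
```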
Scenario 2
Passing information from one custom python operator to another custom python operator
Another common scenario, slightly different from the previous one, is where information is passed between two custom Python operators.
Task 1 :
Push the return value of a user-defined function as an XCOM
This task calls a custom class to generate a suffix based on the DAG run's execution date. This suffix is then passed as an XCOM to the downstream task.
def generate_s3_partition_metadata(ti, **kwargs):
    part = Partitioner()
    file_destination_suffix = part.generate_s3_partition_suffix(kwargs.get('ds'))
    ti.xcom_push(key='file_destination_suffix', value=file_destination_suffix)

generate_s3_partition = PythonOperator(
    task_id="generate_s3_partition",
    provide_context=True,
    python_callable=generate_s3_partition_metadata
)
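The Partitioner class itself is not shown in this post; here is a minimal sketch of what it might look like, assuming it builds a Hive-style date partition path from Airflow's ds string (the class body is hypothetical):

```python
from datetime import datetime

class Partitioner:
    """Hypothetical sketch of the custom Partitioner used above."""

    def generate_s3_partition_suffix(self, ds):
        # ds is Airflow's execution date rendered as 'YYYY-MM-DD'
        d = datetime.strptime(ds, "%Y-%m-%d")
        return f"year={d.year}/month={d.month:02d}/day={d.day:02d}/"

assert Partitioner().generate_s3_partition_suffix("2021-08-14") == "year=2021/month=08/day=14/"
```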
Task 2 :
Use the XCOM to build a dynamic SQL
This task builds an UPDATE statement based on the XCOM value passed by the upstream task.
def generate_metadata_updt_sql(ti, **kwargs):
    s3_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
    metadata_update_sql = "UPDATE {0}.{1}.{2} SET m_value = m_key_type||'/'||'{4}'||'{3}' WHERE m_key = '{4}' " \
        .format(config.snowflake_db_name, config.snowflake_metadata_schema,
                config.snowflake_metadata_table_name, s3_suffix, kwargs['metadata_key'])
    ti.xcom_push(key='metadata_update_sql', value=metadata_update_sql)

generate_metadata_update_sql = PythonOperator(
    task_id='generate_metadata_update_sql',
    provide_context=True,
    python_callable=generate_metadata_updt_sql,
    op_kwargs={'metadata_key': metadata_key}
)
Focus on the line
s3_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
The dynamic SQL is itself pushed downstream as an XCOM value, and you can clearly see how the XCOM value from the upstream task (the partition suffix) is used to build it.
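To see what the formatted statement actually looks like, here is the same .format() call with hypothetical values standing in for the config entries and the metadata key (none of these names come from the real pipeline):

```python
# Hypothetical values standing in for the config entries used in the DAG.
db, schema, table = "ANALYTICS", "METADATA", "PIPELINE_METADATA"
s3_suffix = "year=2021/month=08/day=14/"
metadata_key = "bundesliga_match_scores"

# Same template as in the task; note that positional index {4} is reused,
# which str.format allows.
sql = ("UPDATE {0}.{1}.{2} SET m_value = m_key_type||'/'||'{4}'||'{3}' "
       "WHERE m_key = '{4}' ").format(db, schema, table, s3_suffix, metadata_key)
print(sql)
```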
Scenario 3
Passing information from a non python operator to a custom python operator
Task 1 :
Pass the default XCOM value of the upstream task
In Airflow, some operators generate default XCOMs that can be used downstream. The S3ListOperator is one such example: it implicitly pushes the list of matching S3 keys as an XCOM under the key return_value.
list_of_S3_files_to_copy = S3ListOperator(
    task_id="list_of_S3_files_to_copy",
    bucket=config.s3_bucket_name,
    prefix="/".join([config.base_path_raw,
                     "{{ ti.xcom_pull(task_ids=['get_key_type_from_snowflake'], key='s3_dataset_type')[0] }}",
                     'bundesliga_match_scores', 'bundesliga_match_scores'])
)
Note: this task also uses the XCOM value of the key s3_dataset_type from an upstream task.
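Under the hood (simplified), Airflow stores whatever an operator's execute() returns under the reserved XCom key return_value. A toy sketch of that convention, not the real implementation:

```python
# Toy sketch of Airflow's implicit-push convention: an operator's execute()
# return value is stored under the reserved key 'return_value'.
RETURN_VALUE_KEY = "return_value"
xcom_store = {}

def run_task(task_id, execute):
    result = execute()
    if result is not None:
        xcom_store[(task_id, RETURN_VALUE_KEY)] = result  # implicit xcom_push

# S3ListOperator.execute returns the list of matching keys; simulate that here
# with hypothetical file names.
run_task("list_of_S3_files_to_copy", lambda: ["raw/a.csv", "raw/b.csv"])
assert xcom_store[("list_of_S3_files_to_copy", "return_value")] == ["raw/a.csv", "raw/b.csv"]
```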
Task 2 :
Use this XCOM downstream in a custom python operator
def copy_files(ti, **kwargs):
    list_of_files_to_copy = ti.xcom_pull(task_ids=['list_of_S3_files_to_copy'], key='return_value')[0]
    s3_partition_suffix = ti.xcom_pull(task_ids=['generate_s3_partition'], key='file_destination_suffix')[0]
    s3_resource = boto3.resource('s3')
    for file_to_copy in list_of_files_to_copy:
        destination = "".join([config.base_path_raw, '/', s3_partition_suffix, file_to_copy.split('/')[-1]])
        origin = "/".join([config.s3_bucket_name, file_to_copy])
        s3_resource.Object(config.s3_bucket_name, destination).copy_from(CopySource=origin)

copy_files_within_s3 = PythonOperator(
    task_id="copy_files_within_s3",
    provide_context=True,
    python_callable=copy_files
)
This task copies the files from one location to another within S3. The origin and destination are derived using the XCOM values passed from the upstream tasks.
Note: you can see that this task uses the XCOM values of two upstream tasks.
Conclusion
Using XCOMs between tasks keeps the DAG clean and avoids the unintended side effects of instantiating variables within the DAG at parse time. The three scenarios discussed above cover most common use cases. However, if you use a template-based approach to instantiate DAGs (i.e. the DAG file is itself generated from a Jinja template), you will need an f-string and will also have to escape the double curly brackets, as shown below
f"{{ '{{{{' }} ti.xcom_pull(task_ids=['dynamic_task_group_{name_from_templated_field}.dynamic_task_{name_from_templated_field}'], key='name_of_xcom_key')[0] {{ '}}}}' }}",
The complete codebase can be found here.
I hope this blog will help you speed up your development process. Do let me know if you come across any other scenarios.