Airflow, Spark & S3, stitching it all together

Suman Kumar Gangopadhyay
Published in Analytics Vidhya · Apr 18, 2021

In my previous post, I described one of the many ways to set up your own Spark cluster in AWS and to submit Spark jobs to that cluster from an edge node (also in AWS). However, we all know how business requirements soon outgrow the ability to run jobs manually, and we end up on a quest to build a data pipeline.

This post puts together a step-by-step guide to setting up a pipeline that automates running Spark jobs from an edge node on a Spark cluster, all within AWS. I hope it will be helpful for beginners and intermediates; experts will no doubt know more ways to orchestrate such a pipeline.

The contents of this post should not be taken as a guide to setting up a production pipeline. It can, however, serve as a guide for a dev/test pipeline while the production infrastructure gets built with security, high availability and other best practices in mind.

Enough preamble, let’s jump right in.

Pre-requisites:

  1. Go through the airflow basics if you are not already familiar with them.
  2. An active S3 bucket with some data in it (if yours is still empty, see the small upload sketch right after this list).
  3. Make sure you have set up the EMR cluster and the edge node as described in my previous post.
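If your bucket is still empty, the quickest way to drop a sample file into it is a short boto3 snippet like the sketch below. Treat it as a sketch only: the bucket name (yourBucketName) and the local file (netflix_titles.csv, the dataset used later in this post) are placeholders, and it assumes your AWS credentials are already configured on the machine you run it from.

## a minimal upload sketch: bucket name and file name are placeholders
import boto3

s3 = boto3.client('s3')  # picks up credentials from your usual AWS config / instance role
s3.upload_file('netflix_titles.csv', 'yourBucketName', 'netflix_titles.csv')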

Once the pre-requisites are in place, we need to think of the next step. You need an orchestration tool to run Spark jobs automatically, at a frequency driven by your business requirements. We have chosen Airflow from the various options available out there, so our next step is to install Airflow on the edge node.

Before going into the details of the Airflow installation, we need to take care of one of Airflow's basic needs: the database holding the Airflow metadata and all the job information (logs, etc.). Although Airflow comes pre-packed with SQLite as a database, SQLite doesn't support parallel job execution, so we can't really use it for most use cases.

As a first step, we will set up a MySQL RDS instance in AWS.

In the AWS console, select RDS as the service and then click ‘Create Database’ on the next page. Then follow the screenshots below for choosing the remaining options.

Make a note of the credentials that you provide here, as they will be needed while setting up Airflow. Leave all the other options as-is until you arrive (by scrolling down) at the screen below.

Note that I have chosen the EMR master's security group as the VPC security group, along with the default one. This is essential to allow communication between the edge node (where Airflow will be set up) and the RDS instance.

Expand ‘Additional configuration’ and ensure that the port is 3306. Scroll down further and expand the next ‘Additional configuration’. Enter a database name of your choice and make a note of it; this name will also be needed during the Airflow setup.

Leave everything else as is and then hit ‘Create database’. Stay calm while AWS takes its own time to set up the MySQL database

Once the database is up and running, click on the DB Identifier

On the next screen, make a note of the Endpoint; this will be the database host.

Next, scroll down to the security groups and click on the default one. We will have to allow inbound traffic from the edge node to this database.

Hit ‘Edit inbound rules’ and then add the following rule (the public IP of the edge node goes into the blank space).

Hit Save rules
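If you prefer scripting this step over clicking through the console, a boto3 call along the following lines should do the same thing. This is only a sketch: the security group id and the edge node IP below are placeholders you need to replace with your own values.

import boto3

ec2 = boto3.client('ec2')

# allow MySQL traffic (TCP 3306) into the RDS security group, but only from the edge node
ec2.authorize_security_group_ingress(
    GroupId='sg-0123456789abcdef0',  # placeholder: the security group attached to the RDS instance
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 3306,
        'ToPort': 3306,
        'IpRanges': [{'CidrIp': '<edge-node-public-ip>/32', 'Description': 'airflow edge node'}],
    }],
)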

Let us now test if we can actually connect to the database from the edge node. Perform the following steps to check this

SSH into the edge node (duh! obvious), then run the below command

mysql -h <<the end point that you noted during the RDS setup>> -u <<username that you gave in the RDS setup>> -p <<the database name that you gave in the RDS setup>> (then enter password on prompt)

You should now see the mysql prompt. Run the command below in it; this ensures that Airflow gets the necessary character support during the metadata setup (skipping it will make the Airflow db initialisation fail).

alter database <<the database name that you gave in the RDS setup>> CHARACTER SET utf8 COLLATE utf8_unicode_ci;
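If you want to double-check that the change took effect, the following query at the same mysql prompt should report utf8 and utf8_unicode_ci:

select @@character_set_database, @@collation_database;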

Now your database is all set to support airflow installation and subsequent job execution.

With the database creation taken care of, we now move on to installing Airflow. We will install Airflow 2.0.0 and not the latest version, 2.0.1. The reason for this choice is that Airflow 2.0.1 ships with a hard failure if the SQLite version installed on the EC2 instance (in our case the edge node) is < 3.15. The problem is that even if you don't plan to use SQLite, this hard failure stops you from getting the Airflow home directory created, and without the home directory you can't alter the Airflow configuration, since you won't be able to access the airflow.cfg file.

If you know any other way to circumvent this problem, please let me know. For now, we will avoid airflow 2.0.1
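If you are curious whether your edge node would actually hit that failure, you can check the SQLite version your Python is linked against with a one-liner before deciding:

python3 -c "import sqlite3; print(sqlite3.sqlite_version)"

Anything below 3.15 means Airflow 2.0.1 would refuse to initialise, which is exactly the situation we are sidestepping by staying on 2.0.0.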

Let's now SSH into the edge node and perform the following steps to set up Airflow.

First check the python and pip versions

python --version

pip --version

Upgrade the pip version

curl -O https://bootstrap.pypa.io/get-pip.py

python3 get-pip.py --user

Declare the variables that we will use to issue the airflow 2.0.0 installation command

AIRFLOW_VERSION=2.0.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

Install Airflow (note that I have included an additional provider package in the command; you can skip it for now by taking the [amazon] part out).

pip install "apache-airflow[amazon]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Now we check for installation

PATH=$PATH:~/.local/bin
airflow version

The last command should show airflow 2.0.0 (if nothing has gone wrong during the installation, not that I expect any)
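Note that the PATH export above only lasts for the current shell session. To avoid repeating it after every login, you can append it to your shell profile, for example:

echo 'export PATH=$PATH:~/.local/bin' >> ~/.bash_profile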

Now comes the part where we need to initialise the database (this is where choosing 2.0.0 saves the day for us). Run the following command

airflow db init

This makes Airflow set up the SQLite db for us and creates the Airflow home directory. Get inside that directory and open up the airflow.cfg file.

cd ~/airflow
vim airflow.cfg

Search for the sql_alchemy_conn entry and modify it as below.

sql_alchemy_conn = mysql://<<username that you gave in the RDS setup>>:<<password that you gave in the RDS setup>>@<<the end point that you noted during the RDS setup>>:3306/<<the database name that you gave in the RDS setup>>
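One assumption hidden in this connection string: the plain mysql:// dialect relies on the MySQLdb (mysqlclient) driver being available to Python. If the db initialisation in the next step complains that MySQLdb is missing, either install mysqlclient or switch to the pure-Python PyMySQL driver, in which case the connection string takes this form instead:

pip install pymysql

sql_alchemy_conn = mysql+pymysql://<<username that you gave in the RDS setup>>:<<password that you gave in the RDS setup>>@<<the end point that you noted during the RDS setup>>:3306/<<the database name that you gave in the RDS setup>>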

Next, change the executor to LocalExecutor; this change will allow us to achieve job parallelism in our Airflow DAGs.

executor = LocalExecutor

Run the db initialisation once again so that Airflow now points to the MySQL db.

airflow db init

Now that you are all set, spin up the Airflow webserver and the scheduler.

airflow webserver -D -p 8080
airflow scheduler -D
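Since both commands were started with -D, they daemonise and return you to the shell immediately. A quick way to confirm the processes are actually up before moving on:

ps -ef | grep airflow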

Create a user to get into the airflow GUI

airflow users create --username CaptainAmerica --firstname Captain --lastname America --role Admin --email nosuchemail@nowhere.com

Type in the password of your choice and remember it, this credential will be needed to get into the GUI

Now, pointing a browser at port 8080 of the edge node, you should be able to see the shiny new GUI of Airflow 2.0.0.

After all the hard work you have done so far, now it’s time to reap the rewards. All we need now is a DAG and a spark script to read and write to/from our S3 bucket

For this post, I have the following bucket contents. The first one is the output of my pipeline and the second one is the source.

Let's code up a sample Spark script which reads from the bucket, performs a simple transform/filter and then writes back into the bucket.

## We will name this script as testEMRtoS3Conn.py
from pyspark.sql import SparkSession

# build (or reuse) a Spark session running on YARN
spark = SparkSession.builder.appName('FirstSparkApp').master('yarn').getOrCreate()

# read the source csv from the bucket
my_first_dframe = spark.read.option("header", True).csv("s3://yourBucketName/netflix_titles.csv")

# keep only the titles from Brazil
my_first_dframe_filtered = my_first_dframe.filter(my_first_dframe['country'] == 'Brazil')

# write the result back to the bucket (Spark writes a directory of part files, not a single csv file)
my_first_dframe_filtered.write.option("header", True).csv("s3://yourBucketName/netflix_titles_from_brazil.csv")

In order to submit this script we will write a wrapper shell script

### Let's name the script as testEMRtoS3Conn_pyWrapper.sh
#!/bin/sh

echo "calling spark script"
export HADOOP_USER_NAME=hdfs
# full path to the pyspark script so the wrapper works no matter which directory Airflow runs it from
spark-submit --driver-memory 2g --executor-memory 2g --executor-cores 2 --num-executors 2 --deploy-mode cluster /home/ec2-user/airflow/scripts/testEMRtoS3Conn.py

And finally, we will author the DAG.

### We will name the DAG as edgeToEMRandS3
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

dag = DAG(
    dag_id='edgeToEMRandS3',
    default_args=args,
    schedule_interval=None,  # no fixed schedule, we will trigger it manually
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['somethingForYouToFindYourDAG'],
    # params={"example_key": "example_value"},
)

# a no-op starting point for the pipeline
run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

# the trailing space after the .sh path stops Airflow from trying to render the script itself as a Jinja template
now_run_spark_job = BashOperator(
    task_id='now_run_spark_job',
    bash_command='bash /home/ec2-user/airflow/scripts/testEMRtoS3Conn_pyWrapper.sh ',
    dag=dag,
)

run_this_first >> now_run_spark_job
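Before waiting on the scheduler, you can sanity-check the DAG from the command line. The commands below assume the DAG file has been dropped into the dags folder under your Airflow home (~/airflow/dags by default); airflow tasks test runs a single task without recording anything in the metadata database, which makes it a convenient dry run.

airflow dags list
airflow tasks test edgeToEMRandS3 now_run_spark_job 2021-04-18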

Give the Airflow scheduler some time to refresh its DagBag, and your DAG should then show up in the GUI (note that in my screenshot it shows as green since I have already run the DAG successfully).

Run the DAG, and once it finishes successfully you should be able to see your output in the S3 bucket.
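If you prefer to confirm from the command line as well, listing the output prefix should show the part files that Spark produced (the bucket name below is the same placeholder used throughout this post):

aws s3 ls s3://yourBucketName/netflix_titles_from_brazil.csv/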

Hope this post helps you save some time getting your data pipeline up and running.
