How to start using Polars & DuckDB together for data analysis

Suman Kumar Gangopadhyay
7 min read · Mar 18, 2024

Python’s Pandas has been around for a very long time and will continue to be around for the foreseeable future; however, that shouldn’t stop us engineers from trying out a new library, or a combination of new libraries, for our day-to-day development work.

This blog aims to demonstrate the usage of Polars with DuckDB to perform data transformations similar to those very often done using Pandas. There is already a plethora of comparative studies between Pandas and Polars, so this blog will shy away from all comparisons.

As always, this blog will also be mostly beginner-friendly; however, to keep it short, I have not started all the way from installing Polars and DuckDB. Both can be pip-installed.
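
For completeness, the packages are published on PyPI under the names polars and duckdb, so a one-liner like the following should be all the setup you need:

pip install polars duckdb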

I will use a dataset from Kaggle (the Stanford Ribonanza RNA Folding competition data) to demonstrate the various data transformation/exploration activities. You can explore the details of the dataset on its Kaggle page.

From the rich set of APIs offered by Polars, we will focus on the following three, since they are the entry points for reading data and starting the analysis:

read_csv
scan_csv
read_csv_batched

Let’s first focus on read_csv, since it is the closest to its Pandas counterpart.

import polars as pl

train_data_dms_map = pl.read_csv(
    "/kaggle/input/stanford-ribonanza-rna-folding/train_data_QUICK_START.csv",
    low_memory=True,
).filter(pl.col('experiment_type') == 'DMS_MaP')

A few things are going on here:

  1. Read the CSV file from a specific location (no surprises there)
  2. low_memory=True: as per the Polars documentation, this option reduces memory pressure at the expense of performance. I have not benchmarked it myself, so please experiment with it and let me know
  3. .filter(pl.col('experiment_type')=='DMS_MaP'): as you might have guessed, this filters the dataset down to rows where the experiment_type column equals DMS_MaP

Once the required data is available in memory, let’s try to perform another very common activity: applying a specific transformation to the values of a particular column of the dataframe.

The sequence column of this dataset has RNA sequences of the form

GGGAACGACUCGAGUAGAGUCGAAAAAGAUCGCCACGCACUUACGAGUGCGUGGCGAUCACGCGUGUUGCAGCGCGUCUAACAUAGGCAGGGCAACCUGC…

As part of my first transformation, I want to create a column which stores the length of these sequences.

def get_rna_length(sequence):
    return len(sequence)


train_data_dms_map = train_data_dms_map.with_columns(
    pl.struct(['sequence'])
    .map_elements(lambda s: get_rna_length(s['sequence']))
    .alias("seqn_length")
    .cast(pl.Int64)
)
  1. with_columns: This will be very familiar to folks who are used to the PySpark APIs. For others, it creates another column in the dataframe based on the provided logic
  2. pl.struct: Creates a composite (struct) column from the inputs required by the action you want to run on the dataframe. In this case, we want to run the get_rna_length function with the sequence column as its input
  3. map_elements: Maps a custom/user-defined function (UDF) over the elements of a column. Here it is applied over the elements of the struct built from the sequence column of the dataframe
  4. .alias: Simply gives the new column a name
  5. .cast(pl.Int64): Casts it to an appropriate data type
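
As a side note, for something as simple as a length calculation, a native Polars string expression avoids the per-row Python call of a UDF and is usually faster. A minimal sketch (depending on your Polars version the method may be called str.len_chars or str.lengths):

# Native-expression alternative to the UDF above
# (method name assumes a recent Polars release)
train_data_dms_map = train_data_dms_map.with_columns(
    pl.col("sequence").str.len_chars().cast(pl.Int64).alias("seqn_length")
)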

Before going into our next options, let me walk you through the prerequisites, which bring us to DuckDB.

import duckdb as dd

dataset_name_list = (
    dd.sql("SELECT DISTINCT td.dataset_name FROM train_data_dms_map td")
    .pl()
    .get_column("dataset_name")
    .to_list()
)

for dataset_name in dataset_name_list:
    print(dataset_name)

dd.sql(
    "SELECT td.dataset_name, count(distinct(td.sequence_id)) AS record_count "
    "FROM train_data_dms_map td GROUP BY td.dataset_name"
).pl()

As you can see from the output of the two queries above, there are 17 different groups of data (dataset_name values) available within the dataframe.

Did you notice the ease with which you can use DuckDB to run SQL aggregations directly on the Polars dataframe?

Not only did we run SQL on the dataframe using DuckDB, but we were also able to convert the result back into a Polars dataframe (via .pl()) and then into a Python list. This can come in handy in many situations.
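
For comparison, the same results can be obtained purely through the Polars API, without DuckDB; which route you take is largely a matter of taste and of how comfortable you are with SQL. A rough equivalent (note that group_by was spelled groupby in older Polars releases):

# Pure-Polars equivalent of the two DuckDB queries above
dataset_name_list = train_data_dms_map.get_column("dataset_name").unique().to_list()

record_counts = train_data_dms_map.group_by("dataset_name").agg(
    pl.col("sequence_id").n_unique().alias("record_count")
)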

Moving on, I intend to apply a few similar transformations to all of these groups, but since it would be extremely memory-intensive to transform all the groups together, I will use a for-loop along with the second option for reading data provided by Polars.

Next up, scan_csv

for i in range(len(dataset_name_list)):
    train_data = pl.scan_csv(
        "/kaggle/input/stanford-ribonanza-rna-folding/train_data_QUICK_START.csv",
        low_memory=True,
    ).filter(
        (pl.col('experiment_type') == 'DMS_MaP')
        & (pl.col('dataset_name') == dataset_name_list[i])
    )

Let’s review the steps that we will introduce inside the for loop.

  • We have already seen the first one
train_data = train_data.with_columns(
    pl.struct(['sequence'])
    .map_elements(lambda s: get_rna_length(s['sequence']))
    .alias("seqn_length")
    .cast(pl.Int64)
)

Get the length of the sequences.

  • Next, we want to convert some specific columns into rows using melt
react_col_names = []

# Collect the reactivity columns (but not the reactivity_error ones);
# note: use a name other than `i` here so we do not shadow the outer loop index
for col_name in train_data.columns:
    if ('reactivity_' in col_name) and ('reactivity_error_' not in col_name):
        react_col_names.append(col_name)

melted_react_ds = train_data.melt(
    id_vars=["sequence_id", "sequence", "experiment_type",
             "dataset_name", "seqn_length"],
    value_vars=react_col_names,
    variable_name="seq_pos",
    value_name="reactivity",
)

melted_react_ds.collect().filter(pl.col('seq_pos') == 'reactivity_0104').head(20)

In this step, we are converting the RNA reactivity and reactivity error (no need to get into contextual specifics for this blog) columns into rows keyed by specific positions within the RNA sequence, e.g. position 104 of the RNA sequence with ID 000e3417937f has a reactivity of 0.769.

Next, we strip the reactivity_ prefix from the seq_pos values and apply proper data types to the columns.

melted_react_ds = melted_react_ds.with_columns(
    pl.col("seq_pos").str.replace("reactivity_", "").cast(pl.Int64),
    pl.col("reactivity").cast(pl.Float64)
)

melted_react_ds.collect().filter(pl.col('seq_pos') == 104).head(20)
  • We then fill the nulls with the column median
melted_react_ds = melted_react_ds.with_columns(
    pl.col("reactivity").fill_null(pl.col("reactivity").median())
)
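
If you want to confirm that the fill worked, a quick null count on the column is enough. A small sanity check, assuming the variable names used above:

# Sanity check: there should be no nulls left in the reactivity column
print(
    melted_react_ds.select(pl.col("reactivity").null_count()).collect()
)
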
  • Let us build some more features from this data. To do that, we need some custom functions first
def count_nucleotide_occurence(sequence, nucleotide):
    return sequence.count(nucleotide)


def get_nucleotide_count(sequence, position):
    if position <= len(sequence):
        return count_nucleotide_occurence(sequence, sequence[position - 1])
    else:
        return count_nucleotide_occurence(
            sequence, sequence[(position - len(sequence)) - 1]
        )


def get_nucleotide_abundance(sequence, nucleotide_count):
    return round(nucleotide_count / len(sequence), 5)

I will not discuss the details of these functions (you can very easily make sense of them). In short, they count the occurrence of a specific nucleotide within a sequence, e.g. count of A within the sequence GGGAACGACUCGAGUAGAGUCGAAAAAGAUCGCCACGCACUUACGAGUGCGUGGCGAUCACGCGUGUUGCAGCGCGUCUAACAUAGGCAGGGCAACCUGC…

The third function computes the fraction of a particular nucleotide within the entire sequence.
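
As a quick sanity check on these helpers, here is what they return for a short, made-up sequence (the sequence below is purely illustrative and not taken from the dataset):

seq = "GGGAACGACU"                              # 10 nucleotides, three of them 'A'
print(count_nucleotide_occurence(seq, "A"))     # 3
print(get_nucleotide_count(seq, 4))             # position 4 is 'A', so 3
print(get_nucleotide_abundance(seq, 3))         # round(3 / 10, 5) = 0.3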

Now, let’s apply these functions and build the features we want

melted_ds = melted_react_ds.with_columns(
    pl.struct(['sequence', 'seq_pos'])
    .map_elements(lambda s: get_nucleotide_count(s['sequence'], s['seq_pos']))
    .alias("id_pos_nc_cnt")
    .cast(pl.Int64)
)

melted_ds = melted_ds.with_columns(
    pl.struct(['sequence', 'id_pos_nc_cnt'])
    .map_elements(lambda s: get_nucleotide_abundance(s['sequence'], s['id_pos_nc_cnt']))
    .alias("id_pos_nc_abundnc")
    .cast(pl.Float64)
)

melted_ds.collect().filter(pl.col('seq_pos') == 104).head(20)

So far, these steps have been run inside a for loop, but how do we collate all the data for the various groups of data?

First, we need to initialise an empty Polars LazyFrame

# Define an empty list
full_melted_ds_data = []

# Define the schema for the empty LazyFrame
full_melted_ds_schema = [
    ("sequence_id", pl.Utf8),
    ("sequence", pl.Utf8),
    ("experiment_type", pl.Utf8),
    ("dataset_name", pl.Utf8),
    ("seqn_length", pl.Int64),
    ("seq_pos", pl.Int64),
    ("reactivity", pl.Float64),
    ("id_pos_nc_cnt", pl.Int64),
    ("id_pos_nc_abundnc", pl.Float64),
]

# Create an empty LazyFrame with the defined schema
full_melted_ds = pl.LazyFrame(full_melted_ds_data, schema=full_melted_ds_schema)

Then, in the first iteration, assign the dataframe built inside the for loop as-is, and from the second iteration onwards keep concatenating vertically (i.e. row-wise):

if i == 0:
    full_melted_ds = melted_ds
else:
    full_melted_ds = pl.concat([full_melted_ds, melted_ds])
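
An alternative to growing the frame inside the loop is to collect the per-group LazyFrames in a Python list and concatenate them once at the end, which avoids the special case for the first iteration. A sketch of that pattern, using the same scan-and-filter logic as above (the feature-building steps are elided here):

# Alternative: gather the per-group frames and concatenate once
per_group_frames = []

for dataset_name in dataset_name_list:
    group_ds = pl.scan_csv(
        "/kaggle/input/stanford-ribonanza-rna-folding/train_data_QUICK_START.csv",
        low_memory=True,
    ).filter(
        (pl.col('experiment_type') == 'DMS_MaP')
        & (pl.col('dataset_name') == dataset_name)
    )
    # ... apply the same feature-building steps as above to group_ds ...
    per_group_frames.append(group_ds)

full_melted_ds = pl.concat(per_group_frames)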

With all the above steps, we are almost ready with our final dataframe. However, before going to the next step, let’s make some cosmetic changes

full_melted_ds = full_melted_ds.drop(["experiment_type","dataset_name"])

full_melted_ds = full_melted_ds.rename(
    {"seq_pos": "id", "reactivity": "reactivity_DMS_MaP"}
)

There is no specific need for these two steps from the point of view of this blog; they are required only for my particular use case. I have included them here to illustrate the Polars APIs available for dropping and renaming columns of a dataframe.

As a final note on Option II, pay attention to the use of collect(), which forces Polars to execute the chain of operations it has been evaluating lazily (once again, this is similar to PySpark).
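
If you are curious about what Polars has queued up before triggering the computation, the LazyFrame can show you its query plan; explain() returns the optimised plan as a string, which is handy for checking, for example, that filters are pushed down into the CSV scan. Assuming full_melted_ds is still lazy at this point:

# Inspect the optimised query plan, then execute it
print(full_melted_ds.explain())
final_df = full_melted_ds.collect()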

As a final topic, we will look at read_csv_batched

After going through direct reading into memory and lazy evaluation of actions in Option I and Option II respectively, we now shift our attention towards reading data in batches.

test_data_reader = pl.read_csv_batched(
    "/kaggle/input/stanford-ribonanza-rna-folding/test_sequences.csv",
    low_memory=True,
    n_rows=1343823,
)

batches = test_data_reader.next_batches(100)

The read_csv_batched API returns a batched reader rather than a dataframe; next_batches(100) then fetches up to the next 100 batches (chunks of rows) from the file as a list of dataframes, so we only ever hold a slice of the data in memory while we work on it.

seen_groups = set()

while batches:
    df_current_batches = pl.concat(batches)

    # Add the sequence-length feature
    df_current_batches = df_current_batches.with_columns(
        pl.struct(['sequence'])
        .map_elements(lambda s: get_rna_length(s['sequence']))
        .alias("seqn_length")
        .cast(pl.Int64)
    )

    # Add the count of 'A' nucleotides per sequence
    df_current_batches = df_current_batches.with_columns(
        pl.struct(['sequence'])
        .map_elements(lambda s: count_nucleotide_occurence(s['sequence'], 'A'))
        .alias("A_count")
        .cast(pl.Int64)
    )

    # Split the batch into one dataframe per (seqn_length, A_count) partition
    partition_dfs = df_current_batches.partition_by(
        ["seqn_length", "A_count"], as_dict=True
    )

    for group, df in partition_dfs.items():
        if group in seen_groups:
            # append data to the existing file for this group
            print('inside seen groups')
            with open(f"./data/{group}.csv", "a") as fh:
                fh.write(df.write_csv(file=None, include_header=False))
        else:
            # create a new file for this group
            print('inside unseen groups')
            df.write_csv(file=f"./data/{group}.csv", include_header=True)
            seen_groups.add(group)

    batches = test_data_reader.next_batches(100)

In each iteration, we take the next set of batches, concatenate them into a single dataframe, and create the features on them (in the full workflow, where melt is also applied, the number of records explodes, which makes batching all the more important). Once these actions are finished, we partition the batch data on a set of partition keys and get a separate dataframe for each partition. Using these dataframes, we can write the transformed data out iteratively, without overwhelming the available memory.

The seen_groups set helps us keep track of whether we need to create a new file for a group or append to an existing one.

The parameter n_rows ensures that the reading of data will stop once we have read the 1343823 rows available in the source file.

That’s all for now folks.

It is my firm belief that both Polars and Pandas are extremely flexible and capable in their own right, and the choice between them should be driven purely by the use case and a careful study of the pros and cons of each.
