Efficiently reading from redshift large volumes of data with pyarrow

2021-06-08
Python Redshift Pyarrow SQL



reading from redshift with pyarrow

Table of Contents

0. The problem

One of the best ways to start analyzing bussiness data is by using a datawarehouse (dwh). For companies using AWS stack redshift is the default option for a dwh.

In the early stages of the company the dwh is going to be very useful and it will perform really well. But as the size of data and the number of users increase the performance will decrease. This will be specially important if there are bots consuming data from redshift on a periodic basis.

We all know that the best alternative here is to switch to other technologies (datalake, prestoDB, cassandra...) for the rawest data but it's not possible to switch instantly. It will take some time until all data and transformation logic is migrated from redshift.

And the key question here is:

How can we consume redshift data efficently?

In this post we will focus on how to consume data efficently as pandas dataframes. This is because in our case a lot of bots are doing this. The biggest example here might be jobs that are running Machine Learning (ML) pipelines.

1. Using direct queries

The most basic option for reading data from redshift is to directly connect to the database. As an example this can be done with SQL alchemy:

For other processing tools (like spark) it is possible to consume data with direct queries (by using jdbc or other connectors).

The problem with such options is that some pressure is put into redshift since it needs to:

2. Unloading to parquets

One alternative to doing direct queries is to unload data from redshift. This is done by passing a query to the UNLOAD command so that redshift can export the result to S3. More info in unload offical AWS doc.

As usual, the suggestion is to use parquet for storing columnar data. More info here:

By default the unload will produce multipe files so that different redshift workers can do the job in parallel. It might be tempting to do the unload to only one file in order to simplify the reading. But by doing that the export will be slower and more computationally costly for redshift.

So, if we end up with multipe parquet files, how should we read them?

3. Reading parquets as pandas dataframes

3.1. Using pandas read parquet (pandas_read)

The first option is to simply use pandas.read_parquet function and read all files in a loop. This can be done with:

import pandas as pd

# 1. Create a list with all files called 'files'
files = os.listdir(path)

# 2. Read all files as a pandas dataframe
dfs = [pd.read_parquet(file) for file in files]

# 3. Concatenate all dataframes
df = pd.concat(dfs)

The list of files can be created with different options. We suggest using os.listdir if all files are in the same folder and os.walk if there are subfolder with files.

If we need to filter based on a column with this option we would do it after creating the pandas dataframe.

3.2. Using pyarrow read table (pyarrow_single_read)

Another option is to read each file with pyarrow instead. This would by done by:

import pandas as pd

# 1. Create a list with all files called 'files'
files = os.listdir(path)

# 2. Read all files as a pandas dataframe
dfs = [pq.read_table(file).to_pandas() for file in files]

# 3. Concatenate all dataframes
df = pd.concat(dfs)

The performance should be similar since pandas usually use pyarrow under the hood.

If we need to filter based on a column with this option we would do it after creating the pandas dataframe.

3.3. Using pyarrow parquet dataset (pyarrow_parquet_ds_read)

Another option is to use the ParquetDataset from pyarrow. With this we only need to specify the folder were the parquets are and pyarrow will pick all the files.

import pyarrow.parquet as pq

# Create a dataset with all parquet files
dataset = pq.ParquetDataset(path, validate_schema=False)

# Read everything as one pandas dataframe
df = dataset.read_pandas().to_pandas()

In this case we can filter before while reading. As an example this can be done with:

mfilters = [
    ("p_creation_date"", ">=", "2021-04-29"),
    # More filters can be added
    # We are passing tuples with the conditions as strings
]

dataset = pq.ParquetDataset(path, validate_schema=False, filter=mfilters)

3.4. Using pyarrow dataset (pyarrow_ds_read)

With the release of pyarrow 3.0.0 (released on 2021-01-26) a new way of reading parquets was introduced. The idea is to use the new pyarrow.dataset to then create a table which can be transformed to a pandas dataframe.

This can be done with:

import pyarrow.dataset as ds

# Create a dataset with all parquet files
dataset = ds.dataset(path, format="parquet", partitioning="hive")

# Create a table using the dataset
table = dataset.to_table()

# Transform it to a pandas dataframe
df = table.to_pandas(use_threads=True)

In this case we can also filter before while reading. As an example this can be done with:

table = dataset.to_table(filter=ds.field("p_creation_date") >= "2021-04-29")

4. Datasets for testing

For testing the different options we are going to use US accidents dataset from kaggle. This is a 1 GB dataset (stored as csv).

In order to test different set ups we are going to store this dataset in 3 different ways:

  1. Single parquet file
  2. One parquet by month
  3. Multiple parquets by month

4.1. Dataset 0 (single parquet)

This first dataset is done by exporting all the data as one sigle parquet file. This is the kind of dataset we would have if we were to do only one export with all the data we want to consume.

4.2. Dataset 1 (One parquet by month)

This dataset contains only one parquet per each month. Those parquets are store in subfolder following the hive partitioning:

- /dataset_1
    ├── /p_creation_month=p_creation_month=2016-02
    │   └── 0001.parquet
    ├── /p_creation_month=p_creation_month=2016-03
    │   └── 0001.parquet
    │   ...
    └── /p_creation_month=p_creation_month=2020-12
        └── 0001.parquet

This represents a partitioned dataset that don't have a lot of files. This is what we would have if we were doing periodical unloads into each subfolder and then later combining the output into fewer files.

4.3. Dataset 2 (Multiple parquets by month)

This last dataset is similar to the previous one but with multipe files per partition. We used the state column to easily create around 50 files per partition.

This represents a dataset with a lot of files. This is what we would have if we were doing periodical unloads into each subfolder

5. Testing all options

5.1. Reading from a single parquet (dataset 0)

The first test is to compare the reading of one single file.

We see that the the pyarrow datasets options perform a little bit better.

5.2. Reading from data paritioned (datasets 1 and 2)

In this case we are comparing what would happen if we were reading partitioned data. There are 2 different readings that we are testing:

For pandas_read and pyarrow_single_read we see that filtering it's slower. This happens because we are first reading and then filtering. So more operations implies more time.

The important part here is that for the pyarrow datasets options when we are filtering the reading is a lot faster. This happens because pyarrow can push down those filters and avoid reading unnecessary files.

If we repeat the same test but with dataset_1 (one file per partition) we see the same pattern. Here the differences are even bigger.

5.3. Comparing best options

Let's zoom in and compare the best options (pyarrow_ds_read and pyarrow_parquet_ds_read) and compared both partitioned datasets.

Here it seems like the new pyarrow.dataset only performance better with a dataset with a lot of files. If we have a small number of files it performs slightly worse.

5.4. Reading using row groups

In this case we want to do another test where we do 2 filters at the same time:

We are using the same column we used for splitting the dataset_2 as the second filter

In this case we see how the new pyarrow.dataset introduced in pyarrow 3.0.0 performs way better.

What is happening is that pyarrow is able to push down the filter and take advantage of how parquet files are actually written. That means that pyarrow is only retriving some row_groups so that less data needs to be read.

You can find more info about how parquet files are written in this post:

6. Conclusions

To sum it up it doing unloads it is a more efficent way of consuming large quantities of data from redshift.

There are are different ways of doing to unload. If the table we are consuming is not changing past data we suggest only unloading the last day/month (or other time windows) to reduce the performance cost.

If reading data partitioned by the time window is slow, we suggest running a job that compacts the parquets into each partition into less files. This can be done after doing the unload.

Finally the best way of reading parquets into pandas dataframes is by using the new pyarrow.dataset introduced in pyarrow 3.0.0