One of the best ways to start analyzing business data is by using a data warehouse (DWH). For companies using the AWS stack, Redshift is the default option for a DWH.
In the early stages of the company the DWH is going to be very useful and it will perform really well. But as the size of the data and the number of users increase, the performance will decrease. This is especially important if there are bots consuming data from Redshift on a periodic basis.
We all know that the best alternative here is to switch to other technologies (data lake, PrestoDB, Cassandra...) for the rawest data, but it's not possible to switch instantly. It will take some time until all data and transformation logic is migrated from Redshift.
And the key question here is:
How can we consume redshift data efficiently?
In this post we will focus on how to consume data efficiently as pandas dataframes, since in our case a lot of bots are doing exactly that. The biggest example here might be jobs that run Machine Learning (ML) pipelines.
The most basic option for reading data from Redshift is to connect directly to the database. As an example, this can be done with SQLAlchemy:
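A minimal sketch of such a direct read, assuming a SQLAlchemy engine built with the postgres driver; the connection string, schema and table names are placeholders:

import pandas as pd
import sqlalchemy as sa

# Hypothetical connection details: replace with your own cluster endpoint and credentials
engine = sa.create_engine(
    "postgresql+psycopg2://user:password@my-cluster.redshift.amazonaws.com:5439/dev"
)

# Run the query directly against Redshift and load the result as a pandas dataframe
query = "SELECT * FROM my_schema.my_table"
df = pd.read_sql(query, engine)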
For other processing tools (like Spark) it is possible to consume data with direct queries (by using JDBC or other connectors).
The problem with such options is that they put pressure on Redshift, since the cluster needs to execute each query and serve the results to every consumer.
One alternative to doing direct queries is to unload data from Redshift. This is done by passing a query to the UNLOAD command so that Redshift exports the result to S3. More info in the official AWS UNLOAD documentation.
As usual, the suggestion is to use parquet for storing columnar data. More info here:
By default the unload will produce multiple files so that different Redshift workers can do the job in parallel. It might be tempting to unload to only one file in order to simplify the reading, but by doing that the export will be slower and more computationally costly for Redshift.
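As a sketch of what such an export could look like (the table, bucket and IAM role are placeholders, and the statement is executed through a SQLAlchemy engine like the one above); note that we are not forcing PARALLEL OFF, so Redshift is free to write multiple files:

import sqlalchemy as sa

# Hypothetical sketch: connection string, table, bucket and IAM role are placeholders
engine = sa.create_engine(
    "postgresql+psycopg2://user:password@my-cluster.redshift.amazonaws.com:5439/dev"
)

unload_query = """
    UNLOAD ('SELECT * FROM my_schema.my_table')
    TO 's3://my-bucket/exports/my_table/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role'
    FORMAT AS PARQUET
"""

# Run the UNLOAD so that Redshift writes the result to S3 as parquet files
with engine.begin() as connection:
    connection.execute(sa.text(unload_query))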
So, if we end up with multiple parquet files, how should we read them?
Option 1 (pandas_read)

The first option is to simply use the pandas.read_parquet function and read all files in a loop.
This can be done with:
import os

import pandas as pd

# 1. Create a list with the full path of all the parquet files in the folder
files = [os.path.join(path, f) for f in os.listdir(path)]

# 2. Read each file as a pandas dataframe
dfs = [pd.read_parquet(file) for file in files]

# 3. Concatenate all dataframes into one
df = pd.concat(dfs)
The list of files can be created with different options. We suggest using os.listdir if all files are in the same folder and os.walk if there are subfolders with files.
If we need to filter based on a column, with this option we would do it after creating the pandas dataframe.
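For example, assuming the hypothetical p_creation_date column used in the filter examples later on:

# Filter only after the whole dataset has already been loaded into memory
df = df[df["p_creation_date"] >= "2021-04-29"]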
Option 2 (pyarrow_single_read)

Another option is to read each file with pyarrow instead. This would be done by:
import os

import pandas as pd
import pyarrow.parquet as pq

# 1. Create a list with the full path of all the parquet files in the folder
files = [os.path.join(path, f) for f in os.listdir(path)]

# 2. Read each file with pyarrow and convert it to a pandas dataframe
dfs = [pq.read_table(file).to_pandas() for file in files]

# 3. Concatenate all dataframes into one
df = pd.concat(dfs)
The performance should be similar since pandas usually uses pyarrow under the hood.

If we need to filter based on a column, with this option we would again do it after creating the pandas dataframe.
Option 3 (pyarrow_parquet_ds_read)

Another option is to use the ParquetDataset from pyarrow. With this we only need to specify the folder where the parquet files are stored and pyarrow will pick up all the files.
import pyarrow.parquet as pq

# Create a dataset with all the parquet files in the folder
dataset = pq.ParquetDataset(path, validate_schema=False)

# Read everything as one pandas dataframe
df = dataset.read_pandas().to_pandas()
In this case we can filter while reading, instead of afterwards. As an example this can be done with:
# We are passing tuples with the filter conditions as strings
mfilters = [
    ("p_creation_date", ">=", "2021-04-29"),
    # More filters can be added here
]

dataset = pq.ParquetDataset(path, validate_schema=False, filters=mfilters)
Option 4 (pyarrow_ds_read)

With the release of pyarrow 3.0.0 (2021-01-26) a new way of reading parquets was introduced. The idea is to use the new pyarrow.dataset module to create a table which can then be transformed into a pandas dataframe. This can be done with:
import pyarrow.dataset as ds

# Create a dataset with all the parquet files, using hive-style partitioning
dataset = ds.dataset(path, format="parquet", partitioning="hive")

# Create a pyarrow table using the dataset
table = dataset.to_table()

# Transform it into a pandas dataframe
df = table.to_pandas(use_threads=True)
In this case we can also filter the table while reading. As an example this can be done with:
table = dataset.to_table(filter=ds.field("p_creation_date") >= "2021-04-29")
For testing the different options we are going to use the US Accidents dataset from Kaggle. This is a 1 GB dataset (stored as a csv).
In order to test different setups we are going to store this dataset in 3 different ways:
This first dataset is created by exporting all the data as one single parquet file. This is the kind of dataset we would have if we were to do only one export with all the data we want to consume.
This dataset (dataset_1) contains only one parquet file per month. Those parquets are stored in subfolders following hive partitioning:
/dataset_1
├── /p_creation_month=2016-02
│   └── 0001.parquet
├── /p_creation_month=2016-03
│   └── 0001.parquet
│   ...
└── /p_creation_month=2020-12
    └── 0001.parquet
This represents a partitioned dataset that doesn't have a lot of files. This is what we would have if we were doing periodical unloads into each subfolder and then later combining the output into fewer files.
This last dataset (dataset_2) is similar to the previous one but with multiple files per partition. We used the state column to easily create around 50 files per partition.

This represents a dataset with a lot of files. This is what we would have if we were doing periodical unloads into each subfolder without later compacting the output.
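As an illustration, here is a sketch of how dataset_2 could be built from the Kaggle csv; the file name and the Start_Time and State columns are assumptions:

import os

import pandas as pd

# Hypothetical sketch for building dataset_2: file name and column names are assumptions
df = pd.read_csv("US_Accidents.csv", parse_dates=["Start_Time"])
df.columns = [c.lower() for c in df.columns]
df["p_creation_month"] = df["start_time"].dt.strftime("%Y-%m")

# Write one file per state inside each monthly partition (~50 files per partition)
for (month, state), group in df.groupby(["p_creation_month", "state"]):
    partition_dir = f"dataset_2/p_creation_month={month}"
    os.makedirs(partition_dir, exist_ok=True)
    group.to_parquet(f"{partition_dir}/{state}.parquet", index=False)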
The first test is to compare the reading of one single file using the different methods.

We see that the pyarrow dataset options perform a little bit better.
In this case we are comparing what would happen if we were reading partitioned data. There are 2 different readings that we are testing: reading the whole dataset and reading it while filtering by date.
For pandas_read and pyarrow_single_read we see that including filtering is slower. This happens because we first read everything and then filter, so adding more operations increases the time taken.

The important part here is that for the pyarrow dataset options the reading is a lot faster when we are also filtering. This happens because pyarrow can push down those filters and avoid reading unnecessary files.
If we repeat the same test with dataset_1 (one file per partition) we see the same pattern. Here the differences are even bigger.
Let's zoom in on the best options (pyarrow_ds_read and pyarrow_parquet_ds_read) and compare both partitioned datasets.

Here it seems like the new pyarrow.dataset only performs better with a dataset that has a lot of files. If we have a small number of files it performs slightly worse.
In this case we want to do another test where we apply 2 filters at the same time:

- the partition column (p_creation_month)
- a regular column (state)

We are using the same column we used for splitting dataset_2 as the second filter.
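A sketch of how both filters could be combined with the new pyarrow.dataset; the dataset path and the filter values are placeholders:

import pyarrow.dataset as ds

# Path to the partitioned dataset (placeholder)
path = "dataset_2"
dataset = ds.dataset(path, format="parquet", partitioning="hive")

# Combine the partition filter with the filter on a regular column
mfilter = (ds.field("p_creation_month") >= "2020-01") & (ds.field("state") == "CA")
table = dataset.to_table(filter=mfilter)
df = table.to_pandas()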
In this case we see how the new pyarrow.dataset introduced in pyarrow 3.0.0 performs way better. What is happening is that pyarrow is able to push down the filters and take advantage of how parquet files are actually written. That means that pyarrow only retrieves some row_groups and therefore less data needs to be read.
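As a quick way of seeing those row groups, the metadata of a single parquet file can be inspected (the file path is a placeholder):

import pyarrow.parquet as pq

# Inspect how a single parquet file is split into row groups
parquet_file = pq.ParquetFile("dataset_1/p_creation_month=2016-02/0001.parquet")
print(parquet_file.metadata.num_row_groups)         # Number of row groups in the file
print(parquet_file.metadata.row_group(0).num_rows)  # Rows in the first row group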
You can find more info about how parquet files are written in this post:
To sum it up, doing unloads is a more efficient way of consuming large quantities of data from Redshift. There are different ways of unloading.
If the table we are consuming is not changing past data, we suggest only unloading the last day/month (or another time window) to reduce the performance cost.
If reading data partitioned by the time window is slow, we suggest running a job that compacts the parquets in each partition into fewer files. This can be done after the unload.
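A minimal compaction sketch, assuming one partition folder with many small parquet files (the paths are placeholders):

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Hypothetical sketch: rewrite one partition's small files as a single parquet
partition_path = "unloads/p_creation_month=2021-04"
table = ds.dataset(partition_path, format="parquet").to_table()
pq.write_table(table, f"{partition_path}/compacted.parquet")
# Once the compacted file is verified, the original small files can be deleted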
Finally, the best way of reading parquets into pandas dataframes is by using the new pyarrow.dataset introduced in pyarrow 3.0.0.