Apache Spark is a fast, unified analytics engine for big data and machine learning. It uses a MapReduce-style model to split work into tasks and distribute them across a cluster, which allows Spark to work with petabytes of data on clusters with hundreds or thousands of workers.
Apache Spark is a 100% open source project. At the moment, Databricks is the main actor behind its development.
Apache Spark consists of 4 major products:

* Spark SQL + DataFrames
* Spark Streaming
* MLlib (machine learning)
* GraphX (graph computation)
This post is an introduction to Spark DataFrames.
Installing Spark is not an easy task and it is out of the scope of this post.
One of the most common ways to create a DataFrame is by reading a file. For example, to read a CSV you would do:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark_intro").getOrCreate()

# "header" keeps the column names from the first row (e.g. "Age", "Sex")
sdf = spark.read.format("csv").option("header", "true").load("titanic_train.csv")
```
It is even possible to use wildcards to read multiple files at once.
sdf = spark.read.format("csv").load("titanic_*.csv") # Read both train and test
It is also possible to read raw files with Spark and process them. As an example, we read a CSV as plain text and transform it into a DataFrame.
```python
from pyspark.sql import SparkSession, types

spark = SparkSession.builder.appName("pyspark_intro").getOrCreate()
sc = spark.sparkContext

# Read the raw file as an RDD of lines
data = sc.textFile("datasets/iris.csv")

# Split each line and map the fields to Row objects
parts = data.map(lambda x: x.split(";"))
iris_data = parts.map(lambda x: types.Row(SL=x[0], SW=x[1], PL=x[2], classification=x[3]))

# Create the DataFrame from the RDD of Rows
sdf = spark.createDataFrame(iris_data)
```
This is only an example; in this case it is better to read the file directly as a DataFrame.
To show the data simply call `sdf.show(N)`, where `N` is the number of rows to display (default: 20).
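For instance, to take a quick look at the Titanic data loaded above (a minimal sketch):

```python
# Print the first 5 rows as a formatted table
sdf.show(5)

# Print the default 20 rows
sdf.show()
```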
There are some functions to get general information about the DataFrame:
| Function | What it does |
|---|---|
| `sdf.count()` | Number of rows |
| `sdf.schema` | Details about the structure of the DataFrame |
| `sdf.columns` | Columns of the DataFrame as a Python list |
| `sdf.dtypes` | Columns of the DataFrame with their data types (dtypes) |
| `sdf.describe()` | Basic stats of the DataFrame |
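As a quick sketch of how they are used (the output depends on the file you loaded):

```python
print(sdf.count())    # number of rows
print(sdf.columns)    # list of column names
print(sdf.dtypes)     # list of (column, dtype) tuples

# describe() returns another DataFrame, so show() is needed to print it
sdf.describe().show()
```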
To retrieve the first `N` rows you can use either `sdf.head(N)` or `sdf.take(N)`.
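Both return a list of `Row` objects, so a minimal sketch would be:

```python
first_rows = sdf.head(5)   # list of the first 5 Row objects
same_rows = sdf.take(5)    # equivalent result
print(first_rows[0])
```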
To get one or more columns use `sdf.select`. For example:
sdf.select("Sex") sdf.select("Sex", "Age")
Filters in PySpark follow the same syntax as pandas. There are some synonyms for filtering; these 3 lines do exactly the same thing:
sdf[sdf["Age"] > 24] sdf.filter(sdf["Age"] > 24) sdf.where(sdf["Age"] > 24)
Columns also have some nice helper methods for filtering. Some examples are:
```python
# Rows where Age is between 20 and 30
sdf[sdf["Age"].between(20, 30)]

# Rows where Pclass is one of the values of the list [1, 2]
sdf[sdf["Pclass"].isin([1, 2])]

# Rows that include 'Miss.' in the Name
sdf[sdf["Name"].like("%Miss.%")]

# Rows with Name starting or ending with a string
sdf[sdf["Name"].startswith("Hei")]
sdf[sdf["Name"].endswith(".")]
```
sdf.select("Pclass").distinct()
It is also possible to subtract the rows that appear in another DataFrame using `exceptAll`:
sdf.select("Pclass").exceptAll(sdf.select("Survived"))
Spark DataFrames are immutable, so to modify one you need to reassign the result to the original variable, for example `sdf = do_something(sdf)`. It is possible to only call `do_something(sdf)`, but it won't update the DataFrame.
As an example, to add a column called `new_col` with the same values as `Age`, do:

```python
sdf = sdf.withColumn("new_col", sdf["Age"])
```
Dtypes are changed using the `cast` method of a column.
```python
from pyspark.sql.types import IntegerType

sdf = sdf.withColumn("Age", sdf["Age"].cast(IntegerType()))
```
You need to specify a type from `pyspark.sql.types`.
To modify the data itself use `sdf.withColumn`. It can either create a new column or overwrite an existing one. For example:
```python
from pyspark.sql.functions import when

# Create a new column with the first letter of Sex
sdf.withColumn("sex_code", sdf["Sex"].substr(1, 1).alias("Sex code"))

# This is the equivalent of pandas df.loc
sdf.withColumn("Age", when(sdf["Age"] > 0, sdf["Age"]).otherwise(-1))
```
You can fill missing values of multiple columns using:
```python
sdf.fillna({"Age": 0, "Cabin": "no cabin"})
```
And replace values on certain columns:
```python
sdf.replace("male", "m", "Sex")
```
Other common operations are sorting, dropping columns and dropping duplicated rows:

```python
# Sort by Age in descending order
sdf.sort("Age", ascending=False)

# Drop a column
sdf.drop("new_col")

# Drop duplicated rows based on Pclass
sdf.drop_duplicates(["Pclass"])
```
It is possible to define new functions (UDFs) and apply them to a column. When defining one you need to specify the output type (int, str...). The transformation will be applied element-wise.
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# The decorator declares the return type of the UDF
@udf(IntegerType())
def sum_1(x):
    if x is not None:
        return x + 1
    return None

sdf.withColumn("Next_Age", sum_1(sdf["Age"]))
```
To group data you can use the `sdf.groupby` method. It is possible to call simple functions like `count` or use `agg` to apply custom aggregations at once.
sdf.groupby("Sex").count() sdf.groupby("Sex").agg({"sex": "count", "Age": "max"})
One really useful feature of Spark is that you can use SQL syntax and Spark will translate and apply it.
Before using SQL you need to register the table:
```python
sdf.createOrReplaceTempView("titanic")
```
Then you can perform queries using the `spark` object:
spark.sql("SELECT * FROM titanic WHERE sex = 'female' LIMIT 5")
The output of `spark.sql` is another DataFrame, and you can store it with `sdf = spark.sql(...)`.
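For instance, a minimal sketch using the `titanic` view registered above (the `females` variable name is just illustrative):

```python
# The query result is a regular DataFrame
females = spark.sql("SELECT Name, Age FROM titanic WHERE Sex = 'female'")
females.show(5)
```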
Spark can write files in a lot of different formats. One of the best is Parquet. To save a Parquet file use:
sdf.write.format("parquet").mode("overwrite").save("titanic_train.parquet")
Sometimes you need to retrieve some data from Spark into Python.
For example, you can always transform a DataFrame to pandas with `sdf.toPandas()`. Keep in mind that this loads all the data into memory, so be careful not to convert a DataFrame that is too big.
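A common pattern is to reduce the data with Spark first and only convert a small result; a quick sketch (column names from the Titanic example):

```python
# Select only the columns you need before converting to pandas
pdf = sdf.select("Age", "Sex").toPandas()
print(pdf.head())
```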
To get a column as a Python list use:
sdf.select("Name").rdd.flatMap(lambda x: x).collect()
And finally, you can use `collect` to retrieve a single value:
sdf.agg({"Age": "max"}).collect()[0][0]