
Shifting from Pandas to Spark DataFrames Pt 2

Welcome to the second part of this introduction to Spark DataFrames!

Using Spark DataFrames

If you successfully installed Spark, you should be able to launch the Spark/Scala shell with the “spark-shell” command from any terminal window.

A few things happen automatically when you launch this shell. The command defaults to local mode, which means that all Spark components will be launched in a single JVM on your local machine. To see more options, run “spark-shell --help” from the command line.

It also creates an instance of the special class SparkSession, named spark. We will use this as the entry point to the DataFrame API; it unifies and replaces the SQLContext and HiveContext of previous versions. Other Spark commands are still accessed using the SparkContext sc.

Try this command: sc.isLocal

Reading data
Now let’s read in some data. I’m using a publicly available dataset of world airports.

It has a mix of numerical and categorical data in a .csv format with a header containing the column names.

We could use the “textFile(path)” command to read in the data as an RDD of strings, but then we’d have to manually parse the data and define the schema to turn it into a DataFrame.

Fortunately, Spark has a csv parser that will do this for us. New in 2.0, it will optionally infer the schema – a huge quality-of-life improvement. Be aware that using this option requires two passes through the data.

Compare the following two ways of reading the file:

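A sketch of the two approaches, assuming the file has been saved locally as airports.csv (the path is illustrative):

// Option 1: read as an RDD of strings -- we would have to parse each line
// and define the schema ourselves before turning it into a DataFrame
val airportsRdd = sc.textFile("airports.csv")

// Option 2: use the built-in csv reader; inferSchema triggers a second pass over the data
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("airports.csv")

df.printSchema()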

The new csv reader is based on the Databricks spark-csv package. I have found cases where the Spark 2.0 csv reader incorrectly infers a schema that spark-csv gets right.

Transformations and Actions
Spark commands come in two types. Transformations are lazily evaluated and return pointers to DataFrames; actions are evaluated immediately and return values. Here are some examples.

Transformations:
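Along the lines of the following (the column names are assumptions about the airports schema):

// each of these returns a new DataFrame immediately, without touching the data
// the $"col" syntax comes from spark.implicits._, which spark-shell imports automatically
val names    = df.select("name", "iso_country")
val northern = df.filter($"latitude_deg" > 0)
val sorted   = df.orderBy($"name")
val trimmed  = df.drop("keywords")
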
Actions:
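For example:

df.count()     // number of rows
df.first()     // the first row
df.show(5)     // print the first 5 rows to the screen
df.take(5)     // return the first 5 rows as an Array[Row]
df.collect()   // return ALL rows to the driver -- use with care
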
You can chain transformations together without triggering any computation. As soon as you enter an action the entire chain will be computed. Watch out for the collect() action! Don't collect() a large dataset! It tells Spark to move all of the distributed data to the driver, and a large dataset will cause an out-of-memory error and crash your program. A better alternative is to use take(n) to get n rows as an Array, or show(n) to display n rows on screen.

Selecting, adding and deleting columns
Columns are selected by name. The following returns a new DataFrame with just one column:
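For instance, assuming a name column:

val airportNames = df.select("name")
airportNames.show(5)
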
When adding columns we have to remember that Spark DataFrames are immutable. Unlike pandas, we can't just add a new column to an existing DataFrame. Instead, we create a new one:

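A sketch, assuming an elevation_ft column and adding a metric version of it (the dfMetric name matches what is used later on):

// withColumn returns a new DataFrame; df itself is unchanged
val dfMetric = df.withColumn("elevation_m", $"elevation_ft" * 0.3048)
dfMetric.printSchema()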

For more complicated transformations we can create user-defined functions. Here's a simple function that takes a Double latitude value and returns a Boolean indicating whether it lies in the northern hemisphere:

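A minimal sketch (the function names are illustrative), with the function wrapped as a UDF so it can be applied to a column:

import org.apache.spark.sql.functions.udf

// plain Scala function: Double in, Boolean out
def isNorthern(lat: Double): Boolean = lat > 0.0

// wrap it as a user-defined function for use on DataFrame columns
val isNorthernUdf = udf(isNorthern _)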

Now we can use this function on an existing column to create a new column.

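Applied to an assumed latitude_deg column, that might look like:

dfMetric.withColumn("is_northern", isNorthernUdf($"latitude_deg"))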

Be careful – this doesn't modify dfMetric, nor did we save the result to a new DataFrame.

Filter early, filter often
Use filter to reduce the size of the dataset. Do this early in the calculation to reduce run time. Filtering can be done with a SQL-like statement or with a more pandas-like condition. The following are all equivalent:

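For example (the iso_country column is assumed):

// SQL-like string expression
df.filter("iso_country = 'CA'")

// column expression -- note the triple equals
df.filter($"iso_country" === "CA")

// where is an alias for filter
df.where($"iso_country" === "CA")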

Cache intermediate results to improve performance
If you are going to reuse an intermediate DataFrame, for example in a loop, you can tell Spark to hold it in memory rather than recompute it every time it is used.

dfMetric.cache()

Data movement is expensive
Spark partitions your data behind the scenes. Some operations, such as join, require moving data between partitions (a shuffle), and you should avoid them when possible.
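For example, joining the airports data to a hypothetical countries lookup table forces Spark to shuffle rows with the same key onto the same partition (countries.csv and its code column are made up for illustration):

// joins shuffle rows with matching keys onto the same partition
val countries = spark.read.option("header", "true").csv("countries.csv")
val joined = df.join(countries, df("iso_country") === countries("code"))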

Aggregations
Simple aggregations can be done with a grouping followed by an aggregation function. Built-in aggregations include sum, mean, max, min and stddev.

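For instance, grouping the airports by country (column names again assumed):

// mean elevation per country, using the metric column added earlier
dfMetric.groupBy("iso_country").mean("elevation_m").show(5)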

To calculate more than one aggregate, use the .agg function.

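Something like:

import org.apache.spark.sql.functions.{max, mean, stddev}

dfMetric.groupBy("iso_country")
  .agg(max("elevation_m"), mean("elevation_m"), stddev("elevation_m"))
  .show(5)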

Make your code parallelizable
You can run into a “Task not serializable” error if you define something on the driver and then use it in code that is executed on the workers, but Spark cannot serialize it to send it to them. Here's a somewhat contrived example:

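A sketch of the kind of code that triggers it (the class here is made up and deliberately not Serializable):

// a class that does NOT extend Serializable
class Multiplier {
  val factor = 2
  def times(x: Int): Int = x * factor
}
val m = new Multiplier

val numbers = sc.parallelize(1 to 10)

// throws org.apache.spark.SparkException: Task not serializable --
// the closure captures m, which Spark cannot ship to the workers
numbers.map(x => m.times(x)).collect()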

Here's another example of an unexpected result. The counter variable will be incremented on the workers but won't be on the driver:

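Roughly:

var counter = 0
val numbers = sc.parallelize(1 to 100)

// each worker increments its own copy of counter;
// the driver's counter is left unchanged (an accumulator is the right tool here)
numbers.foreach(x => counter += x)

println(counter)   // prints 0 on a cluster; behaviour in local mode may differ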

Useful resources
I hope you enjoyed this very brief introduction to Spark 2.0 DataFrames. There are many topics I didn't discuss – accumulators, MLlib, ML pipelines, running on clusters and more. If you'd like to learn more, here are some resources that I found useful.

Spark 2.0 Guide
An in-depth Spark overview
Learning Spark book
High Performance Spark – forthcoming book