Spark tutorial

Introduction

Apache Spark is a cluster computing framework for large-scale data processing. Spark does not use MapReduce as an execution engine, but it is closely integrated with the Hadoop ecosystem and can run on YARN, use Hadoop file formats, and read from HDFS storage. Note that we chose to decommission our Hadoop cluster at Princeton in favor of using GPFS on our current clusters. Spark works very well with GPFS.

Spark is best known for its ability to cache large datasets in memory between intermediate calculations. Its API is simpler than that of MapReduce: the primary API is Scala, with support also for Python, R and Java.

At the beginning of the tutorial, we will learn how to launch and use the Spark shell. Next we will show how to prepare a simple Spark word-count application in Python and Scala and run it in the interactive shell, or submit it in client or cluster mode.

For more details on Spark, one can refer to the external documentation or the main Research Computing Spark page.

 

Interactive Spark shell

Spark provides an interactive shell, which is a good way to learn the API and to analyze datasets interactively. For the Python interface, use these commands:

$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ spark-start
$ pyspark
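
Once the shell starts, the SparkSession (spark) and SparkContext (sc) are created automatically. A minimal check that the session is working:

>>> spark.version
>>> sc.parallelize(range(10)).sum()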

For the native Scala interface:

$ salloc -N 1 -n 1 -t 30:00
$ module load spark/hadoop3.2/3.2.0
$ spark-start
$ spark-shell

This will launch the Spark shell with a Scala interpreter.

For R users:

$ salloc -N 1 -n 1 -t 30:00
$ module load spark/hadoop3.2/3.2.0
$ spark-start
$ sparkR

See the R API for Spark. There is an example below for submitting a batch job.

 

Working with dataframes

Python Example

See the Spark getting started guide for working with dataframes as well as the dataframes API.

The interactive code below illustrates loading a CSV file:

$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ spark-start
$ pyspark

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local[1]").getOrCreate()
>>> df = spark.read.csv('my.csv', header=False, inferSchema=True)
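
A few quick ways to inspect the loaded dataframe (assuming my.csv exists in the working directory):

>>> df.printSchema()
>>> df.show(5)
>>> df.count()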

To write a CSV file (be careful: coalesce(1) below gathers all of the data onto a single executor):

>>> myRDD = sc.parallelize([('Mike', 19), ('June', 18), ('Rachel',16), ('Rob', 18), ('Scott', 17)])
>>> myRDD.count()
>>> df = sqlContext.createDataFrame(myRDD, ['name', 'age'])
>>> df.show()  # be careful if df is large
>>> df.coalesce(1).write.format('csv').options(header='true').save('file:///home/aturing/mydir')
>>> exit()

Below is an example of reading a JSON file:

>>> from pyspark.sql import SparkSession
>>> import os
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json(f"{os.environ['SPARK_HOME']}/examples/src/main/resources/people.json")
>>> df.printSchema()

The example below illustrates different ways to work with dataframes:

df = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/aturing/small/*.csv")
print((df.count(), len(df.columns)))
df.printSchema()

# drop unnecessary columns
cols = ['device_key', 'location_key', 'min_duration_secods', 'visit_date_time_local']
df = df.select(*cols)

# create column of just the date (later used for grouping)
from pyspark.sql.types import DateType
df = df.withColumn('visit_date', df['visit_date_time_local'].cast(DateType()))

from pyspark.sql import functions as F
ave = df.groupby("location_key").agg(F.mean("min_duration_secods"), F.stddev("min_duration_secods"))

# replace NaNs
ave = ave.replace(float('nan'), 0)

df = df.join(ave, ['location_key'])

# filter
df = df.filter((F.col("min_duration_secods") != 0) &
               (F.col("min_duration_secods") < F.col("avg(min_duration_secods)") +
                2 * F.col("stddev_samp(min_duration_secods)")))

# drop ave and std
cols.extend(['visit_date'])
df = df.select(*cols)

print((df.count(), len(df.columns)))
df.printSchema()

df.groupby("location_key").pivot("visit_date").agg(F.countDistinct("device_key"))

 

Hands-on Exercise

Research Computing records the utilization of each GPU once every 30 seconds. Find the top 5 users with the highest average utilization. The NetID is in the column "user" and the GPU utilization is in "util". Here is how to load the 1.7 GB data file on Adroit:

>>> df = spark.read.json("/scratch/network/jdh4/ML_WORKSHOP/data/utilization.json")
>>> df.show(5)

Hint: See the Python example above. In addition to this you will probably need to consult the Spark dataframe API.

 

R Example

Consider the sample R program below (/usr/licensed/spark/spark-3.2.0-bin-hadoop3.2/examples/src/main/r/dataframe.R):

library(SparkR)

# Initialize SparkSession
sparkR.session(appName = "SparkR-DataFrame-example")

# Create a simple local data.frame
localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))

# Convert local data frame to a SparkDataFrame
df <- createDataFrame(localDF)

# Print its schema
printSchema(df)

# Create a DataFrame from a JSON file
path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
peopleDF <- read.json(path)
printSchema(peopleDF)

# Register this DataFrame as a table.
createOrReplaceTempView(peopleDF, "people")

# SQL statements can be run by using the sql methods
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset
print(teenagersLocalDF)

# Stop the SparkSession now
sparkR.session.stop()

Below is a Slurm script for the above R program:

#!/bin/bash
#SBATCH --job-name=spark-r       # create a short name for your job
#SBATCH --nodes=1                # node count
#SBATCH --ntasks-per-node=2      # number of tasks per node
#SBATCH --cpus-per-task=1        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=8G                 # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

module purge
module load spark/hadoop3.2/3.2.0

spark-start
echo $MASTER | tee master.txt

spark-submit --total-executor-cores 2 --executor-memory 4G dataframe.R

See the SparkR API.

 

RDDs, transformations and actions

The following examples assume the use of the Spark shell, so that the sc (Spark context) variable will be created automatically at the start.
 
The key concepts to keep in mind when writing a Spark application are resilient distributed datasets (RDDs) and the RDD transformations and actions. An RDD is a fault-tolerant collection of elements that can be operated on in parallel. A simple way to create an RDD in the PySpark shell is:
 
input_data = [1, 2, 3, 4, 5]
distrData = sc.parallelize(input_data)

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. An example of a transformation is map, and an example of an action is reduce:
 
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
 
The map transformation passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce aggregates all the elements of the RDD using some function and returns the final result to the driver program.
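
Continuing with the lineLengths RDD above, the short sketch below uses filter (another transformation) together with the count and take actions:

nonEmpty = lineLengths.filter(lambda n: n > 0)  # transformation: only adds a node to the computation graph
print(nonEmpty.count())                         # action: triggers the computation
print(nonEmpty.take(5))                         # action: returns the first five elements to the driver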
 
See the Spark RDD programming guide for more details on RDD transformations and actions.
 
Below is the PySpark implementation of the canonical word-count example:
 
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile("hdfs://...")
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    sc.stop()
Below is the corresponding Scala implementation:
 

// in spark-shell the SparkContext is available as sc
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
val output = counts.collect()
output.foreach(println)

 

Common Mistakes with Big Data Processing in Spark

  1. Big data requires storage space. Run the checkquota command to see your available space.
  2. Spark can be installed and run on a laptop. An excellent way to develop a Spark script for a big data problem is to take a subset of your data (e.g., 1 GB) and write the script on a laptop or workstation. You can then apply the script to your full dataset on one of the Research Computing clusters.
  3. Do not read in all of your data, make a slight change and then write it all back out with the intention of working with it in a second Spark application. If you already have big data to begin with, this will double its size. Instead, read in the data, do all of your processing in one Spark application and then write out the results, which are almost always much smaller than the input data.
  4. Be very careful with functions like collect and coalesce. These and similar functions gather all of your data onto a single node. If you have more data than can fit into the memory of that node, this will cause a crash.
  5. If you are coming from Pandas or R dataframes, you may find it awkward to write code in Spark. Depending on your workflow, you may be able to do most of the processing in Spark and then write out a subset of the data to work with in Python or R.
  6. Be sure to learn about caching or persisting datasets. You can speed up your computations by storing intermediate results in memory or on disk (see the sketch after this list).
  7. Be aware that lazy evaluation is used for transformations. A join operation, for example, will appear to complete instantaneously because each transformation only adds a node to a computation graph. Spark executes the computation graph only when the script calls an action, such as displaying the last row of the dataframe produced by the join. This contrasts with imperative programming, where each line of code is evaluated immediately.
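
Regarding points 6 and 7, below is a minimal sketch (the CSV file names are hypothetical) showing caching and lazy evaluation: the join only adds a node to the computation graph, and nothing is computed until an action such as count or show is called.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

visits = spark.read.csv("visits.csv", header=True)  # hypothetical input file
sites = spark.read.csv("sites.csv", header=True)    # hypothetical input file

# transformation: the join is only recorded in the computation graph
joined = visits.join(sites, "location_key")

# cache the joined dataframe since it is used by two actions below
joined.cache()

# actions: these trigger the actual computation (the first also populates the cache)
print(joined.count())
joined.groupBy("location_key").agg(F.countDistinct("device_key")).show(5)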

 

Pandas on Spark

There is a pandas API for Spark called "Pandas on Spark". See the getting started guide and API. Below is an example session:

$ module load anaconda3/2021.11
$ conda create --name pyspark-env pyspark pyarrow pandas -y
$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ conda activate pyspark-env
$ spark-start
$ pyspark
>>> import pyspark.pandas as ps
>>> s = ps.Series([1, 2, 3])
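
Pandas-on-Spark objects follow the pandas API while the work is distributed by Spark. A short sketch (the values are made up):

>>> psdf = ps.DataFrame({"name": ["Mike", "June", "Rachel"], "age": [19, 18, 16]})
>>> psdf["age"].mean()
>>> psdf[psdf["age"] > 17].to_pandas()  # careful: to_pandas() collects everything to the driver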

 

Packaging Scala Spark applications with SBT

 
SBT is a modern general-purpose build tool written for Scala. It allows for convenient dependency management, continuous command execution and launching a REPL in the project context.
 
Below is a simple example SBT build file (build.sbt):
 
name := "Hello World"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
    // Spark dependency
    "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
    {other dependencies}
)
If a project consists of only one Scala class, place this build file (named build.sbt) in the same folder as the Scala file and type:
 
sbt package
 

That will create the necessary folder structure and produce a jar file under the target/ directory.

 

Spark job submission and monitoring

Before submitting a Spark application to a cluster, use spark-shell to test it locally by copying and pasting the script into the shell (as described above). Note that in this case the SparkContext is created automatically.
 
The spark-submit script is used to launch applications on a cluster. Once a user application is bundled (see the section on the SBT tool above), it can be launched using spark-submit. This script takes care of setting up the classpath with Spark and its dependencies, and it supports the different cluster managers and deploy modes that Spark offers.
 
Here is the basic usage:
 
spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
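
For example, after running spark-start, the PySpark word-count script shown earlier could be submitted as follows (wordcount.py is a placeholder file name; the resources mirror the Slurm example above):

spark-submit --total-executor-cores 2 --executor-memory 4G wordcount.py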

 
Some of the commonly used options are:

Command line argument   Definition
--class                 The entry point for your application
--master                The master URL, e.g., local, yarn-client, yarn-cluster
--deploy-mode           Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client). Default: client
--conf                  Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes.
application-jar         Path to a bundled jar including your application and all dependencies
application-arguments   Arguments passed to the main method of your main class, if any
For more details, please refer to the Spark documentation on submitting applications.