Spark tutorial

Apache Spark is a cluster computing framework for large-scale data processing. Spark does not use MapReduce as an execution engine, but it is closely integrated with the Hadoop ecosystem: it can run on YARN, use Hadoop file formats, and read from HDFS storage. Note that we chose to decommission our Hadoop cluster at Princeton in favor of GPFS on our current clusters; Spark works very well with GPFS.

Spark is best known for its ability to cache large datasets in memory between intermediate calculations. Its default API is simpler than MapReduce: the favored API is Scala, but there is also support for Python, R and Java.

At the beginning of the tutorial, we will learn how to launch and use the Spark shell. Next we will show how to prepare a simple Spark word count application in Python and Scala, and how to run it in the interactive shell as well as in client or cluster mode using the YARN scheduler.

For more details on Spark, one can refer to the external documentation.


Interactive Spark shell

Spark provides an interactive shell which gives a way to learn the API, as well as to analyze datasets interactively. For the Python interface use these commands:

$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
$ spark-start
$ pyspark
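
Once the prompt appears, the SparkContext (sc) and SparkSession (spark) variables are already defined, so you can try a quick sanity check; a minimal sketch:

>>> data = sc.parallelize(range(10))
>>> data.sum()   # action: returns 45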

For the native Scala interface:

$ salloc -N 1 -n 1 -t 30:00
$ module load spark/hadoop2.7/2.4.6
$ spark-start
$ spark-shell

This will launch the Spark shell with a Scala interpreter.


Working with CSV files and dataframes

The interactive code below illustrates loading a CSV file:

$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
$ spark-start
$ pyspark

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local[1]").appName("Wine classification").getOrCreate()
>>> df ='my.csv', header=False, inferSchema=True)
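
Once the file is loaded, the dataframe can be inspected right away; for example, using the df created above:

>>> df.printSchema()   # prints the column names and inferred types
>>>          # displays the first 5 rows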

To write a CSV file (be careful since coalesce is called):

>>> myRDD = sc.parallelize([('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17)])
>>> myRDD.count()
>>> df = spark.createDataFrame(myRDD, ['name', 'age'])
>>> # be careful if df is large
>>> df.coalesce(1).write.format('csv').options(header='true').save('file:///home/aturing/mydir')
>>> exit()
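
The coalesce(1) call above merges all partitions into one so that a single CSV file is written, which forces all of the data through a single task. For a large dataframe it is safer to omit it and write one part-file per partition; the output directory here is just an illustration:

>>> df.write.format('csv').options(header='true').save('file:///home/aturing/mydir_parts')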

The example below illustrates different ways to work with dataframes:

df ="header", "true").option("inferSchema", "true").csv("/home/aturing/small/*.csv")
print((df.count(), len(df.columns)))

# drop unnecessary columns
cols = ['device_key', 'location_key', 'min_duration_seconds', 'visit_date_time_local']
df =*cols)

# create column of just the date (later used for grouping)
from pyspark.sql.types import DateType
df = df.withColumn('visit_date', df['visit_date_time_local'].cast(DateType()))

from pyspark.sql import functions as F
ave = df.groupby("location_key").agg(F.mean("min_duration_seconds"), F.stddev("min_duration_seconds"))

# replace NaNs
ave = ave.replace(float('nan'), 0)

df = df.join(ave, ['location_key'])

# filter
df = df.filter((F.col("min_duration_seconds") != 0) & (F.col("min_duration_seconds") < F.col("avg(min_duration_seconds)") + 2 * F.col("stddev_samp(min_duration_seconds)")))

# drop ave and std
df =*cols)

print((df.count(), len(df.columns)))
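
As a side note, the auto-generated aggregate column names such as avg(min_duration_seconds) can be avoided by giving the aggregates explicit aliases, which makes the later filter easier to read; a sketch using the same columns as above:

ave = df.groupby("location_key").agg(
    F.mean("min_duration_seconds").alias("dur_mean"),
    F.stddev("min_duration_seconds").alias("dur_std"))
# after the join, the filter can then reference the aliases directly:
# F.col("min_duration_seconds") < F.col("dur_mean") + 2 * F.col("dur_std")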



RDDs, transformations and actions

The following examples assume the use of the Spark shell, so that the sc (Spark context) variable will be created automatically at the start.
The key concepts to keep in mind when writing a Spark application are the resilient distributed dataset (RDD) and the RDD transformations and actions. An RDD is a fault-tolerant collection of elements that can be operated on in parallel. A simple way to create an RDD in the interactive shell is:
input_data = [1, 2, 3, 4, 5]
distrData = sc.parallelize(input_data)

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.  An example of a transformation is a map, and an example of an action is a reduce:
lines = sc.textFile("data.txt")
lineLengths = s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
The map transformation passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce aggregates all the elements of the RDD using some function and returns the final result to the driver program.
See the following page for more details on the RDD transformations and actions.
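For instance, here is a quick sketch of a few other common transformations and actions in pyspark:

rdd = sc.parallelize(range(100))
evens = rdd.filter(lambda x: x % 2 == 0)  # transformation: evaluated lazily
evens.count()                             # action: triggers the computation, returns 50
evens.take(5)                             # action: returns [0, 2, 4, 6, 8]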
Below is the PySpark implementation of the canonical word-count example:
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile("hdfs://...")
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
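If the script above is saved as (with sc.textFile pointed at a local file for testing; the file name here is just an example), it can be run with a local master, where local[2] runs Spark with two worker threads:
spark-submit --master local[2]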
Below is the corresponding Scala implementation:
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setAppName("ScalaWordCount"))
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
val output = counts.collect()


Packaging Scala Spark applications with SBT

SBT is a modern general-purpose build tool written in Scala. It allows for convenient dependency management, continuous command execution, and launching a REPL in the project context.
Following is a simple example SBT build file:
name := "Hello World"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
    // Spark dependency
    "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided"
    // other dependencies
)
If a project consists of only one Scala class, place this build file in the same folder as the Scala file and type:
sbt package
That will create the necessary folder structure and produce a jar file.
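For the build file above, the packaged jar will typically appear under the target/scala-2.10/ directory (the exact jar name is derived from the project name and version):
sbt package
ls target/scala-2.10/*.jar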

Spark job submission and monitoring

Before submitting a Spark application to a cluster, use spark-shell to test it locally by copying and pasting the script into the shell (as described above). Note that in this case the SparkContext is created automatically.
The spark-submit script is used to launch applications on a cluster. Once a user application is bundled (see the section describing the SBT tool), it can be launched using the spark-submit script. This script takes care of setting up the classpath with Spark and its dependencies, and can support the different cluster managers and deploy modes that Spark supports.
Here is the basic usage:
spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Some of the commonly used options are:
Command line argument    Definition
---------------------    ----------
--class                  The entry point for your application.
--master                 The master URL, e.g. local, yarn-client, yarn-cluster.
--deploy-mode            Whether to deploy your driver on the worker nodes
                         (cluster) or locally as an external client (client).
                         Default: client.
--conf                   Arbitrary Spark configuration property in key=value
                         format. For values that contain spaces, wrap
                         "key=value" in quotes.
application-jar          Path to a bundled jar including your application and
                         all dependencies.
application-arguments    Arguments passed to the main method of your main
                         class, if any.
For more details, please refer to the following page.