Spark tutorial

Apache Spark is a cluster computing framework for large-scale data processing. Spark does not use MapReduce as an execution engine; however, it is closely integrated with the Hadoop ecosystem and can run on YARN, use Hadoop file formats, and access HDFS storage. Spark is best known for its ability to cache large datasets in memory between jobs. Its API is simpler than MapReduce's: the favored interface is Scala, but Python is also supported.

At the beginning of the tutorial we will learn how to launch and use the Spark shell. The next section will show how to prepare a simple Spark word-count application in Python and Scala and run it in the interactive shell, or in client or cluster mode using the YARN scheduler.

For more details on Spark, one can refer to the external documentation.

Interactive Spark shell

Spark provides an interactive shell, which gives a simple way to learn the API as well as to analyze datasets interactively. One can launch the shell by simply running:

spark-shell

This will launch the Spark shell with a Scala interpreter. Alternatively, run:

pyspark

to use the Python interpreter.
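
For example, once the pyspark prompt appears, a quick one-liner (a minimal sanity check, relying on the sc variable that the shell creates for you, as explained below) confirms that Spark is responding:

sc.parallelize(range(1, 101)).sum()   # distributes the numbers 1..100 and returns 5050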

RDDs, transformations and actions

 
The following examples assume you are using the Spark shell, so that the sc (SparkContext) variable is created automatically at start-up.
 
The key concepts to keep in mind when writing a Spark application are resilient distributed datasets (RDDs) and the transformations and actions defined on them. An RDD is a fault-tolerant collection of elements that can be operated on in parallel. A simple way to create an RDD in the pyspark shell is:
 
input_data = [1, 2, 3, 4, 5]
distrData = sc.parallelize(input_data)
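
The resulting RDD can then be processed in parallel. For instance (a small illustration reusing the distrData RDD created above):

distrData.count()                     # number of elements: 5
distrData.reduce(lambda a, b: a + b)  # sum of the elements: 15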

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. An example of a transformation is map, and an example of an action is reduce:
 
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
 
Here, you can use the text dataset downloaded in the previous tutorial.
map passes each dataset element through a function and returns a new RDD representing the results. 
On the other hand, reduce aggregates all the elements of the RDD using some function and returns the final result to the driver program.
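
Note that all transformations in Spark are lazy: they are computed only when an action (here, reduce) needs a result. If an intermediate RDD such as lineLengths will be reused, it can be kept in memory (an optional step, shown purely as an illustration):

lineLengths.persist()   # cache the RDD so later actions do not recompute it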
 
See the following page for more details on the RDD transformations and actions.
 
Following is the PySpark implementation of the canonical word-count example:
 
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile("hdfs://...")
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    sc.stop()

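The same logic can also be tried interactively in pyspark (where sc already exists, so the SparkContext lines are not needed), for example on the data.txt file used earlier:

counts = sc.textFile("data.txt") \
           .flatMap(lambda x: x.split(' ')) \
           .map(lambda x: (x, 1)) \
           .reduceByKey(lambda a, b: a + b)
counts.take(10)   # inspect the first few (word, count) pairs instead of collecting everything
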
 
Following is the corresponding Scala implementation (to package it with SBT, as described in the next section, it would need to be wrapped in an object with a main method):
 
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("ScalaWordCount")
val sc = new SparkContext(conf)
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
val output = counts.collect()
output.foreach(println)

 

Packaging Scala Spark applications with SBT

 
SBT is a modern, general-purpose build tool for Scala. It provides convenient dependency management, continuous command execution, and the ability to launch a REPL in the project context.
 
Following is a simple example SBT build file:
 
name := "Hello World"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
    // Spark dependency
    "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided"
    // other dependencies, if any
)

If the project consists of only one Scala class, save this build file (as build.sbt) in the same folder as the Scala file and type:
 
sbt package
 
This will create the necessary folder structure and produce a jar file (with the settings above, the jar typically ends up under target/scala-2.10/).
 

Spark job submission and monitoring

 
Before submitting a Spark application to a cluster, use spark-shell or pyspark to test it locally by copying and pasting the script into the shell (as described above). Note that in this case the SparkContext is created automatically, so the lines that construct it should be omitted.
 
Once a user application is bundled (see the section describing the SBT tool), it can be launched on a cluster using the spark-submit script. This script takes care of setting up the classpath with Spark and its dependencies, and supports the different cluster managers and deploy modes that Spark provides.
 
Here is the basic usage:
 
spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

 
Some of the commonly used options are:
Command line argument   Definition
--class                 The entry point for your application.
--master                The master URL, e.g. local, yarn-client, yarn-cluster.
--deploy-mode           Whether to deploy your driver on the worker nodes (cluster) or
                        locally as an external client (client). Default: client.
--conf                  An arbitrary Spark configuration property in key=value format.
                        For values that contain spaces, wrap "key=value" in quotes.
application-jar         Path to a bundled jar including your application and all
                        dependencies.
application-arguments   Arguments passed to the main method of your main class, if any.
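
For example, the jar produced in the SBT section could be submitted to YARN in cluster mode roughly as follows (a sketch only: the WordCount class name and the jar path are assumptions that depend on how the application and the build file are actually named):

spark-submit \
  --class WordCount \
  --master yarn-cluster \
  target/scala-2.10/hello-world_2.10-1.0.jar

A Python application is submitted in the same way, with the path to the .py file given in place of the application jar (no --class option is needed).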
 
For more details, please refer to the following page.