OUTLINE
- Introduction
- Interactive Spark shell
- Working with dataframes
- RDDs, transformations and actions
- Common Mistakes with Big Data Processing in Spark
- Pandas on Spark
- Packaging Scala Spark applications with SBT
- Spark job submission and monitoring
Introduction
Apache Spark is a cluster computing framework for large-scale data processing. Spark does not use MapReduce as an execution engine; however, it is closely integrated with the Hadoop ecosystem and can run on YARN, use Hadoop file formats and read from HDFS storage. Note that we chose to decommission our Hadoop cluster at Princeton in favor of using GPFS on our current clusters. Spark works very well with GPFS.
Spark is best known for its ability to cache large datasets in memory between intermediate calculations. Its API is simpler than that of MapReduce: the primary API is Scala, but there is also support for Python, R and Java.
At the beginning of the tutorial, we will learn how to launch and use the Spark shell. Next we will show how to prepare a simple Spark word count application in Python and Scala and run it in the interactive shell, or in client or cluster mode using the YARN scheduler.
For more details on Spark, one can refer to the external documentation or the main Research Computing Spark page.
Interactive Spark shell
Spark provides an interactive shell which gives a way to learn the API, as well as to analyze datasets interactively. For the Python interface use these commands:
$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ spark-start
$ pyspark
For the native Scala interface:
$ salloc -N 1 -n 1 -t 30:00
$ module load spark/hadoop3.2/3.2.0
$ spark-start
$ spark-shell
This will launch the Spark shell with a Scala interpreter.
For R users:
$ salloc -N 1 -n 1 -t 30:00
$ module load spark/hadoop3.2/3.2.0
$ spark-start
$ sparkR
See the R API for Spark. There is an example below for submitting a batch job.
Working with dataframes
Python Example
See the Spark getting started guide for working with dataframes as well as the dataframes API.
The interactive code below illustrates loading a CSV file:
$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ spark-start
$ pyspark
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local[1]").getOrCreate()
>>> df = spark.read.csv('my.csv', header=False, inferSchema=True)
To write a CSV file (be careful since coalesce is called):
>>> myRDD = sc.parallelize([('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17)])
>>> myRDD.count()
>>> df = spark.createDataFrame(myRDD, ['name', 'age'])
>>> df.show()  # be careful if df is large
>>> df.coalesce(1).write.format('csv').options(header='true').save('file:///home/aturing/mydir')
>>> exit()
Below is an example of reading a JSON file:
>>> from pyspark.sql import SparkSession
>>> import os
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json(f"{os.environ['SPARK_HOME']}/examples/src/main/resources/people.json")
>>> df.printSchema()
The example below illustrates different ways to work with dataframes:
df = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/aturing/small/*.csv")
print((df.count(), len(df.columns)))
df.printSchema()

# drop unnecessary columns
cols = ['device_key', 'location_key', 'min_duration_secods', 'visit_date_time_local']
df = df.select(*cols)

# create column of just the date (later used for grouping)
from pyspark.sql.types import DateType
df = df.withColumn('visit_date', df['visit_date_time_local'].cast(DateType()))

from pyspark.sql import functions as F
ave = df.groupby("location_key").agg(F.mean("min_duration_secods"), F.stddev("min_duration_secods"))

# replace NaNs
ave = ave.replace(float('nan'), 0)
df = df.join(ave, ['location_key'])

# filter
df = df.filter((F.col("min_duration_secods") != 0) &
               (F.col("min_duration_secods") < F.col("avg(min_duration_secods)") +
                2 * F.col("stddev_samp(min_duration_secods)")))

# drop ave and std
cols.extend(['visit_date'])
df = df.select(*cols)
print((df.count(), len(df.columns)))
df.printSchema()

df.groupby("location_key").pivot("visit_date").agg(F.countDistinct("device_key"))
Hands-on Exercise
Research Computing records the utilization of each GPU once every 30 seconds. Find the top 5 users with the highest average utilization. The NetID is in the column "user" and the GPU utilization is in "util". Here is how to load the 1.7 GB data file on Adroit:
>>> df = spark.read.json("/scratch/network/jdh4/ML_WORKSHOP/data/utilization.json")
>>> df.show(5)
Hint: See the Python example above. In addition to this you will probably need to consult the Spark dataframe API.
R Example
Consider the sample R program below (/usr/licensed/spark/spark-3.2.0-bin-hadoop3.2/examples/src/main/r/dataframe.R):
library(SparkR)

# Initialize SparkSession
sparkR.session(appName = "SparkR-DataFrame-example")

# Create a simple local data.frame
localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))

# Convert local data frame to a SparkDataFrame
df <- createDataFrame(localDF)

# Print its schema
printSchema(df)

# Create a DataFrame from a JSON file
path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
peopleDF <- read.json(path)
printSchema(peopleDF)

# Register this DataFrame as a table.
createOrReplaceTempView(peopleDF, "people")

# SQL statements can be run by using the sql methods
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Call collect to get a local data.frame
teenagersLocalDF <- collect(teenagers)

# Print the teenagers in our dataset
print(teenagersLocalDF)

# Stop the SparkSession now
sparkR.session.stop()
Below is a Slurm script for the above R program:
#!/bin/bash
#SBATCH --job-name=spark-r       # create a short name for your job
#SBATCH --nodes=1                # node count
#SBATCH --ntasks-per-node=2      # number of tasks per node
#SBATCH --cpus-per-task=1        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=8G                 # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

module purge
module load spark/hadoop3.2/3.2.0

spark-start
echo $MASTER | tee master.txt

spark-submit --total-executor-cores 2 --executor-memory 4G dataframe.R
See the SparkR API.
RDDs, transformations and actions
A resilient distributed dataset (RDD) is Spark's original collection abstraction: a fault-tolerant set of elements partitioned across the nodes of the cluster that can be operated on in parallel. One way to create an RDD is to parallelize an existing collection in the driver program:
input_data = [1, 2, 3, 4, 5]
distrData = sc.parallelize(input_data)
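Once created, the distributed dataset can be operated on in parallel. For example, a minimal sketch (assuming sc is the SparkContext defined above or provided by the pyspark shell):
doubled = distrData.map(lambda x: x * 2).collect()   # transformation followed by an action: [2, 4, 6, 8, 10]
total = distrData.reduce(lambda a, b: a + b)         # action: sums the elements, returning 15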
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. An example of a transformation is a map, and an example of an action is a reduce:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
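If lineLengths will be reused in later steps, it can be kept in memory after it is first computed; a minimal sketch:
lineLengths.persist()   # or lineLengths.cache(); keeps the computed RDD in memory for reuse
totalLength = lineLengths.reduce(lambda a, b: a + b)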
Below is a complete word count application in Python:
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile("hdfs://...")
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
The same word count as a Scala application:
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ScalaWordCount"))
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
val output = counts.collect()
output.foreach(println)
Common Mistakes with Big Data Processing in Spark
- Big data requires storage space. Run the checkquota command to see your available space.
- Spark can be installed and run on a laptop. An excellent way to write a Spark script to solve a big data problem is to take a subset of your data (e.g., 1 GB) and write the Spark script on a laptop or workstation. You can then apply the script on one of the RC clusters using your full dataset.
- Do not read in all your data, make a slight change and then write it all back out with the intention of working with it in a second Spark application. If you already have big data to begin with then this will double its size. Instead, read in the data, do all your processing in one Spark application and then write the results which are almost always much smaller than the input data.
- Be very careful with functions like collect and coalesce. These functions and others can be used to gather all of your data onto a single node. If you have more data than fits into the memory of that node, this will cause a crash.
- If you are coming from Pandas or R dataframes then you may find it awkward to write code in Spark. Depending on your workflow, you may be able to do most of the processing in Spark and then write out a subset of the data to work with in Python or R.
- Be sure to learn about caching and persisting datasets. You can speed up your computations by storing intermediate results in memory or on disk.
- Be aware that lazy evaluation is used for transformations. A join operation, for example, will appear to complete instantaneously; the transformation merely enters as a node in a computation graph. Spark only executes the computation graph when the script calls an action, such as displaying the last rows of the dataframe produced by the join. This contrasts with imperative programming, where lines of code are evaluated immediately. The sketch after this list illustrates caching and lazy evaluation.
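A minimal sketch of these points (the file name and column name are made up for illustration):
df = spark.read.option("header", "true").csv("events.csv")   # hypothetical input file
counts = df.groupBy("location").count()                      # transformation: returns immediately, nothing is computed yet
joined = df.join(counts, ["location"])                       # still lazy: only adds a node to the computation graph
joined.cache()                                               # ask Spark to keep the result in memory once it is computed
joined.count()                                               # action: triggers the actual computation
joined.show(5)                                               # fast: reuses the cached result
# avoid joined.collect() on large data: it gathers every row onto the driver node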
Pandas on Spark
There is a Pandas API to Spark called "Pandas on Spark". See the getting started guide and API. Below is an example session:
$ module load anaconda3/2021.11
$ conda create --name pyspark-env pyspark pyarrow pandas -y
$ salloc -N 1 -n 1 -t 30:00
$ module load anaconda3/2021.11 spark/hadoop3.2/3.2.0
$ conda activate pyspark-env
$ spark-start
$ pyspark
>>> import pyspark.pandas as ps
>>> s = ps.Series([1, 2, 3])
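As a slightly longer sketch (the names and values below are made up), pandas-on-Spark dataframes accept familiar pandas syntax and can be converted to regular Spark dataframes:
>>> psdf = ps.DataFrame({"name": ["Mike", "June", "Rachel"], "age": [19, 18, 16]})
>>> psdf["age"].mean()        # pandas-style syntax, evaluated by Spark
>>> sdf = psdf.to_spark()     # convert to a regular Spark DataFrame
>>> sdf.show()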
Packaging Scala Spark applications with SBT
To build a standalone Scala Spark application with SBT, create a build.sbt file that lists Spark as a provided dependency, for example:
name := "Hello World"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
// Spark dependency
"org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
{other dependencies}
)
Then, from the project root, run:
sbt package
This will create the necessary output folders and produce a jar file under target/.
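For reference, the minimal project layout that sbt expects looks roughly like this (HelloWorld.scala is an illustrative name for your application source file):
./build.sbt
./src/main/scala/HelloWorld.scala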
Spark job submission and monitoring
Applications are launched with the spark-submit script:
spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
Command line argument | Definition |
---|---|
--class | The entry point for your application |
--master | The master URL, e.g., local, local[4], spark://host:port, yarn |
--deploy-mode | Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client) |
--conf | Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). |
application-jar | Path to a bundled jar including your application and all dependencies. |
application-arguments | Arguments passed to the main method of your main class, if any |