Spark application submission via Slurm

Apache Spark is a cluster computing framework for large-scale data processing. It is best known for its ability to cache large datasets in memory between intermediate calculations. Interfaces are available for Scala, Python, R and Java.

This page provides guidelines for launching Spark on a cluster in the standalone mode using Slurm, which allows easy use of Spark on a majority of the clusters available at Princeton University.

The introductory Spark tutorial covers the Spark framework itself and the submission guidelines for using YARN.

Below is an example Slurm script which can be used to launch standalone Spark on a cluster and to allocate driver and executor programs.

In a sense, the computing resources (memory and CPU) are allocated twice. First, sufficient resources for the Spark application need to be allocated via Slurm. Second, spark-submit resource allocation flags need to be properly specified. The following would allocate 3 full nodes, using 4 tasks per node making use of 5 CPUs per task:

#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)
To use cluster resources optimally, request as many cores as possible on each node (the core count varies by cluster). When all of a node's cores are requested, the user should also explicitly request all of the memory on the node (e.g., with --mem=0).
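For illustration, on a hypothetical cluster whose nodes have 32 cores each, a full-node request might look like the following (the node and core counts here are assumptions, not values from the example above); --mem=0 asks Slurm for all of the memory on each node:

```shell
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=8      # executors per node
#SBATCH --cpus-per-task=4        # 8 tasks x 4 cpus = all 32 cores on the node
#SBATCH --mem=0                  # request all of the memory on each node
```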

The next part of the script starts the standalone Spark cluster and sets up working and logging directories:

module purge
module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
spark-start                      # helper assumed to come with the spark module; starts the cluster and sets $MASTER
echo $MASTER | tee master.txt

Echoing $MASTER (the node where the driver program is going to run) is not required. However, it is useful because this is the node where the Spark UI runs. The Spark UI can be accessed from the login node of the cluster using firefox, provided you have X11 forwarding enabled (i.e., ssh -X):

firefox --no-remote http://<master>:8080

<master> is the hostname portion of the MASTER variable above. For instance, if MASTER is spark://tiger-i25c1n20-op0:7077 then run:

firefox --no-remote http://tiger-i25c1n20-op0:8080

The last part of the script starts the Spark application. This example submits a Pi Monte Carlo calculation written in PySpark (referred to here as pi.py; the trailing 100 is an argument to the script, in this case the number of partitions):

spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
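The script itself is not reproduced on this page; below is a minimal sketch of what pi.py might contain, a hypothetical reconstruction of the standard PySpark pi example rather than the exact file originally linked here:

```python
# pi.py -- hypothetical sketch of the Monte Carlo Pi estimate
import os
import sys
from operator import add
from random import random

def inside(_):
    # draw a point in the unit square; return 1 if it lands inside the
    # quarter circle of radius 1, else 0
    x, y = random(), random()
    return 1 if x * x + y * y < 1.0 else 0

# the guard lets the file be imported without pyspark installed;
# spark-submit sets SPARK_HOME, so the job itself runs normally
if __name__ == "__main__" and os.environ.get("SPARK_HOME"):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions
    count = spark.sparkContext.parallelize(range(n), partitions) \
                              .map(inside).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    spark.stop()
```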

There are a few parameters to tune for a given Spark application: the number of executors, the number of cores per executor and the amount of memory per executor.

The --ntasks-per-node parameter specifies how many executors will be started on each node (i.e., a total of 12 executors across 3 nodes in this example). By default, Spark uses 1 core per executor, so it is essential to specify --total-executor-cores; this number cannot exceed the total number of cores available on the nodes allocated for the Spark application (60 cores, resulting in 5 CPU cores per executor, in this example).

Finally, the --executor-memory parameter specifies the memory for each executor. It is 2 GB by default, and cannot be greater than the RAM available on a cluster node (125 GB on most Della nodes, but this varies by cluster).
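As a sanity check, the relationship between these flags can be worked out explicitly; the sketch below mirrors the 3-node example above (plain shell arithmetic, nothing Spark-specific):

```shell
# values from the example Slurm header above
NODES=3
NTASKS_PER_NODE=4                # one executor per Slurm task
TOTAL_EXECUTOR_CORES=60          # value passed to --total-executor-cores

EXECUTORS=$((NODES * NTASKS_PER_NODE))
CORES_PER_EXECUTOR=$((TOTAL_EXECUTOR_CORES / EXECUTORS))
echo "executors=$EXECUTORS cores_per_executor=$CORES_PER_EXECUTOR"
```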

The Spark memory tutorial discusses memory allocation for Spark applications with Slurm and YARN and how to tune Spark applications in general.

The complete Slurm submission script (i.e., slurm.cmd) is shown below:

#!/bin/bash
#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

module purge
module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
spark-start                      # helper assumed to come with the spark module; starts the cluster and sets $MASTER
echo $MASTER | tee master.txt

spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100

Submit the job from the command line by typing:

sbatch slurm.cmd

Standard Slurm commands described elsewhere can be used for job monitoring and cancellation.

Here is a version that calculates the command line parameters automatically:

#!/bin/bash -x
#SBATCH --job-name=myjob         # create a short name for your job
#SBATCH --nodes=1                # node count
#SBATCH --ntasks-per-node=2      # number of tasks per node
#SBATCH --cpus-per-task=3        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=8G                 # total memory per node
#SBATCH --time=00:10:00          # total run time limit (HH:MM:SS)

module purge
module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
spark-start                      # helper assumed to come with the spark module; starts the cluster and sets $MASTER

# derive the spark-submit parameters from the Slurm allocation
TEC=$((SLURM_NTASKS * SLURM_CPUS_PER_TASK))    # total executor cores (2 x 3 = 6 here)
EM=$((8 * SLURM_NNODES / SLURM_NTASKS))        # GB per executor, from --mem=8G (4 here)

spark-submit --total-executor-cores ${TEC} --executor-memory ${EM}G pi.py 100


Controlling the Amount of Logging

Most people find all the INFO and WARN logging messages to be distracting. Follow the procedure below to control the number of logging messages that are written to the console:

ssh <YourNetID>@<cluster>
mkdir -p ~/.spark && cd ~/.spark
module load spark
cp -r $SPARK_HOME/conf . && cd conf

# edit log4j.properties by replacing
log4j.rootCategory=INFO, console
# with this line
log4j.rootCategory=ERROR, console
Then in your Slurm script add:

export SPARK_CONF_DIR=$HOME/.spark/conf

Now submit the job and everything except error messages will be suppressed. If you want more information for debugging then edit the line to read:

log4j.rootCategory=ALL, console
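If you prefer a non-interactive edit, the replacement can be scripted with sed; the sketch below demonstrates it on a scratch copy (the same sed command works on $HOME/.spark/conf/log4j.properties):

```shell
# make a scratch copy containing the stock INFO line
printf 'log4j.rootCategory=INFO, console\n' > log4j.properties.demo
# switch the root logger from INFO to ERROR in place
sed -i 's/^log4j.rootCategory=INFO, console/log4j.rootCategory=ERROR, console/' log4j.properties.demo
cat log4j.properties.demo
```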


Viewing Event Logs in the Spark UI After Jobs Complete

Building on the directions above for controlling the logger:
mkdir -p $HOME/.spark/logs
cd $HOME/.spark/conf

# in this directory make a file called spark-defaults.conf with the following contents and replace YourNetID with your NetID
spark.eventLog.enabled           true
spark.eventLog.dir               /home/YourNetID/.spark/logs
spark.history.fs.logDirectory    /home/YourNetID/.spark/logs

Replace YourNetID with your NetID (e.g., /home/ceisgrub/.spark/logs).
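One way to create the file without hand-editing is a here-document; because $HOME expands to your home directory, the NetID is filled in automatically (run this from $HOME/.spark/conf):

```shell
# write spark-defaults.conf in the current directory
cat > spark-defaults.conf <<EOF
spark.eventLog.enabled           true
spark.eventLog.dir               $HOME/.spark/logs
spark.history.fs.logDirectory    $HOME/.spark/logs
EOF
cat spark-defaults.conf
```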

Run a job. After it finishes you can see its event logs using the Spark UI as follows:

# start the spark history server
$SPARK_HOME/sbin/start-history-server.sh /home/<YourNetID>/.spark/logs

firefox --no-remote http://localhost:18080

# to shut down the server
$SPARK_HOME/sbin/stop-history-server.sh

Note that you can use TurboVNC for accelerated graphics. It is available natively on Perseus through an environment module. For Tiger and Della, one can launch TurboVNC on Tigressdata and then ssh -X from there to either Tiger or Della.


Jupyter Notebooks

The Jupyter notebook is a popular environment for data science which provides a platform for code development, plotting and full text analysis.

Getting Jupyter notebooks to leverage the compute power of clusters is possible in 2 simple steps:

1) Allocate resources for Spark cluster using Slurm (as described in the beginning of this article). This can be done by submitting the following script with sbatch:

#!/bin/bash
#SBATCH -N 1
#SBATCH -t 10:00:00
#SBATCH --ntasks-per-node 3
#SBATCH --cpus-per-task 2

module load spark/hadoop2.7/2.2.0
spark-start        # helper assumed to come with the spark module; starts the cluster and sets $MASTER
echo $MASTER
sleep infinity

Unlike regular Slurm jobs, here one needs to start up a cluster and keep it running while the interactive notebook is in use. Therefore, set -t to the maximum allowed limit so that you do not have to restart often; --ntasks-per-node, --cpus-per-task and -N are up to you to configure.

This Slurm job will start the master on an available node and print the master name in the slurm.out file. It will look something like:

Starting master on spark://ns-001:7077

and is usually near the top of the file.
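Rather than reading the file by eye, the master URL can be extracted with grep; the sketch below uses a stand-in file, since the real output file name (slurm-<jobid>.out) depends on the job:

```shell
# stand-in for the job's output file
printf 'Starting master on spark://ns-001:7077\n' > slurm-demo.out
# pull out the first spark:// URL
MASTER=$(grep -o 'spark://[^ ]*' slurm-demo.out | head -n 1)
echo "$MASTER"
```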

2) Start a Spark-enabled Jupyter (IPython) notebook, typically from the head node.
Locate your python environment:

which python

Set environmental variables to launch Spark-enabled Jupyter notebook:

export PYSPARK_DRIVER_PYTHON=/path/to/your/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export PYSPARK_PYTHON=`which python`

The next time you launch pyspark, you will be presented with a Jupyter notebook in the browser GUI:

pyspark --master spark://ns-001:7077 --total-executor-cores 6

Make sure the number of executors and the total number of cores are consistent with what you requested in the Slurm submission script.

Often you will not want the Jupyter notebook to open in a browser running on the head node or a worker node of the cluster. In that case, modify PYSPARK_DRIVER_PYTHON_OPTS to be:

export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889"

And then, on the local machine, establish an ssh-tunnel by typing:

ssh -N -f -L localhost:8889:localhost:8889 <YourNetID>@<cluster>

The first option -N tells SSH that no remote commands will be executed, which is useful for port forwarding. The second option -f sends SSH to the background, so the local tunnel-enabling terminal remains usable. The last option -L specifies the port forwarding configuration (connections to local port 8889 are forwarded to port 8889 on the remote host).

Note: the tunnel will be running in the background. The notebook can now be accessed from your browser at http://localhost:8889