Apache Spark is a cluster computing framework for large-scale data processing. It is best known for its ability to cache large datasets in memory between intermediate computations. Interfaces are available for Scala, Python, R and Java.
This page provides guidelines for launching Spark on a cluster in the standalone mode using Slurm, which allows easy use of Spark on a majority of the clusters available at Princeton University.
The introductory Spark tutorial provides an introduction to the Spark framework and the submission guidelines for using YARN.
Below is an example Slurm script which can be used to launch standalone Spark on a cluster and to allocate driver and executor programs.
In a sense, the computing resources (memory and CPU) are allocated twice. First, sufficient resources for the Spark application need to be allocated via Slurm. Second, spark-submit resource allocation flags need to be properly specified. The following would allocate 3 full nodes, using 4 tasks per node making use of 5 CPUs per task:
#!/bin/bash
#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)
The next part of the script starts the standalone Spark cluster and sets up working and logging directories:
module purge
module load anaconda3 spark
spark-start
echo $MASTER | tee master.txt
Echoing $MASTER (the node where the driver program runs) is not required. However, it is useful because this is also the node where the Spark UI runs. The Spark UI can be accessed from a login node of the cluster using firefox, provided you have X11 forwarding enabled (i.e., ssh -X):
firefox --no-remote http://<master>:8080
<master> is the host portion of the MASTER variable above. For instance, if MASTER is spark://tiger-i25c1n20-op0:7077 then run:
firefox --no-remote http://tiger-i25c1n20-op0:8080
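If you want to script this step, the UI hostname can be derived from the master URL. A small illustrative Python snippet (the function name and default port argument are ours; 8080 is the standard port of the standalone master UI):

```python
def spark_ui_url(master, ui_port=8080):
    """Derive the Spark master UI URL from a spark://host:port master URL."""
    host = master.removeprefix("spark://").rsplit(":", 1)[0]  # strip scheme and port
    return f"http://{host}:{ui_port}"

print(spark_ui_url("spark://tiger-i25c1n20-op0:7077"))
# prints: http://tiger-i25c1n20-op0:8080
```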
The last part of the script is to start a Spark application. The current example uses the Pi Monte Carlo calculation written in PySpark, which is available here: https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py
spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
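The estimator at the heart of pi.py is simple: sample points uniformly in a square and count the fraction that falls inside the inscribed circle. Below is a minimal plain-Python sketch of that logic, without the Spark parallelization (the function name and fixed seed are ours, for illustration; see the linked pi.py for the actual distributed version):

```python
import random

def estimate_pi(n_samples, seed=0):
    """Monte Carlo estimate of pi: sample points in the square [-1, 1]^2
    and count the fraction that lands inside the unit circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n_samples):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / n_samples
```

In the PySpark version, this sampling loop is spread across the executors with parallelize(...).map(...).reduce(add); the trailing 100 in the spark-submit line is the number of partitions the samples are split into.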
There are a few parameters to tune for a given Spark application: the number of executors, the number of cores per executor and the amount of memory per executor.
The --ntasks-per-node parameter specifies how many executors will be started on each node (i.e., a total of 12 executors across 3 nodes in this example). By default, Spark uses 1 core per executor, so it is essential to specify --total-executor-cores; this number cannot exceed the total number of cores available on the nodes allocated for the Spark application (60 cores here, giving 5 CPU cores per executor).
Finally, the --executor-memory parameter specifies the memory for each executor. It defaults to 1 GB, and the combined memory of the executors on a node cannot exceed the RAM available on that node (125 GB on most Della nodes, but this varies by cluster). In this example, 4 executors of 5 GB each exactly fill the 20 GB requested per node with --mem.
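The arithmetic behind these settings can be checked directly. Here is a small sanity check mirroring the Slurm directives and spark-submit flags above (variable names are ours, for illustration):

```python
# Slurm allocation (from the #SBATCH directives above)
nodes = 3
ntasks_per_node = 4       # one Spark executor per task
cpus_per_task = 5
mem_per_node_gb = 20

# spark-submit flags
total_executor_cores = 60
executor_memory_gb = 5

executors = nodes * ntasks_per_node                     # total executors
cores_per_executor = total_executor_cores // executors  # cores each executor gets

# the flags must stay within the Slurm allocation
assert total_executor_cores <= nodes * ntasks_per_node * cpus_per_task
assert executor_memory_gb * ntasks_per_node <= mem_per_node_gb

print(executors, cores_per_executor)
# prints: 12 5
```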
The Spark memory tutorial discusses memory allocation for Spark applications with Slurm and YARN and how to tune Spark applications in general.
The complete Slurm submission script (i.e., slurm.cmd) is shown below:
#!/bin/bash
#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

module purge
module load anaconda3 spark

spark-start
echo $MASTER | tee master.txt

spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
Submit the job from the command line by typing:

sbatch slurm.cmd
Standard Slurm commands described elsewhere can be used for job monitoring and cancellation.
Controlling the Amount of Logging (INFO, WARN, ERROR, ALL, DEBUG)
Most people find all the INFO and WARN logging messages to be distracting. Follow the procedure below to control the number of logging messages that are written to the console:
ssh YourNetID@tigercpu.princeton.edu
mkdir ~/.spark && cd .spark
module load spark
cp -r $SPARK_HOME/conf . && cd conf
cp log4j.properties.template log4j.properties
# edit log4j.properties by replacing this line
#   log4j.rootCategory=INFO, console
# with this line
#   log4j.rootCategory=ERROR, console

Then in your Slurm script add:
export SPARK_CONF_DIR=$HOME/.spark/conf

Now submit the job and everything except error messages will be suppressed. If you want more information for debugging then edit the line to read:

log4j.rootCategory=INFO, console

(or use DEBUG for even more detail).
Viewing Event Logs in the Spark UI After Jobs Complete

Building on the directions above for controlling the logger:
mkdir -p $HOME/.spark/logs
cd $HOME/.spark/conf
# in this directory make a file called spark-defaults.conf with the following contents
spark.eventLog.enabled           true
spark.eventLog.dir               /home/YourNetID/.spark/logs
spark.history.fs.logDirectory    /home/YourNetID/.spark/logs
Replace YourNetID with your NetID (e.g., /home/ceisgrub/.spark/logs).
Run a job. After it finishes you can see its event logs using the Spark UI as follows:
# start the Spark history server
$SPARK_HOME/sbin/start-history-server.sh /home/YourNetID/.spark/logs
firefox --no-remote http://localhost:18080
# to shut down the server
$SPARK_HOME/sbin/stop-history-server.sh
Note that you can use TurboVNC for accelerated graphics. This exists natively on Perseus through an environment module. For Tiger and Della one can launch TurboVNC and then ssh -X to either Tiger or Della from Tigressdata.
The Jupyter notebook is a popular environment for data science which provides a platform for code development, plotting and full text analysis.
Getting Jupyter notebooks to leverage the compute power of a cluster takes two simple steps:

1) Start a standalone Spark cluster with a Slurm script such as:
#SBATCH -N 1
#SBATCH -t 10:00:00
#SBATCH --ntasks-per-node 3
#SBATCH --cpus-per-task 2
module load spark/hadoop2.7/2.2.0
spark-start
Unlike regular Slurm jobs, you need to start up a cluster and keep it running while the interactive notebook is in use. Therefore, set -t to the maximum allowed limit so that you do not have to restart often; -N, --ntasks-per-node and --cpus-per-task are up to you to configure. Once the job starts, look in its output for a line such as:

Starting master on spark://ns-001:7077

It usually appears near the top of the output; note the master URL for the next step.
2) Start a Spark-enabled Jupyter (IPython) notebook (typically from the head node).
Set environment variables to launch a Spark-enabled Jupyter notebook:

export PYSPARK_PYTHON=`which python`
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889 --ip=127.0.0.1"

The next time you launch pyspark, it will start a Jupyter notebook server in the browser GUI instead of the usual shell:

pyspark --master spark://ns-001:7077 --total-executor-cores 6
And then, on the local machine, establish an ssh-tunnel by typing:
ssh -N -f -L localhost:8889:localhost:8889 email@example.com
The first option, -N, tells SSH that no remote command will be executed; it is useful for pure port forwarding. The second option, -f, sends SSH to the background, so the local tunnel-enabling terminal remains usable. The last option, -L, gives the port forwarding configuration (remote port 8889 forwarded to local port 8889).
Note: the tunnel will be running in the background. The notebook can now be accessed from your browser at http://localhost:8889