OUTLINE
- Introduction
- Before You Turn to Spark
- Getting Started with Spark
- Using Spark 2
- Running Batch Jobs under Slurm
- Controlling the Amount of Logging
- Viewing Event Logs in the Spark UI After Jobs Complete
- Machine Learning
- Jupyter Notebooks
- Tuning Spark Applications
Introduction
Apache Spark is a cluster computing framework for large-scale data processing. While Spark is written in Scala, it provides frontends in Python, R and Java. Spark can be used on a range of hardware from a laptop to a large multi-server cluster. See the User Guide and the Spark code on GitHub.
Before You Turn to Spark
Spark is relatively easy to use since users do not need to explicitly concern themselves with the parallelism. However, before turning to Spark, make sure that your problem cannot be solved with software that you are already familiar with. For Pandas users, be sure to look at Modin and possibly Dask. These libraries allow for multicore operations. Keep in mind that you can allocate tens of CPU-cores and hundreds of gigabytes of RAM on a compute node. There is also NVIDIA RAPIDS, which is GPU-enabled but is the best choice only in certain cases. For R users, consider data.table, which can take advantage of multiple CPU-cores.
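For example, Modin can often serve as a drop-in replacement for Pandas by changing a single import. Below is a minimal sketch; the file name and column names are hypothetical, and Modin requires one of its engines (e.g., Ray or Dask) to be installed:

# minimal sketch: Modin as a drop-in replacement for Pandas
import modin.pandas as pd   # instead of "import pandas as pd"

df = pd.read_csv("sales.csv")                    # hypothetical input file; read is parallelized
totals = df.groupby("region")["revenue"].sum()   # hypothetical columns
print(totals)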
If you can divide your computations into a set of independent jobs (e.g., processing one subset of the data per job) then consider a job array.
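A minimal sketch of such a job array is shown below; the script process.py and the input file naming are hypothetical, and each array task handles one subset of the data:

#!/bin/bash
#SBATCH --job-name=array-job     # create a short name for your job
#SBATCH --nodes=1                # node count
#SBATCH --ntasks=1               # total number of tasks
#SBATCH --cpus-per-task=1        # cpu-cores per task
#SBATCH --mem=4G                 # memory per node
#SBATCH --time=01:00:00          # total run time limit (HH:MM:SS)
#SBATCH --array=0-9              # job array with index values 0, 1, ..., 9

# each array task processes its own subset of the data (process.py is hypothetical)
python process.py data_${SLURM_ARRAY_TASK_ID}.csv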
If the above options are not useful then you probably have a genuine big data problem and Spark is an excellent choice.
Getting Started with Spark
The introductory Spark tutorial provides an introduction to the Spark framework and the submission guidelines for using YARN.
Using Spark 2 (not recommended)
We recommend using Spark 3 by loading the appropriate environment module. If you need to use Spark 2, be aware that it can only be used with certain Python environment modules. Use one of the following combinations:
module load anaconda3/2019.10 spark/hadoop2.7/2.4.6
Or use this:
module load anaconda3/2019.10 spark/hadoop2.7/2.2.0
Running Batch Jobs under Slurm
This page provides guidelines for launching Spark 3 on a Research Computing cluster in the standalone mode using Slurm.
Below is an example Slurm script which can be used to launch Spark on a cluster and to allocate the driver and executor programs.
In a sense, the computing resources (memory and CPU-cores) are allocated twice. First, sufficient resources for the Spark application need to be allocated via Slurm. Second, the spark-submit resource allocation flags need to be properly specified. The following would allocate 3 full nodes with 4 tasks per node where each task uses 5 CPU-cores:
#!/bin/bash
#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

To use cluster resources optimally, one should use as many cores as possible on each node (a parameter dependent on the cluster). In the case where all the cores are requested, the user should explicitly request all the memory on the node. The next part of the script starts the standalone Spark cluster and sets up working and logging directories:
module purge
module load anaconda3/2022.10 spark/hadoop3.2/3.2.0

spark-start
echo $MASTER | tee master.txt
Echoing $MASTER (the node where the driver program is going to run) is not required. However, it is useful because this is the node where the Spark user interface (UI) runs. The Spark UI can be accessed from the login node of the cluster using Firefox, provided you have X11 forwarding enabled (i.e., ssh -X):
firefox --no-remote http://<master>:8080
<master> is the hostname in the MASTER variable above. For instance, if MASTER is spark://tiger-i25c1n20-op0:7077 then run:
firefox --no-remote http://tiger-i25c1n20-op0:8080
The last part of the script is to start a Spark application. The current example uses the Pi Monte Carlo calculation written in PySpark, which is available here: https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py
spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
There are a few parameters to tune for a given Spark application: the number of executors, the number of cores per executor and the amount of memory per executor.
The --ntasks-per-node parameter specifies how many executors will be started on each node (i.e., a total of 12 executors across 3 nodes in this example). By default, Spark will use 1 core per executor, so it is essential to specify --total-executor-cores. This number cannot exceed the total number of cores available on the nodes allocated for the Spark application (60 cores in this example, resulting in 5 CPU-cores per executor).
Finally, the --executor-memory parameter specifies the memory for each executor. It is 2GB by default, and cannot be greater than the RAM available on a cluster node (run the "snodes" command and look at the MEMORY column which is in units of MB).
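For the example above, the arithmetic works out as follows (a quick sanity check using the values from the Slurm script and the spark-submit line):

executors           = 3 nodes x 4 tasks per node             = 12
cores per executor  = 60 total executor cores / 12 executors = 5
memory per executor = 20 GB per node / 4 executors per node  = 5 GB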
The Spark memory tutorial discusses memory allocation for Spark applications with Slurm and YARN and how to tune Spark applications in general.
The complete Slurm submission script (i.e., job.slurm) is shown below:
#!/bin/bash
#SBATCH --job-name=spark-pi      # create a short name for your job
#SBATCH --nodes=3                # node count
#SBATCH --ntasks-per-node=4      # number of tasks per node
#SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=20G                # memory per node
#SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

module purge
module load anaconda3/2022.10 spark/hadoop3.2/3.2.0

spark-start
echo $MASTER | tee master.txt

spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
Submit the job from the command line by typing:
sbatch job.slurm
Standard Slurm commands described elsewhere can be used for job monitoring and cancellation.
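For example (the job ID below is a placeholder):

# check the status of your queued and running jobs
squeue -u $USER

# cancel a specific job by its job ID
scancel 1234567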
Here is a version that calculates the spark-submit parameters automatically and uses a smaller Slurm allocation:
#!/bin/bash
#SBATCH --job-name=myjob         # create a short name for your job
#SBATCH --nodes=1                # node count
#SBATCH --ntasks-per-node=2      # number of tasks per node
#SBATCH --cpus-per-task=3        # cpu-cores per task (>1 if multi-threaded tasks)
#SBATCH --mem=8G                 # total memory per node
#SBATCH --time=00:10:00          # total run time limit (HH:MM:SS)

TEC=$(($SLURM_NNODES * $SLURM_NTASKS_PER_NODE * $SLURM_CPUS_PER_TASK))
EM=$(($SLURM_MEM_PER_NODE / $SLURM_NTASKS_PER_NODE / 1024))
echo "total-executor-cores=${TEC}"
echo "executor-memory=${EM}"

module purge
module load anaconda3/2022.10 spark/hadoop3.2/3.2.0

spark-start

spark-submit --total-executor-cores ${TEC} --executor-memory ${EM}G myscript.py
Hands-on Exercise: Submitting a Batch Job
$ ssh <YourNetID>@adroit.princeton.edu
$ cd /scratch/network/<YourNetID>
$ mkdir spark_batch_job && cd spark_batch_job
$ wget https://raw.githubusercontent.com/apache/spark/master/examples/src/main/python/pi.py
$ cat pi.py
$ wget https://raw.githubusercontent.com/PrincetonUniversity/hpc_beginning_workshop/main/spark_big_data/job.slurm
$ cat job.slurm
# use a text editor to enter your NetID for email or use:
$ sed -i 's$<YourNetID>$aturing$g' job.slurm
$ sbatch job.slurm
# run the next line after the job is finished
$ cat slurm-*.out | grep -v -E 'INFO|WARN'
Controlling the Amount of Logging
Most people find all the INFO and WARN logging messages to be distracting. Follow the procedure below to control the number of logging messages that are written to the console:
ssh <YourNetID>@tigercpu.princeton.edu
mkdir ~/.spark && cd ~/.spark
module load spark
cp -r $SPARK_HOME/conf . && cd conf
cp log4j.properties.template log4j.properties
# edit log4j.properties by replacing the line
#   log4j.rootCategory=INFO, console
# with this line
#   log4j.rootCategory=ERROR, console

Then in your Slurm script add:
export SPARK_CONF_DIR=$HOME/.spark/conf
Now submit the job and everything except error messages will be suppressed. If you want more information for debugging then edit the line to read:
log4j.rootCategory=ALL, console
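If you prefer to control this from within a PySpark script rather than through log4j.properties, the log level can also be set on the SparkContext. A minimal sketch (the application name is arbitrary):

# minimal sketch: reduce console logging from within a PySpark script
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quiet-app").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")   # from this point on, only error messages are printed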
Viewing Event Logs in the Spark UI After Jobs Complete
Building on the directions above for controlling the logger:

mkdir -p $HOME/.spark/logs
cd $HOME/.spark/conf

In this directory make a file called spark-defaults.conf with the following contents:

spark.eventLog.enabled           true
spark.eventLog.dir               /home/<YourNetID>/.spark/logs
spark.history.fs.logDirectory    /home/<YourNetID>/.spark/logs
Replace YourNetID with your NetID (e.g., /home/ceisgrub/.spark/logs).
Run a job. After it finishes you can see its event logs using the Spark UI as follows:
# start the spark history server
$SPARK_HOME/sbin/start-history-server.sh /home/<YourNetID>/.spark/logs
firefox --no-remote http://localhost:18080

# to shutdown the server
$SPARK_HOME/sbin/stop-history-server.sh
Note that you can use TurboVNC for accelerated graphics. For Tiger and Della, one can launch TurboVNC on Tigressdata and then ssh -X from there to either cluster.
Machine Learning
See the Intro to the Machine Learning Libraries workshop for an introduction to machine learning in Spark.
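As a taste of what is covered, below is a minimal sketch of fitting a model with the spark.ml library; the tiny dataset is made up purely for illustration:

# minimal sketch: fit a logistic regression model with spark.ml on a toy dataset
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("ml-sketch").getOrCreate()

# made-up training data: (label, features)
train = spark.createDataFrame(
    [(0.0, Vectors.dense([0.0, 1.1])),
     (1.0, Vectors.dense([2.0, 1.0])),
     (0.0, Vectors.dense([0.1, 1.2])),
     (1.0, Vectors.dense([1.9, 0.8]))],
    ["label", "features"])

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(train)
print(model.coefficients)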
Jupyter Notebooks
The Jupyter notebook is a popular environment for data science which provides a platform for code development, plotting and full text analysis.
Getting Jupyter notebooks to leverage the compute power of the clusters takes two simple steps:

1) Start a standalone Spark cluster with a Slurm job:
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=3
#SBATCH --cpus-per-task=2
#SBATCH --time=10:00:00

module purge
module load spark/hadoop3.2/3.2.0
spark-start
echo $MASTER
sleep infinity
Unlike regular Slurm jobs, you need to start up a cluster and keep it running while the interactive notebook is in use. Therefore, set the time limit (-t) to the maximum allowed so that you do not have to restart often; the values of --nodes (-N), --ntasks-per-node and --cpus-per-task are up to you to configure.
Once the job is running, look for the Spark master URL in the job output; it is usually near the top, e.g.:

Starting master on spark://ns-001:7077
2) Start the Spark-enabled Jupyter (ipython) notebook (typically from the head node). First check which Python you are using:

which python
Set environment variables to launch a Spark-enabled Jupyter notebook:
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export PYSPARK_PYTHON=`which python`
The next time you launch pyspark, you will be taken to the Jupyter notebook in the browser GUI:
pyspark --master spark://ns-001:7077 --total-executor-cores 6
If there is no browser available on the cluster (for example, you are not using X11 forwarding), instead configure the notebook server to run without a browser on a chosen port:

export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889 --ip=127.0.0.1"
And then, on the local machine, establish an ssh-tunnel by typing:
ssh -N -f -L localhost:8889:localhost:8889 <YourNetID>@<clustername>.princeton.edu
The first option -N tells SSH that no remote commands will be executed, and is useful for port forwarding. The second option -f has the effect that SSH will go to background, so the local tunnel-enabling terminal remains usable. The last option -L lists the port forwarding configuration (remote port 8889 to local port 8889).
Note: the tunnel will be running in the background. The notebook can now be accessed from your browser at http://localhost:8889
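Inside the notebook, the pyspark launcher has already created a SparkContext (available as sc) connected to the standalone cluster, so a quick sanity check might look like this (a minimal sketch):

# run in a notebook cell; sc is created automatically by the pyspark launcher
rdd = sc.parallelize(range(1000))
print(rdd.sum())   # prints 499500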
OnDemand Jupyter via MyAdroit, MyDella and MyStellar
Another approach (if you only need 1 node) would be to make a Conda environment and then use OnDemand Jupyter with the custom environment:
$ conda create --name spark-env pyspark ipykernel -c conda-forge -y
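Inside the OnDemand notebook (assuming the kernel from the spark-env environment is selected), a single-node SparkSession can then be created in local mode. A minimal sketch; the application name is arbitrary:

# minimal sketch: local-mode SparkSession inside an OnDemand Jupyter notebook
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")             # use all CPU-cores of the allocated node
         .appName("ondemand-notebook")   # arbitrary name
         .getOrCreate())

print(spark.range(1000).count())         # quick sanity check; prints 1000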
Tuning Spark Applications
See this page for tips on improving the performance of your Spark jobs.