A cluster computing framework for large-scale data processing on the HPC Clusters

Introduction

Apache Spark is a cluster computing framework for large-scale data processing. While Spark is written in Scala, it provides high-level APIs in Python, R and Java. Spark can be used on a range of hardware, from a laptop to a large multi-server cluster. See the User Guide and the Spark code on GitHub.

Before You Turn to Spark

Spark is relatively easy to use since users do not need to explicitly concern themselves with the parallelism. However, before turning to Spark, make sure that your problem cannot be solved with software you are already familiar with. Pandas users should look at Modin and possibly Dask; these libraries allow many Pandas operations to run on multiple CPU-cores. Keep in mind that you can allocate tens of CPU-cores and hundreds of gigabytes of RAM on a single compute node. There is also NVIDIA RAPIDS, which is GPU-accelerated but only the best choice in certain cases. R users should consider data.table, which can take advantage of multiple CPU-cores.
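
As a rough sketch (assuming Modin is installed along with one of its execution engines, such as Ray or Dask), moving an existing Pandas workflow onto multiple CPU-cores can be as simple as changing the import; the file and column names below are only illustrative:

        import modin.pandas as pd   # drop-in replacement for "import pandas as pd"

        # the same Pandas-style calls now run across multiple CPU-cores
        df = pd.read_csv("measurements.csv")             # hypothetical input file
        print(df.groupby("sensor")["value"].mean())      # hypothetical column names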

If you can divide your computations into a set of independent jobs (e.g., processing one subset of the data per job) then consider a job array.

If the above options are not useful then you probably have a genuine big data problem and Spark is an excellent choice.

Getting Started with Spark

The introductory Spark tutorial covers the basics of the Spark framework and the submission guidelines for using YARN.

Using Spark 2 (not recommended)

We recommend using Spark 3 by loading the appropriate environment module. If you need to use Spark 2, be aware that it can only be used with certain Python modules. Consider one of the following module combinations:

module load anaconda3/2019.10 spark/hadoop2.7/2.4.6

Or use this:

module load anaconda3/2019.10 spark/hadoop2.7/2.2.0

Running Batch Jobs under Slurm

This page provides guidelines for launching Spark 3 on a Research Computing cluster in the standalone mode using Slurm.

Below is an example Slurm script which can be used to launch Spark on a cluster and to allocate the driver and executor programs.

In a sense, the computing resources (memory and CPU-cores) are allocated twice. First, sufficient resources for the Spark application need to be allocated via Slurm. Second, the spark-submit resource allocation flags need to be properly specified. The following would allocate 3 full nodes with 4 tasks per node where each task uses 5 CPU-cores:

        #!/bin/bash
        #SBATCH --job-name=spark-pi      # create a short name for your job
        #SBATCH --nodes=3                # node count
        #SBATCH --ntasks-per-node=4      # number of tasks per node
        #SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
        #SBATCH --mem=20G                # memory per node
        #SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)

To use cluster resources optimally, one should use as many cores as possible on each node (a parameter dependent on the cluster). In the case where all the cores are requested, the user should explicitly request all the memory on the node. The next part of the script starts the standalone Spark cluster and sets up working and logging directories:

        module purge
        module load anaconda3/2022.10 spark/hadoop3.2/3.2.0
        spark-start
        echo $MASTER | tee master.txt

Echoing $MASTER (the node where the driver program is going to run) is not required. However, it is useful because this is the node where the Spark user interface (UI) runs. The Spark UI can be accessed from the login node of the cluster using Firefox, provided you have X11 forwarding enabled (i.e., ssh -X):

firefox --no-remote http://<master>:8080

<master> is the hostname in the MASTER value above. For instance, if MASTER is spark://tiger-i25c1n20-op0:7077 then run:

firefox --no-remote http://tiger-i25c1n20-op0:8080

The last part of the script is to start a Spark application. The current example uses the Pi Monte Carlo calculation written in PySpark, which is available here: https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py

spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100
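
For reference, the heart of pi.py is roughly the following (a simplified sketch of the upstream example; see the link above for the exact code):

        import sys
        from random import random
        from operator import add
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.appName("PythonPi").getOrCreate()
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2   # the "100" on the spark-submit line above
        n = 100000 * partitions                                     # number of random points to sample

        def inside(_):
            # sample a point in the square [-1, 1] x [-1, 1] and test whether it lies in the unit circle
            x, y = random() * 2 - 1, random() * 2 - 1
            return 1 if x ** 2 + y ** 2 <= 1 else 0

        count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(inside).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))
        spark.stop()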

There are a few parameters to tune for a given Spark application: the number of executors, the number of cores per executor and the amount of memory per executor.

The --ntasks-per-node parameter specifies how many executors will be started on each node (i.e., a total of 12 executors across the 3 nodes in this example). By default, Spark uses 1 core per executor, so it is essential to specify --total-executor-cores; this number cannot exceed the total number of cores available on the nodes allocated for the Spark application (60 cores here, giving 5 CPU-cores per executor).

Finally, the --executor-memory parameter specifies the memory for each executor. It is 2GB by default, and cannot be greater than the RAM available on a cluster node (run the "snodes" command and look at the MEMORY column which is in units of MB).

The Spark memory tutorial discusses memory allocation for Spark applications with Slurm and YARN and how to tune Spark applications in general.

The complete Slurm submission script (i.e., job.slurm) is shown below:

        #!/bin/bash
        #SBATCH --job-name=spark-pi      # create a short name for your job
        #SBATCH --nodes=3                # node count
        #SBATCH --ntasks-per-node=4      # number of tasks per node
        #SBATCH --cpus-per-task=5        # cpu-cores per task (>1 if multi-threaded tasks)
        #SBATCH --mem=20G                # memory per node
        #SBATCH --time=00:05:00          # total run time limit (HH:MM:SS)
        module purge
        module load anaconda3/2022.10 spark/hadoop3.2/3.2.0
        spark-start
        echo $MASTER | tee master.txt
        spark-submit --total-executor-cores 60 --executor-memory 5G pi.py 100

Submit the job from the command line by typing:

sbatch job.slurm

Standard Slurm commands described elsewhere can be used for job monitoring and cancellation.

Here is a version that calculates the spark-submit parameters automatically from the Slurm environment variables and uses a smaller allocation (for the values below: 1 node × 2 tasks per node × 3 CPU-cores per task gives 6 total executor cores, and 8192 MB per node / 2 executors per node / 1024 gives 4 GB per executor):

        #!/bin/bash
        #SBATCH --job-name=myjob         # create a short name for your job
        #SBATCH --nodes=1                # node count
        #SBATCH --ntasks-per-node=2      # total number of tasks across all nodes
        #SBATCH --cpus-per-task=3        # cpu-cores per task (>1 if multi-threaded tasks)
        #SBATCH --mem=8G                 # total memory per node
        #SBATCH --time=00:10:00          # total run time limit (HH:MM:SS)
        TEC=$(($SLURM_NNODES * $SLURM_NTASKS_PER_NODE * $SLURM_CPUS_PER_TASK))  # total executor cores
        EM=$(($SLURM_MEM_PER_NODE / $SLURM_NTASKS_PER_NODE / 1024))             # executor memory in GB (SLURM_MEM_PER_NODE is in MB)
        echo "total-executor-cores=${TEC}"
        echo "executor-memory=${EM}"
        module purge
        module load anaconda3/2022.10 spark/hadoop3.2/3.2.0
        spark-start
        spark-submit --total-executor-cores ${TEC} --executor-memory ${EM}G myscript.py

Hands-on Exercise: Submitting a Batch Job

        $ ssh <YourNetID>@adroit.princeton.edu
        $ cd /scratch/network/<YourNetID>
        $ mkdir spark_batch_job && cd spark_batch_job
        $ wget https://raw.githubusercontent.com/apache/spark/master/examples/src/main/python/pi.py
        $ cat pi.py
        $ wget https://raw.githubusercontent.com/PrincetonUniversity/hpc_beginning_workshop/main/spark_big_data/job.slurm
        $ cat job.slurm
        # use a text editor to enter your NetID for email or use: $ sed -i 's$<YourNetID>$aturing$g' job.slurm
        $ sbatch job.slurm
        # run the next line after the job is finished
        $ cat slurm-*.out | grep -v -E 'INFO|WARN'

Controlling the Amount of Logging

Most people find all the INFO and WARN logging messages to be distracting. Follow the procedure below to control the number of logging messages that are written to the console:

        ssh <YourNetID>@tigercpu.princeton.edu
        mkdir -p ~/.spark && cd ~/.spark
        module load spark
        cp -r $SPARK_HOME/conf . && cd conf
        cp log4j.properties.template log4j.properties
        # edit log4j.properties by replacing 
        log4j.rootCategory=INFO, console
        # with this line
        log4j.rootCategory=ERROR, console

Then in your Slurm script add:

export SPARK_CONF_DIR=$HOME/.spark/conf

Now submit the job and everything except error messages will be suppressed. If you want more information for debugging then edit the line to read:

log4j.rootCategory=ALL, console
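
A per-application alternative (a minimal sketch in PySpark) is to raise the driver's log threshold at runtime with setLogLevel; this complements rather than replaces the log4j.properties approach:

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.appName("quiet-logs").getOrCreate()
        # only messages at ERROR level or above are printed from this point on
        spark.sparkContext.setLogLevel("ERROR")    # other values include "WARN", "INFO", "ALL"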

Viewing Event Logs in the Spark UI After Jobs Complete

Building on the directions above for controlling the logger:

        mkdir -p $HOME/.spark/logs
        cd $HOME/.spark/conf
        # in this directory make a file called spark-defaults.conf with the following contents and replace YourNetID with your NetID
        spark.eventLog.enabled           true
        spark.eventLog.dir               /home/<YourNetID>/.spark/logs
        spark.history.fs.logDirectory    /home/<YourNetID>/.spark/logs

Replace YourNetID with your NetID (e.g., /home/ceisgrub/.spark/logs).
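
If you prefer not to edit spark-defaults.conf, the same event-log settings can also be supplied per application when the SparkSession is created (a hedged sketch; the application name is only an example):

        from pyspark.sql import SparkSession

        spark = (SparkSession.builder
                 .appName("eventlog-demo")
                 .config("spark.eventLog.enabled", "true")
                 .config("spark.eventLog.dir", "/home/<YourNetID>/.spark/logs")
                 .getOrCreate())

The history server still reads the log directory given on its command line, as shown below.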

Run a job. After it finishes you can see its event logs using the Spark UI as follows:

        # start the spark history server
        $SPARK_HOME/sbin/start-history-server.sh /home/<YourNetID>/.spark/logs
        firefox --no-remote http://localhost:18080
        # to shutdown the server
        $SPARK_HOME/sbin/stop-history-server.sh

Note that you can use TurboVNC for accelerated graphics. For Tiger and Della one can launch TurboVNC and then ssh -X to either Tiger or Della from Tigressdata.

Machine Learning

See the Intro to the Machine Learning Libraries workshop for an introduction to machine learning in Spark.

Jupyter Notebooks

The Jupyter notebook is a popular environment for data science which provides a platform for code development, plotting and full text analysis.

Getting Jupyter notebooks to leverage the compute power of the clusters is possible in two simple steps:

1) Allocate resources for Spark cluster using Slurm (as described in the beginning of this article). This can be done by submitting the following script with sbatch:

        #!/bin/bash
        #SBATCH --nodes=1
        #SBATCH --ntasks-per-node=3
        #SBATCH --cpus-per-task=2
        #SBATCH --time=10:00:00
        module purge
        module load spark/hadoop3.2/3.2.0
        spark-start
        echo $MASTER
        sleep infinity

Unlike regular Slurm jobs, this job starts the Spark cluster and keeps it running while the interactive notebook is in use. Therefore, set the --time (-t) limit to the maximum allowed so that you do not have to restart often; --nodes (-N), --ntasks-per-node and --cpus-per-task are up to you to configure.

This Slurm job will start the Spark master on one of the allocated nodes and print the master URL in the Slurm output file (slurm-<jobid>.out); it will look something like:

Starting master on spark://ns-001:7077

and it is usually near the top of the file.

2) Start a Spark-enabled Jupyter (IPython) notebook, typically from the head node.

Locate your Python environment:

which python

Set the following environment variables to launch a Spark-enabled Jupyter notebook:

export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export PYSPARK_PYTHON=`which python` 

The next time you launch pyspark, a Jupyter notebook will open in the browser GUI:

pyspark --master spark://ns-001:7077 --total-executor-cores 6

Make sure the number of executors and total number of cores are consistent with what you request in the Slurm submission script.
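
Inside a notebook launched this way, the pyspark driver should already provide a SparkSession as spark and a SparkContext as sc (a quick sanity-check sketch under that assumption):

        # `spark` and `sc` are created by the pyspark launcher; do not create them again
        print(spark.version)
        print(sc.master)                         # should show spark://<master>:7077

        # a small test that distributes work to the executors
        rdd = sc.parallelize(range(1000000), 6)
        print(rdd.map(lambda x: x * x).sum())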

Often you will not want the Jupyter notebook to open in a browser running on the head node or a worker node of the cluster. If that is the case, modify PYSPARK_DRIVER_PYTHON_OPTS to be:

export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889 --ip=127.0.0.1"


And then, on the local machine, establish an ssh-tunnel by typing:


ssh -N -f -L localhost:8889:localhost:8889 <YourNetID>@<clustername>.princeton.edu

The first option, -N, tells SSH that no remote command will be executed and is useful for port forwarding. The second option, -f, sends SSH to the background so that the local terminal remains usable. The last option, -L, specifies the port-forwarding configuration (remote port 8889 forwarded to local port 8889).

Note: the tunnel will be running in the background. The notebook can now be accessed from your browser at http://localhost:8889

OnDemand Jupyter via MyAdroit, MyDella and MyStellar

Another approach (if you only need 1 node) would be to make a Conda environment and then use OnDemand Jupyter with the custom environment:

$ conda create --name spark-env pyspark ipykernel -c conda-forge -y
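
Because an OnDemand session runs on a single node, a notebook using this environment can simply start Spark in local mode (a minimal sketch; the application name is only an example):

        from pyspark.sql import SparkSession

        # local[*] runs Spark inside the notebook session using the CPU-cores visible to it
        spark = (SparkSession.builder
                 .master("local[*]")
                 .appName("ondemand-example")
                 .getOrCreate())

        df = spark.createDataFrame([(1, "alpha"), (2, "beta")], ["id", "label"])
        df.show()
        spark.stop()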

Tuning Spark Applications

See this page for tips on improving the performance of your Spark jobs.