Tuning Spark applications

A resilient distributed dataset (RDD) in Spark is an immutable collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster and in different stages. An RDD can contain objects of any fundamental type as well as user-defined types.

The first step in almost every Spark application is to load an external dataset or to distribute a collection of objects into an RDD. Spark RDDs are lazily evaluated: an RDD is not computed until an action is called on it, and by default Spark recomputes the RDD and all its dependencies each time an action is called. To avoid this recomputation and make the code faster, one can persist an RDD in memory, on disk, or split between the two in some proportion, as discussed later in this section.

The Spark driver is the main program that declares the transformations and actions on RDDs and submits these requests to the master. This is the program where the SparkContext is created.

The workers are the nodes where tasks are executed, by processes called executors. They should have resources and network connectivity sufficient to perform the transformations and actions on the RDDs defined in the main program.

The two main resources allocated for Spark applications are memory and CPU. Disk space and network I/O play an important part in Spark performance as well, but neither Spark, Slurm, nor YARN actively manages them. The resource negotiation is somewhat different when using Spark via YARN and when using standalone Spark via Slurm.

Managing CPU resources

The number of cores can be specified in YARN with the --executor-cores flag when invoking spark-submit, spark-shell, or pyspark from the command line or in the Slurm submission script, or alternatively on the SparkConf object inside the Spark script. This property controls the number of concurrent tasks an executor can run: --executor-cores 5 means that each executor can run at most five tasks at the same time.

When using standalone Spark via Slurm, one can specify the total number of executor cores per Spark application with the --total-executor-cores flag; these cores are distributed uniformly across the executors. The best practice is to set --total-executor-cores to the number of nodes times the number of tasks per node allocated to the application by Slurm, multiplied by the 2-3 CPU cores assumed per executor (task). For instance:

#SBATCH -N 5
#SBATCH --ntasks-per-node 10

would yield --total-executor-cores 100 under the rule described above (5 nodes × 10 tasks per node × 2 cores per task). When allocating nodes with Slurm's -N option, it is recommended to use as many cores per node as possible, leaving 1-2 cores free so that the OS and cluster-specific daemons can function properly.
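The rule above can be written out as a short calculation. The sketch below is plain Python and not part of Spark or Slurm; the function name and the cpus_per_task parameter are illustrative assumptions, with cpus_per_task standing for the 2-3 CPU cores per executor mentioned above.

```python
def total_executor_cores(nodes, tasks_per_node, cpus_per_task):
    """Value to pass to --total-executor-cores for a given Slurm allocation."""
    if min(nodes, tasks_per_node, cpus_per_task) < 1:
        raise ValueError("all arguments must be positive integers")
    return nodes * tasks_per_node * cpus_per_task


# Allocation from the example above: -N 5, --ntasks-per-node 10,
# assuming 2 CPU cores per task.
print(total_executor_cores(nodes=5, tasks_per_node=10, cpus_per_task=2))
```

With 3 cores per task instead of 2, the same allocation would give 150 total executor cores.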

The --num-executors YARN flag controls the number of executors requested. When using Spark in standalone mode, one executor is created on each node allocated with Slurm (so 5 executors would be created in the above example).

Managing memory resources

The memory allocated for a Spark application should be greater than what is needed to cache the data and to hold the shuffle data structures used for grouping, aggregations, and joins.

There are three considerations in tuning memory usage: the amount of memory used by your objects, the cost of accessing those objects, and the overhead of garbage collection (GC).

The --executor-memory flag controls the executor heap size (for both YARN and Slurm); the default value is 2 GB per executor. The --driver-memory flag controls the amount of memory allocated to the driver, which is 1 GB by default and should be increased if you call a collect() or take(N) action on a large RDD inside your application.

The first step in optimizing memory consumption by Spark is to determine how much memory your dataset requires. This can be done by creating an RDD, caching it, and checking its size in the Storage tab of the Spark UI. Alternatively, one can look at the SparkContext logs of the driver program (there is no easy way to determine the RDD size exactly, and approximate methods rely on Spark's SizeEstimator).

By default, Spark uses 60% of the configured executor memory (--executor-memory) to cache RDDs. The remaining 40% of memory is available for any objects created during task execution. If your tasks slow down due to frequent garbage collection in the JVM, or if the JVM is running out of memory, lowering the fraction reserved for caching will help reduce memory consumption.
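To make the 60/40 split concrete, here is a minimal sketch in plain Python. The function name is hypothetical and the 24 GB executor size is an arbitrary illustrative value; the code only reproduces the arithmetic described above and does not query Spark.

```python
def split_executor_memory(executor_memory_gb, storage_fraction=0.6):
    """Return (cache_gb, task_gb): memory for cached RDDs and for task objects."""
    if not 0.0 <= storage_fraction <= 1.0:
        raise ValueError("storage_fraction must be between 0 and 1")
    cache_gb = executor_memory_gb * storage_fraction
    return cache_gb, executor_memory_gb - cache_gb


# Compare the default fraction with a lowered one for a 24 GB executor.
for fraction in (0.6, 0.4):
    cache_gb, task_gb = split_executor_memory(24, fraction)
    print(f"fraction={fraction}: cache {cache_gb:.1f} GB, tasks {task_gb:.1f} GB")
```

Lowering the fraction shifts memory from the cache to task execution, which is the trade-off to consider when garbage collection becomes a bottleneck.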

Example

Assume there are 6 nodes available on a cluster, each with 25 cores and 125 GB of memory (this hardware configuration is used in the following example and is close to the Della cluster parameters). It is natural to try to utilize those resources as fully as possible for your Spark application before considering requesting more nodes (which might result in longer wait times in the queue and a longer overall time to get the result).

With YARN, a possible approach would be to use --num-executors 6 --executor-cores 24 --executor-memory 124G. Here, we subtracted 1 core and some memory per node to allow the operating system and cluster-specific daemons to run. However, this approach would not be optimal, because a large number of cores per executor leads to poor HDFS I/O throughput and thus significantly slows down the application. A similar total number of cores can be allocated by increasing the number of executors while decreasing the number of cores and the amount of memory per executor.

A recommended approach when using YARN would be to use --num-executors 30 --executor-cores 4 --executor-memory 24G. This would result in YARN allocating 30 containers with executors, 5 containers per node, each using 4 executor cores. The RAM per container on a node is then 124/5 = 24 GB (roughly).

With Slurm, a similar configuration for a Spark application could be achieved with the following:

#SBATCH -N 6
#SBATCH --ntasks-per-node 5

combined with the flags --total-executor-cores 120 and --executor-memory 24G (6 nodes × 5 tasks per node × 4 cores per task = 120 cores).
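The arithmetic behind both configurations can be reproduced with a short sketch. This is plain Python under the assumptions of the example (6 nodes, 124 GB usable per node, 5 executors per node, 4 cores each); the function and key names are hypothetical, and the code only formats flag strings rather than submitting anything.

```python
def size_executors(nodes, usable_mem_gb_per_node, executors_per_node,
                   cores_per_executor):
    """Derive executor counts and memory from per-node resources."""
    num_executors = nodes * executors_per_node
    return {
        "num_executors": num_executors,
        "cores_per_executor": cores_per_executor,
        # Integer division: round down so the executors fit on the node.
        "executor_memory_gb": usable_mem_gb_per_node // executors_per_node,
        "total_executor_cores": num_executors * cores_per_executor,
    }


plan = size_executors(nodes=6, usable_mem_gb_per_node=124,
                      executors_per_node=5, cores_per_executor=4)

yarn_flags = (f"--num-executors {plan['num_executors']} "
              f"--executor-cores {plan['cores_per_executor']} "
              f"--executor-memory {plan['executor_memory_gb']}G")
slurm_flags = (f"--total-executor-cores {plan['total_executor_cores']} "
               f"--executor-memory {plan['executor_memory_gb']}G")

print(yarn_flags)
print(slurm_flags)
```

Changing executors_per_node or cores_per_executor shows how the memory per executor and the total core count move together.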

Tuning RDD partitioning

RDD partitioning is key to parallelizing a Spark application on a cluster. RDDs produced by the textFile or hadoopFile methods have their partitions determined by default by the number of blocks on the file system; this can be changed by specifying a second argument to these methods. Partitions for RDDs produced by the parallelize method come from the parameter given by the user, or from spark.default.parallelism if none is given. For RDDs produced by a transformation such as join or cartesian, the partitioning is determined by the parent RDDs.

The number of partitions of an RDD can be obtained by calling the getNumPartitions() method and can be increased or decreased with the repartition() method. Note that the latter always reshuffles all the data among nodes across the network, potentially increasing execution times.

The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available. A small number of tasks also means that more memory pressure is placed on any aggregation operations that occur in each task. Any join or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort.

The most straightforward way to tune the number of partitions is to look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving. As a rule of thumb, too many partitions are usually better than too few.
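The multiply-by-1.5 procedure can be sketched as a simple search loop. The code below is plain Python, not a Spark API: measure_runtime is a hypothetical callback that you would implement yourself (for example, by repartitioning the RDD and timing an action), and fake_runtime is a made-up stand-in used only so the sketch runs on its own.

```python
import math


def tune_partitions(parent_partitions, measure_runtime, growth=1.5, max_rounds=20):
    """Grow the partition count by `growth` until the runtime stops improving."""
    best_n = parent_partitions
    best_time = measure_runtime(best_n)
    for _ in range(max_rounds):
        candidate = math.ceil(best_n * growth)
        candidate_time = measure_runtime(candidate)
        if candidate_time >= best_time:
            break  # no further improvement: keep the previous count
        best_n, best_time = candidate, candidate_time
    return best_n


def fake_runtime(n):
    """Made-up cost model: per-task work shrinks, scheduling overhead grows."""
    return 1000.0 / n + 0.5 * n


print(tune_partitions(8, fake_runtime))
```

In practice each measurement is a full job run, so the loop is usually carried out by hand over a few runs rather than automated.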

The example below is taken from a Cloudera article.

The memory available to each task is:

(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores

Memory fraction and safety fraction default to 0.2 and 0.8 respectively.
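As a worked instance of this formula, the sketch below plugs in the executor size from the earlier example (24 GB, 4 cores) with the default fractions. The function name is hypothetical, and the code is plain Python arithmetic rather than a call into Spark.

```python
def shuffle_memory_per_task_gb(executor_memory_gb, executor_cores,
                               memory_fraction=0.2, safety_fraction=0.8):
    """Shuffle memory available to each concurrently running task, in GB."""
    return (executor_memory_gb * memory_fraction * safety_fraction
            / executor_cores)


per_task = shuffle_memory_per_task_gb(executor_memory_gb=24, executor_cores=4)
print(f"{per_task:.2f} GB per task")
```

With these values each task has a little under 1 GB of shuffle memory, which is the budget the data destined for a single task should fit into.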

The in-memory size of the total shuffle data is harder to determine. The closest heuristic is to find the ratio between the Shuffle Spill (Memory) metric and the Shuffle Spill (Disk) metric for a stage that ran, and then multiply the total shuffle write by this ratio. However, this can be somewhat compounded if the stage is doing a reduction. Dividing this estimate by the memory available to each task gives a rough number of partitions:

((observed-shuffle-write) * (observed-shuffle-spill-memory) * (spark.executor.cores)) / ((observed-shuffle-spill-disk) * (spark.executor.memory) * (spark.shuffle.memoryFraction) * (spark.shuffle.safetyFraction))
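The expression above can be evaluated as follows. This is a sketch in plain Python with a hypothetical function name; the metric values (50 GB shuffle write, 30 GB memory spill, 10 GB disk spill) are invented for illustration and would in practice be read from the Spark UI for a stage that ran. The result is rounded up, in line with the rule that too many partitions are better than too few.

```python
import math


def estimate_partitions(shuffle_write_gb, spill_memory_gb, spill_disk_gb,
                        executor_memory_gb, executor_cores,
                        memory_fraction=0.2, safety_fraction=0.8):
    """Estimate the partition count so each task's shuffle data fits in memory."""
    # Estimated in-memory size of the shuffle data.
    in_memory_shuffle_gb = shuffle_write_gb * spill_memory_gb / spill_disk_gb
    # Shuffle memory available to each task.
    memory_per_task_gb = (executor_memory_gb * memory_fraction
                          * safety_fraction / executor_cores)
    return math.ceil(in_memory_shuffle_gb / memory_per_task_gb)


# Invented metrics; 24 GB executors with 4 cores as in the earlier example.
print(estimate_partitions(shuffle_write_gb=50, spill_memory_gb=30,
                          spill_disk_gb=10, executor_memory_gb=24,
                          executor_cores=4))
```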