Spark Final Note

Overview

Goal of Apache Spark
To design a programming model that supports a much wider class of applications than MapReduce, while retaining the properties of MapReduce (fault tolerance, data locality, scalability).

Offers over 80 high-level operators as an alternative to MapReduce.
• Spark core is written in Scala.
• Language bindings: Scala, Java, SQL, Python (PySpark), R (SparkR).
• RDD: in-memory cache for very fast iterative queries → up to 100x faster than MapReduce.
• Runs on a standalone cluster, YARN, Mesos, or Kubernetes (containers).

  • Spark SQL: DataFrames-based structured and semistructured data processing, SQL language support
  • Spark Streaming: streaming analytics (DStreams)
    – Support Kafka, Flume, Twitter, Kinesis, or TCP Sockets.
  • Spark GraphX: graph processing (e.g., PageRank)
  • Spark MLlib: machine learning

Cluster Manager Types

  • Standalone cluster
    – a simple cluster manager included with Spark that makes it easy to set up a cluster “without Hadoop”.

  • YARN cluster

    – the resource manager in Hadoop 2.X.
    – richer resource scheduling capabilities (You learnt it already)
    – /opt/spark/bin/spark-submit --master yarn

  • Mesos cluster

    – Mesos is a scalable and fault-tolerant “distributed systems kernel” written in C++.

  • Kubernetes (Spark 2.3+)
    – Kubernetes, an open-source system for automating deployment, scaling, and management of containerized applications.
    – /opt/spark/bin/spark-submit --master k8s://<k8s-apiserver-host>:<k8s-apiserver-port>

Weakness of MapReduce:

– “Many problems do not lend themselves to the two-step process of map and reduce”

• MapReduce is awkward for several big data workloads:
– Iterative applications that need to run the same Mapper and Reducer multiple times (e.g., machine learning, graph algorithms).
– Interactive applications: web applications for interactive queries (SQL), interactive data analysis (integrating with machine learning).
– Streaming applications: unbounded data streams that need to maintain aggregate state over time.

Weakness of MapReduce
• Support for Batch Processing only
• No streaming data processing for near-real-time analysis, e.g., clickstream data analysis, real-time remote monitoring, recommendation systems.
• Not efficient for interactive querying of large datasets.

RDD

RDD (Resilient Distributed Dataset): a distributed data structure
– RDDs are divided into smaller chunks called partitions.
– The partitions of an RDD are distributed across the nodes in the cluster.
– Two types of operations can be performed on an RDD: transformations and actions.
– RDDs are immutable (i.e., they can’t be modified once created).

When you “modify” an RDD with a transformation, the transformation returns a new RDD, whereas the original RDD remains the same.

A transformation applies some function to an RDD and creates a new RDD; it does not modify the RDD that the function is applied to.

An action either saves the result to some location (e.g., HDFS) or returns it to the driver program, for example to display it.
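
The split between transformations and actions can be illustrated with a small plain-Python stand-in. This is only a sketch: ToyRDD is a hypothetical class, not Spark's API, and it evaluates eagerly to keep the focus on immutability.

```python
from functools import reduce


class ToyRDD:
    """Hypothetical stand-in for an RDD: an immutable tuple of partitions."""

    def __init__(self, partitions):
        # Tuples cannot be modified in place, which mimics RDD immutability.
        self._partitions = tuple(tuple(p) for p in partitions)

    def map(self, fn):
        # Transformation: builds and returns a NEW ToyRDD.
        return ToyRDD([[fn(x) for x in p] for p in self._partitions])

    def filter(self, pred):
        # Transformation: keeps only the elements that satisfy pred.
        return ToyRDD([[x for x in p if pred(x)] for p in self._partitions])

    def collect(self):
        # Action: returns all elements to the caller (the "driver").
        return [x for p in self._partitions for x in p]

    def reduce(self, fn):
        # Action: combines all elements into a single value.
        return reduce(fn, self.collect())


numbers = ToyRDD([[1, 2], [3, 4]])      # two partitions
doubled = numbers.map(lambda x: x * 2)  # new object; numbers is untouched

print(numbers.collect())
print(doubled.collect())
print(doubled.reduce(lambda a, b: a + b))
```

The original object still holds 1 to 4 after the map call, which is the behaviour the notes describe for real RDDs.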

Types of Transformations

Spark is lazy

“All transformations in Spark are lazy”:
• Spark doesn’t compute anything until it reaches the first “action” operation (e.g., reduce or collect), at which point it recursively computes all the RDDs that the action depends on.

Advantages:
• More efficient: transformations are bundled together and run only on demand, which creates opportunities for optimization.
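
A minimal plain-Python sketch of the idea follows. LazyRDD and traced_square are hypothetical names, not Spark code: transformations only record what to do, and the action runs all recorded steps in a single pass over the data.

```python
class LazyRDD:
    """Hypothetical lazy pipeline: transformations are recorded, not executed."""

    def __init__(self, source, ops=()):
        self._source = source
        self._ops = ops  # recorded (kind, function) pairs

    def map(self, fn):
        return LazyRDD(self._source, self._ops + (("map", fn),))

    def filter(self, pred):
        return LazyRDD(self._source, self._ops + (("filter", pred),))

    def collect(self):
        # Action: only now is any function applied. All recorded steps are
        # applied element by element, so map and filter share one pass.
        out = []
        for x in self._source:
            keep = True
            for kind, fn in self._ops:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []


def traced_square(x):
    calls.append(x)  # record every real invocation
    return x * x


pipeline = LazyRDD(range(5)).map(traced_square).filter(lambda v: v % 2 == 0)
calls_before_action = len(calls)  # nothing has run yet
result = pipeline.collect()       # the action triggers the computation

print(calls_before_action, result, len(calls))
```

Before collect() is called, traced_square has not run at all; after it, the function has run once per input element.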

RDD Internals

Five main properties:
– A list of partitions
– A function to compute each partition (map, flatMap, ...)
– A list of dependencies on parent RDDs, each with a type (wide/narrow)
– Optionally, a partitioning scheme (e.g., hash-partitioned)
– Optionally, a computation placement hint: a list of preferred locations to compute each split on (e.g., block locations for an HDFS file)

RDD Partitions

By default, one partition is created for each HDFS block, with size = 64 MB (or 128 MB). Note: partition sizes can change after a transformation.
• To increase the number of partitions:
– rdd = sc.textFile("hdfs://… /file.txt", 400) asks Spark to split file.txt into at least 400 partitions.
• Note: compressed files such as ZIP are “non-splittable”: each ZIP file will be processed by a single task (the equivalent of a single mapper).
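
As a rough back-of-the-envelope check, the default partition count can be estimated from the file size and the block size. The helper below is a hypothetical approximation (one partition per block); the real input-split logic has more details.

```python
import math

MIB = 1024 * 1024


def default_partition_count(file_size_bytes, block_size_bytes=128 * MIB):
    """Approximate number of partitions: one per HDFS block (assumption)."""
    return max(1, math.ceil(file_size_bytes / block_size_bytes))


one_gib = 1024 * MIB

print(default_partition_count(one_gib))            # with 128 MB blocks
print(default_partition_count(one_gib, 64 * MIB))  # with 64 MB blocks
print(default_partition_count(10))                 # tiny file: still one partition
```

Under this approximation a 1 GiB file gives 8 partitions with 128 MB blocks and 16 with 64 MB blocks, so asking for 400 partitions makes each partition much smaller than a block.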

RDD In-memory Cache

An RDD can be cached in RAM for fast access and reuse.
This is a key reason why Spark can be up to 100x faster than MapReduce.

In some cases, Spark may not have sufficient memory to cache the entire RDD. Only use cache() if the evaluated RDD will be used more than once.

Notes on RDD In-memory Cache
• Note 1: “Caching is just a hint, not a guarantee”.
– If there is not enough space in memory, a partition from the least recently used (LRU) RDD is evicted.
• Note 2: An RDD that hasn’t been cached is not stored anywhere; once consumed, it is garbage-collected.
• Note 3: cache() only tells Spark to keep the RDD in memory. It does not cache the RDD immediately; the RDD is cached the next time an action computes it.
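
The LRU behaviour in Note 1 can be sketched with a small plain-Python cache. ToyBlockCache is a hypothetical class, and it simplifies in two ways: capacity is counted in partitions rather than bytes, and recency is tracked per partition rather than per RDD.

```python
from collections import OrderedDict


class ToyBlockCache:
    """Hypothetical LRU cache of partitions (capacity counted in partitions)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._blocks = OrderedDict()  # oldest entry first

    def put(self, key, value):
        if key in self._blocks:
            self._blocks.move_to_end(key)
        self._blocks[key] = value
        while len(self._blocks) > self.capacity:
            # Not enough room: evict the least recently used partition.
            self._blocks.popitem(last=False)

    def get(self, key):
        if key not in self._blocks:
            return None  # cache miss: the caller would have to recompute
        self._blocks.move_to_end(key)  # mark as recently used
        return self._blocks[key]


cache = ToyBlockCache(capacity=2)
cache.put("rdd1_p0", [1, 2])
cache.put("rdd2_p0", [3, 4])
cache.get("rdd1_p0")          # touch rdd1_p0, so rdd2_p0 becomes the LRU entry
cache.put("rdd3_p0", [5, 6])  # over capacity: rdd2_p0 is evicted

print(cache.get("rdd2_p0"))   # None, it was evicted
```

A miss returns None here; in Spark the missing partition would be recomputed from its lineage, which is why caching is only a hint.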

RDD Persistence: rdd.persist()

By default, an RDD is recomputed each time an action is run on it.
RDD persistence: save the intermediate result so that it can be reused when required. This reduces the computation overhead.
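
The cost difference can be sketched in plain Python by counting how often the underlying computation runs. CountingRDD is a hypothetical class, not Spark's API; its persist() only sets a flag, and the data is stored the first time an action computes it.

```python
class CountingRDD:
    """Hypothetical RDD that counts how many times it is (re)computed."""

    def __init__(self, compute):
        self._compute = compute
        self._persisted = False
        self._stored = None
        self.compute_calls = 0

    def persist(self):
        self._persisted = True  # only a mark; nothing is computed yet
        return self

    def _materialize(self):
        if self._persisted and self._stored is not None:
            return self._stored  # reuse the saved intermediate result
        self.compute_calls += 1
        data = self._compute()
        if self._persisted:
            self._stored = data
        return data

    def count(self):  # action
        return len(self._materialize())

    def total(self):  # action
        return sum(self._materialize())


plain = CountingRDD(lambda: [x * x for x in range(1000)])
plain.count()
plain.total()

kept = CountingRDD(lambda: [x * x for x in range(1000)]).persist()
kept.count()
kept.total()

print(plain.compute_calls, kept.compute_calls)
```

Two actions on the plain object trigger two computations, while the persisted one computes once and reuses the stored result.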

Which Storage Level to Choose?
• If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way (most CPU-efficient)
• If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
• Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data (easily run out of RAM). Otherwise, recomputing a partition is about as fast as reading it from disk.
• Use the replicated storage levels if you want fast fault recovery.
• All the storage levels provide full fault tolerance by re-computing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to re-compute a lost partition.
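
One possible reading of these guidelines as a decision procedure is sketched below. The function and its parameters are hypothetical; the returned strings are the names of storage levels in Spark's Scala/Java API, and the exact mapping is an assumption based on the bullets above.

```python
def suggest_storage_level(fits_deserialized, fits_serialized,
                          expensive_to_recompute, need_fast_recovery):
    """Hypothetical helper that encodes the storage-level guidelines."""
    if fits_deserialized:
        level = "MEMORY_ONLY"          # default, most CPU-efficient
    elif fits_serialized or not expensive_to_recompute:
        level = "MEMORY_ONLY_SER"      # more compact; missing partitions are recomputed
    else:
        level = "MEMORY_AND_DISK_SER"  # spill only when recomputation is costly
    if need_fast_recovery:
        level += "_2"                  # replicated variant
    return level


print(suggest_storage_level(True, True, False, False))
print(suggest_storage_level(False, True, False, False))
print(suggest_storage_level(False, False, True, True))
```

The three calls cover the common cases: data fits as-is, data fits only when serialized, and data is expensive to recompute with fast recovery required.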

Key elements of a Spark cluster

  • Spark Driver: “the main() function of an application”
    – creates the SparkContext, which schedules job execution and negotiates with the cluster manager

  • Cluster Manager: manages the resources of a cluster
    – YARN, Mesos, Spark Standalone (runs applications in FIFO order)

  • Workers (slave nodes)
    – Any node that can run application code in the cluster

  • Executors
    – A process launched on a worker node that runs the tasks scheduled by the driver. Each executor is a JVM instance. A single node can host multiple executors.

  • Deployment mode: cluster mode (the driver runs on a worker host) or client mode (the driver runs outside the cluster).

Spark Execution

Note: Spark hosts multiple tasks within the same container to reduce JVM overhead.
– In Spark, tasks are just threads in the same JVM.
– If you set spark.executor.cores > 1, your executor will run multiple tasks in parallel threads.
– This differs from MapReduce, which schedules a container (i.e., starts a JVM) for EACH map/reduce task. That has high overhead, as the JVM has to start up and initialize certain data structures before it can begin running tasks.
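
The threads-in-one-process model can be imitated in plain Python with a thread pool. This is only an analogy, not Spark code: run_task and the partition data are made up, and executor_cores plays the role of spark.executor.cores.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """Hypothetical task: sums one partition and reports where it ran."""
    return os.getpid(), threading.current_thread().name, sum(partition)


partitions = [[1, 2, 3], [4, 5], [6], [7, 8, 9]]
executor_cores = 2  # analogous to spark.executor.cores

# One process (the "executor") runs every task on a small pool of threads,
# so no new process is started per task.
with ThreadPoolExecutor(max_workers=executor_cores,
                        thread_name_prefix="task") as pool:
    results = list(pool.map(run_task, partitions))

pids = {pid for pid, _, _ in results}
thread_names = {name for _, name, _ in results}
sums = [s for _, _, s in results]

print(len(pids), sums)
```

All four tasks report the same process id, and at most two distinct thread names appear, one per "core".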

Spark Memory Management

spark.executor.memory (Default: 1g) impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins!
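
To get a feel for the numbers, the sketch below estimates how a given executor heap is divided. It assumes the unified memory model with its documented defaults (about 300 MB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5); these values are not stated in these notes, and the helper name is hypothetical.

```python
RESERVED_MB = 300       # assumed fixed reservation
MEMORY_FRACTION = 0.6   # assumed default of spark.memory.fraction
STORAGE_FRACTION = 0.5  # assumed default of spark.memory.storageFraction


def unified_memory_mb(executor_memory_mb,
                      memory_fraction=MEMORY_FRACTION,
                      storage_fraction=STORAGE_FRACTION,
                      reserved_mb=RESERVED_MB):
    """Rough split of the executor heap into storage and execution memory."""
    unified = (executor_memory_mb - reserved_mb) * memory_fraction
    storage = unified * storage_fraction
    return {
        "unified": unified,              # shared by caching and shuffles
        "storage": storage,              # cached RDD blocks
        "execution": unified - storage,  # shuffle, join, aggregation buffers
    }


split = unified_memory_mb(1024)  # the 1g default
print({name: round(mb, 1) for name, mb in split.items()})
```

With the 1g default, only about 434 MB is left for caching and shuffle structures combined under these assumptions, and the boundary between the two is soft because each side can borrow from the other.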

Spark Driver & Spark Context

Spark Driver: Major Components
  • Spark Context
    – The SparkContext object tells Spark how to access a cluster.
  • DAG Scheduler
    – Computes a DAG of stages for each job and submits them to the TaskScheduler; also determines preferred locations for tasks.
  • Task Scheduler
    – Sends tasks to the cluster, runs them, retries them if there are failures, and mitigates stragglers.
  • Scheduler Backend
    – Allows plugging in YARN, Mesos, Standalone, ...
  • Block Manager @ Executor
    – Provides interfaces for putting and retrieving blocks, both locally and remotely, in various stores (memory, disk, and off-heap).

The DAG Scheduler does three things:

– (1) Computes an execution DAG for a job.
• Keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs.

– (2) Determines the preferred locations to run each task on, based on the current cache status.
• The DAGScheduler tracks which RDDs are cached (or persisted) to avoid recomputing them.

– (3) Handles failures due to shuffle output files being lost (old stages may need to be resubmitted).
• The DAGScheduler needs to track which ShuffleMapStages have already produced output files.
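
How a job turns into stages can be sketched for the simplest case, a linear chain of operations. The function below is a hypothetical simplification: it starts a new stage at every wide (shuffle) dependency, whereas the real DAGScheduler works on a full graph and also runs the map side of a shuffle in the earlier stage.

```python
def split_into_stages(lineage):
    """Group a linear chain of (operation, dependency_type) pairs into stages.

    A new stage starts at every wide dependency, because a shuffle is needed
    before that operation can run.
    """
    stages = [[]]
    for name, dependency in lineage:
        if dependency == "wide" and stages[-1]:
            stages.append([])
        stages[-1].append(name)
    return stages


# Hypothetical word-count style job.
lineage = [
    ("textFile", "narrow"),
    ("flatMap", "narrow"),
    ("map", "narrow"),
    ("reduceByKey", "wide"),
    ("filter", "narrow"),
]

stages = split_into_stages(lineage)
for index, stage in enumerate(stages):
    print(index, stage)
```

The narrow operations before reduceByKey are pipelined into one stage, and everything from the shuffle onward forms the next one.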

Spark Execution Flow

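Re-compute only the lost partitions of an RDD.
– Narrow dependency: a lost partition depends on only a few parent partitions, so only those need to be recomputed.
– Wide dependency: a lost partition may depend on every parent partition, so recovery might require recomputing an entire RDD.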
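
The difference in recovery cost can be made concrete with a small sketch. The function is hypothetical and assumes the simplest forms of each dependency: one-to-one for narrow (as in map or filter) and all-to-one for wide (as in a shuffle).

```python
def partitions_to_recompute(lost_partition, dependency, num_parent_partitions):
    """Parent partitions needed to rebuild one lost child partition."""
    if dependency == "narrow":
        # One-to-one dependency: only the matching parent partition is needed.
        return [lost_partition]
    if dependency == "wide":
        # Shuffle dependency: every parent partition may have contributed.
        return list(range(num_parent_partitions))
    raise ValueError(f"unknown dependency type: {dependency}")


print(partitions_to_recompute(2, "narrow", 4))
print(partitions_to_recompute(2, "wide", 4))
```

Losing partition 2 costs one parent partition under a narrow dependency but all four under a wide one.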

Checkpointing writes RDD data to reliable storage (HDFS) so that it can be recovered without recomputation.
– We can specify where the data is saved (on HDFS)
• E.g., sc.setCheckpointDir("/tmp/spark/checkpoint")
– An RDD should be cached before it is checkpointed, so that the data is kept in memory while it is being saved to storage

What kind of RDD needs a checkpoint?
– The computation takes a long time
– The computing chain is too long
– It depends on too many RDDs
– (Actually, saving the output of a ShuffleMapTask to local disk is also a kind of checkpoint, but only for the output data of a partition.)
– Call rdd.cache() before rdd.checkpoint()
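
Why a long computing chain benefits from a checkpoint can be sketched with a toy lineage. ToyLineage is a hypothetical class, not Spark's API: its checkpoint() pretends the data has been written to reliable storage and drops the reference to the parents, so recovery no longer has to walk the chain.

```python
class ToyLineage:
    """Hypothetical lineage node: each RDD remembers its parent."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent

    def then(self, name):
        # A transformation adds one more link to the chain.
        return ToyLineage(name, parent=self)

    def depth(self):
        # Number of RDDs that would be replayed to rebuild this one.
        depth, node = 1, self
        while node.parent is not None:
            depth += 1
            node = node.parent
        return depth

    def checkpoint(self):
        # Assumption: the data is now safely stored, so the parents
        # are no longer needed for recovery.
        self.parent = None


rdd = ToyLineage("textFile")
for step in range(50):
    rdd = rdd.then(f"map_{step}")

depth_before = rdd.depth()
rdd.checkpoint()
depth_after = rdd.depth()

print(depth_before, depth_after)
```

The chain of 51 RDDs collapses to a single node after the checkpoint, which is the effect that makes recovery cheap for long lineages.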

  • Copyrights © 2019-2020 Rex