Cluster Final Note

Lecture 1: Introduction

What are Clouds?

An emerging computing paradigm in which data and services reside in massively scalable data centers and can be accessed ubiquitously from any connected device over the Internet.

  • SaaS: Software-as-a-Service

    A provider licenses an application to customers for use as a service on demand. (Gmail/Hotmail, Web hosting, etc.)

  • PaaS: Platform-as-a-Service

    Provides the software platform on which customer systems run. The hardware resources demanded by executing the services are sized in a manner transparent to the customer.

  • IaaS: Infrastructure-as-a-Service

    Through virtualization, resources are split, assigned, and dynamically resized to build ad-hoc systems as demanded by customers or service providers (SPs).

  • Containers as a Service (CaaS):

    Docker Cloud, Amazon Elastic Container Service (ECS), etc.

  • Functions as a Service (FaaS):

    AWS Lambda, Google Cloud Functions, Azure Functions, etc.


Virtualization

(figure: virtual machine monitor (VMM) / hypervisor)

Cloud Model

  • Public Cloud

    Cloud infrastructure is available to the general public and is owned by an organization selling cloud services

  • Private Cloud

    Cloud infrastructure for a single organization only; may be managed by the organization or a 3rd party (Virtual Private Cloud)

  • Community Cloud

    Cloud infrastructure shared by several organizations that have shared concerns; managed by one of the organizations or a 3rd party (e.g., Amazon)

  • Hybrid Cloud

    A combination of two or more clouds bound together by standardized or proprietary technology

Public Clouds

  • Large scale infrastructure available on a rental basis
  • Fully customer self-service
  • Accountability is e-commerce based

Private Clouds

A cloud infrastructure operated solely for a single organization, whether managed internally or by a third-party and hosted internally or externally.

  • More expensive, but more secure (behind a firewall) when
    compared to public clouds.

Community Clouds

A community cloud is a multi-tenant infrastructure that is shared among several organizations from a specific community with common concerns.

  • High Security
  • High Availability
  • High Performance

Hybrid Cloud

An integrated cloud service utilizing both private and public clouds to perform distinct functions within the same organization.

  • Takes advantage of the scalability and cost-effectiveness of the public cloud, while keeping sensitive data in the secure environment of a private cloud without exposing it to the public cloud

Costs for Moving to the Cloud

Transparency

Must add Internet gateways for apps previously available only on the LAN


Security

Must configure firewall rules to limit access


Flexible Resource Management

Any future change must deal with the transparency and security issues all over again.

Lecture 2: Cloud Service Model

SaaS

A model of software deployment where an application is hosted as a service provided to customers across the Internet.

  • Apps
  • Cloud Storage
  • Cloud-Based Photo Editors
  • Cloud Gaming
  • SaaS E-commerce Platforms
  • Business & Financial Services
  • Robotics as a Service (RaaS)

PaaS

Cloud providers offer an Internet-based platform to developers who want to create services but don’t want to build their own cloud (infrastructure)

  • Elastic Beanstalk
  • Google App Engine

IaaS

Delivers basic storage and compute capabilities as standardized services over the network

  • Amazon EC2

  • Elastic IP

    Elastic IP addresses are static IP addresses designed for dynamic cloud computing.
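
    To get a sense of the workflow, here is a minimal AWS CLI sketch (the instance ID and allocation ID are placeholders): allocate an Elastic IP, then associate it with a running EC2 instance.

# returns an AllocationId (eipalloc-...) and the new public IP
aws ec2 allocate-address --domain vpc
# attach that address to an instance
aws ec2 associate-address --instance-id i-0123456789abcdef0 --allocation-id eipalloc-0123456789abcdef0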

Summary

Advantages

  • Reduce hardware and software investment
  • Improved performance
  • Reduced software costs
  • Instant software updates

Disadvantages

  • Requires a constant Internet connection

  • Does not work well with low-speed connections

  • Can be slow:

    1. Virtualization overhead
    2. Extra Network latency
  • Vendor Lock-In

    Software developed for Google’s platform cannot easily be moved to Amazon’s.

  • Service Stability

    Potential problems: outages, overheating, network disconnection, hardware damage, software errors

  • Data Loss and Privacy Issues

  • The cost of moving enterprise software to the cloud could be high

Lecture 3: HDFS & MapReduce Programming Model

Hadoop

An open-source software framework for storage and large scale processing of data-sets on clusters of commodity hardware.

  • Hadoop Common - contains libraries and utilities needed by other Hadoop modules
  • Hadoop Distributed File System (HDFS) - a distributed filesystem for storing data
  • Hadoop MapReduce - a programming model for large scale data processing
  • Hadoop YARN - resource-management and task scheduling
  • Hadoop Ozone (new in Hadoop 3.2.0): a distributed key-value store for Hadoop (similar to Amazon S3)

Hadoop 1.x

  • Job Tracker @ master node

    Keeps track of all the MapReduce jobs that are running on various nodes

  • Name Node @ master node

    Stores metadata: file/chunk namespaces, file-to-chunk mapping, and the location of each chunk’s replicas, all in memory. Oversees and coordinates the data storage function.

  • Task Tracker @ each slave node

    A slave to the Job Tracker; launches child processes (JVMs) to execute the map or reduce tasks

  • Data Node @ each slave node

    A slave to the Name Node; stores data on the local file system (e.g., Linux ext3/ext4)


Hadoop 2.x

In Hadoop 2.x, the JobTracker and TaskTracker no longer exist and have been replaced by 3 components:

  • ResourceManager

    The cluster-wide authority that arbitrates resources among all the applications in the system.

  • NodeManager

    a worker daemon that can launch ApplicationMaster and task Containers (running mappers or reducers).

  • ApplicationMaster

    A supervisory task that requests the resources (“containers”) needed for executing tasks and is responsible for tracking task status and monitoring progress.

Master Node(s): Typically one machine in the cluster is designated as the NameNode (NN) and another machine as the ResourceManager (RM), exclusively.

Slave Nodes: The rest of the machines in the cluster act as both DataNode (DN) and NodeManager (NM). These are the slaves.


Users can bundle their MapReduce code in a jar file and execute it with the yarn jar command, as shown below.
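
For example (wordcount.jar, the WordCount main class, and the HDFS paths are placeholder names):

yarn jar wordcount.jar WordCount /user/rex/input /user/rex/output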

HDFS

  • Files stored as blocks

    an HDFS file is chopped up into 64 MB or 128 MB blocks (depending on the configured block size); each block will reside on a different Data Node

  • Reliability through replication

    Each block is replicated across 3+ data nodes

    Stored as local files on Linux filesystem

  • Single master to coordinate access, keep metadata

    Simple centralized master per Hadoop cluster

    Manages metadata (doesn’t store the actual data chunks)

    Periodic heartbeat messages to checkup on slave servers

  • Files in HDFS are write-once and have strictly one writer at any time

  • Emphasis is on high throughput of data access rather than low latency of data access

Writing files to HDFS

For each block, the Client consults the Name Node.

The Client then writes the block directly to one Data Node.


Data Replication

The replication factor and block size are configured in /opt/hadoop-2.7.5/etc/hadoop/hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>64m</value>
  </property>
</configuration>
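
These are cluster-wide defaults. The replication factor of an existing file can also be changed from the command line, e.g. (the path is a placeholder):

hdfs dfs -setrep -w 2 /user/rex/File.txt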

Top-of-Rack Switch (ToR Switch)

In the top-of-rack design, servers connect to one or two Ethernet switches (e.g., Gigabit Ethernet or 10GbE switches) installed inside the rack.


Replica Placement Policy

  • 1st one on the same node as the client

  • 2nd on a node in a different rack (off-rack)

  • 3rd on the same rack as the 2nd one, but on a different node

Why? It is a tradeoff among reliability, write bandwidth, and read bandwidth.

  • Network performance issue:

    In-rack communication has higher bandwidth and lower latency (good for performance)

    Keep bulky flows in-rack when possible

  • Data loss prevention

    Never lose all replicas, even if an entire rack fails

Why not put them in THREE nodes located at three different racks?

  • Seems good: this maximizes redundancy (better fault tolerance),
    but at the cost of higher bandwidth consumption over ToR switches (2 rack crossings per write).

Also, rack failures are far less common than node failures (rack failure << node failure).

The Name Node ensures that each block always has the intended number of replicas and also makes sure that not all replicas of a block are located on a single rack.

Under- or over-replication is detected upon receiving a block report from a Data Node.

If a block is over-replicated, the Name Node chooses a replica to remove:

  • Prefer not to reduce the number of racks that host replicas
  • Prefer to remove a replica from the Data Node with the least amount of available disk space

Anatomy of a Block Write & Read

File Write

(figure: file write sequence)

Pipelined Write


When completed, each Data Node reports to Name Node “block received” with block info.

The initial node for the subsequent blocks of File.txt will vary for each block → this spreads out the hot spots of in-rack and cross-rack traffic.

Reading files from HDFS

  • Client receives Data Node list for each block
  • Client picks first Data Node for each block
  • Client reads blocks sequentially

Heartbeats & Block Reports

Each Data Node sends heartbeats to the Name Node.

Every 10th heartbeat is a block report. The Name Node builds its metadata from block reports.


Re-replicating Missing Replicas

Missing heartbeats signify lost Nodes.

  • Name Node consults metadata, finds affected data.
  • Name Node consults Rack Awareness script.
  • Name Node tells a Data Node to re-replicate.


Secondary NameNode

If Name Node is down, HDFS is down.

Solution: add a 2nd Name Node

Hadoop 2.x used a single active Name Node and a single Standby Name Node.

  • Connects to 1st Name Node every hour.
  • Housekeeping, backup of Name Node metadata.
  • Saved metadata can rebuild a failed Name Node.


Basic HDFS Commands

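A few commonly used HDFS shell commands (paths are placeholders):

hdfs dfs -ls /                              # list a directory
hdfs dfs -mkdir -p /user/rex/input          # create a directory (with parents)
hdfs dfs -put File.txt /user/rex/input      # upload a local file into HDFS
hdfs dfs -cat /user/rex/input/File.txt      # print a file's contents
hdfs dfs -get /user/rex/input/File.txt .    # copy a file back to the local disk
hdfs dfs -rm -r /user/rex/input             # remove a directory recursively
hdfs dfsadmin -report                       # show capacity and Data Node status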

MapReduce Programming Model

  • Map: take one operation, apply to many data tuples.

    run in parallel, creating different intermediate values from different input data sets.

  • Reduce: take the results and aggregate them.

    run in parallel, each working on a different output key

MapReduce runtime system does the rest for you, including

(1) Fault-tolerance, (2) I/O scheduling, (3) Job status monitoring

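A minimal WordCount sketch using the Hadoop Java MapReduce API illustrates the model (class names are illustrative; a driver that configures the Job appears later in these notes):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // Map: emit (word, 1) for every word in the input line
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce: sum the counts emitted for each word
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}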

Map

Hadoop 1.0: Job Tracker first consults the Name Node to learn which Data Nodes have blocks of File.txt.

Job Tracker delivers Java (Map) code to Nodes with the data block.


  • Map Tasks deliver output data over the network.

  • Reduce Task: data output is written to HDFS (disk I/O)


Scheduling Policy

Schedule a map task on a machine that contains a replica of corresponding input data

Or schedule a map task near the replica of that task’s input data (e.g., in the same rack)

What if data isn’t local?

Why does it happen? All the nodes holding a local replica already have too many other tasks running and cannot accept any more. The Name Node then suggests other nodes in the same rack (without the data) to run the map task.

  • Data local map task: the map task is running local to the machine that contains the actual data.

  • Rack local map task: while the data isn’t local to the node running the map task, it is still on the same rack.

The more data-local map tasks there are, the better the performance.


Reduce

  • Shuffle/Copy: moving map outputs to the reducers

Each reducer fetches the relevant partition of the output of all the mappers via HTTP. Note: this causes network traffic!

  • Sort

The framework merge sorts Reducer inputs by keys (since different Mappers may have output the same key).

The inputs are automatically sorted before they are presented to the Reducer.

  • Reduce

Perform the actual Reduce function that you wrote. The reduce output is normally stored in HDFS for reliability. The output of all Reducers is not re-sorted.

Shuffle phase: guarantees that the keys are ordered and that all the values with a particular key are presented to the same reducer.


Partitioner

The Partitioner partitions the Mapper output based on the number of Reducers and any custom partitioning logic (which you can define), as sketched below.

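A sketch of a custom Partitioner (the routing rule here, sending keys that start with a vowel to reducer 0, is purely illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Decides which Reducer receives each (key, value) pair emitted by the Mappers.
public class VowelPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    if (numReduceTasks <= 1 || key.getLength() == 0) {
      return 0;                             // everything goes to the only reducer
    }
    char c = Character.toLowerCase(key.toString().charAt(0));
    if ("aeiou".indexOf(c) >= 0) {
      return 0;                             // keys starting with a vowel -> reducer 0
    }
    // all other keys are hashed over reducers 1 .. numReduceTasks-1
    return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numReduceTasks - 1);
  }
}
// Registered in the driver with: job.setPartitionerClass(VowelPartitioner.class);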

A large “Spilled Records” count means high I/O overhead.

Timeline

(figure: map/reduce task timeline)

When to Start Reducers

mapreduce.job.reduce.slowstart.completedmaps

  • = 1.00: will wait for ALL the mappers to finish before starting the reducers → underutilized network bandwidth problem.
  • = 0.0: will start the reducers right away → reducer “slot hoarding” problem (containers activated, but not doing work).
  • Default = 0.05: reducer tasks start when 5% of the map tasks are complete.

The reduce phase has 3 steps: shuffle, sort, reduce. Even if you start the reducers earlier, they only begin downloading partitions from the mapper nodes (i.e., the shuffle phase); the reduce operation is NOT performed yet!
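
For example, to delay reducer launch until 80% of the map tasks have completed (the value is only an illustration), set in mapred-site.xml:

<property>
  <name>mapreduce.job.reduce.slowstart.completedmaps</name>
  <value>0.80</value>
</property>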

Specifying the number of Mappers and Reducers

  • Usually there is no need to set the number of map tasks for a job.

    – Default: 1 mapper per block (128 MB).

    NOTE 1: the user-specified number of mappers is just a “hint” to the MapReduce framework.

  • NOTE 2: the number of reducers needs to be specified by the user.

M map tasks, R reduce tasks:

  • Make M and R much larger than the number of nodes in cluster
  • One block (64MB) per map is common
  • Other suggestion: set the number of mappers and reducers to the number of cores available minus 1 for each machine.
  • Usually R is smaller than M
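
In the Java driver, the reducer count is set on the Job object. A minimal sketch building on the earlier WordCount classes (the paths and the value 4 are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);   // mapper from the earlier sketch
    job.setCombinerClass(WordCount.IntSumReducer.class);   // local aggregation on the map side
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(4);                              // R must be chosen by the user
    FileInputFormat.addInputPath(job, new Path("/user/rex/input"));
    FileOutputFormat.setOutputPath(job, new Path("/user/rex/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}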

Fault Tolerance

  • If a task crashes, retry it on another node.

    OK for a map because it has no dependencies.

    OK for reduce because map’s outputs are saved on disk.

  • If a node crashes

    OK. Re-launch its current tasks on other nodes.

    Re-run any maps the node previously ran.

    Necessary because their output files (saved in disk) were lost along with the crashed node.

  • If a task is going slowly (“straggler”):

    Launch second copy of task on another node (This is called “speculative execution”).

    You have to set (e.g., in mapred-site.xml; see the sketch after this list):

    mapreduce.map.speculative true

    mapreduce.reduce.speculative true

    Stragglers occur frequently due to failing hardware, software bugs, misconfiguration, etc.

    Single straggler may noticeably slow down a job.
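
A sketch of the corresponding mapred-site.xml entries:

<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>true</value>
</property>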

Summary

Mapper:

  • Process a key/value pair to generate intermediate key/value pairs
  • The Mapper outputs are sorted and then partitioned per Reducer.
  • The total # of partitions is the same as the # of reduce tasks.
  • Notifies Master of this partially completed work.
  • Map task output is written to the local disk, not to HDFS.

Reducer:

  • Reducer uses the HTTP protocol to retrieve its own partition from the Mapper nodes.
  • Reducer reduces a set of intermediate values which share a key to a smaller set of values.
  • Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

YARN

YARN supports non-MapReduce workloads.


The YARN Scheduler

Treats each server as a collection of containers.
– Container: fixed CPU + fixed memory (+ disk, network)
It has 3 main components:

  • Global Resource Manager (RM)

    The ultimate authority that arbitrates resources among all the
    applications in the system

  • Per-server Node Manager (NM)

    The “worker” daemon in YARN; launches the applications’ containers, monitors resource usage, and reports to the ResourceManager.

  • Per-application (job) Application Master (AM)

    Negotiates resources from the RM and works with the NodeManager(s) to execute and monitor the tasks.

What is a Container in YARN

A container is a JVM process; in simple terms, a container is the place where a YARN task runs.

  • supervised by the node manager (launch, monitor)
  • scheduled by the resource manager
  • tasks that can run inside a container: Map/Reduce, Spark, HBase, Hive, …
  • Note: container location is determined by the ResourceManager.
  • Each container has a unique Container Id.
  • Each container has a specific amount of resources allocated to it.


YARN (Job) Schedulers

(figure: comparison of YARN job schedulers, e.g., FIFO, Capacity Scheduler, Fair Scheduler)

Job Execution on YARN: Control Flow

(figures: YARN job submission and execution control flow)

Memory Settings in YARN

(figure: YARN memory-related configuration parameters)

Java Heap Size & GC

Garbage Collection (GC) time: the closer a Java application gets to full heap utilization, the more often the JVM has to run full garbage collections, blocking any other work and using large amounts of CPU.

  • Physical RAM limit for each Map and Reduce task

  • The JVM heap size limit for each task

  • The amount of virtual memory each task will get

To execute a map or reduce task, YARN will run a JVM within the container. If a YARN container grows beyond its heap size setting, the task will fail.

The Java heap settings should be smaller than the container memory limit because we need to reserve memory for non-heap JVM usage (e.g., the Java code itself).

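A sketch of typical mapred-site.xml settings following this rule (the sizes are only an example; the heap is set to roughly 80% of the container limit):

<!-- physical RAM limit (container size) for each map / reduce task -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>
</property>
<!-- JVM heap for each task, kept below the container limit -->
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx1638m</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx3276m</value>
</property>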


  • Copyright © 2019-2020 Rex