HADOOP

A Hadoop cluster is a special type of computational cluster designed for storing and analyzing vast amounts of unstructured data in a distributed computing environment. These clusters run on low-cost commodity computers.

Figure 1: Cluster of machines running Hadoop at Yahoo! (Source: Yahoo)

The Hadoop framework is an open-source software framework. It is mostly written in the Java programming language, with some native code in C and command-line utilities written as shell scripts. The Hadoop framework incorporates features similar to those of the Google File System (GFS) and of the MapReduce computing paradigm. It is composed of the following modules:

* Hadoop Common: the utilities that support the other modules
* Hadoop Distributed File System (HDFS): distributed storage
* Hadoop YARN (since Hadoop 2.0): job scheduling and cluster resource management
* Hadoop MapReduce: parallel processing of large data sets

(Module list as described by the Apache Software Foundation.)

Hadoop Principles

Data often comes from disparate systems and is imported into the Hadoop Distributed File System. It is then processed using MapReduce or one of the many languages (Hive, Pig, etc.) built on top of MapReduce. Finally, the results are filtered, processed, aggregated and exported to one or more external systems.

A set of machines running HDFS and MapReduce is called a Hadoop cluster. Each machine is called a node.
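As an illustration of the import step, a file can be copied into HDFS with the hdfs dfs shell; the /user/demo paths and the file name below are placeholders chosen for the example:

hdfs dfs -mkdir -p /user/demo/input        # create a target directory in HDFS
hdfs dfs -put sales.csv /user/demo/input/  # copy a local file into HDFS
hdfs dfs -ls /user/demo/input              # check that the file is now stored in HDFS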

HDFS

The Hadoop Distributed File System (HDFS) is the Hadoop component in charge of data storage in a Hadoop cluster. Files in HDFS are write-once and have strictly one writer at any time.

File Storage In Blocks

To be stored, a file is broken up into blocks. HDFS splits files into 64MB blocks. This is the default value, configurable in conf/hdfs-site.xml by modifying the dfs.blocksize parameter.
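For example, a sketch of the corresponding property entry in conf/hdfs-site.xml, keeping the 64MB default explicitly; the value is expressed in bytes:

<property>
  <name>dfs.blocksize</name>
  <value>67108864</value>
  <!-- 64MB expressed in bytes; applies to newly written files -->
</property>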


HDFS sits on top of the operating system's native file system, which is the underlying file system. Since the underlying file system also stores files as blocks, one HDFS block may consist of many blocks in the underlying file system.

Most often, the last block of a file is smaller than the configured block size.

EXAMPLE Assuming the default block size of 64MB, a 150MB file will be split into three blocks (64MB + 64MB + 22MB). The last block holds only 22MB of data; HDFS does not reserve the remaining 42MB on disk, so only a few KB are lost (approximately the size of one block in the native file system on which HDFS is based).


HDFS is optimized for handling large blocks. Working with large blocks maximizes data transfer rates and limits seek time on hard drives.

EXAMPLE Suppose we want to access a 1000MB file (1,024,000 KB). With a 4KB block size, 256,000 requests would be necessary to get that file (1 request per block). In HDFS, each request has to be processed by the NameNode to figure out where that block can be found. If you use 64MB blocks, the number of requests goes down to 16, greatly reducing the overhead and the load on the NameNode. For comparison, traditional file systems such as ext3 or ext4 on Linux are optimized for handling small blocks of a few KB.

 HDFS performance is better when working on a small number of large files than on a large number of small files.

Data Replication

Each block may be replicated to multiple DataNodes. The replication factor (default 3) is determined by the dfs.replication parameter in hdfs-site.xml. This mechanism ensures the reliability and availability of data.
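A sketch of the corresponding entry in hdfs-site.xml, here with the default value:

<property>
  <name>dfs.replication</name>
  <value>3</value>
  <!-- each block is stored on 3 DataNodes -->
</property>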

Figure: HDFS block replication across DataNodes (image source: slideshare.net)

From hadoop.apache.org: The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is responsive. A Blockreport contains a list of all blocks on a DataNode.

Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks. For the common case, when the replication factor is three, HDFS's placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in the same remote rack.

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from a replica that is closest to the reader. If there exists a replica on the same rack as the reader node, then that replica is preferred to satisfy the read request.

Figure: HDFS replica placement across racks (image source: hadoop.apache.org)

 HDFS provides redundant storage for very large volumes of data at an acceptable cost (midrange and low-cost machines can be used).

HDFS architecture

HDFS has a master/slave architecture.

Hadoop 1.0 HDFS architecture

In Hadoop 1.0, HDFS relies on three types of daemons: the NameNode, the SecondaryNameNode and the DataNodes.

The NameNode acts as the master and the DataNodes act as workers.

Figure: Hadoop 1.0 HDFS daemons (image source: http://saphanatutorial.com)

Assume a cluster of one hundred machines: three machines host the master daemons (NameNode, SecondaryNameNode and JobTracker) and 97 machines host the slave nodes (DataNodes).

NameNode or Master Node

The NameNode stores the file system namespace: the directory tree of all files and directories in HDFS.


The NameNode does not store the actual data, only metadata. The data itself is stored in the DataNodes.

Metadata is stored in the NameNode's local file system, in a file called FsImage.

The NameNode is usually configured with a lot of memory (RAM) because the FsImage file is loaded into RAM when the cluster starts. The entire file system namespace, including the mapping of blocks to files and file system properties, is stored in the FsImage file.

Every change to the file system metadata in a running cluster is reflected in memory (RAM) and is also recorded in a log file (edits) in the NameNode's local file system.

Creating, deleting or modifying a file in HDFS causes the NameNode to insert a record into the EditLog. Similarly, changing the replication factor of a file causes a new record to be inserted into the EditLog.

When the NameNode starts up, it reads the FsImage and EditLog from disk, applies all the transactions from the EditLog to the in-memory representation of the FsImage, and flushes out this new version into a new FsImage on disk.
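For reference, the fsimage and edits files stored on disk can be inspected with the HDFS offline viewers; the transaction IDs in the file names below are placeholders for whatever exists on the NameNode:

hdfs oiv -p XML -i fsimage_0000000000000000042 -o fsimage.xml                    # dump the fsimage to readable XML
hdfs oev -p XML -i edits_0000000000000000001-0000000000000000042 -o edits.xml    # dump an edits segment to XML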

The NameNode is critical to HDFS in Hadoop 1.0. If the NameNode is unresponsive, the entire HDFS/Hadoop cluster becomes inaccessible: in Hadoop 1.0, the NameNode is a Single Point Of Failure (SPOF). Since Hadoop 2.x, a standby (mirror) NameNode prevents this from happening.

The location of blocks on the cluster is stored neither in the fsimage file nor in the edits file. This choice allows administrators to remove or add DataNodes while the cluster is stopped. When the cluster restarts, communication between the NameNode and the DataNodes (block reports) allows the NameNode to bring the block locations up to date in memory. This process takes time.

Hadoop 1.0 SecondaryNameNode

In Hadoop 1.0, the SecondaryNameNode periodically downloads the fsimage and edits files from the NameNode and merges them into a new fsimage, which keeps the edits log from growing without bound. Despite its name, it is a checkpointing helper, not a standby NameNode.

Hadoop 2.0 HDFS architecture

The Hadoop 1.0 HDFS architecture was reworked in Hadoop 2.0, providing solutions to Hadoop 1.0 limitations:

Hadoop 1.0 SecondaryNameNode => CheckPointNode

In Hadoop 2.0, the Checkpoint Node is a new implementation of the SecondaryNameNode. The Checkpoint Node periodically fetches the fsimage and edits files from the NameNode and merges them, then uploads the merged fsimage back to the NameNode. The resulting state is called a checkpoint. Checkpointing is triggered automatically by configuration policies or manually by HDFS administration commands; see the sketch below.
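As a sketch of such configuration policies and administration commands (property names are from hdfs-default.xml and the values shown are the usual defaults):

<property>
  <name>dfs.namenode.checkpoint.period</name>
  <value>3600</value>
  <!-- maximum delay, in seconds, between two consecutive checkpoints -->
</property>
<property>
  <name>dfs.namenode.checkpoint.txns</name>
  <value>1000000</value>
  <!-- force a checkpoint after this many uncheckpointed transactions -->
</property>

hdfs dfsadmin -safemode enter    # saveNamespace requires the NameNode to be in safe mode
hdfs dfsadmin -saveNamespace     # merge the edits into a new fsimage on the NameNode
hdfs dfsadmin -safemode leave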

The difference between the SecondaryNameNode and the Checkpoint Node is that the SecondaryNameNode stores the merged fsimage (with the edit logs applied) locally in its file system but does not upload it to the NameNode, whereas the Checkpoint Node uploads the merged fsimage to the NameNode.

Hadoop 2.0 Standby NameNode

Hadoop 2.0 provides the option of running two redundant NameNodes in the same cluster in an Active/Passive mode:

* one primary (active) NameNode
* a standby NameNode.

They both share an edits log: all namespace edits are logged to shared NFS storage. The passive NameNode reads from this storage and keeps up-to-date metadata information for the cluster. In case of active NameNode failure, the passive NameNode becomes the active NameNode.
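A minimal sketch of the shared-edits setting for such an NFS-based setup; the mount point /mnt/filer1 is a placeholder:

<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>file:///mnt/filer1/hadoop/ha-shared-edits</value>
  <!-- NFS mount visible from both NameNodes; path is a placeholder -->
</property>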


Using Java API to Read Data From HDFS

The following steps are involved in reading a file from HDFS; a minimal Java sketch follows the steps:


Step 1: The client opens the required file by calling open() on an instance of the DistributedFileSystem Java class. open() initiates the HDFS client for the read request.

Step 2: The DistributedFileSystem object calls the NameNode, using RPC (Remote Procedure Call), to get the block locations of the file to be read. For each block, the NameNode returns the addresses of the DataNodes that hold a copy of that block, sorted by proximity to the client (the nearest DataNode first). The DistributedFileSystem then returns to the client an instance of FSDataInputStream. This FSDataInputStream object wraps a DFSInputStream object, which stores the DataNode locations for the first batch of blocks.

Step 3: The client invokes read() on the FSDataInputStream object; this call causes the DFSInputStream instance to connect to the closest DataNode holding the first block of the file.

Step 4: Data is streamed from the DataNode back to the client, which calls read() repeatedly on the stream. These read() calls continue until the end of the block is reached.

Step 5: When the end of block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.

Step 6: Blocks are read in order. The DFSInputStream object opens new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed. When the client has finished reading, it calls close() on the FSDataInputStream.

If a DataNode goes down during reading, or the DFSInputStream instance encounters an error while communicating with it, the DFSInputStream object switches to the next available DataNode holding a replica. DFSInputStream also remembers the DataNodes that returned errors so that it does not use them for later blocks.
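A minimal Java sketch of this read path, assuming the Hadoop client libraries are on the classpath; the class name HdfsRead and the hdfs:// URI are placeholders chosen for the example:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Reads a file from HDFS and copies it to standard output.
// The URI passed as argument is a placeholder, e.g. hdfs://namenode:8020/user/demo/input.txt
public class HdfsRead {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();              // loads core-site.xml / hdfs-site.xml from the classpath
    FileSystem fs = FileSystem.get(URI.create(uri), conf); // for an hdfs:// URI this is a DistributedFileSystem
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));                         // Step 1: open() returns an FSDataInputStream
      IOUtils.copyBytes(in, System.out, 4096, false);      // Steps 3 to 6: read() is called until the end of the file
    } finally {
      IOUtils.closeStream(in);                             // close() on the stream
    }
  }
}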

Using Java API To Write Data To HDFS


HDFS is a write-once file system. The following steps are involved in writing a file to HDFS; a minimal Java sketch follows the steps.

Step 1: The client creates the file by calling the create() method on a DistributedFileSystem object.

Step 2: This DistributedFileSystem object makes an RPC call to the namenode to create a new file in the filesystem's namespace.

The namenode performs various checks to make sure the file does not already exist and that the client has the permissions required to create it. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails.

The create() call on the DistributedFileSystem instance returns an FSDataOutputStream object, which wraps a DFSOutputStream, for the client to start writing data to.

Step 3: As the client writes data, the DFSOutputStream instance splits it into packets, which are stored in an internal queue called the data queue. The data queue is consumed by a DataStreamer object.

This DataStreamer object is responsible for asking the namenode to allocate new blocks by picking a list of suitable datanodes. The list of datanodes forms a pipeline; assuming the replication level is three, there are three datanodes in the pipeline. The DataStreamer streams the packets to the first datanode, which stores each packet and forwards it to the second datanode.

Step 4: Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline.

The client only has to transfer one replica to the first datanode, and each node only receives and sends one replica over the network (except the last datanode, which only receives data). This leads to a much more balanced network bandwidth consumption compared to the client writing three replicas to three different datanodes.

Step 5: The DFSOutputStream object also maintains an internal queue of packets waiting to be acknowledged, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.

Step 6: When the client has finished writing data, it calls close() on the stream.

Step 7: This action waits for acknowledgments before contacting the namenode to signal that the file is complete. The namenode already knows which blocks the file is made up of, so it only has to wait for blocks to be minimally replicated before returning successfully.

A data write operation is considered successful as soon as the minimum number of replicas has been written; this minimum is governed by the dfs.namenode.replication.min property in hdfs-default.xml (default 1). If a datanode fails while a replica is being written, the write is not considered failed: the failed datanode is removed from the pipeline, acknowledgements continue to come from the remaining datanodes, and the resulting block is simply under-replicated. The namenode notices the under-replication and arranges for an additional replica to be created later.
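A minimal Java sketch of the write path described above, under the same assumptions as the read example; the class name HdfsWrite and the paths are placeholders:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Copies a local file into HDFS.
// Both paths are placeholders: args[0] is a local file, args[1] an HDFS URI
// such as hdfs://namenode:8020/user/demo/output.txt
public class HdfsWrite {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
    OutputStream out = fs.create(new Path(dst));   // Steps 1-2: create() returns an FSDataOutputStream
    try {
      IOUtils.copyBytes(in, out, 4096, false);     // Steps 3-5: data is split into packets and streamed through the pipeline
    } finally {
      IOUtils.closeStream(out);                    // Steps 6-7: close() flushes remaining packets and waits for acknowledgments
      IOUtils.closeStream(in);
    }
  }
}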

Server Failure Management

NameNode Failure

In Hadoop 1.x, if the NameNode fails, the entire cluster fails: the NameNode is the Single Point Of Failure. Since Hadoop 2.0, the standby NameNode is implemented as a mirror and becomes active in case of NameNode failure.

DataNode Failure

DataNodes send a heartbeat message to the NameNode every 3 seconds to confirm they are still alive. If the NameNode does not receive any heartbeat from a DataNode for about 10 minutes, that DataNode is considered dead.
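Both intervals are configurable in hdfs-site.xml; the values below are the usual defaults. The dead-node timeout is derived as 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval, which gives roughly 10 minutes 30 seconds with these defaults:

<property>
  <name>dfs.heartbeat.interval</name>
  <value>3</value>
  <!-- seconds between two DataNode heartbeats -->
</property>
<property>
  <name>dfs.namenode.heartbeat.recheck-interval</name>
  <value>300000</value>
  <!-- in milliseconds; used by the NameNode to compute the dead-node timeout -->
</property>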

Network Failure

Whenever data is sent, an ACK message is sent back by the receiving host. If this ACK message is not received after several retries, the sender assumes that the host is down or that the network has failed.

Data Corruption

Detection is performed both by the DataNodes and by the NameNode: each block is stored together with a checksum, DataNodes periodically verify the checksums of the blocks they hold, and clients verify checksums when they read data. Corrupted blocks are reported to the NameNode, which schedules re-replication from a healthy replica.

In Hadoop 1.0, heartbeat signals and block reports are sent to the NameNode only. In Hadoop 2.0, heartbeat signals and block reports are sent to both the active NameNode and the standby NameNode.

Heartbeats are used differently in Hadoop 1.0 and Hadoop 2.0:

Hadoop 1.0

To process the data, the JobTracker assigns tasks, based on the heartbeat signals it receives from the TaskTrackers, to the TaskTrackers that are active.

Hadoop 2.0

Heartbeats are implemented in YARN as well: containers communicate with the ApplicationMaster and the NodeManager.

(Figure source: hadoop.apache.org)