Understanding MapReduce: The Missing Guide


Some facts about distributed frameworks in general

  • There shouldn’t be any single point of failure.
  • Like any algorithm, the framework should handle the extreme corner cases.
  • Under no circumstances should the framework cause the machine to fail.
  • At the end of the day, a distributed framework is all about running processes on different hosts in a coordinated way.
  • Each individual process should honor the constraints of the underlying operating system.
  • One such constraint: a given user has a limit on the number of files it can open (ulimit).
  • In all the corner cases the framework is allowed to run slowly, but it is not expected to get killed.
  • While thinking about MapReduce, think of corner cases like these:
Corner case   Number of mappers                  Number of reducers
1             1 billion                          1
2             1 billion                          (like 2, 100, 1000, 10 billion)
3             1 mapper reading 100 TB of data    1
4             1 mapper reading 100 TB of data    (like 2, 100, 1000, 10 billion)


Please go through the word count code for a thorough understanding.

Driver (Job):

Step 1: The number of splits is decided based on the input path and the input format.

The input path is taken into consideration to find the number of splits.

Note that the InputFormat (for file input, a FileInputFormat such as TextInputFormat) decides the number of splits. You can write your own custom InputFormat to change the logic of how the input is split.

FileInputFormat.setInputPaths(job, inputPath)

job.setInputFormatClass(TextInputFormat.class)

Then the splits are calculated and the split information is stored in the job's staging directory, next to the job configuration XML, so that the Application Master can use it to spawn the mappers. This happens on job.submit().
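
To make the split calculation a little more concrete, here is a rough sketch of how the number of splits for a single file might be derived. It assumes the commonly documented FileInputFormat rule that the split size is max(minSize, min(maxSize, blockSize)) and that a last remainder of up to 10% over the split size is folded into the final split. The class and method names are made up for illustration; this is a simplified model, not the Hadoop source, and it ignores empty and non-splittable files.

```java
// Simplified model of how a file-based InputFormat might cut one file into splits.
// Assumption: splitSize = max(minSize, min(maxSize, blockSize)); names are hypothetical.
final class SplitCountSketch {
    // A trailing chunk of up to 1.1 x splitSize is kept as a single split.
    static final double SPLIT_SLOP = 1.1;

    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits (and therefore mappers) for one splittable, non-empty file.
    static long countSplits(long fileLength, long splitSize) {
        if (fileLength <= 0 || splitSize <= 0) {
            throw new IllegalArgumentException("fileLength and splitSize must be positive");
        }
        long splits = 0;
        long remaining = fileLength;
        // Cut full-size splits while clearly more than one split's worth remains.
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        // Whatever is left becomes the last split.
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long size = splitSize(128 * mb, 1, Long.MAX_VALUE);
        System.out.println("1 GB file   -> " + countSplits(1024 * mb, size) + " splits");
        System.out.println("130 MB file -> " + countSplits(130 * mb, size) + " splits");
    }
}
```

Under these assumptions, changing the minimum or maximum split size changes the number of mappers indirectly, which is why the mapper count is a consequence of the InputFormat rather than a number you set.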

 

Mapper:

File (HDFS) => Split => Read record by record from the split (RecordReader) => Write to the in-memory buffer (context.write in the mapper) => Whenever the buffer gets full, sort the data and write it into the appropriate partitions (local file system) => For each partition, merge the sorted files to get fewer files (local file system) => Data ready for fetch.

Key takeaways:

  • Number of mappers = number of splits calculated by the driver. You cannot set the number of mappers directly; the framework decides it based on the InputFormat.
  • A mapper takes in KeyIn, ValueIn and emits KeyOut, ValueOut.
  • Mapper output is always sorted (by key, within each partition).
  • It is the mapper that sorts the output, not the reducer; the reducer just merges the sorted output.
  • Whenever the in-memory buffer gets full, its data is flushed to disk (the local file system, not HDFS); each flush writes sorted data into the appropriate partitions.
  • The number of partitions is decided by the number of reducers configured in the driver: job.setNumReduceTasks().
  • Hence, for a given partition, there can be a number of small files, depending on how many flushes happened in the mapper.
  • For each partition the small files are merged together (the small files are already sorted, so they only need to be merged, and the outcome is a sorted file).
  • Each reducer can fetch its partition from the local file system of the mapper node.
  • The mapper output is written in the framework's own intermediate key/value file format (IFile, not a SequenceFile), irrespective of the file format you have specified for job input/output.
  • If it's a map-only job, the map output is not sorted, and it is written in the job output file format instead.
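
The buffer, sort, and flush steps above can be sketched in plain Java, without any Hadoop dependency. This is a toy model under stated assumptions: the buffer limit is counted in records rather than bytes, a "spill file" is just an in-memory list, and the partition is chosen as the non-negative key hash modulo the number of reducers (the idea behind hash partitioning). All class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of the map-side buffer: collect records, and when the buffer is full,
// sort them by (partition, key) and flush them as one sorted run ("spill file").
final class SpillSketch {
    /** One map output record, tagged with the partition (reducer) it belongs to. */
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    private final int capacity;      // hypothetical buffer limit, counted in records
    private final int numPartitions; // equals the number of reducers
    private final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spills = new ArrayList<>(); // each inner list stands in for one spill file

    SpillSketch(int capacity, int numPartitions) {
        this.capacity = capacity;
        this.numPartitions = numPartitions;
    }

    // Stand-in for context.write(key, value) in the mapper.
    void write(String key, int value) {
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        buffer.add(new Rec(partition, key, value));
        if (buffer.size() >= capacity) {
            spill();
        }
    }

    // Sort the buffered records by (partition, key) and flush them as one sorted run.
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Rec> run = new ArrayList<>(buffer);
        run.sort(Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key));
        spills.add(run);
        buffer.clear();
    }

    // Helper for checking that a run really is ordered by (partition, key).
    static boolean isSorted(List<Rec> run) {
        for (int i = 1; i < run.size(); i++) {
            Rec a = run.get(i - 1);
            Rec b = run.get(i);
            if (a.partition > b.partition
                    || (a.partition == b.partition && a.key.compareTo(b.key) > 0)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SpillSketch mapper = new SpillSketch(3, 2);
        for (String word : "the quick brown fox jumps over the lazy dog".split(" ")) {
            mapper.write(word, 1);
        }
        mapper.spill(); // flush whatever is left when the mapper finishes
        System.out.println("spill files: " + mapper.spills.size());
    }
}
```

Each run is sorted on its own, but the runs are not sorted relative to each other, which is why a merge step per partition is still needed before the data is ready for fetch.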

Reducer:

Fetch the data of the appropriate partition from each mapper node => Store the data on the local file system => Merge the files fetched from the mappers (these are sorted files, so the outcome is a merged, sorted file) => Reduce the number of files to X => Open file pointers to the X files and reduce.

Key takeaways:

  • The reducer always merges sorted files; the actual sorting has been done on the mapper.
  • The mapper output is stored on the local file system of the node where the mapper ran. The data is shuffled, brought to the reducer machine, and saved on its local file system.
  • The shuffle is a pull of the files from the mapper nodes (local file system) over HTTP (HTTPS when encrypted shuffle is enabled).
  • The Application Master/JobTracker keeps track of which node each mapper ran on. When a reducer is spawned, this information is passed to the reducer.
  • The small mapper files are merged and saved back on the local file system to form bigger files.
  • The reducer has shuffle => sort => reduce phases. "Sort" is a misnomer: it is a merge of sorted files that generates sorted files.

 

 

Various cases and why:
Why sort on the map side?

If the data weren't sorted on the map side, would it even be possible to get all the values for a given key at the reducer?

On the reducer side, imagine the mapper output weren't sorted. How would the reducer collect all the values for a given key? The key could be present at any position in any of the mapper files.

Suppose a given key exists at position 1 in MapperFile1, at position 1000 in MapperFile2, at position one billion in MapperFile3, and so on.

To collect all values of that key, one has to read the values pertaining to it from MapperFile1, MapperFile2, MapperFile3. To achieve this, one needs to scan all the files from start to end.

(Think of each file as an array in which you have to find a given key: what is the complexity of this algorithm? With unsorted arrays it is a full scan of every file, repeated for every key.)

Why merge the sorted files on both the mapper and the reducer?

The MapReduce framework runs as processes on the individual nodes as a given Linux user (such as the YARN or HDFS user).

A given Linux user has a restriction on the number of files it can open (ulimit on Linux).

Think of the 1 billion mappers and 1 reducer situation. The mapper files from 1 billion mappers will be fetched by the reducer, and if the reducer tries to open 1 billion files at the same time, it runs into that limit and the process fails.

Keep in mind that at any point in time there can be multiple MapReduce jobs running under the same Linux user. The ulimit restriction has to be satisfied at every point in time.
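
As a rough illustration of this limit, the following Java sketch asks the JVM how many file descriptors the current process has open and how many it may open, then checks whether opening a given number of files at once would fit. It assumes a Unix-like JVM where the platform bean implements com.sun.management.UnixOperatingSystemMXBean; the class name and the fitsUnderLimit helper are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Sketch: compare "files we would like to open at once" with the process limit.
final class FileLimitSketch {
    // True when opening `wanted` more files stays within the per-process limit.
    static boolean fitsUnderLimit(long alreadyOpen, long maxOpen, long wanted) {
        return wanted <= maxOpen - alreadyOpen;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        // Assumption: only Unix-like JVMs expose file descriptor counts this way.
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            long open = unix.getOpenFileDescriptorCount();
            long max = unix.getMaxFileDescriptorCount();
            System.out.println("open now: " + open + ", limit: " + max);
            System.out.println("10 files at once fit: " + fitsUnderLimit(open, max, 10));
            System.out.println("1 billion files at once fit: "
                    + fitsUnderLimit(open, max, 1_000_000_000L));
        } else {
            System.out.println("File descriptor counts are not available on this platform.");
        }
    }
}
```

Merging a bounded number of files at a time is what keeps the number of simultaneously open files small, no matter how many mapper files exist.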

How many files are merged at once on the mapper or the reducer?

mapreduce.task.io.sort.factor controls the number of files taken into one merge. Multiple waves of such merges are carried out.
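
The idea of merging in waves can be sketched as follows. Sorted runs (lists standing in for sorted files) are merged at most `factor` at a time with a min-heap, and the waves repeat until a single run is left, so no more than `factor` inputs are ever open together. This is a simplified model with hypothetical names; the real Hadoop merger may schedule its passes differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of merging sorted runs in waves of at most `factor` runs each.
final class MergeWavesSketch {
    // K-way merge of sorted runs into one sorted run, using a min-heap.
    static List<String> mergeRuns(List<List<String>> runs) {
        // Heap entry: {run index, position inside that run}, ordered by the element it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> run = runs.get(top[0]);
            out.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }

    // Merge all runs into one, never feeding more than `factor` runs into a single merge.
    static List<String> mergeInWaves(List<List<String>> runs, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        List<List<String>> current = new ArrayList<>(runs);
        while (current.size() > 1) {
            List<List<String>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += factor) {
                int end = Math.min(i + factor, current.size());
                next.add(mergeRuns(current.subList(i, end)));
            }
            current = next;
        }
        return current.isEmpty() ? new ArrayList<>() : current.get(0);
    }

    // How many waves this scheme needs to get from runCount runs down to one.
    static int waves(int runCount, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        int count = 0;
        while (runCount > 1) {
            runCount = (runCount + factor - 1) / factor; // ceil(runCount / factor)
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("waves for 1000 runs, factor 10: " + waves(1000, 10));
        System.out.println("waves for 1 billion runs, factor 10: " + waves(1_000_000_000, 10));
    }
}
```

A larger factor means fewer waves but more files open at once; a smaller factor means the opposite, which is the trade-off this setting exposes.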

What is the default mapper sort algorithm?

Quick sort.

Why are the mapper output and the data shuffled by the reducer saved on the local file system and not on HDFS?

The data is temporary, and writing to HDFS (with its replication factor) is much slower than writing to the local file system.

The data flushed from the in-memory buffer into partitions is small, typically no larger than the HDFS block size. Writing it to HDFS would lead to the too-many-small-files problem.

Does reduce sort the data?

Reduce does a merge of sorted files. Merging sorted files results in a sorted file. Remember that the merges are done in phases based on mapreduce.task.io.sort.factor.

How to visualize the reduce phase of the reducer?

N sorted arrays are present, and one needs to find all values for a given key among all the N sorted arrays.
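
This picture of N sorted arrays can be sketched in Java. Because every input is sorted by key, walking them together with a min-heap delivers all values of one key back to back, so the reducer can finish a key as soon as the next key shows up. The sketch below sums the values per key in the style of word count; the class and method names are hypothetical and the inputs are in-memory lists rather than files.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Toy model of the reduce phase: N key-sorted inputs are walked together,
// and the values of each key are summed as soon as the key is complete.
final class ReduceGroupingSketch {
    static Map.Entry<String, Integer> rec(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    static Map<String, Integer> sumPerKey(List<List<Map.Entry<String, Integer>>> inputs) {
        // Heap entry: {input index, position}, ordered by the key at that position.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) ->
                inputs.get(a[0]).get(a[1]).getKey()
                        .compareTo(inputs.get(b[0]).get(b[1]).getKey()));
        for (int i = 0; i < inputs.size(); i++) {
            if (!inputs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        String currentKey = null;
        int sum = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Map.Entry<String, Integer>> input = inputs.get(top[0]);
            Map.Entry<String, Integer> record = input.get(top[1]);
            if (currentKey != null && !record.getKey().equals(currentKey)) {
                // The key changed, so every value of the previous key has been seen.
                result.put(currentKey, sum);
                sum = 0;
            }
            currentKey = record.getKey();
            sum += record.getValue();
            if (top[1] + 1 < input.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        if (currentKey != null) {
            result.put(currentKey, sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> inputs = Arrays.asList(
                Arrays.asList(rec("apple", 1), rec("cat", 2)),
                Arrays.asList(rec("apple", 3), rec("bat", 1)),
                Arrays.asList(rec("cat", 1)));
        System.out.println(sumPerKey(inputs));
    }
}
```

Only one position per input is looked at in any step, so no input ever has to be scanned for a key; that is the payoff of sorting on the map side.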