Understanding Spark through Map Reduce

Transition From MapReduce To Spark

Trying to understand Spark from a MapReduce perspective.

  • If we look at the history of ETL, we started with SQL on RDBMS => Excel => MapReduce => Pig/Hive => Spark.
  • It is just like the evolution of programming languages from C => C++ => Java. At each stage of evolution, the problems of the predecessors were tackled and new features were added. The same holds true for the evolution of ETL.
  • When thinking about a problem, it is very important to understand the level of abstraction at which we are looking for the solution. For example, when reading keys from a file and doing a lookup in an RDBMS, we think in terms of the logical flow of events rather than how the file buffer is maintained by Java or the operating system.
  • In Big Data, MapReduce makes us think at a very low level. Pig/Hive make us think at a higher level of abstraction, that of ETL (Extract, Transform and Load), and later translate it into lower-level MapReduce.
  • Spark works at a higher level of abstraction, similar to Pig/Hive, and internally translates the ETL into optimized tasks.
  • A direct translation from MapReduce to Spark is a little difficult because the levels of abstraction at which they work are different, yet the concepts remain the same.
  • Please read through the Spark Code analysis for a clearer picture.

Let's take the Word Count example from MapReduce and Spark and try to understand what is happening.

 

MapReduce Word Count:

Mapper:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Because TextInputFormat is used, the key is the offset of the line
    // in the file and the value is the line itself.

    // Reuse the Writables to avoid GC pressure.
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {
            // Text.set() replaces the previous content of
            // the Writable object with the new content.
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}


Reducer:

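import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All the counts emitted for this word arrive together; add them up.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        context.write(key, new IntWritable(sum));
    }
}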
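
Between the Mapper and the Reducer above, the framework groups every emitted (word, 1) pair by key. The following is a minimal plain-Java sketch of those three phases. It is not Hadoop code, and the names MiniMapReduce, mapPhase, shufflePhase, reducePhase and wordCount are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java imitation of map -> shuffle -> reduce (hypothetical names, no Hadoop).
class MiniMapReduce {

    // Map phase: emit a (word, 1) pair for every token of every line.
    static List<Map.Entry<String, Integer>> mapPhase(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                emitted.add(new SimpleEntry<String, Integer>(tokenizer.nextToken(), 1));
            }
        }
        return emitted;
    }

    // Shuffle phase: collect all values that share a key, sorted by key.
    static Map<String, List<Integer>> shufflePhase(List<Map.Entry<String, Integer>> emitted) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : emitted) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: sum the values collected for each key.
    static Map<String, Integer> reducePhase(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            counts.put(entry.getKey(), sum);
        }
        return counts;
    }

    // The whole job: map, then shuffle, then reduce.
    static Map<String, Integer> wordCount(List<String> lines) {
        return reducePhase(shufflePhase(mapPhase(lines)));
    }
}
```

Calling MiniMapReduce.wordCount on the two lines "to be or" and "not to be" gives a count of 2 for "to" and "be" and 1 for "or" and "not".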

Spark Word Count

JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
stringJavaRDD
        // Note: on Spark 2.x and later, flatMap must return an Iterator,
        // so .iterator() has to be appended to the list there.
        .flatMap(line -> Arrays.asList(line.split(" ")))
        // A new Tuple2 (word, 1) is formed for every word in the input.
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        // The reduceByKey function receives only the values; the key is not passed to it.
        // reduceByKey results in a shuffle, which breaks the DAG into stages.
        .reduceByKey((a, b) -> a + b)
        // Number of partitions to split the output into
        // (repartition itself triggers another shuffle).
        .repartition(conf.getInt(conf.get(NUM_PARTITIONS), 1))
        .saveAsTextFile(outputPath);


Map Reduce Word Count vs. Spark Word Count

Each step below is shown first for MapReduce and then for Spark.
MapReduce: InputFormat = TextInputFormat

Spark: InputFormat = TextInputFormat (the input and output formats come from Hadoop, hence they are the same)

Mappers:

MapReduce: Reads data from a split.

job.setMapperClass(WordCountMapper.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, inputPath);

Spark: Reads data from a split. Based on the InputFormat used, the number of map tasks spawned = the number of splits.

JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);

MapReduce: In the Mapper you get a key and a value.

Key = line offset, Value = the line

Spark: In the map task you get only values, because a plain JavaRDD has no concept of keys. You get only the value, i.e. the whole line, as an element of the JavaRDD<String>.

Remember that it is the same TextInputFormat and LineRecordReader. Spark just takes the value from the RecordReader.
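
The (offset, line) records that a Mapper receives, and the way Spark keeps only the value, can be imitated in plain Java. This is a sketch of the idea, not the Hadoop LineRecordReader. OffsetLineReader and its methods are hypothetical names, and the sketch assumes single-byte characters and "\n" line endings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a line-oriented record reader.
class OffsetLineReader {

    // What a Mapper sees: key = offset of the line start, value = the line.
    static Map<Long, String> readRecords(String fileContent) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            records.put(offset, line);
            offset += line.length() + 1; // +1 for the newline character
        }
        return records;
    }

    // What a Spark map task sees: the same records with the key dropped.
    static List<String> readValuesOnly(String fileContent) {
        return new ArrayList<>(readRecords(fileContent).values());
    }
}
```

For the content "hello world\nfoo bar\nbaz", readRecords yields the keys 0, 12 and 20, while readValuesOnly yields just the three lines.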

MapReduce: Inside the Mapper, each line is split into words.

StringTokenizer tokenizer = new StringTokenizer(line);

Spark:

flatMap(line -> Arrays.asList(line.split(" ")))

flatMap is just a transformation that is applied to each input line. It is just like writing a UDF in Pig/Hive: the function is called for every row. A transformation can be attached to a map task, a reduce task, or another transformation task.

Transformations are just chained functions.

Input line => function1 => function2 => function3 …
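
The chaining idea can be shown with plain java.util.function.Function composition. This is only an analogy for how transformations are applied one after another to each record inside a single task, not Spark's implementation. TransformationChain, splitWords, pairWithOne and pipeline are illustrative names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of "input line => function1 => function2".
class TransformationChain {

    // function1: one line -> many words (the flatMap step).
    static final Function<String, List<String>> splitWords =
            line -> Arrays.asList(line.split(" "));

    // function2: each word -> (word, 1) (the mapToPair step).
    static final Function<List<String>, List<Map.Entry<String, Integer>>> pairWithOne =
            words -> {
                List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
                for (String word : words) {
                    pairs.add(new SimpleEntry<String, Integer>(word, 1));
                }
                return pairs;
            };

    // The chained function: both steps run back to back on the same line.
    static final Function<String, List<Map.Entry<String, Integer>>> pipeline =
            splitWords.andThen(pairWithOne);
}
```

Applying TransformationChain.pipeline to the line "a b a" returns three pairs, one per word, each with the value 1.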

MapReduce: In the Mapper, attach 1 as the value for each word.

context.write(word, one);

Spark: Chain another transformation to get another transformed value: JavaRDD => flatMap => mapToPair

mapToPair(word -> new Tuple2<String, Integer>(word, 1))

This generates key-value pairs, like the Mapper in MapReduce.

If you look at the Spark DAG, all these transformations happen inside the same map task.

MapReduce: In the Reducer you collect all the counts for a word.

for (IntWritable value : values) {
    sum += value.get();
}

Spark:

reduceByKey((a, b) -> a + b)

Just like in MapReduce, you do a reduce; in Spark, however, the key is not an argument of the reduce function.

Just like in MapReduce, reduceByKey results in a shuffle, and a reduce task is executed.
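
The difference between the two reduce signatures can be sketched in plain Java: a MapReduce reducer receives the key together with all of its values, whereas a reduceByKey-style function only ever sees two values at a time. The sketch below is an in-memory analogy, not Spark's implementation, and ReduceByKeySketch is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical in-memory analogy of reduceByKey.
class ReduceByKeySketch {

    // Merges values per key using only a (value, value) -> value function.
    // The key is used for grouping but is never passed to the function.
    static <K extends Comparable<K>, V> Map<K, V> reduceByKey(
            List<Map.Entry<K, V>> pairs, BinaryOperator<V> func) {
        Map<K, V> result = new TreeMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            // First value for a key is stored as-is; later ones are merged with func.
            result.merge(pair.getKey(), pair.getValue(), func);
        }
        return result;
    }
}
```

Called with the pairs (spark, 1), (hadoop, 1), (spark, 1) and the function (a, b) -> a + b, it returns 2 for "spark" and 1 for "hadoop".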

MapReduce: The number of reducers is set in the driver.

job.setNumReduceTasks()

Spark: The number of output partitions is set in the transformation chain.

repartition(conf.getInt(conf.get(NUM_PARTITIONS), 1))
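
The number of reducers or partitions matters because every key is routed to exactly one of them. Below is a minimal sketch of hash partitioning, modelled on the formula used by Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numPartitions. HashPartitionSketch is a hypothetical name, and plain String keys stand in for Writable keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of how keys are assigned to reducers/partitions.
class HashPartitionSketch {

    static int partitionFor(Object key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Distributes keys into numPartitions buckets, one bucket per reduce task.
    static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
            buckets.get(partitionFor(key, numPartitions)).add(key);
        }
        return buckets;
    }
}
```

Because the partition depends only on the key's hash, all occurrences of the same word end up in the same bucket, which is what lets a single reduce task produce the final count for that word.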

MapReduce: Write to HDFS.

context.write(key, new IntWritable(sum));

Spark: Write to HDFS.

saveAsTextFile(outputPath);

Central object which holds all the information about the job:

MapReduce:

Context context

Spark:

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf)

MapReduce: Output file format = TextOutputFormat

Spark: Output file format = TextOutputFormat

MapReduce: We have a Mapper and a Reducer running to perform the word count.

Spark: The Spark code is scanned and translated into tasks (mapper and reducer).

MapReduce: We have separate Driver, Mapper, and Reducer code.

Spark: The driver and the tasks (map and reduce) are part of the same code.
Whatever code (lambda functions) we write inside the transformations (flatMap, map, mapPartitions) is instantiated on the driver, serialized, and sent to the executors to run as task code.
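
The step of instantiating a function on the driver, serializing it, and running it elsewhere can be imitated inside a single JVM with standard Java serialization. This is a sketch of the idea only, since Spark's actual closure shipping involves more machinery. SerializableFunction, ClosureShipping, WORD_COUNT, serialize, deserializeAndRun and ship are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// A lambda is only serializable if its target interface is Serializable.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

// Hypothetical single-JVM imitation of shipping task code.
class ClosureShipping {

    // Example task code: number of space-separated words in a line.
    static final SerializableFunction<String, Integer> WORD_COUNT =
            line -> line.split(" ").length;

    // "Driver" side: turn the function object into bytes.
    static byte[] serialize(SerializableFunction<String, Integer> func) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(func);
        }
        return bytes.toByteArray();
    }

    // "Executor" side: rebuild the function from bytes and apply it to a record.
    @SuppressWarnings("unchecked")
    static Integer deserializeAndRun(byte[] payload, String record)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            SerializableFunction<String, Integer> func =
                    (SerializableFunction<String, Integer>) in.readObject();
            return func.apply(record);
        }
    }

    // Round trip: serialize on one side, deserialize and run on the other.
    static Integer ship(SerializableFunction<String, Integer> func, String record) {
        try {
            return deserializeAndRun(serialize(func), record);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The same constraint explains a common Spark error: if a lambda captures an object that is not serializable, the serialization step fails before any task runs.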

MapReduce: We have a Mapper and a Reducer. The Reducer leads to a shuffle.

Spark: Whenever a shuffle happens, a new stage is formed, so a job is divided into stages at its shuffles.
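
How shuffles cut a job into stages can be sketched by walking the word count chain and starting a new stage at every shuffle-producing operation. This is a simplification of what Spark's scheduler does. The set of shuffle operations is limited to the two used in this post, and StageSplitter is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of stage boundaries at shuffles.
class StageSplitter {

    // Operations from this post that need shuffled input (not a complete list).
    static final Set<String> SHUFFLE_OPS =
            new HashSet<>(Arrays.asList("reduceByKey", "repartition"));

    // Starts a new stage whenever an operation needs shuffled input.
    static List<List<String>> splitIntoStages(List<String> operations) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : operations) {
            if (SHUFFLE_OPS.contains(op) && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(op);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }
}
```

For the chain textFile, flatMap, mapToPair, reduceByKey, repartition, saveAsTextFile, the sketch yields three stages: the map-side work, the reduce, and the repartitioned write.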