Spark Code Analysis

How to tell, from Spark code, which parts are executed on the driver and which are executed on the executors.

 

Let's take the WordCount example in Spark and work out what is executed where, and how to tell. Along the way we will also look at Spark serialization.

 

In MapReduce, the Driver, Mapper and Reducer code was written as separate classes, so it was easy to see which code was executed where. In Spark, all the code for the driver, the map tasks and the reduce tasks is part of the same class, so one needs to understand what is going on under the hood to tell them apart.

 

  • Spark code follows all the rules of Java, so there is no magic and no change in the rules.
  • Once the Spark code is submitted, the framework generates a DAG of tasks from the transformations and actions present in the code. Nothing runs until an action is called.
  • All transformations before a shuffle are combined into the same task.
  • Operations such as groupBy, reduceByKey, combineByKey and aggregateByKey result in a shuffle; in Spark terminology, a shuffle splits the job into stages.
  • A job is composed of stages. The boundary of a stage is a shuffle operation.
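
The stage rule above can be sketched in plain Java. This is only a toy model, not Spark's scheduler: the class name StageSplitter, the list of shuffle operations and the choice to place each shuffle operation at the start of the new stage (its reduce side) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class StageSplitter {

    // Assumption for this sketch: these names are treated as shuffle operations.
    private static final Set<String> SHUFFLE_OPS = new HashSet<>(Arrays.asList(
            "groupByKey", "reduceByKey", "combineByKey", "aggregateByKey", "repartition"));

    /** Groups a linear chain of operations into stages; every shuffle operation starts a new stage. */
    static List<List<String>> split(List<String> operations) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : operations) {
            if (SHUFFLE_OPS.contains(op) && !current.isEmpty()) {
                stages.add(current);          // close the stage before the shuffle
                current = new ArrayList<>();  // the shuffle operation opens the next one
            }
            current.add(op);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    public static void main(String[] args) {
        List<String> wordCount = Arrays.asList(
                "textFile", "flatMap", "mapToPair", "reduceByKey", "repartition", "saveAsTextFile");
        List<List<String>> stages = split(wordCount);
        for (int i = 0; i < stages.size(); i++) {
            System.out.println("Stage " + i + ": " + stages.get(i));
        }
    }
}
```

For the word count chain this model yields three stages: everything up to mapToPair, then reduceByKey, then repartition together with saveAsTextFile.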

 

For the word count code below, the same class is available to the driver as well as to every map and reduce task.


From the driver's perspective:

  • ToolRunner.run(new WordCount(), args) is executed on the driver.
  • The lambda functions and the anonymous or static classes used within the transformations are instantiated on the driver.
  • These function objects are then serialized and sent to the tasks defined by the DAG.
  • Actions such as collect() and count() are invoked on the driver. They trigger tasks on the executors, and the task results are returned to the driver, where the final value (a sum, a count and so on) is put together. collect() brings the whole dataset to the driver and can result in a lot of data arriving there; count() returns only one partial count per partition.
  • Transformations such as mapPartitions(), map(), reduceByKey(), combineByKey(), aggregateByKey() and groupByKey() are only declared on the driver; the functions passed to them run on the executors as part of a map task or a reduce task.
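
The "instantiated on the driver, serialized, sent to the task" round trip can be imitated with plain Java serialization. This is a minimal sketch, not Spark code: ClosureShipping and SerializableFunction are hypothetical names, and a byte array stands in for the network between driver and executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

final class ClosureShipping {

    /** Hypothetical function type that is also Serializable, so lambdas of this type can be written to bytes. */
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    /** "Driver side": turn the function object into bytes. */
    static byte[] serialize(Serializable function) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** "Executor side": rebuild the function object from the received bytes. */
    @SuppressWarnings("unchecked")
    static <T, R> SerializableFunction<T, R> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SerializableFunction<T, R>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int increment = 1; // captured local variable, serialized together with the lambda
        SerializableFunction<Integer, Integer> addOne = x -> x + increment;

        byte[] wire = serialize(addOne);                                        // driver
        SerializableFunction<Integer, Integer> onExecutor = deserialize(wire);  // executor
        System.out.println(onExecutor.apply(41));
    }
}
```

Note that the captured variable travels with the function, which is why everything a transformation's function refers to has to be serializable as well.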

 

Executors:

  • Transformations such as mapPartitions(), map(), reduceByKey(), combineByKey(), aggregateByKey() and groupByKey() are executed on the executors as part of a map task or a reduce task.
  • The functions inside these transformations are received on the executor in serialized form from the driver.
  • All the transformations in the same stage are executed as part of the same task on the executor.
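
To picture "same stage, same task", here is a rough plain-Java sketch in which the flatMap and mapToPair steps of word count are chained over a single partition. PipelinedTask and runMapTask are hypothetical names, a List stands in for a partition and a Java stream stands in for the task's record iterator.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PipelinedTask {

    /**
     * Runs the flatMap and mapToPair steps over one partition.
     * Both steps are chained on the same stream, so each line flows through
     * both functions without an intermediate collection between them.
     */
    static List<Map.Entry<String, Integer>> runMapTask(List<String> partition) {
        return partition.stream()
                // flatMap step: one line becomes many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // mapToPair step: every word becomes a (word, 1) pair
                .<Map.Entry<String, Integer>>map(word -> new SimpleEntry<>(word, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("to be or", "not to be");
        for (Map.Entry<String, Integer> pair : runMapTask(partition)) {
            System.out.println(pair.getKey() + " -> " + pair.getValue());
        }
    }
}
```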

 

 

Code                                                                     Executed on (driver/executor)
javaSparkContext = getJavaSparkContext(...)                              Driver
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);    Declared on the driver; results in a data read operation on all executors once an action runs
.flatMap                                                                 Executor (map task)
.mapToPair                                                               Executor (map task)
.reduceByKey                                                             Executor (reduce task)
.repartition                                                             Decides the parallelism (number of reduce tasks); if used after a map task, results in a shuffle task on the executors
.saveAsTextFile(outputPath);                                             Executor; saves the output of every reduce task
stringJavaRDD.count()                                                    Counted per partition on the executors; the partial counts are pulled to the driver, which adds them up
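
The last row can be pictured as follows. This is a small plain-Java sketch rather than Spark code: PartialCount is a hypothetical name and a list of lists stands in for the partitions of an RDD.

```java
import java.util.Arrays;
import java.util.List;

final class PartialCount {

    /** "Executor side": each task counts only the records of its own partition. */
    static long countPartition(List<String> partition) {
        return partition.size();
    }

    /** "Driver side": only one number per partition arrives here and is added up. */
    static long count(List<List<String>> partitions) {
        long total = 0;
        for (List<String> partition : partitions) {
            total += countPartition(partition);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("line 1", "line 2"),
                Arrays.asList("line 3"));
        System.out.println(count(partitions));
    }
}
```

The point of the split is that the driver never sees the records themselves, only the per-partition numbers, which is what distinguishes count() from collect().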

 


WordCount :

 

package com.big.data.spark;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;


public class WordCount extends Configured implements Tool, Closeable {
    // The job extends Configured and implements Tool so that ToolRunner can parse the arguments.


    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                // Set the Spark conf here; once the Spark context exists, Hadoop configuration for InputFormats can be set on it
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed have already been split into key/value pairs by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        // Get the Spark context. This is the central context, which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), conf.get(DEFAULT_FS), WordCount.class);

        // No input has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // To use a custom InputFormat call javaSparkContext.newAPIHadoopFile() and get an RDD

        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
        stringJavaRDD
                // Note: on Spark 2.x and later the flatMap function must return an Iterator,
                // i.e. Arrays.asList(line.split(" ")).iterator()
                .flatMap(line -> Arrays.asList(line.split(" ")))
                // A new Tuple2 is created for every word in the input
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                // The reduceByKey API only takes the values; the key is not passed to the function
                // reduceByKey results in a shuffle, hence breaking the DAG into stages
                .reduceByKey((a,b)->a+b)
                //.reduceByKey((count1, count2) -> count1 + count2)
                // How many partitions to split the output into
                .repartition(conf.getInt(NUM_PARTITIONS, 1))
                .saveAsTextFile(outputPath);
        // No anonymous class has been used anywhere, hence the outer class need not implement Serializable

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new WordCount(), args);
    }

}
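
The comment about anonymous classes in run() can be checked without Spark. The sketch below is an illustration under assumptions: CaptureDemo, Job and SerializableFunction are hypothetical names, and plain Java serialization stands in for the way function objects are shipped to tasks. An anonymous class that reads a field of its enclosing object keeps a reference to that object, so the enclosing (non-serializable) class gets pulled into serialization; a lambda that only captures a local copy of the value does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

final class CaptureDemo {

    /** Hypothetical function type that is also Serializable. */
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    /** Stand-in for a job class such as WordCount; deliberately NOT Serializable. */
    static final class Job {
        private final String delimiter;

        Job(String delimiter) {
            this.delimiter = delimiter;
        }

        /** Anonymous class that reads a field of the enclosing Job, so it holds a reference to the Job. */
        SerializableFunction<String, String[]> capturingOuter() {
            return new SerializableFunction<String, String[]>() {
                @Override
                public String[] apply(String line) {
                    return line.split(delimiter);
                }
            };
        }

        /** Lambda that captures only a local String, so the Job itself is not serialized. */
        SerializableFunction<String, String[]> capturingLocal() {
            final String localDelimiter = delimiter;
            return line -> line.split(localDelimiter);
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job(" ");
        System.out.println("anonymous class using outer field: " + isSerializable(job.capturingOuter()));
        System.out.println("lambda using local copy:           " + isSerializable(job.capturingLocal()));
    }
}
```

A lambda that read the field directly would capture the enclosing object just like the anonymous class does, so copying the value into a local variable first is the part that matters.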