Accumulate all values for a given key in Spark

For a given key, collect all of its values so that they can later be used to apply custom logic (average, max, min, top-n, expression evaluation) in Spark.


Dependency :




Code :



import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Mutable holder that collects every value seen for one key.
 *
 * <p>An instance is created from the first value of a key and then grows as
 * further values for the same key are added through {@link #addIntoList(String)}.
 */
class Acummulate {
    List<String> stringList;

    /**
     * Creates the holder and stores the first value in it.
     *
     * @param i first value seen for the key; it is kept, otherwise the first
     *          record of every key would be silently dropped
     */
    public Acummulate(String i) {
        stringList = new ArrayList<>();
        stringList.add(i);
    }

    /**
     * Appends one more value to the collected list.
     *
     * @param i value to append
     */
    public void addIntoList(String i) {
        stringList.add(i);
    }

    /**
     * Returns the collected values in insertion order.
     *
     * <p>Despite its name this returns the list of {@code String} values. The
     * returned list is the live backing list, not a copy.
     *
     * @return the collected values
     */
    public List<String> getIntegerList() {
        return stringList;
    }
}
public class AccumulateValuesOfAKey extends Configured implements Tool, Closeable {
    // The job extends Configured implements Tool for parsing argumenets .

    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                //Set spark conf here , 
                //After one gets spark context you can set hadoop configuration for InputFormats
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);

        return sparkContext;

    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        //Get spark context, 
        //This is the central context , which can be wrapped in Any Other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), 
                                                    conf.get(DEFAULT_FS), WordCount.class);

        // No input path has been read, no job has not been started yet .
        //To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
        // To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD

        //Input Country Dlimeter(,)city
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
                .keyBy(e-> e.split(",")[0])
                // combineBykey initates shuffle , hence all record for a given key are collected
                // We will call it input group
                  // Combine by key is used if input row and expected output types are different
                  // reduceBy key can be used if same type is expected

                  // Initialize the first row of the input group with the outputType needed
                        e-> new Acummulate(e),

                        // v1 is the object which has been intialized
                        // In everycall the return of the previous call will reappear as v1
                              (v1, v2) -> {
                                  ((Acummulate) v1).addIntoList(v2);
                                  return  v1;

                        // V1 is the object which is coming from the previous call .
                              (v1, v2) -> {
                            return v1;

                // U get tuple(key, Object) which can be used for any kind of complex aggregation
                .map(e-> new Tuple2<>(e._1(),e._2().getIntegerList().toString()))
                .repartition(conf.getInt(conf.get(NUM_PARTITIONS), 1))
                // No Anynomous class has been used anywhere and hence, 
                //The outer class need not implement Serialzable

        return 0;

    public void close() throws IOException {

    public static void main(String[] args) throws Exception { AccumulateValuesOfAKey(), args);



Integration Test:



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class AccumulateValuesOfAKeyTest {
    private final Configuration conf = new Configuration();
    private static FileSystem fs;
    private static final DateTime NOW =;
    private static String baseDir;
    private static String outputDir;
    private static final String NEW_LINE_DELIMETER = "\n";

    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        // change this to point to your cluster Namenode
        conf.set("", "file:///");

        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/spark/AccumulateValuesOfAKeyTest/" + UUID.randomUUID().toString();
        outputDir = baseDir + "/output";

        File tempFile = new File(baseDir + "/input.txt");

        //Write the data into the local filesystem
        FileUtils.writeStringToFile(tempFile, "India,delhi", "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, "India,Mumbai", "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, "India,Bangalore", "UTF-8", true);

    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);

    public void accumulateValuesOfAKey() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
        String[] args = new String[]{"-D" + AccumulateValuesOfAKey.INPUT_PATH + "=" + baseDir,
                                     "-D" + AccumulateValuesOfAKey.OUTPUT_PATH + "=" + outputDir,
                                     "-D" + AccumulateValuesOfAKey.IS_RUN_LOCALLY + "=true",
                                     "-D" + AccumulateValuesOfAKey.DEFAULT_FS + "=file:///", 
                                     "-D" + AccumulateValuesOfAKey.NUM_PARTITIONS + "=1"};

        //Read the data from the outputfile
        File outputFile = new File(outputDir + "/part-00000");
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        Map<String, String> wordToCount = new HashMap<>();

        //4 lines in output file, with one word per line -> {
            String[] wordCount = e.substring(e.indexOf("(") + 1, e.indexOf(")")).split(",");
            wordToCount.put(wordCount[0], wordCount[1]);

        //1 Country
        Assert.assertEquals(1, wordToCount.size());