Word Count using CombineByKey in Spark

Problem: Perform word count using combineByKey in Spark. (Github)
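
combineByKey takes three functions: createCombiner turns the first value seen for a key within a partition into a combiner, mergeValue folds each further value for that key into the combiner, and mergeCombiners merges partial combiners coming from different partitions. For word count the combiner is just the running Integer sum, so the call behaves exactly like reduceByKey. Below is a minimal, self-contained local sketch of the idea; the class name and sample input are made up for illustration and it is not part of the job further down:

package com.big.data.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Hypothetical standalone example; run locally with spark-core 1.6.x on the classpath.
public class CombineByKeySketch {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("CombineByKeySketch").setMaster("local[*]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> counts = jsc
                    .parallelize(Arrays.asList("My name is Maverick", "My name is Maverick"))
                    .flatMap(line -> Arrays.asList(line.split(" ")))
                    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                    .combineByKey(
                            i -> i,                // createCombiner: first count of a word in a partition
                            (a, b) -> a + b,       // mergeValue: fold further counts into the combiner
                            (c, d) -> c + d);      // mergeCombiners: merge partial counts across partitions

            counts.collectAsMap()
                  .forEach((word, count) -> System.out.println(word + " -> " + count));
        }
    }
}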

 

Dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
  </dependencies>
  
  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>


Spark:

package com.big.data.spark;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;


public class WordCountCombineByKey extends Configured implements Tool, Closeable {
    // The job extends Configured and implements Tool so ToolRunner can parse the arguments.

    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                // Set Spark conf here;
                // once the SparkContext is created, Hadoop configuration for InputFormats can be set on it
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed have been split into key/value pairs by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        // Get the Spark context; this is the central context, which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
                                               conf.get(DEFAULT_FS), WordCountCombineByKey.class);

        // No input path has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // To use a custom InputFormat, use javaSparkContext.newAPIHadoopFile() to get an RDD

        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
        stringJavaRDD
                .flatMap(line -> Arrays.asList(line.split(" ")))
                // A new tuple is formed for every word emitted by flatMap
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))

                .combineByKey(i -> i,           // createCombiner: first count seen for a word in a partition
                              (a, b) -> a + b,  // mergeValue: add further counts within a partition
                              (c, d) -> c + d)  // mergeCombiners: merge partial counts across partitions
                //.reduceByKey((count1, count2) -> count1 + count2)
                // How many partitions to split the output into
                .repartition(conf.getInt(NUM_PARTITIONS, 1))
                .saveAsTextFile(outputPath);
        // No anonymous class has been used anywhere, hence
        // the outer class need not implement Serializable

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        // try-with-resources ensures close() is called and the SparkContext is shut down
        try (WordCountCombineByKey job = new WordCountCombineByKey()) {
            ToolRunner.run(job, args);
        }
    }

}
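
For plain word count the combiner type is the same as the value type, so the commented-out reduceByKey would produce exactly the same result with less ceremony. combineByKey pays off when the combined type differs from the value type. The hypothetical sketch below (the jsc context and the sample scores RDD are assumptions, not part of the job above) computes a per-key average, which reduceByKey alone cannot express as directly:

// Assumes an existing JavaSparkContext named jsc, e.g. the one from the sketch near the top.
JavaPairRDD<String, Integer> scores = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("spark", 4),
        new Tuple2<String, Integer>("spark", 2),
        new Tuple2<String, Integer>("hadoop", 5)));

JavaPairRDD<String, Tuple2<Integer, Integer>> sumAndCount = scores.combineByKey(
        value -> new Tuple2<Integer, Integer>(value, 1),                              // createCombiner: (sum, occurrences)
        (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1), // mergeValue: fold one more score in
        (a, b) -> new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2()));    // mergeCombiners: merge partial pairs

// Average per key: spark -> 3.0, hadoop -> 5.0
JavaPairRDD<String, Double> averages = sumAndCount.mapValues(t -> t._1().doubleValue() / t._2());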


Integration Test: (Github)

 

package com.big.data.spark;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


public class WordCountCombineByKeyTest {

    private final Configuration conf = new Configuration();
    private static FileSystem fs;
    private static String baseDir;
    private static String outputDir;
    private static final String NEW_LINE_DELIMETER = "\n";

    @BeforeClass
    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        // Set the fs to file:///, which means the local filesystem;
        // change this to point to your cluster NameNode
        conf.set("fs.default.name", "file:///");

        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/spark/WordCountCombineByKeyTest/" + UUID.randomUUID().toString();
        outputDir = baseDir + "/output";

        File tempFile = new File(baseDir + "/input.txt");
        String content = "My name is Maverick";

        //Write the data into the local filesystem
        FileUtils.writeStringToFile(tempFile, content, "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, content, "UTF-8", true);
    }

    @AfterClass
    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);
    }

    @Test
    public void WordCount() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
        String[] args = new String[]{"-D" + WordCountCombineByKey.INPUT_PATH + "=" + baseDir, 
                                     "-D" + WordCountCombineByKey.OUTPUT_PATH + "=" + outputDir, 
                                     "-D" + WordCountCombineByKey.IS_RUN_LOCALLY + "=true", 
                                     "-D" + WordCountCombineByKey.DEFAULT_FS + "=file:///", 
                                     "-D" + WordCountCombineByKey.NUM_PARTITIONS + "=1"};
        WordCountCombineByKey.main(args);

        // Read the data from the output file
        File outputFile = new File(outputDir + "/part-00000");
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        Map<String, Integer> wordToCount = new HashMap<>();

        // 4 lines in the output file, one (word,count) tuple per line
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.substring(e.indexOf("(") + 1, e.indexOf(")")).split(",");
            wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));
        });

        // 4 distinct words, each appearing twice
        Assert.assertEquals(4L, wordToCount.size());
        Assert.assertEquals(2L, wordToCount.get("Maverick").longValue());
    }

}
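
For reference, given the input written in startup() ("My name is Maverick" on two lines), the single output part file should contain one (word,count) tuple per line, each of the four words with a count of 2; the line order is not guaranteed:

(My,2)
(name,2)
(is,2)
(Maverick,2)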