Word Count using CombineByKey in Spark
Problem: Perform a word count using combineByKey in Spark. (GitHub)
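combineByKey is the most general of Spark's per-key aggregation operators. It takes three functions: createCombiner, which turns the first value seen for a key within a partition into an accumulator; mergeValue, which folds each further value for that key into the partition-local accumulator; and mergeCombiners, which merges accumulators for the same key across partitions. With an identity combiner and integer addition, as in the job below, it behaves exactly like reduceByKey; a sketch of a more general use appears after the job class.

Maven Dependencies :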
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
Spark :
package com.big.data.spark;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
public class WordCountCombineByKey extends Configured implements Tool, Closeable {
// The job extends Configured and implements Tool so that ToolRunner can parse the -D arguments.
public static final String INPUT_PATH = "spark.input.path";
public static final String OUTPUT_PATH = "spark.output.path";
public static final String IS_RUN_LOCALLY = "spark.is.run.local";
public static final String DEFAULT_FS = "spark.default.fs";
public static final String NUM_PARTITIONS = "spark.num.partitions";
private SQLContext sqlContext;
private JavaSparkContext javaSparkContext;
protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
final String defaultFs,
final Class<T> tClass) {
final SparkConf sparkConf = new SparkConf()
// Set Spark configuration here;
// once the SparkContext is created, Hadoop configuration for InputFormats can be set on it
.setAppName(tClass.getSimpleName())
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
if (isRunLocal) {
sparkConf.setMaster("local[*]");
}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
if (defaultFs != null) {
sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
}
return sparkContext;
}
@Override
public int run(String[] args) throws Exception {
// The arguments passed with -D have already been parsed into key/value pairs by ToolRunner
Configuration conf = getConf();
String inputPath = conf.get(INPUT_PATH);
String outputPath = conf.get(OUTPUT_PATH);
// Get the Spark context; this is the central context, which can be wrapped in any other context (e.g. SQLContext)
javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
conf.get(DEFAULT_FS), WordCountCombineByKey.class);
// No input has been read and no job has started yet; Spark transformations are lazy.
// To set any Hadoop configuration use javaSparkContext.hadoopConfiguration().set(key, value)
// To use a custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(inputPath);
stringJavaRDD
.flatMap(line -> Arrays.asList(line.split(" ")))
// A new (word, 1) tuple is emitted for every word in the input
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
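// combineByKey takes three functions:
//   createCombiner   (i -> i)          : the first value seen for a key in a partition becomes the accumulator
//   mergeValue       ((a, b) -> a + b) : folds further values of that key into the partition-local accumulator
//   mergeCombiners   ((c, d) -> c + d) : merges accumulators for the same key across partitions
// With these three functions the result is identical to the commented-out reduceByKey below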
.combineByKey(i -> i, (a, b) -> a + b, (c, d) -> c + d)
//.reduceByKey((count1, count2) -> count1 + count2)
// How many partitions to split the output into
.repartition(conf.getInt(NUM_PARTITIONS, 1))
.saveAsTextFile(outputPath);
// No anonymous class has been used anywhere, hence
// the outer class need not implement Serializable
return 0;
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(javaSparkContext);
}
public static void main(String[] args) throws Exception {
// try-with-resources invokes close(), shutting down the SparkContext once the job finishes
try (WordCountCombineByKey job = new WordCountCombineByKey()) {
ToolRunner.run(job, args);
}
}
}
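The (i -> i, (a, b) -> a + b, (c, d) -> c + d) triple above is equivalent to the commented-out reduceByKey. combineByKey becomes worthwhile when the accumulator has a different type from the values. The standalone sketch below (the class name AverageByKeySketch and its sample data are made up for illustration, not part of the repository above) computes a per-key average with a (sum, count) accumulator, which reduceByKey cannot express directly:

package com.big.data.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Hypothetical standalone sketch: per-key average with combineByKey.
public class AverageByKeySketch {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("AverageByKeySketch").setMaster("local[*]");
        // JavaSparkContext implements Closeable, so try-with-resources shuts it down
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("spark", 10),
                    new Tuple2<>("spark", 20),
                    new Tuple2<>("hadoop", 4)));

            JavaPairRDD<String, Double> averages = scores
                    .combineByKey(
                            // createCombiner: the first value of a key becomes the (sum, count) accumulator
                            (Integer value) -> new Tuple2<Integer, Integer>(value, 1),
                            // mergeValue: fold another value into the partition-local (sum, count)
                            (Tuple2<Integer, Integer> acc, Integer value) ->
                                    new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
                            // mergeCombiners: merge (sum, count) accumulators across partitions
                            (Tuple2<Integer, Integer> left, Tuple2<Integer, Integer> right) ->
                                    new Tuple2<Integer, Integer>(left._1() + right._1(), left._2() + right._2()))
                    // turn each (sum, count) accumulator into an average
                    .mapValues(acc -> acc._1().doubleValue() / acc._2());

            averages.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        }
    }
}

Run locally this prints spark -> 15.0 and hadoop -> 4.0; the point is that the accumulator type Tuple2<Integer, Integer> differs from the Integer values, which is exactly what combineByKey allows and reduceByKey does not.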
Integration Test : (GitHub)
package com.big.data.spark;
import com.cloudera.org.joda.time.DateTime;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class WordCountCombineByKeyTest {
private final Configuration conf = new Configuration();
private static FileSystem fs;
private static final DateTime NOW = DateTime.now();
private static String baseDir;
private static String outputDir;
private static final String NEW_LINE_DELIMETER = "\n";
@BeforeClass
public static void startup() throws Exception {
Configuration conf = new Configuration();
//set the fs to file:/// which means the local fileSystem
// change this to point to your cluster Namenode
conf.set("fs.default.name", "file:///");
fs = FileSystem.getLocal(conf);
baseDir = "/tmp/spark/WordCountCombineByKeyTest/" + UUID.randomUUID().toString();
outputDir = baseDir + "/output";
File tempFile = new File(baseDir + "/input.txt");
String content = "My name is Maverick";
//Write the data into the local filesystem
FileUtils.writeStringToFile(tempFile, content, "UTF-8");
FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
FileUtils.writeStringToFile(tempFile, content, "UTF-8", true);
}
@AfterClass
public static void cleanup() throws Exception {
//Delete the local filesystem folder after the Job is done
fs.delete(new Path(baseDir), true);
}
@Test
public void WordCount() throws Exception {
// Any argument passed with -DKey=Value will be parsed by ToolRunner
String[] args = new String[]{"-D" + WordCountCombineByKey.INPUT_PATH + "=" + baseDir,
"-D" + WordCountCombineByKey.OUTPUT_PATH + "=" + outputDir,
"-D" + WordCountCombineByKey.IS_RUN_LOCALLY + "=true",
"-D" + WordCountCombineByKey.DEFAULT_FS + "=file:///",
"-D" + WordCountCombineByKey.NUM_PARTITIONS + "=1"};
WordCountCombineByKey.main(args);
// Read the data from the output file
File outputFile = new File(outputDir + "/part-00000");
String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
Map<String, Integer> wordToCount = new HashMap<>();
// 4 lines in the output file, one (word,count) tuple per line
Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
String[] wordCount = e.substring(e.indexOf("(") + 1, e.indexOf(")")).split(",");
wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));
});
// 4 distinct words
Assert.assertEquals(4L, wordToCount.size());
Assert.assertEquals(2L, wordToCount.get("Maverick").longValue());
}
}
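With the two identical input lines written in startup(), the single part-00000 file holds one tuple per distinct word, each with a count of 2 (the line order may differ from run to run):

(My,2)
(name,2)
(is,2)
(Maverick,2)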