Word Count in Flink
Word count is the classic first job on any data-processing engine: if you can count words on Spark, you can count them on Flink too!
<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
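Note: the _2.10 suffix on flink-clients refers to the Scala version the artifact was built against; if your Flink build uses a different Scala version (for example 2.11), pick the matching suffix.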
WordCount.java:
package com.big.data.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    public static final String INPUT_PATH = "input.path";
    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";

    public static void main(String[] args) throws Exception {

        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Use the argument parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Setting parallelism to 1 produces a single output file
        env.setParallelism(parameterTool.getInt(PARALLELISM));

        // Read input from the given path; internally Flink uses TextInputFormat.
        // The path is a URI, e.g. "file:///some/local/file" or "hdfs://host:port/file/path".
        DataSet<String> text = env.readTextFile(parameterTool.get(INPUT_PATH));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())   // emit (word, 1) for every word in a line
                .groupBy(0)                    // group by the word (field 0 of the tuple)
                .sum(1);                       // sum the counts (field 1 of the tuple)

        // Save the result to the given path as CSV
        wordCounts.writeAsCsv(parameterTool.get(OUTPUT_PATH));

        // Execute the Flink job with the given name
        env.execute("Word Count Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            // Split the line on spaces and emit a (word, 1) tuple for every token
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
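As a side note, the same flatMap step can also be written as a Java 8 lambda instead of the LineSplitter class. Because generics are erased from lambdas, Flink needs an explicit result-type hint via returns(...). The snippet below is only a minimal sketch of that variant (it assumes Java 8 and additionally imports org.apache.flink.api.common.typeinfo.TypeHint and TypeInformation); it is not part of the job above:

// Hypothetical lambda variant of the flatMap step (Flink DataSet API).
// The returns(...) hint is required because the Tuple2 output type cannot be
// inferred from the lambda after type erasure.
DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { }))
        .groupBy(0)
        .sum(1);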
WordCountTest.java:
package com.big.data.flink;

import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class WordCountTest {

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMITER = "\n";

    private static String baseDir;
    private static String outputDir;

    @BeforeClass
    public static void startup() throws Exception {
        // Input directory
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();
        // Output directory
        outputDir = baseDir + "/output";

        // Write the test data into an input file on the local filesystem
        File tempFile = new File(baseDir + "/input.txt");
        String content = "My name is Maverick";
        FileUtils.writeStringToFile(tempFile, content, "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMITER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, content, "UTF-8", true);
    }

    @AfterClass
    public static void cleanup() throws Exception {
        // Delete the local filesystem folder after the job is done
        FileUtils.deleteDirectory(new File(baseDir));
    }

    @Test
    public void wordCount() throws Exception {
        // Arguments are passed as --input.path <path> --output.path <path> --parallelism <n>
        String[] args = new String[]{
                "--" + WordCount.INPUT_PATH, LOCAL_FILEURI_PREFIX + baseDir,
                "--" + WordCount.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
                "--" + WordCount.PARALLELISM, "1"};

        WordCount.main(args);

        // With parallelism 1 the job writes a single output file at the given path;
        // read it back and parse each "word,count" line
        File outputFile = new File(outputDir);
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");

        Map<String, Integer> wordToCount = new HashMap<>();
        Arrays.stream(fileToString.split(NEW_LINE_DELIMITER)).forEach(e -> {
            String[] wordCount = e.split(",");
            wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));
        });

        // 4 distinct words, each appearing twice
        Assert.assertEquals(4L, wordToCount.size());
        Assert.assertEquals(2L, wordToCount.get("Maverick").longValue());
    }
}
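A short note on the output the test parses: writeAsCsv with its default delimiters writes each Tuple2 as a "word,count" row terminated by a newline, which is why the test splits the output first on "\n" and then on ",". Since the input line "My name is Maverick" is written twice, the expected rows are My,2 / name,2 / is,2 / Maverick,2 (in no particular order).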