Word Count with Apache Flink

Word count in Flink: if you can count words with Spark, you can count them with Flink too!



    <!-- Flink dependencies -->


WordCount code:

package com.big.data.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {
    public static final String INPUT_PATH = "input.path";
    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";

    public static void main(String[] args) throws Exception {

        //Setting up Execution Enverionment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Using the parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // To get only one file as output

        //Read input from the given path , Just remeber internally its TextInputFormat
        //The path of the file, as a URI 
        //(e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
        DataSet<String> text = env.readTextFile(parameterTool.get(INPUT_PATH));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())

        //Save to given Path

        // Execute the Flink Job with the given Name
        env.execute("Word Count Example");

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));


Integration test:

package com.big.data.flink;

import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class WordCountTest {

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    private static String baseDir;
    private static String outputDir;

    public static void startup() throws Exception {

        //Input Directory
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();

        outputDir = baseDir + "/output";

        //Write Data in input File
        File tempFile = new File(baseDir + "/input.txt");
        String content = "My name is Maverick";

        //Write the data into the local filesystem
        FileUtils.writeStringToFile(tempFile, content, "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, content, "UTF-8", true);

    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        FileUtils.deleteDirectory(new File(baseDir));

    public void WordCount() throws Exception {

        // Any argument passed with --input.path /tmp/tmmm   --output.path /tmp/abc/cde
        String[] args = new String[]{
                  "--" + WordCount.INPUT_PATH, LOCAL_FILEURI_PREFIX + baseDir,
                  "--" + WordCount.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
                  "--" + WordCount.PARALLELISM, "1"};

        //Read the data from the outputfile
        File outputFile = new File(outputDir);
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        Map<String, Integer> wordToCount = new HashMap<>();

        //4 lines in output file, with one word per line
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.split(",");
            wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));

        //4 words .
        Assert.assertEquals(4L, wordToCount.size());
        Assert.assertEquals(2L, wordToCount.get("Maverick").longValue());