Word Count Flink

Word Count in Flink. If you can count words on Spark, then you can count them on Flink too!

Github

dependency

<dependencies>
    <!-- Flink dependencies -->
    <!-- flink-java: supplies the org.apache.flink.api.java.* classes imported by the WordCount job -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- flink-clients: presumably needed to execute the job locally (e.g. from the IDE or a test);
         keep its version in step with flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>

 

[addToAppearHere]

WordCount Code :

package com.big.data.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.utils.ParameterTool;


/**
 * Batch word-count job written against the Flink DataSet API.
 *
 * <p>Reads a text input, splits every line into whitespace-separated words and writes one
 * {@code word,count} CSV record per distinct word.
 *
 * <p>Supported arguments (parsed by {@link ParameterTool}, e.g. {@code --input.path file:///tmp/in}):
 * <ul>
 *   <li>{@code input.path} (required) - URI of the text input to read</li>
 *   <li>{@code output.path} (required) - URI the CSV result is written to</li>
 *   <li>{@code parallelism} (optional) - job parallelism; when omitted the environment default is kept</li>
 * </ul>
 */
public class WordCount {
    /** Argument key holding the URI of the text input. */
    public static final String INPUT_PATH = "input.path";
    /** Argument key holding the URI the CSV result is written to. */
    public static final String OUTPUT_PATH = "output.path";
    /** Argument key holding the (optional) job parallelism. */
    public static final String PARALLELISM = "parallelism";

    /**
     * Builds and runs the word-count job.
     *
     * @param args command line arguments in {@code --key value} form, see the class documentation
     * @throws Exception if a required argument is missing or the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // Setting up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Using the argument parser provided by Flink
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Fail fast, naming the missing key, instead of passing a null path on to Flink
        final String inputPath = parameterTool.getRequired(INPUT_PATH);
        final String outputPath = parameterTool.getRequired(OUTPUT_PATH);

        // Pass --parallelism 1 to get only one file as output; when the argument is absent
        // the environment's default parallelism is left untouched
        if (parameterTool.has(PARALLELISM)) {
            env.setParallelism(parameterTool.getInt(PARALLELISM));
        }

        // Read input from the given path; just remember that internally it is a TextInputFormat.
        // The path of the file is given as a URI
        // (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
        DataSet<String> text = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)   // group on the word (tuple field 0)
                .sum(1);      // add up the per-occurrence 1s (tuple field 1)

        // Save to the given path
        wordCounts.writeAsCsv(outputPath);

        // Execute the Flink job with the given name
        env.execute("Word Count Example");
    }

    /**
     * Splits a line into words and emits a {@code (word, 1)} tuple for each of them.
     *
     * <p>Words are separated by any run of whitespace. Empty tokens - produced by blank lines or
     * by leading whitespace - are skipped so that they are never counted as a word.
     */
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) {
                    // "".split(..) yields [""] and a leading separator yields a leading ""
                    continue;
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

}

[addToAppearHere]

Integration Test :


package com.big.data.flink;

import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * Integration test for {@link WordCount}: writes a small text file to the local filesystem,
 * runs the job against it and verifies the CSV result.
 */
public class WordCountTest {

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    // One input line; it is written twice, so every word is expected to be counted twice
    private static final String CONTENT = "My name is Maverick";
    private static String baseDir;
    private static String inputFile;
    private static String outputDir;

    /**
     * Creates a unique working directory and the two-line input file inside it.
     */
    @BeforeClass
    public static void startup() throws Exception {

        // Unique working directory for this test run
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();

        // Input file read by the job
        inputFile = baseDir + "/input.txt";

        // Path the job writes its result to
        outputDir = baseDir + "/output";

        // Write the data into the local filesystem (parent directories are created as needed)
        FileUtils.writeStringToFile(
                new File(inputFile), CONTENT + NEW_LINE_DELIMETER + CONTENT, "UTF-8");
    }

    /**
     * Removes everything the test wrote.
     */
    @AfterClass
    public static void cleanup() throws Exception {
        // baseDir stays null if startup failed before assigning it
        if (baseDir != null) {
            // Delete the local filesystem folder after the job is done
            FileUtils.deleteDirectory(new File(baseDir));
        }
    }

    @Test
    public void WordCount() throws Exception {

        // Arguments are passed as: --input.path /tmp/tmmm --output.path /tmp/abc/cde
        // The input file itself is passed (not its directory), so the job never depends on how
        // Flink enumerates a directory that also contains the output path.
        String[] args = new String[]{
                  "--" + WordCount.INPUT_PATH, LOCAL_FILEURI_PREFIX + inputFile,
                  "--" + WordCount.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
                  "--" + WordCount.PARALLELISM, "1"};
        WordCount.main(args);

        // With parallelism 1 the output path is expected to be a single file
        String fileToString = FileUtils.readFileToString(new File(outputDir), "UTF-8");
        Map<String, Integer> wordToCount = new HashMap<>();

        // One "word,count" record per line
        for (String record : fileToString.split(NEW_LINE_DELIMETER)) {
            if (record.isEmpty()) {
                continue;
            }
            String[] wordCount = record.split(",");
            Assert.assertEquals("Unexpected CSV record: " + record, 2, wordCount.length);
            wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));
        }

        // 4 distinct words, each occurring once per input line
        Assert.assertEquals(4, wordToCount.size());
        for (String word : CONTENT.split(" ")) {
            Assert.assertEquals("count of " + word, Integer.valueOf(2), wordToCount.get(word));
        }
    }

}