How to submit a Spark job through Oozie

Oozie is a workflow scheduler system for managing Apache Hadoop jobs (MapReduce, Spark, etc.).

To execute a Spark job through Oozie we first need to configure an Oozie workflow. In HDFS, let's create the directory /user/test/oozie-workflow/ with the following contents:

/user/test/oozie-workflow/lib/word-count-spark-job.jar
/user/test/oozie-workflow/workflow.xml
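
The jar is the shaded artifact of the Maven project described below. Assuming it has already been built, the directory can be created and populated with the standard HDFS shell commands (the local paths here are illustrative, and the exact name of the built jar depends on your artifactId and version):

hdfs dfs -mkdir -p /user/test/oozie-workflow/lib
hdfs dfs -put target/stage/wordcount-1.0-SNAPSHOT-shaded.jar /user/test/oozie-workflow/lib/word-count-spark-job.jar
hdfs dfs -put workflow.xml /user/test/oozie-workflow/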

The workflow.xml is:

<workflow-app xmlns="uri:oozie:workflow:0.4" name="word-count-job-wf">
    <start to="run-word-count-job"/>
    <action name="run-word-count-job" retry-max="0">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.use.system.libpath</name>
                    <value>true</value>
                </property>
                <property>
                    <name>oozie.launcher.mapreduce.map.java.opts</name>
                    <value>-verbose:class</value>
                </property>
            </configuration>
            <master>${sparkMaster}</master>
            <name>${sparkJobName}</name>
            <class>com.spark.wordcount.WordCountJob</class>
            <jar>${jarPath}</jar>
            <spark-opts>--files=${sparkFiles} --num-executors 1 --conf spark.yarn.maxAppAttempts=1 --conf spark.task.maxFailures=6 --executor-memory ${sparkExecutorMemory} --driver-memory ${sparkDriverMemory} --conf spark.driver.extraJavaOptions=${sparkDriverJavaOptions} --conf spark.executor.extraJavaOptions=${sparkExecutorJavaOptions} --conf spark.executor.extraClassPath=${sparkExecutorExtraClassPath} --conf spark.driver.extraClassPath=${sparkDriverExtraClassPath}</spark-opts>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>WordCount workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name="end"/>
</workflow-app>
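
The ${...} placeholders in the workflow (plus the ones used in spark-opts) are resolved from a job.properties file supplied at submission time. A minimal sketch, with host names, ports and values that you would replace with your own cluster settings, could look like this:

nameNode=hdfs://namenode-host:8020
jobTracker=resourcemanager-host:8032
sparkMaster=yarn-cluster
sparkJobName=word-count-job
jarPath=${nameNode}/user/test/oozie-workflow/lib/word-count-spark-job.jar
sparkExecutorMemory=1g
sparkDriverMemory=1g
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/test/oozie-workflow
# plus entries for sparkFiles, sparkDriverJavaOptions, sparkExecutorJavaOptions,
# sparkExecutorExtraClassPath and sparkDriverExtraClassPath referenced in spark-opts

With the workflow directory in place, the job is submitted through the Oozie CLI (replace the Oozie server URL with your own):

oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run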


Now let's create a simple Spark job that counts the words in a file. As input we will provide input.txt, located in the jar's resources folder.

Let's create the project with the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.spark.example</groupId>
    <artifactId>wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>2.6.0-cdh5.4.7</hadoop.version>
        <spark.version>1.3.0-cdh5.4.7</spark.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
            <version>2.21</version>
        </dependency>

        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <outputDirectory>${project.build.directory}/stage</outputDirectory>
                    <artifactSet>
                        <includes>
                            <include>*:*</include>
                        </includes>
                    </artifactSet>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <id>shade-spark-job</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>config.properties</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>hdfs-site.xml</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>log4j.xml</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>raw_event_mapping.json</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>raw_fact_mapping.json</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

</project>
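
The project is built with Maven; because of the outputDirectory configured for the shade plugin, the shaded jar ends up under target/stage, and with the artifactId and version above it is typically named wordcount-1.0-SNAPSHOT-shaded.jar:

mvn clean package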


And WordCountJob.java is the following:

 

package com.spark.wordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.net.URL;
import java.util.Arrays;

public class WordCountJob {

    public static void main(String[] args) {
        new WordCountJob().run();
    }

    private void run() {

        // create a local Spark context with the local file system as the default FS
        final JavaSparkContext sparkContext = create(true, "file:///", WordCountJob.class, "testUser");

        // read the input file, split it into words, map each word to (word, 1) and sum the counts
        JavaRDD<String> textFile = sparkContext.textFile(getFilePath("input.txt"));
        JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")));
        JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

        // write the result as a single part file
        counts.coalesce(1).saveAsTextFile("file:///tmp/output");
    }

    private String getFilePath(final String resourceName) {
        URL url = this.getClass().getClassLoader().getResource(resourceName);
        String filePath = null;
        if (url != null) {
            filePath = url.getPath();
        }
        return filePath;
    }

    /**
     * Creates the Java Spark context.
     *
     * @param isRunLocal whether to run the job with a local master
     * @param defaultFs  default file system URI, e.g. "file:///" or "hdfs://..."
     * @param tClass     job class, used as the Spark application name
     * @param username   Hadoop user to run as
     * @return the configured JavaSparkContext
     */
    public JavaSparkContext create(final boolean isRunLocal,
                                   final String defaultFs,
                                   final Class tClass,
                                   final String username) {
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        // set hadoop user name
        System.setProperty("HADOOP_USER_NAME", username);

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
            sparkContext.hadoopConfiguration().set("dfs.client.use.datanode.hostname", "true");
        }

        return sparkContext;
    }
}

 

After running, the Spark job writes the word counts to the output directory (/tmp/output in this example).

To submit the Spark job without Oozie, we just need to execute the spark-submit command, for example:
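
For a local run the invocation could look like this (the memory setting and the jar path are illustrative, and the job itself forces the local[*] master):

spark-submit \
  --class com.spark.wordcount.WordCountJob \
  --master local[*] \
  --driver-memory 1g \
  target/stage/wordcount-1.0-SNAPSHOT-shaded.jar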