Spark Map
A Spark map example that throws light on object reuse, combineByKey, and the usage of transient variables in Spark.
Problem: Given a Parquet file containing Employee data, find the maximum bonus earned by each employee and save the result back as Parquet (Github)
1. Parquet file (a huge file on HDFS), Avro schema:
|– emp_id: integer (nullable = false)
|– emp_name: string (nullable = false)
|– emp_country: string (nullable = false)
|– emp_bonus: long (nullable = true)
|– subordinates: map (nullable = true)
| |– key: string
| |– value: string (valueContainsNull = false)
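Before diving into the full pipeline, here is a minimal, self-contained sketch of the core idea: key the Employee records by emp_id and keep only the record with the highest bonus per key. It assumes the Employee Avro POJO generated from the schema above (the avro-schema module); the class name MaxBonusSketch is illustrative only, and reduceByKey is used here just for brevity, while the full example below uses combineByKey to also demonstrate object reuse and transient fields.

package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

// Illustrative sketch only: assumes the Employee POJO generated from the Avro schema above.
public class MaxBonusSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MaxBonusSketch")
                .setMaster("local[*]")
                // Kryo handles the Avro-generated Employee class during shuffles
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            Employee first = new Employee();
            first.setEmpId(1);
            first.setEmpName("Maverick");
            first.setEmpCountry("DE");
            first.setBonus(100L);

            Employee second = new Employee();
            second.setEmpId(1);
            second.setEmpName("Maverick");
            second.setEmpCountry("DE");
            second.setBonus(90L);

            JavaPairRDD<Integer, Employee> maxBonusPerEmployee = sc
                    .parallelize(Arrays.asList(first, second))
                    // key every record by its employee id
                    .keyBy(Employee::getEmpId)
                    // keep the record with the larger bonus for each key
                    .reduceByKey((left, right) -> left.getBonus() >= right.getBonus() ? left : right);

            maxBonusPerEmployee.values()
                    .collect()
                    .forEach(e -> System.out.println(e.getEmpId() + " -> " + e.getBonus()));
        }
    }
}

Run locally (or from the IDE), this should print 1 -> 100 for the sample data above.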
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.big.data</groupId>
<artifactId>avro-schema</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.10</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.5.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.5.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
Spark:
package com.big.data.spark;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
public class EmployeeMaxSalary extends Configured implements Tool, Closeable, Serializable {
public static final String INPUT_PATH = "spark.input.path";
public static final String OUTPUT_PATH = "spark.output.path";
public static final String IS_RUN_LOCALLY = "spark.is.run.local";
public static final String DEFAULT_FS = "spark.default.fs";
public static final String NUM_PARTITIONS = "spark.num.partitions";
// Note: because a lambda below calls an instance method, the outer class is forced to be serialized.
// This example throws light on how lambda functions are serialized in Spark.
private transient SQLContext sqlContext;
private transient JavaSparkContext javaSparkContext;
protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
final String defaultFs,
final Class<T> tClass) {
final SparkConf sparkConf = new SparkConf()
//Set Spark conf here;
//once the SparkContext is created, Hadoop configuration for InputFormats can be set on it
.setAppName(tClass.getSimpleName())
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
if (isRunLocal) {
sparkConf.setMaster("local[*]");
}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
if (defaultFs != null) {
sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
}
return sparkContext;
}
// Convert Row to Avro POJO (Employee)
public Employee convert(Row row) {
try {
// Employee Schema => ParquetRow Schema =>Row Schema
Employee avroInstance = new Employee();
for (StructField field : row.schema().fields()) {
//row.fieldIndex => pos of the field Name in the schema
avroInstance.put(field.name(), row.get(row.fieldIndex(field.name())));
}
return avroInstance;
} catch (Exception e) {
throw new AvroRuntimeException("Avro POJO building failed ", e);
}
}
@Override
public int run(String[] args) throws Exception {
//The arguments passed have been split into key/value pairs by ToolRunner
Configuration conf = getConf();
String inputPath = conf.get(INPUT_PATH);
String outputPath = conf.get(OUTPUT_PATH);
//Get the Spark context. This is the central context, which can be wrapped in any other context
javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
conf.get(DEFAULT_FS), this.getClass());
sqlContext = new SQLContext(javaSparkContext);
// No input has been read and no job has been started yet.
//To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
// To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD
// Avro schema to StructType conversion
final StructType outPutSchemaStructType = (StructType) SchemaConverters
.toSqlType(Employee.getClassSchema())
.dataType();
// read data from the parquet file; the schema of the data is taken from the Avro schema
DataFrame inputDf = sqlContext.read()
.format(Employee.class.getCanonicalName())
.parquet(inputPath);
// convert the DataFrame into a JavaRDD
// each record read from the parquet file is exposed as a Row object
// with the same schema as the parquet file row
JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();
//Row has same schema as that of Parquet row ,
//Parquet Row has same schema as that of Avro Object
JavaRDD<Row> outputRowRdd = rowJavaRDD
// convert each Row to an Employee object
// if a lambda such as e -> convert(e) is used instead of a static class,
// the outer class needs to be serialized,
// because lambda functions are themselves serialized and shipped to the executors
.map(e -> convert(e))
// key by empId so that all records of an employee can be collected on the same reducer
.keyBy(Employee::getEmpId)
.combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiner())
.map(new MapSpark())
// convert the aggregated Employee back into a Row, preserving the Avro field order
.map(e -> RowFactory.create(Employee.getClassSchema().getFields().stream()
.map(f -> ((Employee) e).get(f.pos())).toArray()));
// Convert the JavaRDD<Row> into a DataFrame and save it as a parquet file
DataFrame outputDf = sqlContext.createDataFrame(outputRowRdd, outPutSchemaStructType);
outputDf
.write()
.format(Employee.class.getCanonicalName())
.parquet(outputPath);
return 0;
}
public static class MapSpark implements Function<Tuple2<Integer, Object>, Object> {
// Functions used inside a transformation are instantiated on the driver.
// The serialized object is then sent to the executors.
// Marking a field transient keeps it from being serialized.
private transient TreeSet<Long> employeeBonusSet;
// Please do not declare any field as static,
// as multiple tasks can run inside the same JVM (executor) in Spark.
@Override
public Object call(Tuple2<Integer, Object> v1) throws Exception {
if (employeeBonusSet == null) {
employeeBonusSet = new TreeSet<>();
}
// Object is being reused and not created on every call
employeeBonusSet.clear();
EmployeeAgregator aggregatedEvents = (EmployeeAgregator) v1._2();
aggregatedEvents
.getEmployeeList()
.stream().forEach(o -> employeeBonusSet.add(((Employee) o).getBonus()));
// select one object from the Employee List
Object output = aggregatedEvents.getEmployeeList().get(0);
((Employee) output).setBonus(employeeBonusSet.last());
return output;
}
}
public static class EmployeeAgregator {
private List<Object> employeeList;
public EmployeeAgregator() {
employeeList = new ArrayList<>();
}
public List<Object> getEmployeeList() {
return employeeList;
}
public void addEmployee(Employee emp) {
employeeList.add(emp);
}
public void addEmployeeAgregator(EmployeeAgregator aggregator) {
employeeList.addAll(aggregator.getEmployeeList());
}
}
// CreateCombiner runs on the map task; its call() is invoked only for the first Row of each
// employee group in a partition (once per group, not for every input Row).
// A standalone sketch of the combineByKey contract on plain values follows after this class.
public static class CreateCombiner implements Function<Employee, Object> {
@Override
public Object call(Employee v1) throws Exception {
EmployeeAgregator aggregator = new EmployeeAgregator();
aggregator.addEmployee(v1);
return aggregator;
}
}
// Any subsequent record with the same employeeId in the partition is added into the existing EmployeeAgregator here.
// This acts like the combiner in MapReduce.
public static class MergeValue implements Function2<Object, Employee, Object> {
@Override
public Object call(Object v1, Employee v2) throws Exception {
// Type of v1 is EmployeeAgregator , Whereas Type of v2 is Employee
((EmployeeAgregator) v1).addEmployee(v2);
return v1;
}
}
// This is executed on the reduce task:
// the EmployeeAgregator instances produced on the map side are merged here.
public static class MergeCombiner implements Function2<Object, Object, Object> {
@Override
public Object call(Object v1, Object v2) throws Exception {
//Type of both v1 and v2 are EmployeeAgregator
((EmployeeAgregator) v1).addEmployeeAgregator((EmployeeAgregator) v2);
return v1;
}
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(javaSparkContext);
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new EmployeeMaxSalary(), args);
}
}
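The three helper classes above follow combineByKey's general contract: CreateCombiner plays the role of createCombiner, MergeValue of mergeValue, and MergeCombiner of mergeCombiners. As a point of comparison, here is a hypothetical, self-contained sketch of the same contract on plain (empId, bonus) pairs, where only the running maximum is kept per key instead of collecting whole Employee objects; the class name CombineByKeyMaxSketch and the sample data are illustrative only.

package com.big.data.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Illustrative sketch of the combineByKey contract on plain (empId, bonus) pairs.
public class CombineByKeyMaxSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CombineByKeyMaxSketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<Integer, Long> bonusByEmpId = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(1, 100L),
                    new Tuple2<>(1, 90L),
                    new Tuple2<>(2, 50L)));

            JavaPairRDD<Integer, Long> maxBonusPerEmployee = bonusByEmpId.combineByKey(
                    // createCombiner: called for the first bonus of a key seen in a partition
                    bonus -> bonus,
                    // mergeValue: fold the next bonus of the same key into the partition-local maximum
                    (maxSoFar, bonus) -> Math.max(maxSoFar, bonus),
                    // mergeCombiners: merge the per-partition maxima on the reduce side
                    (left, right) -> Math.max(left, right));

            maxBonusPerEmployee.collect()
                    .forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        }
    }
}

Collecting whole Employee objects per key, as the full example does, keeps the original record around so that fields other than the bonus can be written back to Parquet; if only the maximum value is needed, folding it directly as in this sketch avoids buffering the whole group in memory.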
Integration Test: (Github)
package com.big.data.spark;
import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
import java.io.IOException;
public class EmployeeMaxSalaryTest {
private static final Logger LOG = LoggerFactory.getLogger(EmployeeMaxSalaryTest.class);
private static final String BASEDIR =
"/tmp/EmployeeMaxSalaryTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";
private String input;
private String output;
private Employee employee;
@Before
public void setUp() throws IOException {
input = BASEDIR + "input/";
output = BASEDIR + "output/";
employee = new Employee();
employee.setEmpId(1);
employee.setEmpName("Maverick");
employee.setEmpCountry("DE");
employee.setBonus(100L);
Employee employee2 = new Employee();
employee2.setEmpId(1);
employee2.setEmpName("Maverick");
employee2.setEmpCountry("DE");
employee2.setBonus(90L);
//Write parquet file with GZIP compression
ParquetWriter<Object> writer = AvroParquetWriter
.builder(new Path(input + "1.gz.parquet"))
.withCompressionCodec(CompressionCodecName.GZIP)
.withSchema(Employee.getClassSchema())
.build();
writer.write(employee);
writer.write(employee2);
writer.close();
}
@Test
public void testSuccess() throws Exception {
String[] args = new String[]{"-D" + EmployeeMaxSalary.INPUT_PATH + "=" + input,
"-D" + EmployeeMaxSalary.OUTPUT_PATH + "=" + output,
"-D" + EmployeeMaxSalary.IS_RUN_LOCALLY + "=true",
"-D" + EmployeeMaxSalary.DEFAULT_FS + "=file:///",
"-D" + EmployeeMaxSalary.NUM_PARTITIONS + "=1"};
EmployeeMaxSalary.main(args);
ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build();
//Use .withConf(FS.getConf()) for reading from a different HDFS and not local
// by default the fs is local
GenericData.Record event = (GenericData.Record) reader.read();
Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo
(AvroUtils.convertAvroPOJOtoByteArray(
event, Employee.getClassSchema()), Employee.getClassSchema());
reader.close();
LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());
Assert.assertEquals(employee.getEmpId(), outputEvent.getEmpId());
Assert.assertEquals(100L, outputEvent.getBonus().longValue());
}
@After
public void cleanup() throws IOException {
FileUtils.deleteDirectory(new File(BASEDIR));
}
}