Write To Aerospike via mapPartitions
Write to Aerospike from spark via MapPartitions
Problem Statement: Data stored in HDFS needs to be read with Spark and saved to Aerospike. This example uses the mapPartitions transformation to achieve this.
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>com.big.data</groupId> <artifactId>avro-schema</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.10</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-avro</artifactId> <version>1.5.0-cdh5.9.0</version> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.5.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.aerospike</groupId> <artifactId>aerospike-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>com.aerospike.unit</groupId> <artifactId>aerospike-unit</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies>
[addToAppearHere]
Code:
package com.big.data.spark;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * Spark job that reads {@code Employee} rows from a parquet file, writes one
 * Aerospike record per row from inside a {@code mapPartitions} transformation,
 * and finally saves the processed rows as parquet again.
 *
 * <p>All settings are read from the Hadoop {@link Configuration} returned by
 * {@link #getConf()}; when started through {@link ToolRunner} they can be
 * supplied as {@code -Dname=value} arguments. The property names are the
 * public constants declared below.
 */
public class ReadFromHdfsWriteToAerospikeSparkMapPartition extends Configured implements Tool, Closeable {

    /** Path of the parquet input (required). */
    public static final String INPUT_PATH = "spark.input.path";
    /** Path the processed rows are written to as parquet (required). */
    public static final String OUTPUT_PATH = "spark.output.path";
    /** When {@code true} the Spark master is set to {@code local[*]}; defaults to {@code false}. */
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    /** Optional value for {@code fs.defaultFS} on the Spark Hadoop configuration. */
    public static final String DEFAULT_FS = "spark.default.fs";
    /** Declared for callers; this job does not currently read it. */
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    // Aerospike related properties
    /** Host name of the Aerospike node to connect to (required). */
    public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
    /** Aerospike service port; defaults to 3000 when not set. */
    public static final String AEROSPIKE_PORT = "aerospike.port";
    /** Aerospike namespace the records are written to (required). */
    public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
    /** Aerospike set name the records are written to. */
    public static final String AEROSPIKE_SETNAME = "aerospike.set.name";

    // For the demo the key is emp_id and the value is emp_name
    /** Name of the row column used as the Aerospike key (required). */
    public static final String KEY_NAME = "avro.key.name";
    /** Name of the row column stored as the bin value; also used as the bin name (required). */
    public static final String VALUE_NAME = "avro.value.name";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    /**
     * Creates the {@link JavaSparkContext} used by this job.
     *
     * @param isRunLocal when {@code true} the master is set to {@code local[*]}
     * @param defaultFs  value for {@code fs.defaultFS}; ignored when {@code null}
     * @param tClass     class whose simple name becomes the Spark application name
     * @param <T>        type of {@code tClass}
     * @return a new Spark context configured with the Kryo serializer
     */
    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        // Spark-level settings go on the SparkConf; Hadoop settings (e.g. for
        // InputFormats) can only be applied once the context exists.
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    /**
     * Runs the job: parquet input -> Aerospike writes (per partition) -> parquet output.
     *
     * @param args remaining command line arguments; not used, all settings come from {@link #getConf()}
     * @return always {@code 0}; failures are reported through exceptions
     * @throws IllegalArgumentException if a required configuration property is missing
     * @throws Exception                if reading, the Aerospike writes or the final write fail
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Fail fast on the driver instead of with an obscure error later on.
        String inputPath = requireSetting(conf, INPUT_PATH);
        String outputPath = requireSetting(conf, OUTPUT_PATH);
        String aerospikeHostname = requireSetting(conf, AEROSPIKE_HOSTNAME);
        int aerospikePort = conf.getInt(AEROSPIKE_PORT, 3000);
        String namespace = requireSetting(conf, AEROSPIKE_NAMESPACE);
        String setName = conf.get(AEROSPIKE_SETNAME);
        String keyName = requireSetting(conf, KEY_NAME);
        String valueName = requireSetting(conf, VALUE_NAME);

        // The JavaSparkContext is the central context; the SQLContext wraps it.
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
                conf.get(DEFAULT_FS), this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // Nothing has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // to use a custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD.

        // Avro schema -> StructType conversion; input and output rows share this schema.
        final StructType employeeSchema = (StructType) SchemaConverters
                .toSqlType(Employee.getClassSchema()).dataType();

        // Read the parquet file using the schema derived from the Avro schema
        // and expose its rows as a JavaRDD<Row>.
        DataFrame inputDf = sqlContext.read().schema(employeeSchema).parquet(inputPath);
        JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();

        // mapPartitions lets the function create and shut down the Aerospike
        // client once per partition. The function returns only after the whole
        // partition has been processed, so its result is held in memory
        // (large partitions may cause an OutOfMemoryError).
        JavaRDD<Row> returnedRowJavaRDD = rowJavaRDD.mapPartitions(
                new InsertIntoAerospike(aerospikeHostname, aerospikePort, namespace,
                        setName, keyName, valueName));

        // mapPartitions is only a transformation and Spark evaluates lazily:
        // without an action such as the write below nothing would be executed.
        DataFrame outputDf = sqlContext.createDataFrame(returnedRowJavaRDD, employeeSchema);

        // parquet(path) selects the parquet format itself, so no separate
        // format(...) call is needed before it.
        outputDf.write().parquet(outputPath);

        return 0;
    }

    /**
     * Returns the value of a mandatory configuration property.
     *
     * @param conf configuration to read from
     * @param name property name
     * @return the configured, non-null value
     * @throws IllegalArgumentException if the property is not set
     */
    private static String requireSetting(Configuration conf, String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing required configuration property: " + name);
        }
        return value;
    }

    /**
     * Partition function that writes every row of a partition to Aerospike and
     * returns the rows unchanged.
     *
     * <p>The function object is instantiated on the driver, serialized and sent
     * to the executors. Only the plain connection settings travel with it; the
     * Aerospike client is not serializable and is not needed on the driver, so
     * it is created inside {@link #call(Iterator)}. Records are written one at
     * a time; this could be converted into batched writes.
     */
    public static class InsertIntoAerospike implements
            FlatMapFunction<Iterator<Row>, Row> {

        // Instance (not static) fields so that they are serialized and sent to the executors.
        private final String aerospikeHostName;
        private final int aerospikePortNo;
        private final String aerospikeNamespace;
        private final String aerospikeSetName;
        private final String keyColumnName;
        private final String valueColumnName;

        /**
         * @param hostName        Aerospike host to connect to
         * @param portNo          Aerospike service port
         * @param nameSpace       namespace the records are written to
         * @param setName         set the records are written to
         * @param keyColumnName   row column holding the (Integer) record key
         * @param valueColumnName row column holding the (String) value; also used as the bin name
         */
        public InsertIntoAerospike(String hostName, int portNo, String nameSpace,
                                   String setName, String keyColumnName, String valueColumnName) {
            this.aerospikeHostName = hostName;
            this.aerospikePortNo = portNo;
            this.aerospikeNamespace = nameSpace;
            this.aerospikeSetName = setName;
            this.keyColumnName = keyColumnName;
            this.valueColumnName = valueColumnName;
        }

        /**
         * Writes all rows of one partition to Aerospike.
         *
         * <p>This resembles setup / map / cleanup of MapReduce: everything
         * before the loop runs once per partition, the loop body once per row,
         * and the {@code finally} block once at the end.
         *
         * @param rowIterator iterator over all rows of the partition (not a single row)
         * @return the rows that were written, collected in memory
         * @throws Exception if connecting to Aerospike or writing a record fails
         */
        @Override
        public Iterable<Row> call(Iterator<Row> rowIterator) throws Exception {
            // Drawback of this approach: the result is returned only after the
            // whole partition has been processed and is held in memory until then.
            List<Row> rowList = new ArrayList<>();

            // Initialise the service once for the whole partition.
            WritePolicy policy = new WritePolicy();
            AerospikeClient client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
            try {
                while (rowIterator.hasNext()) {
                    Row row = rowIterator.next();
                    // Rows carry their schema, so columns can be looked up by field name.
                    Key key = new Key(aerospikeNamespace, aerospikeSetName,
                            (Integer) row.get(row.fieldIndex(keyColumnName)));
                    Bin bin = new Bin(valueColumnName,
                            (String) row.get(row.fieldIndex(valueColumnName)));
                    client.put(policy, key, bin);
                    rowList.add(row);
                }
            } finally {
                // Shut the client down even when a write fails, so a failing
                // task does not leave its connection open.
                IOUtils.closeQuietly(client);
            }
            return rowList;
        }
    }

    /**
     * Closes the Spark context created by {@link #run(String[])}, if any.
     *
     * @throws IOException declared by {@link Closeable}
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    /**
     * Command line entry point; runs the job through {@link ToolRunner} and
     * closes the Spark context afterwards, whether or not the job succeeded.
     *
     * @param args generic Hadoop options ({@code -Dname=value}) carrying the job settings
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        try (ReadFromHdfsWriteToAerospikeSparkMapPartition job =
                     new ReadFromHdfsWriteToAerospikeSparkMapPartition()) {
            ToolRunner.run(job, args);
        }
    }
}
[addToAppearHere]
Key Take Aways:
- mapPartitions provides a clean way to initialize and shut down a service once per partition.
- In this implementation, mapPartitions keeps the output in memory and returns only after the whole partition has been processed.
Aerospike Unit Test framework:
package com.big.data.spark;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
/**
 * End-to-end test for {@link ReadFromHdfsWriteToAerospikeSparkMapPartition}:
 * writes a small parquet input file, starts a single-node Aerospike test
 * cluster, runs the job locally and verifies both the parquet output and the
 * records stored in Aerospike.
 */
public class ReadFromHdfsWriteToAerospikeSparkMapPartitionTest {

    private static final Logger LOG =
            LoggerFactory.getLogger(ReadFromHdfsWriteToAerospikeSparkMapPartitionTest.class);

    // Time-stamped so that every run works on a fresh directory.
    private static final String BASEDIR =
            "/tmp/ReadFromHdfsWriteToASMapPartition/avroparquetInputFile/" + System.currentTimeMillis() + "/";

    private String input;
    private String output;

    private AerospikeSingleNodeCluster cluster;
    private AerospikeRunTimeConfig runtimConfig;
    private AerospikeClient client;

    // Aerospike unit related params
    private String memorySize = "64M";
    private String setName = "BIGTABLE";
    private String nameSpace = "RandomNameSpace";

    /**
     * Writes two Employee records to a GZIP-compressed parquet file and starts
     * the Aerospike test cluster together with a client connected to it.
     */
    @Before
    public void setUp() throws Exception {
        input = BASEDIR + "input/";
        output = BASEDIR + "output/";

        Employee employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick1");
        employee.setEmpCountry("DE");

        Employee employee1 = new Employee();
        employee1.setEmpId(2);
        employee1.setEmpName("Maverick2");
        employee1.setEmpCountry("DE");

        // Write the parquet input file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                .builder(new Path(input + "1.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();
        try {
            writer.write(employee);
            writer.write(employee1);
        } finally {
            // Close the writer even when a write fails.
            writer.close();
        }

        // Start the Aerospike mini cluster with an explicit namespace name and
        // memory size. Alternatively use the default constructor and read the
        // namespace / memory info from cluster.getRunTimeConfiguration().
        cluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        cluster.start();

        // Get the runtime configuration of the cluster
        runtimConfig = cluster.getRunTimeConfiguration();
        client = new AerospikeClient("127.0.0.1", runtimConfig.getServicePort());
        AerospikeUtils.printNodes(client);
    }

    /**
     * Runs the job against the generated input and checks that both employees
     * appear in the parquet output and in Aerospike.
     */
    @Test
    public void testSuccess() throws Exception {
        String[] args = new String[]{
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.INPUT_PATH + "=" + input,
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.OUTPUT_PATH + "=" + output,
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.IS_RUN_LOCALLY + "=true",
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.DEFAULT_FS + "=file:///",
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.NUM_PARTITIONS + "=1",
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.AEROSPIKE_NAMESPACE + "="
                        + runtimConfig.getNameSpaceName(),
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.AEROSPIKE_HOSTNAME + "=127.0.0.1",
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.AEROSPIKE_PORT + "="
                        + runtimConfig.getServicePort(),
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.AEROSPIKE_SETNAME + "=" + setName,
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.KEY_NAME + "=emp_id",
                "-D" + ReadFromHdfsWriteToAerospikeSparkMapPartition.VALUE_NAME + "=emp_name"};

        ReadFromHdfsWriteToAerospikeSparkMapPartition.main(args);

        // Use .withConf(FS.getConf()) for reading from a different HDFS and not local;
        // by default the fs is local.
        ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build();
        int recordsRead = 0;
        try {
            GenericData.Record event = null;
            while ((event = (GenericData.Record) reader.read()) != null) {
                Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo
                        (AvroUtils.convertAvroPOJOtoByteArray(event,
                                Employee.getClassSchema()), Employee.getClassSchema());
                LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());
                Assert.assertTrue(outputEvent.getEmpId().equals(1) || outputEvent.getEmpId().equals(2));
                recordsRead++;
            }
        } finally {
            // Close the reader even when an assertion above fails.
            reader.close();
        }
        // Without this check an empty output would pass the loop above vacuously.
        Assert.assertEquals(2, recordsRead);

        // Fetch both the keys from Aerospike
        WritePolicy policy = new WritePolicy();

        Key key1 = new Key(runtimConfig.getNameSpaceName(), setName, 1);
        Record result1 = client.get(policy, key1);
        Assert.assertNotNull(result1);
        Assert.assertEquals("Maverick1", result1.getValue("emp_name").toString());

        Key key2 = new Key(runtimConfig.getNameSpaceName(), setName, 2);
        Record result2 = client.get(policy, key2);
        Assert.assertNotNull(result2);
        Assert.assertEquals("Maverick2", result2.getValue("emp_name").toString());
    }

    /**
     * Releases everything created in {@link #setUp()}. Each step runs in a
     * {@code finally} so that a failure in one step does not skip the others.
     */
    @After
    public void cleanup() throws Exception {
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            try {
                if (cluster != null) {
                    // Stop the cluster
                    cluster.stop(true);
                }
            } finally {
                FileUtils.deleteDirectory(new File(BASEDIR));
            }
        }
    }
}