Write To Aerospike From Spark
Read from Hdfs and write to Aerospike from Spark via Map Transformation
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>com.big.data</groupId> <artifactId>avro-schema</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.10</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-avro</artifactId> <version>1.5.0-cdh5.9.0</version> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.5.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.aerospike</groupId> <artifactId>aerospike-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>com.aerospike.unit</groupId> <artifactId>aerospike-unit</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies>
[addToAppearHere]
Code:
package com.big.data.spark;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import java.io.Closeable;
import java.io.IOException;
public class ReadFromHdfsWriteToAerospikeSpark extends Configured implements Tool, Closeable {
public static final String INPUT_PATH = "spark.input.path";
public static final String OUTPUT_PATH = "spark.output.path";
public static final String IS_RUN_LOCALLY = "spark.is.run.local";
public static final String DEFAULT_FS = "spark.default.fs";
public static final String NUM_PARTITIONS = "spark.num.partitions";
//Aerospike related properties
public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
public static final String AEROSPIKE_PORT = "aerospike.port";
public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
public static final String AEROSPIKE_SETNAME = "aerospike.set.name";
// For Dem key is emp_id and value is emp_name
public static final String KEY_NAME = "avro.key.name";
public static final String VALUE_NAME = "avro.value.name";
private SQLContext sqlContext;
private JavaSparkContext javaSparkContext;
protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
final String defaultFs,
final Class<T> tClass) {
final SparkConf sparkConf = new SparkConf()
//Set spark conf here ,
//after one gets spark context you can set hadoop configuration for InputFormats
.setAppName(tClass.getSimpleName())
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
if (isRunLocal) {
sparkConf.setMaster("local[*]");
}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
if (defaultFs != null) {
sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
}
return sparkContext;
}
@Override
public int run(String[] args) throws Exception {
//The arguments passed has been split into Key value by ToolRunner
Configuration conf = getConf();
String inputPath = conf.get(INPUT_PATH);
String outputPath = conf.get(OUTPUT_PATH);
String aerospikeHostname = conf.get(AEROSPIKE_HOSTNAME);
int aerospikePort = conf.getInt(AEROSPIKE_PORT, 3000);
String namespace = conf.get(AEROSPIKE_NAMESPACE);
String setName = conf.get(AEROSPIKE_SETNAME);
String keyName = conf.get(KEY_NAME);
String valueName = conf.get(VALUE_NAME);
//Get spark context, This is the central context ,
//which can be wrapped in Any Other context
javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
conf.get(DEFAULT_FS), this.getClass());
sqlContext = new SQLContext(javaSparkContext);
// No input path has been read, no job has not been started yet .
//To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
// To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD
// Avro schema to StructType conversion
final StructType outPutSchemaStructType =
(StructType) SchemaConverters
.toSqlType(Employee.getClassSchema())
.dataType();
final StructType inputSchema = (StructType) SchemaConverters
.toSqlType(Employee.getClassSchema())
.dataType();
// read data from parquetfile,
// the schema of the data is taken from the avro schema
DataFrame inputDf = sqlContext.read().schema(inputSchema).parquet(inputPath);
// convert DataFrame into JavaRDD
// the rows read from the parquetfile is converted into a Row object .
// Row has same schema as that of the parquet file roe
JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();
// Data read from parquet has same schema as that of avro (Empoyee Avro).
// Key is employeeId and value is EmployeeName
// In the map there is no special function to initialize or shutdown the Aerospike client.
JavaRDD<Row> returnedRowJavaRDD = rowJavaRDD.map(
new InsertIntoAerospike(aerospikeHostname, aerospikePort,
namespace, setName, keyName,valueName));
//Map is just a transformation in Spark hence a action
//like write(), collect() is needed to carry out the action.
// Remeber spark does Lazy evaluation, without a action transformations will not execute.
DataFrame outputDf = sqlContext.createDataFrame(returnedRowJavaRDD, outPutSchemaStructType);
// Convert JavaRDD to dataframe and save into parquet file
outputDf
.write()
.format(Employee.class.getCanonicalName())
.parquet(outputPath);
return 0;
}
// Do remember all the lambda function are instantiated on driver, serialized and sent to driver.
// No need to initialize the Service(Aerospike , Hbase on driver ) hence making it transient
// In the map , for each record insert into Aerospike , this can be converted into batch too
public static class InsertIntoAerospike implements Function<Row, Row> {
// not making it static , as it will not be serialized and sent to executors
private final String aerospikeHostName;
private final int aerospikePortNo;
private final String aerospikeNamespace;
private final String aerospikeSetName;
private final String keyColumnName;
private final String valueColumnName;
// The Aerospike client is not serializable and
// neither there is a need to instatiate on driver
private transient AerospikeClient client;
private transient WritePolicy policy;
public InsertIntoAerospike(String hostName, int portNo, String nameSpace,
String setName, String keyColumnName, String valueColumnName) {
this.aerospikeHostName = hostName;
this.aerospikePortNo = portNo;
this.aerospikeNamespace = nameSpace;
this.aerospikeSetName = setName;
this.keyColumnName = keyColumnName;
this.valueColumnName = valueColumnName;
//Add Shutdown hook to close the client gracefully
//This is the place where u can gracefully clean your Service resources
//as there is no cleanup() function in Spark Map
JVMShutdownHook jvmShutdownHook = new JVMShutdownHook();
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}
@Override
public Row call(Row row) throws Exception {
// Intitialize on the first call
if (client == null) {
policy = new WritePolicy();
// how to close the client gracefully ?
client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
}
// As rows have schema with fieldName and Values being part of the Row
Key key = new Key(aerospikeNamespace, aerospikeSetName,
(Integer) row.get(row.fieldIndex(keyColumnName)));
Bin bin = new Bin(valueColumnName, (String) row.get(row.fieldIndex(valueColumnName)));
client.put(policy, key, bin);
return row;
}
//When JVM is going down close the client
private class JVMShutdownHook extends Thread {
@Override
public void run() {
System.out.println("JVM Shutdown Hook: Thread initiated ,
shutting down service gracefully");
IOUtils.closeQuietly(client);
}
}
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(javaSparkContext);
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new ReadFromHdfsWriteToAerospikeSpark(), args);
}
}
[addToAppearHere]
Key Take Aways:
- The Map Transformation doesn’t has a start or stop function .
- Map is a transformation and not a action, hence a action is needed for map to execute.
- write() , collect() can be used to make sure the transformation executes.
- For service cleanup a ShutdownHook is needed
- A number of Task can run in the same JVM on a given executor
- A static variable for service means , multiple task can access the service , the service API needs to be thread safe in that case.
Aerospike Unit Test framework:
package com.big.data.spark;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
public class ReadFromHdfsWriteToAerospikeSparkTest {
private static final Logger LOG = LoggerFactory.getLogger(ReadWriteAvroParquetFilesTest.class);
private static final String BASEDIR =
"/tmp/ReadFromHdfsWriteToASparkTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";
private String input;
private String output;
private AerospikeSingleNodeCluster cluster;
private AerospikeRunTimeConfig runtimConfig;
private AerospikeClient client;
//Aerospike unit related Params
private String memorySize = "64M";
private String setName = "BIGTABLE";
private String binName = "BIGBIN";
private String nameSpace = "RandomNameSpace";
@Before
public void setUp() throws Exception {
input = BASEDIR + "input/";
output = BASEDIR + "output/";
Employee employee = new Employee();
employee.setEmpId(1);
employee.setEmpName("Maverick1");
employee.setEmpCountry("DE");
Employee employee1 = new Employee();
employee1.setEmpId(2);
employee1.setEmpName("Maverick2");
employee1.setEmpCountry("DE");
//Write parquet file2 with GZIP compression
ParquetWriter<Object> writer = AvroParquetWriter
.builder(new Path(input + "1.gz.parquet"))
.withCompressionCodec(CompressionCodecName.GZIP)
.withSchema(Employee.getClassSchema())
.build();
writer.write(employee);
writer.write(employee1);
writer.close();
// Start Aerospike MiniCluster
// Instatiate the cluster with NameSpaceName , memory Size.
// One can use the default constructor and retieve nameSpace,
// Memory info from cluster.getRunTimeConfiguration();
cluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
cluster.start();
// Get the runTime configuration of the cluster
runtimConfig = cluster.getRunTimeConfiguration();
client = new AerospikeClient("127.0.0.1", runtimConfig.getServicePort());
AerospikeUtils.printNodes(client);
}
@Test
public void testSuccess() throws Exception {
String[] args = new String[]{"-D" + ReadWriteAvroParquetFiles.INPUT_PATH + "=" + input,
"-D" + ReadFromHdfsWriteToAerospikeSpark.OUTPUT_PATH + "=" + output,
"-D" + ReadFromHdfsWriteToAerospikeSpark.IS_RUN_LOCALLY + "=true",
"-D" + ReadFromHdfsWriteToAerospikeSpark.DEFAULT_FS + "=file:///",
"-D" + ReadFromHdfsWriteToAerospikeSpark.NUM_PARTITIONS + "=1",
"-D" + ReadFromHdfsWriteToAerospikeSpark.AEROSPIKE_NAMESPACE + "="
+ runtimConfig.getNameSpaceName(),
"-D" + ReadFromHdfsWriteToAerospikeSpark.AEROSPIKE_HOSTNAME + "=127.0.0.1",
"-D" + ReadFromHdfsWriteToAerospikeSpark.AEROSPIKE_PORT + "="
+ runtimConfig.getServicePort(),
"-D" + ReadFromHdfsWriteToAerospikeSpark.AEROSPIKE_SETNAME + "=" + setName,
"-D" + ReadFromHdfsWriteToAerospikeSpark.KEY_NAME + "=emp_id",
"-D" + ReadFromHdfsWriteToAerospikeSpark.VALUE_NAME + "=emp_name"};
ReadFromHdfsWriteToAerospikeSpark.main(args);
ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output))
.build();
//Use .withConf(FS.getConf())
//for reading from a diferent HDFS and not local , by default the fs is local
GenericData.Record event = null;
while ((event = (GenericData.Record) reader.read()) != null) {
Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo(
AvroUtils.convertAvroPOJOtoByteArray(event,
Employee.getClassSchema()), Employee.getClassSchema());
LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());
Assert.assertTrue(outputEvent.getEmpId().equals(1) || outputEvent.getEmpId().equals(2));
}
reader.close();
//Fetch both the keys from Aerospike
WritePolicy policy = new WritePolicy();
Key key1 = new Key(runtimConfig.getNameSpaceName(), setName, 1);
Record result1 = client.get(policy, key1);
Assert.assertNotNull(result1);
Assert.assertEquals(result1.getValue("emp_name").toString(), "Maverick1");
Key key2 = new Key(runtimConfig.getNameSpaceName(), setName, 2);
Record result2 = client.get(policy, key2);
Assert.assertNotNull(result2);
Assert.assertEquals(result2.getValue("emp_name").toString(), "Maverick2");
}
@After
public void cleanup() throws Exception {
FileUtils.deleteDirectory(new File(BASEDIR));
if (cluster != null) {
// Stop the cluster
cluster.stop(true);
}
if (client != null) {
client.close();
}
}
}