Read from Aerospike

Read From Aerospike Using a Map Transformation in Spark

Problem Statement: Keys are stored in a file on HDFS. For each key, look up the corresponding value from Aerospike and save the result back to HDFS.
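
The shape of the job is small: read the keys as a text file into an RDD, look each key up inside a map transformation with an Aerospike client created on the executor, and write the enriched records back to HDFS. A minimal sketch of that shape (the namespace "test", set "employee", bin "emp_name", host/port and paths are placeholder values; the full, configurable job follows below):

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AerospikeLookupSketch {
    public static void main(String[] args) {
        // Local mode only for illustration; the full example wires configuration through ToolRunner
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("aerospike-lookup").setMaster("local[*]"));

        // One emp_id per line
        JavaRDD<String> keys = sc.textFile("hdfs:///tmp/input/keys");

        JavaRDD<String> names = keys.map(empId -> {
            // A client per record keeps the sketch short; the real job below
            // creates one client per executor lazily and reuses it
            try (AerospikeClient client = new AerospikeClient("127.0.0.1", 3000)) {
                Key key = new Key("test", "employee", Integer.parseInt(empId));
                Record record = client.get(null, key);
                return empId + "," + (record == null ? "" : record.getValue("emp_name"));
            }
        });

        names.saveAsTextFile("hdfs:///tmp/output/names");
        sc.close();
    }
}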

 

Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>3.2.2</version>
    </dependency>
    <dependency>
        <groupId>com.aerospike.unit</groupId>
        <artifactId>aerospike-unit</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>


Code:

package com.big.data.spark;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.io.Closeable;
import java.io.IOException;

import static org.apache.spark.sql.types.DataTypes.StringType;


public class ReadKeysFromHdfsgetValuesfromAerospikeSpark 
                             extends Configured implements Tool, Closeable {

    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    //Aerospike related properties
    public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
    public static final String AEROSPIKE_PORT = "aerospike.port";
    public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
    public static final String AEROSPIKE_SETNAME = "aerospike.set.name";

    // For the demo, the key is emp_id and the value is emp_name
    public static final String KEY_NAME = "avro.key.name";
    public static final String VALUE_NAME = "avro.value.name";

    public static final String EMPLOYEE_COUNTRY_KEY_NAME = "emp_country";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                //Set Spark conf here;
                //once you have the Spark context you can set the Hadoop configuration for InputFormats
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        //The arguments passed have been split into key/value pairs by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);
        String aerospikeHostname = conf.get(AEROSPIKE_HOSTNAME);
        int aerospikePort = conf.getInt(AEROSPIKE_PORT, 3000);
        String namespace = conf.get(AEROSPIKE_NAMESPACE);
        String setName = conf.get(AEROSPIKE_SETNAME);

        String keyName = conf.get(KEY_NAME);
        String valueName = conf.get(VALUE_NAME);

        //Get the Spark context. This is the central context,
        //which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), 
                                                            conf.get(DEFAULT_FS), this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read and no job has been started yet.
        //To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
        // To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                                                     .toSqlType(Employee.getClassSchema()).dataType();

        final StructType inputSchema = new StructType(
               new StructField[]{new StructField("emp_id", StringType, false, Metadata.empty())});

        // Read the keys from the text file on HDFS; the schema is a single emp_id string column
        DataFrame inputDf = sqlContext.read().schema(inputSchema).text(inputPath);

        // Convert the DataFrame into a JavaRDD.
        // Each line read from the input file becomes a Row object
        // with the schema defined above (a single emp_id column).
        JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();

        // For each emp_id read from HDFS, look up the employee name from Aerospike.
        // The output rows follow the Employee Avro schema.
        // In map() there is no special function to initialize or shut down the Aerospike client,
        // so the client is created lazily on the executor and closed via a JVM shutdown hook.
        JavaRDD<Row> returnedRowJavaRDD = rowJavaRDD.map(
                new LookupFromAerospike(aerospikeHostname, aerospikePort, namespace,
                                        setName, keyName, valueName));

        //map() is just a transformation in Spark, hence an action
        //like write() or collect() is needed to trigger the computation.
        //Remember Spark evaluates lazily; without an action, transformations will not execute.

        DataFrame outputDf = sqlContext.createDataFrame(returnedRowJavaRDD, outPutSchemaStructType);

        // Save the looked-up rows as a parquet file
        outputDf
                .write()
                .parquet(outputPath);

        return 0;
    }

    // Do remember that all lambda/function objects are instantiated on the driver, serialized and sent to the executors.
    // There is no need to initialize the service (Aerospike, HBase, ...) on the driver, hence the client is made transient.
    // In the map, each record is looked up from Aerospike; this can be converted into a batch get too
    // (see the mapPartitions sketch after this class).
    public static class LookupFromAerospike implements Function<Row, Row> {

        // Not making these static, as static fields are not serialized and sent to the executors
        private final String aerospikeHostName;
        private final int aerospikePortNo;
        private final String aerospikeNamespace;
        private final String aerospikeSetName;
        private final String keyColumnName;
        private final String valueColumnName;

        // The Aerospike client is not serializable,
        // nor is there a need to instantiate it on the driver
        private transient AerospikeClient client;
        private transient WritePolicy policy;

        public LookupFromAerospike(String hostName, int portNo,
             String nameSpace, String setName, String keyColumnName, String valueColumnName) {
            this.aerospikeHostName = hostName;
            this.aerospikePortNo = portNo;
            this.aerospikeNamespace = nameSpace;
            this.aerospikeSetName = setName;
            this.keyColumnName = keyColumnName;
            this.valueColumnName = valueColumnName;

            //Add a JVM shutdown hook to close the client gracefully.
            //This is the place where you can clean up your service resources,
            //as there is no cleanup() function in a Spark map
            JVMShutdownHook jvmShutdownHook = new JVMShutdownHook();
            Runtime.getRuntime().addShutdownHook(jvmShutdownHook);

        }

        @Override
        public Row call(Row row) throws Exception {
            // Initialize on the first call; the client is closed by the JVM shutdown hook
            if (client == null) {
                policy = new WritePolicy();
                client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
            }

            // Rows carry their schema, so a field can be fetched by name
            String empID = (String) row.get(row.fieldIndex(keyColumnName));

            Key key = new Key(aerospikeNamespace, aerospikeSetName, Integer.parseInt(empID));

            //For the given employee id, get the employee name from Aerospike
            Record result1 = client.get(policy, key);
            Employee employee = new Employee();
            employee.setEmpId(Integer.valueOf(empID));
            employee.setEmpName(result1.getValue(valueColumnName).toString());

            // This bin is available by default for every key present in Aerospike
            employee.setEmpCountry(result1.getValue(EMPLOYEE_COUNTRY_KEY_NAME).toString());

            // Creating the Employee object could have been skipped; it is done here just to make things clear.
            // Convert the Employee Avro object to an Object[]
            Object[] outputArray = new Object[Employee.getClassSchema().getFields().size()];
            for (Schema.Field field : Employee.getClassSchema().getFields()) {
                outputArray[field.pos()] = employee.get(field.pos());

            }

            return RowFactory.create(outputArray);

        }

        //When JVM is going down close the client
        private class JVMShutdownHook extends Thread {
            @Override
            public void run() {
                System.out.println("JVM Shutdown Hook: thread initiated, shutting down service gracefully");
                IOUtils.closeQuietly(client);
            }
        }
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new ReadKeysFromHdfsgetValuesfromAerospikeSpark(), args);
    }

}
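
As noted in the comments, the per-record map() can also be rewritten as a mapPartitions() call so that a single client serves a whole partition and lookups can be batched. A hedged sketch of that variant (it assumes the same emp_id / emp_name / emp_country layout used above; the LookupPartition class name and the simple per-key get are illustrative):

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Spark 1.6: FlatMapFunction returns an Iterable, and mapPartitions takes FlatMapFunction<Iterator<T>, R>
public class LookupPartition implements FlatMapFunction<Iterator<Row>, Row> {

    private final String host;
    private final int port;
    private final String namespace;
    private final String setName;

    public LookupPartition(String host, int port, String namespace, String setName) {
        this.host = host;
        this.port = port;
        this.namespace = namespace;
        this.setName = setName;
    }

    @Override
    public Iterable<Row> call(Iterator<Row> rows) throws Exception {
        List<Row> output = new ArrayList<>();
        // One client per partition instead of one lazily created client per executor
        try (AerospikeClient client = new AerospikeClient(host, port)) {
            while (rows.hasNext()) {
                String empId = rows.next().getString(0);
                Key key = new Key(namespace, setName, Integer.parseInt(empId));
                Record record = client.get(null, key);
                output.add(RowFactory.create(Integer.valueOf(empId),
                        record == null ? null : record.getValue("emp_name").toString(),
                        record == null ? null : record.getValue("emp_country").toString()));
            }
        }
        return output;
    }
}

It plugs in where the map() call sits above, e.g. rowJavaRDD.mapPartitions(new LookupPartition(aerospikeHostname, aerospikePort, namespace, setName)); closing the client at the end of each partition also removes the need for the JVM shutdown hook.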

Aerospike Unit Test framework:
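
The aerospike-unit dependency declared above starts an in-process single-node Aerospike cluster for tests. Its basic lifecycle, as used in the integration test below, looks roughly like this (the namespace name and memory size are arbitrary test values):

import com.aerospike.client.AerospikeClient;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;

public class AerospikeUnitLifecycle {
    public static void main(String[] args) throws Exception {
        // Start a single-node cluster with a namespace name and a memory budget
        AerospikeSingleNodeCluster cluster = new AerospikeSingleNodeCluster("TestNameSpace", "64M");
        cluster.start();

        // The runtime configuration exposes the service port the embedded server bound to
        AerospikeRunTimeConfig config = cluster.getRunTimeConfiguration();
        AerospikeClient client = new AerospikeClient("127.0.0.1", config.getServicePort());

        // ... seed data and run the code under test against this client ...

        client.close();
        cluster.stop(true);
    }
}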

Integration Test:

package com.big.data.spark;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

import java.io.File;


public class ReadKeysFromHdfsgetValuesfromAerospikeSparkTest {

    private static final Logger LOG = LoggerFactory.getLogger(ReadKeysFromHdfsgetValuesfromAerospikeSparkTest.class);
    private static final String BASEDIR = 
        "/tmp/ReadKeysFromHdfsgetValuesfromAerospikeSparkTest/avroparquetInputFile/" 
                               + System.currentTimeMillis() + "/";
    private String input;
    private String output;

    private AerospikeSingleNodeCluster cluster;
    private AerospikeRunTimeConfig runtimConfig;
    private AerospikeClient client;

    //Aerospike unit related Params
    private String memorySize = "64M";
    private String setName = "BIGTABLE";
    private String binName = "BIGBIN";
    private String nameSpace = "RandomNameSpace";

    @Before
    public void setUp() throws Exception {

        input = BASEDIR + "input/";
        output = BASEDIR + "output/";


        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs =  FileSystem.get(conf);



        //Write all the keys to HDFS as input to the job
        FSDataOutputStream out = fs.create(new Path(input, "part000"));
        out.write("1\n".getBytes());
        out.write("2\n".getBytes());
        //a non-existent key could be added here as well
        out.close();


        // Start the Aerospike mini cluster.
        // Instantiate the cluster with the namespace name and memory size.
        // One can also use the default constructor and retrieve the namespace
        // and memory info from cluster.getRunTimeConfiguration().
        cluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        cluster.start();
        // Get the runTime configuration of the cluster
        runtimConfig = cluster.getRunTimeConfiguration();
        client = new AerospikeClient("127.0.0.1", runtimConfig.getServicePort());
        AerospikeUtils.printNodes(client);


        // Insert values into Aerospike for the keys written to HDFS above
        WritePolicy policy = new WritePolicy();
        Key key1 = new Key(runtimConfig.getNameSpaceName(), setName, 1);
        Bin bin1 = new Bin("emp_name", "Maverick1");
        Bin bin2 = 
            new Bin(ReadKeysFromHdfsgetValuesfromAerospikeSpark.EMPLOYEE_COUNTRY_KEY_NAME, "DE");
        client.put(policy, key1,bin1,bin2);

        Key key2 = new Key(runtimConfig.getNameSpaceName(), setName, 2);
        Bin bin3 = new Bin("emp_name", "Maverick2");
        Bin bin4 = 
             new Bin(ReadKeysFromHdfsgetValuesfromAerospikeSpark.EMPLOYEE_COUNTRY_KEY_NAME, "IND");

        client.put(policy, key2,bin3,bin4);


    }

    @Test
    public void testSuccess() throws Exception {

    String[] args = new String[]{
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.INPUT_PATH + "=" + input,
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.OUTPUT_PATH + "=" + output,
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.IS_RUN_LOCALLY + "=true",
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.DEFAULT_FS + "=file:///",
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.NUM_PARTITIONS + "=1",
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.AEROSPIKE_NAMESPACE + "="
                                                         + runtimConfig.getNameSpaceName(),
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.AEROSPIKE_HOSTNAME + "=127.0.0.1",
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.AEROSPIKE_PORT + "="
                                                         + runtimConfig.getServicePort(),
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.AEROSPIKE_SETNAME + "=" + setName,
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.KEY_NAME + "=emp_id",
         "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSpark.VALUE_NAME + "=emp_name"};

        ReadKeysFromHdfsgetValuesfromAerospikeSpark.main(args);


        // Read back from HDFS and verify that the keys and values are present
        ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build();
        //Use .withConf(fs.getConf()) to read from a different (non-local) HDFS;
        //by default the fs is local
        GenericData.Record event = null;
        while ((event = (GenericData.Record) reader.read()) != null) {
            Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo(
                                        AvroUtils.convertAvroPOJOtoByteArray(event, 
                                           Employee.getClassSchema()), Employee.getClassSchema());
            LOG.info("Data read from Spark output is {}", outputEvent.toString());
            Assert.assertTrue(outputEvent.getEmpId().equals(1) || outputEvent.getEmpId().equals(2));
            Assert.assertTrue(outputEvent.getEmpName().equals("Maverick1")
                                  || outputEvent.getEmpName().equals("Maverick2"));
        }
        reader.close();

    }

    @After
    public void cleanup() throws Exception {
        FileUtils.deleteDirectory(new File(BASEDIR));

        if (cluster != null) {
            // Stop the cluster
            cluster.stop(true);
        }
        if (client != null) {
            client.close();
        }
    }
}