Reading from Aerospike in Spark via mapPartitions

Read from Aerospike in a Spark application using mapPartitions.

 

Problem Statement: We have keys saved in a file on HDFS. For each of the given keys we have to look up the values in Aerospike and save the results back to HDFS.

 

Dependency

<!-- Maven dependencies for the Spark + Aerospike lookup job and its integration test. -->
<dependencies>
    <!-- Spark SQL: supplies the SQLContext / DataFrame / Row types imported by the job. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
    </dependency>
    <!-- Spark core: supplies SparkConf, JavaSparkContext and JavaRDD. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
    </dependency>
    <!-- Project module; presumably provides the generated com.big.data.avro.schema.Employee class (confirm). -->
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <!-- spark-avro: supplies SchemaConverters, used to derive the output StructType from the Avro schema. -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- parquet-avro: supplies AvroParquetReader, used by the integration test to read the job output. -->
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
    </dependency>
    <!-- json-simple: not referenced by the code shown in this article. -->
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <!-- spark-csv: not referenced by the code shown in this article. -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
    <!-- Aerospike Java client: AerospikeClient, Key, Record, Bin and the policy classes. -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>3.2.2</version>
    </dependency>
    <!-- aerospike-unit: embedded single-node cluster used by the integration test.
         NOTE(review): only the test imports it here, so a test scope may be appropriate; confirm. -->
    <dependency>
        <groupId>com.aerospike.unit</groupId>
        <artifactId>aerospike-unit</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>


[addToAppearHere]

 

Code :

package com.big.data.spark;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.apache.spark.sql.types.DataTypes.StringType;


public class ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition 
                                    extends Configured implements Tool, Closeable {

    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    //Aerospike related properties
    public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
    public static final String AEROSPIKE_PORT = "aerospike.port";
    public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
    public static final String AEROSPIKE_SETNAME = "aerospike.set.name";

    // For Dem key is emp_id and value is emp_name
    public static final String KEY_NAME = "avro.key.name";
    public static final String VALUE_NAME = "avro.value.name";

    public static final String EMPLOYEE_COUNTRY_KEY_NAME = "emp_country";


    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                //Set spark conf here , 
                //after one gets spark context you can set hadoop configuration for InputFormats
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);
        String aerospikeHostname = conf.get(AEROSPIKE_HOSTNAME);
        int aerospikePort = conf.getInt(AEROSPIKE_PORT, 3000);
        String namespace = conf.get(AEROSPIKE_NAMESPACE);
        String setName = conf.get(AEROSPIKE_SETNAME);

        String valueName = conf.get(VALUE_NAME);

        //Get spark context, This is the central context , 
        //which can be wrapped in Any Other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), 
                                                conf.get(DEFAULT_FS), this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read, no job has not been started yet .
        //To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
        // To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                                                   .toSqlType(Employee.getClassSchema())
                                                   .dataType();

        final StructType inputSchema = new StructType(new StructField[]
                              {new StructField("emp_id", StringType, false, Metadata.empty())});

        // read data from parquetfile, 
        // the schema of the data is taken from the avro schema
        DataFrame inputDf = sqlContext.read().schema(inputSchema).text(inputPath);

        // convert DataFrame into JavaRDD
        // the rows read from the parquetfile is converted into a Row object . 
        // Row has same schema as that of the parquet file roe
        JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();

        // Data read from parquet has same schema as that of avro (Empoyee Avro). 
        // Key is employeeId and value is EmployeeName
        // In the map there is no special function to initialize or shutdown the Aerospike client.
        JavaRDD<Row> returnedRowJavaRDD = rowJavaRDD.mapPartitions
       (new InsertIntoAerospike(aerospikeHostname, aerospikePort, namespace,
                                                    setName,"emp_id", valueName));

        //Map is just a transformation in Spark hence a action 
        // like write(), collect() is needed to carry out the action.
        // Remeber spark does Lazy evaluation, without a action transformations will not execute.

        DataFrame outputDf = sqlContext.createDataFrame(returnedRowJavaRDD, outPutSchemaStructType);

        // Convert JavaRDD to dataframe and save into parquet file
        outputDf
                .write()
                .format(Employee.class.getCanonicalName())
                .parquet(outputPath);

        return 0;
    }

    // Do remember all the lambda function are instantiated on driver, serialized and sent to driver.
    // No need to initialize the Service(Aerospike , Hbase on driver ) hence making it transiet
    // In the call() , for each record insert into Aerospike , this can be coverted into batch too
    public static class InsertIntoAerospike 
                          implements FlatMapFunction<Iterator<Row>, Row> {

        // not making it static , as it will not be serialized and sent to executors
        private final String aerospikeHostName;
        private final int aerospikePortNo;
        private final String aerospikeNamespace;
        private final String aerospikeSetName;
        private final String keyColumnName;
        private final String valueColumnName;

        // The Aerospike client is not serializable 
       //  and neither there is a need to instatiate on driver
        private transient AerospikeClient client;
        private transient WritePolicy policy;

        public InsertIntoAerospike(String hostName, int portNo, String nameSpace, String setName, 
                                                      String keyColumnName, String valueColumnName) {
            this.aerospikeHostName = hostName;
            this.aerospikePortNo = portNo;
            this.aerospikeNamespace = nameSpace;
            this.aerospikeSetName = setName;
            this.keyColumnName = keyColumnName;
            this.valueColumnName = valueColumnName;
        }

        @Override
        public Iterable<Row> call(Iterator<Row> rowIterator) throws Exception {

            // rowIterator   points to the whole partition 
            // (all the records in the partition) , not a single record.
            // The partition can be of Map Task or Reduce Task
            // This is where you initialize your service , 
            // mapPartition resemble setup , map , cleanup of MapReduce,
            // Drawback of mapPartition is it returns after,
            //  processing the whole chunk not after every row.

            // This will be held in memory till whole partition is evaluated
            List<Row> rowList = new ArrayList<>();

            //This is the place where you initialise your service
            policy = new WritePolicy();
            client = new AerospikeClient(aerospikeHostName, aerospikePortNo);

            // All the code before iterating over  rowIterator will be executed only once
           //  as the call() is called only once for each partition

            while (rowIterator.hasNext()) {

                Row row = rowIterator.next();

                String empID = (String) row.get(row.fieldIndex(keyColumnName));

                // As rows have schema with fieldName and Values being part of the Row
                Key key = new Key(aerospikeNamespace, aerospikeSetName, Integer.parseInt(empID));

                //Get from Aerospike  for a given employee id the employeeName
                Record result1 = client.get(policy, key);
                Employee employee = new Employee();
                employee.setEmpId(Integer.valueOf(empID));
                employee.setEmpName(result1.getValue(valueColumnName).toString());

                // This by default is available for Every key present in Aerospike
                employee.setEmpCountry(result1.getValue(EMPLOYEE_COUNTRY_KEY_NAME).toString());

                // creation of Employee object could have been skipped its just to make things clear
                // Convert Employee Avro To Object[]
                Object[] outputArray = new Object[Employee.getClassSchema().getFields().size()];
                for (Schema.Field field : Employee.getClassSchema().getFields()) {
                    outputArray[field.pos()] = employee.get(field.pos());

                }

                rowList.add(RowFactory.create(outputArray));

            }

            //Shutdown the service gracefully as this is called only once for the whole partition
            IOUtils.closeQuietly(client);

            return rowList;
        }
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition(), args);
    }

}
[addToAppearHere]

Aerospike Unit Test framework:


Integration Test:

package com.big.data.spark;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

import java.io.File;


/**
 * Integration test for {@link ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition}.
 *
 * <p>Starts an embedded single-node Aerospike cluster, stores two employees in it, writes their
 * ids to a local text file, runs the Spark job, and verifies the Parquet output.
 */
public class ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartitionTest {

    private static final Logger LOG = LoggerFactory.getLogger(
            ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartitionTest.class);
    private static final String BASEDIR =
        "/tmp/ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartitionTest/avroparquetInputFile/"
                                                              + System.currentTimeMillis() + "/";
    private String input;
    private String output;

    private AerospikeSingleNodeCluster cluster;
    private AerospikeRunTimeConfig runtimConfig;
    private AerospikeClient client;

    // Aerospike unit related params
    private final String memorySize = "64M";
    private final String setName = "BIGTABLE";
    private final String nameSpace = "RandomNameSpace";

    @Before
    public void setUp() throws Exception {

        input = BASEDIR + "input/";
        output = BASEDIR + "output/";

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        // Write the keys (one per line) that are the input of the job
        try (FSDataOutputStream out = fs.create(new Path(input, "part000"))) {
            out.writeBytes("1\n");
            out.writeBytes("2\n");
        }

        // Start the Aerospike mini cluster with an explicit namespace and memory size.
        // The default constructor can be used instead, with the namespace and memory
        // settings retrieved from cluster.getRunTimeConfiguration().
        cluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        cluster.start();
        // Get the runtime configuration of the cluster
        runtimConfig = cluster.getRunTimeConfiguration();
        client = new AerospikeClient("127.0.0.1", runtimConfig.getServicePort());
        AerospikeUtils.printNodes(client);

        // Store the values the job is expected to look up for keys 1 and 2
        WritePolicy policy = new WritePolicy();
        putEmployee(policy, 1, "Maverick1", "DE");
        putEmployee(policy, 2, "Maverick2", "IND");
    }

    // Stores one employee record (name and country bins) under the given integer key.
    private void putEmployee(WritePolicy policy, int empId, String empName, String country) {
        Key key = new Key(runtimConfig.getNameSpaceName(), setName, empId);
        Bin nameBin = new Bin("emp_name", empName);
        Bin countryBin = new Bin(
                ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.EMPLOYEE_COUNTRY_KEY_NAME,
                country);
        client.put(policy, key, nameBin, countryBin);
    }

    @Test
    public void testSuccess() throws Exception {

        String[] args = new String[]{
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.INPUT_PATH + "=" + input,
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.OUTPUT_PATH + "=" + output,
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.IS_RUN_LOCALLY + "=true",
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.DEFAULT_FS + "=file:///",
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.NUM_PARTITIONS + "=1",
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.AEROSPIKE_NAMESPACE + "="
                    + runtimConfig.getNameSpaceName(),
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.AEROSPIKE_HOSTNAME
                    + "=127.0.0.1",
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.AEROSPIKE_PORT + "="
                    + runtimConfig.getServicePort(),
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.AEROSPIKE_SETNAME + "="
                    + setName,
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.KEY_NAME + "=emp_id",
            "-D" + ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.VALUE_NAME + "=emp_name"};

        ReadKeysFromHdfsgetValuesfromAerospikeSparkMapPartition.main(args);

        int recordCount = 0;
        boolean sawFirst = false;
        boolean sawSecond = false;

        // Read the job output back. The reader uses the local file system by default;
        // use .withConf(fs.getConf()) on the builder to read from a different HDFS.
        try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(new Path(output)).build()) {
            GenericData.Record event;
            while ((event = (GenericData.Record) reader.read()) != null) {
                Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo
                                         (AvroUtils.convertAvroPOJOtoByteArray(event,
                                                 Employee.getClassSchema()), Employee.getClassSchema());
                LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());

                int empId = outputEvent.getEmpId();
                Assert.assertTrue("unexpected employee id " + empId, empId == 1 || empId == 2);
                // Name and country must belong to the same employee as the id
                Assert.assertEquals("Maverick" + empId, outputEvent.getEmpName().toString());
                Assert.assertEquals(empId == 1 ? "DE" : "IND",
                                    outputEvent.getEmpCountry().toString());

                sawFirst |= empId == 1;
                sawSecond |= empId == 2;
                recordCount++;
            }
        }

        // Without these checks an empty output would pass the test vacuously
        Assert.assertEquals("one output record expected per input key", 2, recordCount);
        Assert.assertTrue("both employee ids must be present", sawFirst && sawSecond);
    }

    @After
    public void cleanup() throws Exception {
        // Nested finally blocks: every resource is released even if an earlier step fails
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            try {
                if (cluster != null) {
                    // Stop the cluster
                    cluster.stop(true);
                }
            } finally {
                FileUtils.deleteDirectory(new File(BASEDIR));
            }
        }
    }
}