HBase Read/Write from MapReduce

Problem :   An input key file present in HDFS needs to be read; for each input key, HBase is queried to get the value, and the value is saved back into HDFS.   (Github)
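
For illustration, the job turns a plain text file of HBase row keys into tab-separated key/value lines; the keys and values shown here mirror the integration test data further down:

Input in HDFS (one row key per line):
k1
k2

Output in HDFS (TextOutputFormat: row key, tab, fetched column value):
k1	k1v1
k2	k2v1
k2	k2v2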

Dependencies :

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-testing-util</artifactId>
        <version>1.2.0-cdh5.9.0</version>
        <scope>test</scope>
    </dependency>
  </dependencies>
  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>


Mapper :

 

package com.big.data.mapreduce.hbaseio;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;


public class HbaseFetchMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchMapper.class);


    // HBase Table and Connection handles, used for table create/drop, put and get operations
    private Table table;
    private Connection conn;

    // Column family name in HBase
    private byte[] columnFamilyName;

    // Batch size for the gets
    private int batchSize;


    //Columns for which the data needs to be extracted
    private final Set<byte[]> columnNames = new HashSet<>();

    private final List<Get> batchGet = new ArrayList<>();

    // Reusable writables
    private final Text outPutKey = new Text();
    private final Text outPutValue = new Text();


    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Properties prop = new Properties();
        try (InputStream input = ClassLoader.getSystemResourceAsStream(Constant.CONFIG_FILE_NAME)) {
            prop.load(input);
        }

        String tableName = prop.getProperty(Constant.HBASE_TABLE_NAME);
        columnFamilyName = prop.getProperty(Constant.HBASE_TABLE_CF_NAME).getBytes();
        batchSize = Integer.parseInt(prop.getProperty(Constant.HBASE_GET_BATCH_SIZE));
        LOGGER.info("TableName : {}", tableName);
        LOGGER.info("ColumnFamilyName : {}", Bytes.toString(columnFamilyName));
        LOGGER.info("BatchSize : {}", batchSize);


        for (String column : prop.getProperty(Constant.HBASE_COLUMN_NAMES).split(",")) {
            columnNames.add(column.getBytes());
            LOGGER.info("ColumName to Fetch : {}", column);
        }

        // The job Configuration must carry the keys needed to reach HDFS and HBase:
        // fs.defaultFS, hbase.zookeeper.quorum, hbase.zookeeper.property.clientPort
        conn = ConnectionFactory.createConnection(context.getConfiguration());

        // open connection to Table
        table = conn.getTable(TableName.valueOf(tableName));
    }

    /**
     * Executes the batched Gets against HBase and writes each fetched column value to the context.
     *
     * @param context mapper context used for counters and output
     * @throws InterruptedException if writing to the context is interrupted
     */
    private void getColumnValuesFromResult(Context context) throws InterruptedException {
        try {
            for (Result res : table.get(batchGet)) {
                // getRow() gets back the key for which the Row was fetched
                String outKey = Bytes.toString(res.getRow());
                for (byte[] column : columnNames) {
                    // getValue retrieves the value for the given column family and column
                    String outValue = Bytes.toString(res.getValue(columnFamilyName, column));
                    // isNotBlank covers both the null/empty and whitespace-only cases
                    if (StringUtils.isNotBlank(outValue)) {
                        // MR counter incremented to mark successful fetch of a key
                        context.getCounter(Constant.HBASE_FETCH_COUNTERS.class.getSimpleName(), new String(column))
                                .increment(1);
                        outPutKey.set(outKey);
                        outPutValue.set(outValue);
                        context.write(outPutKey, outPutValue);
                    }
                }
            }
        } catch (IOException e) {
            LOGGER.error("Batched get from HBase failed for {} keys", batchGet.size(), e);
            context.getCounter(Constant.HBASE_FETCH_COUNTERS.class.getSimpleName(),
                     Constant.HBASE_FETCH_COUNTERS.FETCH_EXCEPTIONS.name()).increment(1);
        } finally {
            batchGet.clear();
        }
    }

    // Input keys are read from HDFS; for each key, data is fetched from HBase
    // and the output is saved back into HDFS.
    // This is effectively a selective HBase dump driven by the input keys.
    @Override
    protected void map(LongWritable key, Text value, Context context) 
                                         throws IOException, InterruptedException {
        // TextInputFormat is used, hence the line offset is the LongWritable key;
        // the actual key used to fetch data from HBase is the line content (Text).
        Get get = new Get(value.toString().getBytes());
        // add the required columns to the Get
        for (byte[] column : columnNames) {
            get.addColumn(columnFamilyName, column);
        }
        // add to the batch; batched gets are more efficient than one RPC per key
        batchGet.add(get);
        if (batchGet.size() == batchSize) {
            getColumnValuesFromResult(context);
        }
    }

    @Override
    protected void cleanup(Context context) 
                          throws IOException, InterruptedException {
        try {

            if (!batchGet.isEmpty()) {
                LOGGER.info("keys left in the Batach before cleanup is {} ", batchGet.size());
                getColumnValuesFromResult(context);
            }
        } finally {
            IOUtils.closeQuietly(table);
            conn.close();
        }
    }
}

Constants :

package com.big.data.mapreduce.hbaseio;


public final class Constant {

    public enum HBASE_FETCH_COUNTERS {
        FETCH_EXCEPTIONS
    }

    public static final String CONFIG_FILE_NAME = "hbase.config.properties";
    public static final String HBASE_TABLE_NAME = "hbase.table.name";
    public static final String HBASE_COLUMN_NAMES = "hbase.get.column.names";
    public static final String HBASE_GET_BATCH_SIZE = "hbase.get.batch.size";
    public static final String HBASE_TABLE_CF_NAME = "hbase.table.column.family.name";

    public static final String INPUT_PATH = "input.path";
    public static final String JOB_NAME = "job.name";
    public static final String OUTPUT_PATH = "output.path";
}

Key Takeaways :
1. In the setup() of each Mapper, a connection to HBase should be established.
2. In the map() function, for every input key, data is fetched from HBase (batched for efficiency).
3. To scan a whole table from HBase, use TableInputFormat instead (see the sketch below).
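
A minimal sketch of the scan-based alternative mentioned in point 3. The class, table and column names here are illustrative and not part of the project above; TableMapReduceUtil wires TableInputFormat into the job and serializes the Scan into its configuration.

package com.big.data.mapreduce.hbaseio;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Illustrative scan-based mapper: TableInputFormat feeds one HBase row per map() call
public class HbaseScanMapper extends TableMapper<Text, Text> {

    private static final byte[] CF = Bytes.toBytes("cf");      // assumed column family
    private static final byte[] COL = Bytes.toBytes("col1");   // assumed column

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
            throws IOException, InterruptedException {
        byte[] value = result.getValue(CF, COL);
        if (value != null) {
            context.write(new Text(Bytes.toString(rowKey.get())), new Text(Bytes.toString(value)));
        }
    }

    // Hypothetical wiring, to be called from a driver's createJob()
    public static void configureScanJob(Job job, String tableName) throws IOException {
        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC to the region server
        scan.setCacheBlocks(false);  // recommended for full-table MR scans
        TableMapReduceUtil.initTableMapperJob(tableName, scan, HbaseScanMapper.class,
                Text.class, Text.class, job);
    }
}

By default initTableMapperJob also ships the HBase dependency jars with the job; tune the Scan caching to trade client memory for fewer RPCs.
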
Driver :

 

package com.big.data.mapreduce.hbaseio;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class HbaseFetchDriver extends Configured implements Tool {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchDriver.class);

    public static void checkNotBlank(String key, String message) {
        if (StringUtils.isBlank(key)) {
            throw new IllegalArgumentException(message);
        }
    }

    private Job createJob() throws IOException {
        Configuration driverConf = getConf();


        // Setting job name
        String jobName = driverConf.get(Constant.JOB_NAME);
        checkNotBlank(jobName, "Job Name is mandatory ");

        Job job = Job.getInstance(driverConf, jobName);
        Configuration jobconf = job.getConfiguration();

        // Setting input path
        String inputPathString = jobconf.get(Constant.INPUT_PATH);
        checkNotBlank(inputPathString, "Input Path is mandatory");
        LOGGER.info("Input path is {} ", inputPathString);

        // Setting output path
        String outputPathString = jobconf.get(Constant.OUTPUT_PATH);
        checkNotBlank(outputPathString, "Output Path is mandatory");
        LOGGER.info("OutPut path is {} ", outputPathString);


        FileInputFormat.addInputPaths(job, inputPathString);
        FileOutputFormat.setOutputPath(job, new Path(outputPathString));
        jobconf.set("mapred.input.dir.recursive", "true");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(HbaseFetchMapper.class);
        job.setNumReduceTasks(0);
        return job;
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and block until it completes
        Job job = createJob();
        job.waitForCompletion(true);

        // Tool convention: return 0 on success, non-zero on failure
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new HbaseFetchDriver(), args) != 0) {
            throw new RuntimeException("Job has failed");
        }
    }
}
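
The driver reads job.name, input.path and output.path (plus the HBase/HDFS connection keys) from the Hadoop configuration, so a typical launch passes them as -D generic options handled by ToolRunner. A sketch, with the jar name, paths and quorum as placeholders, assuming fs.defaultFS and the HBase client configuration are already available on the node (otherwise pass them as additional -D options):

hadoop jar hbase-fetch.jar com.big.data.mapreduce.hbaseio.HbaseFetchDriver \
    -D job.name=hbaseFetch \
    -D input.path=/path/to/input/keys \
    -D output.path=/path/to/output \
    -D hbase.zookeeper.quorum=zkhost1,zkhost2 \
    -D hbase.zookeeper.property.clientPort=2181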

Integration Test:  (Github)

 

package com.big.data.mapreduce.hbaseio;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;

public class HbaseFetchTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchTest.class);

    //Embedded Hbase + hdfs
    private static HBaseTestingUtility hbaseEmbeddedClusterHandler;
    private static FileSystem fs;
    private static final String BASEDIR = "/tmp/embeddedHbase/";
    private static final String HDFS_INPUT_FOLDER = 
                               BASEDIR + "/input/" + System.currentTimeMillis() + "/";
    private static final String HDFS_OUTPUT_FOLDER = 
                               BASEDIR + "/output/"+System.currentTimeMillis() + "/";
    private static String tableName;
    private static String columnFamilyName;
    private static Configuration hbaseConf;

    @BeforeClass
    public static void setUp() throws Exception {

        Properties prop = new Properties();
        try (InputStream input = ClassLoader
                           .getSystemResourceAsStream(Constant.CONFIG_FILE_NAME)) {
            prop.load(input);
        }

        tableName = prop.getProperty(Constant.HBASE_TABLE_NAME);
        columnFamilyName = prop.getProperty(Constant.HBASE_TABLE_CF_NAME);
        // init embedded hbase

        //Instantiate hbase minicluster
        hbaseEmbeddedClusterHandler = new HBaseTestingUtility();

        // Start the HBase minicluster with one region server
        hbaseEmbeddedClusterHandler.startMiniCluster(1);

        fs = hbaseEmbeddedClusterHandler.getTestFileSystem();
        hbaseConf = hbaseEmbeddedClusterHandler.getConfiguration();

        // create HDFS_TEST_INPUT_FOLDER
        fs.mkdirs(new Path(HDFS_INPUT_FOLDER));
    }

    /**
     * To Create table in Hbase
     *
     * @throws IOException
     */
    @Before
    public void createTable() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
             Admin admin = connection.getAdmin()) {

            HTableDescriptor table =
                    new HTableDescriptor(TableName.valueOf(tableName));
            table.addFamily(new HColumnDescriptor(columnFamilyName));

            if (!admin.tableExists(table.getTableName())) {
                LOGGER.info("Creating Table {} ", tableName);
                admin.createTable(table);
            }
        }

    }

    /**
     * To delete Table in Hbase
     *
     * @throws IOException
     */
    @After
    public void deleteTable() throws IOException {
        try (Connection connection = ConnectionFactory
                                      .createConnection(hbaseConf); 
                                          Admin admin = connection.getAdmin()) {

            HTableDescriptor table =
                    new HTableDescriptor(TableName.valueOf(tableName));
            table.addFamily(new HColumnDescriptor(columnFamilyName));

            if (admin.tableExists(table.getTableName())) {
                LOGGER.info("Disabling And Deleting Table {} ", tableName);
                admin.disableTable(table.getTableName());
                admin.deleteTable(table.getTableName());
            }
        }

    }

    private void putIntoHbase(Table table, String columnFamily,
                                 String columnName, String key, String value) throws
            IOException {
        Put put = new Put(key.getBytes());
        put.addColumn(columnFamily.getBytes(), columnName.getBytes(), value.getBytes());
        table.put(put);
    }

    private void createTestDataInHbaseAndHdfs() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
                               Table table = connection.getTable(TableName.valueOf(tableName))) {

            //Only col1, col2, col3 will be fetched as provided 
            //in hbase.config.properties in test/resources
            //Batch size is 3  hence inserting 4 keys
            //Key k1 with one good value
            putIntoHbase(table, columnFamilyName, "col1", "k1", "k1v1");

            //col4 key not to be considered for fetching
            putIntoHbase(table, columnFamilyName, "col4", "k1", "k1v2");

            //Key k2 with two values
            putIntoHbase(table, columnFamilyName, "col1", "k2", "k2v1");
            putIntoHbase(table, columnFamilyName, "col2", "k2", "k2v2");

            //key k3 with three values
            putIntoHbase(table, columnFamilyName, "col1", "k3", "k3v1");
            putIntoHbase(table, columnFamilyName, "col2", "k3", "k3v2");
            putIntoHbase(table, columnFamilyName, "col3", "k3", "k3v3");

            //col4 key not to be considered for fetching
            putIntoHbase(table, columnFamilyName, "col4", "k3", "k3v4");

            //Key k4 with unknown columnName
            putIntoHbase(table, columnFamilyName, "unknown", "k4", "v2");

            //Write all the keys into HDFS as input to the job
            FSDataOutputStream out = fs.create(new Path(HDFS_INPUT_FOLDER, "part000"));
            out.write("k1\n".getBytes());
            out.write("k2\n".getBytes());
            out.write("k3\n".getBytes());
            out.write("k4\n".getBytes());
            //non-existent key
            out.write("k5\n".getBytes());
            out.close();
        }
    }

    @Test
    public void testWithValidFiles() throws Exception {
        createTestDataInHbaseAndHdfs();

        Configuration conf = new Configuration();

        // mandatory parameter to access HDFS, retrieved from the minicluster that was started
        conf.set("fs.defaultFS", hbaseConf.get("fs.defaultFS"));
        // mandatory parameters to access HBase, retrieved from the minicluster that was started
        conf.set("hbase.zookeeper.quorum", hbaseConf.get("hbase.zookeeper.quorum"));
        conf.set("hbase.zookeeper.property.clientPort",
                                     hbaseConf.get("hbase.zookeeper.property.clientPort"));
        conf.set(Constant.JOB_NAME, "MY_JOB");
        conf.set(Constant.INPUT_PATH, HDFS_INPUT_FOLDER);
        conf.set(Constant.OUTPUT_PATH, HDFS_OUTPUT_FOLDER);
        HbaseFetchDriver test = new HbaseFetchDriver();
        test.setConf(conf);
        test.run(null);

        Multimap<String, Object> myMultimap = ArrayListMultimap.create();
        //Read data from outPut path in HDFS
        BufferedReader bfr = new BufferedReader
                      (new InputStreamReader(fs.open(
                            new Path(HDFS_OUTPUT_FOLDER, "part-m-00000"))));
        String str = null;
        while ((str = bfr.readLine()) != null) {
            String[] keyValue = str.split("\t");
            myMultimap.put(keyValue[0], keyValue[1]);

        }
        bfr.close();

        Assert.assertEquals(1, myMultimap.get("k1").size());
        Assert.assertTrue(myMultimap.get("k1").contains("k1v1"));
        Assert.assertFalse(myMultimap.get("k1").contains("k1v2"));

        Assert.assertEquals(2, myMultimap.get("k2").size());
        Assert.assertTrue(myMultimap.get("k2").contains("k2v1"));
        Assert.assertTrue(myMultimap.get("k2").contains("k2v2"));

        Assert.assertEquals(3, myMultimap.get("k3").size());
        Assert.assertTrue(myMultimap.get("k3").contains("k3v1"));
        Assert.assertTrue(myMultimap.get("k3").contains("k3v2"));
        Assert.assertTrue(myMultimap.get("k3").contains("k3v3"));
        Assert.assertFalse(myMultimap.get("k3").contains("k3v4"));

        Assert.assertTrue(myMultimap.get("k4").isEmpty());
        Assert.assertTrue(myMultimap.get("k5").isEmpty());

    }

    @AfterClass
    public static void shutdown() throws Exception {
        if (hbaseEmbeddedClusterHandler != null) {
            hbaseEmbeddedClusterHandler.shutdownMiniCluster();
        }
        FileUtils.deleteQuietly(new File(BASEDIR));
    }
}