HBase Read/Write from MapReduce
Problem: An input file of keys resides in HDFS. For each key read from this file, HBase needs to be queried for the corresponding value, and the value is then saved back into HDFS. (Github)
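For illustration only (the keys and values below are placeholders, not taken from this post): given an input file with one row key per line,

k1
k2

the job writes one tab-separated line per column value found for that key into the output path, e.g.

k1	valueOfCol1ForK1
k2	valueOfCol1ForK2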
Dependencies (pom.xml):

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-testing-util</artifactId>
        <!-- version assumed to match the other HBase artifacts; the original omitted it -->
        <version>1.2.0-cdh5.9.0</version>
    </dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
Mapper:
package com.big.data.mapreduce.hbaseio;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class HbaseFetchMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchMapper.class);

    // HBase Table and Connection, used for the gets against the table
    private Table table;
    private Connection conn;

    // Column family name in HBase
    private byte[] columnFamilyName;

    // Batch size of the gets
    private int batchSize;

    // Columns for which the data needs to be extracted
    private final Set<byte[]> columnNames = new HashSet<>();
    private final List<Get> batchGet = new ArrayList<>();

    // Reusable writables
    private final Text outPutKey = new Text();
    private final Text outPutValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Properties prop = new Properties();
        try (InputStream input = ClassLoader.getSystemResourceAsStream(Constant.CONFIG_FILE_NAME)) {
            prop.load(input);
        }
        String tableName = prop.getProperty(Constant.HBASE_TABLE_NAME);
        columnFamilyName = prop.getProperty(Constant.HBASE_TABLE_CF_NAME).getBytes();
        batchSize = Integer.parseInt(prop.getProperty(Constant.HBASE_GET_BATCH_SIZE));

        LOGGER.info("TableName : {}", tableName);
        LOGGER.info("ColumnFamilyName : {}", Bytes.toString(columnFamilyName));
        LOGGER.info("BatchSize : {}", batchSize);

        for (String column : prop.getProperty(Constant.HBASE_COLUMN_NAMES).split(",")) {
            columnNames.add(column.getBytes());
            LOGGER.info("ColumnName to Fetch : {}", column);
        }

        // The job configuration has appropriate keys set for fs.defaultFS,
        // hbase.zookeeper.quorum, hbase.zookeeper.property.clientPort
        conn = ConnectionFactory.createConnection(context.getConfiguration());
        // Open a connection to the table
        table = conn.getTable(TableName.valueOf(tableName));
    }

    /**
     * Parses the results of a batch of gets and writes the values to the context.
     *
     * @param context the mapper context
     * @throws InterruptedException
     */
    private void getColumnValuesFromResult(Context context) throws InterruptedException {
        try {
            for (Result res : table.get(batchGet)) {
                // getRow() returns the key for which the row was fetched
                String outKey = Bytes.toString(res.getRow());
                for (byte[] column : columnNames) {
                    // getValue() retrieves the value for a given column family and column
                    String outValue = Bytes.toString(res.getValue(columnFamilyName, column));
                    if (StringUtils.isNotEmpty(outValue) && StringUtils.isNotBlank(outValue)) {
                        // MR counter incremented to mark the successful fetch of a key
                        context.getCounter(Constant.HBASE_FETCH_COUNTERS.class.getSimpleName(),
                                           new String(column)).increment(1);
                        outPutKey.set(outKey);
                        outPutValue.set(outValue);
                        context.write(outPutKey, outPutValue);
                    }
                }
            }
        } catch (IOException e) {
            context.getCounter(Constant.HBASE_FETCH_COUNTERS.class.getSimpleName(),
                               Constant.HBASE_FETCH_COUNTERS.FETCH_EXCEPTIONS.name()).increment(1);
        } finally {
            batchGet.clear();
        }
    }

    // Input keys are in HDFS; for the corresponding keys, data is fetched from HBase
    // and the output is saved back into HDFS.
    // This is effectively a selective HBase dump, driven by the input keys.
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // TextInputFormat has been used, hence the line offset -> LongWritable;
        // the actual key to fetch data from HBase is the Text value
        Get get = new Get(value.toString().getBytes());

        // Add the required columns
        for (byte[] column : columnNames) {
            get.addColumn(columnFamilyName, column);
        }

        // Add to the batch; batched gets are more efficient
        batchGet.add(get);
        if (batchGet.size() == batchSize) {
            getColumnValuesFromResult(context);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (!batchGet.isEmpty()) {
                LOGGER.info("Keys left in the batch before cleanup: {} ", batchGet.size());
                getColumnValuesFromResult(context);
            }
        } finally {
            IOUtils.closeQuietly(table);
            conn.close();
        }
    }
}
Constants:
package com.big.data.mapreduce.hbaseio;

public final class Constant {

    public enum HBASE_FETCH_COUNTERS {
        FETCH_EXCEPTIONS
    }

    public static final String CONFIG_FILE_NAME = "hbase.config.properties";
    public static final String HBASE_TABLE_NAME = "hbase.table.name";
    public static final String HBASE_COLUMN_NAMES = "hbase.get.column.names";
    public static final String HBASE_GET_BATCH_SIZE = "hbase.get.batch.size";
    public static final String HBASE_TABLE_CF_NAME = "hbase.table.column.family.name";
    public static final String INPUT_PATH = "input.path";
    public static final String JOB_NAME = "job.name";
    public static final String OUTPUT_PATH = "output.path";
}
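The mapper resolves these keys against hbase.config.properties on the classpath. A minimal sketch of such a file follows; the table and column family names are illustrative assumptions, while the column list and batch size mirror what the integration test below expects:

# Illustrative values: the table and column family names are assumptions;
# the column list and batch size match the integration test below.
hbase.table.name=myTable
hbase.table.column.family.name=cf
hbase.get.column.names=col1,col2,col3
hbase.get.batch.size=3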
Key Takeaways:
1. In the setup() of each mapper, the connection to HBase should be established.
2. In the map() function, for every input key a Get is added to a batch; once the batch is full, the data is fetched from HBase.
3. To scan an entire HBase table from MapReduce, use TableInputFormat (see the sketch below).
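This post only demonstrates point-gets driven by keys in HDFS. For completeness, here is a minimal sketch of the scan-based alternative, assuming a hypothetical HbaseScanExample/ScanMapper class that is not part of this post's repository; TableMapReduceUtil wires TableInputFormat and the HBase connection settings into the job:

package com.big.data.mapreduce.hbaseio;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Hypothetical example (not from this post's repo): a scan-driven mapper.
// TableInputFormat feeds each row as (rowkey, Result) instead of (offset, line).
public class HbaseScanExample {

    public static class ScanMapper extends TableMapper<Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            // Emit the row key; pull individual column values out of 'result' as needed.
            outKey.set(rowKey.copyBytes());
            outValue.set(result.toString());
            context.write(outKey, outValue);
        }
    }

    public static Job createScanJob(Configuration conf, String tableName) throws IOException {
        Scan scan = new Scan();
        scan.setCaching(500);        // scanner caching suited to MR jobs
        scan.setCacheBlocks(false);  // a full scan should not churn the block cache

        Job job = Job.getInstance(HBaseConfiguration.create(conf), "hbase-scan-example");
        job.setJarByClass(HbaseScanExample.class);
        // Wires in TableInputFormat plus the HBase connection settings for the mapper
        TableMapReduceUtil.initTableMapperJob(tableName, scan, ScanMapper.class,
                                              Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        return job;
    }
}

The output format, output path and the rest of the driver wiring would be set up as in the HbaseFetchDriver below.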
Driver:
package com.big.data.mapreduce.hbaseio;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class HbaseFetchDriver extends Configured implements Tool {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchDriver.class);

    public static void checkNotBlank(String key, String message) {
        if (StringUtils.isBlank(key)) {
            throw new IllegalArgumentException(message);
        }
    }

    private Job createJob() throws IOException {
        Configuration driverConf = getConf();

        // Setting the job name
        String jobName = driverConf.get(Constant.JOB_NAME);
        checkNotBlank(jobName, "Job Name is mandatory");

        Job job = new Job(driverConf, jobName);
        Configuration jobconf = job.getConfiguration();

        // Setting the input path
        String inputPathString = jobconf.get(Constant.INPUT_PATH);
        checkNotBlank(inputPathString, "Input Path is mandatory");
        LOGGER.info("Input path is {} ", inputPathString);

        // Setting the output path
        String outputPathString = jobconf.get(Constant.OUTPUT_PATH);
        checkNotBlank(outputPathString, "Output Path is mandatory");
        LOGGER.info("Output path is {} ", outputPathString);

        FileInputFormat.addInputPaths(job, inputPathString);
        FileOutputFormat.setOutputPath(job, new Path(outputPathString));
        jobconf.set("mapred.input.dir.recursive", "true");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(HbaseFetchMapper.class);
        // Map-only job
        job.setNumReduceTasks(0);
        return job;
    }

    @Override
    public int run(String[] args) throws Exception {
        // Creating and submitting the job
        Job job = createJob();
        job.submit();

        // Waiting for the job to complete; returns 1 on success, 0 on failure
        // (checked in main below)
        job.waitForCompletion(true);
        return job.isSuccessful() ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new HbaseFetchDriver(), args) == 0) {
            throw new RuntimeException("Job has failed");
        }
    }
}
Integration Test: (Github)
package com.big.data.mapreduce.hbaseio;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;

public class HbaseFetchTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseFetchTest.class);

    // Embedded HBase + HDFS
    private static HBaseTestingUtility hbaseEmbeddedClusterHandler;
    private static FileSystem fs;
    private static final String BASEDIR = "/tmp/embeddedHbase/";
    private static final String HDFS_INPUT_FOLDER = BASEDIR + "/input/" + System.currentTimeMillis() + "/";
    private static final String HDFS_OUTPUT_FOLDER = BASEDIR + "/output/" + System.currentTimeMillis() + "/";
    private static String tableName;
    private static String columnFamilyName;
    private static Configuration hbaseConf;

    @BeforeClass
    public static void setUp() throws Exception {
        Properties prop = new Properties();
        try (InputStream input = ClassLoader.getSystemResourceAsStream(Constant.CONFIG_FILE_NAME)) {
            prop.load(input);
        }
        tableName = prop.getProperty(Constant.HBASE_TABLE_NAME);
        columnFamilyName = prop.getProperty(Constant.HBASE_TABLE_CF_NAME);

        // Instantiate the embedded HBase minicluster (HBase + HDFS)
        hbaseEmbeddedClusterHandler = new HBaseTestingUtility();
        // Start the HBase minicluster with one region server
        hbaseEmbeddedClusterHandler.startMiniCluster(1);
        fs = hbaseEmbeddedClusterHandler.getTestFileSystem();
        hbaseConf = hbaseEmbeddedClusterHandler.getConfiguration();

        // Create the HDFS input folder
        fs.mkdirs(new Path(HDFS_INPUT_FOLDER));
    }

    /**
     * Creates the table in HBase.
     *
     * @throws IOException
     */
    @Before
    public void createTable() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
            table.addFamily(new HColumnDescriptor(columnFamilyName));
            if (!admin.tableExists(table.getTableName())) {
                LOGGER.info("Creating Table {} ", tableName);
                admin.createTable(table);
            }
        }
    }

    /**
     * Deletes the table in HBase.
     *
     * @throws IOException
     */
    @After
    public void deleteTable() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
            table.addFamily(new HColumnDescriptor(columnFamilyName));
            if (admin.tableExists(table.getTableName())) {
                LOGGER.info("Disabling And Deleting Table {} ", tableName);
                admin.disableTable(table.getTableName());
                admin.deleteTable(table.getTableName());
            }
        }
    }

    private void putIntoHbase(Table table, String columnFamily, String columnName, String key, String value)
            throws IOException {
        Put put = new Put(key.getBytes());
        put.addColumn(columnFamily.getBytes(), columnName.getBytes(), value.getBytes());
        table.put(put);
    }

    private void createTestDataInHbaseAndHdfs() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Only col1, col2, col3 will be fetched, as configured
            // in hbase.config.properties in test/resources.
            // Batch size is 3, hence inserting 4 keys.

            // Key k1 with one good value
            putIntoHbase(table, columnFamilyName, "col1", "k1", "k1v1");
            // Column col4 is not to be considered for fetching
            putIntoHbase(table, columnFamilyName, "col4", "k1", "k1v2");

            // Key k2 with two values
            putIntoHbase(table, columnFamilyName, "col1", "k2", "k2v1");
            putIntoHbase(table, columnFamilyName, "col2", "k2", "k2v2");

            // Key k3 with three values
            putIntoHbase(table, columnFamilyName, "col1", "k3", "k3v1");
            putIntoHbase(table, columnFamilyName, "col2", "k3", "k3v2");
            putIntoHbase(table, columnFamilyName, "col3", "k3", "k3v3");
            // Column col4 is not to be considered for fetching
            putIntoHbase(table, columnFamilyName, "col4", "k3", "k3v4");

            // Key k4 with an unknown column name
            putIntoHbase(table, columnFamilyName, "unknown", "k4", "v2");

            // Writing all the keys into HDFS, the input to the job
            FSDataOutputStream out = fs.create(new Path(HDFS_INPUT_FOLDER, "part000"));
            out.write("k1\n".getBytes());
            out.write("k2\n".getBytes());
            out.write("k3\n".getBytes());
            out.write("k4\n".getBytes());
            // Non-existent key
            out.write("k5\n".getBytes());
            out.close();
        }
    }

    @Test
    public void testWithValidFiles() throws Exception {
        createTestDataInHbaseAndHdfs();

        Configuration conf = new Configuration();
        // Mandatory param to access HDFS, retrieved from the minicluster that was started
        conf.set("fs.defaultFS", hbaseConf.get("fs.defaultFS"));
        // Mandatory parameters to access HBase, retrieved from the minicluster that was started
        conf.set("hbase.zookeeper.quorum", hbaseConf.get("hbase.zookeeper.quorum"));
        conf.set("hbase.zookeeper.property.clientPort", hbaseConf.get("hbase.zookeeper.property.clientPort"));
        conf.set(Constant.JOB_NAME, "MY_JOB");
        conf.set(Constant.INPUT_PATH, HDFS_INPUT_FOLDER);
        conf.set(Constant.OUTPUT_PATH, HDFS_OUTPUT_FOLDER);

        HbaseFetchDriver test = new HbaseFetchDriver();
        test.setConf(conf);
        test.run(null);

        Multimap<String, Object> myMultimap = ArrayListMultimap.create();

        // Read data from the output path in HDFS
        BufferedReader bfr = new BufferedReader(new InputStreamReader(
                fs.open(new Path(HDFS_OUTPUT_FOLDER, "part-m-00000"))));
        String str = null;
        while ((str = bfr.readLine()) != null) {
            String[] keyValue = str.split("\t");
            myMultimap.put(keyValue[0], keyValue[1]);
        }
        bfr.close();

        Assert.assertEquals(1, myMultimap.get("k1").size());
        Assert.assertEquals(true, myMultimap.get("k1").contains("k1v1"));
        Assert.assertEquals(false, myMultimap.get("k1").contains("k1v2"));

        Assert.assertEquals(2, myMultimap.get("k2").size());
        Assert.assertEquals(true, myMultimap.get("k2").contains("k2v1"));
        Assert.assertEquals(true, myMultimap.get("k2").contains("k2v2"));

        Assert.assertEquals(3, myMultimap.get("k3").size());
        Assert.assertEquals(true, myMultimap.get("k3").contains("k3v1"));
        Assert.assertEquals(true, myMultimap.get("k3").contains("k3v2"));
        Assert.assertEquals(true, myMultimap.get("k3").contains("k3v3"));
        Assert.assertEquals(false, myMultimap.get("k3").contains("k3v4"));

        Assert.assertEquals(true, myMultimap.get("k4").isEmpty());
        Assert.assertEquals(true, myMultimap.get("k5").isEmpty());
    }

    @AfterClass
    public static void shutdown() throws Exception {
        if (hbaseEmbeddedClusterHandler != null) {
            hbaseEmbeddedClusterHandler.shutdownMiniCluster();
        }
        FileUtils.deleteQuietly(new File(BASEDIR));
    }
}