hbase-commits mailing list archives

From te...@apache.org
Subject [2/2] hbase git commit: HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor)
Date Tue, 27 Jun 2017 19:31:59 GMT
HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/293cb87d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/293cb87d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/293cb87d

Branch: refs/heads/master
Commit: 293cb87d52d6ccc98f0d03387f9dc07dc4522042
Parents: 14957d4
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Jun 27 12:31:55 2017 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Jun 27 12:31:55 2017 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/HFileOutputFormat2.java     | 785 ++++++++++---------
 .../mapreduce/MultiTableHFileOutputFormat.java  | 520 ++----------
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 424 ++++++----
 .../TestMultiTableHFileOutputFormat.java        | 382 ---------
 4 files changed, 758 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------
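
For orientation before the diff: a minimal driver sketch of how a bulk-load job could be configured against the reworked API. It assumes MultiTableHFileOutputFormat exposes a public configureIncrementalLoad(Job, List<TableInfo>) wrapper over the package-private HFileOutputFormat2 method added below (that part of the MultiTableHFileOutputFormat diff is truncated in this message), and it glosses over TableInfo being added as a package-private nested class; the job name and table names are invented.

    // Sketch only; assumes the usual java.util and
    // org.apache.hadoop.hbase.{client,mapreduce} imports.
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "multi-table-bulkload"); // hypothetical job name
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      List<HFileOutputFormat2.TableInfo> tables = new ArrayList<>();
      for (String name : new String[] { "table1", "table2" }) { // hypothetical tables
        TableName tn = TableName.valueOf(name);
        // A TableInfo pairs the table descriptor with the RegionLocator that
        // configureIncrementalLoad reads split points from, so the locator
        // must stay open until configuration is done.
        tables.add(new HFileOutputFormat2.TableInfo(
            conn.getTable(tn).getTableDescriptor(), conn.getRegionLocator(tn)));
      }
      // Derives partitions, reducer count and per-family attributes for all tables.
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
    }

The mapper must then emit composite "tableName;suffix" keys (see createCompositeKey in the second file) so the partitioner and record writer can route each cell to the right table.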


http://git-wip-us.apache.org/repos/asf/hbase/blob/293cb87d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index da507b1..f847608 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -22,14 +22,20 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -88,24 +94,48 @@ import com.google.common.annotations.VisibleForTesting;
  * all HFiles being written.
  * <p>
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
+ * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}.
  */
 @InterfaceAudience.Public
 public class HFileOutputFormat2
     extends FileOutputFormat<ImmutableBytesWritable, Cell> {
   private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
+  static class TableInfo {
+    private HTableDescriptor hTableDescriptor;
+    private RegionLocator regionLocator;
+
+    public TableInfo(HTableDescriptor hTableDescriptor, RegionLocator regionLocator) {
+      this.hTableDescriptor = hTableDescriptor;
+      this.regionLocator = regionLocator;
+    }
+
+    public HTableDescriptor getHTableDescriptor() {
+      return hTableDescriptor;
+    }
+
+    public RegionLocator getRegionLocator() {
+      return regionLocator;
+    }
+  }
+
+  protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
+
+  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
+    return Bytes.add(tableName, tableSeparator, suffix);
+  }
 
   // The following constants are private since these are used by
   // HFileOutputFormat2 to internally transfer data between job setup and
   // reducer run using conf.
   // These should not be changed by the client.
-  private static final String COMPRESSION_FAMILIES_CONF_KEY =
+  static final String COMPRESSION_FAMILIES_CONF_KEY =
       "hbase.hfileoutputformat.families.compression";
-  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
       "hbase.hfileoutputformat.families.bloomtype";
-  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.blocksize";
-  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
 
   // This constant is public since the client can modify this when setting
@@ -121,8 +151,10 @@ public class HFileOutputFormat2
   public static final String LOCALITY_SENSITIVE_CONF_KEY =
       "hbase.bulkload.locality.sensitive.enabled";
   private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+  static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
+  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+          "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
   public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
   public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@@ -133,270 +165,277 @@ public class HFileOutputFormat2
     return createRecordWriter(context);
   }
 
-  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-  createRecordWriter(final TaskAttemptContext context) throws IOException {
-    return new HFileRecordWriter<>(context, null);
+  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
+    return combineTableNameSuffix(tableName, family);
   }
 
-  protected static class HFileRecordWriter<V extends Cell>
-      extends RecordWriter<ImmutableBytesWritable, V> {
-    private final TaskAttemptContext context;
-    private final Path outputPath;
-    private final Path outputDir;
-    private final Configuration conf;
-    private final FileSystem fs;
-
-    private final long maxsize;
-
-    private final Algorithm defaultCompression;
-    private final boolean compactionExclude;
-
-    private final Map<byte[], Algorithm> compressionMap;
-    private final Map<byte[], BloomType> bloomTypeMap;
-    private final Map<byte[], Integer> blockSizeMap;
-
-    private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
-    private final DataBlockEncoding overriddenEncoding;
-
-    private final Map<byte[], WriterLength> writers;
-    private byte[] previousRow;
-    private final byte[] now;
-    private boolean rollRequested;
-
-    /**
-     * Mapreduce job will create a temp path for outputting results. If out != null, it means that
-     * the caller has set the temp working dir; If out == null, it means we need to set it here.
-     * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
-     * temp working dir at the table level and HFileOutputFormat2 has to set it here within this
-     * constructor.
-     */
-    public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
-        throws IOException {
-      // Get the path of the temporary output file
-      context = taContext;
-
-      if (out == null) {
-        outputPath = FileOutputFormat.getOutputPath(context);
-        outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
-      } else {
-        outputPath = out;
-        outputDir = outputPath;
-      }
-
-      conf = context.getConfiguration();
-      fs = outputDir.getFileSystem(conf);
-
-      // These configs. are from hbase-*.xml
-      maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
-
-      // Invented config. Add to hbase-*.xml if other than default compression.
-      String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-      defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
-      compactionExclude =
-          conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
-      // create a map from column family to the compression algorithm
-      compressionMap = createFamilyCompressionMap(conf);
-      bloomTypeMap = createFamilyBloomTypeMap(conf);
-      blockSizeMap = createFamilyBlockSizeMap(conf);
-
-      // Config for data block encoding
-      String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-      datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
-      if (dataBlockEncodingStr != null) {
-        overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-      } else {
-        overriddenEncoding = null;
-      }
-
-      writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
-      rollRequested = false;
+  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+      createRecordWriter(final TaskAttemptContext context)
+          throws IOException {
+
+    // Get the path of the temporary output file
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+    final Configuration conf = context.getConfiguration();
+    final boolean writeMultipleTables =
+        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+    if (writeTableNames == null || writeTableNames.isEmpty()) {
+      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+              + " cannot be empty");
+    }
+    final FileSystem fs = outputDir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    // Invented config.  Add to hbase-*.xml if other than default compression.
+    final String defaultCompressionStr = conf.get("hfile.compression",
+        Compression.Algorithm.NONE.getName());
+    final Algorithm defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr);
+    final boolean compactionExclude = conf.getBoolean(
+        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+    final Map<byte[], DataBlockEncoding> datablockEncodingMap
+        = createFamilyDataBlockEncodingMap(conf);
+    final DataBlockEncoding overriddenEncoding;
+    if (dataBlockEncodingStr != null) {
+      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+    } else {
+      overriddenEncoding = null;
     }
 
-    @Override
-    public void write(ImmutableBytesWritable row, V cell) throws IOException {
-      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
-      // null input == user explicitly wants to flush
-      if (row == null && kv == null) {
-        rollWriters();
-        return;
-      }
-
-      byte[] rowKey = CellUtil.cloneRow(kv);
-      long length = kv.getLength();
-      byte[] family = CellUtil.cloneFamily(kv);
-      WriterLength wl = this.writers.get(family);
-
-      // If this is a new column family, verify that the directory exists
-      if (wl == null) {
-        Path cfPath = new Path(outputDir, Bytes.toString(family));
-        fs.mkdirs(cfPath);
-        configureStoragePolicy(conf, fs, family, cfPath);
-      }
+    return new RecordWriter<ImmutableBytesWritable, V>() {
+      // Map of families to writers and how much has been output on the writer.
+      private final Map<byte[], WriterLength> writers =
+              new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+      private boolean rollRequested = false;
+
+      @Override
+      public void write(ImmutableBytesWritable row, V cell)
+          throws IOException {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
 
-      // If any of the HFiles for the column families has reached
-      // maxsize, we need to roll all the writers
-      if (wl != null && wl.written + length >= maxsize) {
-        this.rollRequested = true;
-      }
+        byte[] rowKey = CellUtil.cloneRow(kv);
+        long length = kv.getLength();
+        byte[] family = CellUtil.cloneFamily(kv);
+        byte[] tableNameBytes = null;
+        if (writeMultipleTables) {
+          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
+          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes)
+                    + "' not expected");
+          }
+        } else {
+          tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
+        }
+        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
+        WriterLength wl = this.writers.get(tableAndFamily);
+
+        // If this is a new column family, verify that the directory exists
+        if (wl == null) {
+          Path writerPath = null;
+          if (writeMultipleTables) {
+            writerPath = new Path(outputDir,
+                    new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
+          } else {
+            writerPath = new Path(outputDir, Bytes.toString(family));
+          }
+          fs.mkdirs(writerPath);
+          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
+        }
 
-      // This can only happen once a row is finished though
-      if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-        rollWriters();
-      }
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
 
-      // create a new WAL writer, if necessary
-      if (wl == null || wl.writer == null) {
-        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-          HRegionLocation loc = null;
-          String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-          if (tableName != null) {
-            try (Connection connection = ConnectionFactory.createConnection(conf);
-                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
-              loc = locator.getRegionLocation(rowKey);
-            } catch (Throwable e) {
-              LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
-                  e);
-              loc = null;
-            }
-          }
+        // This can only happen once a row is finished though
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
 
-          if (null == loc) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(
-                  "failed to get region location, so use default writer: " + Bytes.toString(rowKey));
-            }
-            wl = getNewWriter(family, conf, null);
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
-            }
-            InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
-            if (initialIsa.isUnresolved()) {
+        // create a new WAL writer, if necessary
+        if (wl == null || wl.writer == null) {
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            HRegionLocation loc = null;
+
+            String tableName = Bytes.toString(tableNameBytes);
+            if (tableName != null) {
+              try (Connection connection = ConnectionFactory.createConnection(conf);
+                     RegionLocator locator =
+                       connection.getRegionLocator(TableName.valueOf(tableName))) {
+                loc = locator.getRegionLocation(rowKey);
+              } catch (Throwable e) {
+                LOG.warn("There's something wrong when locating rowkey: " +
+                  Bytes.toString(rowKey) + " for tablename: " + tableName, e);
+                loc = null;
+              }
+            }
+
+            if (null == loc) {
               if (LOG.isTraceEnabled()) {
-                LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
-                    + loc.getPort() + ", so use default writer");
+                LOG.trace("failed to get region location, so use default writer for rowkey: " +
+                  Bytes.toString(rowKey));
               }
-              wl = getNewWriter(family, conf, null);
+              wl = getNewWriter(tableNameBytes, family, conf, null);
             } else {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+              }
+              InetSocketAddress initialIsa =
+                  new InetSocketAddress(loc.getHostname(), loc.getPort());
+              if (initialIsa.isUnresolved()) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+                      + loc.getPort() + ", so use default writer");
+                }
+                wl = getNewWriter(tableNameBytes, family, conf, null);
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                }
+                wl = getNewWriter(tableNameBytes, family, conf,
+                    new InetSocketAddress[] { initialIsa });
               }
-              wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
             }
+          } else {
+            wl = getNewWriter(tableNameBytes, family, conf, null);
           }
-        } else {
-          wl = getNewWriter(family, conf, null);
         }
-      }
 
-      // we now have the proper WAL writer. full steam ahead
-      kv.updateLatestStamp(this.now);
-      wl.writer.append(kv);
-      wl.written += length;
+        // we now have the proper WAL writer. full steam ahead
+        kv.updateLatestStamp(this.now);
+        wl.writer.append(kv);
+        wl.written += length;
 
-      // Copy the row so we know when a row transition.
-      this.previousRow = rowKey;
-    }
+        // Copy the row so we know when a row transition.
+        this.previousRow = rowKey;
+      }
 
-    private void rollWriters() throws IOException {
-      for (WriterLength wl : this.writers.values()) {
-        if (wl.writer != null) {
-          LOG.info(
-              "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-          close(wl.writer);
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info(
+                "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
         }
-        wl.writer = null;
-        wl.written = 0;
+        this.rollRequested = false;
       }
-      this.rollRequested = false;
-    }
 
-    /*
-     * Create a new StoreFile.Writer.
-     * @param family
-     * @return A WriterLength, containing a new StoreFile.Writer.
-     * @throws IOException
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
-        justification = "Not important")
-    private WriterLength getNewWriter(byte[] family, Configuration conf,
-        InetSocketAddress[] favoredNodes) throws IOException {
-      WriterLength wl = new WriterLength();
-      Path familyDir = new Path(outputDir, Bytes.toString(family));
-      Algorithm compression = compressionMap.get(family);
-      compression = compression == null ? defaultCompression : compression;
-      BloomType bloomType = bloomTypeMap.get(family);
-      bloomType = bloomType == null ? BloomType.NONE : bloomType;
-      Integer blockSize = blockSizeMap.get(family);
-      blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-      DataBlockEncoding encoding = overriddenEncoding;
-      encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
-      encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-      Configuration tempConf = new Configuration(conf);
-      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-      HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
-          .withChecksumType(HStore.getChecksumType(conf))
-          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
-
-      if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-        contextBuilder.withIncludesTags(true);
-      }
+      /*
+       * Create a new StoreFile.Writer.
+       * @param family
+       * @return A WriterLength, containing a new StoreFile.Writer.
+       * @throws IOException
+       */
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+          justification="Not important")
+      private WriterLength getNewWriter(byte[] tableName, byte[] family,
+              Configuration conf, InetSocketAddress[] favoredNodes) throws IOException {
+        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+        Path familydir = new Path(outputDir, Bytes.toString(family));
+        if (writeMultipleTables) {
+          familydir = new Path(outputDir,
+                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
+        }
+        WriterLength wl = new WriterLength();
+        Algorithm compression = compressionMap.get(tableAndFamily);
+        compression = compression == null ? defaultCompression : compression;
+        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
+        bloomType = bloomType == null ? BloomType.NONE : bloomType;
+        Integer blockSize = blockSizeMap.get(tableAndFamily);
+        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+        DataBlockEncoding encoding = overriddenEncoding;
+        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
+        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                                    .withCompression(compression)
+                                    .withChecksumType(HStore.getChecksumType(conf))
+                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                                    .withBlockSize(blockSize);
+
+        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+          contextBuilder.withIncludesTags(true);
+        }
 
-      contextBuilder.withDataBlockEncoding(encoding);
-      HFileContext hFileContext = contextBuilder.build();
-
-      if (null == favoredNodes) {
-        wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
-            .withOutputDir(familyDir).withBloomType(bloomType)
-            .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
-      } else {
-        wl.writer =
-            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                .withOutputDir(familyDir).withBloomType(bloomType)
-                .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-                .withFavoredNodes(favoredNodes).build();
-      }
+        contextBuilder.withDataBlockEncoding(encoding);
+        HFileContext hFileContext = contextBuilder.build();
+        if (null == favoredNodes) {
+          wl.writer =
+              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+        } else {
+          wl.writer =
+              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+        }
 
-      this.writers.put(family, wl);
-      return wl;
-    }
+        this.writers.put(tableAndFamily, wl);
+        return wl;
+      }
 
-    private void close(final StoreFileWriter w) throws IOException {
-      if (w != null) {
-        w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
-        w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-            Bytes.toBytes(context.getTaskAttemptID().toString()));
-        w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-        w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-            Bytes.toBytes(compactionExclude));
-        w.appendTrackedTimestampsToMetadata();
-        w.close();
+      private void close(final StoreFileWriter w) throws IOException {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+              Bytes.toBytes(System.currentTimeMillis()));
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+              Bytes.toBytes(context.getTaskAttemptID().toString()));
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+              Bytes.toBytes(true));
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+              Bytes.toBytes(compactionExclude));
+          w.appendTrackedTimestampsToMetadata();
+          w.close();
+        }
       }
-    }
 
-    @Override
-    public void close(TaskAttemptContext c) throws IOException, InterruptedException {
-      for (WriterLength wl : this.writers.values()) {
-        close(wl.writer);
+      @Override
+      public void close(TaskAttemptContext c)
+      throws IOException, InterruptedException {
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
+        }
       }
-    }
+    };
   }
 
   /**
    * Configure block storage policy for CF after the directory is created.
    */
   static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
-      byte[] family, Path cfPath) {
-    if (null == conf || null == fs || null == family || null == cfPath) {
+      byte[] tableAndFamily, Path cfPath) {
+    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
       return;
     }
 
     String policy =
-        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
           conf.get(STORAGE_POLICY_PROPERTY));
     FSUtils.setStoragePolicy(fs, cfPath, policy);
   }
@@ -413,12 +452,29 @@ public class HFileOutputFormat2
    * Return the start keys of all of the regions in this table,
    * as a list of ImmutableBytesWritable.
    */
-  private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
-  throws IOException {
-    byte[][] byteKeys = table.getStartKeys();
-    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(byteKeys.length);
-    for (byte[] byteKey : byteKeys) {
-      ret.add(new ImmutableBytesWritable(byteKey));
+  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
+                                                                 boolean writeMultipleTables)
+          throws IOException {
+
+    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+    for (RegionLocator regionLocator : regionLocators) {
+      TableName tableName = regionLocator.getName();
+      LOG.info("Looking up current regions for table " + tableName);
+      byte[][] byteKeys = regionLocator.getStartKeys();
+      for (byte[] byteKey : byteKeys) {
+        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
+        if (writeMultipleTables) {
+          // MultiTableHFileOutputFormat use case
+          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
+                  + Bytes.toStringBinary(fullKey) + "]");
+        }
+        ret.add(new ImmutableBytesWritable(fullKey));
+      }
     }
     return ret;
   }
@@ -429,7 +485,7 @@ public class HFileOutputFormat2
    */
   @SuppressWarnings("deprecation")
   private static void writePartitions(Configuration conf, Path partitionsPath,
-      List<ImmutableBytesWritable> startKeys) throws IOException {
+      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
     LOG.info("Writing partition information to " + partitionsPath);
     if (startKeys.isEmpty()) {
       throw new IllegalArgumentException("No regions passed");
@@ -440,14 +496,17 @@ public class HFileOutputFormat2
     // so we need to remove it. Otherwise we would end up with an
     // empty reducer with index 0
     TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
-
     ImmutableBytesWritable first = sorted.first();
+    if (writeMultipleTables) {
+      first = new ImmutableBytesWritable(
+              MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
+    }
     if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
       throw new IllegalArgumentException(
           "First region of table should have empty start key. Instead has: "
           + Bytes.toStringBinary(first.get()));
     }
-    sorted.remove(first);
+    sorted.remove(sorted.first());
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
@@ -499,17 +558,25 @@ public class HFileOutputFormat2
    */
   public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
       RegionLocator regionLocator) throws IOException {
-    configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
+    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
-  static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
-      RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
-      UnsupportedEncodingException {
+  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
+      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(KeyValue.class);
     job.setOutputFormatClass(cls);
 
+    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
+      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
+    }
+    boolean writeMultipleTables = false;
+    if (MultiTableHFileOutputFormat.class.equals(cls)) {
+      writeMultipleTables = true;
+      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+    }
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
@@ -528,28 +595,44 @@ public class HFileOutputFormat2
         KeyValueSerialization.class.getName());
 
     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-      // record this table name for creating writer by favored nodes
       LOG.info("bulkload locality sensitive enabled");
-      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
     }
 
+    /* Now get the region start keys for every table required */
+    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
+    List<HTableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
+
+    for (TableInfo tableInfo : multiTableInfo) {
+      regionLocators.add(tableInfo.getRegionLocator());
+      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+      tableDescriptors.add(tableInfo.getHTableDescriptor());
+    }
+    // Record table names for creating writers by favored nodes, and for decoding
+    // per-table column family attributes (compression, block size and so on).
+    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
+            StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
+    List<ImmutableBytesWritable> startKeys =
+            getRegionStartKeys(regionLocators, writeMultipleTables);
     // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + regionLocator.getName());
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
-        "to match current region count");
+        "to match current region count for all tables");
     job.setNumReduceTasks(startKeys.size());
 
-    configurePartitioner(job, startKeys);
+    configurePartitioner(job, startKeys, writeMultipleTables);
     // Set compression algorithms based on column families
-    configureCompression(conf, tableDescriptor);
-    configureBloomType(tableDescriptor, conf);
-    configureBlockSize(tableDescriptor, conf);
-    configureDataBlockEncoding(tableDescriptor, conf);
+
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
+            tableDescriptors));
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
+            tableDescriptors));
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
+            tableDescriptors));
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
   }
 
   public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
@@ -560,11 +643,19 @@ public class HFileOutputFormat2
     job.setOutputValueClass(KeyValue.class);
     job.setOutputFormatClass(HFileOutputFormat2.class);
 
+    ArrayList<HTableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+    singleTableDescriptor.add(tableDescriptor);
+
+    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getNameAsString());
     // Set compression algorithms based on column families
-    configureCompression(conf, tableDescriptor);
-    configureBloomType(tableDescriptor, conf);
-    configureBlockSize(tableDescriptor, conf);
-    configureDataBlockEncoding(tableDescriptor, conf);
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
@@ -667,7 +758,7 @@ public class HFileOutputFormat2
         continue;
       }
       try {
-        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
@@ -681,7 +772,8 @@ public class HFileOutputFormat2
    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exits.
    */
-  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
+          writeMultipleTables)
       throws IOException {
     Configuration conf = job.getConfiguration();
     // create the partitions file
@@ -691,7 +783,7 @@ public class HFileOutputFormat2
           HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
     Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
     fs.makeQualified(partitionsPath);
-    writePartitions(conf, partitionsPath, splitPoints);
+    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
     fs.deleteOnExit(partitionsPath);
 
     // configure job to use it
@@ -699,143 +791,102 @@ public class HFileOutputFormat2
     TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
   }
 
-  /**
-   * Serialize column family to compression algorithm map to configuration.
-   * Invoked while configuring the MR job for incremental load.
-   *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
-   * @throws IOException
-   *           on failure to read column family descriptors
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
   @VisibleForTesting
-  static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
+  static String serializeColumnFamilyAttribute(Function<HColumnDescriptor, String> fn,
+      List<HTableDescriptor> allTables)
       throws UnsupportedEncodingException {
-    StringBuilder compressionConfigValue = new StringBuilder();
-    if(tableDescriptor == null){
-      // could happen with mock table instance
-      return;
-    }
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    StringBuilder attributeValue = new StringBuilder();
     int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        compressionConfigValue.append('&');
+    for (HTableDescriptor tableDescriptor : allTables) {
+      if (tableDescriptor == null) {
+        // could happen with mock table instance
+        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
+        return "";
+      }
+      Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+      for (HColumnDescriptor familyDescriptor : families) {
+        if (i++ > 0) {
+          attributeValue.append('&');
+        }
+        attributeValue.append(URLEncoder.encode(
+            Bytes.toString(combineTableNameSuffix(
+                tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
+            "UTF-8"));
+        attributeValue.append('=');
+        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
       }
-      compressionConfigValue.append(URLEncoder.encode(
-        familyDescriptor.getNameAsString(), "UTF-8"));
-      compressionConfigValue.append('=');
-      compressionConfigValue.append(URLEncoder.encode(
-        familyDescriptor.getCompressionType().getName(), "UTF-8"));
     }
     // Get rid of the last ampersand
-    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+    return attributeValue.toString();
   }
 
   /**
-   * Serialize column family to block size map to configuration.
+   * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
+   *
    * @param tableDescriptor to read the properties from
    * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @VisibleForTesting
+  static Function<HColumnDescriptor, String> compressionDetails = familyDescriptor ->
+          familyDescriptor.getCompressionType().getName();
+
+  /**
+   * Serialize column family to block size map to configuration. Invoked while
+   * configuring the MR job for incremental load.
+   *
+   * @param tableDescriptor
+   *          to read the properties from
+   * @param conf
+   *          to persist serialized values into
    *
    * @throws IOException
    *           on failure to read column family descriptors
    */
   @VisibleForTesting
-  static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
-      throws UnsupportedEncodingException {
-    StringBuilder blockSizeConfigValue = new StringBuilder();
-    if (tableDescriptor == null) {
-      // could happen with mock table instance
-      return;
-    }
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        blockSizeConfigValue.append('&');
-      }
-      blockSizeConfigValue.append(URLEncoder.encode(
-          familyDescriptor.getNameAsString(), "UTF-8"));
-      blockSizeConfigValue.append('=');
-      blockSizeConfigValue.append(URLEncoder.encode(
-          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-    }
-    // Get rid of the last ampersand
-    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
-  }
+  static Function<HColumnDescriptor, String> blockSizeDetails = familyDescriptor -> String
+          .valueOf(familyDescriptor.getBlocksize());
 
   /**
-   * Serialize column family to bloom type map to configuration.
-   * Invoked while configuring the MR job for incremental load.
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
+   * Serialize column family to bloom type map to configuration. Invoked while
+   * configuring the MR job for incremental load.
+   *
+   * @param tableDescriptor
+   *          to read the properties from
+   * @param conf
+   *          to persist serialized values into
    *
    * @throws IOException
    *           on failure to read column family descriptors
    */
   @VisibleForTesting
-  static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
-      throws UnsupportedEncodingException {
-    if (tableDescriptor == null) {
-      // could happen with mock table instance
-      return;
+  static Function<HColumnDescriptor, String> bloomTypeDetails = familyDescriptor -> {
+    String bloomType = familyDescriptor.getBloomFilterType().toString();
+    if (bloomType == null) {
+      bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
     }
-    StringBuilder bloomTypeConfigValue = new StringBuilder();
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        bloomTypeConfigValue.append('&');
-      }
-      bloomTypeConfigValue.append(URLEncoder.encode(
-        familyDescriptor.getNameAsString(), "UTF-8"));
-      bloomTypeConfigValue.append('=');
-      String bloomType = familyDescriptor.getBloomFilterType().toString();
-      if (bloomType == null) {
-        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-      }
-      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
-    }
-    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
-  }
+    return bloomType;
+  };
 
   /**
    * Serialize column family to data block encoding map to configuration.
    * Invoked while configuring the MR job for incremental load.
    *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
+   * @param tableDescriptor
+   *          to read the properties from
+   * @param conf
+   *          to persist serialized values into
    * @throws IOException
    *           on failure to read column family descriptors
    */
   @VisibleForTesting
-  static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
-      Configuration conf) throws UnsupportedEncodingException {
-    if (tableDescriptor == null) {
-      // could happen with mock table instance
-      return;
+  static Function<HColumnDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
+    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+    if (encoding == null) {
+      encoding = DataBlockEncoding.NONE;
     }
-    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        dataBlockEncodingConfigValue.append('&');
-      }
-      dataBlockEncodingConfigValue.append(
-          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-      dataBlockEncodingConfigValue.append('=');
-      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-      if (encoding == null) {
-        encoding = DataBlockEncoding.NONE;
-      }
-      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
-          "UTF-8"));
-    }
-    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-        dataBlockEncodingConfigValue.toString());
-  }
+    return encoding.toString();
+  };
+
 }
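
Between the two files, a note on the key scheme the record writer above relies on: every cell travels under a composite "tableName;suffix" key, with tableSeparator fixed to the ";" byte. Below is a hedged illustration with invented values; getTableName and getSuffix are the MultiTableHFileOutputFormat helpers called above, whose bodies fall outside the quoted hunks and are assumed here to split at the separator.

    byte[] table = Bytes.toBytes("table1");  // hypothetical table
    byte[] row = Bytes.toBytes("row-00042"); // hypothetical row key
    // createCompositeKey delegates to combineTableNameSuffix: table + ";" + row.
    byte[] key = MultiTableHFileOutputFormat.createCompositeKey(table, row);
    // The record writer recovers the parts, roughly:
    //   getTableName(key) -> "table1"
    //   getSuffix(key)    -> "row-00042"
    // and rejects any table not listed in
    // hbase.mapreduce.hfileoutputformat.table.name.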

http://git-wip-us.apache.org/repos/asf/hbase/blob/293cb87d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
index 03256bf..fdcf30e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
@@ -6,504 +6,118 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.TreeMap;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.Collections;
+package org.apache.hadoop.hbase.mapreduce;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
 
 /**
- * Create 3 level tree directory, first level is using table name as parent directory and then use
- * family name as child directory, and all related HFiles for one family are under child directory
+ * Creates a three-level directory tree: the first level uses the table name as the
+ * parent directory, the second uses the column family name as a child directory,
+ * and all HFiles for one family are placed under that child directory:
  * -tableName1
- *   -columnFamilyName1
- *     -HFile (region1)
- *   -columnFamilyName2
- *     -HFile1 (region1)
- *     -HFile2 (region2)
- *     -HFile3 (region3)
+ *     -columnFamilyName1
+ *     -columnFamilyName2
+ *         -HFiles
  * -tableName2
- *   -columnFamilyName1
- *     -HFile (region1)
- * family directory and its hfiles match the output of HFileOutputFormat2
- * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
+ *     -columnFamilyName1
+ *         -HFiles
+ *     -columnFamilyName2
  */
-
 @InterfaceAudience.Public
 @VisibleForTesting
-public class MultiTableHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
   private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
 
-  @Override
-  public RecordWriter<ImmutableBytesWritable, Cell>
-  getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
-    return createMultiHFileRecordWriter(context);
-  }
-
-  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-  createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
-
-    // Get the path of the output directory
-    final Path outputPath = FileOutputFormat.getOutputPath(context);
-    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    final Configuration conf = context.getConfiguration();
-    final FileSystem fs = outputDir.getFileSystem(conf);
-
-    Connection conn = ConnectionFactory.createConnection(conf);
-    Admin admin = conn.getAdmin();
-
-    // Map of existing tables, avoid calling getTable() everytime
-    final Map<ImmutableBytesWritable, Table> tables = new HashMap<>();
-
-    // Map of tables to writers
-    final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters = new HashMap<>();
-
-    return new RecordWriter<ImmutableBytesWritable, V>() {
-      @Override
-      public void write(ImmutableBytesWritable tableName, V cell)
-          throws IOException, InterruptedException {
-        RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
-        // if there is new table, verify that table directory exists
-        if (tableWriter == null) {
-          // using table name as directory name
-          final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
-          fs.mkdirs(tableOutputDir);
-          LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
-              + tableOutputDir.toString());
-          // Configure for tableWriter, if table exist, write configuration of table into conf
-          Table table = null;
-          if (tables.containsKey(tableName)) {
-            table = tables.get(tableName);
-          } else {
-            table = getTable(tableName.copyBytes(), conn, admin);
-            tables.put(tableName, table);
-          }
-          if (table != null) {
-            configureForOneTable(conf, table.getTableDescriptor());
-          }
-          // Create writer for one specific table
-          tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
-          // Put table into map
-          tableWriters.put(tableName, tableWriter);
-        }
-        // Write <Row, Cell> into tableWriter
-        // in the original code, it does not use Row
-        tableWriter.write(null, cell);
-      }
-
-      @Override
-      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
-        for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
-          writer.close(c);
-        }
-        if (conn != null) {
-          conn.close();
-        }
-        if (admin != null) {
-          admin.close();
-        }
-      }
-    };
-  }
-
-  /**
-   * Configure for one table, should be used before creating a new HFileRecordWriter,
-   * Set compression algorithms and related configuration based on column families
-   */
-  private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor)
-      throws UnsupportedEncodingException {
-    HFileOutputFormat2.configureCompression(conf, tableDescriptor);
-    HFileOutputFormat2.configureBlockSize(tableDescriptor, conf);
-    HFileOutputFormat2.configureBloomType(tableDescriptor, conf);
-    HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
-  }
-
   /**
-   * Configure a MapReduce Job to output HFiles for performing an incremental load into
-   * the multiple tables.
-   * <ul>
-   *   <li>Inspects the tables to configure a partitioner based on their region boundaries</li>
-   *   <li>Writes the partitions file and configures the partitioner</li>
-   *   <li>Sets the number of reduce tasks to match the total number of all tables' regions</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer)</li>
-   * </ul>
+   * Creates a composite key to use as a mapper output key when using
+   * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job
    *
-   * configureIncrementalLoad sets up the partitioner and reducer for the MapReduce job;
-   * the caller still needs to set up the input path, output path, and mapper.
-   *
-   * @param job
-   * @param tables A list of tables to inspect
-   * @throws IOException
+   * @param tableName Name of the Table, e.g. TableName.getNameAsString()
+   * @param suffix    Usually a row key (when creating a mapper key) or a column family
+   * @return          byte[] representation of composite key
    */
-  public static void configureIncrementalLoad(Job job, List<TableName> tables) throws IOException {
-    configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class);
-  }
-
-  public static void configureIncrementalLoad(Job job, List<TableName> tables,
-      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
-
-    Configuration conf = job.getConfiguration();
-    Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys =
-        MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables);
-    configureIncrementalLoad(job, tableSplitKeys, cls);
+  public static byte[] createCompositeKey(byte[] tableName,
+                                          byte[] suffix) {
+    return combineTableNameSuffix(tableName, suffix);
   }
 
   /**
-   * Same purpose as configureIncrementalLoad(Job job, List<TableName> tables).
-   * Used when the region start keys of each table are already available,
-   * passed in as <TableName, List<RegionStartKey>>.
-   *
-   * The caller needs to convert TableName and byte[] into ImmutableBytesWritable.
+   * Alternate API that accepts an ImmutableBytesWritable for the suffix
+   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
    */
-  public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
-      List<ImmutableBytesWritable>> tableSplitKeys) throws IOException {
-    configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class);
-  }
-
-  public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
-      List<ImmutableBytesWritable>> tableSplitKeys, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
-    Configuration conf = job.getConfiguration();
-
-    // file path to store <table, splitKey>
-    String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
-    LOG.info("Writing partition info into dir: " + partitionsPath.toString());
-    job.setPartitionerClass(MultiHFilePartitioner.class);
-    // get split keys for all the tables, and write them into partition file
-    MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys);
-    MultiHFilePartitioner.setPartitionFile(conf, partitionsPath);
-    partitionsPath.getFileSystem(conf).makeQualified(partitionsPath);
-    partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath);
-
-    // currently only Mapper output of <ImmutableBytesWritable, KeyValue> is supported;
-    // we can use KeyValueSortReducer directly to sort the Mapper output
-    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(KeyValueSortReducer.class);
-    } else {
-      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
-    }
-    int reducerNum = getReducerNumber(tableSplitKeys);
-    job.setNumReduceTasks(reducerNum);
-    LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count");
-
-    // setup output format
-    job.setOutputFormatClass(cls);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-
-    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
+  public static byte[] createCompositeKey(byte[] tableName,
+                                          ImmutableBytesWritable suffix) {
+    return combineTableNameSuffix(tableName, suffix.get());
   }
 
   /**
-   * Check whether the table exists; should not depend on an HBase instance
-   * @return instance of the table, if it exists
+   * Alternate API that accepts a String for the tableName and an ImmutableBytesWritable
+   * for the suffix
+   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
    */
-  private static Table getTable(final byte[] tableName, Connection conn, Admin admin) {
-    if (conn == null || admin == null) {
-      LOG.info("can not get Connection or Admin");
-      return null;
-    }
-
-    try {
-      TableName table = TableName.valueOf(tableName);
-      if (admin.tableExists(table)) {
-        return conn.getTable(table);
-      }
-    } catch (IOException e) {
-      LOG.info("Exception found in getTable()" + e.toString());
-      return null;
-    }
-
-    LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist");
-    return null;
+  public static byte[] createCompositeKey(String tableName,
+                                          ImmutableBytesWritable suffix) {
+    return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
   }
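
To make the new composite-key API concrete, here is a minimal mapper sketch that emits such keys. This is illustrative only, not part of the patch: the input format, table name, column family, and qualifier are placeholder assumptions, and the class sits in the org.apache.hadoop.hbase.mapreduce package only so the driver sketch further below can reference it.

// Illustrative sketch, not part of this patch. Assumes text input lines of the
// form "tableName,rowKey,value"; column family "cf" and qualifier "q" are
// placeholders.
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiTableKVMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String[] fields = line.toString().split(",");
    byte[] table = Bytes.toBytes(fields[0]);
    byte[] row = Bytes.toBytes(fields[1]);
    // Prefix the row key with the table name so the partitioner and reducers
    // can route each cell to the correct table's HFiles.
    byte[] outKey = MultiTableHFileOutputFormat.createCompositeKey(table, row);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("q"),
        Bytes.toBytes(fields[2]));
    context.write(new ImmutableBytesWritable(outKey), kv);
  }
}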
 
   /**
-   * Get the number of reducers from the tables' split keys
+   * Analogous to
+   * {@link HFileOutputFormat2#configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)},
+   * this function will configure the requisite number of reducers to write HFiles for multiple
+   * tables simultaneously
+   *
+   * @param job                   See {@link org.apache.hadoop.mapreduce.Job}
+   * @param multiTableDescriptors Table descriptor and region locator pairs
+   * @throws IOException
    */
-  private static int getReducerNumber(
-      Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys) {
-    int reducerNum = 0;
-    for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : tableSplitKeys.entrySet()) {
-      reducerNum += entry.getValue().size();
-    }
-    return reducerNum;
+  public static void configureIncrementalLoad(Job job, List<TableInfo>
+      multiTableDescriptors)
+      throws IOException {
+    MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
+            MultiTableHFileOutputFormat.class);
   }
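
A matching driver sketch, assuming two pre-existing placeholder tables. It is declared in the org.apache.hadoop.hbase.mapreduce package because TableInfo is package-private in this patch; everything else uses public APIs, and all table names and paths are placeholders.

// Illustrative sketch, not part of this patch. Lives in this package because
// HFileOutputFormat2.TableInfo is package-private; "usertable1"/"usertable2"
// and the input/output paths are placeholders.
package org.apache.hadoop.hbase.mapreduce;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiTableBulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "multi-table-hfile-gen");
    job.setJarByClass(MultiTableBulkLoadDriver.class);
    job.setMapperClass(MultiTableKVMapper.class); // the mapper sketched above
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      TableName t1 = TableName.valueOf("usertable1");
      TableName t2 = TableName.valueOf("usertable2");
      // One TableInfo per destination table: descriptor plus region locator.
      List<HFileOutputFormat2.TableInfo> tables = Arrays.asList(
          new HFileOutputFormat2.TableInfo(
              conn.getTable(t1).getTableDescriptor(), conn.getRegionLocator(t1)),
          new HFileOutputFormat2.TableInfo(
              conn.getTable(t2).getTableDescriptor(), conn.getRegionLocator(t2)));
      // Configures the partitioner, reducer, and reduce-task count for all tables.
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

On success, each table's HFiles should end up under a per-table subdirectory of the output path, from where they can be bulk-loaded table by table (for example with LoadIncrementalHFiles).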
 
-  /**
-   * MultiTableHFileOutputFormat writes files based on the partitions created by MultiHFilePartitioner.
-   * The input is partitioned by table name and by each table's region boundaries.
-   * Two records fall in the same partition if they have the same table name and their cells
-   * lie in the same region.
-   */
-  static class MultiHFilePartitioner extends Partitioner<ImmutableBytesWritable, Cell>
-      implements Configurable {
-
-    public static final String DEFAULT_PATH = "_partition_multihfile.lst";
-    public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path";
-    private Configuration conf;
-    // map to receive <table, splitKeys> from file
-    private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> table_SplitKeys;
-    // each <table,splitKey> pair is map to one unique integer
-    private TreeMap<TableSplitKeyPair, Integer> partitionMap;
-
-    @Override
-    public void setConf(Configuration conf) {
-      try {
-        this.conf = conf;
-        partitionMap = new TreeMap<>();
-        table_SplitKeys = readTableSplitKeys(conf);
-
-        // initiate partitionMap by table_SplitKeys map
-        int splitNum = 0;
-        for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : table_SplitKeys.entrySet()) {
-          ImmutableBytesWritable table = entry.getKey();
-          List<ImmutableBytesWritable> list = entry.getValue();
-          for (ImmutableBytesWritable splitKey : list) {
-            partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++);
-          }
-        }
-      } catch (IOException e) {
-        throw new IllegalArgumentException("Can't read partitions file", e);
-      }
-    }
-
-    @Override
-    public Configuration getConf() {
-      return conf;
-    }
-
-    /**
-     * Set the path to the SequenceFile storing the sorted <table, splitkey>. It must be the case
-     * that for <tt>R</tt> reduces, there are <tt>R-1</tt> keys in the SequenceFile.
-     */
-    public static void setPartitionFile(Configuration conf, Path p) {
-      conf.set(PARTITIONER_PATH, p.toString());
-    }
-
-    /**
-     * Get the path to the SequenceFile storing the sorted <table, splitkey>.
-     * @see #setPartitionFile(Configuration, Path)
-     */
-    public static String getPartitionFile(Configuration conf) {
-      return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
-    }
-
-
-    /**
-     * Return map of <tableName, the start keys of all of the regions in this table>
-     */
-    public static Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> getTablesRegionStartKeys(
-        Configuration conf, List<TableName> tables) throws IOException {
-      final TreeMap<ImmutableBytesWritable, List<ImmutableBytesWritable>> ret = new TreeMap<>();
-
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Admin admin = conn.getAdmin()) {
-        LOG.info("Looking up current regions for tables");
-        for (TableName tName : tables) {
-          RegionLocator table = conn.getRegionLocator(tName);
-          // if the table does not exist, use default split keys for it
-          byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY };
-          if (admin.tableExists(tName)) {
-            byteKeys = table.getStartKeys();
-          }
-          List<ImmutableBytesWritable> tableStartKeys = new ArrayList<>(byteKeys.length);
-          for (byte[] byteKey : byteKeys) {
-            tableStartKeys.add(new ImmutableBytesWritable(byteKey));
-          }
-          ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys);
+  private static int validateCompositeKey(byte[] keyBytes) {
 
-        }
-        return ret;
-      }
-    }
-
-    /**
-     * write <tableName, start key of each region in table> into sequence file in order,
-     * and this format can be parsed by MultiHFilePartitioner
-     */
-    public static void writeTableSplitKeys(Configuration conf, Path partitionsPath,
-        Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map) throws IOException {
-      LOG.info("Writing partition information to " + partitionsPath);
-
-      if (map == null || map.isEmpty()) {
-        throw new IllegalArgumentException("No regions passed for all tables");
-      }
-
-      SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath),
-          Writer.keyClass(ImmutableBytesWritable.class),
-          Writer.valueClass(ImmutableBytesWritable.class));
-
-      try {
-        for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : map.entrySet()) {
-          ImmutableBytesWritable table = entry.getKey();
-          List<ImmutableBytesWritable> list = entry.getValue();
-          if (list == null) {
-            throw new IOException("Split keys for a table can not be null");
-          }
-
-          TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(list);
-
-          ImmutableBytesWritable first = sorted.first();
-          if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-            throw new IllegalArgumentException(
-                "First region of table should have empty start key. Instead has: "
-                    + Bytes.toStringBinary(first.get()));
-          }
-
-          for (ImmutableBytesWritable startKey : sorted) {
-            writer.append(table, startKey);
-          }
-        }
-      } finally {
-        writer.close();
-      }
-    }
-
-    /**
-     * Read the partition file into a map of <table, splitKeys of this table>
-     */
-    private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> readTableSplitKeys(
-        Configuration conf) throws IOException {
-      String parts = getPartitionFile(conf);
-      LOG.info("Read partition info from file: " + parts);
-      final Path partFile = new Path(parts);
+    int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
 
-      SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile));
-      // values are already sorted in file, so use list
-      final Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map =
-          new TreeMap<>();
-      // key and value have same type
-      ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
-      ImmutableBytesWritable value =
-          ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
-      try {
-        while (reader.next(key, value)) {
-
-          List<ImmutableBytesWritable> list = map.get(key);
-          if (list == null) {
-            list = new ArrayList<>();
-          }
-          list.add(value);
-          map.put(key, list);
-
-          key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
-          value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
-        }
-      } finally {
-        IOUtils.cleanup(LOG, reader);
-      }
-      return map;
-    }
-
-    @Override
-    public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) {
-      byte[] row = CellUtil.cloneRow(value);
-      final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
-      ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
-      // find the splitKey by the input rowKey
-      if (table_SplitKeys.containsKey(table)) {
-        List<ImmutableBytesWritable> list = table_SplitKeys.get(table);
-        int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator());
-        if (index < 0) {
-          index = (index + 1) * (-1) - 1;
-        } else if (index == list.size()) {
-          index -= 1;
-        }
-        if (index < 0) {
-          index = 0;
-          LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY ");
-        }
-        splitId = list.get(index);
-      }
-
-      // find the id of the reducer for the input
-      Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId));
-      if (id == null) {
-          LOG.warn("Can not get reducer id for input record");
-          return -1;
-      }
-      return id.intValue() % numPartitions;
+    // Either the separator was not found, or the table name or suffix is missing
+    if (separatorIdx == -1) {
+      throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
+              .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
     }
+    return separatorIdx;
+  }
 
-    /**
-     * A class storing a pair<TableName, SplitKey>, with two main usages:
-     * 1. store a tableName and one of its splitKeys as a pair
-     * 2. implement Comparable, so that the partitioner can find the splitKey for its input cell
-     */
-    static class TableSplitKeyPair extends Pair<ImmutableBytesWritable, ImmutableBytesWritable>
-        implements Comparable<TableSplitKeyPair> {
-
-      private static final long serialVersionUID = -6485999667666325594L;
-
-      public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) {
-        super(a, b);
-      }
+  protected static byte[] getTableName(byte[] keyBytes) {
+    int separatorIdx = validateCompositeKey(keyBytes);
+    return Bytes.copy(keyBytes, 0, separatorIdx);
+  }
 
-      @Override
-      public int compareTo(TableSplitKeyPair other) {
-        if (this.getFirst().equals(other.getFirst())) {
-          return this.getSecond().compareTo(other.getSecond());
-        }
-        return this.getFirst().compareTo(other.getFirst());
-      }
-    }
+  protected static byte[] getSuffix(byte[] keyBytes) {
+    int separatorIdx = validateCompositeKey(keyBytes);
+    return Bytes.copy(keyBytes, separatorIdx + 1, keyBytes.length - separatorIdx - 1);
   }
-}
\ No newline at end of file
+}
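
To pin down the composite-key contract above, a test-style round-trip sketch. It again assumes the org.apache.hadoop.hbase.mapreduce package (getTableName and getSuffix are protected), and the table name and row key are placeholders.

// Illustrative sketch, not part of this patch. Lives in this package because
// getTableName/getSuffix are protected; key values below are placeholders.
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.util.Bytes;

public class CompositeKeyRoundTrip {
  public static void main(String[] args) {
    byte[] table = Bytes.toBytes("usertable1");
    byte[] row = Bytes.toBytes("row-0001");

    byte[] composite = MultiTableHFileOutputFormat.createCompositeKey(table, row);

    // getTableName copies the bytes before the separator, getSuffix the bytes after it.
    System.out.println(
        Bytes.equals(MultiTableHFileOutputFormat.getTableName(composite), table)); // true
    System.out.println(
        Bytes.equals(MultiTableHFileOutputFormat.getSuffix(composite), row));      // true

    // A key without the separator fails validation with IllegalArgumentException.
    try {
      MultiTableHFileOutputFormat.getTableName(Bytes.toBytes("no_separator_here"));
    } catch (IllegalArgumentException expected) {
      System.out.println("Malformed key rejected: " + expected.getMessage());
    }
  }
}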

