hbase-commits mailing list archives

From te...@apache.org
Subject [1/2] hbase git commit: HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor)
Date Tue, 27 Jun 2017 19:33:47 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 0a4794aab -> d5e206dfa
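
For reference, the driver-side pattern the updated test exercises: each
destination table contributes an HFileOutputFormat2.TableInfo (table
descriptor plus region locator), and MultiTableHFileOutputFormat's
configureIncrementalLoad wires the job so HFiles land in one subdirectory
per table. A minimal sketch in Java, assuming an existing Connection
"conn", Job "job", and list of TableNames "names" (only the TableInfo
constructor and the configureIncrementalLoad call are taken from the
patch below):

    List<HFileOutputFormat2.TableInfo> infos = new ArrayList<>();
    for (TableName name : names) {
      // Pair each destination table's descriptor with its region locator.
      Table table = conn.getTable(name);
      infos.add(new HFileOutputFormat2.TableInfo(
          table.getTableDescriptor(), conn.getRegionLocator(name)));
    }
    // Configures one reduce task per region across all tables; the test
    // below asserts exactly this count.
    MultiTableHFileOutputFormat.configureIncrementalLoad(job, infos);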


http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e206df/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 3533f8a..87522b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -109,6 +108,9 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
 
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * Simple test for {@link HFileOutputFormat2}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -123,9 +125,9 @@ public class TestHFileOutputFormat2  {
 
   private static final byte[][] FAMILIES
     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
-      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
-  private static final TableName TABLE_NAME =
-      TableName.valueOf("TestTable");
+          , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
+          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
 
@@ -146,6 +148,9 @@ public class TestHFileOutputFormat2  {
     private static final int VALLEN_DEFAULT=10;
     private static final String VALLEN_CONF="randomkv.val.length";
     private static final byte [] QUALIFIER = Bytes.toBytes("data");
+    private boolean multiTableMapper = false;
+    private TableName[] tables = null;
+
 
     @Override
     protected void setup(Context context) throws IOException,
@@ -155,6 +160,13 @@ public class TestHFileOutputFormat2  {
       Configuration conf = context.getConfiguration();
       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+              false);
+      if (multiTableMapper) {
+        tables = TABLE_NAMES;
+      } else {
+        tables = new TableName[]{TABLE_NAMES[0]};
+      }
     }
 
     @Override
@@ -170,19 +182,23 @@ public class TestHFileOutputFormat2  {
 
       int taskId = context.getTaskAttemptID().getTaskID().getId();
       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
-
       Random random = new Random();
-      for (int i = 0; i < ROWSPERSPLIT; i++) {
-
-        random.nextBytes(keyBytes);
-        // Ensure that unique tasks generate unique keys
-        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
-        random.nextBytes(valBytes);
-        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+      byte[] key;
+      for (int j = 0; j < tables.length; ++j) {
+        for (int i = 0; i < ROWSPERSPLIT; i++) {
+          random.nextBytes(keyBytes);
+          // Ensure that unique tasks generate unique keys
+          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+          random.nextBytes(valBytes);
+          key = keyBytes;
+          if (multiTableMapper) {
+            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+          }
 
-        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
-          Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
-          context.write(key, kv);
+          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+            context.write(new ImmutableBytesWritable(key), kv);
+          }
         }
       }
     }
@@ -196,31 +212,39 @@ public class TestHFileOutputFormat2  {
                  ImmutableBytesWritable, Put> {
 
     private int keyLength;
-    private static final int KEYLEN_DEFAULT=10;
-    private static final String KEYLEN_CONF="randomkv.key.length";
+    private static final int KEYLEN_DEFAULT = 10;
+    private static final String KEYLEN_CONF = "randomkv.key.length";
 
     private int valLength;
-    private static final int VALLEN_DEFAULT=10;
-    private static final String VALLEN_CONF="randomkv.val.length";
-    private static final byte [] QUALIFIER = Bytes.toBytes("data");
+    private static final int VALLEN_DEFAULT = 10;
+    private static final String VALLEN_CONF = "randomkv.val.length";
+    private static final byte[] QUALIFIER = Bytes.toBytes("data");
+    private boolean multiTableMapper = false;
+    private TableName[] tables = null;
 
     @Override
     protected void setup(Context context) throws IOException,
-        InterruptedException {
+            InterruptedException {
       super.setup(context);
 
       Configuration conf = context.getConfiguration();
       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+              false);
+      if (multiTableMapper) {
+        tables = TABLE_NAMES;
+      } else {
+        tables = new TableName[]{TABLE_NAMES[0]};
+      }
     }
 
     @Override
     protected void map(
-        NullWritable n1, NullWritable n2,
-        Mapper<NullWritable, NullWritable,
-               ImmutableBytesWritable,Put>.Context context)
-        throws java.io.IOException ,InterruptedException
-    {
+            NullWritable n1, NullWritable n2,
+            Mapper<NullWritable, NullWritable,
+                    ImmutableBytesWritable, Put>.Context context)
+            throws java.io.IOException, InterruptedException {
 
       byte keyBytes[] = new byte[keyLength];
       byte valBytes[] = new byte[valLength];
@@ -229,20 +253,25 @@ public class TestHFileOutputFormat2  {
       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
 
       Random random = new Random();
-      for (int i = 0; i < ROWSPERSPLIT; i++) {
-
-        random.nextBytes(keyBytes);
-        // Ensure that unique tasks generate unique keys
-        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
-        random.nextBytes(valBytes);
-        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
-
-        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
-          Put p = new Put(keyBytes);
-          p.addColumn(family, QUALIFIER, valBytes);
-          // set TTL to very low so that the scan does not return any value
-          p.setTTL(1l);
-          context.write(key, p);
+      byte[] key;
+      for (int j = 0; j < tables.length; ++j) {
+        for (int i = 0; i < ROWSPERSPLIT; i++) {
+          random.nextBytes(keyBytes);
+          // Ensure that unique tasks generate unique keys
+          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+          random.nextBytes(valBytes);
+          key = keyBytes;
+          if (multiTableMapper) {
+            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+          }
+
+          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+            Put p = new Put(keyBytes);
+            p.addColumn(family, QUALIFIER, valBytes);
+            // set TTL to very low so that the scan does not return any value
+            p.setTTL(1L);
+            context.write(new ImmutableBytesWritable(key), p);
+          }
         }
       }
     }
@@ -365,7 +394,7 @@ public class TestHFileOutputFormat2  {
       HFile.Reader rd =
           HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
       Map<byte[],byte[]> finfo = rd.loadFileInfo();
-      byte[] range = finfo.get("TIMERANGE".getBytes());
+      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
       assertNotNull(range);
 
       // unmarshall and check values.
@@ -438,6 +467,9 @@ public class TestHFileOutputFormat2  {
     Path dir =
         util.getDataTestDir("WritingTagData");
     try {
+      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
+      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
       Job job = new Job(conf);
       FileOutputFormat.setOutputPath(job, dir);
       context = createTestTaskAttemptContext(job);
@@ -537,6 +569,7 @@ public class TestHFileOutputFormat2  {
     doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
   }
 
   @Test
   public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
@@ -544,43 +577,80 @@ public class TestHFileOutputFormat2  {
   }
 
   private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
-      boolean putSortReducer, String tableStr) throws Exception {
+                                     boolean putSortReducer, String tableStr) throws Exception {
+      doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
+              Arrays.asList(tableStr));
+  }
+
+  @Test
+  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
+    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
+    doIncrementalLoadTest(false, false, true,
+            Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString)
+                    .collect(Collectors.toList()));
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+      boolean putSortReducer, List<String> tableStr) throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
-    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
     int hostCount = 1;
     int regionNum = 5;
-    if(shouldKeepLocality) {
+    if (shouldKeepLocality) {
       // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
       // explicit hostnames parameter just like MiniDFSCluster does.
       hostCount = 3;
       regionNum = 20;
     }
 
-    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
     String[] hostnames = new String[hostCount];
-    for(int i = 0; i < hostCount; ++i) {
+    for (int i = 0; i < hostCount; ++i) {
       hostnames[i] = "datanode_" + i;
     }
     util.startMiniCluster(1, hostCount, hostnames);
-    TableName tableName = TableName.valueOf(tableStr);
-    Table table = util.createTable(tableName, FAMILIES, splitKeys);
-    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
-    FileSystem fs = testDir.getFileSystem(conf);
-    try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin =
-        util.getConnection().getAdmin();) {
+
+    Map<String, Table> allTables = new HashMap<>(tableStr.size());
+    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
+    boolean writeMultipleTables = tableStr.size() > 1;
+    for (String tableStrSingle : tableStr) {
+      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+      TableName tableName = TableName.valueOf(tableStrSingle);
+      Table table = util.createTable(tableName, FAMILIES, splitKeys);
+
+      RegionLocator r = util.getConnection().getRegionLocator(tableName);
       assertEquals("Should start with empty table", 0, util.countRows(table));
       int numRegions = r.getStartKeys().length;
       assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
 
-      // Generate the bulk load files
-      runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer);
+      allTables.put(tableStrSingle, table);
+      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
+    }
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+    // Generate the bulk load files
+    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
+
+    for (Table tableSingle : allTables.values()) {
       // This doesn't write into the table, just makes files
-      assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
+      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
+    }
+    int numTableDirs = 0;
+    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
+      Path tablePath = testDir;
+
+      if (writeMultipleTables) {
+        if (allTables.containsKey(tf.getPath().getName())) {
+          ++numTableDirs;
+          tablePath = tf.getPath();
+        }
+        else {
+          continue;
+        }
+      }
 
       // Make sure that a directory was created for every CF
       int dir = 0;
-      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
         for (byte[] family : FAMILIES) {
           if (Bytes.toString(family).equals(f.getPath().getName())) {
             ++dir;
@@ -588,95 +658,132 @@ public class TestHFileOutputFormat2  {
         }
       }
       assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+    }
+    if (writeMultipleTables) {
+      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
+    }
 
+    Admin admin = util.getConnection().getAdmin();
+    try {
       // handle the split case
       if (shouldChangeRegions) {
-        LOG.info("Changing regions in table");
-        admin.disableTable(table.getName());
+        Table chosenTable = allTables.values().iterator().next();
+        // Choose a semi-random table if multiple tables are available
+        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
+        admin.disableTable(chosenTable.getName());
         util.waitUntilNoRegionsInTransition();
 
-        util.deleteTable(table.getName());
+        util.deleteTable(chosenTable.getName());
         byte[][] newSplitKeys = generateRandomSplitKeys(14);
-        table = util.createTable(tableName, FAMILIES, newSplitKeys);
+        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
 
-        while (util.getConnection().getRegionLocator(tableName)
-            .getAllRegionLocations().size() != 15 ||
-            !admin.isTableAvailable(table.getName())) {
+        while (util.getConnection().getRegionLocator(chosenTable.getName())
+                .getAllRegionLocations().size() != 15 ||
+                !admin.isTableAvailable(table.getName())) {
           Thread.sleep(200);
           LOG.info("Waiting for new region assignment to happen");
         }
       }
 
       // Perform the actual load
-      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r);
-
-      // Ensure data shows up
-      int expectedRows = 0;
-      if (putSortReducer) {
-        // no rows should be extracted
-        assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-          util.countRows(table));
-      } else {
-        expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-        assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-          util.countRows(table));
-        Scan scan = new Scan();
-        ResultScanner results = table.getScanner(scan);
-        for (Result res : results) {
-          assertEquals(FAMILIES.length, res.rawCells().length);
-          Cell first = res.rawCells()[0];
-          for (Cell kv : res.rawCells()) {
-            assertTrue(CellUtil.matchingRow(first, kv));
-            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
+        Path tableDir = testDir;
+        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
+        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
+        if (writeMultipleTables) {
+          tableDir = new Path(testDir, tableNameStr);
+        }
+        Table currentTable = allTables.get(tableNameStr);
+        TableName currentTableName = currentTable.getName();
+        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
+                .getRegionLocator());
+
+        // Ensure data shows up
+        int expectedRows = 0;
+        if (putSortReducer) {
+          // no rows should be extracted
+          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+                  util.countRows(currentTable));
+        } else {
+          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+                  util.countRows(currentTable));
+          Scan scan = new Scan();
+          ResultScanner results = currentTable.getScanner(scan);
+          for (Result res : results) {
+            assertEquals(FAMILIES.length, res.rawCells().length);
+            Cell first = res.rawCells()[0];
+            for (Cell kv : res.rawCells()) {
+              assertTrue(CellUtil.matchingRow(first, kv));
+              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+            }
           }
+          results.close();
+        }
+        String tableDigestBefore = util.checksumRows(currentTable);
+        // Check region locality
+        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
+        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
+          hbd.add(region.getHDFSBlocksDistribution());
+        }
+        for (String hostname : hostnames) {
+          float locality = hbd.getBlockLocalityIndex(hostname);
+          LOG.info("locality of [" + hostname + "]: " + locality);
+          assertEquals(100, (int) (locality * 100));
         }
-        results.close();
-      }
-      String tableDigestBefore = util.checksumRows(table);
-      // Check region locality
-      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
-      for (HRegion region : util.getHBaseCluster().getRegions(tableName)) {
-        hbd.add(region.getHDFSBlocksDistribution());
-      }
-      for (String hostname : hostnames) {
-        float locality =  hbd.getBlockLocalityIndex(hostname);
-        LOG.info("locality of [" + hostname + "]: " + locality);
-        assertEquals(100, (int) (locality * 100));
-      }
 
-      // Cause regions to reopen
-      admin.disableTable(tableName);
-      while (!admin.isTableDisabled(tableName)) {
-        Thread.sleep(200);
-        LOG.info("Waiting for table to disable");
+        // Cause regions to reopen
+        admin.disableTable(currentTableName);
+        while (!admin.isTableDisabled(currentTableName)) {
+          Thread.sleep(200);
+          LOG.info("Waiting for table to disable");
+        }
+        admin.enableTable(currentTableName);
+        util.waitTableAvailable(currentTableName);
+        assertEquals("Data should remain after reopening of regions",
+                tableDigestBefore, util.checksumRows(currentTable));
       }
-      admin.enableTable(tableName);
-      util.waitTableAvailable(tableName);
-      assertEquals("Data should remain after reopening of regions",
-          tableDigestBefore, util.checksumRows(table));
     } finally {
+      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+        tableInfoSingle.getRegionLocator().close();
+      }
+      for (Entry<String, Table> singleTable : allTables.entrySet() ) {
+        singleTable.getValue().close();
+        util.deleteTable(singleTable.getValue().getName());
+      }
       testDir.getFileSystem(conf).delete(testDir, true);
-      util.deleteTable(tableName);
       util.shutdownMiniCluster();
     }
   }
 
-  private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
-      RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
-      UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
+  private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir,
+                                    boolean putSortReducer) throws IOException,
+          InterruptedException, ClassNotFoundException {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job, putSortReducer);
-    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
+    if (tableInfo.size() > 1) {
+      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
+      int sum = 0;
+      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
+      }
+      assertEquals(sum, job.getNumReduceTasks());
+    }
+    else {
+      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
+      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
+              regionLocator);
+      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
+    }
+
     FileOutputFormat.setOutputPath(job, outDir);
 
     assertFalse(util.getTestFileSystem().exists(outDir));
 
-    assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
-
     assertTrue(job.waitForCompletion(true));
   }
 
@@ -696,7 +803,10 @@ public class TestHFileOutputFormat2  {
           getMockColumnFamiliesForCompression(numCfs);
       Table table = Mockito.mock(Table.class);
       setupMockColumnFamiliesForCompression(table, familyToCompression);
-      HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
+      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
+              HFileOutputFormat2.serializeColumnFamilyAttribute(
+                      HFileOutputFormat2.compressionDetails,
+                      Arrays.asList(table.getTableDescriptor())));
 
       // read back family specific compression setting from the configuration
       Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
@@ -707,14 +817,14 @@ public class TestHFileOutputFormat2  {
       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
         assertEquals("Compression configuration incorrect for column family:"
             + entry.getKey(), entry.getValue(),
-            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
       }
     }
   }
 
   private void setupMockColumnFamiliesForCompression(Table table,
       Map<String, Compression.Algorithm> familyToCompression) throws IOException {
-    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
           .setMaxVersions(1)
@@ -766,7 +876,9 @@ public class TestHFileOutputFormat2  {
       Table table = Mockito.mock(Table.class);
       setupMockColumnFamiliesForBloomType(table,
           familyToBloomType);
-      HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
+      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
+              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
+              Arrays.asList(table.getTableDescriptor())));
 
       // read back family specific data block encoding settings from the
       // configuration
@@ -779,14 +891,14 @@ public class TestHFileOutputFormat2  {
       for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
         assertEquals("BloomType configuration incorrect for column family:"
             + entry.getKey(), entry.getValue(),
-            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
+            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
       }
     }
   }
 
   private void setupMockColumnFamiliesForBloomType(Table table,
       Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
-    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
     for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
           .setMaxVersions(1)
@@ -835,7 +947,10 @@ public class TestHFileOutputFormat2  {
       Table table = Mockito.mock(Table.class);
       setupMockColumnFamiliesForBlockSize(table,
           familyToBlockSize);
-      HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
+      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
+              HFileOutputFormat2.serializeColumnFamilyAttribute(
+                      HFileOutputFormat2.blockSizeDetails,
+                      Arrays.asList(table.getTableDescriptor())));
 
       // read back family specific data block encoding settings from the
       // configuration
@@ -849,14 +964,14 @@ public class TestHFileOutputFormat2  {
           ) {
         assertEquals("BlockSize configuration incorrect for column family:"
             + entry.getKey(), entry.getValue(),
-            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
+            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
       }
     }
   }
 
   private void setupMockColumnFamiliesForBlockSize(Table table,
       Map<String, Integer> familyToDataBlockEncoding) throws IOException {
-    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
     for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
           .setMaxVersions(1)
@@ -910,7 +1025,10 @@ public class TestHFileOutputFormat2  {
       setupMockColumnFamiliesForDataBlockEncoding(table,
           familyToDataBlockEncoding);
       HTableDescriptor tableDescriptor = table.getTableDescriptor();
-      HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
+      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+              HFileOutputFormat2.serializeColumnFamilyAttribute(
+                      HFileOutputFormat2.dataBlockEncodingDetails,
+                      Arrays.asList(tableDescriptor)));
 
       // read back family specific data block encoding settings from the
       // configuration
@@ -923,14 +1041,14 @@ public class TestHFileOutputFormat2  {
       for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
         assertEquals("DataBlockEncoding configuration incorrect for column family:"
             + entry.getKey(), entry.getValue(),
-            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
+            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
       }
     }
   }
 
   private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
       Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
-    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
     for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
           .setMaxVersions(1)
@@ -995,7 +1113,7 @@ public class TestHFileOutputFormat2  {
     // Setup table descriptor
     Table table = Mockito.mock(Table.class);
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
-    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
     Mockito.doReturn(htd).when(table).getTableDescriptor();
     for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
       htd.addFamily(hcd);
@@ -1099,15 +1217,15 @@ public class TestHFileOutputFormat2  {
     util.startMiniCluster();
     try (Connection conn = ConnectionFactory.createConnection();
         Admin admin = conn.getAdmin();
-        Table table = util.createTable(TABLE_NAME, FAMILIES);
-        RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
+        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
+        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
       final FileSystem fs = util.getDFSCluster().getFileSystem();
       assertEquals("Should start with empty table", 0, util.countRows(table));
 
       // deep inspection: get the StoreFile dir
       final Path storePath = new Path(
-        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
-          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
+        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
+          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
             Bytes.toString(FAMILIES[0])));
       assertEquals(0, fs.listStatus(storePath).length);
 
@@ -1117,8 +1235,8 @@ public class TestHFileOutputFormat2  {
 
       for (int i = 0; i < 2; i++) {
         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
-        runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
-            testDir, false);
+        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
+                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
         // Perform the actual load
         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
       }
@@ -1132,12 +1250,12 @@ public class TestHFileOutputFormat2  {
       assertEquals(2, fs.listStatus(storePath).length);
 
       // minor compactions shouldn't get rid of the file
-      admin.compact(TABLE_NAME);
+      admin.compact(TABLE_NAMES[0]);
       try {
         quickPoll(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
-            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
             for (HRegion region : regions) {
               for (Store store : region.getStores()) {
                 store.closeAndArchiveCompactedFiles();
@@ -1152,11 +1270,11 @@ public class TestHFileOutputFormat2  {
       }
 
       // a major compaction should work though
-      admin.majorCompact(TABLE_NAME);
+      admin.majorCompact(TABLE_NAMES[0]);
       quickPoll(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
-          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
           for (HRegion region : regions) {
             for (Store store : region.getStores()) {
               store.closeAndArchiveCompactedFiles();
@@ -1182,13 +1300,13 @@ public class TestHFileOutputFormat2  {
         Admin admin = conn.getAdmin()){
       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
       final FileSystem fs = util.getDFSCluster().getFileSystem();
-      Table table = util.createTable(TABLE_NAME, FAMILIES);
+      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
       assertEquals("Should start with empty table", 0, util.countRows(table));
 
       // deep inspection: get the StoreFile dir
       final Path storePath = new Path(
-        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
-          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
+        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
+          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
             Bytes.toString(FAMILIES[0])));
       assertEquals(0, fs.listStatus(storePath).length);
 
@@ -1196,7 +1314,7 @@ public class TestHFileOutputFormat2  {
       Put p = new Put(Bytes.toBytes("test"));
       p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
       table.put(p);
-      admin.flush(TABLE_NAME);
+      admin.flush(TABLE_NAMES[0]);
       assertEquals(1, util.countRows(table));
       quickPoll(new Callable<Boolean>() {
         @Override
@@ -1209,8 +1327,9 @@ public class TestHFileOutputFormat2  {
       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
           true);
 
-      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
-      runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
+      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
+      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
+                      .getTableDescriptor(), regionLocator)), testDir, false);
 
       // Perform the actual load
       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@@ -1224,7 +1343,7 @@ public class TestHFileOutputFormat2  {
       assertEquals(2, fs.listStatus(storePath).length);
 
       // minor compactions shouldn't get rid of the file
-      admin.compact(TABLE_NAME);
+      admin.compact(TABLE_NAMES[0]);
       try {
         quickPoll(new Callable<Boolean>() {
           @Override
@@ -1238,7 +1357,7 @@ public class TestHFileOutputFormat2  {
       }
 
       // a major compaction should work though
-      admin.majorCompact(TABLE_NAME);
+      admin.majorCompact(TABLE_NAMES[0]);
       quickPoll(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
@@ -1273,15 +1392,15 @@ public class TestHFileOutputFormat2  {
     if ("newtable".equals(args[0])) {
       TableName tname = TableName.valueOf(args[1]);
       byte[][] splitKeys = generateRandomSplitKeys(4);
-      try (Table table = util.createTable(tname, FAMILIES, splitKeys)) {
-      }
+      Table table = util.createTable(tname, FAMILIES, splitKeys);
     } else if ("incremental".equals(args[0])) {
       TableName tname = TableName.valueOf(args[1]);
       try(Connection c = ConnectionFactory.createConnection(conf);
           Admin admin = c.getAdmin();
           RegionLocator regionLocator = c.getRegionLocator(tname)) {
         Path outDir = new Path("incremental-out");
-        runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
+        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
+                .getTableDescriptor(tname), regionLocator)), outDir, false);
       }
     } else {
       throw new RuntimeException(
@@ -1294,8 +1413,10 @@ public class TestHFileOutputFormat2  {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
-    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
-      "ONE_SSD");
+
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
+            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
+                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
     Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
     Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
     util.startMiniDFSCluster(3);
@@ -1313,8 +1434,10 @@ public class TestHFileOutputFormat2  {
       assertEquals("HOT", spB);
 
       // alter table cf schema to change storage policies
-      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
-      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+      HFileOutputFormat2.configureStoragePolicy(conf, fs,
+              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
+      HFileOutputFormat2.configureStoragePolicy(conf, fs,
+              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
       spA = getStoragePolicyName(fs, cf1Dir);
       spB = getStoragePolicyName(fs, cf2Dir);
       LOG.debug("Storage policy of cf 0: [" + spA + "].");
@@ -1368,6 +1491,5 @@ public class TestHFileOutputFormat2  {
 
     return null;
   }
-
 }
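
The mappers in the test above route each generated row to its destination
table by wrapping the shuffle key with
MultiTableHFileOutputFormat.createCompositeKey. A minimal sketch of that
pattern, assuming the enclosing Mapper.map() body with keyBytes/valBytes,
family, QUALIFIER, a TableName "tableName", and the multiTableMapper flag
set up in setup() (the two calls mirror the mapper changes above):

    byte[] outKey = multiTableMapper
        ? MultiTableHFileOutputFormat.createCompositeKey(tableName.getName(), keyBytes)
        : keyBytes;
    // Only the shuffle key is table-qualified; the KeyValue itself still
    // carries the plain row key.
    Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
    context.write(new ImmutableBytesWritable(outKey), kv);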
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e206df/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
deleted file mode 100644
index 781eaa9..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.PerformanceEvaluation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and
- * writes hfiles.
- */
-@Category(MediumTests.class)
-public class TestMultiTableHFileOutputFormat {
-  private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class);
-
-  private HBaseTestingUtility util = new HBaseTestingUtility();
-
-  private static int ROWSPERSPLIT = 10;
-
-  private static final int KEYLEN_DEFAULT = 10;
-  private static final String KEYLEN_CONF = "randomkv.key.length";
-
-  private static final int VALLEN_DEFAULT = 10;
-  private static final String VALLEN_CONF = "randomkv.val.length";
-
-  private static final byte[][] TABLES =
-      { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
-          Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
-
-  private static final byte[][] FAMILIES =
-      { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
-          Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
-
-  private static final byte[] QUALIFIER = Bytes.toBytes("data");
-
-  /**
-   * Run small MR job. this MR job will write HFile into
-   * testWritingDataIntoHFiles/tableNames/columnFamilies/
-   */
-  @Test
-  public void testWritingDataIntoHFiles() throws Exception {
-    Configuration conf = util.getConfiguration();
-    util.startMiniCluster();
-    Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
-    FileSystem fs = testDir.getFileSystem(conf);
-    LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
-
-    // Set down this value or we OOME in eclipse.
-    conf.setInt("mapreduce.task.io.sort.mb", 20);
-    // Write a few files by setting max file size.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
-
-    try {
-      Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
-
-      FileOutputFormat.setOutputPath(job, testDir);
-
-      job.setInputFormatClass(NMapInputFormat.class);
-      job.setMapperClass(Random_TableKV_GeneratingMapper.class);
-      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(KeyValue.class);
-      job.setReducerClass(Table_KeyValueSortReducer.class);
-      job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
-      job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
-          MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-          KeyValueSerialization.class.getName());
-
-      TableMapReduceUtil.addDependencyJars(job);
-      TableMapReduceUtil.initCredentials(job);
-      LOG.info("\nStarting test testWritingDataIntoHFiles\n");
-      assertTrue(job.waitForCompletion(true));
-      LOG.info("\nWaiting on checking MapReduce output\n");
-      assertTrue(checkMROutput(fs, testDir, 0));
-    } finally {
-      testDir.getFileSystem(conf).delete(testDir, true);
-      util.shutdownMiniCluster();
-    }
-  }
-
-  /**
-   * check whether create directory and hfiles as format designed in MultiHFilePartitioner
-   * and also check whether the output file has same related configuration as created table
-   */
-  @Test
-  public void testMultiHFilePartitioner() throws Exception {
-    Configuration conf = util.getConfiguration();
-    util.startMiniCluster();
-    Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner");
-    FileSystem fs = testDir.getFileSystem(conf);
-    LOG.info("testMultiHFilePartitioner dir writing to : " + testDir);
-
-    // Set down this value or we OOME in eclipse.
-    conf.setInt("mapreduce.task.io.sort.mb", 20);
-    // Write a few files by setting max file size.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
-
-    // Create several tables for testing
-    List<TableName> tables = new ArrayList<TableName>();
-
-    // to store splitKeys for TABLE[0] for testing;
-    byte[][] testKeys = new byte[0][0];
-    for (int i = 0; i < TABLES.length; i++) {
-      TableName tableName = TableName.valueOf(TABLES[i]);
-      byte[][] splitKeys = generateRandomSplitKeys(3);
-      if (i == 0) {
-        testKeys = splitKeys;
-      }
-      HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
-      for (int j = 0; j < FAMILIES.length; j++) {
-        HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]);
-        //only set Tables[0] configuration, and specify compression type and DataBlockEncode
-        if (i == 0) {
-          familyDescriptor.setCompressionType(Compression.Algorithm.GZ);
-          familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
-        }
-        tableDescriptor.addFamily(familyDescriptor);
-      }
-      util.createTable(tableDescriptor, splitKeys, conf);
-      tables.add(tableName);
-    }
-    // set up for MapReduce job
-    try {
-      Job job = Job.getInstance(conf, "testMultiHFilePartitioner");
-      FileOutputFormat.setOutputPath(job, testDir);
-
-      job.setInputFormatClass(NMapInputFormat.class);
-      job.setMapperClass(Random_TableKV_GeneratingMapper.class);
-      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(KeyValue.class);
-
-      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
-
-      LOG.info("Starting test testWritingDataIntoHFiles");
-      assertTrue(job.waitForCompletion(true));
-      LOG.info("Waiting on checking MapReduce output");
-      assertTrue(checkMROutput(fs, testDir, 0));
-      assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys));
-    } finally {
-      for (int i = 0; i < TABLES.length; i++) {
-        TableName tName = TableName.valueOf(TABLES[i]);
-        util.deleteTable(tName);
-      }
-      fs.delete(testDir, true);
-      fs.close();
-      util.shutdownMiniCluster();
-    }
-  }
-
-  /**
-   * check the output hfile has same configuration as created test table
-   * and also check whether hfiles get split correctly
-   * only check TABLES[0]
-   */
-  private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException {
-    FileStatus[] fStats = fs.listStatus(testDir);
-    for (FileStatus stats : fStats) {
-      if (stats.getPath().getName().equals(new String(TABLES[0]))) {
-        FileStatus[] cfStats = fs.listStatus(stats.getPath());
-        for (FileStatus cfstat : cfStats) {
-          FileStatus[] hfStats = fs.listStatus(cfstat.getPath());
-
-          List<byte[]> firsttKeys = new ArrayList<byte[]>();
-          List<byte[]> lastKeys = new ArrayList<byte[]>();
-          for (FileStatus hfstat : hfStats) {
-            if (HFile.isHFileFormat(fs, hfstat)) {
-              HFile.Reader hfr =
-                  HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf);
-              if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr
-                  .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false;
-              firsttKeys.add(hfr.getFirstRowKey());
-              lastKeys.add(hfr.getLastRowKey());
-            }
-          }
-          if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) {
-            return false;
-          }
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Check whether the Hfile has been split by region boundaries
-   * @param splitKeys split keys for that table
-   * @param firstKeys first rowKey for hfiles
-   * @param lastKeys last rowKey for hfiles
-   */
-  private boolean checkFileSplit(byte[][] splitKeys, List<byte[]> firstKeys, List<byte[]> lastKeys) {
-    Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR);
-    Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR);
-    Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR);
-
-    int is = 0, il = 0;
-    for (byte[] key : lastKeys) {
-      while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++;
-      if (is == splitKeys.length) {
-        break;
-      }
-      if (is > 0) {
-        if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false;
-      }
-      il++;
-    }
-
-    if (is == splitKeys.length) {
-      return il == lastKeys.size() - 1;
-    }
-    return true;
-  }
-
-
-  /**
-   * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the
-   * created directory is correct or not A recursion method, the testDir had better be small size
-   */
-  private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException {
-    if (level >= 3) {
-      return HFile.isHFileFormat(fs, testDir);
-    }
-    FileStatus[] fStats = fs.listStatus(testDir);
-    if (fStats == null || fStats.length <= 0) {
-      LOG.info("Created directory format is not correct");
-      return false;
-    }
-
-    for (FileStatus stats : fStats) {
-      // skip the _SUCCESS file created by MapReduce
-      if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
-        continue;
-      if (level < 2 && !stats.isDirectory()) {
-        LOG.info("Created directory format is not correct");
-        return false;
-      }
-      boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
-      if (flag == false) return false;
-    }
-    return true;
-  }
-
-
-  private byte[][] generateRandomSplitKeys(int numKeys) {
-    Random random = new Random();
-    byte[][] ret = new byte[numKeys][];
-    for (int i = 0; i < numKeys; i++) {
-      ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT);
-    }
-    return ret;
-  }
-
-
-  /**
-   * Simple mapper that makes <TableName, KeyValue> output. With no input data
-   */
-  static class Random_TableKV_GeneratingMapper
-      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
-
-    private int keyLength;
-    private int valLength;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-      super.setup(context);
-
-      Configuration conf = context.getConfiguration();
-      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
-      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
-    }
-
-    @Override
-    protected void map(NullWritable n1, NullWritable n2,
-        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
-        throws java.io.IOException, InterruptedException {
-
-      byte keyBytes[] = new byte[keyLength];
-      byte valBytes[] = new byte[valLength];
-
-      ArrayList<ImmutableBytesWritable> tables = new ArrayList<>();
-      for (int i = 0; i < TABLES.length; i++) {
-        tables.add(new ImmutableBytesWritable(TABLES[i]));
-      }
-
-      int taskId = context.getTaskAttemptID().getTaskID().getId();
-      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-      Random random = new Random();
-
-      for (int i = 0; i < ROWSPERSPLIT; i++) {
-        random.nextBytes(keyBytes);
-        // Ensure that unique tasks generate unique keys
-        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
-        random.nextBytes(valBytes);
-
-        for (ImmutableBytesWritable table : tables) {
-          for (byte[] family : FAMILIES) {
-            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
-            context.write(table, kv);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Simple Reducer that have input <TableName, KeyValue>, with KeyValues have no order. and output
-   * <TableName, KeyValue>, with KeyValues are ordered
-   */
-
-  static class Table_KeyValueSortReducer
-      extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-    protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
-        org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
-        throws java.io.IOException, InterruptedException {
-      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
-      for (KeyValue kv : kvs) {
-        try {
-          map.add(kv.clone());
-        } catch (CloneNotSupportedException e) {
-          throw new java.io.IOException(e);
-        }
-      }
-      context.setStatus("Read " + map.getClass());
-      int index = 0;
-      for (KeyValue kv : map) {
-        context.write(table, kv);
-        if (++index % 100 == 0) context.setStatus("Wrote " + index);
-      }
-    }
-  }
-}
\ No newline at end of file

