From: tedyu@apache.org
To: commits@hbase.apache.org
Date: Tue, 27 Jun 2017 19:33:47 -0000
Subject: [1/2] hbase git commit: HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor)

Repository: hbase
Updated Branches:
  refs/heads/branch-2 0a4794aab -> d5e206dfa

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e206df/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 3533f8a..87522b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -109,6 +108,9 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;

+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * Simple test for {@link HFileOutputFormat2}.
  * Sets up and runs a mapreduce job that writes hfile output.
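The heart of HBASE-18161 is letting a single MapReduce job emit HFiles for several tables at once. In the updated test below, every mapper output key is prefixed with the destination table name via MultiTableHFileOutputFormat.createCompositeKey, so the output format can route each cell to <outputDir>/<tableName>/<family>/. A minimal sketch of that pattern, assuming only the API calls visible in this diff (the class name, family, qualifier, and row values are illustrative, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch only: emits one cell per destination table, keyed by a composite key that
    // carries the table name so MultiTableHFileOutputFormat can route the cell into the
    // right per-table output directory.
    public class MultiTableCellMapper
        extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

      private static final byte[] FAMILY = Bytes.toBytes("f");      // illustrative
      private static final byte[] QUALIFIER = Bytes.toBytes("q");   // illustrative

      @Override
      protected void map(NullWritable k, NullWritable v, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = Bytes.toBytes("row-1");                     // illustrative
        byte[] value = Bytes.toBytes("value-1");
        for (TableName table : new TableName[] {
            TableName.valueOf("TestTable"), TableName.valueOf("TestTable2") }) {
          byte[] compositeKey =
              MultiTableHFileOutputFormat.createCompositeKey(table.getName(), rowKey);
          // The cell itself still uses the plain row key; only the shuffle key is
          // prefixed, mirroring the test mappers updated in the hunks below.
          Cell cell = new KeyValue(rowKey, FAMILY, QUALIFIER, value);
          context.write(new ImmutableBytesWritable(compositeKey), cell);
        }
      }
    }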
@@ -123,9 +125,9 @@ public class TestHFileOutputFormat2 { private static final byte[][] FAMILIES = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")) - , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; - private static final TableName TABLE_NAME = - TableName.valueOf("TestTable"); + , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; + private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", + "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); private HBaseTestingUtility util = new HBaseTestingUtility(); @@ -146,6 +148,9 @@ public class TestHFileOutputFormat2 { private static final int VALLEN_DEFAULT=10; private static final String VALLEN_CONF="randomkv.val.length"; private static final byte [] QUALIFIER = Bytes.toBytes("data"); + private boolean multiTableMapper = false; + private TableName[] tables = null; + @Override protected void setup(Context context) throws IOException, @@ -155,6 +160,13 @@ public class TestHFileOutputFormat2 { Configuration conf = context.getConfiguration(); keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + false); + if (multiTableMapper) { + tables = TABLE_NAMES; + } else { + tables = new TableName[]{TABLE_NAMES[0]}; + } } @Override @@ -170,19 +182,23 @@ public class TestHFileOutputFormat2 { int taskId = context.getTaskAttemptID().getTaskID().getId(); assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - for (int i = 0; i < ROWSPERSPLIT; i++) { - - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); + byte[] key; + for (int j = 0; j < tables.length; ++j) { + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + key = keyBytes; + if (multiTableMapper) { + key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); + } - for (byte[] family : TestHFileOutputFormat2.FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(key, kv); + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); + context.write(new ImmutableBytesWritable(key), kv); + } } } } @@ -196,31 +212,39 @@ public class TestHFileOutputFormat2 { ImmutableBytesWritable, Put> { private int keyLength; - private static final int KEYLEN_DEFAULT=10; - private static final String KEYLEN_CONF="randomkv.key.length"; + private static final int KEYLEN_DEFAULT = 10; + private static final String KEYLEN_CONF = "randomkv.key.length"; private int valLength; - private static final int VALLEN_DEFAULT=10; - private static final String VALLEN_CONF="randomkv.val.length"; - private static final byte [] QUALIFIER = Bytes.toBytes("data"); + private static final int VALLEN_DEFAULT = 10; + private static final String VALLEN_CONF = "randomkv.val.length"; + private static final byte[] QUALIFIER = Bytes.toBytes("data"); + private boolean multiTableMapper = false; + private TableName[] tables = null; @Override protected void setup(Context context) throws IOException, - 
InterruptedException { + InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + false); + if (multiTableMapper) { + tables = TABLE_NAMES; + } else { + tables = new TableName[]{TABLE_NAMES[0]}; + } } @Override protected void map( - NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException ,InterruptedException - { + NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException, InterruptedException { byte keyBytes[] = new byte[keyLength]; byte valBytes[] = new byte[valLength]; @@ -229,20 +253,25 @@ public class TestHFileOutputFormat2 { assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; Random random = new Random(); - for (int i = 0; i < ROWSPERSPLIT; i++) { - - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - - for (byte[] family : TestHFileOutputFormat2.FAMILIES) { - Put p = new Put(keyBytes); - p.addColumn(family, QUALIFIER, valBytes); - // set TTL to very low so that the scan does not return any value - p.setTTL(1l); - context.write(key, p); + byte[] key; + for (int j = 0; j < tables.length; ++j) { + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + key = keyBytes; + if (multiTableMapper) { + key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); + } + + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + Put p = new Put(keyBytes); + p.addColumn(family, QUALIFIER, valBytes); + // set TTL to very low so that the scan does not return any value + p.setTTL(1l); + context.write(new ImmutableBytesWritable(key), p); + } } } } @@ -365,7 +394,7 @@ public class TestHFileOutputFormat2 { HFile.Reader rd = HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); Map finfo = rd.loadFileInfo(); - byte[] range = finfo.get("TIMERANGE".getBytes()); + byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); assertNotNull(range); // unmarshall and check values. 
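One recurring, easy-to-miss change in this test, first visible in the TIMERANGE hunk just above and repeated for the compression, bloom-type, block-size, and encoding maps later, is replacing getBytes() with getBytes("UTF-8"). The no-argument String.getBytes() encodes with the JVM's default charset, so the resulting bytes can differ between platforms; pinning the charset keeps the byte keys deterministic. A tiny illustration (the StandardCharsets variant is an equivalent alternative, not what the patch uses):

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;

    class CharsetSketch {
      static void show() throws UnsupportedEncodingException {
        byte[] platformDependent = "TIMERANGE".getBytes();          // JVM default charset
        byte[] pinned = "TIMERANGE".getBytes("UTF-8");              // what the patch uses
        byte[] pinnedConstant =
            "TIMERANGE".getBytes(StandardCharsets.UTF_8);           // same bytes, no checked exception
      }
    }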
@@ -438,6 +467,9 @@ public class TestHFileOutputFormat2 { Path dir = util.getDataTestDir("WritingTagData"); try { + conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); + // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs + conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); Job job = new Job(conf); FileOutputFormat.setOutputPath(job, dir); context = createTestTaskAttemptContext(job); @@ -537,6 +569,7 @@ public class TestHFileOutputFormat2 { doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); } + //@Ignore("Wahtevs") @Test public void testMRIncrementalLoadWithPutSortReducer() throws Exception { LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); @@ -544,43 +577,80 @@ public class TestHFileOutputFormat2 { } private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, - boolean putSortReducer, String tableStr) throws Exception { + boolean putSortReducer, String tableStr) throws Exception { + doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, + Arrays.asList(tableStr)); + } + + @Test + public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { + LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); + doIncrementalLoadTest(false, false, true, + Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList + ())); + } + + private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, + boolean putSortReducer, List tableStr) throws Exception { util = new HBaseTestingUtility(); Configuration conf = util.getConfiguration(); - conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); + conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); int hostCount = 1; int regionNum = 5; - if(shouldKeepLocality) { + if (shouldKeepLocality) { // We should change host count higher than hdfs replica count when MiniHBaseCluster supports // explicit hostnames parameter just like MiniDFSCluster does. 
hostCount = 3; regionNum = 20; } - byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); String[] hostnames = new String[hostCount]; - for(int i = 0; i < hostCount; ++i) { + for (int i = 0; i < hostCount; ++i) { hostnames[i] = "datanode_" + i; } util.startMiniCluster(1, hostCount, hostnames); - TableName tableName = TableName.valueOf(tableStr); - Table table = util.createTable(tableName, FAMILIES, splitKeys); - Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); - FileSystem fs = testDir.getFileSystem(conf); - try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin = - util.getConnection().getAdmin();) { + + Map allTables = new HashMap<>(tableStr.size()); + List tableInfo = new ArrayList<>(tableStr.size()); + boolean writeMultipleTables = tableStr.size() > 1; + for (String tableStrSingle : tableStr) { + byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); + TableName tableName = TableName.valueOf(tableStrSingle); + Table table = util.createTable(tableName, FAMILIES, splitKeys); + + RegionLocator r = util.getConnection().getRegionLocator(tableName); assertEquals("Should start with empty table", 0, util.countRows(table)); int numRegions = r.getStartKeys().length; assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); - // Generate the bulk load files - runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer); + allTables.put(tableStrSingle, table); + tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); + } + Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); + // Generate the bulk load files + runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); + + for (Table tableSingle : allTables.values()) { // This doesn't write into the table, just makes files - assertEquals("HFOF should not touch actual table", 0, util.countRows(table)); + assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); + } + int numTableDirs = 0; + for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { + Path tablePath = testDir; + + if (writeMultipleTables) { + if (allTables.containsKey(tf.getPath().getName())) { + ++numTableDirs; + tablePath = tf.getPath(); + } + else { + continue; + } + } // Make sure that a directory was created for every CF int dir = 0; - for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) { + for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { for (byte[] family : FAMILIES) { if (Bytes.toString(family).equals(f.getPath().getName())) { ++dir; @@ -588,95 +658,132 @@ public class TestHFileOutputFormat2 { } } assertEquals("Column family not found in FS.", FAMILIES.length, dir); + } + if (writeMultipleTables) { + assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); + } + Admin admin = util.getConnection().getAdmin(); + try { // handle the split case if (shouldChangeRegions) { - LOG.info("Changing regions in table"); - admin.disableTable(table.getName()); + Table chosenTable = allTables.values().iterator().next(); + // Choose a semi-random table if multiple tables are available + LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); + admin.disableTable(chosenTable.getName()); util.waitUntilNoRegionsInTransition(); - util.deleteTable(table.getName()); + util.deleteTable(chosenTable.getName()); byte[][] newSplitKeys = generateRandomSplitKeys(14); - table = util.createTable(tableName, FAMILIES, 
newSplitKeys); + Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); - while (util.getConnection().getRegionLocator(tableName) - .getAllRegionLocations().size() != 15 || - !admin.isTableAvailable(table.getName())) { + while (util.getConnection().getRegionLocator(chosenTable.getName()) + .getAllRegionLocations().size() != 15 || + !admin.isTableAvailable(table.getName())) { Thread.sleep(200); LOG.info("Waiting for new region assignment to happen"); } } // Perform the actual load - new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r); - - // Ensure data shows up - int expectedRows = 0; - if (putSortReducer) { - // no rows should be extracted - assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, - util.countRows(table)); - } else { - expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; - assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, - util.countRows(table)); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - for (Result res : results) { - assertEquals(FAMILIES.length, res.rawCells().length); - Cell first = res.rawCells()[0]; - for (Cell kv : res.rawCells()) { - assertTrue(CellUtil.matchingRow(first, kv)); - assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { + Path tableDir = testDir; + String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); + LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); + if (writeMultipleTables) { + tableDir = new Path(testDir, tableNameStr); + } + Table currentTable = allTables.get(tableNameStr); + TableName currentTableName = currentTable.getName(); + new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo + .getRegionLocator()); + + // Ensure data shows up + int expectedRows = 0; + if (putSortReducer) { + // no rows should be extracted + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(currentTable)); + } else { + expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(currentTable)); + Scan scan = new Scan(); + ResultScanner results = currentTable.getScanner(scan); + for (Result res : results) { + assertEquals(FAMILIES.length, res.rawCells().length); + Cell first = res.rawCells()[0]; + for (Cell kv : res.rawCells()) { + assertTrue(CellUtil.matchingRow(first, kv)); + assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + } } + results.close(); + } + String tableDigestBefore = util.checksumRows(currentTable); + // Check region locality + HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); + for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { + hbd.add(region.getHDFSBlocksDistribution()); + } + for (String hostname : hostnames) { + float locality = hbd.getBlockLocalityIndex(hostname); + LOG.info("locality of [" + hostname + "]: " + locality); + assertEquals(100, (int) (locality * 100)); } - results.close(); - } - String tableDigestBefore = util.checksumRows(table); - // Check region locality - HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); - for (HRegion region : util.getHBaseCluster().getRegions(tableName)) { - hbd.add(region.getHDFSBlocksDistribution()); - } - for (String hostname : hostnames) { - float 
locality = hbd.getBlockLocalityIndex(hostname); - LOG.info("locality of [" + hostname + "]: " + locality); - assertEquals(100, (int) (locality * 100)); - } - // Cause regions to reopen - admin.disableTable(tableName); - while (!admin.isTableDisabled(tableName)) { - Thread.sleep(200); - LOG.info("Waiting for table to disable"); + // Cause regions to reopen + admin.disableTable(currentTableName); + while (!admin.isTableDisabled(currentTableName)) { + Thread.sleep(200); + LOG.info("Waiting for table to disable"); + } + admin.enableTable(currentTableName); + util.waitTableAvailable(currentTableName); + assertEquals("Data should remain after reopening of regions", + tableDigestBefore, util.checksumRows(currentTable)); } - admin.enableTable(tableName); - util.waitTableAvailable(tableName); - assertEquals("Data should remain after reopening of regions", - tableDigestBefore, util.checksumRows(table)); } finally { + for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { + tableInfoSingle.getRegionLocator().close(); + } + for (Entry singleTable : allTables.entrySet() ) { + singleTable.getValue().close(); + util.deleteTable(singleTable.getValue().getName()); + } testDir.getFileSystem(conf).delete(testDir, true); - util.deleteTable(tableName); util.shutdownMiniCluster(); } } - private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor, - RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException, - UnsupportedEncodingException, InterruptedException, ClassNotFoundException { + private void runIncrementalPELoad(Configuration conf, List tableInfo, Path outDir, + boolean putSortReducer) throws IOException, + InterruptedException, ClassNotFoundException { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); setupRandomGeneratorMapper(job, putSortReducer); - HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator); + if (tableInfo.size() > 1) { + MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); + int sum = 0; + for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { + sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); + } + assertEquals(sum, job.getNumReduceTasks()); + } + else { + RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); + HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), + regionLocator); + assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); + } + FileOutputFormat.setOutputPath(job, outDir); assertFalse(util.getTestFileSystem().exists(outDir)) ; - assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); - assertTrue(job.waitForCompletion(true)); } @@ -696,7 +803,10 @@ public class TestHFileOutputFormat2 { getMockColumnFamiliesForCompression(numCfs); Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForCompression(table, familyToCompression); - HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor()); + conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.compressionDetails, + Arrays.asList(table.getTableDescriptor()))); // read back family specific 
compression setting from the configuration Map retrievedFamilyToCompressionMap = HFileOutputFormat2 @@ -707,14 +817,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToCompression.entrySet()) { assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToCompressionMap.get(entry.getKey().getBytes())); + retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForCompression(Table table, Map familyToCompression) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToCompression.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -766,7 +876,9 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForBloomType(table, familyToBloomType); - HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf); + conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, + Arrays.asList(table.getTableDescriptor()))); // read back family specific data block encoding settings from the // configuration @@ -779,14 +891,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToBloomType.entrySet()) { assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes())); + retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForBloomType(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -835,7 +947,10 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); - HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf); + conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table + .getTableDescriptor()))); // read back family specific data block encoding settings from the // configuration @@ -849,14 +964,14 @@ public class TestHFileOutputFormat2 { ) { assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes())); + retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForBlockSize(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -910,7 +1025,10 @@ public class TestHFileOutputFormat2 { setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 
HTableDescriptor tableDescriptor = table.getTableDescriptor(); - HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); + conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.dataBlockEncodingDetails, Arrays + .asList(tableDescriptor))); // read back family specific data block encoding settings from the // configuration @@ -923,14 +1041,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToDataBlockEncoding.entrySet()) { assertEquals("DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes())); + retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForDataBlockEncoding(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -995,7 +1113,7 @@ public class TestHFileOutputFormat2 { // Setup table descriptor Table table = Mockito.mock(Table.class); RegionLocator regionLocator = Mockito.mock(RegionLocator.class); - HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); + HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); Mockito.doReturn(htd).when(table).getTableDescriptor(); for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { htd.addFamily(hcd); @@ -1099,15 +1217,15 @@ public class TestHFileOutputFormat2 { util.startMiniCluster(); try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); - Table table = util.createTable(TABLE_NAME, FAMILIES); - RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { + Table table = util.createTable(TABLE_NAMES[0], FAMILIES); + RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { final FileSystem fs = util.getDFSCluster().getFileSystem(); assertEquals("Should start with empty table", 0, util.countRows(table)); // deep inspection: get the StoreFile dir final Path storePath = new Path( - FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), - new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(), + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), + new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), Bytes.toString(FAMILIES[0]))); assertEquals(0, fs.listStatus(storePath).length); @@ -1117,8 +1235,8 @@ public class TestHFileOutputFormat2 { for (int i = 0; i < 2; i++) { Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); - runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME), - testDir, false); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table + .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); } @@ -1132,12 +1250,12 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME); + admin.compact(TABLE_NAMES[0]); try { quickPoll(new Callable() { @Override public Boolean call() throws Exception { 
- List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); for (HRegion region : regions) { for (Store store : region.getStores()) { store.closeAndArchiveCompactedFiles(); @@ -1152,11 +1270,11 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME); + admin.majorCompact(TABLE_NAMES[0]); quickPoll(new Callable() { @Override public Boolean call() throws Exception { - List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); for (HRegion region : regions) { for (Store store : region.getStores()) { store.closeAndArchiveCompactedFiles(); @@ -1182,13 +1300,13 @@ public class TestHFileOutputFormat2 { Admin admin = conn.getAdmin()){ Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); final FileSystem fs = util.getDFSCluster().getFileSystem(); - Table table = util.createTable(TABLE_NAME, FAMILIES); + Table table = util.createTable(TABLE_NAMES[0], FAMILIES); assertEquals("Should start with empty table", 0, util.countRows(table)); // deep inspection: get the StoreFile dir final Path storePath = new Path( - FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), - new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(), + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), + new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), Bytes.toString(FAMILIES[0]))); assertEquals(0, fs.listStatus(storePath).length); @@ -1196,7 +1314,7 @@ public class TestHFileOutputFormat2 { Put p = new Put(Bytes.toBytes("test")); p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); table.put(p); - admin.flush(TABLE_NAME); + admin.flush(TABLE_NAMES[0]); assertEquals(1, util.countRows(table)); quickPoll(new Callable() { @Override @@ -1209,8 +1327,9 @@ public class TestHFileOutputFormat2 { conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); - RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME); - runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false); + RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table + .getTableDescriptor(), regionLocator)), testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); @@ -1224,7 +1343,7 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME); + admin.compact(TABLE_NAMES[0]); try { quickPoll(new Callable() { @Override @@ -1238,7 +1357,7 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME); + admin.majorCompact(TABLE_NAMES[0]); quickPoll(new Callable() { @Override public Boolean call() throws Exception { @@ -1273,15 +1392,15 @@ public class TestHFileOutputFormat2 { if ("newtable".equals(args[0])) { TableName tname = TableName.valueOf(args[1]); byte[][] splitKeys = generateRandomSplitKeys(4); - try (Table table = util.createTable(tname, FAMILIES, splitKeys)) { - } + Table table = util.createTable(tname, FAMILIES, splitKeys); } else if ("incremental".equals(args[0])) { TableName tname = TableName.valueOf(args[1]); try(Connection c = ConnectionFactory.createConnection(conf); Admin admin = 
c.getAdmin(); RegionLocator regionLocator = c.getRegionLocator(tname)) { Path outDir = new Path("incremental-out"); - runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin + .getTableDescriptor(tname), regionLocator)), outDir, false); } } else { throw new RuntimeException( @@ -1294,8 +1413,10 @@ public class TestHFileOutputFormat2 { util = new HBaseTestingUtility(); Configuration conf = util.getConfiguration(); conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); - conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]), - "ONE_SSD"); + + conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + + Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( + TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); util.startMiniDFSCluster(3); @@ -1313,8 +1434,10 @@ public class TestHFileOutputFormat2 { assertEquals("HOT", spB); // alter table cf schema to change storage policies - HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir); - HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir); + HFileOutputFormat2.configureStoragePolicy(conf, fs, + HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); + HFileOutputFormat2.configureStoragePolicy(conf, fs, + HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); spA = getStoragePolicyName(fs, cf1Dir); spB = getStoragePolicyName(fs, cf2Dir); LOG.debug("Storage policy of cf 0: [" + spA + "]."); @@ -1368,6 +1491,5 @@ public class TestHFileOutputFormat2 { return null; } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e206df/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java deleted file mode 100644 index 781eaa9..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.PerformanceEvaluation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and - * writes hfiles. - */ -@Category(MediumTests.class) -public class TestMultiTableHFileOutputFormat { - private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class); - - private HBaseTestingUtility util = new HBaseTestingUtility(); - - private static int ROWSPERSPLIT = 10; - - private static final int KEYLEN_DEFAULT = 10; - private static final String KEYLEN_CONF = "randomkv.key.length"; - - private static final int VALLEN_DEFAULT = 10; - private static final String VALLEN_CONF = "randomkv.val.length"; - - private static final byte[][] TABLES = - { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), - Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; - - private static final byte[][] FAMILIES = - { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), - Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; - - private static final byte[] QUALIFIER = Bytes.toBytes("data"); - - /** - * Run small MR job. this MR job will write HFile into - * testWritingDataIntoHFiles/tableNames/columnFamilies/ - */ - @Test - public void testWritingDataIntoHFiles() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); - - // Set down this value or we OOME in eclipse. - conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. 
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - try { - Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); - - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - job.setReducerClass(Table_KeyValueSortReducer.class); - job.setOutputFormatClass(MultiTableHFileOutputFormat.class); - job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("\nStarting test testWritingDataIntoHFiles\n"); - assertTrue(job.waitForCompletion(true)); - LOG.info("\nWaiting on checking MapReduce output\n"); - assertTrue(checkMROutput(fs, testDir, 0)); - } finally { - testDir.getFileSystem(conf).delete(testDir, true); - util.shutdownMiniCluster(); - } - } - - /** - * check whether create directory and hfiles as format designed in MultiHFilePartitioner - * and also check whether the output file has same related configuration as created table - */ - @Test - public void testMultiHFilePartitioner() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testMultiHFilePartitioner dir writing to : " + testDir); - - // Set down this value or we OOME in eclipse. - conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. 
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - // Create several tables for testing - List tables = new ArrayList(); - - // to store splitKeys for TABLE[0] for testing; - byte[][] testKeys = new byte[0][0]; - for (int i = 0; i < TABLES.length; i++) { - TableName tableName = TableName.valueOf(TABLES[i]); - byte[][] splitKeys = generateRandomSplitKeys(3); - if (i == 0) { - testKeys = splitKeys; - } - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - for (int j = 0; j < FAMILIES.length; j++) { - HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]); - //only set Tables[0] configuration, and specify compression type and DataBlockEncode - if (i == 0) { - familyDescriptor.setCompressionType(Compression.Algorithm.GZ); - familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); - } - tableDescriptor.addFamily(familyDescriptor); - } - util.createTable(tableDescriptor, splitKeys, conf); - tables.add(tableName); - } - // set up for MapReduce job - try { - Job job = Job.getInstance(conf, "testMultiHFilePartitioner"); - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - - MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables); - - LOG.info("Starting test testWritingDataIntoHFiles"); - assertTrue(job.waitForCompletion(true)); - LOG.info("Waiting on checking MapReduce output"); - assertTrue(checkMROutput(fs, testDir, 0)); - assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys)); - } finally { - for (int i = 0; i < TABLES.length; i++) { - TableName tName = TableName.valueOf(TABLES[i]); - util.deleteTable(tName); - } - fs.delete(testDir, true); - fs.close(); - util.shutdownMiniCluster(); - } - } - - /** - * check the output hfile has same configuration as created test table - * and also check whether hfiles get split correctly - * only check TABLES[0] - */ - private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException { - FileStatus[] fStats = fs.listStatus(testDir); - for (FileStatus stats : fStats) { - if (stats.getPath().getName().equals(new String(TABLES[0]))) { - FileStatus[] cfStats = fs.listStatus(stats.getPath()); - for (FileStatus cfstat : cfStats) { - FileStatus[] hfStats = fs.listStatus(cfstat.getPath()); - - List firsttKeys = new ArrayList(); - List lastKeys = new ArrayList(); - for (FileStatus hfstat : hfStats) { - if (HFile.isHFileFormat(fs, hfstat)) { - HFile.Reader hfr = - HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf); - if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr - .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false; - firsttKeys.add(hfr.getFirstRowKey()); - lastKeys.add(hfr.getLastRowKey()); - } - } - if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) { - return false; - } - } - } - } - return true; - } - - /** - * Check whether the Hfile has been split by region boundaries - * @param splitKeys split keys for that table - * @param firstKeys first rowKey for hfiles - * @param lastKeys last rowKey for hfiles - */ - private boolean checkFileSplit(byte[][] splitKeys, List firstKeys, List lastKeys) { - Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR); - Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR); - 
Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR); - - int is = 0, il = 0; - for (byte[] key : lastKeys) { - while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++; - if (is == splitKeys.length) { - break; - } - if (is > 0) { - if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false; - } - il++; - } - - if (is == splitKeys.length) { - return il == lastKeys.size() - 1; - } - return true; - } - - - /** - * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the - * created directory is correct or not A recursion method, the testDir had better be small size - */ - private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException { - if (level >= 3) { - return HFile.isHFileFormat(fs, testDir); - } - FileStatus[] fStats = fs.listStatus(testDir); - if (fStats == null || fStats.length <= 0) { - LOG.info("Created directory format is not correct"); - return false; - } - - for (FileStatus stats : fStats) { - // skip the _SUCCESS file created by MapReduce - if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) - continue; - if (level < 2 && !stats.isDirectory()) { - LOG.info("Created directory format is not correct"); - return false; - } - boolean flag = checkMROutput(fs, stats.getPath(), level + 1); - if (flag == false) return false; - } - return true; - } - - - private byte[][] generateRandomSplitKeys(int numKeys) { - Random random = new Random(); - byte[][] ret = new byte[numKeys][]; - for (int i = 0; i < numKeys; i++) { - ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT); - } - return ret; - } - - - /** - * Simple mapper that makes output. With no input data - */ - static class Random_TableKV_GeneratingMapper - extends Mapper { - - private int keyLength; - private int valLength; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - - Configuration conf = context.getConfiguration(); - keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); - valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); - } - - @Override - protected void map(NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException, InterruptedException { - - byte keyBytes[] = new byte[keyLength]; - byte valBytes[] = new byte[valLength]; - - ArrayList tables = new ArrayList<>(); - for (int i = 0; i < TABLES.length; i++) { - tables.add(new ImmutableBytesWritable(TABLES[i])); - } - - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - - for (int i = 0; i < ROWSPERSPLIT; i++) { - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); - random.nextBytes(valBytes); - - for (ImmutableBytesWritable table : tables) { - for (byte[] family : FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(table, kv); - } - } - } - } - } - - /** - * Simple Reducer that have input , with KeyValues have no order. 
and output - * , with KeyValues are ordered - */ - - static class Table_KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, - org.apache.hadoop.mapreduce.Reducer.Context context) - throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); - for (KeyValue kv : kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv : map) { - context.write(table, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } - } -} \ No newline at end of file
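Pulling the pieces together, the driver-side wiring that the new doIncrementalLoadTest exercises looks roughly like the sketch below. It is assembled only from calls visible in this diff; the class and method names, job name, and the placeholder path and table list are mine, not the commit's, so treat it as an outline rather than canonical API usage:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiTableBulkLoadSketch {

      // Configure one job that writes HFiles for several tables, then bulk-load each
      // table from its own subdirectory of outputDir, as the updated test does.
      static void runMultiTableBulkLoad(Configuration conf, List<TableName> tableNames,
          Path outputDir) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableNames.size());
          for (TableName name : tableNames) {
            Table table = conn.getTable(name);
            RegionLocator locator = conn.getRegionLocator(name);
            tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), locator));
          }

          Job job = Job.getInstance(conf, "multi-table-bulkload");
          FileOutputFormat.setOutputPath(job, outputDir);
          // The test asserts that this sets one reduce task per region, summed over all tables.
          MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
          // ... set the mapper, input format and serializations here, then run the job:
          // job.waitForCompletion(true);

          // HFiles for each table land under <outputDir>/<tableName>; load them table by table.
          for (HFileOutputFormat2.TableInfo info : tableInfo) {
            String tableNameStr = info.getHTableDescriptor().getNameAsString();
            Path tableDir = new Path(outputDir, tableNameStr);
            new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin,
                conn.getTable(TableName.valueOf(tableNameStr)), info.getRegionLocator());
          }
        }
      }
    }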