Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D1BC2200D0D for ; Fri, 25 Aug 2017 23:53:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CFF3E16D603; Fri, 25 Aug 2017 21:53:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 50AFA16D5FF for ; Fri, 25 Aug 2017 23:53:17 +0200 (CEST) Received: (qmail 62897 invoked by uid 500); 25 Aug 2017 21:53:16 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 62883 invoked by uid 99); 25 Aug 2017 21:53:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Aug 2017 21:53:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5FD8BE08FF; Fri, 25 Aug 2017 21:53:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: iemejia@apache.org To: commits@beam.apache.org Date: Fri, 25 Aug 2017 21:53:16 -0000 Message-Id: <7975271a15534e58884160b4c2c1cb8f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: Fix code style issues for HBaseIO archived-at: Fri, 25 Aug 2017 21:53:19 -0000 Repository: beam Updated Branches: refs/heads/master cdf050c0d -> 77a0a2afc http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index 0b7f203..e6f7ac4 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -75,418 +75,408 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Test HBaseIO. - */ +/** Test HBaseIO. */ @RunWith(JUnit4.class) public class HBaseIOTest { - @Rule public final transient TestPipeline p = TestPipeline.create(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static HBaseTestingUtility htu; - private static HBaseAdmin admin; - - private static final Configuration conf = HBaseConfiguration.create(); - private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); - private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); - private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email"); - - @BeforeClass - public static void beforeClass() throws Exception { - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - // Try to bind the hostname to localhost to solve an issue when it is not configured or - // no DNS resolution available. - conf.setStrings("hbase.master.hostname", "localhost"); - conf.setStrings("hbase.regionserver.hostname", "localhost"); - htu = new HBaseTestingUtility(conf); - - // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons - htu.startMiniZKCluster(); - MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4); - hbm.waitForActiveAndReadyMaster(); - - admin = htu.getHBaseAdmin(); - } - - @AfterClass - public static void afterClass() throws Exception { - if (admin != null) { - admin.close(); - admin = null; - } - if (htu != null) { - htu.shutdownMiniHBaseCluster(); - htu.shutdownMiniZKCluster(); - htu = null; - } - } - - @Test - public void testReadBuildsCorrectly() { - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("table"); - assertEquals("table", read.getTableId()); - assertNotNull("configuration", read.getConfiguration()); - } - - @Test - public void testReadBuildsCorrectlyInDifferentOrder() { - HBaseIO.Read read = HBaseIO.read().withTableId("table").withConfiguration(conf); - assertEquals("table", read.getTableId()); - assertNotNull("configuration", read.getConfiguration()); - } - - @Test - public void testWriteBuildsCorrectly() { - HBaseIO.Write write = HBaseIO.write().withConfiguration(conf).withTableId("table"); - assertEquals("table", write.getTableId()); - assertNotNull("configuration", write.getConfiguration()); - } - - @Test - public void testWriteBuildsCorrectlyInDifferentOrder() { - HBaseIO.Write write = HBaseIO.write().withTableId("table").withConfiguration(conf); - assertEquals("table", write.getTableId()); - assertNotNull("configuration", write.getConfiguration()); - } - - @Test - public void testWriteValidationFailsMissingTable() { - HBaseIO.Write write = HBaseIO.write().withConfiguration(conf); - thrown.expect(IllegalArgumentException.class); - write.validate(null /* input */); - } - - @Test - public void testWriteValidationFailsMissingConfiguration() { - HBaseIO.Write write = HBaseIO.write().withTableId("table"); - thrown.expect(IllegalArgumentException.class); - write.validate(null /* input */); - } - - /** Tests that when reading from a non-existent table, the read fails. */ - @Test - public void testReadingFailsTableDoesNotExist() throws Exception { - final String table = "TEST-TABLE-INVALID"; - // Exception will be thrown by read.validate() when read is applied. - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); - runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), - new ArrayList()); - } - - /** Tests that when reading from an empty table, the read succeeds. */ - @Test - public void testReadingEmptyTable() throws Exception { - final String table = "TEST-EMPTY-TABLE"; - createTable(table); - runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), - new ArrayList()); - } - - @Test - public void testReading() throws Exception { - final String table = "TEST-MANY-ROWS-TABLE"; - final int numRows = 1001; - createTable(table); - writeData(table, numRows); - runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 1001); - } - - /** Tests reading all rows from a split table. */ - @Test - public void testReadingWithSplits() throws Exception { - final String table = "TEST-MANY-ROWS-SPLITS-TABLE"; - final int numRows = 1500; - final int numRegions = 4; - final long bytesPerRow = 100L; - - // Set up test table data and sample row keys for size estimation and splitting. - createTable(table); - writeData(table, numRows); - - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); - HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); - List> splits = - source.split(numRows * bytesPerRow / numRegions, - null /* options */); - - // Test num splits and split equality. - assertThat(splits, hasSize(4)); - assertSourcesEqualReferenceSource(source, splits, null /* options */); - } - - /** Tests that a {@link HBaseSource} can be read twice, verifying its immutability. */ - @Test - public void testReadingSourceTwice() throws Exception { - final String table = "TEST-READING-TWICE"; - final int numRows = 10; - - // Set up test table data and sample row keys for size estimation and splitting. - createTable(table); - writeData(table, numRows); - - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); - HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); - assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); - // second read. - assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); - } - - /** Tests reading all rows using a filter. */ - @Test - public void testReadingWithFilter() throws Exception { - final String table = "TEST-FILTER-TABLE"; - final int numRows = 1001; - - createTable(table); - writeData(table, numRows); - - String regex = ".*17.*"; - Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, - new RegexStringComparator(regex)); - HBaseIO.Read read = - HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter); - runReadTestLength(read, 20); - } - - /** - * Tests reading all rows using key ranges. Tests a prefix [), a suffix (], and a restricted - * range [] and that some properties hold across them. - */ - @Test - public void testReadingWithKeyRange() throws Exception { - final String table = "TEST-KEY-RANGE-TABLE"; - final int numRows = 1001; - final byte[] startRow = "2".getBytes(); - final byte[] stopRow = "9".getBytes(); - final ByteKey startKey = ByteKey.copyFrom(startRow); - - createTable(table); - writeData(table, numRows); - - // Test prefix: [beginning, startKey). - final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey); - runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table) - .withKeyRange(prefixRange), 126); - - // Test suffix: [startKey, end). - final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey); - runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table) - .withKeyRange(suffixRange), 875); - - // Test restricted range: [startKey, endKey). - // This one tests the second signature of .withKeyRange - runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table) - .withKeyRange(startRow, stopRow), 441); - } - - /** - * Tests dynamic work rebalancing exhaustively. - */ - @Test - public void testReadingSplitAtFractionExhaustive() throws Exception { - final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE"; - final int numRows = 7; - - createTable(table); - writeData(table, numRows); - - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); - HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */) - .withStartKey(ByteKey.of(48)).withEndKey(ByteKey.of(58)); - - assertSplitAtFractionExhaustive(source, null); - } - - /** - * Unit tests of splitAtFraction. - */ - @Test - public void testReadingSplitAtFraction() throws Exception { - final String table = "TEST-SPLIT-AT-FRACTION"; - final int numRows = 10; - - createTable(table); - writeData(table, numRows); - - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); - HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); - - // The value k is based on the partitioning schema for the data, in this test case, - // the partitioning is HEX-based, so we start from 1/16m and the value k will be - // around 1/256, so the tests are done in approximately k ~= 0.003922 steps - double k = 0.003922; - - assertSplitAtFractionFails(source, 0, k, null /* options */); - assertSplitAtFractionFails(source, 0, 1.0, null /* options */); - // With 1 items read, all split requests past k will succeed. - assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */); - assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */); - // With 3 items read, all split requests past 3k will succeed. - assertSplitAtFractionFails(source, 3, 2 * k, null /* options */); - assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */); - assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */); - // With 6 items read, all split requests past 6k will succeed. - assertSplitAtFractionFails(source, 6, 5 * k, null /* options */); - assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */); - } - - @Test - public void testReadingDisplayData() { - HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable"); - DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("tableId", "fooTable")); - assertThat(displayData, hasDisplayItem("configuration")); - } - - /** Tests that a record gets written to the service and messages are logged. */ - @Test - public void testWriting() throws Exception { - final String table = "table"; - final String key = "key"; - final String value = "value"; - final int numMutations = 100; - - createTable(table); - - p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))) - .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); - p.run().waitUntilFinish(); - - List results = readTable(table, new Scan()); - assertEquals(numMutations, results.size()); - } - - /** Tests that when writing to a non-existent table, the write fails. */ - @Test - public void testWritingFailsTableDoesNotExist() throws Exception { - final String table = "TEST-TABLE-DOES-NOT-EXIST"; - - p.apply(Create.empty(HBaseMutationCoder.of())) - .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); - - // Exception will be thrown by write.validate() when write is applied. - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); - p.run(); - } - - /** Tests that when writing an element fails, the write fails. */ - @Test - public void testWritingFailsBadElement() throws Exception { - final String table = "TEST-TABLE-BAD-ELEMENT"; - final String key = "KEY"; - createTable(table); - - p.apply(Create.of(makeBadMutation(key))) - .apply(HBaseIO.write().withConfiguration(conf).withTableId(table)); - - thrown.expect(Pipeline.PipelineExecutionException.class); - thrown.expectCause(Matchers.instanceOf(IllegalArgumentException.class)); - thrown.expectMessage("No columns to insert"); - p.run().waitUntilFinish(); - } - - @Test - public void testWritingDisplayData() { - HBaseIO.Write write = HBaseIO.write().withTableId("fooTable").withConfiguration(conf); - DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("tableId", "fooTable")); - } - - // HBase helper methods - private static void createTable(String tableId) throws Exception { - byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()}; - createTable(tableId, COLUMN_FAMILY, splitKeys); - } - - private static void createTable(String tableId, byte[] columnFamily, byte[][] splitKeys) - throws Exception { - TableName tableName = TableName.valueOf(tableId); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor colDef = new HColumnDescriptor(columnFamily); - desc.addFamily(colDef); - admin.createTable(desc, splitKeys); - } - - /** - * Helper function to create a table and return the rows that it created. - */ - private static void writeData(String tableId, int numRows) throws Exception { - Connection connection = admin.getConnection(); - TableName tableName = TableName.valueOf(tableId); - BufferedMutator mutator = connection.getBufferedMutator(tableName); - List mutations = makeTableData(numRows); - mutator.mutate(mutations); - mutator.flush(); - mutator.close(); - } - - private static List makeTableData(int numRows) { - List mutations = new ArrayList<>(numRows); - for (int i = 0; i < numRows; ++i) { - // We pad values in hex order 0,1, ... ,F,0, ... - String prefix = String.format("%X", i % 16); - // This 21 is to have a key longer than an input - byte[] rowKey = Bytes.toBytes( - StringUtils.leftPad("_" + String.valueOf(i), 21, prefix)); - byte[] value = Bytes.toBytes(String.valueOf(i)); - byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + "@email.com"); - mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_NAME, value)); - mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_EMAIL, valueEmail)); - } - return mutations; - } - - private static ResultScanner scanTable(String tableId, Scan scan) throws Exception { - Connection connection = ConnectionFactory.createConnection(conf); - TableName tableName = TableName.valueOf(tableId); - Table table = connection.getTable(tableName); - return table.getScanner(scan); - } - - private static List readTable(String tableId, Scan scan) throws Exception { - ResultScanner scanner = scanTable(tableId, scan); - List results = new ArrayList<>(); - for (Result result : scanner) { - results.add(result); - } - scanner.close(); - return results; - } - - // Beam helper methods - /** Helper function to make a single row mutation to be written. */ - private static Iterable makeMutations(String key, String value, int numMutations) { - List mutations = new ArrayList<>(); - for (int i = 0; i < numMutations; i++) { - mutations.add(makeMutation(key + i, value)); - } - return mutations; - } - - private static Mutation makeMutation(String key, String value) { - return new Put(key.getBytes(StandardCharsets.UTF_8)) - .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value)) - .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com")); - } - - private static Mutation makeBadMutation(String key) { - return new Put(key.getBytes()); - } - - private void runReadTest(HBaseIO.Read read, List expected) { - final String transformId = read.getTableId() + "_" + read.getKeyRange(); - PCollection rows = p.apply("Read" + transformId, read); - PAssert.that(rows).containsInAnyOrder(expected); - p.run().waitUntilFinish(); - } - - private void runReadTestLength(HBaseIO.Read read, long numElements) { - final String transformId = read.getTableId() + "_" + read.getKeyRange(); - PCollection rows = p.apply("Read" + transformId, read); - PAssert.thatSingleton(rows.apply("Count" + transformId, - Count.globally())).isEqualTo(numElements); - p.run().waitUntilFinish(); - } + @Rule public final transient TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static HBaseTestingUtility htu; + private static HBaseAdmin admin; + + private static final Configuration conf = HBaseConfiguration.create(); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); + private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); + private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email"); + + @BeforeClass + public static void beforeClass() throws Exception { + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // Try to bind the hostname to localhost to solve an issue when it is not configured or + // no DNS resolution available. + conf.setStrings("hbase.master.hostname", "localhost"); + conf.setStrings("hbase.regionserver.hostname", "localhost"); + htu = new HBaseTestingUtility(conf); + + // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons + htu.startMiniZKCluster(); + MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4); + hbm.waitForActiveAndReadyMaster(); + + admin = htu.getHBaseAdmin(); + } + + @AfterClass + public static void afterClass() throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + if (htu != null) { + htu.shutdownMiniHBaseCluster(); + htu.shutdownMiniZKCluster(); + htu = null; + } + } + + @Test + public void testReadBuildsCorrectly() { + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("table"); + assertEquals("table", read.getTableId()); + assertNotNull("configuration", read.getConfiguration()); + } + + @Test + public void testReadBuildsCorrectlyInDifferentOrder() { + HBaseIO.Read read = HBaseIO.read().withTableId("table").withConfiguration(conf); + assertEquals("table", read.getTableId()); + assertNotNull("configuration", read.getConfiguration()); + } + + @Test + public void testWriteBuildsCorrectly() { + HBaseIO.Write write = HBaseIO.write().withConfiguration(conf).withTableId("table"); + assertEquals("table", write.getTableId()); + assertNotNull("configuration", write.getConfiguration()); + } + + @Test + public void testWriteBuildsCorrectlyInDifferentOrder() { + HBaseIO.Write write = HBaseIO.write().withTableId("table").withConfiguration(conf); + assertEquals("table", write.getTableId()); + assertNotNull("configuration", write.getConfiguration()); + } + + @Test + public void testWriteValidationFailsMissingTable() { + HBaseIO.Write write = HBaseIO.write().withConfiguration(conf); + thrown.expect(IllegalArgumentException.class); + write.validate(null /* input */); + } + + @Test + public void testWriteValidationFailsMissingConfiguration() { + HBaseIO.Write write = HBaseIO.write().withTableId("table"); + thrown.expect(IllegalArgumentException.class); + write.validate(null /* input */); + } + + /** Tests that when reading from a non-existent table, the read fails. */ + @Test + public void testReadingFailsTableDoesNotExist() throws Exception { + final String table = "TEST-TABLE-INVALID"; + // Exception will be thrown by read.validate() when read is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList()); + } + + /** Tests that when reading from an empty table, the read succeeds. */ + @Test + public void testReadingEmptyTable() throws Exception { + final String table = "TEST-EMPTY-TABLE"; + createTable(table); + runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList()); + } + + @Test + public void testReading() throws Exception { + final String table = "TEST-MANY-ROWS-TABLE"; + final int numRows = 1001; + createTable(table); + writeData(table, numRows); + runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 1001); + } + + /** Tests reading all rows from a split table. */ + @Test + public void testReadingWithSplits() throws Exception { + final String table = "TEST-MANY-ROWS-SPLITS-TABLE"; + final int numRows = 1500; + final int numRegions = 4; + final long bytesPerRow = 100L; + + // Set up test table data and sample row keys for size estimation and splitting. + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + List> splits = + source.split(numRows * bytesPerRow / numRegions, null /* options */); + + // Test num splits and split equality. + assertThat(splits, hasSize(4)); + assertSourcesEqualReferenceSource(source, splits, null /* options */); + } + + /** Tests that a {@link HBaseSource} can be read twice, verifying its immutability. */ + @Test + public void testReadingSourceTwice() throws Exception { + final String table = "TEST-READING-TWICE"; + final int numRows = 10; + + // Set up test table data and sample row keys for size estimation and splitting. + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); + // second read. + assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows)); + } + + /** Tests reading all rows using a filter. */ + @Test + public void testReadingWithFilter() throws Exception { + final String table = "TEST-FILTER-TABLE"; + final int numRows = 1001; + + createTable(table); + writeData(table, numRows); + + String regex = ".*17.*"; + Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); + HBaseIO.Read read = + HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter); + runReadTestLength(read, 20); + } + + /** + * Tests reading all rows using key ranges. Tests a prefix [), a suffix (], and a restricted range + * [] and that some properties hold across them. + */ + @Test + public void testReadingWithKeyRange() throws Exception { + final String table = "TEST-KEY-RANGE-TABLE"; + final int numRows = 1001; + final byte[] startRow = "2".getBytes(); + final byte[] stopRow = "9".getBytes(); + final ByteKey startKey = ByteKey.copyFrom(startRow); + + createTable(table); + writeData(table, numRows); + + // Test prefix: [beginning, startKey). + final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey); + runReadTestLength( + HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(prefixRange), 126); + + // Test suffix: [startKey, end). + final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey); + runReadTestLength( + HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(suffixRange), 875); + + // Test restricted range: [startKey, endKey). + // This one tests the second signature of .withKeyRange + runReadTestLength( + HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(startRow, stopRow), + 441); + } + + /** Tests dynamic work rebalancing exhaustively. */ + @Test + public void testReadingSplitAtFractionExhaustive() throws Exception { + final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE"; + final int numRows = 7; + + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = + new HBaseSource(read, null /* estimatedSizeBytes */) + .withStartKey(ByteKey.of(48)) + .withEndKey(ByteKey.of(58)); + + assertSplitAtFractionExhaustive(source, null); + } + + /** Unit tests of splitAtFraction. */ + @Test + public void testReadingSplitAtFraction() throws Exception { + final String table = "TEST-SPLIT-AT-FRACTION"; + final int numRows = 10; + + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + + // The value k is based on the partitioning schema for the data, in this test case, + // the partitioning is HEX-based, so we start from 1/16m and the value k will be + // around 1/256, so the tests are done in approximately k ~= 0.003922 steps + double k = 0.003922; + + assertSplitAtFractionFails(source, 0, k, null /* options */); + assertSplitAtFractionFails(source, 0, 1.0, null /* options */); + // With 1 items read, all split requests past k will succeed. + assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */); + // With 3 items read, all split requests past 3k will succeed. + assertSplitAtFractionFails(source, 3, 2 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */); + // With 6 items read, all split requests past 6k will succeed. + assertSplitAtFractionFails(source, 6, 5 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */); + } + + @Test + public void testReadingDisplayData() { + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable"); + DisplayData displayData = DisplayData.from(read); + assertThat(displayData, hasDisplayItem("tableId", "fooTable")); + assertThat(displayData, hasDisplayItem("configuration")); + } + + /** Tests that a record gets written to the service and messages are logged. */ + @Test + public void testWriting() throws Exception { + final String table = "table"; + final String key = "key"; + final String value = "value"; + final int numMutations = 100; + + createTable(table); + + p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + p.run().waitUntilFinish(); + + List results = readTable(table, new Scan()); + assertEquals(numMutations, results.size()); + } + + /** Tests that when writing to a non-existent table, the write fails. */ + @Test + public void testWritingFailsTableDoesNotExist() throws Exception { + final String table = "TEST-TABLE-DOES-NOT-EXIST"; + + p.apply(Create.empty(HBaseMutationCoder.of())) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + + // Exception will be thrown by write.validate() when write is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + p.run(); + } + + /** Tests that when writing an element fails, the write fails. */ + @Test + public void testWritingFailsBadElement() throws Exception { + final String table = "TEST-TABLE-BAD-ELEMENT"; + final String key = "KEY"; + createTable(table); + + p.apply(Create.of(makeBadMutation(key))) + .apply(HBaseIO.write().withConfiguration(conf).withTableId(table)); + + thrown.expect(Pipeline.PipelineExecutionException.class); + thrown.expectCause(Matchers.instanceOf(IllegalArgumentException.class)); + thrown.expectMessage("No columns to insert"); + p.run().waitUntilFinish(); + } + + @Test + public void testWritingDisplayData() { + HBaseIO.Write write = HBaseIO.write().withTableId("fooTable").withConfiguration(conf); + DisplayData displayData = DisplayData.from(write); + assertThat(displayData, hasDisplayItem("tableId", "fooTable")); + } + + // HBase helper methods + private static void createTable(String tableId) throws Exception { + byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()}; + createTable(tableId, COLUMN_FAMILY, splitKeys); + } + + private static void createTable(String tableId, byte[] columnFamily, byte[][] splitKeys) + throws Exception { + TableName tableName = TableName.valueOf(tableId); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDef = new HColumnDescriptor(columnFamily); + desc.addFamily(colDef); + admin.createTable(desc, splitKeys); + } + + /** Helper function to create a table and return the rows that it created. */ + private static void writeData(String tableId, int numRows) throws Exception { + Connection connection = admin.getConnection(); + TableName tableName = TableName.valueOf(tableId); + BufferedMutator mutator = connection.getBufferedMutator(tableName); + List mutations = makeTableData(numRows); + mutator.mutate(mutations); + mutator.flush(); + mutator.close(); + } + + private static List makeTableData(int numRows) { + List mutations = new ArrayList<>(numRows); + for (int i = 0; i < numRows; ++i) { + // We pad values in hex order 0,1, ... ,F,0, ... + String prefix = String.format("%X", i % 16); + // This 21 is to have a key longer than an input + byte[] rowKey = Bytes.toBytes(StringUtils.leftPad("_" + String.valueOf(i), 21, prefix)); + byte[] value = Bytes.toBytes(String.valueOf(i)); + byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + "@email.com"); + mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_NAME, value)); + mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_EMAIL, valueEmail)); + } + return mutations; + } + + private static ResultScanner scanTable(String tableId, Scan scan) throws Exception { + Connection connection = ConnectionFactory.createConnection(conf); + TableName tableName = TableName.valueOf(tableId); + Table table = connection.getTable(tableName); + return table.getScanner(scan); + } + + private static List readTable(String tableId, Scan scan) throws Exception { + ResultScanner scanner = scanTable(tableId, scan); + List results = new ArrayList<>(); + for (Result result : scanner) { + results.add(result); + } + scanner.close(); + return results; + } + + // Beam helper methods + /** Helper function to make a single row mutation to be written. */ + private static Iterable makeMutations(String key, String value, int numMutations) { + List mutations = new ArrayList<>(); + for (int i = 0; i < numMutations; i++) { + mutations.add(makeMutation(key + i, value)); + } + return mutations; + } + + private static Mutation makeMutation(String key, String value) { + return new Put(key.getBytes(StandardCharsets.UTF_8)) + .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value)) + .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com")); + } + + private static Mutation makeBadMutation(String key) { + return new Put(key.getBytes()); + } + + private void runReadTest(HBaseIO.Read read, List expected) { + final String transformId = read.getTableId() + "_" + read.getKeyRange(); + PCollection rows = p.apply("Read" + transformId, read); + PAssert.that(rows).containsInAnyOrder(expected); + p.run().waitUntilFinish(); + } + + private void runReadTestLength(HBaseIO.Read read, long numElements) { + final String transformId = read.getTableId() + "_" + read.getKeyRange(); + PCollection rows = p.apply("Read" + transformId, read); + PAssert.thatSingleton(rows.apply("Count" + transformId, Count.globally())) + .isEqualTo(numElements); + p.run().waitUntilFinish(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java index 5bf2d80..41525dc 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoderTest.java @@ -28,9 +28,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for HBaseMutationCoder. - */ +/** Tests for HBaseMutationCoder. */ @RunWith(JUnit4.class) public class HBaseMutationCoderTest { @Rule public final ExpectedException thrown = ExpectedException.none(); http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java index c6b27d6..5af5e16 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseResultCoderTest.java @@ -25,9 +25,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for HBaseResultCoder. - */ +/** Tests for HBaseResultCoder. */ @RunWith(JUnit4.class) public class HBaseResultCoderTest { @Rule public final ExpectedException thrown = ExpectedException.none(); http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java index 49eb4e3..7d2fd28 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/SerializableScanTest.java @@ -28,14 +28,12 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for SerializableScan. - */ +/** Tests for SerializableScan. */ @RunWith(JUnit4.class) public class SerializableScanTest { @Rule public final ExpectedException thrown = ExpectedException.none(); private static final SerializableScan DEFAULT_SERIALIZABLE_SCAN = - new SerializableScan(new Scan()); + new SerializableScan(new Scan()); @Test public void testSerializationDeserialization() throws Exception {