Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3094B11AC5 for ; Mon, 8 Sep 2014 14:49:28 +0000 (UTC) Received: (qmail 29598 invoked by uid 500); 8 Sep 2014 14:49:28 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 29547 invoked by uid 500); 8 Sep 2014 14:49:28 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 29533 invoked by uid 99); 8 Sep 2014 14:49:28 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Sep 2014 14:49:28 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C15E2A0E707; Mon, 8 Sep 2014 14:49:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tedyu@apache.org To: commits@hbase.apache.org Message-Id: <8acede24531b4b868d3feded9dc6f175@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He) Date: Mon, 8 Sep 2014 14:49:27 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1 ecb015d9a -> fd5b139a6 HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd5b139a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd5b139a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd5b139a Branch: refs/heads/branch-1 Commit: fd5b139a6f60970f575499d5dfb0aa762c590696 Parents: ecb015d Author: Ted Yu Authored: Mon Sep 8 14:49:16 2014 +0000 Committer: Ted Yu Committed: Mon Sep 8 14:49:16 2014 +0000 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/StoreFile.java | 24 +++++-- .../hbase/regionserver/StoreFileScanner.java | 2 +- .../regionserver/TestScannerWithBulkload.java | 76 ++++++++++++++++++-- 3 files changed, 91 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 27c64f0..6a45c47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -315,18 +315,31 @@ public class StoreFile { } /** - * @return true if this storefile was created by HFileOutputFormat - * for a bulk load. + * Check if this storefile was created by bulk load. + * When a hfile is bulk loaded into HBase, we append + * '_SeqId_' to the hfile name, unless + * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is + * explicitly turned off. + * If "hbase.mapreduce.bulkload.assign.sequenceNumbers" + * is turned off, fall back to BULKLOAD_TIME_KEY. + * @return true if this storefile was created by bulk load. */ boolean isBulkLoadResult() { - return metadataMap.containsKey(BULKLOAD_TIME_KEY); + boolean bulkLoadedHFile = false; + String fileName = this.getPath().getName(); + int startPos = fileName.indexOf("SeqId_"); + if (startPos != -1) { + bulkLoadedHFile = true; + } + return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); } /** * Return the timestamp at which this bulk load file was generated. */ public long getBulkLoadTimestamp() { - return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY)); + byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY); + return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp); } /** @@ -372,7 +385,8 @@ public class StoreFile { // generate the sequenceId from the fileName // fileName is of the form _SeqId__ String fileName = this.getPath().getName(); - int startPos = fileName.indexOf("SeqId_"); + // Use lastIndexOf() to get the last, most recent bulk load seqId. + int startPos = fileName.lastIndexOf("SeqId_"); if (startPos != -1) { this.sequenceid = Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6))); http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index aa351d3..2784845 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner { protected void setCurrentCell(Cell newVal) throws IOException { this.cur = newVal; - if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) { + if (this.cur != null && this.reader.isBulkLoaded()) { CellUtil.setSequenceId(cur, this.reader.getSequenceID()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 3ff6394..af4b9c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -73,7 +73,8 @@ public class TestScannerWithBulkload { Scan scan = createScan(); final HTable table = init(admin, l, scan, tableName); // use bulkload - final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file"); + final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", + false); Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); @@ -101,6 +102,7 @@ public class TestScannerWithBulkload { } result = scanner.next(); } + scanner.close(); table.close(); } @@ -121,7 +123,10 @@ public class TestScannerWithBulkload { return result; } - private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException { + // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file. + // Else, we will set BULKLOAD_TIME_KEY. + private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile) + throws IOException { FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); final Path hfilePath = new Path(hFilePath); fs.mkdirs(hfilePath); @@ -132,10 +137,26 @@ public class TestScannerWithBulkload { HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create(); KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version2")); + + // Set cell seq id to test bulk load native hfiles. + if (nativeHFile) { + // Set a big seq id. Scan should not look at this seq id in a bulk loaded file. + // Scan should only look at the seq id appended at the bulk load time, and not skip + // this kv. + kv.setSequenceId(9999999); + } + writer.append(kv); - // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk - // loaded file + + if (nativeHFile) { + // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file. + // Scan should only look at the seq id appended at the bulk load time, and not skip its + // kv. + writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999))); + } + else { writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); + } writer.close(); return hfilePath; } @@ -182,7 +203,7 @@ public class TestScannerWithBulkload { final HTable table = init(admin, l, scan, tableName); // use bulkload final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/", - "/temp/testBulkLoadWithParallelScan/col/file"); + "/temp/testBulkLoadWithParallelScan/col/file", false); Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); @@ -209,10 +230,55 @@ public class TestScannerWithBulkload { // scanner Result result = scanner.next(); scanAfterBulkLoad(scanner, result, "version1"); + scanner.close(); table.close(); } + @Test + public void testBulkLoadNativeHFile() throws Exception { + String tableName = "testBulkLoadNativeHFile"; + long l = System.currentTimeMillis(); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + createTable(admin, tableName); + Scan scan = createScan(); + final HTable table = init(admin, l, scan, tableName); + // use bulkload + final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/", + "/temp/testBulkLoadNativeHFile/col/file", true); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); + final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); + bulkload.doBulkLoad(hfilePath, table); + ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); + // We had 'version0', 'version1' for 'row1,col:q' in the table. + // Bulk load added 'version2' scanner should be able to see 'version2' + result = scanAfterBulkLoad(scanner, result, "version2"); + Put put0 = new Put(Bytes.toBytes("row1")); + put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes + .toBytes("version3"))); + table.put(put0); + table.flushCommits(); + admin.flush(tableName); + scanner = table.getScanner(scan); + result = scanner.next(); + while (result != null) { + List kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q")); + for (KeyValue _kv : kvs) { + if (Bytes.toString(_kv.getRow()).equals("row1")) { + System.out.println(Bytes.toString(_kv.getRow())); + System.out.println(Bytes.toString(_kv.getQualifier())); + System.out.println(Bytes.toString(_kv.getValue())); + Assert.assertEquals("version3", Bytes.toString(_kv.getValue())); + } + } + result = scanner.next(); + } + scanner.close(); + table.close(); + } + private Scan createScan() { Scan scan = new Scan(); scan.setMaxVersions(3);