hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject git commit: HBASE-11591 Scanner fails to retrieve KV from bulk loaded file with highest sequence id than the cell's mvcc in a non-bulk loaded file (Ram)
Date Tue, 26 Aug 2014 12:08:03 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 582123cd8 -> dea648002


HBASE-11591 Scanner fails to retrieve KV from bulk loaded file with
higher sequence id than the cell's mvcc in a non-bulk loaded file (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dea64800
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dea64800
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dea64800

Branch: refs/heads/master
Commit: dea6480023e78a3facdaf1cfc00ad6cc35ecb3ea
Parents: 582123c
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Tue Aug 26 17:36:37 2014 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Tue Aug 26 17:36:37 2014 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  32 ++-
 .../hadoop/hbase/regionserver/StoreFile.java    |  10 +
 .../hbase/regionserver/StoreFileScanner.java    |  40 +++-
 .../regionserver/TestScanWithBloomError.java    |  11 +-
 .../regionserver/TestScannerWithBulkload.java   | 226 +++++++++++++++++++
 5 files changed, 296 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dea64800/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 274c1b3..73b7957 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -120,8 +120,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1741,6 +1741,7 @@ public class HRegion implements HeapSize { // , Writable{
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
       // to be at the border of the empty memstore.
+      MultiVersionConsistencyControl.WriteEntry w = null;
       this.updatesLock.writeLock().lock();
       try {
         if (this.memstoreSize.get() <= 0) {
@@ -1750,13 +1751,29 @@ public class HRegion implements HeapSize { // , Writable{
           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
           // etc.)
           // wal can be null replaying edits.
-          return wal != null?
-            new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-              getNextSequenceId(wal), "Nothing to flush"):
-            new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
+          try {
+            if (wal != null) {
+              w = mvcc.beginMemstoreInsert();
+              long flushSeqId = getNextSequenceId(wal);
+              FlushResult flushResult = new FlushResult(
+                  FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
+              w.setWriteNumber(flushSeqId);
+              mvcc.waitForPreviousTransactionsComplete(w);
+              w = null;
+              return flushResult;
+            } else {
+              return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+                  "Nothing to flush");
+            }
+
+          } finally {
+            this.updatesLock.writeLock().unlock();
+          }
         }
       } finally {
-        this.updatesLock.writeLock().unlock();
+        if (w != null) {
+          mvcc.advanceMemstore(w);
+        }
       }
     }
 
@@ -1864,6 +1881,7 @@ public class HRegion implements HeapSize { // , Writable{
       // uncommitted transactions from being written into HFiles.
       // We have to block before we start the flush, otherwise keys that
       // were removed via a rollbackMemstore could be written to Hfiles.
+      w.setWriteNumber(flushSeqId);
       mvcc.waitForPreviousTransactionsComplete(w);
       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
       w = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dea64800/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index ec164ca..27c64f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -381,6 +381,7 @@ public class StoreFile {
           this.sequenceid += 1;
         }
       }
+      this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);
 
@@ -1009,6 +1010,7 @@ public class StoreFile {
     protected long sequenceID = -1;
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
+    private boolean bulkLoadResult = false;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1475,6 +1477,14 @@ public class StoreFile {
       this.sequenceID = sequenceID;
     }
 
+    public void setBulkLoaded(boolean bulkLoadResult) {
+      this.bulkLoadResult = bulkLoadResult;
+    }
+
+    public boolean isBulkLoaded() {
+      return this.bulkLoadResult;
+    }
+
     BloomFilter getGeneralBloomFilter() {
       return generalBloomFilter;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dea64800/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 1b07594..6474e96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -137,9 +137,10 @@ public class StoreFileScanner implements KeyValueScanner {
       // only seek if we aren't at the end. cur == null implies 'end'.
       if (cur != null) {
         hfs.next();
-        cur = hfs.getKeyValue();
-        if (hasMVCCInfo)
+        setCurrentCell(hfs.getKeyValue());
+        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
           skipKVsNewerThanReadpoint();
+        }
       }
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
@@ -157,9 +158,13 @@ public class StoreFileScanner implements KeyValueScanner {
           return false;
         }
 
-        cur = hfs.getKeyValue();
+        setCurrentCell(hfs.getKeyValue());
 
-        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+          return skipKVsNewerThanReadpoint();
+        } else {
+          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+        }
       } finally {
         realSeekDone = true;
       }
@@ -177,9 +182,13 @@ public class StoreFileScanner implements KeyValueScanner {
           close();
           return false;
         }
-        cur = hfs.getKeyValue();
+        setCurrentCell(hfs.getKeyValue());
 
-        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+          return skipKVsNewerThanReadpoint();
+        } else {
+          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+        }
       } finally {
         realSeekDone = true;
       }
@@ -189,6 +198,15 @@ public class StoreFileScanner implements KeyValueScanner {
     }
   }
 
+  protected void setCurrentCell(Cell newVal) {
+    this.cur = newVal;
+    if(this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
+      KeyValue curKV = KeyValueUtil.ensureKeyValue(cur);
+      curKV.setSequenceId(this.reader.getSequenceID());
+      cur = curKV;
+    }
+  }
+
   protected boolean skipKVsNewerThanReadpoint() throws IOException {
     // We want to ignore all key-values that are newer than our current
     // readPoint
@@ -197,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
         && cur != null
         && (cur.getMvccVersion() > readPt)) {
       hfs.next();
-      cur = hfs.getKeyValue();
+      setCurrentCell(hfs.getKeyValue());
       if (this.stopSkippingKVsIfNextRow
           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
@@ -325,7 +343,7 @@ public class StoreFileScanner implements KeyValueScanner {
         // a higher timestamp than the max timestamp in this file. We know that
         // the next point when we have to consider this file again is when we
         // pass the max timestamp of this file (with the same row/column).
-        cur = KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile);
+        setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
       } else {
         // This will be the case e.g. when we need to seek to the next
         // row/column, and we don't know exactly what they are, so we set the
@@ -343,13 +361,13 @@ public class StoreFileScanner implements KeyValueScanner {
     // key/value and the store scanner will progress to the next column. This
     // is obviously not a "real real" seek, but unlike the fake KV earlier in
     // this method, we want this to be propagated to ScanQueryMatcher.
-    cur = KeyValueUtil.createLastOnRowCol(kv);
+    setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
 
     realSeekDone = true;
     return true;
   }
 
-  Reader getReaderForTesting() {
+  Reader getReader() {
     return reader;
   }
 
@@ -420,7 +438,7 @@ public class StoreFileScanner implements KeyValueScanner {
           return false;
         }
 
-        cur = hfs.getKeyValue();
+        setCurrentCell(hfs.getKeyValue());
         this.stopSkippingKVsIfNextRow = true;
         boolean resultOfSkipKVs;
         try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dea64800/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
index 011de0f..54e8517 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,8 +56,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.*;
-
 /**
  * Test a multi-column scanner when there is a Bloom filter false-positive.
  * This is needed for the multi-column Bloom filter optimization.
@@ -132,8 +133,8 @@ public class TestScanWithBloomError {
     Collections.sort(scanners, new Comparator<StoreFileScanner>() {
       @Override
       public int compare(StoreFileScanner s1, StoreFileScanner s2) {
-        Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
-        Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
+        Path p1 = s1.getReader().getHFileReader().getPath();
+        Path p2 = s2.getReader().getHFileReader().getPath();
         long t1, t2;
         try {
           t1 = fs.getFileStatus(p1).getModificationTime();
@@ -147,7 +148,7 @@ public class TestScanWithBloomError {
 
     StoreFile.Reader lastStoreFileReader = null;
     for (StoreFileScanner sfScanner : scanners)
-      lastStoreFileReader = sfScanner.getReaderForTesting();
+      lastStoreFileReader = sfScanner.getReader();
 
     new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
         lastStoreFileReader.getHFileReader().getPath().toString()});

http://git-wip-us.apache.org/repos/asf/hbase/blob/dea64800/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
new file mode 100644
index 0000000..3ff6394
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestScannerWithBulkload {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  private static void createTable(HBaseAdmin admin, String tableName) throws IOException
{
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("col");
+    hcd.setMaxVersions(3);
+    desc.addFamily(hcd);
+    admin.createTable(desc);
+  }
+
+  @Test
+  public void testBulkLoad() throws Exception {
+    String tableName = "testBulkLoad";
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    result = scanAfterBulkLoad(scanner, result, "version2");
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l, Bytes
+        .toBytes("version3")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    scanner = table.getScanner(scan);
+    result = scanner.next();
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    table.close();
+  }
+
+  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expctedVal)
+      throws IOException {
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals(expctedVal, Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    return result;
+  }
+
+  private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException
{
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    final Path hfilePath = new Path(hFilePath);
+    fs.mkdirs(hfilePath);
+    Path path = new Path(pathStr);
+    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
+    Assert.assertNotNull(wf);
+    HFileContext context = new HFileContext();
+    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
+    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l,
+        Bytes.toBytes("version2"));
+    writer.append(kv);
+    // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
+    // loaded file
+    writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+    writer.close();
+    return hfilePath;
+  }
+
+  private HTable init(HBaseAdmin admin, long l, Scan scan, String tableName) throws Exception
{
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l, Bytes
+        .toBytes("version0")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    Put put1 = new Put(Bytes.toBytes("row2"));
+    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l, Bytes
+        .toBytes("version0")));
+    table.put(put1);
+    table.flushCommits();
+    admin.flush(tableName);
+    admin.close();
+    put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l, Bytes
+        .toBytes("version1")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    admin.compact(tableName);
+
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
+    scanner.close();
+    return table;
+  }
+
+  @Test
+  public void testBulkLoadWithParallelScan() throws Exception {
+    String tableName = "testBulkLoadWithParallelScan";
+    final long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
+        "/temp/testBulkLoadWithParallelScan/col/file");
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    ResultScanner scanner = table.getScanner(scan);
+    // Create a scanner and then do bulk load
+    final CountDownLatch latch = new CountDownLatch(1);
+    new Thread() {
+      public void run() {
+        try {
+          Put put1 = new Put(Bytes.toBytes("row5"));
+          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"),
l,
+              Bytes.toBytes("version0")));
+          table.put(put1);
+          table.flushCommits();
+          bulkload.doBulkLoad(hfilePath, table);
+          latch.countDown();
+        } catch (TableNotFoundException e) {
+        } catch (IOException e) {
+        }
+      }
+    }.start();
+    latch.await();
+    // By the time we do next() the bulk loaded files are also added to the kv
+    // scanner
+    Result result = scanner.next();
+    scanAfterBulkLoad(scanner, result, "version1");
+    table.close();
+
+  }
+
+  private Scan createScan() {
+    Scan scan = new Scan();
+    scan.setMaxVersions(3);
+    return scan;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}


Mime
View raw message