hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests (Phil Yang)
Date Mon, 21 Mar 2016 20:20:37 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 295e5cbcf -> 1c5002660


HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region
is moved between two rpc requests (Phil Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1c500266
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1c500266
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1c500266

Branch: refs/heads/branch-1
Commit: 1c5002660ba8e28e71e4a91c66d6a52ce7d4beb9
Parents: 295e5cb
Author: tedyu <yuzhihong@gmail.com>
Authored: Mon Mar 21 13:20:23 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Mon Mar 21 13:20:23 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |  92 +++++++-
 .../hbase/TestPartialResultsFromClientSide.java | 217 ++++++++++++++++++-
 2 files changed, 300 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1c500266/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ce04409..d6b5757 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -27,9 +27,11 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue.MetaComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -38,7 +40,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -78,6 +79,10 @@ public class ClientScanner extends AbstractClientScanner {
      * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
      */
     protected byte[] partialResultsRow = null;
+    /**
+     * The last cell from a not full Row which is added to cache
+     */
+    protected Cell lastCellLoadedToCache = null;
     protected final int caching;
     protected long lastNext;
     // Keep lastResult returned successfully in case we have to reset scanner.
@@ -98,6 +103,7 @@ public class ClientScanner extends AbstractClientScanner {
     protected final int primaryOperationTimeout;
     private int retries;
     protected final ExecutorService pool;
+    private static MetaComparator metaComparator = new MetaComparator();
 
   /**
   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -393,7 +399,9 @@ public class ClientScanner extends AbstractClientScanner {
     // We don't expect that the server will have more results for us if
     // it doesn't tell us otherwise. We rely on the size or count of results
     boolean serverHasMoreResults = false;
+    boolean allResultsSkipped = false;
     do {
+      allResultsSkipped = false;
       try {
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
@@ -452,10 +460,15 @@ public class ClientScanner extends AbstractClientScanner {
           // Reset the startRow to the row we've seen last so that the new scanner starts at
           // the correct row. Otherwise we may see previously returned rows again.
           // (ScannerCallable by now has "relocated" the correct region)
-          if (scan.isReversed()) {
-            scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+          if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
+            if (scan.isReversed()) {
+              scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+            } else {
+              scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+            }
           } else {
-            scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+            // we need rescan this row because we only loaded partial row before
+            scan.setStartRow(lastResult.getRow());
           }
         }
         if (e instanceof OutOfOrderScannerNextException) {
@@ -487,13 +500,26 @@ public class ClientScanner extends AbstractClientScanner {
           getResultsToAddToCache(values, callable.isHeartbeatMessage());
       if (!resultsToAddToCache.isEmpty()) {
         for (Result rs : resultsToAddToCache) {
+          rs = filterLoadedCell(rs);
+          if (rs == null) {
+            continue;
+          }
           cache.add(rs);
-          // We don't make Iterator here
           for (Cell cell : rs.rawCells()) {
             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
           }
           countdown--;
           this.lastResult = rs;
+          if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
+            updateLastCellLoadedToCache(this.lastResult);
+          } else {
+            this.lastCellLoadedToCache = null;
+          }
+        }
+        if (cache.isEmpty()) {
+          // all result has been seen before, we need scan more.
+          allResultsSkipped = true;
+          continue;
         }
       }
       if (callable.isHeartbeatMessage()) {
@@ -524,7 +550,7 @@ public class ClientScanner extends AbstractClientScanner {
       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
       // row. We should not change scanners before we receive all the partial Results for that
       // row.
-    } while ((callable != null && callable.isHeartbeatMessage())
+    } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
         || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
         && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
   }
@@ -767,4 +793,58 @@ public class ClientScanner extends AbstractClientScanner {
     }
     return false;
   }
+
+  protected void updateLastCellLoadedToCache(Result result) {
+    if (result.rawCells().length == 0) {
+      return;
+    }
+    this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
+  }
+
+  /**
+   * Compare two Cells considering reversed scanner.
+   * ReversedScanner only reverses rows, not columns.
+   */
+  private int compare(Cell a, Cell b) {
+    int r = 0;
+    if (currentRegion != null && currentRegion.isMetaRegion()) {
+      r = metaComparator.compareRows(a, b);
+    } else {
+      r = CellComparator.compareRows(a, b);
+    }
+    if (r != 0) {
+      return this.scan.isReversed() ? -r : r;
+    }
+    return CellComparator.compareWithoutRow(a, b);
+  }
+
+  private Result filterLoadedCell(Result result) {
+    // we only filter result when last result is partial
+    // so lastCellLoadedToCache and result should have same row key.
+    // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
+    // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
+    if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
+      return result;
+    }
+    if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
+      // The first cell of this result is larger than the last cell of loadcache.
+      // If user do not allow partial result, it must be true.
+      return result;
+    }
+    if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
+      // The last cell of this result is smaller than the last cell of loadcache, skip all.
+      return null;
+    }
+
+    // The first one must not in filtered result, we start at the second.
+    int index = 1;
+    while (index < result.rawCells().length) {
+      if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
+        break;
+      }
+      index++;
+    }
+    Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
+    return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1c500266/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index a6f8373..47f36e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide {
   private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class);
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
+  private final static int MINICLUSTER_SIZE = 5;
   private static Table TABLE = null;
 
   /**
@@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE);
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
     TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
   }
 
@@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide {
   }
 
   /**
-   * Test the method {@link Result#createCompleteResult(List, Result)}
+   * Test the method {@link Result#createCompleteResult(List)}
    * @throws Exception
    */
   @Test
@@ -829,4 +832,212 @@ public class TestPartialResultsFromClientSide {
       testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
     }
   }
+
+  private void moveRegion(Table table, int index) throws IOException{
+    List<Pair<HRegionInfo, ServerName>> regions = MetaTableAccessor
+        .getTableRegionsAndLocations(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConnection(),
+            table.getName());
+    assertEquals(1, regions.size());
+    HRegionInfo regionInfo = regions.get(0).getFirst();
+    ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName();
+    TEST_UTIL.getHBaseAdmin().move(regionInfo.getEncodedNameAsBytes(),
+        Bytes.toBytes(name.getServerName()));
+  }
+
+  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
+    assertArrayEquals(row,
+        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    assertArrayEquals(cf,
+        Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+    assertArrayEquals(cq,
+        Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
+  }
+
+  @Test
+  public void testPartialResultWhenRegionMove() throws IOException {
+    Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    moveRegion(table, 1);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
+      scanner.next();
+    }
+    Result result1 = scanner.next();
+    assertEquals(1, result1.rawCells().length);
+    Cell c1 = result1.rawCells()[0];
+    assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
+    assertFalse(result1.isPartial());
+
+    moveRegion(table, 2);
+
+    Result result2 = scanner.next();
+    assertEquals(1, result2.rawCells().length);
+    Cell c2 = result2.rawCells()[0];
+    assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+    assertTrue(result2.isPartial());
+
+    moveRegion(table, 3);
+
+    Result result3 = scanner.next();
+    assertEquals(1, result3.rawCells().length);
+    Cell c3 = result3.rawCells()[0];
+    assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
+    assertTrue(result3.isPartial());
+
+  }
+
+  @Test
+  public void testReversedPartialResultWhenRegionMove() throws IOException {
+    Table table=createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    moveRegion(table, 1);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    scan.setReversed(true);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS-1; i++) {
+      scanner.next();
+    }
+    Result result1 = scanner.next();
+    assertEquals(1, result1.rawCells().length);
+    Cell c1 = result1.rawCells()[0];
+    assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
+    assertFalse(result1.isPartial());
+
+    moveRegion(table, 2);
+
+    Result result2 = scanner.next();
+    assertEquals(1, result2.rawCells().length);
+    Cell c2 = result2.rawCells()[0];
+    assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
+    assertTrue(result2.isPartial());
+
+    moveRegion(table, 3);
+
+    Result result3 = scanner.next();
+    assertEquals(1, result3.rawCells().length);
+    Cell c3 = result3.rawCells()[0];
+    assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]);
+    assertTrue(result3.isPartial());
+
+  }
+
+  @Test
+  public void testCompleteResultWhenRegionMove() throws IOException {
+    Table table=createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    moveRegion(table, 1);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setCaching(1);
+    ResultScanner scanner = table.getScanner(scan);
+
+    Result result1 = scanner.next();
+    assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
+    Cell c1 = result1.rawCells()[0];
+    assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result1.isPartial());
+
+    moveRegion(table, 2);
+
+    Result result2 = scanner.next();
+    assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
+    Cell c2 = result2.rawCells()[0];
+    assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result2.isPartial());
+
+    moveRegion(table, 3);
+
+    Result result3 = scanner.next();
+    assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
+    Cell c3 = result3.rawCells()[0];
+    assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result3.isPartial());
+
+  }
+
+  @Test
+  public void testReversedCompleteResultWhenRegionMove() throws IOException {
+    Table table=createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    moveRegion(table, 1);
+
+    Scan scan = new Scan();
+    scan.setMaxResultSize(1);
+    scan.setCaching(1);
+    scan.setReversed(true);
+    ResultScanner scanner = table.getScanner(scan);
+
+    Result result1 = scanner.next();
+    assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length);
+    Cell c1 = result1.rawCells()[0];
+    assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result1.isPartial());
+
+    moveRegion(table, 2);
+
+    Result result2 = scanner.next();
+    assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length);
+    Cell c2 = result2.rawCells()[0];
+    assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result2.isPartial());
+
+    moveRegion(table, 3);
+
+    Result result3 = scanner.next();
+    assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length);
+    Cell c3 = result3.rawCells()[0];
+    assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]);
+    assertFalse(result3.isPartial());
+
+  }
+
+  @Test
+  public void testBatchingResultWhenRegionMove() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+
+    moveRegion(table, 1);
+
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    scan.setBatch(1);
+
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
+      scanner.next();
+    }
+    Result result1 = scanner.next();
+    assertEquals(1, result1.rawCells().length);
+    Cell c1 = result1.rawCells()[0];
+    assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
+
+    moveRegion(table, 2);
+
+    Result result2 = scanner.next();
+    assertEquals(1, result2.rawCells().length);
+    Cell c2 = result2.rawCells()[0];
+    assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+
+    moveRegion(table, 3);
+
+    Result result3 = scanner.next();
+    assertEquals(1, result3.rawCells().length);
+    Cell c3 = result3.rawCells()[0];
+    assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
+  }
+
+
 }
\ No newline at end of file


Mime
View raw message