hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [10/44] hbase git commit: HBASE-13193 RegionScannerImpl filters should not be reset if a partial Result is returned (Jonathan Lawlor)
Date Mon, 23 Mar 2015 22:22:55 GMT
HBASE-13193 RegionScannerImpl filters should not be reset if a partial Result is returned (Jonathan
Lawlor)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a75a2ace
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a75a2ace
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a75a2ace

Branch: refs/heads/hbase-12439
Commit: a75a2ace4f52daa1eb415f085dfad920b30c3349
Parents: c5aca19
Author: stack <stack@apache.org>
Authored: Mon Mar 16 13:26:34 2015 -0700
Committer: stack <stack@apache.org>
Committed: Mon Mar 16 13:26:34 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 123 ++++++++++++-------
 .../hadoop/hbase/regionserver/HRegion.java      |  14 ++-
 .../hbase/regionserver/InternalScanner.java     |   5 +
 .../hbase/TestPartialResultsFromClientSide.java |  44 ++++++-
 4 files changed, 133 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a75a2ace/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 9993974..bfbe718 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -71,6 +71,12 @@ public class ClientScanner extends AbstractClientScanner {
      * result.
      */
     protected final LinkedList<Result> partialResults = new LinkedList<Result>();
+    /**
+     * The row for which we are accumulating partial Results (i.e. the row of the Results
stored
+     * inside partialResults). Changes to partialResultsRow and partialResults are kept in
sync
+     * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
+     */
+    protected byte[] partialResultsRow = null;
     protected final int caching;
     protected long lastNext;
     // Keep lastResult returned successfully in case we have to reset scanner.
@@ -378,7 +384,7 @@ public class ClientScanner extends AbstractClientScanner {
           } catch (DoNotRetryIOException e) {
             // An exception was thrown which makes any partial results that we were collecting
             // invalid. The scanner will need to be reset to the beginning of a row.
-            partialResults.clear();
+            clearPartialResults();
 
             // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs
want us
             // to reset the scanner and come back in again.
@@ -515,7 +521,7 @@ public class ClientScanner extends AbstractClientScanner {
     if (resultsFromServer == null || resultsFromServer.length == 0) {
       if (!partialResults.isEmpty()) {
         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        partialResults.clear();
+        clearPartialResults();
       }
 
       return resultsToAddToCache;
@@ -534,67 +540,65 @@ public class ClientScanner extends AbstractClientScanner {
       LOG.trace(sb.toString());
     }
 
-    // There are four possibilities cases that can occur while handling partial results
+    // There are three possibilities cases that can occur while handling partial results
     //
     // 1. (partial != null && partialResults.isEmpty())
     // This is the first partial result that we have received. It should be added to
     // the list of partialResults and await the next RPC request at which point another
     // portion of the complete result will be received
     //
-    // 2. (partial != null && !partialResults.isEmpty())
-    // a. values.length == 1
-    // Since partialResults contains some elements, it means that we are expecting to receive
-    // the remainder of the complete result within this RPC response. The fact that a partial
result
-    // was returned and it's the ONLY result returned indicates that we are still receiving
-    // fragments of the complete result. The Result can be completely formed only when we
have
-    // received all of the fragments and thus in this case we simply add the partial result
to
-    // our list.
-    //
-    // b. values.length > 1
-    // More than one result has been returned from the server. The fact that we are accumulating
-    // partials in partialList and we just received more than one result back from the server
-    // indicates that the FIRST result we received from the server must be the final fragment
that
-    // can be used to complete our result. What this means is that the partial that we received
is
-    // a partial result for a different row, and at this point we should combine the existing
-    // partials into a complete result, clear the partialList, and begin accumulating partials
for
-    // a new row
+    // 2. !partialResults.isEmpty()
+    // Since our partialResults list is not empty it means that we have been accumulating
partial
+    // Results for a particular row. We cannot form the complete/whole Result for that row
until
+    // all partials for the row have been received. Thus we loop through all of the Results
+    // returned from the server and determine whether or not all partial Results for the
row have
+    // been received. We know that we have received all of the partial Results for the row
when:
+    // i) We notice a row change in the Results
+    // ii) We see a Result for the partial row that is NOT marked as a partial Result
     //
-    // 3. (partial == null && !partialResults.isEmpty())
-    // No partial was received but we are accumulating partials in our list. That means the
final
-    // fragment of the complete result will be the first Result in values[]. We use it to
create the
-    // complete Result, clear the list, and add it to the list of Results that must be added
to the
-    // cache. All other Results in values[] are added after the complete result to maintain
proper
-    // ordering
-    //
-    // 4. (partial == null && partialResults.isEmpty())
+    // 3. (partial == null && partialResults.isEmpty())
     // Business as usual. We are not accumulating partial results and there wasn't a partial
result
     // in the RPC response. This means that all of the results we received from the server
are
     // complete and can be added directly to the cache
     if (partial != null && partialResults.isEmpty()) {
-      partialResults.add(partial);
+      addToPartialResults(partial);
 
       // Exclude the last result, it's a partial
       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length
- 1);
-    } else if (partial != null && !partialResults.isEmpty()) {
-      if (resultsFromServer.length > 1) {
-        Result finalResult = resultsFromServer[0];
-        partialResults.add(finalResult);
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        partialResults.clear();
+    } else if (!partialResults.isEmpty()) {
+      for (int i = 0; i < resultsFromServer.length; i++) {
+        Result result = resultsFromServer[i];
+
+        // This result is from the same row as the partial Results. Add it to the list of
partials
+        // and check if it was the last partial Result for that row
+        if (Bytes.equals(partialResultsRow, result.getRow())) {
+          addToPartialResults(result);
+
+          // If the result is not a partial, it is a signal to us that it is the last Result
we
+          // need to form the complete Result client-side
+          if (!result.isPartial()) {
+            resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+            clearPartialResults();
+          }
+        } else {
+          // The row of this result differs from the row of the partial results we have received
so
+          // far. If our list of partials isn't empty, this is a signal to form the complete
Result
+          // since the row has now changed
+          if (!partialResults.isEmpty()) {
+            resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+            clearPartialResults();
+          }
 
-        // Exclude first result, it was used to form our complete result
-        // Exclude last result, it's a partial result
-        addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length
- 1);
+          // It's possible that in one response from the server we receive the final partial
for
+          // one row and receive a partial for a different row. Thus, make sure that all
Results
+          // are added to the proper list
+          if (result.isPartial()) {
+            addToPartialResults(result);
+          } else {
+            resultsToAddToCache.add(result);
+          }
+        }
       }
-      partialResults.add(partial);
-    } else if (partial == null && !partialResults.isEmpty()) {
-      Result finalResult = resultsFromServer[0];
-      partialResults.add(finalResult);
-      resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-      partialResults.clear();
-
-      // Exclude the first result, it was used to form our complete result
-      addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length);
     } else { // partial == null && partialResults.isEmpty() -- business as usual
       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
     }
@@ -603,6 +607,31 @@ public class ClientScanner extends AbstractClientScanner {
   }
 
   /**
+   * A convenience method for adding a Result to our list of partials. This method ensure
that only
+   * Results that belong to the same row as the other partials can be added to the list.
+   * @param result The result that we want to add to our list of partial Results
+   * @throws IOException
+   */
+  private void addToPartialResults(final Result result) throws IOException {
+    final byte[] row = result.getRow();
+    if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
+      throw new IOException("Partial result row does not match. All partial results must
come "
+          + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow)
+ "row: "
+          + Bytes.toString(row));
+    }
+    partialResultsRow = row;
+    partialResults.add(result);
+  }
+
+  /**
+   * Convenience method for clearing the list of partials and resetting the partialResultsRow.
+   */
+  private void clearPartialResults() {
+    partialResults.clear();
+    partialResultsRow = null;
+  }
+
+  /**
    * Helper method for adding results between the indices [start, end) to the outputList
    * @param outputList the list that results will be added to
    * @param inputArray the array that results are taken from

http://git-wip-us.apache.org/repos/asf/hbase/blob/a75a2ace/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 71b5646..3b1f267 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5538,13 +5538,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver
{ //
         state = nextInternal(tmpList, batchLimit, remainingResultSize);
         outResults.addAll(tmpList);
       }
-      // State should never be null, this is a precautionary measure
-      if (state == null) {
-        if (LOG.isTraceEnabled()) LOG.trace("State was null. Defaulting to no more values
state");
-        state = NextState.makeState(NextState.State.NO_MORE_VALUES);
+      // Invalid states should never be returned. Receiving an invalid state means that we
have
+      // no clue how to proceed. Throw an exception.
+      if (!NextState.isValidState(state)) {
+        throw new IOException("Invalid state returned from nextInternal. state:" + state);
       }
 
-      resetFilters();
+      // If the size limit was reached it means a partial Result is being returned. Returning
a
+      // partial Result means that we should not reset the filters; filters should only be
reset in
+      // between rows
+      if (!state.sizeLimitReached()) resetFilters();
+
       if (isFilterDoneInternal()) {
         state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a75a2ace/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index e68dc75..ea5a75f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -201,6 +201,11 @@ public interface InternalScanner extends Closeable {
       return resultSize >= 0;
     }
 
+    @Override
+    public String toString() {
+      return "State: " + state + " resultSize: " + resultSize;
+    }
+
     /**
      * Helper method to centralize all checks as to whether or not the state is valid.
      * @param state

http://git-wip-us.apache.org/repos/asf/hbase/blob/a75a2ace/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index 75fa6e3..e7c3813 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +37,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -501,7 +508,7 @@ public class TestPartialResultsFromClientSide {
    * because the entire row needs to be read for the include/exclude decision to be made
    */
   @Test
-  public void testNoPartialResultsWhenFilterPresent() throws Exception {
+  public void testNoPartialResultsWhenRowFilterPresent() throws Exception {
     Scan scan = new Scan();
     scan.setMaxResultSize(1);
     scan.setAllowPartialResults(true);
@@ -784,4 +791,39 @@ public class TestPartialResultsFromClientSide {
     scanner.close();
     return numCells;
   }
+
+  /**
+   * Test partial Result re-assembly in the presence of different filters. The Results from
the
+   * partial scanner should match the Results returned from a scanner that receives all of
the
+   * results in one RPC to the server. The partial scanner is tested with a variety of different
+   * result sizes (all of which are less than the size necessary to fetch an entire row)
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsWithColumnFilter() throws Exception {
+    testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter());
+    testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5")));
+    testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"),
true,
+        Bytes.toBytes("testQualifier7"), true));
+
+    Set<byte[]> qualifiers = new LinkedHashSet<>();
+    qualifiers.add(Bytes.toBytes("testQualifier5"));
+    testPartialResultsWithColumnFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+  }
+
+  public void testPartialResultsWithColumnFilter(Filter filter) throws Exception {
+    assertTrue(!filter.hasFilterRow());
+
+    Scan partialScan = new Scan();
+    partialScan.setFilter(filter);
+
+    Scan oneshotScan = new Scan();
+    oneshotScan.setFilter(filter);
+    oneshotScan.setMaxResultSize(Long.MAX_VALUE);
+
+    for (int i = 1; i <= NUM_COLS; i++) {
+      partialScan.setMaxResultSize(getResultSizeForNumberOfCells(i));
+      testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
+    }
+  }
 }


Mime
View raw message