hbase-commits mailing list archives

From ecl...@apache.org
Subject hbase git commit: HBASE-14978 Don't allow Multi to retain too many blocks
Date Fri, 18 Dec 2015 00:15:28 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 24b40cb90 -> f4bbb17d3


HBASE-14978 Don't allow Multi to retain too many blocks


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f4bbb17d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f4bbb17d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f4bbb17d

Branch: refs/heads/branch-1.2
Commit: f4bbb17d3ca28d21f53e483bc8684e95459ca9c2
Parents: 24b40cb
Author: Elliott Clark <eclark@apache.org>
Authored: Thu Dec 17 00:51:37 2015 -0800
Committer: Elliott Clark <eclark@apache.org>
Committed: Thu Dec 17 16:13:41 2015 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |  3 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 11 +++
 .../hbase/regionserver/RSRpcServices.java       | 58 ++++++++++-----
 .../hbase/client/TestMultiRespectsLimits.java   | 75 ++++++++++++++++++--
 4 files changed, 124 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
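
In short: the multi (batched Get) path already tracked the heap size of the cells it had accumulated for a response; with this change it also tracks the size of the HFile blocks those cells keep referenced, and stops accumulating once either total passes the quota. A minimal sketch of the new check, condensed from the RSRpcServices hunk further down; ResponseLimits and overLimit are hypothetical names, maxQuotaResultSize mirrors the local variable in RSRpcServices, and the RpcCallContext methods are the post-commit API:

    import org.apache.hadoop.hbase.ipc.RpcCallContext;

    final class ResponseLimits {
      // True once either the accumulated cell size or the accumulated block size of the
      // response has passed the quota, for clients that support immediate retry.
      static boolean overLimit(RpcCallContext context, long maxQuotaResultSize) {
        return context != null
            && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
                || context.getResponseBlockSize() > maxQuotaResultSize);
      }
    }

When the check fires, the remaining actions get a MultiActionResultTooLarge response, and a client that supports immediate retry is expected to re-send them in a follow-up multi call.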


http://git-wip-us.apache.org/repos/asf/hbase/blob/f4bbb17d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 3e38dbf..eb76748 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -82,4 +82,7 @@ public interface RpcCallContext extends Delayable {
    * onerous.
    */
   void incrementResponseCellSize(long cellSize);
+
+  long getResponseBlockSize();
+  void incrementResponseBlockSize(long blockSize);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f4bbb17d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 06d8ca9..fe62e95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -318,6 +318,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private InetAddress remoteAddress;
 
     private long responseCellSize = 0;
+    private long responseBlockSize = 0;
     private boolean retryImmediatelySupported;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
@@ -541,6 +542,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       responseCellSize += cellSize;
     }
 
+    @Override
+    public long getResponseBlockSize() {
+      return responseBlockSize;
+    }
+
+    @Override
+    public void incrementResponseBlockSize(long blockSize) {
+      responseBlockSize += blockSize;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is

http://git-wip-us.apache.org/repos/asf/hbase/blob/f4bbb17d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index f8f2268..8141156 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -581,6 +581,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
     RpcCallContext context = RpcServer.getCurrentCall();
     IOException sizeIOE = null;
+    Object lastBlock = null;
     for (ClientProtos.Action action : actions.getActionList()) {
       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
       try {
@@ -588,7 +589,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
         if (context != null
             && context.isRetryImmediatelySupported()
-            && context.getResponseCellSize() > maxQuotaResultSize) {
+            && (context.getResponseCellSize() > maxQuotaResultSize
+              || context.getResponseBlockSize() > maxQuotaResultSize)) {
 
           // We're storing the exception since the exception and reason string won't
           // change after the response size limit is reached.
@@ -597,15 +599,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             // Throwing will kill the JVM's JIT.
             //
             // Instead just create the exception and then store it.
-            sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
-                    + context.getResponseCellSize());
+            sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
+                + " CellSize: " + context.getResponseCellSize()
+                + " BlockSize: " + context.getResponseBlockSize());
 
             // Only report the exception once since there's only one request that
             // caused the exception. Otherwise this number will dominate the exceptions count.
             rpcServer.getMetrics().exception(sizeIOE);
           }
 
-          // Now that there's an exception is know to be created
+          // Now that there's an exception is known to be created
           // use it for the response.
           //
           // This will create a copy in the builder.
@@ -674,9 +677,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           } else {
             pbResult = ProtobufUtil.toResult(r);
           }
-          if (context != null) {
-            context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
-          }
+          lastBlock = addSize(context, r, lastBlock);
           resultOrExceptionBuilder =
             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
         }
@@ -1003,6 +1004,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   /**
+   * Method to account for the size of retained cells and retained data blocks.
+   * @return an object that represents the last referenced block from this response.
+   */
+  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
+    if (context != null && !r.isEmpty()) {
+      for (Cell c : r.rawCells()) {
+        context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
+
+        // We're using the last block being the same as the current block as
+        // a proxy for pointing to a new block. This won't be exact.
+        // If there are multiple gets that bounce back and forth
+        // Then it's possible that this will over count the size of
+        // referenced blocks. However it's better to over count and
+        // use two rpcs than to OOME the regionserver.
+        byte[] rowArray = c.getRowArray();
+        if (rowArray != lastBlock) {
+          context.incrementResponseBlockSize(rowArray.length);
+          lastBlock = rowArray;
+        }
+      }
+    }
+    return lastBlock;
+  }
+
+
+  /**
    * Find the HRegion based on a region specifier
    *
    * @param regionSpecifier the region specifier
@@ -2289,6 +2316,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       boolean closeScanner = false;
       boolean isSmallScan = false;
       RpcCallContext context = RpcServer.getCurrentCall();
+      Object lastBlock = null;
+
       ScanResponse.Builder builder = ScanResponse.newBuilder();
       if (request.hasCloseScanner()) {
         closeScanner = request.getCloseScanner();
@@ -2377,11 +2406,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               scanner, results, rows);
             if (!results.isEmpty()) {
               for (Result r : results) {
-                for (Cell cell : r.rawCells()) {
-                  if (context != null) {
-                    context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
-                  }
-                }
+                lastBlock = addSize(context, r, lastBlock);
               }
             }
             if (bypass != null && bypass.booleanValue()) {
@@ -2479,13 +2504,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                   moreRows = scanner.nextRaw(values, scannerContext);
 
                   if (!values.isEmpty()) {
-                    for (Cell cell : values) {
-                      if (context != null) {
-                        context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
-                      }
-                    }
                     final boolean partial = scannerContext.partialResultFormed();
-                    results.add(Result.create(values, null, stale, partial));
+                    Result r = Result.create(values, null, stale, partial);
+                    lastBlock = addSize(context, r, lastBlock);
+                    results.add(r);
                     i++;
                   }
 

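The interesting piece above is the new addSize() helper: cells decoded from the same HFile block share a backing byte array, so comparing getRowArray() by reference against the previously seen array is a cheap, intentionally approximate way to notice that a new block is being retained (it can over-count, which the comment argues is the safer direction). A standalone sketch of that heuristic; BlockSizeEstimate and estimateRetainedBlockBytes are hypothetical names, while Cell#getRowArray, Result#rawCells and Result#isEmpty are existing HBase API:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;

    final class BlockSizeEstimate {
      // Rough count of the backing-block bytes a set of Results keeps referenced: each
      // distinct backing array is counted once, using reference identity as the
      // "same block" proxy, mirroring addSize() above.
      static long estimateRetainedBlockBytes(Iterable<Result> results) {
        long total = 0;
        byte[] lastBlock = null;
        for (Result r : results) {
          if (r.isEmpty()) {
            continue;
          }
          for (Cell c : r.rawCells()) {
            byte[] backing = c.getRowArray(); // cells read from one block share this array
            if (backing != lastBlock) {       // a new backing array is treated as a new block
              total += backing.length;        // may over-count if gets alternate between blocks
              lastBlock = backing;
            }
          }
        }
        return total;
      }
    }
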
http://git-wip-us.apache.org/repos/asf/hbase/blob/f4bbb17d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
index 47dd7be..28e1855 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -36,6 +37,7 @@ import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static junit.framework.TestCase.assertEquals;
 
@@ -73,7 +75,7 @@ public class TestMultiRespectsLimits {
     TEST_UTIL.loadTable(t, FAMILY, false);
 
     // Split the table to make sure that the chunking happens across regions.
-    try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+    try (final Admin admin = TEST_UTIL.getAdmin()) {
       admin.split(name);
       TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
         @Override
@@ -87,16 +89,79 @@ public class TestMultiRespectsLimits {
     for (int i = 0; i < MAX_SIZE; i++) {
       gets.add(new Get(HBaseTestingUtility.ROWS[i]));
     }
-    Result[] results = t.get(gets);
-    assertEquals(MAX_SIZE, results.length);
+
     RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
     BaseSource s = rpcServer.getMetrics().getMetricsSource();
+    long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+    long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+    Result[] results = t.get(gets);
+    assertEquals(MAX_SIZE, results.length);
 
     // Cells from TEST_UTIL.loadTable have a length of 27.
     // Multiplying by less than that gives an easy lower bound on size.
     // However in reality each kv is being reported as much higher than that.
-    METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
+    METRICS_ASSERT.assertCounterGt("exceptions",
+        startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+        startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
+  }
+
+  @Test
+  public void testBlockMultiLimits() throws Exception {
+    final TableName name = TableName.valueOf("testBlockMultiLimits");
+    Table t = TEST_UTIL.createTable(name, FAMILY);
+
+    final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    RpcServerInterface rpcServer = regionServer.getRpcServer();
+    BaseSource s = rpcServer.getMetrics().getMetricsSource();
+    long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
+    long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
+
+    byte[] row = Bytes.toBytes("TEST");
+    byte[][] cols = new byte[][]{
+        Bytes.toBytes("0"), // Get this
+        Bytes.toBytes("1"), // Buffer
+        Bytes.toBytes("2"), // Get This
+        Bytes.toBytes("3"), // Buffer
+    };
+
+    // Set the value size so that one result will be less than the MAX_SIZE,
+    // however the block being referenced will be larger than MAX_SIZE.
+    // This should cause the regionserver to try and send a result immediately.
+    byte[] value = new byte[MAX_SIZE - 200];
+    ThreadLocalRandom.current().nextBytes(value);
+
+    for (byte[] col:cols) {
+      Put p = new Put(row);
+      p.addImmutable(FAMILY, col, value);
+      t.put(p);
+    }
+
+    // Make sure that a flush happens
+    try (final Admin admin = TEST_UTIL.getAdmin()) {
+      admin.flush(name);
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return regionServer.getOnlineRegions(name).get(0).getMaxFlushedSeqId() > 3;
+        }
+      });
+    }
+
+    List<Get> gets = new ArrayList<>(2);
+    Get g0 = new Get(row);
+    g0.addColumn(FAMILY, cols[0]);
+    gets.add(g0);
+
+    Get g2 = new Get(row);
+    g2.addColumn(FAMILY, cols[2]);
+    gets.add(g2);
+
+    Result[] results = t.get(gets);
+    assertEquals(2, results.length);
+    METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
     METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
-        (MAX_SIZE * 25) / MAX_SIZE, s);
+        startingMultiExceptions, s);
   }
 }

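A note on why testBlockMultiLimits needs the block accounting at all, restating the test's own comments: each stored value is MAX_SIZE - 200 bytes, so any single returned Result stays under the cell-size limit, but all four columns of the row are flushed together and the block backing them is larger than MAX_SIZE. The client still receives both requested Results; the multi is simply expected to be split across RPCs, which is why the test only asserts that the exception counters increased:

    // Symbolic recap (names from the test; sizes per its comments, not re-measured here):
    //   value.length                  = MAX_SIZE - 200   -> one Result stays under the limit
    //   block holding cols "0".."3"   > MAX_SIZE          -> the new block accounting trips
    // Expected outcome asserted above:
    //   results.length == 2, and both the "exceptions" and
    //   "exceptions.multiResponseTooLarge" counters increase.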
