hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huaxiang...@apache.org
Subject hbase git commit: HBASE-19163 Maximum lock count exceeded from region server's batch processing
Date Thu, 07 Dec 2017 22:10:36 GMT
Repository: hbase
Updated Branches:
  refs/heads/master f55e81e6c -> 428e5672e


HBASE-19163 Maximum lock count exceeded from region server's batch processing

Signed-off-by: Michael Stack <stack@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/428e5672
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/428e5672
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/428e5672

Branch: refs/heads/master
Commit: 428e5672e77d6dea9cfdafb5a3052415b9926d12
Parents: f55e81e
Author: huaxiangsun <huaxiangsun@apache.org>
Authored: Wed Dec 6 16:58:31 2017 -0800
Committer: huaxiangsun <huaxiangsun@apache.org>
Committed: Thu Dec 7 13:57:22 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 72 ++++++++++++++++----
 .../hbase/client/TestFromClientSide3.java       | 27 ++++++++
 .../hbase/regionserver/TestAtomicOperation.java |  5 +-
 3 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/428e5672/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 131fff5..a8ea724 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -226,6 +226,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
   public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
 
+  public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
+      "hbase.regionserver.minibatch.size";
+  public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
+
   /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
@@ -338,6 +342,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // in bytes
   final long maxCellSize;
 
+  // Number of mutations for minibatch processing.
+  private final int miniBatchSize;
+
   // negative number indicates infinite timeout
   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
@@ -809,6 +816,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
 
     this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
+    this.miniBatchSize = conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE,
+        DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE);
   }
 
   void setHTableSpecificConf() {
@@ -3137,7 +3146,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         List<RowLock> acquiredRowLocks) throws IOException {
       int readyToWriteCount = 0;
       int lastIndexExclusive = 0;
+      RowLock prevRowLock = null;
       for (; lastIndexExclusive < size(); lastIndexExclusive++) {
+        // It reaches the miniBatchSize, stop here and process the miniBatch
+        // This only applies to non-atomic batch operations.
+        if (!isAtomic() && (readyToWriteCount == region.miniBatchSize)) {
+          break;
+        }
+
         if (!isOperationPending(lastIndexExclusive)) {
           continue;
         }
@@ -3146,7 +3162,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RowLock rowLock = null;
         try {
           // if atomic then get exclusive lock, else shared lock
-          rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());
+          rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
         } catch (TimeoutIOException e) {
           // We will retry when other exceptions, but we should stop if we timeout .
           throw e;
@@ -3163,8 +3179,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
           break; // Stop acquiring more rows for this batch
         } else {
-          acquiredRowLocks.add(rowLock);
+          if (rowLock != prevRowLock) {
+            // It is a different row now, add this to the acquiredRowLocks and
+            // set prevRowLock to the new returned rowLock
+            acquiredRowLocks.add(rowLock);
+            prevRowLock = rowLock;
+          }
         }
+
         readyToWriteCount++;
       }
       return createMiniBatch(lastIndexExclusive, readyToWriteCount);
@@ -3553,7 +3575,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.checkAndPrepareMutation(cpMutation, timestamp);
 
           // Acquire row locks. If not, the whole batch will fail.
-          acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true));
+          acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true, null));
 
           // Returned mutations from coprocessor correspond to the Mutation at index i. We can
           // directly add the cells from those mutations to the familyMaps of this mutation.
@@ -3930,7 +3952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.addColumn(family, qualifier);
       // Lock row - note that doBatchMutate will relock this row if called
       checkRow(row, "doCheckAndRowMutate");
-      RowLock rowLock = getRowLockInternal(get.getRow(), false);
+      RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
       try {
         if (mutation != null && this.getCoprocessorHost() != null) {
           // Call coprocessor.
@@ -5545,10 +5567,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
     checkRow(row, "row lock");
-    return getRowLockInternal(row, readLock);
+    return getRowLockInternal(row, readLock, null);
   }
 
-  protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
+  protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock prevRowLock)
+      throws IOException {
     // create an object to use a a key in the row lock map
     HashedBytes rowKey = new HashedBytes(row);
 
@@ -5565,6 +5588,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Now try an get the lock.
         // This can fail as
         if (readLock) {
+          // For read lock, if the caller has locked the same row previously, it will not try
+          // to acquire the same read lock. It simply returns the previous row lock.
+          RowLockImpl prevRowLockImpl = (RowLockImpl)prevRowLock;
+          if ((prevRowLockImpl != null) && (prevRowLockImpl.getLock() ==
+              rowLockContext.readWriteLock.readLock())) {
+            success = true;
+            return prevRowLock;
+          }
           result = rowLockContext.newReadLock();
         } else {
           result = rowLockContext.newWriteLock();
@@ -5587,7 +5618,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
         TraceUtil.addTimelineAnnotation("Failed to get row lock");
-        result = null;
         String message = "Timed out waiting for lock for row: " + rowKey + " in region "
             + getRegionInfo().getEncodedName();
         if (reachDeadlineFirst) {
@@ -5607,6 +5637,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
       Thread.currentThread().interrupt();
       throw iie;
+    } catch (Error error) {
+      // The maximum lock count for read lock is 64K (hardcoded), when this maximum count
+      // is reached, it will throw out an Error. This Error needs to be caught so it can
+      // go ahead to process the minibatch with lock acquired.
+      LOG.warn("Error to get row lock for " + Bytes.toStringBinary(row) + ", cause: " + error);
+      IOException ioe = new IOException();
+      ioe.initCause(error);
+      TraceUtil.addTimelineAnnotation("Error getting row lock");
+      throw ioe;
     } finally {
       // Clean up the counts just in case this was the thing keeping the context alive.
       if (!success && rowLockContext != null) {
@@ -7154,10 +7193,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       @Override
       public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
           List<RowLock> acquiredRowLocks) throws IOException {
+        RowLock prevRowLock = null;
         for (byte[] row : rowsToLock) {
           try {
-            RowLock rowLock = region.getRowLockInternal(row, false); // write lock
-            acquiredRowLocks.add(rowLock);
+            RowLock rowLock = region.getRowLockInternal(row, false, prevRowLock); // write lock
+            if (rowLock != prevRowLock) {
+              acquiredRowLocks.add(rowLock);
+              prevRowLock = rowLock;
+            }
           } catch (IOException ioe) {
             LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe);
             throw ioe;
@@ -7244,10 +7287,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         // STEP 2. Acquire the row lock(s)
         acquiredRowLocks = new ArrayList<>(rowsToLock.size());
+        RowLock prevRowLock = null;
         for (byte[] row : rowsToLock) {
           // Attempt to lock all involved rows, throw if any lock times out
           // use a writer lock for mixed reads and writes
-          acquiredRowLocks.add(getRowLockInternal(row, false));
+          RowLock rowLock = getRowLockInternal(row, false, prevRowLock);
+          if (rowLock != prevRowLock) {
+            acquiredRowLocks.add(rowLock);
+            prevRowLock = rowLock;
+          }
         }
         // STEP 3. Region lock
         lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@@ -7425,7 +7473,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RowLock rowLock = null;
     MemStoreSizing memstoreAccounting = new MemStoreSizing();
     try {
-      rowLock = getRowLockInternal(mutation.getRow(), false);
+      rowLock = getRowLockInternal(mutation.getRow(), false, null);
       lock(this.updatesLock.readLock());
       try {
         Result cpResult = doCoprocessorPreCall(op, mutation);
@@ -7774,7 +7822,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       3 * Bytes.SIZEOF_BOOLEAN);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/428e5672/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 5f7622a..e5d5324 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -411,6 +411,33 @@ public class TestFromClientSide3 {
     }
   }
 
+  // Test Table.batch with large amount of mutations against the same key.
+  // It used to trigger read lock's "Maximum lock count exceeded" Error.
+  @Test
+  public void testHTableWithLargeBatch() throws Exception {
+    Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+        new byte[][] { FAMILY });
+    int sixtyFourK = 64 * 1024;
+    try {
+      List actions = new ArrayList();
+      Object[] results = new Object[(sixtyFourK + 1) * 2];
+
+      for (int i = 0; i < sixtyFourK + 1; i ++) {
+        Put put1 = new Put(ROW);
+        put1.addColumn(FAMILY, QUALIFIER, VALUE);
+        actions.add(put1);
+
+        Put put2 = new Put(ANOTHERROW);
+        put2.addColumn(FAMILY, QUALIFIER, VALUE);
+        actions.add(put2);
+      }
+
+      table.batch(actions, results);
+    } finally {
+      table.close();
+    }
+  }
+
   @Test
   public void testBatchWithRowMutation() throws Exception {
     LOG.info("Starting testBatchWithRowMutation");

http://git-wip-us.apache.org/repos/asf/hbase/blob/428e5672/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 3aed91c..af82692 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -667,11 +667,12 @@ public class TestAtomicOperation {
     }
 
     @Override
-    public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
+    public RowLock getRowLockInternal(final byte[] row, boolean readLock,
+        final RowLock prevRowlock) throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();
       }
-      return new WrappedRowLock(super.getRowLockInternal(row, readLock));
+      return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
     }
 
     public class WrappedRowLock implements RowLock {


Mime
View raw message