hbase-commits mailing list archives

From yangzhe1...@apache.org
Subject hbase git commit: HBASE-17210 Set timeout on trying rowlock according to client's RPC timeout
Date Wed, 22 Feb 2017 03:54:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 27a8aa99d -> f5ae21ea3


HBASE-17210 Set timeout on trying rowlock according to client's RPC timeout


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f5ae21ea
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f5ae21ea
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f5ae21ea

Branch: refs/heads/branch-1
Commit: f5ae21ea32c0bbe2f42e00da86c8dd73bc7aef67
Parents: 27a8aa9
Author: Phil Yang <yangzhe1991@apache.org>
Authored: Tue Feb 21 11:31:00 2017 +0800
Committer: Phil Yang <yangzhe1991@apache.org>
Committed: Wed Feb 22 11:43:47 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  28 +++-
 .../TestSettingTimeoutOnBlockingPoint.java      | 132 +++++++++++++++++++
 2 files changed, 157 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ae21ea/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3a102c9..c44d363 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -132,6 +132,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -3213,6 +3214,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RowLock rowLock = null;
         try {
           rowLock = getRowLockInternal(mutation.getRow(), true);
+        } catch (TimeoutIOException e) {
+          // We will retry on other exceptions, but we should stop if we time out.
+          throw e;
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
             + Bytes.toStringBinary(mutation.getRow()), ioe);
@@ -5466,15 +5470,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           result = rowLockContext.newWriteLock();
         }
       }
-      if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+
+      int timeout = rowLockWaitDuration;
+      boolean reachDeadlineFirst = false;
+      RpcCallContext call = RpcServer.getCurrentCall();
+      if (call != null && call.getDeadline() < Long.MAX_VALUE) {
+        int timeToDeadline = (int)(call.getDeadline() - System.currentTimeMillis());
+        if (timeToDeadline <= this.rowLockWaitDuration) {
+          reachDeadlineFirst = true;
+          timeout = timeToDeadline;
+        }
+      }
+
+      if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
         if (traceScope != null) {
           traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
         }
         result = null;
         // Clean up the counts just in case this was the thing keeping the context alive.
         rowLockContext.cleanUp();
-        throw new IOException("Timed out waiting for lock for row: " + rowKey + " in region "
-            + getRegionInfo().getEncodedName());
+        String message = "Timed out waiting for lock for row: " + rowKey + " in region "
+            + getRegionInfo().getEncodedName();
+        if (reachDeadlineFirst) {
+          throw new TimeoutIOException(message);
+        } else {
+          // If timeToDeadline is larger than rowLockWaitDuration, we cannot drop the request.
+          throw new IOException(message);
+        }
       }
       rowLockContext.setThreadName(Thread.currentThread().getName());
       return result;
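
The hunk above clamps the row-lock wait to whichever expires first: the configured rowLockWaitDuration or the time remaining before the caller's RPC deadline, and it throws TimeoutIOException only when the deadline was the limiting factor, so the server stops working on a request the client has already given up on. The standalone sketch below restates that clamping logic outside of HRegion; it is illustrative only, and the class name, the tryRowLock method, and the use of java.util.concurrent.TimeoutException in place of HBase's TimeoutIOException are assumptions, not HBase API.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal sketch of the deadline-clamping idea in the hunk above; names are illustrative. */
public class RowLockTimeoutSketch {
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * @param rowLockWaitDuration configured maximum wait in ms (hbase.rowlock.wait.duration)
   * @param deadlineMillis      absolute client deadline in ms, Long.MAX_VALUE if none
   * @return true if the lock was acquired within the clamped timeout
   */
  public boolean tryRowLock(int rowLockWaitDuration, long deadlineMillis)
      throws InterruptedException, TimeoutException {
    int timeout = rowLockWaitDuration;
    boolean reachDeadlineFirst = false;
    if (deadlineMillis < Long.MAX_VALUE) {
      int timeToDeadline = (int) (deadlineMillis - System.currentTimeMillis());
      if (timeToDeadline <= rowLockWaitDuration) {
        // The client gives up before the normal wait would elapse; only wait that long.
        reachDeadlineFirst = true;
        timeout = timeToDeadline;
      }
    }
    if (timeout <= 0 || !lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
      if (reachDeadlineFirst) {
        // The deadline was the limiting factor: fail fast so nobody retries a dead request.
        throw new TimeoutException("row lock not acquired before client deadline");
      }
      return false; // ordinary lock-wait timeout; the caller may retry
    }
    return true;
  }
}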

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ae21ea/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java
new file mode 100644
index 0000000..d60ff22
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({LargeTests.class})
+public class TestSettingTimeoutOnBlockingPoint {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAM = Bytes.toBytes("f");
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    // simulate queue blocking
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void setUpAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  public static class SleepCoprocessor extends BaseRegionObserver {
+    public static final int SLEEP_TIME = 10000;
+
+    @Override
+    public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment) throws IOException {
+      Threads.sleep(SLEEP_TIME);
+      return super.preIncrementAfterRowLock(e, increment);
+    }
+  }
+
+  @Test
+  public void testRowLock() throws IOException {
+    final TableName tableName = TableName.valueOf(testName.getMethodName());
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(tableName.getNameAsString());
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    TEST_UTIL.createTable(hdt, new byte[][]{FAM}, TEST_UTIL.getConfiguration());
+
+    Thread incrementThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          try(Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+            table.incrementColumnValue(ROW1, FAM, FAM, 1);
+          }
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+      }
+    });
+    Thread getThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          try(Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+            table.setRpcTimeout(1000);
+            Delete delete = new Delete(ROW1);
+            table.delete(delete);
+          }
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+      }
+    });
+
+    incrementThread.start();
+    Threads.sleep(1000);
+    getThread.start();
+    Threads.sleep(2000);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      // We have only two handlers. The first thread will get a write lock for row1 and occupy
+      // the first handler. The second thread needs a read lock for row1; it should quit after 1000
+      // ms and give back the handler because it cannot get the lock in time.
+      // So we can get the value using the second handler.
+      table.setRpcTimeout(1000);
+      table.get(new Get(ROW2)); // Will throw an exception if the timeout handling fails
+    } finally {
+      incrementThread.interrupt();
+      getThread.interrupt();
+    }
+  }
+}
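
The new test exercises the server-side deadline by lowering the per-operation RPC timeout on the client Table, just as an application would. A minimal client-side sketch follows, assuming a reachable cluster and an existing table named "t1" (the table name and row key are hypothetical); Table.setRpcTimeout and the hbase.rpc.timeout configuration key are the same knobs the test relies on.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RpcTimeoutClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Cluster-wide default for every RPC, in milliseconds.
    conf.setInt("hbase.rpc.timeout", 2000);
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("t1"))) {
      // Per-table override, as the test does; the server derives its deadline from this
      // timeout and will not wait on a row lock past it.
      table.setRpcTimeout(1000);
      Result result = table.get(new Get(Bytes.toBytes("row2")));
      System.out.println("got " + result);
    }
  }
}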

