hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17251 Add a timeout parameter when locating region
Date Thu, 08 Dec 2016 02:05:46 GMT
Repository: hbase
Updated Branches:
  refs/heads/master c1293cc91 -> b3ae87bd7


HBASE-17251 Add a timeout parameter when locating region


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3ae87bd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3ae87bd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3ae87bd

Branch: refs/heads/master
Commit: b3ae87bd7dd539fd8d5409076933d4528ff6b14c
Parents: c1293cc
Author: zhangduo <zhangduo@apache.org>
Authored: Wed Dec 7 16:57:04 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Dec 8 09:55:29 2016 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncConnectionImpl.java       |   7 +-
 .../hadoop/hbase/client/AsyncRegionLocator.java |  64 +++++++---
 .../AsyncSingleRequestRpcRetryingCaller.java    |  45 +++++--
 .../client/AsyncTableRegionLocatorImpl.java     |   2 +-
 .../hbase/client/TestAsyncRegionLocator.java    |  28 ++---
 .../client/TestAsyncRegionLocatorTimeout.java   | 120 +++++++++++++++++++
 ...TestAsyncSingleRequestRpcRetryingCaller.java |  54 +++++----
 7 files changed, 253 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 5c32a9f..92785fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.util.HashedWheelTimer;
 
 import java.io.IOException;
@@ -56,7 +58,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class);
 
-  private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
+  @VisibleForTesting
+  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
       Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
 
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@@ -92,7 +95,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.conf = conf;
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
-    this.locator = new AsyncRegionLocator(this);
+    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.registry = AsyncRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 6b74e4c..ae8f2a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -27,6 +27,9 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -36,9 +39,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -62,6 +68,8 @@ class AsyncRegionLocator {
 
   private final AsyncConnectionImpl conn;
 
+  private final HashedWheelTimer retryTimer;
+
   private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
 
   private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
@@ -70,8 +78,9 @@ class AsyncRegionLocator {
   private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
       new ConcurrentHashMap<>();
 
-  AsyncRegionLocator(AsyncConnectionImpl conn) {
+  AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
     this.conn = conn;
+    this.retryTimer = retryTimer;
   }
 
   private CompletableFuture<HRegionLocation> locateMetaRegion() {
@@ -249,9 +258,6 @@ class AsyncRegionLocator {
       return;
     }
     otherCheck.accept(loc);
-    if (future.isDone()) {
-      return;
-    }
     addToCache(loc);
     future.complete(loc);
   }
@@ -282,12 +288,34 @@ class AsyncRegionLocator {
     return locateInMeta(tableName, row);
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
-    if (tableName.equals(META_TABLE_NAME)) {
-      return locateMetaRegion();
-    } else {
-      return locateRegion(tableName, row);
-    }
+  private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
+      long timeoutNs, Supplier<String> timeoutMsg) {
+    if (future.isDone() || timeoutNs <= 0) {
+      return future;
+    }
+    CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>();
+    Timeout timeoutTask = retryTimer.newTimeout(
+      t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs,
+      TimeUnit.NANOSECONDS);
+    future.whenComplete((loc, error) -> {
+      timeoutTask.cancel();
+      if (error != null) {
+        timeoutFuture.completeExceptionally(error);
+      } else {
+        timeoutFuture.complete(loc);
+      }
+    });
+    return timeoutFuture;
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      long timeoutNs) {
+    CompletableFuture<HRegionLocation> future =
+        tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row);
+    return withTimeout(future, timeoutNs,
+      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+          + "ms) waiting for region location for " + tableName + ", row='"
+          + Bytes.toStringBinary(row) + "'");
   }
 
   private HRegionLocation locatePreviousInCache(TableName tableName,
@@ -356,14 +384,18 @@ class AsyncRegionLocator {
 
   /**
    * Locate the previous region using the current regions start key. Used for reverse scan.
+   * <p>
+   * TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
+   * of a region.
    */
   CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
-      byte[] startRowOfCurrentRegion) {
-    if (tableName.equals(META_TABLE_NAME)) {
-      return locateMetaRegion();
-    } else {
-      return locatePreviousRegion(tableName, startRowOfCurrentRegion);
-    }
+      byte[] startRowOfCurrentRegion, long timeoutNs) {
+    CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME)
+        ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion);
+    return withTimeout(future, timeoutNs,
+      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+          + "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
+          + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
   }
 
   private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 36687c6..44a237d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import org.apache.commons.logging.Log;
@@ -51,6 +52,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
 
+  // Add a delta to avoid timeout immediately after a retry sleeping.
+  private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+
   @FunctionalInterface
   public interface Callable<T> {
     CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
@@ -65,7 +69,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private final byte[] row;
 
-  private final Supplier<CompletableFuture<HRegionLocation>> locate;
+  private final Function<Long, CompletableFuture<HRegionLocation>> locate;
 
   private final Callable<T> callable;
 
@@ -118,6 +122,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
   }
 
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
   private void completeExceptionally() {
     future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
   }
@@ -138,7 +146,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     }
     long delayNs;
     if (operationTimeoutNs > 0) {
-      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs);
+      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally();
         return;
@@ -153,6 +161,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void call(HRegionLocation loc) {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      callTimeoutNs = remainingTimeNs();
+      if (callTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+      callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
     ClientService.Interface stub;
     try {
       stub = conn.getRegionServerStub(loc.getServerName());
@@ -166,7 +185,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         err -> conn.getLocator().updateCachedLocation(loc, err));
       return;
     }
-    resetController(controller, rpcTimeoutNs);
+    resetController(controller, callTimeoutNs);
     callable.call(controller, loc, stub).whenComplete((result, error) -> {
       if (error != null) {
         onError(error,
@@ -183,7 +202,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void locateThenCall() {
-    locate.get().whenComplete((loc, error) -> {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    locate.apply(locateTimeoutNs).whenComplete((loc, error) -> {
       if (error != null) {
         onError(error,
           () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
@@ -198,12 +227,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     });
   }
 
-  private CompletableFuture<HRegionLocation> locate() {
-    return conn.getLocator().getRegionLocation(tableName, row);
+  private CompletableFuture<HRegionLocation> locate(long timeoutNs) {
+    return conn.getLocator().getRegionLocation(tableName, row, timeoutNs);
   }
 
-  private CompletableFuture<HRegionLocation> locatePrevious() {
-    return conn.getLocator().getPreviousRegionLocation(tableName, row);
+  private CompletableFuture<HRegionLocation> locatePrevious(long timeoutNs) {
+    return conn.getLocator().getPreviousRegionLocation(tableName, row, timeoutNs);
   }
 
   public CompletableFuture<T> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index b29f878..e1f40a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
 
   @Override
   public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
-    return locator.getRegionLocation(tableName, row);
+    return locator.getRegionLocation(tableName, row, 0L);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
index 2e46d8a..a679192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -102,12 +102,12 @@ public class TestAsyncRegionLocator {
   @Test
   public void testNoTable() throws InterruptedException {
     try {
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
     }
     try {
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
     }
@@ -118,12 +118,12 @@ public class TestAsyncRegionLocator {
     createSingleRegionTable();
     TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
     try {
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
     }
     try {
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
     }
@@ -143,17 +143,17 @@ public class TestAsyncRegionLocator {
     createSingleRegionTable();
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
     byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
     ThreadLocalRandom.current().nextBytes(randKey);
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+      LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get());
     // Use a key which is not the endKey of a region will cause error
     try {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get());
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(IOException.class));
       assertTrue(e.getCause().getMessage().contains("end key of"));
@@ -193,7 +193,7 @@ public class TestAsyncRegionLocator {
     IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
-          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get());
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -204,7 +204,7 @@ public class TestAsyncRegionLocator {
       n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
         try {
           assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
-            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get());
         } catch (InterruptedException | ExecutionException e) {
           throw new RuntimeException(e);
         }
@@ -215,7 +215,7 @@ public class TestAsyncRegionLocator {
   public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
     createSingleRegionTable();
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
-    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
     ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
         .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
@@ -228,12 +228,12 @@ public class TestAsyncRegionLocator {
       Thread.sleep(100);
     }
     // Should be same as it is in cache
-    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
     LOCATOR.updateCachedLocation(loc, null);
     // null error will not trigger a cache cleanup
-    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
     LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
new file mode 100644
index 0000000..2a902a6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncRegionLocatorTimeout {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnectionImpl CONN;
+
+  private static AsyncRegionLocator LOCATOR;
+
+  private static volatile long SLEEP_MS = 0L;
+
+  public static class SleepRegionObserver extends BaseRegionObserver {
+
+    @Override
+    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+        RegionScanner s) throws IOException {
+      if (SLEEP_MS > 0) {
+        Threads.sleepWithoutInterrupt(SLEEP_MS);
+      }
+      return super.preScannerOpen(e, scan, s);
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(REGION_COPROCESSOR_CONF_KEY, SleepRegionObserver.class.getName());
+    conf.setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = new AsyncConnectionImpl(conf, User.getCurrent());
+    LOCATOR = CONN.getLocator();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException {
+    SLEEP_MS = 1000;
+    long startNs = System.nanoTime();
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500))
+          .get();
+      fail();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+      assertThat(e.getCause(), instanceOf(TimeoutIOException.class));
+    }
+    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+    assertTrue(costMs >= 500);
+    assertTrue(costMs < 1000);
+    // wait for the background task finish
+    Thread.sleep(2000);
+    // Now the location should be in cache, so we will not visit meta again.
+    HRegionLocation loc = LOCATOR
+        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500)).get();
+    assertEquals(loc.getServerName(),
+      TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 0b3e186..f76e240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -150,33 +150,35 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     AtomicBoolean errorTriggered = new AtomicBoolean(false);
     AtomicInteger count = new AtomicInteger(0);
     HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
-    AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) {
-      @Override
-      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
-        if (tableName.equals(TABLE_NAME)) {
-          CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-          if (count.getAndIncrement() == 0) {
-            errorTriggered.set(true);
-            future.completeExceptionally(new RuntimeException("Inject error!"));
-          } else {
-            future.complete(loc);
+    AsyncRegionLocator mockedLocator =
+        new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
+          @Override
+          CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+              long timeoutNs) {
+            if (tableName.equals(TABLE_NAME)) {
+              CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+              if (count.getAndIncrement() == 0) {
+                errorTriggered.set(true);
+                future.completeExceptionally(new RuntimeException("Inject error!"));
+              } else {
+                future.complete(loc);
+              }
+              return future;
+            } else {
+              return super.getRegionLocation(tableName, row, timeoutNs);
+            }
+          }
+
+          @Override
+          CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+              byte[] startRowOfCurrentRegion, long timeoutNs) {
+            return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion, timeoutNs);
+          }
+
+          @Override
+          void updateCachedLocation(HRegionLocation loc, Throwable exception) {
           }
-          return future;
-        } else {
-          return super.getRegionLocation(tableName, row);
-        }
-      }
-
-      @Override
-      CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
-          byte[] startRowOfCurrentRegion) {
-        return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
-      }
-
-      @Override
-      void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-      }
-    };
+        };
     try (AsyncConnectionImpl mockedConn =
         new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
 


Mime
View raw message