hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/2] Implement Preemptive Fast Fail
Date Wed, 29 Oct 2014 05:48:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 95282f2ea -> ece933fa3


http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
new file mode 100644
index 0000000..a9f5b27
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestFastFailWithoutTestUtil {
+  private static final Log LOG = LogFactory.getLog(TestFastFailWithoutTestUtil.class);
+
+  @Test
+  public void testInterceptorFactoryMethods() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
+        conf);
+
+    RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
+        .build();
+    assertTrue("We should be getting a PreemptiveFastFailInterceptor",
+        interceptorBeforeCast instanceof PreemptiveFastFailInterceptor);
+    PreemptiveFastFailInterceptor interceptor = (PreemptiveFastFailInterceptor) interceptorBeforeCast;
+
+    RetryingCallerInterceptorContext contextBeforeCast = interceptor
+        .createEmptyContext();
+    assertTrue(
+        "We should be getting a FastFailInterceptorContext since we are interacting with
the"
+            + " PreemptiveFastFailInterceptor",
+        contextBeforeCast instanceof FastFailInterceptorContext);
+
+    FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast;
+    assertTrue(context != null);
+
+    conf = HBaseConfiguration.create();
+    interceptorFactory = new RetryingCallerInterceptorFactory(conf);
+
+    interceptorBeforeCast = interceptorFactory.build();
+    assertTrue(
+        "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE",
+        interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor);
+
+    contextBeforeCast = interceptorBeforeCast.createEmptyContext();
+    assertTrue(
+        "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor",
+        contextBeforeCast instanceof NoOpRetryingInterceptorContext);
+
+    assertTrue(context != null);
+  }
+
+  @Test
+  public void testInterceptorContextClear() {
+    PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor();
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+    context.clear();
+    assertFalse(context.getCouldNotCommunicateWithServer().booleanValue());
+    assertEquals(context.didTry(), false);
+    assertEquals(context.getFailureInfo(), null);
+    assertEquals(context.getServer(), null);
+    assertEquals(context.getTries(), 0);
+  }
+
+  @Test
+  public void testInterceptorContextPrepare() throws IOException {
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor();
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+    RetryingCallable<?> callable = new RegionServerCallable<Boolean>(null,
+        null, null) {
+      @Override
+      public Boolean call(int callTimeout) throws Exception {
+        return true;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, ServerName.valueOf("localhost", 1234,
+            987654321));
+      }
+    };
+    context.prepare(callable);
+    ServerName server = getSomeServerName();
+    assertEquals(context.getServer(), server);
+    context.clear();
+    context.prepare(callable, 2);
+    assertEquals(context.getServer(), server);
+  }
+
+  @Test
+  public void testInterceptorIntercept50Times() throws IOException,
+      InterruptedException {
+    for (int i = 0; i < 50; i++) {
+      testInterceptorIntercept();
+    }
+  }
+
+  public void testInterceptorIntercept() throws IOException,
+      InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    long CLEANUP_TIMEOUT = 50;
+    long FAST_FAIL_THRESHOLD = 10;
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor(conf);
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+
+    RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+
+    // Lets simulate some work flow here.
+    int tries = 0;
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("Interceptor should have updated didTry to true",
+        context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull(
+        "Once a failure is identified, the first time the FailureInfo is generated for the
server,"
+            + " but it is not assigned to the context yet. It would be assigned on the next"
+            + " intercept.", context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "We are still in the first attempt and so we dont set this variable to true yet.",
+        context.isRetryDespiteFastFailMode());
+
+    Thread.sleep(FAST_FAIL_THRESHOLD + 1); // We sleep so as to make sure that
+                                           // we
+    // actually consider this server as a
+    // dead server in the next attempt.
+    tries++;
+
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTru should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNotNull(
+        "The context this time is updated with a failureInfo, since we already gave it a
try.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertTrue(
+        "Since we are alone here we would be given the permission to retryDespiteFailures.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+
+    Thread.sleep(CLEANUP_TIMEOUT); // Lets try and cleanup the data in the fast
+                                   // fail failure maps.
+
+    tries++;
+
+    context.clear();
+    context.prepare(callable, tries);
+    interceptor.occasionallyCleanupFailureInformation();
+    assertNull("The cleanup should have cleared the server",
+        interceptor.repeatedFailuresMap.get(context.getServer()));
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTru should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull("The failureInfo is cleared off from the maps.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "Since we are alone here we would be given the permission to retryDespiteFailures.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+
+  }
+
+  private <T> RetryingCallable<T> getDummyRetryingCallable(
+      ServerName someServerName) {
+    return new RegionServerCallable<T>(null, null, null) {
+      @Override
+      public T call(int callTimeout) throws Exception {
+        return null;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, serverName);
+      }
+    };
+  }
+
+  @Test
+  public void testExceptionsIdentifiedByInterceptor() throws IOException {
+    Throwable[] networkexceptions = new Throwable[] {
+        new ConnectException("Mary is unwell"),
+        new SocketTimeoutException("Mike is too late"),
+        new ClosedChannelException(),
+        new SyncFailedException("Dave is not on the same page"),
+        new TimeoutException("Mike is late again"),
+        new EOFException("This is the end... "),
+        new ConnectionClosingException("Its closing") };
+    final String INDUCED = "Induced";
+    Throwable[] nonNetworkExceptions = new Throwable[] {
+        new IOException("Bob died"),
+        new RemoteException("Bob's cousin died", null),
+        new NoSuchMethodError(INDUCED), new NullPointerException(INDUCED),
+        new DoNotRetryIOException(INDUCED), new Error(INDUCED) };
+
+    Configuration conf = HBaseConfiguration.create();
+    long CLEANUP_TIMEOUT = 0;
+    long FAST_FAIL_THRESHOLD = 1000000;
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+    for (Throwable e : networkexceptions) {
+      PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+          .createPreemptiveInterceptor(conf);
+      FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+          .createEmptyContext();
+
+      RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+      context.prepare(callable, 0);
+      interceptor.intercept(context);
+      interceptor.handleFailure(context, e);
+      interceptor.updateFailureInfo(context);
+      assertTrue(
+          "The call shouldn't have been successful if there was a ConnectException",
+          context.getCouldNotCommunicateWithServer().booleanValue());
+    }
+    for (Throwable e : nonNetworkExceptions) {
+      try {
+        PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+            .createPreemptiveInterceptor(conf);
+        FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+            .createEmptyContext();
+
+        RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+        context.prepare(callable, 0);
+        interceptor.intercept(context);
+        interceptor.handleFailure(context, e);
+        interceptor.updateFailureInfo(context);
+        assertFalse(
+            "The call shouldn't have been successful if there was a ConnectException",
+            context.getCouldNotCommunicateWithServer().booleanValue());
+      } catch (NoSuchMethodError t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (NullPointerException t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (DoNotRetryIOException t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (Error t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      }
+    }
+  }
+
+  protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor(
+      Configuration conf) {
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
+        conf);
+    RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
+        .build();
+    return (PreemptiveFastFailInterceptor) interceptorBeforeCast;
+  }
+
+  static PreemptiveFastFailInterceptor createPreemptiveInterceptor() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    return createPreemptiveInterceptor(conf);
+  }
+
+  @Test(timeout = 120000)
+  public void testPreemptiveFastFailException50Times()
+      throws InterruptedException, ExecutionException {
+    for (int i = 0; i < 50; i++) {
+      testPreemptiveFastFailException();
+    }
+  }
+
+  /***
+   * This test tries to create a thread interleaving of the 2 threads trying to do a 
+   * Retrying operation using a {@link PreemptiveFastFailInterceptor}. The goal here is to
make sure
+   * that the second thread will be attempting the operation while the first thread is in
the
+   * process of making an attempt after it has marked the server in fast fail. 
+   * 
+   * The thread execution is as follows :
+   * The PreemptiveFastFailInterceptor is extended in this test to achieve a good interleaving
+   * behavior without using any thread sleeps.
+   * 
+   *              Privileged Thread 1                         NonPrivileged Thread 2
+   *                                              
+   *  Retry 0 :   intercept               
+   *                                              
+   *  Retry 0 :   handleFailure
+   *                      latches[0].countdown
+   *                      latches2[0].await
+   *                                                                          latches[0].await
+   *                                                    intercept                 : Retry
0
+   * 
+   *                                                    handleFailure             : Retry
0
+   * 
+   *                                                    updateFailureinfo         : Retry
0
+   *                                                                          latches2[0].countdown
+   *                                                                          
+   *  Retry 0 :   updateFailureInfo
+   *  
+   *  Retry 1 : intercept
+   *  
+   *  Retry 1 :   handleFailure
+   *                      latches[1].countdown
+   *                      latches2[1].await
+   *     
+   *                                                                          latches[1].await
+   *                                                    intercept                 : Retry
1
+   *                                                        (throws PFFE)
+   *                                                    handleFailure             : Retry
1
+   *                                       
+   *                                                    updateFailureinfo         : Retry
1
+   *                                                                          latches2[1].countdown
+   *  Retry 1 :   updateFailureInfo
+   *  
+   * 
+   *  See getInterceptor() for more details on the interceptor implementation to make sure
this
+   *  thread interleaving is achieved.
+   *  
+   *  We need 2 sets of latches of size MAX_RETRIES. We use an AtomicInteger done to make
sure that
+   *  we short circuit the Thread 1 after we hit the PFFE on Thread 2
+   *  
+   *  
+   * @throws InterruptedException
+   * @throws ExecutionException
+   */
+  private void testPreemptiveFastFailException() throws InterruptedException,
+      ExecutionException {
+    LOG.debug("Setting up the counters to start the test");
+    priviRetryCounter.set(0);
+    nonPriviRetryCounter.set(0);
+    done.set(0);
+
+    for (int i = 0; i <= RETRIES; i++) {
+      latches[i] = new CountDownLatch(1);
+      latches2[i] = new CountDownLatch(1);
+    }
+
+    PreemptiveFastFailInterceptor interceptor = getInterceptor();
+
+    final RpcRetryingCaller<Void> priviCaller = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+    final RpcRetryingCaller<Void> nonPriviCaller = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+
+    LOG.debug("Submitting the thread 1");
+    Future<Boolean> priviFuture = executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          isPriviThreadLocal.get().set(true);
+          priviCaller
+              .callWithRetries(
+                  getRetryingCallable(serverName, exception),
+                  CLEANUP_TIMEOUT);
+        } catch (RetriesExhaustedException e) {
+          return true;
+        } catch (PreemptiveFastFailException e) {
+          return false;
+        }
+        return false;
+      }
+    });
+    LOG.debug("Submitting the thread 2");
+    Future<Boolean> nonPriviFuture = executor.submit(new Callable<Boolean>()
{
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          isPriviThreadLocal.get().set(false);
+          nonPriviCaller.callWithRetries(
+              getRetryingCallable(serverName, exception),
+              CLEANUP_TIMEOUT);
+        } catch (PreemptiveFastFailException e) {
+          return true;
+        }
+        return false;
+      }
+    });
+    LOG.debug("Waiting for Thread 2 to finish");
+    assertTrue(nonPriviFuture.get());
+    LOG.debug("Waiting for Thread 1 to finish");
+    assertTrue(priviFuture.get());
+
+    // Now that the server in fast fail mode. Lets try to make contact with the
+    // server with a third thread. And make sure that when there is no
+    // exception,
+    // the fast fail gets cleared up.
+    assertTrue(interceptor.isServerInFailureMap(serverName));
+    final RpcRetryingCaller<Void> priviCallerNew = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+    executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        priviCallerNew.callWithRetries(
+            getRetryingCallable(serverName, null), CLEANUP_TIMEOUT);
+        return false;
+      }
+    }).get();
+    assertFalse("The server was supposed to be removed from the map",
+        interceptor.isServerInFailureMap(serverName));
+  }
+
+  ExecutorService executor = Executors.newCachedThreadPool();
+  
+  /**
+   * Some timeouts to make the test execution resonable.
+   */
+  final int PAUSE_TIME = 10;
+  final int RETRIES = 3;
+  final int CLEANUP_TIMEOUT = 10000;
+  final long FAST_FAIL_THRESHOLD = PAUSE_TIME / 1;
+  
+  /**
+   * The latches necessary to make the thread interleaving possible.
+   */
+  final CountDownLatch[] latches = new CountDownLatch[RETRIES + 1];
+  final CountDownLatch[] latches2 = new CountDownLatch[RETRIES + 1];
+  final AtomicInteger done = new AtomicInteger(0);
+
+  /**
+   * Global retry counters that give us an idea about which iteration of the retry we are
in
+   */
+  final AtomicInteger priviRetryCounter = new AtomicInteger();
+  final AtomicInteger nonPriviRetryCounter = new AtomicInteger();
+  final ServerName serverName = getSomeServerName();
+
+  /**
+   * The variable which is used as an identifier within the 2 threads.
+   */
+  public final ThreadLocal<AtomicBoolean> isPriviThreadLocal = new ThreadLocal<AtomicBoolean>()
{
+    @Override
+    public AtomicBoolean initialValue() {
+      return new AtomicBoolean(true);
+    }
+  };
+  final Exception exception = new ConnectionClosingException("The current connection is closed");
+
+  public PreemptiveFastFailInterceptor getInterceptor() {
+    final Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+
+    return new PreemptiveFastFailInterceptor(
+        conf) {
+      @Override
+      public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+        boolean pffe = false;
+        if (!isPriviThreadLocal.get().get()) {
+          pffe = !((FastFailInterceptorContext)context).isRetryDespiteFastFailMode();
+        }
+        if (isPriviThreadLocal.get().get()) {
+          try {
+            // Thread 2 should be done by 2 iterations. We should short circuit Thread 1
because
+            // Thread 2 would be dead and can't do a countdown.
+            if (done.get() <= 1) {
+              latches2[priviRetryCounter.get()].await();
+            }
+          } catch (InterruptedException e) {
+            fail();
+          }
+        }
+        super.updateFailureInfo(context);
+        if (!isPriviThreadLocal.get().get()) {
+          if (pffe) done.incrementAndGet();
+          latches2[nonPriviRetryCounter.get()].countDown();
+        }
+      }
+
+      @Override
+      public void intercept(RetryingCallerInterceptorContext context)
+          throws PreemptiveFastFailException {
+        if (!isPriviThreadLocal.get().get()) {
+          try {
+            latches[nonPriviRetryCounter.getAndIncrement()].await();
+          } catch (InterruptedException e) {
+            fail();
+          }
+        }
+        super.intercept(context);
+      }
+
+      @Override
+      public void handleFailure(RetryingCallerInterceptorContext context,
+          Throwable t) throws IOException {
+        super.handleFailure(context, t);
+        if (isPriviThreadLocal.get().get()) {
+          latches[priviRetryCounter.getAndIncrement()].countDown();
+        }
+      }
+    };
+  }
+
+  public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
+      int retries, RetryingCallerInterceptor interceptor) {
+    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor) {
+      @Override
+      public Void callWithRetries(RetryingCallable<Void> callable,
+          int callTimeout) throws IOException, RuntimeException {
+        Void ret = super.callWithRetries(callable, callTimeout);
+        return ret;
+      }
+    };
+  }
+
+  protected static ServerName getSomeServerName() {
+    return ServerName.valueOf("localhost", 1234, 987654321);
+  }
+
+  private RegionServerCallable<Void> getRetryingCallable(
+      final ServerName serverName, final Exception e) {
+    return new RegionServerCallable<Void>(null, null, null) {
+      @Override
+      public void prepare(boolean reload) throws IOException {
+        this.location = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+            serverName);
+      }
+
+      @Override
+      public Void call(int callTimeout) throws Exception {
+        if (e != null)
+          throw e;
+        return null;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, serverName);
+      }
+      
+      @Override
+      public void throwable(Throwable t, boolean retrying) {
+        // Do nothing
+      }
+      
+      @Override
+      public long sleep(long pause, int tries) {
+        return ConnectionUtils.getPauseTime(pause, tries + 1);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 7befad9..d4d17c9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1069,6 +1069,33 @@ public final class HConstants {
    */
   public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
 
+  // HConstants for fast fail on the client side follow.
+
+  /**
+   * Config for enabling/disabling the fast fail mode.
+   */
+  public static final String HBASE_CLIENT_FAST_FAIL_MODE_ENABLED =
+      "hbase.client.fast.fail.mode.enabled";
+
+  /** Default for {@link #HBASE_CLIENT_FAST_FAIL_MODE_ENABLED}: fast fail mode is disabled. */
+  public static final boolean HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT =
+      false;
+
+  /**
+   * Config for the fast fail threshold, in milliseconds. Presumably this is how long a server has
+   * to keep failing before clients treat it as being in fast fail mode; confirm against the
+   * interceptor. The constant name's spelling is kept because it is public API.
+   */
+  public static final String HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS =
+      "hbase.client.fastfail.threshold";
+
+  /** Default for {@link #HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS}: 60000 ms. */
+  public static final long HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT =
+      60000;
+
+  /**
+   * Config for the fast fail cleanup duration, in milliseconds. Presumably this is the age after
+   * which recorded failure information for a server is cleaned up; confirm against the
+   * interceptor.
+   */
+  public static final String HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS =
+      "hbase.client.fast.fail.cleanup.duration";
+
+  /**
+   * Default for {@link #HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS}: 600000 ms.
+   */
+  public static final long HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT =
+      600000;
+
+  /**
+   * Config key for the fast fail interceptor implementation.
+   * NOTE(review): presumably a class name read by the retrying caller interceptor factory;
+   * confirm against its usage.
+   */
+  public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
+      "hbase.client.fast.fail.interceptor.impl";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index aeb8646..f3bf9c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -131,6 +131,9 @@ public class HConnectionTestingUtility {
           RpcControllerFactory.instantiate(conf)));
     Mockito.doNothing().when(c).incCount();
     Mockito.doNothing().when(c).decCount();
+    Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
+        RpcRetryingCallerFactory.instantiate(conf,
+            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR));
     return c;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
new file mode 100644
index 0000000..709e94b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Mini-cluster test for the client's preemptive fast-fail mode.
+ *
+ * <p>After a region server is killed, most client threads that target it should
+ * fail quickly with a PreemptiveFastFailException rather than block. A few
+ * threads should still be allowed to retry.
+ */
+@Category({MediumTests.class, ClientTests.class})
+public class TestFastFail {
+  private static final Log LOG = LogFactory.getLog(TestFastFail.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final Random random = new Random();
+  private static final int SLAVES = 3;
+  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  /**
+   * Base time unit in ms. It drives the client pause and operation timeout,
+   * and is the threshold above which a worker counts as "blocked".
+   */
+  private static final int SLEEPTIME = 1000;
+
+  /** Starts a mini cluster with {@link #SLAVES} region servers. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Per-test setup hook; intentionally empty. */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /** Per-test teardown hook; intentionally empty. */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Starts many readers, kills a region server halfway through, and checks
+   * the outcome. Every thread must either succeed or fail. At least one
+   * PreemptiveFastFailException must occur. No more threads may block than
+   * were allowed to retry by the interceptor.
+   */
+  @Test
+  public void testFastFail() throws IOException, InterruptedException {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+
+    final String tableName = "testClientRelearningExperiment";
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
+    final long numRows = 1000;
+
+    // Use a copy so the fast-fail client settings (including the counting
+    // interceptor) do not leak into the shared mini-cluster configuration.
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
+    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
+    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
+        MyPreemptiveFastFailInterceptor.class,
+        PreemptiveFastFailInterceptor.class);
+
+    // The number of threads that are going to perform actions against the
+    // test table.
+    final int nThreads = 200;
+    final Connection connection = ConnectionFactory.createConnection(conf);
+    final ExecutorService service = Executors.newFixedThreadPool(nThreads);
+    try {
+      // Write numRows worth of data, so that the workers can arbitrarily read.
+      try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+        writeData(table, numRows);
+      }
+
+      final CountDownLatch continueOtherHalf = new CountDownLatch(1);
+      final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
+
+      final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
+      final AtomicInteger numFailedThreads = new AtomicInteger(0);
+
+      // The total time taken for the threads to perform the second get.
+      final AtomicLong totalTimeTaken = new AtomicLong(0);
+      final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
+      final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
+
+      List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(nThreads);
+      for (int i = 0; i < nThreads; i++) {
+        futures.add(service.submit(new Callable<Boolean>() {
+          /**
+           * Performs two gets of the same random row. The second get only
+           * starts after a region server has been killed, so that some of the
+           * threads run into a PreemptiveFastFailException.
+           */
+          @Override
+          public Boolean call() throws Exception {
+            // Ensures each worker counts doneHalfway down at most once.
+            // Otherwise the main thread could be released early.
+            boolean countedDown = false;
+            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+              Thread.sleep(random.nextInt(100)); // Add some jitter here
+              // Mask off the sign bit: Math.abs(Long.MIN_VALUE) is negative.
+              byte[] row =
+                  longToByteArrayKey((random.nextLong() & Long.MAX_VALUE) % numRows);
+              Get g = new Get(row);
+              g.addColumn(FAMILY, QUALIFIER);
+              try {
+                table.get(g);
+              } catch (Exception e) {
+                LOG.debug("Get failed : ", e);
+                countedDown = true;
+                doneHalfway.countDown();
+                return false;
+              }
+
+              // Done with one get, proceeding to do the next one.
+              countedDown = true;
+              doneHalfway.countDown();
+              continueOtherHalf.await();
+
+              long startTime = System.currentTimeMillis();
+              g = new Get(row);
+              g.addColumn(FAMILY, QUALIFIER);
+              try {
+                table.get(g);
+                // The get was successful
+                numSuccessfullThreads.incrementAndGet();
+              } catch (Exception e) {
+                if (e instanceof PreemptiveFastFailException) {
+                  // We were issued a PreemptiveFastFailException
+                  numPreemptiveFastFailExceptions.incrementAndGet();
+                }
+                // Irrespective of PFFE, the request failed.
+                numFailedThreads.incrementAndGet();
+                return false;
+              } finally {
+                long elapsed = System.currentTimeMillis() - startTime;
+                totalTimeTaken.addAndGet(elapsed);
+                if (elapsed >= SLEEPTIME) {
+                  // Slow workers are counted as blocked workers. This assumes
+                  // threads run at full speed. If thread scheduling alone is as
+                  // slow as SLEEPTIME, this test may fail, and SLEEPTIME may need
+                  // to be raised on slower machines.
+                  numBlockedWorkers.incrementAndGet();
+                }
+              }
+              return true;
+            } catch (Exception e) {
+              LOG.error("Caught unknown exception", e);
+              if (!countedDown) {
+                doneHalfway.countDown();
+              }
+              return false;
+            }
+          }
+        }));
+      }
+
+      doneHalfway.await();
+
+      ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
+
+      // Kill a regionserver
+      TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
+      TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");
+
+      // Let the threads continue going
+      continueOtherHalf.countDown();
+
+      Thread.sleep(2 * SLEEPTIME);
+      // Restore the cluster
+      TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
+
+      int numThreadsReturnedFalse = 0;
+      int numThreadsReturnedTrue = 0;
+      int numThreadsThrewExceptions = 0;
+      for (Future<Boolean> f : futures) {
+        try {
+          if (f.get()) {
+            numThreadsReturnedTrue++;
+          } else {
+            numThreadsReturnedFalse++;
+          }
+        } catch (Exception e) {
+          numThreadsThrewExceptions++;
+        }
+      }
+      // Average wait per blocked worker; 0 when no worker blocked.
+      long avgTimeWaited = numBlockedWorkers.get() == 0
+          ? 0 : totalTimeTaken.get() / numBlockedWorkers.get();
+      LOG.debug("numThreadsReturnedFalse:"
+          + numThreadsReturnedFalse
+          + " numThreadsReturnedTrue:"
+          + numThreadsReturnedTrue
+          + " numThreadsThrewExceptions:"
+          + numThreadsThrewExceptions
+          + " numFailedThreads:"
+          + numFailedThreads.get()
+          + " numSuccessfullThreads:"
+          + numSuccessfullThreads.get()
+          + " numBlockedWorkers:"
+          + numBlockedWorkers.get()
+          + " totalTimeWaited: "
+          + avgTimeWaited
+          + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
+
+      assertEquals("The expected number of all the successfull and the failed "
+          + "threads should equal the total number of threads that we spawned",
+          nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
+      assertEquals(
+          "All the failures should be coming from the secondput failure",
+          numFailedThreads.get(), numThreadsReturnedFalse);
+      assertEquals("Number of threads that threw execution exceptions "
+          + "otherwise should be 0", 0, numThreadsThrewExceptions);
+      assertEquals("The regionservers that returned true should equal to the"
+          + " number of successful threads", numThreadsReturnedTrue,
+          numSuccessfullThreads.get());
+      assertTrue(
+          "There should be atleast one thread that retried instead of failing",
+          MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
+      assertTrue(
+          "There should be atleast one PreemptiveFastFail exception,"
+              + " otherwise, the test makes little sense."
+              + "numPreemptiveFastFailExceptions: "
+              + numPreemptiveFastFailExceptions.get(),
+          numPreemptiveFastFailExceptions.get() > 0);
+      assertTrue(
+          "Only few thread should ideally be waiting for the dead "
+              + "regionserver to be coming back. numBlockedWorkers:"
+              + numBlockedWorkers.get() + " threads that retried : "
+              + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
+          numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
+              .get());
+    } finally {
+      // Release the worker pool and the client connection even if the test fails.
+      service.shutdownNow();
+      connection.close();
+    }
+  }
+
+  /**
+   * Interceptor that counts how often the fast-fail logic still lets a
+   * request retry ("brave souls").
+   */
+  public static class MyPreemptiveFastFailInterceptor extends
+      PreemptiveFastFailInterceptor {
+    public static final AtomicInteger numBraveSouls = new AtomicInteger();
+
+    public MyPreemptiveFastFailInterceptor(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
+      if (ret) {
+        numBraveSouls.incrementAndGet();
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * Converts a row number into an MD5-prefixed row key. Bytes.toBytes encodes
+   * as UTF-8, so the result does not depend on the platform charset.
+   */
+  private static byte[] longToByteArrayKey(long rowKey) {
+    return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(rowKey));
+  }
+
+  /**
+   * Writes rows 0..numRows-1 to the table, using each row key as its own
+   * value, then flushes any buffered puts.
+   */
+  public void writeData(Table table, long numRows) throws IOException,
+      InterruptedException {
+    for (long i = 0; i < numRows; i++) {
+      byte[] rowKey = longToByteArrayKey(i);
+      Put put = new Put(rowKey);
+      byte[] value = rowKey; // value is the same as the row key
+      put.add(FAMILY, QUALIFIER, value);
+      table.put(put);
+    }
+    // Flush after the loop so that buffered puts are sent before readers start.
+    table.flushCommits();
+    LOG.info("Written all puts.");
+  }
+}


Mime
View raw message