hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject hadoop git commit: HDFS-11551. Support Timeout when checking single disk. Contributed by Hanisha Koneru.
Date Tue, 04 Apr 2017 17:50:11 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 8c21b2a25 -> fbe86471c


HDFS-11551. Support Timeout when checking single disk. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fbe86471
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fbe86471
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fbe86471

Branch: refs/heads/branch-2
Commit: fbe86471cf6d6397a89e473eb328d86a66c513e2
Parents: 8c21b2a
Author: Hanisha Koneru <hanishakoneru@apache.org>
Authored: Tue Apr 4 10:49:44 2017 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Tue Apr 4 10:49:44 2017 -0700

----------------------------------------------------------------------
 LICENSE.txt                                     |   21 +
 .../dev-support/findbugsExcludeFile.xml         |   12 +
 .../server/datanode/checker/AbstractFuture.java | 1295 ++++++++++++++++++
 .../datanode/checker/DatasetVolumeChecker.java  |   15 +-
 .../checker/StorageLocationChecker.java         |    1 +
 .../datanode/checker/ThrottledAsyncChecker.java |   37 +-
 .../server/datanode/checker/TimeoutFuture.java  |  162 +++
 .../TestDatasetVolumeCheckerTimeout.java        |  132 ++
 .../checker/TestThrottledAsyncChecker.java      |   14 +-
 .../TestThrottledAsyncCheckerTimeout.java       |  221 +++
 10 files changed, 1901 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index 61ebbd6..d7659e4 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1799,3 +1799,24 @@ representations with respect to the Work not specified here. Licensor shall not
 be bound by any additional provisions that may appear in any communication from
 You. This License may not be modified without the mutual written agreement of
 the Licensor and You.
+
+--------------------------------------------------------------------------------
+
+For: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
+/server/datanode/checker/AbstractFuture.java and
+hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
+/server/datanode/checker/TimeoutFuture.java
+
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 3fa4e8d..53897ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -234,4 +234,16 @@
       <Method name="assertAllResultsEqual" />
       <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
     </Match>
+    <Match>
+        <Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
+        <Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
+    </Match>
+    <Match>
+        <Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
+        <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+    </Match>
+    <Match>
+        <Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
+        <Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT" />
+    </Match>
  </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java
new file mode 100644
index 0000000..2e0ba18
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java
@@ -0,0 +1,1295 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.GwtCompatible;
+import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+    .newUpdater;
+
+import javax.annotation.Nullable;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An abstract implementation of {@link ListenableFuture}, intended for
+ * advanced users only. More common ways to create a {@code ListenableFuture}
+ * include instantiating a {@link SettableFuture}, submitting a task to a
+ * {@link ListeningExecutorService}, and deriving a {@code Future} from an
+ * existing one, typically using methods like {@link Futures#transform
+ * (ListenableFuture, com.google.common.base.Function) Futures.transform}
+ * and {@link Futures#catching(ListenableFuture, Class,
+ * com.google.common.base.Function, java.util.concurrent.Executor)
+ * Futures.catching}.
+ *
+ * <p>This class implements all methods in {@code ListenableFuture}.
+ * Subclasses should provide a way to set the result of the computation
+ * through the protected methods {@link #set(Object)},
+ * {@link #setFuture(ListenableFuture)} and {@link #setException(Throwable)}.
+ * Subclasses may also override {@link #interruptTask()}, which will be
+ * invoked automatically if a call to {@link #cancel(boolean) cancel(true)}
+ * succeeds in canceling the future. Subclasses should rarely override other
+ * methods.
+ */
+
+@GwtCompatible(emulated = true)
+public abstract class AbstractFuture<V> implements ListenableFuture<V> {
+  // NOTE: Whenever both tests are cheap and functional, it's faster to use &,
+  // | instead of &&, ||
+
+  private static final boolean GENERATE_CANCELLATION_CAUSES =
+      Boolean.parseBoolean(
+          System.getProperty("guava.concurrent.generate_cancellation_cause",
+              "false"));
+
+  /**
+   * A less abstract subclass of AbstractFuture whose overrides are all
+   * {@code final} delegations to the superclass. This can be used to optimize
+   * setFuture, because {@link #get} is exactly {@link AbstractFuture#get}.
+   */
+  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
+    @Override
+    public final boolean isDone() {
+      return super.isDone();
+    }
+
+    @Override
+    public final boolean isCancelled() {
+      return super.isCancelled();
+    }
+
+    @Override
+    public final boolean cancel(boolean mayInterruptIfRunning) {
+      return super.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public final V get() throws InterruptedException, ExecutionException {
+      return super.get();
+    }
+
+    @Override
+    public final V get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return super.get(timeout, unit);
+    }
+
+    @Override
+    public final void addListener(Runnable listener, Executor executor) {
+      super.addListener(listener, executor);
+    }
+  }
+
+  // Logs exceptions caught when running listeners and helper setup failures.
+  private static final Logger log = Logger
+      .getLogger(AbstractFuture.class.getName());
+
+  // A heuristic for timed gets: when the remaining timeout (in nanoseconds)
+  // is below this threshold, spin instead of blocking. This value is what
+  // AbstractQueuedSynchronizer uses.
+  private static final long SPIN_THRESHOLD_NANOS = 1000L;
+
+  private static final AtomicHelper ATOMIC_HELPER;
+
+  static {
+    AtomicHelper helper;
+
+    try {
+      helper = new UnsafeAtomicHelper();
+    } catch (Throwable unsafeFailure) {
+      // Catch absolutely everything and fall back to our 'SafeAtomicHelper'.
+      // The access control checks that AtomicReferenceFieldUpdater (ARFU)
+      // performs require the caller class to be AbstractFuture rather than
+      // SafeAtomicHelper, so we annoyingly have to create the updaters here.
+      try {
+        helper =
+            new SafeAtomicHelper(
+                newUpdater(Waiter.class, Thread.class, "thread"),
+                newUpdater(Waiter.class, Waiter.class, "next"),
+                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
+                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
+                newUpdater(AbstractFuture.class, Object.class, "value"));
+      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
+        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs
+        // that cause getDeclaredField to throw a NoSuchFieldException when
+        // the field is definitely there.
+        // For these users, fall back to a suboptimal implementation based on
+        // synchronized. This will be a definite performance hit to those users.
+        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
+        log.log(
+            Level.SEVERE, "SafeAtomicHelper is broken!",
+            atomicReferenceFieldUpdaterFailure);
+        helper = new SynchronizedHelper();
+      }
+    }
+    ATOMIC_HELPER = helper;
+
+    // Prevent rare disastrous classloading in first call to LockSupport.park.
+    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
+    @SuppressWarnings("unused")
+    Class<?> ensureLoaded = LockSupport.class;
+  }
+
+  /**
+   * Node for a waiting thread; links form a Treiber stack in {@link #waiters}.
+   */
+  private static final class Waiter {
+    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
+
+    @Nullable volatile Thread thread;
+    @Nullable volatile Waiter next;
+
+    /**
+     * Constructor for the TOMBSTONE; it avoids using ATOMIC_HELPER in case
+     * this class is loaded before ATOMIC_HELPER is initialized. Apparently
+     * this is possible on some Android platforms.
+     */
+    Waiter(boolean unused) {
+    }
+
+    Waiter() {
+      // Records the current thread as the waiter. Avoids a volatile write; the
+      // write is made visible by the subsequent CAS on the waiters field.
+      ATOMIC_HELPER.putThread(this, Thread.currentThread());
+    }
+
+    // Non-volatile write to the next field. It should be made visible by the
+    // subsequent CAS on the waiters field.
+    void setNext(Waiter next) {
+      ATOMIC_HELPER.putNext(this, next);
+    }
+
+    void unpark() {
+      // This is racy with removeWaiter. The consequence of the race is that
+      // we may spuriously call unpark even though the thread has already
+      // removed itself from the list. But even if we did use a CAS, that
+      // race would still exist (it would just be ever so slightly smaller).
+      Thread w = thread;
+      if (w != null) {
+        thread = null;
+        LockSupport.unpark(w);
+      }
+    }
+  }
+
+  /**
+   * Marks the given node as 'deleted' (null thread) and then scans the list
+   * to unlink all deleted nodes. This is an O(n) operation in the common
+   * case (and O(n^2) in the worst), but we are saved by two things:
+   * <ul>
+   * <li>This is only called when a waiting thread times out or is
+   * interrupted, both of which should be rare.
+   * <li>The waiters list should be very short.
+   * </ul>
+   */
+  private void removeWaiter(Waiter node) {
+    node.thread = null; // mark as 'deleted'
+    restart:
+    while (true) {
+      Waiter pred = null;
+      Waiter curr = waiters;
+      if (curr == Waiter.TOMBSTONE) {
+        return; // give up if someone is calling complete
+      }
+      Waiter succ;
+      while (curr != null) {
+        succ = curr.next;
+        if (curr.thread != null) { // Live node: nothing to unlink, just
+          // advance pred.
+          pred = curr;
+        } else if (pred != null) { // Deleted node with a predecessor:
+          // splice it out of the list.
+          pred.next = succ;
+          if (pred.thread == null) { // pred itself got marked deleted
+            // concurrently (we raced with its removal). Restart.
+            continue restart;
+          }
+        } else if (!ATOMIC_HELPER
+            .casWaiters(this, curr, succ)) { // Deleted node is the head
+          continue restart; // CAS failed: raced with an add or complete
+        }
+        curr = succ;
+      }
+      break;
+    }
+  }
+
+  /**
+   * A task/executor pair; listeners form a stack through {@link #listeners}.
+   */
+  private static final class Listener {
+    static final Listener TOMBSTONE = new Listener(null, null);
+    final Runnable task;
+    final Executor executor;
+
+    // Plain (non-volatile) field: writes to next are made visible by the
+    // subsequent CAS on the listeners field.
+    @Nullable Listener next;
+
+    Listener(Runnable task, Executor executor) {
+      this.task = task;
+      this.executor = executor;
+    }
+  }
+
+  /**
+   * A special value to represent {@code null}.
+   */
+  private static final Object NULL = new Object();
+
+  /**
+   * A special value that represents failure: the state stored when
+   * {@link #setException} is called successfully. Wraps a non-null cause.
+   */
+  private static final class Failure {
+    static final Failure FALLBACK_INSTANCE =
+        new Failure(
+            new Throwable("Failure occurred while trying to finish a future" +
+                ".") {
+              @Override
+              public synchronized Throwable fillInStackTrace() {
+                return this; // deliberately skip capturing a stack trace
+              }
+            });
+    final Throwable exception;
+
+    Failure(Throwable exception) {
+      this.exception = checkNotNull(exception);
+    }
+  }
+
+  /**
+   * Represents cancellation: the 'wasInterrupted' bit and an optional cause.
+   */
+  private static final class Cancellation {
+    final boolean wasInterrupted;
+    @Nullable final Throwable cause;
+
+    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
+      this.wasInterrupted = wasInterrupted;
+      this.cause = cause;
+    }
+  }
+
+  /**
+   * A special value that encodes the 'setFuture' state (owner plus delegate).
+   */
+  private static final class SetFuture<V> implements Runnable {
+    final AbstractFuture<V> owner;
+    final ListenableFuture<? extends V> future;
+
+    SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
+      this.owner = owner;
+      this.future = future;
+    }
+
+    @Override
+    public void run() {
+      if (owner.value != this) {
+        // Nothing to do: owner's state is no longer this SetFuture (we must
+        // have been cancelled), so don't bother inspecting the future.
+        return;
+      }
+      Object valueToSet = getFutureValue(future);
+      if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
+        complete(owner);
+      }
+    }
+  }
+
+  /**
+   * This field encodes the current state of the future.
+   *
+   * <p>The valid values are:
+   * <ul>
+   * <li>{@code null} initial state, nothing has happened.
+   * <li>{@link Cancellation} terminal state, {@code cancel} was called.
+   * <li>{@link Failure} terminal state, {@code setException} was called.
+   * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
+   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
+   * <li>Any other non-null value, terminal state, {@code set} was called with
+   * a non-null argument.
+   * </ul>
+   */
+  private volatile Object value;
+
+  /**
+   * All listeners.
+   */
+  private volatile Listener listeners;
+
+  /**
+   * All waiting threads.
+   */
+  private volatile Waiter waiters;
+
+  /**
+   * Constructor for use by subclasses.
+   */
+  protected AbstractFuture() {
+  }
+
+  // Gets and Timed Gets
+  //
+  // * Be responsive to interruption
+  // * Don't create Waiter nodes if you aren't going to park, this helps
+  // reduce contention on the waiters field.
+  // * Future completion is defined by when #value becomes non-null/non
+  // SetFuture
+  // * Future completion can be observed if the waiters field contains a
+  // TOMBSTONE
+
+  // Timed Get
+  // There are a few design constraints to consider
+  // * We want to be responsive to small timeouts, unpark() has non trivial
+  // latency overheads (I have observed 12 micros on 64 bit linux systems to
+  // wake up a parked thread). So if the timeout is small we shouldn't park().
+  // This needs to be traded off with the cpu overhead of spinning, so we use
+  // SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
+  // similar purposes.
+  // * We want to behave reasonably for timeouts of 0
+  // * We are more responsive to completion than timeouts. This is because
+  // parkNanos depends on system scheduling and as such we could either miss
+  // our deadline, or unpark() could be delayed so that it looks like we
+  // timed out even though we didn't. For comparison FutureTask respects
+  // completion preferably and AQS is non-deterministic (depends on where in
+  // the queue the waiter is). If we wanted to be strict about it, we could
+  // store the unpark() time in the Waiter node and we could use that to make
+  // a decision about whether or not we timed out prior to being unparked.
+
+  /*
+   * Improve the documentation of when InterruptedException is thrown. Our
+   * behavior matches the JDK's, but the JDK's documentation is misleading.
+   */
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The default {@link AbstractFuture} implementation throws {@code
+   * InterruptedException} if the current thread is interrupted before or
+   * during the call, even if the value is already available.
+   *
+   * @throws InterruptedException  if the current thread was interrupted
+   *                               before or during the call (optional but
+   *                               recommended).
+   * @throws CancellationException {@inheritDoc}
+   */
+  @Override
+  public V get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException, ExecutionException {
+    // NOTE: if timeout <= 0 and the future is not already done, we skip both
+    // wait loops below and go straight to throwing a TimeoutException.
+    long remainingNanos = unit
+        .toNanos(timeout); // we rely on the implicit null check on unit.
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+    Object localValue = value;
+    if (localValue != null & !(localValue instanceof SetFuture)) {
+      return getDoneValue(localValue);
+    }
+    // We delay calling nanoTime until we know we will need to either park or
+    // spin.
+    final long endNanos = remainingNanos > 0 ? System
+        .nanoTime() + remainingNanos : 0;
+    long_wait_loop:
+    if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
+      Waiter oldHead = waiters;
+      if (oldHead != Waiter.TOMBSTONE) {
+        Waiter node = new Waiter();
+        do {
+          node.setNext(oldHead);
+          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+            while (true) {
+              LockSupport.parkNanos(this, remainingNanos);
+              // Check interruption first: if we woke up due to interruption
+              // we need to honor that.
+              if (Thread.interrupted()) {
+                removeWaiter(node);
+                throw new InterruptedException();
+              }
+
+              // Otherwise re-read and check doneness. If we loop, the wakeup
+              // was spurious (or early) and the future is still pending.
+              localValue = value;
+              if (localValue != null & !(localValue instanceof SetFuture)) {
+                return getDoneValue(localValue);
+              }
+
+              // timed out?
+              remainingNanos = endNanos - System.nanoTime();
+              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
+                // Remove the waiter; one way or another we are done parking
+                // this thread.
+                removeWaiter(node);
+                break long_wait_loop; // jump down to the busy wait loop
+              }
+            }
+          }
+          oldHead = waiters; // lost the CAS: re-read and loop.
+        } while (oldHead != Waiter.TOMBSTONE);
+      }
+      // Re-read value: if we get here then we must have observed a TOMBSTONE
+      // while trying to add a waiter.
+      return getDoneValue(value);
+    }
+    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and
+    // there is no node of ours on the waiters list, so busy-wait instead.
+    while (remainingNanos > 0) {
+      localValue = value;
+      if (localValue != null & !(localValue instanceof SetFuture)) {
+        return getDoneValue(localValue);
+      }
+      if (Thread.interrupted()) {
+        throw new InterruptedException();
+      }
+      remainingNanos = endNanos - System.nanoTime();
+    }
+    throw new TimeoutException();
+  }
+
+  /*
+   * Improve the documentation of when InterruptedException is thrown. Our
+   * behavior matches the JDK's, but the JDK's documentation is misleading.
+   */
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The default {@link AbstractFuture} implementation throws {@code
+   * InterruptedException} if the current thread is interrupted before or
+   * during the call, even if the value is already available.
+   *
+   * @throws InterruptedException  if the current thread was interrupted
+   *                               before or during the call (optional but
+   *                               recommended).
+   * @throws CancellationException {@inheritDoc}
+   */
+  @Override
+  public V get() throws InterruptedException, ExecutionException {
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+    Object localValue = value;
+    if (localValue != null & !(localValue instanceof SetFuture)) {
+      return getDoneValue(localValue);
+    }
+    Waiter oldHead = waiters;
+    if (oldHead != Waiter.TOMBSTONE) {
+      Waiter node = new Waiter();
+      do {
+        node.setNext(oldHead);
+        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+          // We are on the waiters stack; now wait for completion.
+          while (true) {
+            LockSupport.park(this);
+            // Check interruption first: if we woke up due to interruption we
+            // need to honor that.
+            if (Thread.interrupted()) {
+              removeWaiter(node);
+              throw new InterruptedException();
+            }
+            // Otherwise re-read and check doneness. If we loop then it must
+            // have been a spurious wakeup.
+            localValue = value;
+            if (localValue != null & !(localValue instanceof SetFuture)) {
+              return getDoneValue(localValue);
+            }
+          }
+        }
+        oldHead = waiters; // lost the CAS: re-read and loop.
+      } while (oldHead != Waiter.TOMBSTONE);
+    }
+    // Re-read value: if we get here then we must have observed a TOMBSTONE
+    // while trying to add a waiter.
+    return getDoneValue(value);
+  }
+
+  /**
+   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a
+   * {@link SetFuture}.
+   */
+  private V getDoneValue(Object obj) throws ExecutionException {
+    // While this seems like it might be too branch-y, simple benchmarking
+    // proves it to be unmeasurable (comparing done AbstractFutures with
+    // immediateFuture)
+    if (obj instanceof Cancellation) {
+      throw cancellationExceptionWithCause(
+          "Task was cancelled.", ((Cancellation) obj).cause);
+    } else if (obj instanceof Failure) {
+      throw new ExecutionException(((Failure) obj).exception);
+    } else if (obj == NULL) {
+      return null;
+    } else {
+      @SuppressWarnings("unchecked") // this is the only other option
+          V asV = (V) obj;
+      return asV;
+    }
+  }
+
+  @Override
+  public boolean isDone() {
+    final Object snapshot = value; // read the volatile state exactly once
+    return !(snapshot == null | snapshot instanceof SetFuture);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    // Cancelled exactly when the state field holds a Cancellation marker.
+    return value instanceof Cancellation;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>If a cancellation attempt succeeds on a {@code Future} that had
+   * previously been {@linkplain #setFuture set asynchronously}, then the
+   * cancellation will also be propagated to the delegate {@code Future} that
+   * was supplied in the {@code setFuture} call.
+   */
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    Object localValue = value;
+    boolean rValue = false;
+    if (localValue == null | localValue instanceof SetFuture) {
+      // Try to delay allocating the exception. At this point we may still
+      // lose the CAS, but it is certainly less likely.
+      Throwable cause =
+          GENERATE_CANCELLATION_CAUSES
+              ? new CancellationException("Future.cancel() was called.")
+              : null;
+      Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
+      AbstractFuture<?> abstractFuture = this;
+      while (true) {
+        if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
+          rValue = true;
+          // We call interruptTask before calling complete(), which is
+          // consistent with FutureTask.
+          if (mayInterruptIfRunning) {
+            abstractFuture.interruptTask();
+          }
+          complete(abstractFuture);
+          if (localValue instanceof SetFuture) {
+            // Propagate cancellation to the future set in setFuture; this is
+            // racy, and we don't care if we are successful or not.
+            ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue)
+                .future;
+            if (futureToPropagateTo instanceof TrustedFuture) {
+              // If the future is a TrustedFuture then we specifically avoid
+              // calling cancel(); this has 2 benefits:
+              // 1. for long chains of futures strung together with setFuture
+              // we consume less stack
+              // 2. we avoid allocating Cancellation objects at every level of
+              // the cancellation chain
+              // We can only do this for TrustedFuture, because
+              // TrustedFuture.cancel is final and does nothing but delegate
+              // to this method.
+              AbstractFuture<?> trusted = (AbstractFuture<?>)
+                  futureToPropagateTo;
+              localValue = trusted.value;
+              if (localValue == null | localValue instanceof SetFuture) {
+                abstractFuture = trusted;
+                continue;  // loop back up and try to complete the new future
+              }
+            } else {
+              // not a TrustedFuture, call cancel directly.
+              futureToPropagateTo.cancel(mayInterruptIfRunning);
+            }
+          }
+          break;
+        }
+        // value changed under us (we lost the CAS), re-read it
+        localValue = abstractFuture.value;
+        if (!(localValue instanceof SetFuture)) {
+          // localValue cannot be null at this point, because value can only
+          // change from null to non-null. So if value changed (and it did,
+          // since we lost the CAS), then it cannot be null, and since it isn't
+          // a SetFuture, the future must be done and we should exit the loop.
+          break;
+        }
+      }
+    }
+    return rValue;
+  }
+
+  /**
+   * Subclasses can override this method to implement interruption of the
+   * future's computation. The method is invoked automatically by a
+   * successful call to {@link #cancel(boolean) cancel(true)}.
+   *
+   * <p>The default implementation does nothing.
+   *
+   * @since 10.0
+   */
+  protected void interruptTask() {
+  }
+
+  /**
+   * Reports whether this future was cancelled by a call that passed
+   * {@code mayInterruptIfRunning} as {@code true}.
+   *
+   * @since 14.0
+   */
+  protected final boolean wasInterrupted() {
+    final Object state = value;
+    return state instanceof Cancellation
+        ? ((Cancellation) state).wasInterrupted : false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Both arguments are null-checked via {@code checkNotNull}.
+   * @since 10.0
+   */
+  @Override
+  public void addListener(Runnable listener, Executor executor) {
+    checkNotNull(listener, "Runnable was null.");
+    checkNotNull(executor, "Executor was null.");
+    Listener oldHead = listeners;
+    if (oldHead != Listener.TOMBSTONE) {
+      Listener newNode = new Listener(listener, executor);
+      do {
+        newNode.next = oldHead;
+        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
+          return;
+        }
+        oldHead = listeners; // lost the CAS: re-read the head and retry
+      } while (oldHead != Listener.TOMBSTONE);
+    }
+    // If we get here then the Listener TOMBSTONE was set, which means the
+    // future is done, so hand the listener to executeListener right away.
+    executeListener(listener, executor);
+  }
+
+  /**
+   * Sets the result of this {@code Future} unless this {@code Future} has
+   * already been cancelled or set (including
+   * {@linkplain #setFuture set asynchronously}). When a call to this method
+   * returns, the {@code Future} is guaranteed to be
+   * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+   * case it returns {@code true}). If it returns {@code false}, the {@code
+   * Future} may have previously been set asynchronously, in which case its
+   * result may not be known yet. That result, though not yet known, cannot
+   * be overridden by a call to a {@code set*} method, only by a call to
+   * {@link #cancel}.
+   *
+   * @param value the value to be used as the result
+   * @return true if the attempt was accepted, completing the {@code Future}
+   */
+  protected boolean set(@Nullable V value) {
+    Object valueToSet = value == null ? NULL : value;
+    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+      complete(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Sets the failed result of this {@code Future} unless this {@code Future}
+   * has already been cancelled or set (including
+   * {@linkplain #setFuture set asynchronously}). When a call to this method
+   * returns, the {@code Future} is guaranteed to be
+   * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+   * case it returns {@code true}). If it returns {@code false}, the
+   * {@code Future} may have previously been set asynchronously, in which case
+   * its result may not be known yet. That result, though not yet known,
+   * cannot be overridden by a call to a {@code set*} method, only by a call
+   * to {@link #cancel}.
+   *
+   * @param throwable the exception to be used as the failed result
+   * @return true if the attempt was accepted, completing the {@code Future}
+   */
+  protected boolean setException(Throwable throwable) {
+    Object valueToSet = new Failure(checkNotNull(throwable));
+    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+      complete(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Sets the result of this {@code Future} to match the supplied input
+   * {@code Future} once the supplied {@code Future} is done, unless this
+   * {@code Future} has already been cancelled or set (including "set
+   * asynchronously," defined below).
+   * <p>
+   * <p>If the supplied future is {@linkplain #isDone done} when this method
+   * is called and the call is accepted, then this future is guaranteed to
+   * have been completed with the supplied future by the time this method
+   * returns. If the supplied future is not done and the call is accepted, then
+   * the future will be <i>set asynchronously</i>. Note that such a result,
+   * though not yet known, cannot be overridden by a call to a {@code set*}
+   * method, only by a call to {@link #cancel}.
+   * <p>
+   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code
+   * Future} is later cancelled, cancellation will be propagated to {@code
+   * delegate}. Additionally, any call to {@code setFuture} after any
+   * cancellation will propagate cancellation to the supplied {@code Future}.
+   *
+   * @param future the future to delegate to
+   * @return true if the attempt was accepted, indicating that the {@code
+   * Future} was not previously cancelled or set.
+   * @since 19.0
+   */
+  @Beta
+  protected boolean setFuture(ListenableFuture<? extends V> future) {
+    checkNotNull(future);
+    Object localValue = value;
+    if (localValue == null) {
+      if (future.isDone()) {
+        Object value = getFutureValue(future);
+        if (ATOMIC_HELPER.casValue(this, null, value)) {
+          complete(this);
+          return true;
+        }
+        return false;
+      }
+      SetFuture valueToSet = new SetFuture<V>(this, future);
+      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+        // the listener is responsible for calling completeWithFuture,
+        // directExecutor is appropriate since all we are doing is unpacking
+        // a completed future which should be fast.
+        try {
+          future.addListener(valueToSet, directExecutor());
+        } catch (Throwable t) {
+          // addListener has thrown an exception! SetFuture.run can't throw
+          // any exceptions so this must have been caused by addListener
+          // itself. The most likely explanation is a misconfigured mock. Try
+          // to switch to Failure.
+          Failure failure;
+          try {
+            failure = new Failure(t);
+          } catch (Throwable oomMostLikely) {
+            failure = Failure.FALLBACK_INSTANCE;
+          }
+          // Note: The only way this CAS could fail is if cancel() has raced
+          // with us. That is ok.
+          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
+        }
+        return true;
+      }
+      localValue = value; // we lost the cas, fall through and maybe cancel
+    }
+    // The future has already been set to something. If it is cancellation we
+    // should cancel the incoming future.
+    if (localValue instanceof Cancellation) {
+      // we don't care if it fails, this is best-effort.
+      future.cancel(((Cancellation) localValue).wasInterrupted);
+    }
+    return false;
+  }
+
+  /**
+   * Returns a value, suitable for storing in the {@link #value} field, from
+   * the given future, which is assumed to be done.
+   * <p>
+   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
+   */
+  private static Object getFutureValue(ListenableFuture<?> future) {
+    Object valueToSet;
+    if (future instanceof TrustedFuture) {
+      // Break encapsulation for TrustedFuture instances since we know that
+      // subclasses cannot override .get() (since it is final) and therefore
+      // this is equivalent to calling .get() and unpacking the exceptions
+      // like we do below (just much faster because it is a single field read
+      // instead of a read, several branches and possibly creating exceptions).
+      return ((AbstractFuture<?>) future).value;
+    } else {
+      // Otherwise calculate valueToSet by calling .get()
+      try {
+        Object v = getDone(future);
+        valueToSet = v == null ? NULL : v;
+      } catch (ExecutionException exception) {
+        valueToSet = new Failure(exception.getCause());
+      } catch (CancellationException cancellation) {
+        valueToSet = new Cancellation(false, cancellation);
+      } catch (Throwable t) {
+        valueToSet = new Failure(t);
+      }
+    }
+    return valueToSet;
+  }
+
+  /**
+   * Unblocks all threads and runs all listeners.
+   */
+  private static void complete(AbstractFuture<?> future) {
+    Listener next = null;
+    outer:
+    while (true) {
+      future.releaseWaiters();
+      // We call this before the listeners in order to avoid needing to manage
+      // a separate stack data structure for them. afterDone() should be
+      // generally fast and only used for cleanup work... but in theory can
+      // also be recursive and create StackOverflowErrors
+      future.afterDone();
+      // push the current set of listeners onto next
+      next = future.clearListeners(next);
+      future = null;
+      while (next != null) {
+        Listener curr = next;
+        next = next.next;
+        Runnable task = curr.task;
+        if (task instanceof SetFuture) {
+          SetFuture<?> setFuture = (SetFuture<?>) task;
+          // We unwind setFuture specifically to avoid StackOverflowErrors in
+          // the case of long chains of SetFutures
+          // Handling this special case is important because there is no way
+          // to pass an executor to setFuture, so a user couldn't break the
+          // chain by doing this themselves.  It is also potentially common
+          // if someone writes a recursive Futures.transformAsync transformer.
+          future = setFuture.owner;
+          if (future.value == setFuture) {
+            Object valueToSet = getFutureValue(setFuture.future);
+            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
+              continue outer;
+            }
+          }
+          // otherwise the future we were trying to set is already done.
+        } else {
+          executeListener(task, curr.executor);
+        }
+      }
+      break;
+    }
+  }
+
+  public static <V> V getDone(Future<V> future) throws ExecutionException {
+    /*
+     * We throw IllegalStateException, since the call could succeed later.
+     * Perhaps we "should" throw IllegalArgumentException, since the call
+     * could succeed with a different argument. Those exceptions' docs
+     * suggest that either is acceptable. Google's Java Practices page
+     * recommends IllegalArgumentException here, in part to keep its
+     * recommendation simple: Static methods should throw
+     * IllegalStateException only when they use static state.
+     *
+     *
+     * Why do we deviate here? The answer: We want for fluentFuture.getDone()
+     * to throw the same exception as Futures.getDone(fluentFuture).
+     */
+    Preconditions.checkState(future.isDone(), "Future was expected to be " +
+        "done:" +
+        " %s", future);
+    return Uninterruptibles.getUninterruptibly(future);
+  }
+
+  /**
+   * Callback method that is called exactly once after the future is completed.
+   * <p>
+   * <p>If {@link #interruptTask} is also run during completion,
+   * {@link #afterDone} runs after it.
+   * <p>
+   * <p>The default implementation of this method in {@code AbstractFuture}
+   * does nothing.  This is intended for very lightweight cleanup work, for
+   * example, timing statistics or clearing fields.
+   * If your task does anything heavier, consider just using a listener with
+   * an executor.
+   *
+   * @since 20.0
+   */
+  @Beta
+  protected void afterDone() {
+  }
+
+  /**
+   * If this future has been cancelled (and possibly interrupted), cancels
+   * (and possibly interrupts) the given future (if available).
+   * <p>
+   * <p>This method should be used only when this future is completed. It is
+   * designed to be called from {@code done}.
+   */
+  final void maybePropagateCancellation(@Nullable Future<?> related) {
+    if (related != null & isCancelled()) {
+      related.cancel(wasInterrupted());
+    }
+  }
+
+  /**
+   * Releases all threads in the {@link #waiters} list, and clears the list.
+   */
+  private void releaseWaiters() {
+    Waiter head;
+    do {
+      head = waiters;
+    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
+    for (
+        Waiter currentWaiter = head;
+        currentWaiter != null;
+        currentWaiter = currentWaiter.next) {
+      currentWaiter.unpark();
+    }
+  }
+
+  /**
+   * Clears the {@link #listeners} list and prepends its contents to {@code
+   * onto}, least recently added first.
+   */
+  private Listener clearListeners(Listener onto) {
+    // We need to
+    // 1. atomically swap the listeners with TOMBSTONE, this is because
+    // addListener uses that to synchronize with us
+    // 2. reverse the linked list, because despite our rather clear contract,
+    // people depend on us executing listeners in the order they were added
+    // 3. push all the items onto 'onto' and return the new head of the stack
+    Listener head;
+    do {
+      head = listeners;
+    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
+    Listener reversedList = onto;
+    while (head != null) {
+      Listener tmp = head;
+      head = head.next;
+      tmp.next = reversedList;
+      reversedList = tmp;
+    }
+    return reversedList;
+  }
+
+  /**
+   * Submits the given runnable to the given {@link Executor} catching and
+   * logging all {@linkplain RuntimeException runtime exceptions} thrown by
+   * the executor.
+   */
+  private static void executeListener(Runnable runnable, Executor executor) {
+    try {
+      executor.execute(runnable);
+    } catch (RuntimeException e) {
+      // Log it and keep going -- bad runnable and/or executor. Don't punish
+      // the other runnables if we're given a bad one. We only catch
+      // RuntimeException because we want Errors to propagate up.
+      log.log(
+          Level.SEVERE,
+          "RuntimeException while executing runnable " + runnable + " with " +
+              "executor " + executor,
+          e);
+    }
+  }
+
+  private abstract static class AtomicHelper {
+    /**
+     * Non volatile write of the thread to the {@link Waiter#thread} field.
+     */
+    abstract void putThread(Waiter waiter, Thread newValue);
+
+    /**
+     * Non volatile write of the waiter to the {@link Waiter#next} field.
+     */
+    abstract void putNext(Waiter waiter, Waiter newValue);
+
+    /**
+     * Performs a CAS operation on the {@link #waiters} field.
+     */
+    abstract boolean casWaiters(
+        AbstractFuture<?> future, Waiter expect,
+        Waiter update);
+
+    /**
+     * Performs a CAS operation on the {@link #listeners} field.
+     */
+    abstract boolean casListeners(
+        AbstractFuture<?> future, Listener expect,
+        Listener update);
+
+    /**
+     * Performs a CAS operation on the {@link #value} field.
+     */
+    abstract boolean casValue(
+        AbstractFuture<?> future, Object expect, Object update);
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
+   * <p>
+   * <p>Static initialization of this class will fail if the
+   * {@link sun.misc.Unsafe} object cannot be accessed.
+   */
+  private static final class UnsafeAtomicHelper extends AtomicHelper {
+    static final sun.misc.Unsafe UNSAFE;
+    static final long LISTENERS_OFFSET;
+    static final long WAITERS_OFFSET;
+    static final long VALUE_OFFSET;
+    static final long WAITER_THREAD_OFFSET;
+    static final long WAITER_NEXT_OFFSET;
+
+    static {
+      sun.misc.Unsafe unsafe = null;
+      try {
+        unsafe = sun.misc.Unsafe.getUnsafe();
+      } catch (SecurityException tryReflectionInstead) {
+        try {
+          unsafe =
+              AccessController.doPrivileged(
+                  new PrivilegedExceptionAction<sun.misc.Unsafe>() {
+                    @Override
+                    public sun.misc.Unsafe run() throws Exception {
+                      Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
+                      for (java.lang.reflect.Field f : k.getDeclaredFields()) {
+                        f.setAccessible(true);
+                        Object x = f.get(null);
+                        if (k.isInstance(x)) {
+                          return k.cast(x);
+                        }
+                      }
+                      throw new NoSuchFieldError("the Unsafe");
+                    }
+                  });
+        } catch (PrivilegedActionException e) {
+          throw new RuntimeException(
+              "Could not initialize intrinsics", e.getCause());
+        }
+      }
+      try {
+        Class<?> abstractFuture = AbstractFuture.class;
+        WAITERS_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
+        LISTENERS_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
+        VALUE_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("value"));
+        WAITER_THREAD_OFFSET = unsafe
+            .objectFieldOffset(Waiter.class.getDeclaredField("thread"));
+        WAITER_NEXT_OFFSET = unsafe
+            .objectFieldOffset(Waiter.class.getDeclaredField("next"));
+        UNSAFE = unsafe;
+      } catch (Exception e) {
+        throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    public static void throwIfUnchecked(Throwable throwable) {
+      checkNotNull(throwable);
+      if (throwable instanceof RuntimeException) {
+        throw (RuntimeException) throwable;
+      }
+      if (throwable instanceof Error) {
+        throw (Error) throwable;
+      }
+    }
+
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #waiters} field.
+     */
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      return UNSAFE
+          .compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #listeners} field.
+     */
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      return UNSAFE
+          .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #value} field.
+     */
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
+    }
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
+   */
+  private static final class SafeAtomicHelper extends AtomicHelper {
+    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
+    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Listener>
+        listenersUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
+
+    SafeAtomicHelper(
+        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
+        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
+      this.waiterThreadUpdater = waiterThreadUpdater;
+      this.waiterNextUpdater = waiterNextUpdater;
+      this.waitersUpdater = waitersUpdater;
+      this.listenersUpdater = listenersUpdater;
+      this.valueUpdater = valueUpdater;
+    }
+
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      waiterThreadUpdater.lazySet(waiter, newValue);
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      waiterNextUpdater.lazySet(waiter, newValue);
+    }
+
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      return waitersUpdater.compareAndSet(future, expect, update);
+    }
+
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      return listenersUpdater.compareAndSet(future, expect, update);
+    }
+
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      return valueUpdater.compareAndSet(future, expect, update);
+    }
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
+   * <p>
+   * <p>This is an implementation of last resort for when certain basic VM
+   * features are broken (like AtomicReferenceFieldUpdater).
+   */
+  private static final class SynchronizedHelper extends AtomicHelper {
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      waiter.thread = newValue;
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      waiter.next = newValue;
+    }
+
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      synchronized (future) {
+        if (future.waiters == expect) {
+          future.waiters = update;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      synchronized (future) {
+        if (future.listeners == expect) {
+          future.listeners = update;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      synchronized (future) {
+        if (future.value == expect) {
+          future.value = update;
+          return true;
+        }
+        return false;
+      }
+    }
+  }
+
+  private static CancellationException cancellationExceptionWithCause(
+      @Nullable String message, @Nullable Throwable cause) {
+    CancellationException exception = new CancellationException(message);
+    exception.initCause(cause);
+    return exception;
+  }
+
+  /**
+   * Returns an {@link Executor} that runs each task in the thread that invokes
+   * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
+   * <p>
+   * <p>This instance is equivalent to: <pre>   {@code
+   *   final class DirectExecutor implements Executor {
+   *     public void execute(Runnable r) {
+   *       r.run();
+   *     }
+   *   }}</pre>
+   * <p>
+   * <p>Prefer this to Guava's {@code MoreExecutors.newDirectExecutorService()}
+   * because implementing the {@link ExecutorService} subinterface
+   * necessitates significant performance overhead.
+   *
+   * @since 18.0
+   */
+  public static Executor directExecutor() {
+    return DirectExecutor.INSTANCE;
+  }
+
+  /**
+   * See {@link #directExecutor} for behavioral notes.
+   */
+  private enum DirectExecutor implements Executor {
+    INSTANCE;
+
+    @Override
+    public void execute(Runnable command) {
+      command.run();
+    }
+
+    @Override
+    public String toString() {
+      return "MoreExecutors.directExecutor()";
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index 0d96e72..d5f2035 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -91,6 +91,7 @@ public class DatasetVolumeChecker {
    * Minimum time between two successive disk checks of a volume.
    */
   private final long minDiskCheckGapMs;
+  private final long diskCheckTimeout;
 
   /**
    * Timestamp of the last check of all volumes.
@@ -136,6 +137,17 @@ public class DatasetVolumeChecker {
           + minDiskCheckGapMs + " (should be >= 0)");
     }
 
+    diskCheckTimeout = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    if (diskCheckTimeout < 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+          + diskCheckTimeout + " (should be >= 0)");
+    }
+
     lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
 
     if (maxVolumeFailuresTolerated < 0) {
@@ -145,7 +157,8 @@ public class DatasetVolumeChecker {
     }
 
     delegateChecker = new ThrottledAsyncChecker<>(
-        timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
+        timer, minDiskCheckGapMs, diskCheckTimeout,
+        Executors.newCachedThreadPool(
             new ThreadFactoryBuilder()
                 .setNameFormat("DataNode DiskChecker thread %d")
                 .setDaemon(true)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
index a0bffcd..2d1eebe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -119,6 +119,7 @@ public class StorageLocationChecker {
             DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
             DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
             TimeUnit.MILLISECONDS),
+        0,
         Executors.newCachedThreadPool(
             new ThreadFactoryBuilder()
                 .setNameFormat("StorageLocationChecker thread %d")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index ad56c61..d3e439b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -38,6 +38,8 @@ import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,12 +66,14 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
    * The ExecutorService used to schedule asynchronous checks.
    */
   private final ListeningExecutorService executorService;
+  private final ScheduledExecutorService scheduledExecutorService;
 
   /**
    * The minimum gap in milliseconds between two successive checks
    * of the same object. This is the throttle.
    */
   private final long minMsBetweenChecks;
+  private final long diskCheckTimeout;
 
   /**
    * Map of checks that are currently in progress. Protected by the object
@@ -86,12 +90,23 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
 
   ThrottledAsyncChecker(final Timer timer,
                         final long minMsBetweenChecks,
+                        final long diskCheckTimeout,
                         final ExecutorService executorService) {
     this.timer = timer;
     this.minMsBetweenChecks = minMsBetweenChecks;
+    this.diskCheckTimeout = diskCheckTimeout;
     this.executorService = MoreExecutors.listeningDecorator(executorService);
     this.checksInProgress = new HashMap<>();
     this.completedChecks = new WeakHashMap<>();
+
+    if (this.diskCheckTimeout > 0) {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
+          ScheduledThreadPoolExecutor(1);
+      this.scheduledExecutorService = MoreExecutors
+          .getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
+    } else {
+      this.scheduledExecutorService = null;
+    }
   }
 
   /**
@@ -120,13 +135,23 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
       }
     }
 
-    final ListenableFuture<V> lf = executorService.submit(
+    final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
         new Callable<V>() {
           @Override
           public V call() throws Exception {
             return target.check(context);
           }
         });
+    final ListenableFuture<V> lf;
+
+    if (diskCheckTimeout > 0) {
+      lf = TimeoutFuture
+          .create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
+              scheduledExecutorService);
+    } else {
+      lf = lfWithoutTimeout;
+    }
+
     checksInProgress.put(target, lf);
     addResultCachingCallback(target, lf);
     return Optional.of(lf);
@@ -174,6 +199,16 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
       executorService.shutdownNow();
       executorService.awaitTermination(timeout, timeUnit);
     }
+    if (scheduledExecutorService != null) {
+      // Try orderly shutdown
+      scheduledExecutorService.shutdown();
+
+      if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) {
+        // Interrupt executing tasks and wait again.
+        scheduledExecutorService.shutdownNow();
+        scheduledExecutorService.awaitTermination(timeout, timeUnit);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/TimeoutFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/TimeoutFuture.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/TimeoutFuture.java
new file mode 100644
index 0000000..ae7b34f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/TimeoutFuture.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of {@code Futures#withTimeout}: a future that delegates to
+ * another but finishes early (via a {@link TimeoutException} wrapped in an
+ * {@link java.util.concurrent.ExecutionException}) if the specified duration
+ * expires, in which case the delegate is interrupted and cancelled.
+ * {@link #create} registers one {@link Fire} both as the timer task and as a
+ * listener on the delegate, so whichever runs first completes this future.
+ */
+final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TimeoutFuture.class);
+
+  static <V> ListenableFuture<V> create(
+      ListenableFuture<V> delegate,
+      long time,
+      TimeUnit unit,
+      ScheduledExecutorService scheduledExecutor) {
+    TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
+    TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
+    result.timer = scheduledExecutor.schedule(fire, time, unit);
+    delegate.addListener(fire, directExecutor());
+    return result;
+  }
+
+  /*
+   * Memory visibility of these fields. There are two cases to consider.
+   *
+   * 1. visibility of the writes to these fields to Fire.run:
+   *
+   * The initial write to delegateRef is made definitely visible via the
+   * semantics of addListener/SES.schedule. The later racy write in cancel()
+   * is not guaranteed to be observed, however that is fine since the
+   * correctness is based on the atomic state in our base class. The initial
+   * write to timer is never definitely visible to Fire.run since it is
+   * assigned after SES.schedule is called. Therefore Fire.run has to check
+   * for null. However, it should be visible if Fire.run is called by
+   * delegate.addListener since addListener is called after the assignment
+   * to timer, and importantly this is the main situation in which we need to
+   * be able to see the write.
+   *
+   * 2. visibility of the writes to an afterDone() call triggered by cancel():
+   *
+   * Since these fields are non-final that means that TimeoutFuture is not
+   * being 'safely published', thus a motivated caller may be able to expose
+   * the reference to another thread that would then call cancel() and be
+   * unable to cancel the delegate. There are a number of ways to solve this,
+   * none of which are very pretty, and it is currently believed to be a
+   * purely theoretical problem (since the other actions should supply
+   * sufficient write-barriers).
+   */
+
+  @Nullable private ListenableFuture<V> delegateRef;
+  @Nullable private Future<?> timer;
+
+  private TimeoutFuture(ListenableFuture<V> delegate) {
+    this.delegateRef = Preconditions.checkNotNull(delegate);
+  }
+
+  /**
+   * A runnable that is called when the delegate or the timer completes.
+   */
+  private static final class Fire<V> implements Runnable {
+    @Nullable
+    TimeoutFuture<V> timeoutFutureRef;
+
+    Fire(
+        TimeoutFuture<V> timeoutFuture) {
+      this.timeoutFutureRef = timeoutFuture;
+    }
+
+    @Override
+    public void run() {
+      // If either of these reads returns null then we must be after a
+      // successful cancel or another call to this method.
+      TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
+      if (timeoutFuture == null) {
+        return;
+      }
+      ListenableFuture<V> delegate = timeoutFuture.delegateRef;
+      if (delegate == null) {
+        return;
+      }
+
+      /*
+       * If we're about to complete the TimeoutFuture, we want to release our
+       * reference to it. Otherwise, we'll pin it (and its result) in memory
+       * until the timeout task is GCed. (The need to clear our reference to
+       * the TimeoutFuture is the reason we use a *static* nested class with
+       * a manual reference back to the "containing" class.)
+       *
+       * This has the nice-ish side effect of limiting reentrancy: run() calls
+       * timeoutFuture.setException() calls run(). That reentrancy would
+       * already be harmless, since timeoutFuture can be set (and delegate
+       * cancelled) only once. (And "set only once" is important for other
+       * reasons: run() can still be invoked concurrently in different threads,
+       * even with the above null checks.)
+       */
+      timeoutFutureRef = null;
+      if (delegate.isDone()) {
+        timeoutFuture.setFuture(delegate);
+      } else {
+        try {
+          timeoutFuture.setException(
+              new TimeoutException("Future timed out: " + delegate));
+        } finally {
+          delegate.cancel(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void afterDone() {
+    maybePropagateCancellation(delegateRef);
+
+    Future<?> localTimer = timer;
+    // Try to cancel the timer as an optimization.
+    // timer may be null if afterDone() was triggered by the timer task, since
+    // there is no happens-before edge between the assignment to timer and an
+    // execution of the timer task.
+    if (localTimer != null) {
+      localTimer.cancel(false);
+    }
+
+    delegateRef = null;
+    timer = null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerTimeout.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerTimeout.java
new file mode 100644
index 0000000..3d85038
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerTimeout.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.FakeTimer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Test that timeout is triggered during Disk Volume Checker.
+ */
+public class TestDatasetVolumeCheckerTimeout {
+  public static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestDatasetVolumeCheckerTimeout.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  static Configuration conf;
+  private static final long DISK_CHECK_TIMEOUT = 10;
+  private static final long DISK_CHECK_TIME = 100;
+  static ReentrantLock lock = new ReentrantLock();
+
+  static {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DISK_CHECK_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Create a mock volume whose check blocks until {@link #lock} can be
+   * acquired and then reports {@link VolumeCheckResult#HEALTHY}.
+   */
+  static FsVolumeSpi makeSlowVolume() throws Exception {
+    final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+    final FsVolumeReference reference = mock(FsVolumeReference.class);
+
+    when(reference.getVolume()).thenReturn(volume);
+    when(volume.obtainReference()).thenReturn(reference);
+
+    when(volume.check(any(FsVolumeSpi.VolumeCheckContext.class))).thenAnswer(
+        new Answer<VolumeCheckResult>() {
+      @Override
+      public VolumeCheckResult answer(
+          InvocationOnMock invocationOnMock) throws Throwable {
+        // Wait for the disk check to timeout and then release lock.
+        lock.lock();
+        lock.unlock();
+        return VolumeCheckResult.HEALTHY;
+      }
+    });
+
+    return volume;
+  }
+
+  /**
+   * Verify that a check which outlives the configured disk check timeout is
+   * reported to the callback exactly once, as a single failed volume.
+   */
+  @Test (timeout = 1000)
+  public void testDiskCheckTimeout() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+    final FsVolumeSpi volume = makeSlowVolume();
+
+    final DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(conf, new FakeTimer());
+    final AtomicLong numCallbackInvocations = new AtomicLong(0);
+    final AtomicLong numHealthyVolumes = new AtomicLong(-1);
+    final AtomicLong numFailedVolumes = new AtomicLong(-1);
+
+    // Hold the lock so that the volume check blocks until it times out.
+    lock.lock();
+    try {
+      // Request a check and ensure it was scheduled.
+      boolean result =
+          checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+            @Override
+            public void call(Set<FsVolumeSpi> healthyVolumes,
+                Set<FsVolumeSpi> failedVolumes) {
+              // Only record the outcome here: an AssertionError thrown on
+              // the callback thread would not fail the test.
+              numHealthyVolumes.set(healthyVolumes.size());
+              numFailedVolumes.set(failedVolumes.size());
+              numCallbackInvocations.incrementAndGet();
+            }
+          });
+      assertTrue(result);
+
+      // Wait for the callback
+      Thread.sleep(DISK_CHECK_TIME);
+    } finally {
+      // Release the lock even on failure so the check thread can finish.
+      lock.unlock();
+    }
+
+    // Ensure that the check was invoked only once and that the timeout was
+    // reported as a failed volume.
+    verify(volume, times(1)).check(any(FsVolumeSpi.VolumeCheckContext.class));
+    assertThat(numCallbackInvocations.get(), is(1L));
+    assertThat(numHealthyVolumes.get(), is(0L));
+    assertThat(numFailedVolumes.get(), is(1L));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 1109772..0c7ed50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -59,7 +59,7 @@ public class TestThrottledAsyncChecker {
     final NoOpCheckable target2 = new NoOpCheckable();
     final FakeTimer timer = new FakeTimer();
     ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
                                     getExecutorService());
 
     // check target1 and ensure we get back the expected result.
@@ -99,8 +99,8 @@ public class TestThrottledAsyncChecker {
     final FakeTimer timer = new FakeTimer();
     final LatchedCallback callback = new LatchedCallback(target);
     ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-                                    getExecutorService());
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
+            getExecutorService());
 
     Optional<ListenableFuture<Boolean>> olf =
         checker.schedule(target, true);
@@ -124,8 +124,8 @@ public class TestThrottledAsyncChecker {
     final LatchedCheckable target = new LatchedCheckable();
     final FakeTimer timer = new FakeTimer();
     final ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-                                    getExecutorService());
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
+            getExecutorService());
     final Optional<ListenableFuture<Boolean>> olf1 =
         checker.schedule(target, true);
 
@@ -167,7 +167,7 @@ public class TestThrottledAsyncChecker {
     final NoOpCheckable target1 = new NoOpCheckable();
     final FakeTimer timer = new FakeTimer();
     ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
             getExecutorService());
 
     assertTrue(checker.schedule(target1, true).isPresent());
@@ -204,7 +204,7 @@ public class TestThrottledAsyncChecker {
     final ThrowingCheckable target1 = new ThrowingCheckable();
     final FakeTimer timer = new FakeTimer();
     ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
             getExecutorService());
 
     assertTrue(checker.schedule(target1, true).isPresent());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbe86471/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncCheckerTimeout.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncCheckerTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncCheckerTimeout.java
new file mode 100644
index 0000000..23f3f47
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncCheckerTimeout.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.FakeTimer;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Test that {@link ThrottledAsyncChecker} fails a check with a
+ * {@link TimeoutException} when it runs longer than the disk check timeout.
+ */
+public class TestThrottledAsyncCheckerTimeout {
+  public static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestThrottledAsyncCheckerTimeout.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  Configuration conf;
+  private static final long DISK_CHECK_TIMEOUT = 10;
+  private static final long DISK_CHECK_TIME = 100;
+  private ReentrantLock lock;
+
+  private ExecutorService getExecutorService() {
+    return new ScheduledThreadPoolExecutor(1);
+  }
+
+  @Before
+  public void initializeLock() {
+    lock = new ReentrantLock();
+  }
+
+  /**
+   * A check that is blocked past the timeout must invoke only the failure
+   * callback, with a {@link TimeoutException}.
+   */
+  @Test (timeout = 1000)
+  public void testDiskCheckTimeout() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final DummyCheckable target = new DummyCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
+            getExecutorService());
+
+    final AtomicLong numCallbackInvocationsSuccess = new AtomicLong(0);
+    final AtomicLong numCallbackInvocationsFailure = new AtomicLong(0);
+
+    final AtomicBoolean callbackResult = new AtomicBoolean(false);
+    final Throwable[] throwable = new Throwable[1];
+
+    // Acquire lock to halt checker. Release after timeout occurs.
+    lock.lock();
+    try {
+      final Optional<ListenableFuture<Boolean>> olf = checker
+          .schedule(target, true);
+
+      assertTrue(olf.isPresent());
+      Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
+        @Override
+        public void onSuccess(Boolean result) {
+          numCallbackInvocationsSuccess.incrementAndGet();
+          callbackResult.set(true);
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          throwable[0] = t;
+          numCallbackInvocationsFailure.incrementAndGet();
+          callbackResult.set(true);
+        }
+      });
+
+      while (!callbackResult.get()) {
+        // Wait for the callback
+        Thread.sleep(DISK_CHECK_TIMEOUT);
+      }
+    } finally {
+      // Release the lock even on failure so the blocked check can finish.
+      lock.unlock();
+    }
+
+    assertThat(numCallbackInvocationsFailure.get(), is(1L));
+    assertThat(numCallbackInvocationsSuccess.get(), is(0L));
+    assertTrue(throwable[0] instanceof TimeoutException);
+  }
+
+  /**
+   * A timed-out check must trigger exactly one onFailure call, and a later
+   * successful check of the same target exactly one onSuccess call.
+   */
+  @Test (timeout = 2000)
+  public void testDiskCheckTimeoutInvokesOneCallbackOnly() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final DummyCheckable target = new DummyCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
+            getExecutorService());
+    // mock() can only return the raw FutureCallback type.
+    @SuppressWarnings("unchecked")
+    FutureCallback<Boolean> futureCallback = mock(FutureCallback.class);
+
+    // Acquire lock to halt disk checker. Release after timeout occurs.
+    lock.lock();
+    try {
+      final Optional<ListenableFuture<Boolean>> olf1 = checker
+          .schedule(target, true);
+
+      assertTrue(olf1.isPresent());
+      Futures.addCallback(olf1.get(), futureCallback);
+
+      // Wait well past DISK_CHECK_TIMEOUT so that the timeout has fired and
+      // the callback has run before it is verified.
+      Thread.sleep(DISK_CHECK_TIME);
+
+      // Verify that timeout results in only 1 onFailure call and 0 onSuccess
+      // calls.
+      verify(futureCallback, times(1)).onFailure(any(Throwable.class));
+      verify(futureCallback, times(0)).onSuccess(anyBoolean());
+    } finally {
+      // Release lock so that target can acquire it.
+      lock.unlock();
+    }
+
+    final Optional<ListenableFuture<Boolean>> olf2 = checker
+        .schedule(target, true);
+
+    assertTrue(olf2.isPresent());
+    Futures.addCallback(olf2.get(), futureCallback);
+
+    // Wait for the callback
+    Thread.sleep(DISK_CHECK_TIME);
+
+    // Verify that normal check (dummy) results in only 1 onSuccess call.
+    // Number of times onFailure is invoked should remain the same - 1.
+    verify(futureCallback, times(1)).onFailure(any(Throwable.class));
+    verify(futureCallback, times(1)).onSuccess(anyBoolean());
+  }
+
+  /**
+   * A check that completes before the timeout must not fail.
+   */
+  @Test (timeout = 1000)
+  public void testTimeoutExceptionIsNotThrownForGoodDisk() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final DummyCheckable target = new DummyCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
+            getExecutorService());
+
+    final Optional<ListenableFuture<Boolean>> olf = checker
+        .schedule(target, true);
+
+    final AtomicBoolean callbackResult = new AtomicBoolean(false);
+    final Throwable[] throwable = new Throwable[1];
+
+    assertTrue(olf.isPresent());
+    Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
+      @Override
+      public void onSuccess(Boolean result) {
+        callbackResult.set(true);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        throwable[0] = t;
+        callbackResult.set(true);
+      }
+    });
+
+    while (!callbackResult.get()) {
+      // Wait for the callback
+      Thread.sleep(DISK_CHECK_TIMEOUT);
+    }
+
+    assertTrue(throwable[0] == null);
+  }
+
+  /**
+   * A dummy Checkable that just returns true after acquiring lock.
+   */
+  protected class DummyCheckable implements Checkable<Boolean,Boolean> {
+
+    @Override
+    public Boolean check(Boolean context) throws Exception {
+      // Wait to acquire lock
+      lock.lock();
+      lock.unlock();
+      return true;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message