hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [hbase] branch master updated: HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
Date Sun, 03 Feb 2019 07:46:24 GMT
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1b79f  HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
2b1b79f is described below

commit 2b1b79f08bf4b8fab84d7e7eb7b46580de60169c
Author: zhangduo <zhangduo@apache.org>
AuthorDate: Sat Feb 2 21:24:18 2019 +0800

    HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
---
 .../hadoop/hbase/AsyncMetaTableAccessor.java       | 90 ++++++++++------------
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 12 +--
 .../client/AsyncAdminRequestRetryingCaller.java    |  4 +-
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  3 +-
 .../hbase/client/AsyncBufferedMutatorImpl.java     |  5 +-
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  2 +-
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       | 11 +--
 .../AsyncMasterRequestRpcRetryingCaller.java       |  8 +-
 .../hbase/client/AsyncMetaRegionLocator.java       |  3 +-
 .../AsyncServerRequestRpcRetryingCaller.java       |  4 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |  2 +-
 .../apache/hadoop/hbase/client/AsyncTableImpl.java | 11 +--
 .../hadoop/hbase/client/ConnectionFactory.java     | 12 +--
 .../client/MasterCoprocessorRpcChannelImpl.java    | 32 ++++----
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    | 21 ++---
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  2 +-
 .../client/RegionCoprocessorRpcChannelImpl.java    | 32 ++++----
 .../RegionServerCoprocessorRpcChannelImpl.java     | 32 ++++----
 .../hadoop/hbase/client/ZKAsyncRegistry.java       | 21 ++---
 .../org/apache/hadoop/hbase/util/FutureUtils.java  | 57 +++++++++++++-
 .../client/coprocessor/AsyncAggregationClient.java | 11 ++-
 .../hbase/client/example/AsyncClientExample.java   | 62 +++++++--------
 .../hbase/client/example/HttpProxyExample.java     | 48 ++++++------
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  4 +-
 .../regionserver/wal/AsyncProtobufLogWriter.java   |  6 +-
 .../regionserver/wal/CombinedAsyncWriter.java      |  4 +-
 26 files changed, 281 insertions(+), 218 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index 5d38179..4a886d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -81,7 +83,7 @@ public class AsyncMetaTableAccessor {
     long time = EnvironmentEdgeManager.currentTime();
     try {
       get.setTimeRange(0, time);
-      metaTable.get(get).whenComplete((result, error) -> {
+      addListener(metaTable.get(get), (result, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
           return;
@@ -109,16 +111,14 @@ public class AsyncMetaTableAccessor {
     CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
     try {
       RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
-      metaTable.get(
-        new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
-            .addFamily(HConstants.CATALOG_FAMILY)).whenComplete(
-        (r, err) -> {
+      addListener(metaTable.get(new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
+        .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
-          future.complete(getRegionLocations(r).map(
-            locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
+          future.complete(getRegionLocations(r)
+            .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
         });
     } catch (IOException parseEx) {
       LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
@@ -136,34 +136,29 @@ public class AsyncMetaTableAccessor {
   public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
       AsyncTable<?> metaTable, byte[] encodedRegionName) {
     CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
-    metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
-        .whenComplete(
-          (results, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-              return;
-            }
-            String encodedRegionNameStr = Bytes.toString(encodedRegionName);
-            results
-                .stream()
-                .filter(result -> !result.isEmpty())
-                .filter(result -> MetaTableAccessor.getRegionInfo(result) != null)
-                .forEach(
-                  result -> {
-                    getRegionLocations(result).ifPresent(
-                      locations -> {
-                        for (HRegionLocation location : locations.getRegionLocations()) {
-                          if (location != null
-                              && encodedRegionNameStr.equals(location.getRegion()
-                                  .getEncodedName())) {
-                            future.complete(Optional.of(location));
-                            return;
-                          }
-                        }
-                      });
-                  });
-            future.complete(Optional.empty());
+    addListener(
+      metaTable
+        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
+      (results, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        String encodedRegionNameStr = Bytes.toString(encodedRegionName);
+        results.stream().filter(result -> !result.isEmpty())
+          .filter(result -> MetaTableAccessor.getRegionInfo(result) != null).forEach(result -> {
+            getRegionLocations(result).ifPresent(locations -> {
+              for (HRegionLocation location : locations.getRegionLocations()) {
+                if (location != null &&
+                  encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
+                  future.complete(Optional.of(location));
+                  return;
+                }
+              }
+            });
           });
+        future.complete(Optional.empty());
+      });
     return future;
   }
 
@@ -190,19 +185,18 @@ public class AsyncMetaTableAccessor {
   public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
       AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
     CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-    getTableRegionsAndLocations(metaTable, tableName, true).whenComplete(
-      (locations, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else if (locations == null || locations.isEmpty()) {
-          future.complete(Collections.emptyList());
-        } else {
-          List<HRegionLocation> regionLocations = locations.stream()
-              .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
-              .collect(Collectors.toList());
-          future.complete(regionLocations);
-        }
-      });
+    addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else if (locations == null || locations.isEmpty()) {
+        future.complete(Collections.emptyList());
+      } else {
+        List<HRegionLocation> regionLocations =
+          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+            .collect(Collectors.toList());
+        future.complete(regionLocations);
+      }
+    });
     return future;
   }
 
@@ -254,7 +248,7 @@ public class AsyncMetaTableAccessor {
       }
     };
 
-    scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> {
+    addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 3a5aef1..9abfe23 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import com.google.protobuf.RpcChannel;
 import java.io.IOException;
 import java.util.Collection;
@@ -614,15 +616,15 @@ public interface AsyncAdmin {
    * @param peerId a short name that identifies the peer
    * @return the current cluster state wrapped by a {@link CompletableFuture}.
    */
-  default CompletableFuture<SyncReplicationState>
-      getReplicationPeerSyncReplicationState(String peerId) {
+  default CompletableFuture<SyncReplicationState> getReplicationPeerSyncReplicationState(
+      String peerId) {
     CompletableFuture<SyncReplicationState> future = new CompletableFuture<>();
-    listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> {
+    addListener(listReplicationPeers(Pattern.compile(peerId)), (peers, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
       } else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) {
-        future.completeExceptionally(
-          new IOException("Replication peer " + peerId + " does not exist"));
+        future
+          .completeExceptionally(new IOException("Replication peer " + peerId + " does not exist"));
       } else {
         future.complete(peers.get(0).getSyncReplicationState());
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index cf31d79..02e22c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
@@ -61,7 +63,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
       return;
     }
     resetCallTimeout();
-    callable.call(controller, adminStub).whenComplete((result, error) -> {
+    addListener(callable.call(controller, adminStub), (result, error) -> {
       if (error != null) {
         onError(error, () -> "Call to admin stub failed", err -> {
         });
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 33e6366..4051e1d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -409,7 +410,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
         RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
           if (error != null) {
-            error = translateException(error);
+            error = unwrapCompletionException(translateException(error));
             if (error instanceof DoNotRetryIOException) {
               failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
               return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 318c6c9..61d49af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -25,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -96,7 +97,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
     for (CompletableFuture<?> future : table.batch(toSend)) {
       CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
-      future.whenComplete((r, e) -> {
+      addListener(future, (r, e) -> {
         if (e != null) {
           toCompleteFuture.completeExceptionally(e);
         } else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 1828650..4a32546 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -188,7 +188,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
-    registry.getMasterAddress().whenComplete((sn, error) -> {
+    addListener(registry.getMasterAddress(), (sn, error) -> {
       if (sn == null) {
         String msg = "ZooKeeper available but no active master location found";
         LOG.info(msg);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 356a425..f39fe36 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -68,15 +69,7 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
-    CompletableFuture<T> asyncFuture = new CompletableFuture<>();
-    future.whenCompleteAsync((r, e) -> {
-      if (e != null) {
-        asyncFuture.completeExceptionally(e);
-      } else {
-        asyncFuture.complete(r);
-      }
-    }, pool);
-    return asyncFuture;
+    return FutureUtils.wrapFuture(future, pool);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index a52e799..7ed44e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -43,20 +45,20 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
       Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
     super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+      startLogErrorsCnt);
     this.callable = callable;
   }
 
   @Override
   protected void doCall() {
-    conn.getMasterStub().whenComplete((stub, error) -> {
+    addListener(conn.getMasterStub(), (stub, error) -> {
       if (error != null) {
         onError(error, () -> "Get async master stub failed", err -> {
         });
         return;
       }
       resetCallTimeout();
-      callable.call(controller, stub).whenComplete((result, error2) -> {
+      addListener(callable.call(controller, stub), (result, error2) -> {
         if (error2 != null) {
           onError(error2, () -> "Call to master failed", err -> {
           });
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 9fef15d..ce3a2dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,7 +72,7 @@ class AsyncMetaRegionLocator {
       if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
         LOG.debug("Start fetching meta region location from registry.");
         CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
-        registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+        addListener(registry.getMetaRegionLocation(), (locs, error) -> {
           if (error != null) {
             LOG.debug("Failed to fetch meta region location from registry", error);
             metaRelocateFuture.getAndSet(null).completeExceptionally(error);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 54b055a..f114eff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
@@ -62,7 +64,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
       return;
     }
     resetCallTimeout();
-    callable.call(controller, stub).whenComplete((result, error) -> {
+    addListener(callable.call(controller, stub), (result, error) -> {
       if (error != null) {
         onError(error, () -> "Call to admin stub failed", err -> {
         });
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index a552e40..9490d0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -79,7 +79,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
       return;
     }
     resetCallTimeout();
-    callable.call(controller, loc, stub).whenComplete((result, error) -> {
+    addListener(callable.call(controller, loc, stub), (result, error) -> {
       if (error != null) {
         onError(error,
           () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 9747d06..426b184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -87,15 +88,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
   }
 
   private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
-    CompletableFuture<T> asyncFuture = new CompletableFuture<>();
-    future.whenCompleteAsync((r, e) -> {
-      if (e != null) {
-        asyncFuture.completeExceptionally(e);
-      } else {
-        asyncFuture.complete(r);
-      }
-    }, pool);
-    return asyncFuture;
+    return FutureUtils.wrapFuture(future, pool);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e24af74..e3e87f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -18,19 +18,20 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
@@ -282,7 +283,7 @@ public class ConnectionFactory {
       final User user) {
     CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
-    registry.getClusterId().whenComplete((clusterId, error) -> {
+    addListener(registry.getClusterId(), (clusterId, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -295,9 +296,8 @@ public class ConnectionFactory {
         AsyncConnectionImpl.class, AsyncConnection.class);
       try {
         future.complete(
-          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
-            ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
-        );
+          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+            .newInstance(clazz, conf, registry, clusterId, user)));
       } catch (Exception e) {
         future.completeExceptionally(e);
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
index 9176c87..9e68a16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
@@ -17,23 +17,24 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
 /**
  * The implementation of a master based coprocessor rpc channel.
  */
@@ -75,12 +76,13 @@ class MasterCoprocessorRpcChannelImpl implements RpcChannel {
   @Override
   public void callMethod(MethodDescriptor method, RpcController controller, Message request,
       Message responsePrototype, RpcCallback<Message> done) {
-    callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
-        .whenComplete(((r, e) -> {
-          if (e != null) {
-            ((ClientCoprocessorRpcController) controller).setFailed(e);
-          }
-          done.run(r);
-        }));
+    addListener(
+      callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
+      ((r, e) -> {
+        if (e != null) {
+          ((ClientCoprocessorRpcController) controller).setFailed(e);
+        }
+        done.run(r);
+      }));
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 73efe32..d4b60fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
@@ -415,12 +416,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
       MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
       ProcedureBiConsumer consumer) {
-    CompletableFuture<Long> procFuture = this
-        .<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
-            respConverter)).call();
-    return waitProcedureResult(procFuture).whenComplete(consumer);
+    CompletableFuture<Long> procFuture =
+      this.<Long> newMasterCaller().action((controller, stub) -> this
+        .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
+    CompletableFuture<Void> future = waitProcedureResult(procFuture);
+    addListener(future, consumer);
+    return future;
   }
 
   @FunctionalInterface
@@ -2892,7 +2893,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
                 // If any region compaction state is MAJOR_AND_MINOR
                 // the table compaction state is MAJOR_AND_MINOR, too.
                 if (err2 != null) {
-                  future.completeExceptionally(err2);
+                  future.completeExceptionally(unwrapCompletionException(err2));
                 } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                   future.complete(regionState);
                 } else {
@@ -3039,7 +3040,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       serverNames.stream().forEach(serverName -> {
         futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
           if (err2 != null) {
-            future.completeExceptionally(err2);
+            future.completeExceptionally(unwrapCompletionException(err2));
           } else {
             serverStates.put(serverName, serverState);
           }
@@ -3558,7 +3559,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         futures
           .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
             if (err2 != null) {
-              future.completeExceptionally(err2);
+              future.completeExceptionally(unwrapCompletionException(err2));
             } else {
               aggregator.append(stats);
             }
@@ -3567,7 +3568,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
         (ret, err3) -> {
           if (err3 != null) {
-            future.completeExceptionally(err3);
+            future.completeExceptionally(unwrapCompletionException(err3));
           } else {
             future.complete(aggregator.sum());
           }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 3a94566..be94ca4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -583,7 +583,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
           locateFinished, unfinishedRequest, l, e));
     }
-    coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
+    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
       if (e != null) {
         callback.onRegionError(region, e);
       } else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
index 4417c7e..94e7d9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -17,10 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -33,12 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
 /**
  * The implementation of a region based coprocessor rpc channel.
  */
@@ -102,16 +102,16 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
   @Override
   public void callMethod(MethodDescriptor method, RpcController controller, Message request,
       Message responsePrototype, RpcCallback<Message> done) {
-    conn.callerFactory.<Message> single().table(tableName).row(row)
+    addListener(
+      conn.callerFactory.<Message> single().table(tableName).row(row)
         .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
-        .whenComplete((r, e) -> {
-          if (e != null) {
-            ((ClientCoprocessorRpcController) controller).setFailed(e);
-          }
-          done.run(r);
-        });
+        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
+      (r, e) -> {
+        if (e != null) {
+          ((ClientCoprocessorRpcController) controller).setFailed(e);
+        }
+        done.run(r);
+      });
   }
-
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
index 372dd4a..38512d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
@@ -17,23 +17,24 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
 /**
  * The implementation of a region server based coprocessor rpc channel.
  */
@@ -75,12 +76,13 @@ public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
   @Override
   public void callMethod(MethodDescriptor method, RpcController controller, Message request,
       Message responsePrototype, RpcCallback<Message> done) {
-    callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
-        .whenComplete(((r, e) -> {
-          if (e != null) {
-            ((ClientCoprocessorRpcController) controller).setFailed(e);
-          }
-          done.run(r);
-        }));
+    addListener(
+      callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
+      ((r, e) -> {
+        if (e != null) {
+          ((ClientCoprocessorRpcController) controller).setFailed(e);
+        }
+        done.run(r);
+      }));
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index c7ae32c..c02643f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION
 import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
 import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
 import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterId;
@@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 
@@ -68,7 +70,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
 
   private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
     CompletableFuture<T> future = new CompletableFuture<>();
-    zk.get(path).whenComplete((data, error) -> {
+    addListener(zk.get(path), (data, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -139,7 +141,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
     MutableInt remaining = new MutableInt(locs.length);
     znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
       if (replicaId == DEFAULT_REPLICA_ID) {
-        getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
           if (error != null) {
             future.completeExceptionally(error);
             return;
@@ -154,13 +156,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
               new IOException("Meta region is in state " + stateAndServerName.getFirst()));
             return;
           }
-          locs[DEFAULT_REPLICA_ID] =
-              new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
-                  stateAndServerName.getSecond());
+          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
+            getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
           tryComplete(remaining, locs, future);
         });
       } else {
-        getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+        addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
           if (future.isDone()) {
             return;
           }
@@ -174,12 +175,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
             Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
             if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
               LOG.warn("Meta region for replica " + replicaId + " is in state " +
-                  stateAndServerName.getFirst());
+                stateAndServerName.getFirst());
               locs[replicaId] = null;
             } else {
               locs[replicaId] =
-                  new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
-                      stateAndServerName.getSecond());
+                new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
+                  stateAndServerName.getSecond());
             }
           }
           tryComplete(remaining, locs, future);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 02ce655..861dacb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -57,7 +59,12 @@ public final class FutureUtils {
       BiConsumer<? super T, ? super Throwable> action) {
     future.whenComplete((resp, error) -> {
       try {
-        action.accept(resp, error);
+        // See this post on stack overflow(shorten since the url is too long),
+        // https://s.apache.org/completionexception
+        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
+        // original exception, others will get a CompletionException, which wraps the original
+        // exception. So here we unwrap it before passing it to the callback action.
+        action.accept(resp, unwrapCompletionException(error));
       } catch (Throwable t) {
         LOG.error("Unexpected error caught when processing CompletableFuture", t);
       }
@@ -65,6 +72,54 @@ public final class FutureUtils {
   }
 
   /**
+   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
+   * exception is that we will call
+   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
+   * @see #addListener(CompletableFuture, BiConsumer)
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public static <T> void addListener(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action, Executor executor) {
+    future.whenCompleteAsync((resp, error) -> {
+      try {
+        action.accept(resp, unwrapCompletionException(error));
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", t);
+      }
+    }, executor);
+  }
+
+  /**
+   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
+   * the callbacks in the given {@code executor}.
+   */
+  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
+      Executor executor) {
+    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
+    addListener(future, (r, e) -> {
+      if (e != null) {
+        wrappedFuture.completeExceptionally(e);
+      } else {
+        wrappedFuture.complete(r);
+      }
+    }, executor);
+    return wrappedFuture;
+  }
+
+  /**
+   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
+   */
+  public static Throwable unwrapCompletionException(Throwable error) {
+    if (error instanceof CompletionException) {
+      Throwable cause = error.getCause();
+      if (cause != null) {
+        return cause;
+      }
+    }
+    return error;
+  }
+
+  /**
    * A helper class for getting the result of a Future, and convert the error to an
    * {@link IOException}.
    */
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index 3b3e8d9..b3003c4 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.client.coprocessor;
 
 import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
 import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.Message;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
@@ -455,10 +454,10 @@ public final class AsyncAggregationClient {
   }
 
   public static <R, S, P extends Message, Q extends Message, T extends Message>
-          CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
-          ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
+      ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<R> future = new CompletableFuture<>();
-    sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
+    addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
       } else if (sumByRegion.isEmpty()) {
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
index bcc9c0a..b8b3213 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client.example;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -78,7 +80,7 @@ public class AsyncClientExample extends Configured implements Tool {
     for (;;) {
       if (future.compareAndSet(null, new CompletableFuture<>())) {
         CompletableFuture<AsyncConnection> toComplete = future.get();
-        ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> {
+        addListener(ConnectionFactory.createAsyncConnection(getConf()),(conn, error) -> {
           if (error != null) {
             toComplete.completeExceptionally(error);
             // we need to reset the future holder so we will get a chance to recreate an async
@@ -98,15 +100,15 @@ public class AsyncClientExample extends Configured implements Tool {
     }
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NONNULL_PARAM_VIOLATION",
-      justification="it is valid to pass NULL to CompletableFuture#completedFuture")
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
+      justification = "it is valid to pass NULL to CompletableFuture#completedFuture")
   private CompletableFuture<Void> closeConn() {
     CompletableFuture<AsyncConnection> f = future.get();
     if (f == null) {
       return CompletableFuture.completedFuture(null);
     }
     CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-    f.whenComplete((conn, error) -> {
+    addListener(f, (conn, error) -> {
       if (error == null) {
         IOUtils.closeQuietly(conn);
       }
@@ -136,44 +138,44 @@ public class AsyncClientExample extends Configured implements Tool {
     CountDownLatch latch = new CountDownLatch(numOps);
     IntStream.range(0, numOps).forEach(i -> {
       CompletableFuture<AsyncConnection> future = getConn();
-      future.whenComplete((conn, error) -> {
+      addListener(future, (conn, error) -> {
         if (error != null) {
           LOG.warn("failed to get async connection for " + i, error);
           latch.countDown();
           return;
         }
         AsyncTable<?> table = conn.getTable(tableName, threadPool);
-        table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)))
-            .whenComplete((putResp, putErr) -> {
-              if (putErr != null) {
-                LOG.warn("put failed for " + i, putErr);
+        addListener(table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))),
+          (putResp, putErr) -> {
+            if (putErr != null) {
+              LOG.warn("put failed for " + i, putErr);
+              latch.countDown();
+              return;
+            }
+            LOG.info("put for " + i + " succeeded, try getting");
+            addListener(table.get(new Get(getKey(i))), (result, getErr) -> {
+              if (getErr != null) {
+                LOG.warn("get failed for " + i);
                 latch.countDown();
                 return;
               }
-              LOG.info("put for " + i + " succeeded, try getting");
-              table.get(new Get(getKey(i))).whenComplete((result, getErr) -> {
-                if (getErr != null) {
-                  LOG.warn("get failed for " + i);
-                  latch.countDown();
-                  return;
-                }
-                if (result.isEmpty()) {
-                  LOG.warn("get failed for " + i + ", server returns empty result");
-                } else if (!result.containsColumn(FAMILY, QUAL)) {
-                  LOG.warn("get failed for " + i + ", the result does not contain " +
-                      Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
+              if (result.isEmpty()) {
+                LOG.warn("get failed for " + i + ", server returns empty result");
+              } else if (!result.containsColumn(FAMILY, QUAL)) {
+                LOG.warn("get failed for " + i + ", the result does not contain " +
+                  Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
+              } else {
+                int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
+                if (v != i) {
+                  LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
+                    ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
                 } else {
-                  int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
-                  if (v != i) {
-                    LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
-                        ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
-                  } else {
-                    LOG.info("get for " + i + " succeeded");
-                  }
+                  LOG.info("get for " + i + " succeeded");
                 }
-                latch.countDown();
-              });
+              }
+              latch.countDown();
             });
+          });
       });
     });
     latch.await();
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
index f9caf2b..668bf7a 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client.example;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Optional;
@@ -159,36 +161,38 @@ public class HttpProxyExample {
 
     private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
       Params params = parse(req);
-      conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
-          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)))
-          .whenComplete((r, e) -> {
-            if (e != null) {
-              exceptionCaught(ctx, e);
+      addListener(
+        conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
+          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
+        (r, e) -> {
+          if (e != null) {
+            exceptionCaught(ctx, e);
+          } else {
+            byte[] value =
+              r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
+            if (value != null) {
+              write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
             } else {
-              byte[] value =
-                  r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
-              if (value != null) {
-                write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
-              } else {
-                write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
-              }
+              write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
             }
-          });
+          }
+        });
     }
 
     private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
       Params params = parse(req);
       byte[] value = new byte[req.content().readableBytes()];
       req.content().readBytes(value);
-      conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
-          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value))
-          .whenComplete((r, e) -> {
-            if (e != null) {
-              exceptionCaught(ctx, e);
-            } else {
-              write(ctx, HttpResponseStatus.OK, Optional.empty());
-            }
-          });
+      addListener(
+        conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
+          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
+        (r, e) -> {
+          if (e != null) {
+            exceptionCaught(ctx, e);
+          } else {
+            write(ctx, HttpResponseStatus.OK, Optional.empty());
+          }
+        });
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 81308ad..553ff3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
@@ -348,7 +350,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
     final long startTimeNs = System.nanoTime();
     final long epoch = (long) epochAndState >>> 2L;
-    writer.sync().whenCompleteAsync((result, error) -> {
+    addListener(writer.sync(), (result, error) -> {
       if (error != null) {
         syncFailed(epoch, error);
       } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 6368fb7..37c6f00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
@@ -194,7 +196,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
         // should not happen
         throw new AssertionError(e);
       }
-      output.flush(false).whenComplete((len, error) -> {
+      addListener(output.flush(false), (len, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
         } else {
@@ -215,7 +217,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
       }
       output.writeInt(trailer.getSerializedSize());
       output.write(magic);
-      output.flush(false).whenComplete((len, error) -> {
+      addListener(output.flush(false), (len, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
         } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
index 4301ae7..4e3fa61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -75,7 +77,7 @@ public final class CombinedAsyncWriter implements AsyncWriter {
   public CompletableFuture<Long> sync() {
     CompletableFuture<Long> future = new CompletableFuture<>();
     AtomicInteger remaining = new AtomicInteger(writers.size());
-    writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+    writers.forEach(w -> addListener(w.sync(), (length, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;


Mime
View raw message