hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [4/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin
Date Tue, 04 Jul 2017 02:14:07 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 1da660c..36fd60d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -17,174 +17,27 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
-import java.util.stream.Stream;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.quotas.QuotaFilter;
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
-import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -192,2110 +45,404 @@ import org.apache.hadoop.hbase.util.Pair;
  */
 @InterfaceAudience.Private
 public class AsyncHBaseAdmin implements AsyncAdmin {
-  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
 
   private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
 
-  private final AsyncConnectionImpl connection;
-
-  private final RawAsyncTable metaTable;
-
-  private final long rpcTimeoutNs;
-
-  private final long operationTimeoutNs;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final int startLogErrorsCnt;
+  private final RawAsyncHBaseAdmin rawAdmin;
 
-  private final NonceGenerator ng;
-
-  AsyncHBaseAdmin(AsyncConnectionImpl connection) {
-    this.connection = connection;
-    this.metaTable = connection.getRawTable(META_TABLE_NAME);
-    this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
-    this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
-    this.pauseNs = connection.connConf.getPauseNs();
-    this.maxAttempts = connection.connConf.getMaxRetries();
-    this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
-    this.ng = connection.getNonceGenerator();
-  }
-
-  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
-    return this.connection.callerFactory.<T> masterRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
-  }
-
-  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
-    return this.connection.callerFactory.<T> adminRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
-  }
-
-  @FunctionalInterface
-  private interface MasterRpcCall<RESP, REQ> {
-    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
-        RpcCallback<RESP> done);
-  }
-
-  @FunctionalInterface
-  private interface AdminRpcCall<RESP, REQ> {
-    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
-        RpcCallback<RESP> done);
-  }
+  private final ExecutorService pool;
 
-  @FunctionalInterface
-  private interface Converter<D, S> {
-    D convert(S src) throws IOException;
+  AsyncHBaseAdmin(RawAsyncHBaseAdmin rawAdmin, ExecutorService pool) {
+    this.rawAdmin = rawAdmin;
+    this.pool = pool;
   }
 
-  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
-      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
-      Converter<RESP, PRESP> respConverter) {
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
-      @Override
-      public void run(PRESP resp) {
-        if (controller.failed()) {
-          future.completeExceptionally(controller.getFailed());
-        } else {
-          try {
-            future.complete(respConverter.convert(resp));
-          } catch (IOException e) {
-            future.completeExceptionally(e);
-          }
-        }
-      }
-    });
-    return future;
-  }
-
-  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
-      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
-      Converter<RESP, PRESP> respConverter) {
-
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
-      @Override
-      public void run(PRESP resp) {
-        if (controller.failed()) {
-          future.completeExceptionally(new IOException(controller.errorText()));
-        } else {
-          try {
-            future.complete(respConverter.convert(resp));
-          } catch (IOException e) {
-            future.completeExceptionally(e);
-          }
-        }
+  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+    CompletableFuture<T> asyncFuture = new CompletableFuture<>();
+    future.whenCompleteAsync((r, e) -> {
+      if (e != null) {
+        asyncFuture.completeExceptionally(e);
+      } else {
+        asyncFuture.complete(r);
       }
-    });
-    return future;
-  }
-
-  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
-      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
-      ProcedureBiConsumer consumer) {
-    CompletableFuture<Long> procFuture = this
-        .<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
-            respConverter)).call();
-    return waitProcedureResult(procFuture).whenComplete(consumer);
-  }
-
-  @FunctionalInterface
-  private interface TableOperator {
-    CompletableFuture<Void> operate(TableName table);
-  }
-
-  private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
-      TableOperator operator, String operationType) {
-    CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
-    List<TableDescriptor> failed = new LinkedList<>();
-    listTables(Optional.ofNullable(pattern), false).whenComplete(
-      (tables, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        CompletableFuture[] futures =
-            tables.stream()
-                .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
-                  if (ex != null) {
-                    LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
-                    failed.add(table);
-                  }
-                })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
-        CompletableFuture.allOf(futures).thenAccept((v) -> {
-          future.complete(failed);
-        });
-      });
-    return future;
-  }
-
-  @Override
-  public AsyncConnectionImpl getConnection() {
-    return this.connection;
+    }, pool);
+    return asyncFuture;
   }
 
   @Override
   public CompletableFuture<Boolean> tableExists(TableName tableName) {
-    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+    return wrap(rawAdmin.tableExists(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
       boolean includeSysTables) {
-    return this.<List<TableDescriptor>> newMasterCaller()
-        .action((controller, stub) -> this
-            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
-              controller, stub,
-              RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
-              (s, c, req, done) -> s.getTableDescriptors(c, req, done),
-              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
-        .call();
+    return wrap(rawAdmin.listTables(pattern, includeSysTables));
   }
 
   @Override
   public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
       boolean includeSysTables) {
-    return this.<List<TableName>> newMasterCaller()
-        .action((controller, stub) -> this
-            .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
-              RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
-              (s, c, req, done) -> s.getTableNames(c, req, done),
-              (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
-        .call();
+    return wrap(rawAdmin.listTableNames(pattern, includeSysTables));
   }
 
   @Override
   public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
-    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
-    this.<List<TableSchema>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
-                controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
-                    c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
-                    .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-            return;
-          }
-          if (!tableSchemas.isEmpty()) {
-            future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
-          } else {
-            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
-          }
-        });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Void> createTable(TableDescriptor desc) {
-    return createTable(desc, null);
+    return wrap(rawAdmin.getTableDescriptor(tableName));
   }
 
   @Override
   public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
       int numRegions) {
-    try {
-      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
-    if (desc.getTableName() == null) {
-      return failedFuture(new IllegalArgumentException("TableName cannot be null"));
-    }
-    if (splitKeys != null && splitKeys.length > 0) {
-      Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
-      // Verify there are no duplicate split keys
-      byte[] lastKey = null;
-      for (byte[] splitKey : splitKeys) {
-        if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
-          return failedFuture(new IllegalArgumentException(
-              "Empty split key must not be passed in the split keys."));
-        }
-        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
-          return failedFuture(new IllegalArgumentException("All split keys must be unique, "
-              + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", "
-              + Bytes.toStringBinary(lastKey)));
-        }
-        lastKey = splitKey;
-      }
-    }
+    return wrap(rawAdmin.createTable(desc, startKey, endKey, numRegions));
+  }
 
-    return this.<CreateTableRequest, CreateTableResponse> procedureCall(
-      RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
-      new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+  @Override
+  public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+    return wrap(rawAdmin.createTable(desc, splitKeys));
   }
 
   @Override
   public CompletableFuture<Void> deleteTable(TableName tableName) {
-    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
-        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
-      new DeleteTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.deleteTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+    return wrap(rawAdmin.deleteTables(pattern));
   }
 
   @Override
   public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
-    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
-      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
-      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.truncateTable(tableName, preserveSplits));
   }
 
   @Override
   public CompletableFuture<Void> enableTable(TableName tableName) {
-    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
-        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
-      new EnableTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.enableTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+    return wrap(rawAdmin.enableTables(pattern));
   }
 
   @Override
   public CompletableFuture<Void> disableTable(TableName tableName) {
-    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
-        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
-      new DisableTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.disableTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+    return wrap(rawAdmin.disableTables(pattern));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (state.isPresent()) {
-        future.complete(state.get().inStates(TableState.State.ENABLED));
-      } else {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      }
-    });
-    return future;
+    return wrap(rawAdmin.isTableEnabled(tableName));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (state.isPresent()) {
-        future.complete(state.get().inStates(TableState.State.DISABLED));
-      } else {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      }
-    });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
-    return isTableAvailable(tableName, null);
+    return wrap(rawAdmin.isTableDisabled(tableName));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    isTableEnabled(tableName).whenComplete(
-      (enabled, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        if (!enabled) {
-          future.complete(false);
-        } else {
-          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
-              .whenComplete(
-                (locations, error1) -> {
-                  if (error1 != null) {
-                    future.completeExceptionally(error1);
-                    return;
-                  }
-                  int notDeployed = 0;
-                  int regionCount = 0;
-                  for (HRegionLocation location : locations) {
-                    HRegionInfo info = location.getRegionInfo();
-                    if (location.getServerName() == null) {
-                      if (LOG.isDebugEnabled()) {
-                        LOG.debug("Table " + tableName + " has not deployed region "
-                            + info.getEncodedName());
-                      }
-                      notDeployed++;
-                    } else if (splitKeys != null
-                        && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
-                      for (byte[] splitKey : splitKeys) {
-                        // Just check if the splitkey is available
-                        if (Bytes.equals(info.getStartKey(), splitKey)) {
-                          regionCount++;
-                          break;
-                        }
-                      }
-                    } else {
-                      // Always empty start row should be counted
-                      regionCount++;
-                    }
-                  }
-                  if (notDeployed > 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
-                    }
-                    future.complete(false);
-                  } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " expected to have "
-                          + (splitKeys.length + 1) + " regions, but only " + regionCount
-                          + " available");
-                    }
-                    future.complete(false);
-                  } else {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " should be available");
-                    }
-                    future.complete(true);
-                  }
-                });
-        }
-      });
-    return future;
+    return wrap(rawAdmin.isTableAvailable(tableName, splitKeys));
   }
 
   @Override
   public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
-    return this
-        .<Pair<Integer, Integer>>newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
-                controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
-                    c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
-                    resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+    return wrap(rawAdmin.getAlterStatus(tableName));
   }
 
   @Override
-  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
-    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
-      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
-      new AddColumnFamilyProcedureBiConsumer(this, tableName));
+  public CompletableFuture<Void> addColumnFamily(TableName tableName,
+      ColumnFamilyDescriptor columnFamily) {
+    return wrap(rawAdmin.addColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
-    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
-      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
-      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.deleteColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
       ColumnFamilyDescriptor columnFamily) {
-    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
-      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
-      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.modifyColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
-    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
-      RequestConverter.buildCreateNamespaceRequest(descriptor),
-      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+    return wrap(rawAdmin.createNamespace(descriptor));
   }
 
   @Override
   public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
-    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
-      RequestConverter.buildModifyNamespaceRequest(descriptor),
-      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+    return wrap(rawAdmin.modifyNamespace(descriptor));
   }
 
   @Override
   public CompletableFuture<Void> deleteNamespace(String name) {
-    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
-      RequestConverter.buildDeleteNamespaceRequest(name),
-      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new DeleteNamespaceProcedureBiConsumer(this, name));
+    return wrap(rawAdmin.deleteNamespace(name));
   }
 
   @Override
   public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
-    return this
-        .<NamespaceDescriptor> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
-                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
-                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
-                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+    return wrap(rawAdmin.getNamespaceDescriptor(name));
   }
 
   @Override
   public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
-    return this
-        .<List<NamespaceDescriptor>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
-                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
-                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
-                    .toNamespaceDescriptorList(resp))).call();
+    return wrap(rawAdmin.listNamespaceDescriptors());
   }
 
   @Override
-  public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
-                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
-                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
-                (resp) -> resp.getPrevBalanceValue())).call();
+  public CompletableFuture<Boolean> setBalancerOn(boolean on) {
+    return wrap(rawAdmin.setBalancerOn(on));
   }
 
   @Override
   public CompletableFuture<Boolean> balance(boolean forcible) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
-            stub, RequestConverter.buildBalanceRequest(forcible),
-            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+    return wrap(rawAdmin.balance(forcible));
   }
 
   @Override
   public CompletableFuture<Boolean> isBalancerOn() {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
-            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
-            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
-        .call();
+    return wrap(rawAdmin.isBalancerOn());
   }
 
   @Override
   public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete((location, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
-      if (server == null) {
-        future.completeExceptionally(new NotServingRegionException(regionName));
-      } else {
-        closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
-  }
-
-  private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
-    return this
-        .<Boolean> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
-            controller, stub,
-            ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
-            (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
-        .serverName(serverName).call();
+    return wrap(rawAdmin.closeRegion(regionName, serverName));
   }
 
   @Override
-  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
-    return this.<List<HRegionInfo>> newAdminCaller()
-        .action((controller, stub) -> this
-            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
-              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
-              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
-              resp -> ProtobufUtil.getRegionInfos(resp)))
-        .serverName(sn).call();
+  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
+    return wrap(rawAdmin.getOnlineRegions(serverName));
   }
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (!exists) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      } else {
-        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else if (!tableEnabled) {
-            future.completeExceptionally(new TableNotEnabledException(tableName));
-          } else {
-            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              new HashMap<>()).whenComplete((ret, err3) -> {
-                if (err3 != null) {
-                  future.completeExceptionally(err3);
-                } else {
-                  future.complete(ret);
-                }
-              });
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.flush(tableName));
   }
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-
-        HRegionInfo regionInfo = location.getRegionInfo();
-        this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null)).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.flushRegion(regionName));
   }
 
   @Override
   public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
-    return compact(tableName, columnFamily, false, CompactType.NORMAL);
+    return wrap(rawAdmin.compact(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
-    return compactRegion(regionName, columnFamily, false);
+    return wrap(rawAdmin.compactRegion(regionName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
-    return compact(tableName, columnFamily, true, CompactType.NORMAL);
+    return wrap(rawAdmin.majorCompact(tableName, columnFamily));
   }
 
   @Override
-  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
-    return compactRegion(regionName, columnFamily, true);
+  public CompletableFuture<Void>
+      majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+    return wrap(rawAdmin.majorCompactRegion(regionName, columnFamily));
   }
 
   @Override
-  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
-    return compactRegionServer(sn, false);
+  public CompletableFuture<Void> compactRegionServer(ServerName serverName) {
+    return wrap(rawAdmin.compactRegionServer(serverName));
   }
 
   @Override
-  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
-    return compactRegionServer(sn, true);
-  }
-
-  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
-      if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
-      }
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
-    });
-    return future;
-  }
-
-  private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
-      boolean major) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
-  }
-
-  /**
-   * List all region locations for the specific table.
-   */
-  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-      // For meta table, we use zk to fetch all locations.
-      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
-      registry.getMetaRegionLocation().whenComplete(
-        (metaRegions, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-          } else if (metaRegions == null || metaRegions.isEmpty()
-              || metaRegions.getDefaultRegionLocation() == null) {
-            future.completeExceptionally(new IOException("meta region does not found"));
-          } else {
-            future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-          }
-          // close the registry.
-          IOUtils.closeQuietly(registry);
-        });
-      return future;
-    } else {
-      // For non-meta table, we fetch all locations by scanning hbase:meta table
-      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
-    }
-  }
-
-  /**
-   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
-   */
-  private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
-      final boolean major, CompactType compactType) {
-    if (CompactType.MOB.equals(compactType)) {
-      // TODO support MOB compact.
-      return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
-    }
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
-      for (HRegionLocation location : locations) {
-        if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
-        if (location.getServerName() == null) continue;
-        compactFutures
-            .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
-      }
-      // future complete unless all of the compact futures are completed.
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
-    });
-    return future;
-  }
-
-  /**
-   * Compact the region at specific region server.
-   */
-  private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
-      final boolean major, Optional<byte[]> columnFamily) {
-    return this
-        .<Void> newAdminCaller()
-        .serverName(sn)
-        .action(
-          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
-            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
-              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
-            resp -> null)).call();
-  }
-
-  private byte[] toEncodeRegionName(byte[] regionName) {
-    try {
-      return HRegionInfo.isEncodedRegionName(regionName) ? regionName
-          : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
-    } catch (IOException e) {
-      return regionName;
-    }
-  }
-
-  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
-      CompletableFuture<TableName> result) {
-    getRegionLocation(encodeRegionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          result.completeExceptionally(err);
-          return;
-        }
-        HRegionInfo regionInfo = location.getRegionInfo();
-        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-          result.completeExceptionally(new IllegalArgumentException(
-              "Can't invoke merge on non-default regions directly"));
-          return;
-        }
-        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
-          if (!tableName.get().equals(regionInfo.getTable())) {
-            // tables of this two region should be same.
-            result.completeExceptionally(new IllegalArgumentException(
-                "Cannot merge regions from two different tables " + tableName.get() + " and "
-                    + regionInfo.getTable()));
-          } else {
-            result.complete(tableName.get());
-          }
-        }
-      });
-  }
-
-  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
-      byte[] encodeRegionNameB) {
-    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
-    CompletableFuture<TableName> future = new CompletableFuture<>();
-
-    checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
-    checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
-    return future;
+  public CompletableFuture<Void> majorCompactRegionServer(ServerName serverName) {
+    return wrap(rawAdmin.majorCompactRegionServer(serverName));
   }
 
   @Override
   public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
       boolean forcible) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
-    final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
-
-    checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
-        .whenComplete((tableName, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-
-          MergeTableRegionsRequest request = null;
-          try {
-            request = RequestConverter.buildMergeTableRegionsRequest(
-              new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
-              ng.newNonce());
-          } catch (DeserializationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
-
-          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
-            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
-            new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-
-        });
-    return future;
+    return wrap(rawAdmin.mergeRegions(nameOfRegionA, nameOfRegionB, forcible));
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exist, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (!exist) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-        return;
-      }
-      metaTable
-          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
-              .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
-              .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
-          .whenComplete((results, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (results != null && !results.isEmpty()) {
-              List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
-              for (Result r : results) {
-                if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
-                RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
-                if (rl != null) {
-                  for (HRegionLocation h : rl.getRegionLocations()) {
-                    if (h != null && h.getServerName() != null) {
-                      HRegionInfo hri = h.getRegionInfo();
-                      if (hri == null || hri.isSplitParent()
-                          || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
-                        continue;
-                      splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
-                    }
-                  }
-                }
-              }
-              CompletableFuture
-                  .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
-                  .whenComplete((ret, exception) -> {
-                    if (exception != null) {
-                      future.completeExceptionally(exception);
-                      return;
-                    }
-                    future.complete(ret);
-                  });
-            } else {
-              future.complete(null);
-            }
-          });
-    });
-    return future;
+    return wrap(rawAdmin.split(tableName));
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
-    CompletableFuture<Void> result = new CompletableFuture<>();
-    if (splitPoint == null) {
-      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
-    }
-    connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
-        .whenComplete((loc, err) -> {
-          if (err != null) {
-            result.completeExceptionally(err);
-          } else if (loc == null || loc.getRegionInfo() == null) {
-            result.completeExceptionally(new IllegalArgumentException(
-                "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
-          } else {
-            splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
-                .whenComplete((ret, err2) -> {
-                  if (err2 != null) {
-                    result.completeExceptionally(err2);
-                  } else {
-                    result.complete(ret);
-                  }
-
-                });
-          }
-        });
-    return result;
+    return wrap(rawAdmin.split(tableName, splitPoint));
   }
 
   @Override
   public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        HRegionInfo regionInfo = location.getRegionInfo();
-        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
-      });
-    return future;
-  }
-
-  private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
-      Optional<byte[]> splitPoint) {
-    if (hri.getStartKey() != null && splitPoint.isPresent()
-        && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
-      return failedFuture(new IllegalArgumentException(
-          "should not give a splitkey which equals to startkey!"));
-    }
-    return this
-        .<Void> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
-            controller, stub,
-            ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
-            (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
-        .serverName(sn).call();
+    return wrap(rawAdmin.splitRegion(regionName, splitPoint));
   }
 
   @Override
   public CompletableFuture<Void> assign(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.assign(regionName));
   }
 
   @Override
   public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this
-                  .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
-                    RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
-                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.unassign(regionName, forcible));
   }
 
   @Override
   public CompletableFuture<Void> offline(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.offline(regionName));
   }
 
   @Override
   public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildMoveRegionRequest(
-                  regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
-                    .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.move(regionName, destServerName));
   }
 
   @Override
   public CompletableFuture<Void> setQuota(QuotaSettings quota) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
-            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
-            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.setQuota(quota));
   }
 
   @Override
   public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
-    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
-    Scan scan = QuotaTableUtil.makeScan(filter);
-    this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
-        .scan(scan, new RawScanResultConsumer() {
-          List<QuotaSettings> settings = new ArrayList<>();
-
-          @Override
-          public void onNext(Result[] results, ScanController controller) {
-            for (Result result : results) {
-              try {
-                QuotaTableUtil.parseResultToCollection(result, settings);
-              } catch (IOException e) {
-                controller.terminate();
-                future.completeExceptionally(e);
-              }
-            }
-          }
-
-          @Override
-          public void onError(Throwable error) {
-            future.completeExceptionally(error);
-          }
-
-          @Override
-          public void onComplete() {
-            future.complete(settings);
-          }
-        });
-    return future;
-  }
-
-  public CompletableFuture<Void> addReplicationPeer(String peerId,
-      ReplicationPeerConfig peerConfig) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
-                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
-                    done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.getQuota(filter));
+  }
+
+  @Override
+  public CompletableFuture<Void>
+      addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
+    return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
   }
 
   @Override
   public CompletableFuture<Void> removeReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
-                stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
-                (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.removeReplicationPeer(peerId));
   }
 
   @Override
   public CompletableFuture<Void> enableReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
-                stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
-                (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.enableReplicationPeer(peerId));
   }
 
   @Override
   public CompletableFuture<Void> disableReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
-                controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
-                    c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
-        .call();
+    return wrap(rawAdmin.disableReplicationPeer(peerId));
   }
 
+  @Override
   public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
-    return this
-        .<ReplicationPeerConfig> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
-                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
-                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
-                (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+    return wrap(rawAdmin.getReplicationPeerConfig(peerId));
   }
 
   @Override
   public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
       ReplicationPeerConfig peerConfig) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
-                controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
-                  peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
-                    resp) -> null)).call();
+    return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig));
   }
 
   @Override
-  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+  public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return failedFuture(new ReplicationException("tableCfs is null"));
-    }
-
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
-      if (!completeExceptionally(future, error)) {
-        ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
-        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
-          if (!completeExceptionally(future, error)) {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
   }
 
   @Override
-  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+  public CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return failedFuture(new ReplicationException("tableCfs is null"));
-    }
-
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
-      if (!completeExceptionally(future, error)) {
-        try {
-          ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
-        } catch (ReplicationException e) {
-          future.completeExceptionally(e);
-          return;
-        }
-        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
-          if (!completeExceptionally(future, error)) {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs));
   }
 
   @Override
-  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
-    return this
-        .<List<ReplicationPeerDescription>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
-                controller,
-                stub,
-                RequestConverter.buildListReplicationPeersRequest(pattern),
-                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
-                (resp) -> resp.getPeerDescList().stream()
-                    .map(ReplicationSerDeHelper::toReplicationPeerDescription)
-                    .collect(Collectors.toList()))).call();
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
+      Optional<Pattern> pattern) {
+    return wrap(rawAdmin.listReplicationPeers(pattern));
   }
 
   @Override
   public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
-    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
-    listTables().whenComplete(
-      (tables, error) -> {
-        if (!completeExceptionally(future, error)) {
-          List<TableCFs> replicatedTableCFs = new ArrayList<>();
-          tables.forEach(table -> {
-            Map<String, Integer> cfs = new HashMap<>();
-            Stream.of(table.getColumnFamilies())
-                .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
-                .forEach(column -> {
-                  cfs.put(column.getNameAsString(), column.getScope());
-                });
-            if (!cfs.isEmpty()) {
-              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
-            }
-          });
-          future.complete(replicatedTableCFs);
-        }
-      });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
-    return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
-      SnapshotType type) {
-    return snapshot(new SnapshotDescription(snapshotName, tableName, type));
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
-    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
-        .createHBaseProtosSnapshotDesc(snapshotDesc);
-    try {
-      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    this.<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-            resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                  } else if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                  long pauseTime = ConnectionUtils.getPauseTime(
-                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                  pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER
-                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
-                }
-              } );
-              } else {
-                future.completeExceptionally(new SnapshotCreationException("Snapshot '"
-                    + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
-                    + " ms", snapshotDesc));
-              }
-            }
-          };
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
-    return future;
+    return wrap(rawAdmin.listReplicatedTableCFs());
+  }
+
+  @Override
+  public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
+    return wrap(rawAdmin.snapshot(snapshot));
   }
 
   @Override
   public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
-            controller,
-            stub,
-            IsSnapshotDoneRequest.newBuilder()
-                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
-                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+    return wrap(rawAdmin.isSnapshotFinished(snapshot));
   }
 
   @Override
   public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
-    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
-      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
-      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
-    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+    return wrap(rawAdmin.restoreSnapshot(snapshotName));
   }
 
   @Override
   public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshots(Pattern.compile(snapshotName)).whenComplete(
-      (snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        TableName tableName = null;
-        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
-          for (SnapshotDescription snap : snapshotDescriptions) {
-            if (snap.getName().equals(snapshotName)) {
-              tableName = snap.getTableName();
-              break;
-            }
-          }
-        }
-        if (tableName == null) {
-          future.completeExceptionally(new RestoreSnapshotException(
-              "Unable to find the table name for snapshot=" + snapshotName));
-          return;
-        }
-        final TableName finalTableName = tableName;
-        tableExists(finalTableName)
-            .whenComplete((exists, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else if (!exists) {
-                // if table does not exist, then just clone snapshot into new table.
-              completeConditionalOnFuture(future,
-                internalRestoreSnapshot(snapshotName, finalTableName));
-            } else {
-              isTableDisabled(finalTableName).whenComplete(
-                (disabled, err4) -> {
-                  if (err4 != null) {
-                    future.completeExceptionally(err4);
-                  } else if (!disabled) {
-                    future.completeExceptionally(new TableNotDisabledException(finalTableName));
-                  } else {
-                    completeConditionalOnFuture(future,
-                      restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
-                  }
-                });
-            }
-          } );
-      });
-    return future;
-  }
-
-  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
-      boolean takeFailSafeSnapshot) {
-    if (takeFailSafeSnapshot) {
-      CompletableFuture<Void> future = new CompletableFuture<>();
-      // Step.1 Take a snapshot of the current state
-      String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
-        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
-        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
-      final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
-          .replace("{snapshot.name}", snapshotName)
-          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
-          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
-      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-      snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else {
-          // Step.2 Restore snapshot
-        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
-          if (err2 != null) {
-            // Step.3.a Something went wrong during the restore and try to rollback.
-          internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
-            (void3, err3) -> {
-              if (err3 != null) {
-                future.completeExceptionally(err3);
-              } else {
-                String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
-                    + failSafeSnapshotSnapshotName + " succeeded.";
-                future.completeExceptionally(new RestoreSnapshotException(msg));
-              }
-            });
-        } else {
-          // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
-          LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
-            (ret3, err3) -> {
-              if (err3 != null) {
-                LOG.error(
-                  "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
-                future.completeExceptionally(err3);
-              } else {
-                future.complete(ret3);
-              }
-            });
-        }
-      } );
-      }
-    } );
-      return future;
-    } else {
-      return internalRestoreSnapshot(snapshotName, tableName);
-    }
-  }
-
-  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
-      CompletableFuture<T> parentFuture) {
-    parentFuture.whenComplete((res, err) -> {
-      if (err != null) {
-        dependentFuture.completeExceptionally(err);
-      } else {
-        dependentFuture.complete(res);
-      }
-    });
+    return wrap(rawAdmin.restoreSnapshot(snapshotName, takeFailSafeSnapshot));
   }
 
   @Override
   public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (exists) {
-        future.completeExceptionally(new TableExistsException(tableName));
-      } else {
-        completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
-      }
-    });
-    return future;
-  }
-
-  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
-    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
-        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
-    try {
-      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-    return waitProcedureResult(this
-        .<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
-            controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
-                .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
-                done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
-  }
-
-  @Override
-  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
-    return this
-        .<List<SnapshotDescription>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(
-                controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
-                    done) -> s.getCompletedSnapshots(c, req, done), resp -> resp.getSnapshotsList()
-                    .stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList())))
-        .call();
-  }
-
-  @Override
-  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
-    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listSnapshots()
-        .whenComplete(
-          (snapshotDescList, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> pattern.matcher(snap.getName()).matches())
-                .collect(Collectors.toList()));
-          });
-    return future;
+    return wrap(rawAdmin.cloneSnapshot(snapshotName, tableName));
   }
 
   @Override
-  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
-      Pattern snapshotNamePattern) {
-    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
-      (tableNames, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        if (tableNames == null || tableNames.size() <= 0) {
-          future.complete(Collections.emptyList());
-          return;
-        }
-        listSnapshots(snapshotNamePattern).whenComplete(
-          (snapshotDescList, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
-                .collect(Collectors.toList()));
-          });
-      });
-    return future;
+  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+    return wrap(rawAdmin.listSnapshots(pattern));
   }
 
   @Override
-  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
-    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+      Pattern snapshotNamePattern) {
+    return wrap(rawAdmin.listTableSnapshots(tableNamePattern, snapshotNamePattern));
   }
 
   @Override
-  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
-    return deleteTableSnapshots(null, snapshotNamePattern);
+  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+    return wrap(rawAdmin.deleteSnapshot(snapshotName));
   }
 
   @Override
   public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
       Pattern snapshotNamePattern) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
-      ((snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
-          future.complete(null);
-          return;
-        }
-        List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
-        snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
-            .add(internalDeleteSnapshot(snapDesc)));
-        CompletableFuture.allOf(
-          deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
-            .thenAccept(v -> future.complete(v));
-      }));
-    return future;
-  }
-
-  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
-            controller,
-            stub,
-            DeleteSnapshotRequest.newBuilder()
-                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
-                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+    return wrap(rawAdmin.deleteTableSnapshots(tableNamePattern, snapshotNamePattern));
   }
 
   @Override
   public CompletableFuture<Void> execProcedure(String signature, String instance,
       Map<String, String> props) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    ProcedureDescription procDesc =
-        ProtobufUtil.buildProcedureDescription(signature, instance, props);
-    this.<Long> newMasterCaller()
-        .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
-          controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
-          (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
-        .call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            lon

<TRUNCATED>

Mime
View raw message