hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [03/14] hbase git commit: HBASE-13204 Procedure v2 - client create/delete table sync
Date Thu, 09 Apr 2015 23:21:14 GMT
HBASE-13204 Procedure v2 - client create/delete table sync


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5dccd9c5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5dccd9c5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5dccd9c5

Branch: refs/heads/hbase-12439
Commit: 5dccd9c534d4235cf76def5fc07675d601661cbf
Parents: aa934f8
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Thu Apr 9 21:01:20 2015 +0100
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Fri Apr 10 00:20:22 2015 +0100

----------------------------------------------------------------------
 .../hbase/client/ConnectionImplementation.java  |    6 +
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |  608 ++++-
 .../hbase/client/TestProcedureFuture.java       |  186 ++
 .../hbase/protobuf/generated/MasterProtos.java  | 2576 +++++++++++++++---
 hbase-protocol/src/main/protobuf/Master.proto   |   24 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   12 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |   51 +-
 .../hadoop/hbase/master/MasterServices.java     |    4 +-
 .../master/procedure/DeleteTableProcedure.java  |    1 +
 .../hadoop/hbase/master/TestCatalogJanitor.java |    7 +-
 10 files changed, 2921 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5dccd9c5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8442a77..bc2d51a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1598,6 +1598,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable
{
       }
 
       @Override
+      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
+          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
+        return stub.getProcedureResult(controller, request);
+      }
+
+      @Override
       public MasterProtos.IsMasterRunningResponse isMasterRunning(
           RpcController controller, MasterProtos.IsMasterRunningRequest request)
           throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dccd9c5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 21a9139..1697c03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -31,6 +31,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
@@ -89,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
@@ -101,6 +108,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResp
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -142,6 +151,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -186,6 +196,7 @@ public class HBaseAdmin implements Admin {
   // numRetries is for 'normal' stuff... Multiply by this factor when
   // want to wait a long time.
   private final int retryLongerMultiplier;
+  private final int syncWaitTimeout;
   private boolean aborted;
   private boolean cleanupConnectionOnClose = false; // close the connection in close()
   private boolean closed = false;
@@ -242,6 +253,8 @@ public class HBaseAdmin implements Admin {
         "hbase.client.retries.longer.multiplier", 10);
     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+    this.syncWaitTimeout = this.conf.getInt(
+      "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
   }
@@ -541,92 +554,23 @@ public class HBaseAdmin implements Admin {
    */
   @Override
   public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
-  throws IOException {
+      throws IOException {
+    Future<Void> future = createTableAsyncV2(desc, splitKeys);
     try {
-      createTableAsync(desc, splitKeys);
-    } catch (SocketTimeoutException ste) {
-      LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
-    }
-    int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
-    int prevRegCount = 0;
-    boolean tableWasEnabled = false;
-    for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
-      ++tries) {
-      if (tableWasEnabled) {
-        // Wait all table regions comes online
-        final AtomicInteger actualRegCount = new AtomicInteger(0);
-        MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
-          @Override
-          public boolean visit(Result rowResult) throws IOException {
-            RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
-            if (list == null) {
-              LOG.warn("No serialized HRegionInfo in " + rowResult);
-              return true;
-            }
-            HRegionLocation l = list.getRegionLocation();
-            if (l == null) {
-              return true;
-            }
-            if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
-              return false;
-            }
-            if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
-            HRegionLocation[] locations = list.getRegionLocations();
-            for (HRegionLocation location : locations) {
-              if (location == null) continue;
-              ServerName serverName = location.getServerName();
-              // Make sure that regions are assigned to server
-              if (serverName != null && serverName.getHostAndPort() != null) {
-                actualRegCount.incrementAndGet();
-              }
-            }
-            return true;
-          }
-        };
-        MetaTableAccessor.scanMetaForTableRegions(connection, visitor, desc.getTableName());
-        if (actualRegCount.get() < numRegs) {
-          if (tries == this.numRetries * this.retryLongerMultiplier - 1) {
-            throw new RegionOfflineException("Only " + actualRegCount.get() +
-              " of " + numRegs + " regions are online; retries exhausted.");
-          }
-          try { // Sleep
-            Thread.sleep(getPauseTime(tries));
-          } catch (InterruptedException e) {
-            throw new InterruptedIOException("Interrupted when opening" +
-              " regions; " + actualRegCount.get() + " of " + numRegs +
-              " regions processed so far");
-          }
-          if (actualRegCount.get() > prevRegCount) { // Making progress
-            prevRegCount = actualRegCount.get();
-            tries = -1;
-          }
-        } else {
-          return;
-        }
+      // TODO: how long should we wait? spin forever?
+      future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting" +
+          " for table to be enabled; meta scan was done");
+    } catch (TimeoutException e) {
+      throw new TimeoutIOException(e);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException)e.getCause();
       } else {
-        try {
-          tableWasEnabled = isTableAvailable(desc.getTableName());
-        } catch (TableNotFoundException tnfe) {
-          LOG.debug(
-              "Table " + desc.getTableName() + " was not enabled, sleeping, still " + numRetries
-                  + " retries left");
-        }
-        if (tableWasEnabled) {
-          // no we will scan meta to ensure all regions are online
-          tries = -1;
-        } else {
-          try { // Sleep
-            Thread.sleep(getPauseTime(tries));
-          } catch (InterruptedException e) {
-            throw new InterruptedIOException("Interrupted when waiting" +
-                " for table to be enabled; meta scan was done");
-          }
-        }
+        throw new IOException(e.getCause());
       }
     }
-    throw new TableNotEnabledException(
-      "Retries exhausted while still waiting for table: "
-      + desc.getTableName() + " to be enabled");
   }
 
   /**
@@ -646,22 +590,42 @@ public class HBaseAdmin implements Admin {
    * @throws IOException
    */
   @Override
-  public void createTableAsync(
-    final HTableDescriptor desc, final byte [][] splitKeys)
-  throws IOException {
-    if(desc.getTableName() == null) {
+  public void createTableAsync(final HTableDescriptor desc, final byte [][] splitKeys)
+      throws IOException {
+    createTableAsyncV2(desc, splitKeys);
+  }
+
+  /**
+   * Creates a new table but does not block and wait for it to come online.
+   * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
+   * It may throw ExecutionException if there was an error while executing the operation
+   * or TimeoutException in case the wait timeout was not long enough to allow the
+   * operation to complete.
+   *
+   * @param desc table descriptor for table
+   * @param splitKeys keys to check if the table has been created with all split keys
+   * @throws IllegalArgumentException Bad table name, if the split keys
+   *    are repeated and if the split key has empty byte array.
+   * @throws IOException if a remote or network exception occurs
+   * @return the result of the async creation. You can use Future.get(long, TimeUnit)
+   *    to wait on the operation to complete.
+   */
+  // TODO: This should be called Async but it will break binary compatibility
+  private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][]
splitKeys)
+      throws IOException {
+    if (desc.getTableName() == null) {
       throw new IllegalArgumentException("TableName cannot be null");
     }
-    if(splitKeys != null && splitKeys.length > 0) {
+    if (splitKeys != null && splitKeys.length > 0) {
       Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
       // Verify there are no duplicate split keys
-      byte [] lastKey = null;
-      for(byte [] splitKey : splitKeys) {
+      byte[] lastKey = null;
+      for (byte[] splitKey : splitKeys) {
         if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
           throw new IllegalArgumentException(
               "Empty split key must not be passed in the split keys.");
         }
-        if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
+        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
           throw new IllegalArgumentException("All split keys must be unique, " +
             "found duplicate: " + Bytes.toStringBinary(splitKey) +
             ", " + Bytes.toStringBinary(lastKey));
@@ -670,14 +634,127 @@ public class HBaseAdmin implements Admin {
       }
     }
 
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+    CreateTableResponse response = executeCallable(
+        new MasterCallable<CreateTableResponse>(getConnection()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
+      public CreateTableResponse call(int callTimeout) throws ServiceException {
         CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
-        master.createTable(null, request);
-        return null;
+        return master.createTable(null, request);
       }
     });
+    return new CreateTableFuture(this, desc, splitKeys, response);
+  }
+
+  private static class CreateTableFuture extends ProcedureFuture<Void> {
+    private final HTableDescriptor desc;
+    private final byte[][] splitKeys;
+
+    public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
+        final byte[][] splitKeys, final CreateTableResponse response) {
+      super(admin, (response != null && response.hasProcId()) ? response.getProcId()
: null);
+      this.splitKeys = splitKeys;
+      this.desc = desc;
+    }
+
+    @Override
+    protected Void waitOperationResult(final long deadlineTs)
+        throws IOException, TimeoutException {
+      waitForTableEnabled(deadlineTs);
+      waitForAllRegionsOnline(deadlineTs);
+      return null;
+    }
+
+    @Override
+    protected Void postOperationResult(final Void result, final long deadlineTs)
+        throws IOException, TimeoutException {
+      LOG.info("Created " + desc.getTableName());
+      return result;
+    }
+
+    private void waitForTableEnabled(final long deadlineTs)
+        throws IOException, TimeoutException {
+      waitForState(deadlineTs, new WaitForStateCallable() {
+        @Override
+        public boolean checkState(int tries) throws IOException {
+          try {
+            if (getAdmin().isTableAvailable(desc.getTableName())) {
+              return true;
+            }
+          } catch (TableNotFoundException tnfe) {
+            LOG.debug("Table "+ desc.getTableName() +" was not enabled, sleeping. tries="+
 tries);
+          }
+          return false;
+        }
+
+        @Override
+        public void throwInterruptedException() throws InterruptedIOException {
+          throw new InterruptedIOException("Interrupted when waiting for table " +
+              desc.getTableName() + " to be enabled");
+        }
+
+        @Override
+        public void throwTimeoutException(long elapsedTime) throws TimeoutException {
+          throw new TimeoutException("Table " + desc.getTableName() +
+            " not enabled after " + elapsedTime + "msec");
+        }
+      });
+    }
+
+    private void waitForAllRegionsOnline(final long deadlineTs)
+        throws IOException, TimeoutException {
+      final AtomicInteger actualRegCount = new AtomicInteger(0);
+      final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+        @Override
+        public boolean visit(Result rowResult) throws IOException {
+          RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
+          if (list == null) {
+            LOG.warn("No serialized HRegionInfo in " + rowResult);
+            return true;
+          }
+          HRegionLocation l = list.getRegionLocation();
+          if (l == null) {
+            return true;
+          }
+          if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
+            return false;
+          }
+          if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
+          HRegionLocation[] locations = list.getRegionLocations();
+          for (HRegionLocation location : locations) {
+            if (location == null) continue;
+            ServerName serverName = location.getServerName();
+            // Make sure that regions are assigned to server
+            if (serverName != null && serverName.getHostAndPort() != null) {
+              actualRegCount.incrementAndGet();
+            }
+          }
+          return true;
+        }
+      };
+
+      int tries = 0;
+      IOException serverEx = null;
+      int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
+      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+        actualRegCount.set(0);
+        MetaTableAccessor.scanMetaForTableRegions(
+          getAdmin().getConnection(), visitor, desc.getTableName());
+        if (actualRegCount.get() == numRegs) {
+          // all the regions are online
+          return;
+        }
+
+        try {
+          Thread.sleep(getAdmin().getPauseTime(tries++));
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted when opening" +
+            " regions; " + actualRegCount.get() + " of " + numRegs +
+            " regions processed so far");
+        }
+      }
+      throw new TimeoutException("Only " + actualRegCount.get() +
+              " of " + numRegs + " regions are online; retries exhausted.");
+    }
   }
 
   public void deleteTable(final String tableName) throws IOException {
@@ -697,48 +774,93 @@ public class HBaseAdmin implements Admin {
    */
   @Override
   public void deleteTable(final TableName tableName) throws IOException {
-    boolean tableExists = true;
+    Future<Void> future = deleteTableAsyncV2(tableName);
+    try {
+      future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
+    } catch (TimeoutException e) {
+      throw new TimeoutIOException(e);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException)e.getCause();
+      } else {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
 
-    executeCallable(new MasterCallable<Void>(getConnection()) {
+  /**
+   * Deletes the table but does not block and wait for it be completely removed.
+   * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
+   * It may throw ExecutionException if there was an error while executing the operation
+   * or TimeoutException in case the wait timeout was not long enough to allow the
+   * operation to complete.
+   *
+   * @param desc table descriptor for table
+   * @param tableName name of table to delete
+   * @throws IOException if a remote or network exception occurs
+   * @return the result of the async delete. You can use Future.get(long, TimeUnit)
+   *    to wait on the operation to complete.
+   */
+  // TODO: This should be called Async but it will break binary compatibility
+  private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException
{
+    DeleteTableResponse response = executeCallable(
+        new MasterCallable<DeleteTableResponse>(getConnection()) {
       @Override
-      public Void call(int callTimeout) throws ServiceException {
+      public DeleteTableResponse call(int callTimeout) throws ServiceException {
         DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
-        master.deleteTable(null,req);
-        return null;
+        return master.deleteTable(null,req);
       }
     });
+    return new DeleteTableFuture(this, tableName, response);
+  }
 
-    int failures = 0;
-    for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++)
{
-      try {
-        tableExists = tableExists(tableName);
-        if (!tableExists)
-          break;
-      } catch (IOException ex) {
-        failures++;
-        if(failures >= numRetries - 1) {           // no more tries left
-          if (ex instanceof RemoteException) {
-            throw ((RemoteException) ex).unwrapRemoteException();
-          } else {
-            throw ex;
-          }
-        }
-      }
-      try {
-        Thread.sleep(getPauseTime(tries));
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted when waiting" +
-            " for table to be deleted");
-      }
+  private static class DeleteTableFuture extends ProcedureFuture<Void> {
+    private final TableName tableName;
+
+    public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
+        final DeleteTableResponse response) {
+      super(admin, (response != null && response.hasProcId()) ? response.getProcId()
: null);
+      this.tableName = tableName;
+    }
+
+    @Override
+    protected Void waitOperationResult(final long deadlineTs)
+        throws IOException, TimeoutException {
+      waitTableNotFound(deadlineTs);
+      return null;
+    }
+
+    @Override
+    protected Void postOperationResult(final Void result, final long deadlineTs)
+        throws IOException, TimeoutException {
+      // Delete cached information to prevent clients from using old locations
+      getAdmin().getConnection().clearRegionCache(tableName);
+      LOG.info("Deleted " + tableName);
+      return result;
     }
 
-    if (tableExists) {
-      throw new IOException("Retries exhausted, it took too long to wait"+
-        " for the table " + tableName + " to be deleted.");
+    private void waitTableNotFound(final long deadlineTs)
+        throws IOException, TimeoutException {
+      waitForState(deadlineTs, new WaitForStateCallable() {
+        @Override
+        public boolean checkState(int tries) throws IOException {
+          return !getAdmin().tableExists(tableName);
+        }
+
+        @Override
+        public void throwInterruptedException() throws InterruptedIOException {
+          throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
+        }
+
+        @Override
+        public void throwTimeoutException(long elapsedTime) throws TimeoutException {
+          throw new TimeoutException("Table " + tableName + " not yet deleted after " +
+              elapsedTime + "msec");
+        }
+      });
     }
-    // Delete cached information to prevent clients from using old locations
-    this.connection.clearRegionCache(tableName);
-    LOG.info("Deleted " + tableName);
   }
 
   /**
@@ -3834,4 +3956,236 @@ public class HBaseAdmin implements Admin {
       }
     });
   }
+
+  /**
+   * Future that waits on a procedure result.
+   * Returned by the async version of the Admin calls,
+   * and used internally by the sync calls to wait on the result of the procedure.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Evolving
+  protected static class ProcedureFuture<V> implements Future<V> {
+    private ExecutionException exception = null;
+    private boolean procResultFound = false;
+    private boolean done = false;
+    private V result = null;
+
+    private final HBaseAdmin admin;
+    private final Long procId;
+
+    public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
+      this.admin = admin;
+      this.procId = procId;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isCancelled() {
+      // TODO: Abort not implemented yet
+      return false;
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException {
+      // TODO: should we ever spin forever?
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (!done) {
+        long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
+        try {
+          try {
+            // if the master support procedures, try to wait the result
+            if (procId != null) {
+              result = waitProcedureResult(procId, deadlineTs);
+            }
+            // if we don't have a proc result, try the compatibility wait
+            if (!procResultFound) {
+              result = waitOperationResult(deadlineTs);
+            }
+            result = postOperationResult(result, deadlineTs);
+            done = true;
+          } catch (IOException e) {
+            result = postOpeartionFailure(e, deadlineTs);
+            done = true;
+          }
+        } catch (IOException e) {
+          exception = new ExecutionException(e);
+          done = true;
+        }
+      }
+      if (exception != null) {
+        throw exception;
+      }
+      return result;
+    }
+
+    @Override
+    public boolean isDone() {
+      return done;
+    }
+
+    protected HBaseAdmin getAdmin() {
+      return admin;
+    }
+
+    private V waitProcedureResult(long procId, long deadlineTs)
+        throws IOException, TimeoutException, InterruptedException {
+      GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
+          .setProcId(procId)
+          .build();
+
+      int tries = 0;
+      IOException serviceEx = null;
+      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+        GetProcedureResultResponse response = null;
+        try {
+          // Try to fetch the result
+          response = getProcedureResult(request);
+        } catch (IOException e) {
+          serviceEx = unwrapException(e);
+
+          // the master may be down
+          LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
+
+          // Not much to do, if we have a DoNotRetryIOException
+          if (serviceEx instanceof DoNotRetryIOException) {
+            // TODO: looks like there is no way to unwrap this exception and get the proper
+            // UnsupportedOperationException aside from looking at the message.
+            // anyway, if we fail here we just failover to the compatibility side
+            // and that is always a valid solution.
+            LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(),
serviceEx);
+            procResultFound = false;
+            return null;
+          }
+        }
+
+        // If the procedure is no longer running, we should have a result
+        if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING)
{
+          procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
+          return convertResult(response);
+        }
+
+        try {
+          Thread.sleep(getAdmin().getPauseTime(tries++));
+        } catch (InterruptedException e) {
+          throw new InterruptedException(
+            "Interrupted while waiting for the result of proc " + procId);
+        }
+      }
+      if (serviceEx != null) {
+        throw serviceEx;
+      } else {
+        throw new TimeoutException("The procedure " + procId + " is still running");
+      }
+    }
+
+    private static IOException unwrapException(IOException e) {
+      if (e instanceof RemoteException) {
+        return ((RemoteException)e).unwrapRemoteException();
+      }
+      return e;
+    }
+
+    protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest
request)
+        throws IOException {
+      return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
+          admin.getConnection()) {
+        @Override
+        public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
+          return master.getProcedureResult(null, request);
+        }
+      });
+    }
+
+    /**
+     * Convert the procedure result response to a specified type.
+     * @param response the procedure result object to parse
+     * @return the result data of the procedure.
+     */
+    protected V convertResult(final GetProcedureResultResponse response) throws IOException
{
+      if (response.hasException()) {
+        throw ForeignExceptionUtil.toIOException(response.getException());
+      }
+      return null;
+    }
+
+    /**
+     * Fallback implementation in case the procedure is not supported by the server.
+     * It should try to wait until the operation is completed.
+     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+     * @return the result data of the operation
+     */
+    protected V waitOperationResult(final long deadlineTs)
+        throws IOException, TimeoutException {
+      return null;
+    }
+
+    /**
+     * Called after the operation is completed and the result fetched.
+     * this allows to perform extra steps after the procedure is completed.
+     * it allows to apply transformations to the result that will be returned by get().
+     * @param result the result of the procedure
+     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+     * @return the result of the procedure, which may be the same as the passed one
+     */
+    protected V postOperationResult(final V result, final long deadlineTs)
+        throws IOException, TimeoutException {
+      return result;
+    }
+
+    /**
+     * Called after the operation is terminated with a failure.
+     * this allows to perform extra steps after the procedure is terminated.
+     * it allows to apply transformations to the result that will be returned by get().
+     * The default implementation will rethrow the exception
+     * @param result the result of the procedure
+     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+     * @return the result of the procedure, which may be the same as the passed one
+     */
+    protected V postOpeartionFailure(final IOException exception, final long deadlineTs)
+        throws IOException, TimeoutException {
+      throw exception;
+    }
+
+    protected interface WaitForStateCallable {
+      boolean checkState(int tries) throws IOException;
+      void throwInterruptedException() throws InterruptedIOException;
+      void throwTimeoutException(long elapsed) throws TimeoutException;
+    }
+
+    protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
+        throws IOException, TimeoutException {
+      int tries = 0;
+      IOException serverEx = null;
+      long startTime = EnvironmentEdgeManager.currentTime();
+      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+        serverEx = null;
+        try {
+          if (callable.checkState(tries)) {
+            return;
+          }
+        } catch (IOException e) {
+          serverEx = e;
+        }
+        try {
+          Thread.sleep(getAdmin().getPauseTime(tries++));
+        } catch (InterruptedException e) {
+          callable.throwInterruptedException();
+        }
+      }
+      if (serverEx != null) {
+        throw unwrapException(serverEx);
+      } else {
+        callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dccd9c5/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
new file mode 100644
index 0000000..da3ffe9
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({ClientTests.class, SmallTests.class})
+public class TestProcedureFuture {
+  private static class TestFuture extends HBaseAdmin.ProcedureFuture<Void> {
+    private boolean postOperationResultCalled = false;
+    private boolean waitOperationResultCalled = false;
+    private boolean getProcedureResultCalled = false;
+    private boolean convertResultCalled = false;
+
+    public TestFuture(final HBaseAdmin admin, final Long procId) {
+      super(admin, procId);
+    }
+
+    public boolean wasPostOperationResultCalled() {
+      return postOperationResultCalled;
+    }
+
+    public boolean wasWaitOperationResultCalled() {
+      return waitOperationResultCalled;
+    }
+
+    public boolean wasGetProcedureResultCalled() {
+      return getProcedureResultCalled;
+    }
+
+    public boolean wasConvertResultCalled() {
+      return convertResultCalled;
+    }
+
+    @Override
+    protected GetProcedureResultResponse getProcedureResult(
+        final GetProcedureResultRequest request) throws IOException {
+      getProcedureResultCalled = true;
+      return GetProcedureResultResponse.newBuilder()
+              .setState(GetProcedureResultResponse.State.FINISHED)
+              .build();
+    }
+
+    @Override
+    protected Void convertResult(final GetProcedureResultResponse response) throws IOException
{
+      convertResultCalled = true;
+      return null;
+    }
+
+    @Override
+    protected Void waitOperationResult(final long deadlineTs)
+        throws IOException, TimeoutException {
+      waitOperationResultCalled = true;
+      return null;
+    }
+
+    @Override
+    protected Void postOperationResult(final Void result, final long deadlineTs)
+        throws IOException, TimeoutException {
+      postOperationResultCalled = true;
+      return result;
+    }
+  }
+
+  /**
+   * When a master return a result with procId,
+   * we are skipping the waitOperationResult() call,
+   * since we are getting the procedure result.
+   */
+  @Test(timeout=60000)
+  public void testWithProcId() throws Exception {
+    HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+    TestFuture f = new TestFuture(admin, 100L);
+    f.get(1, TimeUnit.MINUTES);
+
+    assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
+    assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
+    assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
+    assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+  }
+
+  /**
+   * Verify that the spin loop for the procedure running works.
+   */
+  @Test(timeout=60000)
+  public void testWithProcIdAndSpinning() throws Exception {
+    final AtomicInteger spinCount = new AtomicInteger(0);
+    HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+    TestFuture f = new TestFuture(admin, 100L) {
+      @Override
+      protected GetProcedureResultResponse getProcedureResult(
+          final GetProcedureResultRequest request) throws IOException {
+        boolean done = spinCount.incrementAndGet() >= 10;
+        return GetProcedureResultResponse.newBuilder()
+              .setState(done ? GetProcedureResultResponse.State.FINISHED :
+                GetProcedureResultResponse.State.RUNNING)
+              .build();
+      }
+    };
+    f.get(1, TimeUnit.MINUTES);
+
+    assertEquals(10, spinCount.get());
+    assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
+    assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
+    assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+  }
+
+  /**
+   * When a master return a result without procId,
+   * we are skipping the getProcedureResult() call.
+   */
+  @Test(timeout=60000)
+  public void testWithoutProcId() throws Exception {
+    HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+    TestFuture f = new TestFuture(admin, null);
+    f.get(1, TimeUnit.MINUTES);
+
+    assertFalse("unexpected getProcedureResult() called", f.wasGetProcedureResultCalled());
+    assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
+    assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
+    assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+  }
+
+  /**
+   * When a new client with procedure support tries to ask an old-master without proc-support
+   * the procedure result we get a DoNotRetryIOException (which is an UnsupportedOperationException)
+   * The future should trap that and fallback to the waitOperationResult().
+   *
+   * This happens when the operation calls happens on a "new master" but while we are waiting
+   * the operation to be completed, we failover on an "old master".
+   */
+  @Test(timeout=60000)
+  public void testOnServerWithNoProcedureSupport() throws Exception {
+    HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+    TestFuture f = new TestFuture(admin, 100L) {
+      @Override
+      protected GetProcedureResultResponse getProcedureResult(
+        final GetProcedureResultRequest request) throws IOException {
+        super.getProcedureResult(request);
+        throw new DoNotRetryIOException(new UnsupportedOperationException("getProcedureResult"));
+      }
+    };
+    f.get(1, TimeUnit.MINUTES);
+
+    assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
+    assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
+    assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
+    assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+  }
+}
\ No newline at end of file


Mime
View raw message