hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1496585 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/master/ main/ruby/hbase/ main/ruby/shell/comman...
Date Tue, 25 Jun 2013 18:51:49 GMT
Author: liyin
Date: Tue Jun 25 18:51:49 2013
New Revision: 1496585

URL: http://svn.apache.org/r1496585
Log:
[HBASE-8775] Removed RegionManager blocking caused by the sleep in ThrottledRegionReopener

Author: shaneh

Summary:
Created a runnable to kick off a retry when the wait interval
prevented ThrottledRegionReopener from making progress.

Test Plan:
Tested locally on a dev machine. Will also push to tsh025
to test there.

Reviewers: liyintang, nspiegelberg, manukranthk

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D775422

Task ID: 2178363

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java
    hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb
    hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb
    hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun 25 18:51:49 2013
@@ -108,6 +108,27 @@ public final class HConstants {
   public static final int DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS =
     60 * 1000;
 
+  /** Configuration key for time out for schema modification try lock */
+  public static final String MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS =
+      "hbase.master.schemaChanges.trylock.timeout.ms";
+
+  public static final int DEFAULT_MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS =
+      5 * 1000;
+
+  /** Configuration key for for schema modification wait interval. */
+  public static final String MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS =
+      "hbase.regionserver.alterTable.waitInterval.ms";
+
+  public static final int DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS =
+      1000;
+
+  /** Configuration key for for schema modification max concurrent regions closed. */
+  public static final String MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE =
+      "hbase.regionserver.alterTable.maxConcurrentClose";
+
+  public static final int DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE =
+      5;
+
   /** Name of ZooKeeper quorum configuration parameter. */
   public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
 
@@ -231,7 +252,6 @@ public final class HConstants {
   public static final boolean HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT =
     true;
 
-
   /** Used to construct the name of the compaction directory during compaction */
   public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir";
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Jun 25 18:51:49 2013
@@ -605,25 +605,30 @@ public class HBaseAdmin {
   }
 
   /**
-   * Sets the wait interval to wait between closing and reopening of Regions.
+   * Batch alter a table. Only takes regions offline once and performs a single
+   * update to .META.
    * Asynchronous operation.
    *
-   * @param waitInterval The interval in milliseconds to wait
-   *                     in between closing and reopening regions
+   * @param tableName name of the table to add column to
+   * @param columnAdditions column descriptors to add to the table
+   * @param columnModifications pairs of column names with new descriptors
+   * @param columnDeletions column names to delete from the table
+   * @throws IOException if a remote or network exception occurs
    */
-  public void setCloseRegionWaitInterval(final String tableName, int waitInterval) {
-    this.master.setCloseRegionWaitInterval(tableName, waitInterval);
-  }
+  public void alterTable(final String tableName,
+      List<HColumnDescriptor> columnAdditions,
+      List<Pair<String, HColumnDescriptor>> columnModifications,
+      List<String> columnDeletions) throws IOException {
 
-  /**
-   * Sets the number of Regions to close concurrently.
-   * Asynchronous operation.
-   *
-   * @param numConcurrentClose The number of Regions to close at the same time
-   *
-   */
-  public void setNumConcurrentCloseRegions(final String tableName, int numConcurrentClose)
{
-    this.master.setNumConcurrentCloseRegions(tableName, numConcurrentClose);
+    //Use default values since none were specified
+    int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS);
+
+    int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE);
+
+    alterTable(tableName, columnAdditions, columnModifications,
+        columnDeletions, waitInterval, maxClosedRegions);
   }
 
   /**
@@ -635,29 +640,35 @@ public class HBaseAdmin {
    * @param columnAdditions column descriptors to add to the table
    * @param columnModifications pairs of column names with new descriptors
    * @param columnDeletions column names to delete from the table
+   * @param waitInterval the interval of time to spread the close of
+   *                     numConcurrentRegionsClosed over
+   * @param maxConcurrentRegionsClosed the max number of regions to have closed
+   *                                   at a time.
    * @throws IOException if a remote or network exception occurs
    */
   public void alterTable(final String tableName,
-      List<HColumnDescriptor> columnAdditions,
-      List<Pair<String, HColumnDescriptor>> columnModifications,
-      List<String> columnDeletions) throws IOException {
+                         List<HColumnDescriptor> columnAdditions,
+                         List<Pair<String, HColumnDescriptor>> columnModifications,
+                         List<String> columnDeletions,
+                         int waitInterval,
+                         int maxConcurrentRegionsClosed) throws IOException {
     // convert all of the strings to bytes and pass to the bytes method
-    List<Pair<byte [], HColumnDescriptor>> modificationsBytes = 
-      new ArrayList<Pair<byte [], HColumnDescriptor>>(
-          columnModifications.size());
-    List<byte []> deletionsBytes = 
-      new ArrayList<byte []>(columnDeletions.size());
+    List<Pair<byte [], HColumnDescriptor>> modificationsBytes =
+        new ArrayList<Pair<byte [], HColumnDescriptor>>(
+            columnModifications.size());
+    List<byte []> deletionsBytes =
+        new ArrayList<byte []>(columnDeletions.size());
 
     for(Pair<String, HColumnDescriptor> c : columnModifications) {
       modificationsBytes.add(new Pair<byte [], HColumnDescriptor>(
-            Bytes.toBytes(c.getFirst()), c.getSecond()));
+          Bytes.toBytes(c.getFirst()), c.getSecond()));
     }
     for(String c : columnDeletions) {
       deletionsBytes.add(Bytes.toBytes(c));
     }
 
     alterTable(Bytes.toBytes(tableName), columnAdditions, modificationsBytes,
-        deletionsBytes);
+        deletionsBytes, waitInterval, maxConcurrentRegionsClosed);
   }
 
   /**
@@ -680,10 +691,45 @@ public class HBaseAdmin {
     if (this.master == null) {
       throw new MasterNotRunningException("master has been shut down");
     }
+    int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS);
+
+    int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE);
+
+    alterTable(tableName, columnAdditions, columnModifications, columnDeletions, waitInterval,
maxClosedRegions);
+  }
+
+  /**
+   * Batch alter a table. Only takes regions offline once and performs a single
+   * update to .META.
+   * Any of the three lists can be null, in which case those types of
+   * alterations will be ignored.
+   * Asynchronous operation.
+   *
+   * @param tableName name of the table to add column to
+   * @param columnAdditions column descriptors to add to the table
+   * @param columnModifications pairs of column names with new descriptors
+   * @param columnDeletions column names to delete from the table
+   * @param waitInterval the interval of time to spread the close of
+   *                     numConcurrentRegionsClosed over
+   * @param maxConcurrentRegionsClosed the max number of regions to have closed
+   *                                   at a time.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public void alterTable(final byte [] tableName,
+                         List<HColumnDescriptor> columnAdditions,
+                         List<Pair<byte[], HColumnDescriptor>> columnModifications,
+                         List<byte[]> columnDeletions,
+                         int waitInterval,
+                         int maxConcurrentRegionsClosed) throws IOException {
+    if (this.master == null) {
+      throw new MasterNotRunningException("master has been shut down");
+    }
     HTableDescriptor.isLegalTableName(tableName);
     try {
       this.master.alterTable(tableName, columnAdditions, columnModifications,
-          columnDeletions);
+          columnDeletions, waitInterval, maxConcurrentRegionsClosed);
     } catch (RemoteException e) {
       throw RemoteExceptionHandler.decodeRemoteException(e);
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Tue Jun 25 18:51:49 2013
@@ -81,6 +81,24 @@ public interface HMasterInterface extend
                          List<byte []> columnDeletions) throws IOException;
 
   /**
+   * Batch adds, modifies, and deletes columns from the specified table.
+   * Any of the lists may be null, in which case those types of alterations
+   * will not occur.
+   *
+   * @param tableName table to modify
+   * @param columnAdditions column descriptors to add to the table
+   * @param columnModifications pairs of column names with new descriptors
+   * @param columnDeletions column names to delete from the table
+   * @throws IOException e
+   */
+  public void alterTable(final byte [] tableName,
+                         List<HColumnDescriptor> columnAdditions,
+                         List<Pair<byte [], HColumnDescriptor>> columnModifications,
+                         List<byte []> columnDeletions,
+                         int waitInterval,
+                         int numConcurentRegionsClosed) throws IOException;
+
+  /**
    * Adds a column to the specified table
    * @param tableName table to modify
    * @param column column descriptor
@@ -136,22 +154,6 @@ public interface HMasterInterface extend
     throws IOException;
 
   /**
-   * Set wait interval for close and reopen of regions
-   *
-   * @param tableName table to modify
-   * @param waitInterval the interval to wait in milliseconds
-   */
-  public void setCloseRegionWaitInterval(String tableName, int waitInterval);
-
-  /**
-   * Set the number of Regions to close concurrently.
-   *
-   * @param tableName table to modify
-   * @param numConcurrentClose the number of Regions to close at the same time.
-   */
-  void setNumConcurrentCloseRegions(String tableName, int numConcurrentClose);
-
-  /**
    * Shutdown an HBase cluster.
    * @throws IOException e
    */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Jun 25 18:51:49 2013
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.MiniZooKe
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.StopStatus;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableLockTimeoutException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.MetaScanner;
@@ -257,6 +258,8 @@ public class HMaster extends HasThread i
   private ConfigurationManager configurationManager =
       new ConfigurationManager();
   
+  private long schemaChangeTryLockTimeoutMs;
+
   /**
    * Constructor
    * @param conf configuration
@@ -386,6 +389,11 @@ public class HMaster extends HasThread i
         HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS);
       tableLockManager = new TableLockManager(zooKeeperWrapper,
         address, schemaChangeLockTimeoutMs);
+
+      this.schemaChangeTryLockTimeoutMs = conf.getInt(
+          HConstants.MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS,
+          HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS);
+
     } else {
       tableLockManager = null;
     }
@@ -1372,6 +1380,22 @@ public class HMaster extends HasThread i
     return tableLockManager != null;
   }
 
+  public boolean isTableLocked(byte[] tableName) {
+    if (isTableLockEnabled()) {
+      return tableLockManager.isTableLocked(tableName);
+    }
+    return false;
+  }
+
+
+  protected boolean tryLockTable(byte[] tableName, String purpose, long timeout)
+      throws IOException {
+    if (isTableLockEnabled()) {
+      return tableLockManager.tryLockTable(tableName, purpose, timeout);
+    }
+    return false;
+  }
+
   protected void lockTable(byte[] tableName, String purpose)
   throws IOException {
     if (isTableLockEnabled()) {
@@ -1549,19 +1573,37 @@ public class HMaster extends HasThread i
   public void alterTable(final byte [] tableName,
                          List<HColumnDescriptor> columnAdditions,
                          List<Pair<byte [], HColumnDescriptor>> columnModifications,
+                         List<byte []> columnDeletions,
+                         int waitInterval,
+                         int maxConcurrentRegionsClosed) throws IOException {
+    //This lock will be released when the ThrottledRegionReopener is done.
+    if (!tryLockTable(tableName, "alter", schemaChangeTryLockTimeoutMs)) {
+      throw new TableLockTimeoutException("Timed out acquiring " +
+          "lock for " + Bytes.toString(tableName) + " after " + schemaChangeTryLockTimeoutMs
+ " ms.");
+    }
+
+    InjectionHandler.processEvent(InjectionEvent.HMASTER_ALTER_TABLE);
+    ThrottledRegionReopener reopener = this.regionManager.
+        createThrottledReopener(Bytes.toString(tableName), waitInterval, maxConcurrentRegionsClosed);
+    // Regions are added to the reopener in MultiColumnOperation
+    new MultiColumnOperation(this, tableName, columnAdditions,
+        columnModifications, columnDeletions).process();
+    reopener.startRegionsReopening();
+  }
+
+  @Override
+  public void alterTable(final byte [] tableName,
+                         List<HColumnDescriptor> columnAdditions,
+                         List<Pair<byte [], HColumnDescriptor>> columnModifications,
                          List<byte []> columnDeletions) throws IOException {
-    lockTable(tableName, "alter");
-    try {
-      InjectionHandler.processEvent(InjectionEvent.HMASTER_ALTER_TABLE);
-      ThrottledRegionReopener reopener = this.regionManager.
-        createThrottledReopener(Bytes.toString(tableName));
-      // Regions are added to the reopener in MultiColumnOperation
-      new MultiColumnOperation(this, tableName, columnAdditions,
-                               columnModifications, columnDeletions).process();
-      reopener.reOpenRegionsThrottle();
-    } finally {
-      unlockTable(tableName);
-    }
+
+    int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS);
+
+    int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE);
+
+    alterTable(tableName, columnAdditions, columnModifications, columnDeletions, waitInterval,
maxClosedRegions);
   }
 
   public Pair<Integer, Integer> getAlterStatus(byte [] tableName)
@@ -2281,18 +2323,6 @@ public class HMaster extends HasThread i
     return -1;
   }
 
-  @Override
-  public void setCloseRegionWaitInterval(String tableName, int waitInterval) {
-    ThrottledRegionReopener reopener = this.regionManager.createThrottledReopener(tableName);
-    reopener.setRegionCloseWaitInterval(waitInterval);
-  }
-
-  @Override
-  public void setNumConcurrentCloseRegions(String tableName, int numConcurrentClose) {
-    ThrottledRegionReopener reopener = this.regionManager.createThrottledReopener(tableName);
-    reopener.setNumConcurrentCloseRegions(numConcurrentClose);
-  }
-
   String getZKWrapperName() {
     return getServerName();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Jun 25 18:51:49 2013
@@ -101,7 +101,7 @@ public class RegionManager {
    * An entry is created in the map before an alter operation is performed on the
    * table. It is cleared when all the regions have reopened.   
    */
-  private final Map<String, ThrottledRegionReopener> tablesReopeningRegions = 
+  private final Map<String, ThrottledRegionReopener> tablesReopeningRegions =
       new ConcurrentHashMap<String, ThrottledRegionReopener>();
   /**
    * Map of region name to RegionState for regions that are in transition such as
@@ -2590,15 +2590,23 @@ public class RegionManager {
   }
 
   /**
-   * Create a reopener for this table, if one exists, return the existing throttler. 
-   * @param tableName
+   * Get the current ThrottledRegionReopener for a given table if it already exists. Create
one with
+   * the specified wait interval and maxNumRegionsClosed if it doesn't exists yet.
+   * @param tableName The table this re-opener will work on.
+   * @param waitInterval The amount of time to spread the close of regions over.
+   * @param maxNumRegionsClosed The maximum number of regions to have closed at a time.
    * @return
    */
-  public ThrottledRegionReopener createThrottledReopener(String tableName) {
+  public ThrottledRegionReopener createThrottledReopener(String tableName, int waitInterval,
int maxNumRegionsClosed) {
     if (!tablesReopeningRegions.containsKey(tableName)) {
-      ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(tableName,
this.master, this);
+      ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(
+          tableName,
+          this.master,
+          this,
+          waitInterval,
+          maxNumRegionsClosed);
       tablesReopeningRegions.put(tableName, throttledReopener);
-    }  
+    }
     return tablesReopeningRegions.get(tableName);
   }
   
@@ -2619,6 +2627,18 @@ public class RegionManager {
     // if tablesReopeningRegions.contains do something
     if (tablesReopeningRegions.containsKey(tableName)) {
       tablesReopeningRegions.remove(tableName);
+
+      //The table was locked by the master before the alter table command
+      //was issued. Now that the reopen of the regions is done the table should
+      //be unlocked.
+      try {
+        master.unlockTable(Bytes.toBytes(tableName));
+      } catch (IllegalStateException ise) {
+        LOG.warn("RegionManager tried to unlock " + tableName + " and it was not locked.");
+      } catch (IOException e) {
+        LOG.warn("An exception was encountered while RegionManager was trying to unlock "
+ tableName +
+            ". " + e.getStackTrace());
+      }
       LOG.debug("Removed throttler for " + tableName);
     } else {
       LOG.debug("Tried to delete a throttled reopener, but it does not exist.");

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java Tue Jun 25 18:51:49 2013
@@ -79,40 +79,52 @@ public class TableLockManager {
   }
 
   /**
-   * Lock a table, given a purpose.
+   * Try to lock a table, given a purpose.
    * @param tableName Table to lock
    * @param purpose Human readable reason for locking the table
+   * @return True if the table was locked before the timeout false otherwise
    * @throws TableLockTimeoutException If unable to acquire a lock within a
    *                                   specified time period (if any)
    * @throws IOException If unrecoverable ZooKeeper error occurs
    */
-  public void lockTable(byte[] tableName, String purpose)
-  throws IOException {
+  public boolean tryLockTable(byte[] tableName, String purpose, long timeout)
+      throws IOException {
+    boolean result;
     String tableNameStr = Bytes.toString(tableName);
     HLock lock = createTableLock(tableNameStr, purpose);
     try {
-      if (lockTimeoutMs == -1) {
-        // Wait indefinitely
-        lock.acquire();
-      } else {
-        if (!lock.tryAcquire(lockTimeoutMs)) {
-          throw new TableLockTimeoutException("Timed out acquiring " +
-            "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms.");
-        }
-      }
+      result = lock.tryAcquire(timeout);
     } catch (InterruptedException e) {
       LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e);
-      Thread.currentThread().interrupt();
       throw new InterruptedIOException("Interrupted acquiring a lock");
     }
 
-    if (acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) {
+    if (result &&
+        acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) {
       // This should never execute if DistributedLock is implemented
       // correctly.
       LOG.error("Lock for " + tableNameStr + " acquired by multiple owners!");
       LOG.error("Currently held locks: " + acquiredTableLocks);
       throw new IllegalStateException("Lock for " + tableNameStr +
-        " was acquired by multiple owners!");
+          " was acquired by multiple owners!");
+    }
+
+    return result;
+  }
+
+  /**
+   * Lock a table, given a purpose.
+   * @param tableName Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @throws TableLockTimeoutException If unable to acquire a lock within a
+   *                                   specified time period (if any)
+   * @throws IOException If unrecoverable ZooKeeper error occurs
+   */
+  public void lockTable(byte[] tableName, String purpose)
+  throws IOException {
+    if(!tryLockTable(tableName, purpose, lockTimeoutMs)) {
+      throw new TableLockTimeoutException("Timed out acquiring " +
+          "lock for " + Bytes.toString(tableName) + " after " + lockTimeoutMs + " ms.");
     }
   }
 
@@ -145,4 +157,14 @@ public class TableLockManager {
       throw new InterruptedIOException();
     }
   }
+
+  /**
+   * Used to determine if a table is currently locked from this table lock manager.
+   *
+   * @param tableName
+   * @return True if the table was locked by this table manager false otherwise.
+   */
+  public boolean isTableLocked(byte[] tableName) {
+    return acquiredTableLocks.containsKey(Bytes.toString(tableName));
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java Tue Jun 25 18:51:49 2013
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -12,6 +13,8 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Pair;
 
 public class ThrottledRegionReopener {
@@ -22,13 +25,20 @@ public class ThrottledRegionReopener {
   private String tableName;
   private int totalNoOfRegionsToReopen = 0;
   private int regionCloseWaitInterval = 1000;
-  private long timeAllowedToProceed = 0;
-  private int numOfConcurrentClose = 0;
+  private int numOfConcurrentClose = 5;
 
-  ThrottledRegionReopener(String tn, HMaster m, RegionManager regMgr) {
+  private Closer closer;
+
+  ThrottledRegionReopener(String tn, HMaster m, RegionManager regMgr,
+                          int waitInterval, int maxConcurrentRegionsClosed) {
     this.tableName = tn;
     this.master = m;
     this.regionManager = regMgr;
+    this.regionCloseWaitInterval = waitInterval;
+    this.numOfConcurrentClose = maxConcurrentRegionsClosed;
+    //Create a Daemon thread to close regions at the specified rate.
+    this.closer = new Closer();
+    this.closer.setDaemon(true);
   }
 
   /**
@@ -87,65 +97,15 @@ public class ThrottledRegionReopener {
     if (regionsBeingReopened.contains(region)) {
       LOG.info("Region reopened: " + region.getRegionNameAsString());
       regionsBeingReopened.remove(region);
-
-      // Check if all the regions have reopened and log.
-      if (closeSomeRegions() == 0) {
-        if (regionsToBeReopened.size() == 0 && regionsBeingReopened.size() == 0)
{
-          this.numOfConcurrentClose = 0;
-          LOG.info("All regions of " + tableName + " reopened successfully.");
-        } else {
-          LOG.error("All regions of " + tableName
-              + " could not be reopened. Retry the operation.");
-        }
-        regionManager.deleteThrottledReopener(tableName);
-      }
     }
   }
 
   /**
-   * Close some of the reopening regions. Used for throttling the percentage of
-   * regions of a table that may be reopened concurrently. This is configurable
-   * by a hbase config parameter hbase.regionserver.alterTable.concurrentReopen
-   * which defines the percentage of regions of a table that the master may
-   * reopen concurrently (defaults to 1).
+   * Close a region to be reopened.
    *
+   * @return True if a region was closed false otherwise.
    */
-  public synchronized int closeSomeRegions() {
-
-    float localNumOfConcurrentClose = this.numOfConcurrentClose;
-
-    //Try to get the number from the config if class value is set to 0
-    if (this.numOfConcurrentClose == 0) {
-      float percentConcurrentClose = this.master.getConfiguration().getFloat(
-          "hbase.regionserver.alterTable.concurrentReopen", 5);
-      // Find the number of regions you are allowed to close concurrently
-      float configNumOfConcurrentClose = (percentConcurrentClose / 100)
-          * totalNoOfRegionsToReopen;
-      // Close at least one region at a time
-      if (configNumOfConcurrentClose < 1 && configNumOfConcurrentClose > 0)
{
-        configNumOfConcurrentClose = 1;
-      }
-
-      localNumOfConcurrentClose = configNumOfConcurrentClose;
-    }
-
-    localNumOfConcurrentClose -= regionsBeingReopened.size();
-    if (localNumOfConcurrentClose <= 0) {
-      return 0;
-    }
-
-    //If true we are not yet at the time when we can proceed
-    if ( this.timeAllowedToProceed > System.currentTimeMillis()) {
-      //Wait until we are allowed to try and close more regions
-      try {
-        long sleepLength = this.timeAllowedToProceed - System.currentTimeMillis();
-        Thread.sleep(sleepLength);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    int cnt = 0;
+  private synchronized boolean doRegionClose() {
     for (Iterator<HRegionInfo> iter = regionsToBeReopened.iterator(); iter
         .hasNext();) {
       HRegionInfo region = iter.next();
@@ -163,20 +123,10 @@ public class ThrottledRegionReopener {
       regionManager.setClosing(serverName, region, false);
       iter.remove(); // Remove from regionsToBeReopened
       regionsBeingReopened.add(region);
-      cnt++;
 
-      // Close allowed number of regions, exit
-      if (cnt == localNumOfConcurrentClose) {
-        break;
-      }
+      return true;
     }
-
-    //If we closed some regions set the time we are allowed to close more regions
-    if (cnt > 0) {
-      this.timeAllowedToProceed = System.currentTimeMillis() + regionCloseWaitInterval;
-    }
-
-    return cnt;
+    return false;
   }
 
   /**
@@ -217,7 +167,7 @@ public class ThrottledRegionReopener {
    * @param serverInfo
    */
   public synchronized void addPreferredAssignmentForReopen(HRegionInfo region,
-      HServerInfo serverInfo) {
+                                                           HServerInfo serverInfo) {
     if (regionsBeingReopened.contains(region) &&
         !master.isServerBlackListed(serverInfo.getHostnamePort())) {
       regionManager.getAssignmentManager().addTransientAssignment(
@@ -230,14 +180,10 @@ public class ThrottledRegionReopener {
    *
    * @throws IOException
    */
-  public synchronized void reOpenRegionsThrottle() throws IOException {
+  public void startRegionsReopening() throws IOException { // starts reopening all regions; no-op if the table is disabled
     if (HTable.isTableEnabled(master.getConfiguration(), tableName)) {
       LOG.info("Initiating reopen for all regions of " + tableName);
-      if (closeSomeRegions() == 0) {
-        regionManager.deleteThrottledReopener(tableName);
-        throw new IOException("Could not reopen regions of the table, "
-            + "retry the alter operation");
-      }
+      closer.start(); // closing proceeds on the Closer thread; a stall is logged there, not thrown here
     }
   }
 
@@ -245,17 +191,109 @@ public class ThrottledRegionReopener {
     return regionCloseWaitInterval;
   }
 
-  public int getNumConcurrentCloseRegions() {
+  public int getMaxConcurrentRegionsClosed() { // upper bound the Closer enforces on getNumRegionsCurrentlyClosed()
     return numOfConcurrentClose;
   }
 
-  public synchronized void setRegionCloseWaitInterval(int regionCloseWaitInterval) {
-    LOG.info("ThrottledRegionReopener: Setting wait interval " + regionCloseWaitInterval
+ " !!!");
-    //Wait interval is in milliseconds
-    this.regionCloseWaitInterval = regionCloseWaitInterval;
+  public synchronized int getNumRegionsCurrentlyClosed() { // regions closed via doRegionClose, still in regionsBeingReopened
+    return regionsBeingReopened.size();
+  }
+
+  public synchronized int getNumRegionsLeftToReOpen() { // regions still queued in regionsToBeReopened, i.e. not closed yet
+    return regionsToBeReopened.size();
   }
 
-  public synchronized void setNumConcurrentCloseRegions(int numConcurrentClose) {
-    this.numOfConcurrentClose = numConcurrentClose;
+  private void failedToProgress() { // Closer timed out without progress: log, drop this reopener, stop the Closer
+    LOG.error("Could not reopen regions of the table, "
+        + tableName + ", retry the alter operation");
+    regionManager.deleteThrottledReopener(tableName);
+    this.closer.interrupt(); // the Closer's InterruptedException handler ends its loop if it is sleeping
+  }
+
+  private void finished() { // nothing closed or pending: log success, drop this reopener, stop the Closer loop
+    LOG.info("All regions of " + tableName + " reopened successfully.");
+    regionManager.deleteThrottledReopener(tableName);
+    this.closer.running.set(false);
+  }
+
+  /** Closes regions one at a time, at least waitInterval/maxConcurrent ms apart and at most
+   *  maxConcurrent closed at once; stops when none remain or no close happens in progressTimeout. */
+  private class Closer extends HasThread {
+
+    private static final long MIN_POLL_INTERVAL_MS = 100; // floor on the re-poll wait
+    private long timeAllowedToClose = EnvironmentEdgeManager.currentTimeMillis();
+    private long closeInterval = 1000;
+    private long timeOfLastRegionClose = 0;
+
+    //Get progress timeout. Default is 5 min.
+    private long progressTimeout = master.getConfiguration()
+        .getLong("hbase.regionserver.alterTable.progressTimeout", 5 * 60 * 1000);
+
+    private AtomicBoolean running = new AtomicBoolean(true);
+
+    /** If no region is closed or left to reopen, stops the closer and reports success. */
+    private boolean checkFinished() {
+      if (getNumRegionsCurrentlyClosed() != 0 || getNumRegionsLeftToReOpen() != 0) {
+        return false;
+      }
+      running.set(false);
+      finished();
+      return true;
+    }
+
+    /** If no region was closed within progressTimeout, stops the closer and reports failure. */
+    private boolean checkTimeout() {
+      if (EnvironmentEdgeManager.currentTimeMillis() < timeOfLastRegionClose + progressTimeout) {
+        return false;
+      }
+      running.set(false);
+      failedToProgress();
+      return true;
+    }
+
+    @Override
+    public void run() {
+      // Clamp to 1: a zero limit would divide by zero here and never allow a close.
+      int maxClosed = Math.max(getMaxConcurrentRegionsClosed(), 1);
+      closeInterval = Math.max(getRegionCloseWaitInterval() / maxClosed, 1);
+      // Measure the progress timeout from the start of closing, not from the epoch.
+      timeOfLastRegionClose = EnvironmentEdgeManager.currentTimeMillis();
+      LOG.debug("Closer started for ThrottledRegionReopener of table " + tableName);
+
+      while (running.get()) {
+        try {
+          // Time remaining until the next close is allowed.
+          long sleepTime = timeAllowedToClose - EnvironmentEdgeManager.currentTimeMillis();
+          if (sleepTime > 0) {
+            LOG.debug("Closer running and about to wait. Currently Closed: " +
+                getNumRegionsCurrentlyClosed() + "  Left to Close: " +
+                getNumRegionsLeftToReOpen());
+            sleep(sleepTime);
+            continue;
+          }
+
+          //Below the concurrency cap: try to close one more region
+          if (getNumRegionsCurrentlyClosed() < maxClosed && doRegionClose()) {
+            long now = EnvironmentEdgeManager.currentTimeMillis();
+            timeAllowedToClose = now + closeInterval;
+            timeOfLastRegionClose = now;
+            continue;
+          }
+
+          //Nothing closed: stop if done or stalled, else back off instead of busy-spinning
+          if (!checkFinished() && !checkTimeout()) {
+            timeAllowedToClose = EnvironmentEdgeManager.currentTimeMillis()
+                + Math.max(closeInterval, MIN_POLL_INTERVAL_MS);
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Closer of the ThrottledRegionReopener for table: " + tableName +
+              " was interrupted. There are " + getNumRegionsLeftToReOpen() + " regions left to reopen.");
+          running.set(false);
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      LOG.debug("Closer for ThrottledRegionReopener of table " + tableName + " is stopping.");
+    }
   }
 }

Modified: hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb Tue Jun 25 18:51:49 2013
@@ -329,6 +329,12 @@ module Hbase
       columnsToAdd = ArrayList.new()
       columnsToMod = ArrayList.new()
       columnsToDel = ArrayList.new()
+
+      waitInterval = @conf.getInt(HConstants::MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS,
+                                         HConstants::DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS);
+      numConcurrentRegionsClosed =  @conf.getInt(HConstants::MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE,
+                                         HConstants::DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE);
+
       args.each do |arg|
         # Normalize args to support column name only alter specs
         arg = { NAME => arg } if arg.kind_of?(String)
@@ -338,12 +344,12 @@ module Hbase
 
         # Now set regionCloseWaitInterval if specified
         if arg[WAIT_INTERVAL]
-          @admin.setCloseRegionWaitInterval(table_name, arg[WAIT_INTERVAL])
+          waitInterval = arg[WAIT_INTERVAL]
         end
 
         # Now set the NumConcurrentCloseRegions if specified
         if arg[NUM_CONCURRENT_CLOSE]
-          @admin.setNumConcurrentCloseRegions(table_name, arg[NUM_CONCURRENT_CLOSE])
+          numConcurrentRegionsClosed = arg[NUM_CONCURRENT_CLOSE]
         end
 
         # No method parameter, try to use the args as a column definition
@@ -391,7 +397,7 @@ module Hbase
       end
 
       # now batch process alter requests
-      @admin.alterTable(table_name, columnsToAdd, columnsToMod, columnsToDel)
+      @admin.alterTable(table_name, columnsToAdd, columnsToMod, columnsToDel, waitInterval,
numConcurrentRegionsClosed)
       if wait == true
         puts "Updating all regions with the new schema..."
         alter_status(table_name)

Modified: hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb (original)
+++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb Tue Jun 25 18:51:49 2013
@@ -50,7 +50,7 @@ module Shell
 
             hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
 
-          You can also specify the wait interval, in milliseconds, to pause for in between
region restarts:
+          You can also specify the wait interval, in milliseconds, to pause for in-between
region restarts:
 
             hbase> alter 't1', NAME => 'f1', METHOD => 'delete', WAIT_INTERVAL =>
1000, NUM_CONCURRENT_CLOSE => 1
         EOF

Modified: hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb?rev=1496585&r1=1496584&r2=1496585&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb (original)
+++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb Tue Jun 25 18:51:49
2013
@@ -52,7 +52,7 @@ module Shell
 
             hbase> alter_async 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD =>
'delete'}
 
-          You can also specify the wait interval, in milliseconds, to pause for in between
region restarts
+          You can also specify the wait interval, in milliseconds, to pause for in-between
region restarts
           and the simultaneous number of regions to close at a time:
 
             hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete', WAIT_INTERVAL
=> 1000, NUM_CONCURRENT_CLOSE => 1

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java?rev=1496585&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java
Tue Jun 25 18:51:49 2013
@@ -0,0 +1,110 @@
+package org.apache.hadoop.hbase.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestAlterTableLocking {
+
+  private final Log LOG = LogFactory.getLog(TestAlterTableLocking.class);
+
+  private final HBaseTestingUtility testingUtility = new HBaseTestingUtility();
+
+  private static final byte[] TABLENAME = Bytes.toBytes("TestTable");
+  private static final byte[] FAMILYNAME = Bytes.toBytes("cf");
+  private static final int NUM_REGIONS = 20;
+  private static final int NUM_VERSIONS = 1;
+  private static final byte[] START_KEY = Bytes.toBytes(0);
+  private static final byte[] END_KEY = Bytes.toBytes(200);
+  private static final int WAIT_INTERVAL = 1000;
+  private static final int MAX_CONCURRENT_REGIONS_CLOSED = 2;
+  private static final int WAIT_FOR_TABLE = 30 * 1000;
+  private static final Pair<Integer, Integer> ALTER_DONE = Pair.newPair(0,0);
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    //Start mini cluster
+    testingUtility.startMiniCluster(1);
+    testingUtility.createTable(
+        TABLENAME,
+        new byte[][]{FAMILYNAME},
+        NUM_VERSIONS,
+        START_KEY,
+        END_KEY,
+        NUM_REGIONS
+    );
+    testingUtility.waitTableAvailable(TABLENAME, WAIT_FOR_TABLE);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    testingUtility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testTryLock() {
+    HBaseAdmin admin = null;
+    try {
+      admin = testingUtility.getHBaseAdmin();
+    } catch (MasterNotRunningException e) {
+      LOG.error("Master Not running. " + e.getStackTrace());
+      Assert.fail("Master for test cluster is not running.");
+    }
+
+    try {
+      admin.alterTable(TABLENAME,null,null,null, WAIT_INTERVAL, MAX_CONCURRENT_REGIONS_CLOSED);
+
+      boolean threwIOException = false;
+      try {
+        //Should throw exception if locking is working.
+        admin.alterTable(TABLENAME, null, null, null, WAIT_INTERVAL, MAX_CONCURRENT_REGIONS_CLOSED);
+      } catch (IOException e) {
+        //No simultaneous alterTable allowed.
+        threwIOException = true;
+      }
+      Assert.assertTrue("TableLock Failed to stop a second " +
+          "Alter Table op from starting.", threwIOException);
+
+    } catch (MasterNotRunningException e) {
+      LOG.error("Master for test cluster is not running. " + e.getStackTrace());
+      Assert.fail("Master for test cluster is not running.");
+    } catch (IOException e) {
+      LOG.error("An exception was thrown while trying to alter a table. " + e.getStackTrace());
+      Assert.fail("An exception was encountered while trying to run the Alter Table command.");
+    }
+
+    try {
+      while (!admin.getAlterStatus(TABLENAME).equals(ALTER_DONE)) {
+        try {
+          Assert.assertTrue(tableIsLocked(TABLENAME));
+          Thread.sleep(WAIT_INTERVAL);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while sleeping after table edit. " + e.getStackTrace());
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("An IOException was encountered while waiting for the " +
+          "Alter Table command to finish reopening regions." + e.getStackTrace());
+      Assert.fail("An IOException was encountered while reopening regions.");
+    }
+
+    Assert.assertFalse("The table lock was not released after the " +
+        "Alter Table command was finished.", tableIsLocked(TABLENAME));
+
+  }
+
+  private boolean tableIsLocked(byte[] tableName) {
+    return testingUtility.getHBaseCluster().getMaster().isTableLocked(tableName);
+  }
+}



Mime
View raw message