Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F0640E8DA for ; Tue, 25 Jun 2013 18:52:12 +0000 (UTC) Received: (qmail 31537 invoked by uid 500); 25 Jun 2013 18:52:12 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 31488 invoked by uid 500); 25 Jun 2013 18:52:12 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 31445 invoked by uid 99); 25 Jun 2013 18:52:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jun 2013 18:52:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jun 2013 18:52:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2BE4D2388C2A; Tue, 25 Jun 2013 18:51:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1496585 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/master/ main/ruby/hbase/ main/ruby/shell/comman... 
Date: Tue, 25 Jun 2013 18:51:49 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130625185150.2BE4D2388C2A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Tue Jun 25 18:51:49 2013 New Revision: 1496585 URL: http://svn.apache.org/r1496585 Log: [HBASE-8775] Removed RegionManager blocking caused by sleep of ThrottledRegionReopener Author: shaneh Summary: Created runable to kick off retry when wait interval prevented ThrottledRegionReopener from making progress. Test Plan: Tested locally on dev machine. Also will push to tsh025 to test there. Reviewers: liyintang, nspiegelberg, manukranthk Reviewed By: liyintang CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D775422 Task ID: 2178363 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1496585&r1=1496584&r2=1496585&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun 25 18:51:49 2013 @@ -108,6 +108,27 @@ public final class HConstants { public static final int DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS = 60 * 1000; + /** Configuration key for time out for schema modification try lock */ + public static final String MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS = + "hbase.master.schemaChanges.trylock.timeout.ms"; + + public static final int DEFAULT_MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS = + 5 * 1000; + + /** Configuration key for for schema modification wait interval. */ + public static final String MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS = + "hbase.regionserver.alterTable.waitInterval.ms"; + + public static final int DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS = + 1000; + + /** Configuration key for for schema modification max concurrent regions closed. */ + public static final String MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE = + "hbase.regionserver.alterTable.maxConcurrentClose"; + + public static final int DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE = + 5; + /** Name of ZooKeeper quorum configuration parameter. 
*/ public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; @@ -231,7 +252,6 @@ public final class HConstants { public static final boolean HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT = true; - /** Used to construct the name of the compaction directory during compaction */ public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir"; Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Jun 25 18:51:49 2013 @@ -605,25 +605,30 @@ public class HBaseAdmin { } /** - * Sets the wait interval to wait between closing and reopening of Regions. + * Batch alter a table. Only takes regions offline once and performs a single + * update to .META. * Asynchronous operation. * - * @param waitInterval The interval in milliseconds to wait - * in between closing and reopening regions + * @param tableName name of the table to add column to + * @param columnAdditions column descriptors to add to the table + * @param columnModifications pairs of column names with new descriptors + * @param columnDeletions column names to delete from the table + * @throws IOException if a remote or network exception occurs */ - public void setCloseRegionWaitInterval(final String tableName, int waitInterval) { - this.master.setCloseRegionWaitInterval(tableName, waitInterval); - } + public void alterTable(final String tableName, + List columnAdditions, + List> columnModifications, + List columnDeletions) throws IOException { - /** - * Sets the number of Regions to close concurrently. - * Asynchronous operation. 
- * - * @param numConcurrentClose The number of Regions to close at the same time - * - */ - public void setNumConcurrentCloseRegions(final String tableName, int numConcurrentClose) { - this.master.setNumConcurrentCloseRegions(tableName, numConcurrentClose); + //Use default values since none were specified + int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS); + + int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE); + + alterTable(tableName, columnAdditions, columnModifications, + columnDeletions, waitInterval, maxClosedRegions); } /** @@ -635,29 +640,35 @@ public class HBaseAdmin { * @param columnAdditions column descriptors to add to the table * @param columnModifications pairs of column names with new descriptors * @param columnDeletions column names to delete from the table + * @param waitInterval the interval of time to spread the close of + * numConcurrentRegionsClosed over + * @param maxConcurrentRegionsClosed the max number of regions to have closed + * at a time. 
* @throws IOException if a remote or network exception occurs */ public void alterTable(final String tableName, - List columnAdditions, - List> columnModifications, - List columnDeletions) throws IOException { + List columnAdditions, + List> columnModifications, + List columnDeletions, + int waitInterval, + int maxConcurrentRegionsClosed) throws IOException { // convert all of the strings to bytes and pass to the bytes method - List> modificationsBytes = - new ArrayList>( - columnModifications.size()); - List deletionsBytes = - new ArrayList(columnDeletions.size()); + List> modificationsBytes = + new ArrayList>( + columnModifications.size()); + List deletionsBytes = + new ArrayList(columnDeletions.size()); for(Pair c : columnModifications) { modificationsBytes.add(new Pair( - Bytes.toBytes(c.getFirst()), c.getSecond())); + Bytes.toBytes(c.getFirst()), c.getSecond())); } for(String c : columnDeletions) { deletionsBytes.add(Bytes.toBytes(c)); } alterTable(Bytes.toBytes(tableName), columnAdditions, modificationsBytes, - deletionsBytes); + deletionsBytes, waitInterval, maxConcurrentRegionsClosed); } /** @@ -680,10 +691,45 @@ public class HBaseAdmin { if (this.master == null) { throw new MasterNotRunningException("master has been shut down"); } + int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS); + + int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE); + + alterTable(tableName, columnAdditions, columnModifications, columnDeletions, waitInterval, maxClosedRegions); + } + + /** + * Batch alter a table. Only takes regions offline once and performs a single + * update to .META. + * Any of the three lists can be null, in which case those types of + * alterations will be ignored. + * Asynchronous operation. 
+ * + * @param tableName name of the table to add column to + * @param columnAdditions column descriptors to add to the table + * @param columnModifications pairs of column names with new descriptors + * @param columnDeletions column names to delete from the table + * @param waitInterval the interval of time to spread the close of + * numConcurrentRegionsClosed over + * @param maxConcurrentRegionsClosed the max number of regions to have closed + * at a time. + * @throws IOException if a remote or network exception occurs + */ + public void alterTable(final byte [] tableName, + List columnAdditions, + List> columnModifications, + List columnDeletions, + int waitInterval, + int maxConcurrentRegionsClosed) throws IOException { + if (this.master == null) { + throw new MasterNotRunningException("master has been shut down"); + } HTableDescriptor.isLegalTableName(tableName); try { this.master.alterTable(tableName, columnAdditions, columnModifications, - columnDeletions); + columnDeletions, waitInterval, maxConcurrentRegionsClosed); } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Tue Jun 25 18:51:49 2013 @@ -81,6 +81,24 @@ public interface HMasterInterface extend List columnDeletions) throws IOException; /** + * Batch adds, modifies, and deletes columns from the specified table. + * Any of the lists may be null, in which case those types of alterations + * will not occur. 
+ * + * @param tableName table to modify + * @param columnAdditions column descriptors to add to the table + * @param columnModifications pairs of column names with new descriptors + * @param columnDeletions column names to delete from the table + * @throws IOException e + */ + public void alterTable(final byte [] tableName, + List columnAdditions, + List> columnModifications, + List columnDeletions, + int waitInterval, + int numConcurentRegionsClosed) throws IOException; + + /** * Adds a column to the specified table * @param tableName table to modify * @param column column descriptor @@ -136,22 +154,6 @@ public interface HMasterInterface extend throws IOException; /** - * Set wait interval for close and reopen of regions - * - * @param tableName table to modify - * @param waitInterval the interval to wait in milliseconds - */ - public void setCloseRegionWaitInterval(String tableName, int waitInterval); - - /** - * Set the number of Regions to close concurrently. - * - * @param tableName table to modify - * @param numConcurrentClose the number of Regions to close at the same time. - */ - void setNumConcurrentCloseRegions(String tableName, int numConcurrentClose); - - /** * Shutdown an HBase cluster. 
* @throws IOException e */ Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Jun 25 18:51:49 2013 @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.MiniZooKe import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.StopStatus; import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableLockTimeoutException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.MetaScanner; @@ -257,6 +258,8 @@ public class HMaster extends HasThread i private ConfigurationManager configurationManager = new ConfigurationManager(); + private long schemaChangeTryLockTimeoutMs; + /** * Constructor * @param conf configuration @@ -386,6 +389,11 @@ public class HMaster extends HasThread i HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS); tableLockManager = new TableLockManager(zooKeeperWrapper, address, schemaChangeLockTimeoutMs); + + this.schemaChangeTryLockTimeoutMs = conf.getInt( + HConstants.MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_TRY_LOCK_TIMEOUT_MS); + } else { tableLockManager = null; } @@ -1372,6 +1380,22 @@ public class HMaster extends HasThread i return tableLockManager != null; } + public boolean isTableLocked(byte[] tableName) { + if (isTableLockEnabled()) { + return tableLockManager.isTableLocked(tableName); + } + return false; + } + + + protected boolean tryLockTable(byte[] tableName, String purpose, long timeout) + throws IOException 
{ + if (isTableLockEnabled()) { + return tableLockManager.tryLockTable(tableName, purpose, timeout); + } + return false; + } + protected void lockTable(byte[] tableName, String purpose) throws IOException { if (isTableLockEnabled()) { @@ -1549,19 +1573,37 @@ public class HMaster extends HasThread i public void alterTable(final byte [] tableName, List columnAdditions, List> columnModifications, + List columnDeletions, + int waitInterval, + int maxConcurrentRegionsClosed) throws IOException { + //This lock will be released when the ThrottledRegionReopener is done. + if (!tryLockTable(tableName, "alter", schemaChangeTryLockTimeoutMs)) { + throw new TableLockTimeoutException("Timed out acquiring " + + "lock for " + Bytes.toString(tableName) + " after " + schemaChangeTryLockTimeoutMs + " ms."); + } + + InjectionHandler.processEvent(InjectionEvent.HMASTER_ALTER_TABLE); + ThrottledRegionReopener reopener = this.regionManager. + createThrottledReopener(Bytes.toString(tableName), waitInterval, maxConcurrentRegionsClosed); + // Regions are added to the reopener in MultiColumnOperation + new MultiColumnOperation(this, tableName, columnAdditions, + columnModifications, columnDeletions).process(); + reopener.startRegionsReopening(); + } + + @Override + public void alterTable(final byte [] tableName, + List columnAdditions, + List> columnModifications, List columnDeletions) throws IOException { - lockTable(tableName, "alter"); - try { - InjectionHandler.processEvent(InjectionEvent.HMASTER_ALTER_TABLE); - ThrottledRegionReopener reopener = this.regionManager. 
- createThrottledReopener(Bytes.toString(tableName)); - // Regions are added to the reopener in MultiColumnOperation - new MultiColumnOperation(this, tableName, columnAdditions, - columnModifications, columnDeletions).process(); - reopener.reOpenRegionsThrottle(); - } finally { - unlockTable(tableName); - } + + int waitInterval = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS); + + int maxClosedRegions = conf.getInt(HConstants.MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE, + HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE); + + alterTable(tableName, columnAdditions, columnModifications, columnDeletions, waitInterval, maxClosedRegions); } public Pair getAlterStatus(byte [] tableName) @@ -2281,18 +2323,6 @@ public class HMaster extends HasThread i return -1; } - @Override - public void setCloseRegionWaitInterval(String tableName, int waitInterval) { - ThrottledRegionReopener reopener = this.regionManager.createThrottledReopener(tableName); - reopener.setRegionCloseWaitInterval(waitInterval); - } - - @Override - public void setNumConcurrentCloseRegions(String tableName, int numConcurrentClose) { - ThrottledRegionReopener reopener = this.regionManager.createThrottledReopener(tableName); - reopener.setNumConcurrentCloseRegions(numConcurrentClose); - } - String getZKWrapperName() { return getServerName(); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Jun 25 18:51:49 2013 @@ -101,7 +101,7 
@@ public class RegionManager { * An entry is created in the map before an alter operation is performed on the * table. It is cleared when all the regions have reopened. */ - private final Map tablesReopeningRegions = + private final Map tablesReopeningRegions = new ConcurrentHashMap(); /** * Map of region name to RegionState for regions that are in transition such as @@ -2590,15 +2590,23 @@ public class RegionManager { } /** - * Create a reopener for this table, if one exists, return the existing throttler. - * @param tableName + * Get the current ThrottledRegionReopener for a given table if it already exists. Create one with + * the specified wait interval and maxNumRegionsClosed if it doesn't exists yet. + * @param tableName The table this re-opener will work on. + * @param waitInterval The amount of time to spread the close of regions over. + * @param maxNumRegionsClosed The maximum number of regions to have closed at a time. * @return */ - public ThrottledRegionReopener createThrottledReopener(String tableName) { + public ThrottledRegionReopener createThrottledReopener(String tableName, int waitInterval, int maxNumRegionsClosed) { if (!tablesReopeningRegions.containsKey(tableName)) { - ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(tableName, this.master, this); + ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener( + tableName, + this.master, + this, + waitInterval, + maxNumRegionsClosed); tablesReopeningRegions.put(tableName, throttledReopener); - } + } return tablesReopeningRegions.get(tableName); } @@ -2619,6 +2627,18 @@ public class RegionManager { // if tablesReopeningRegions.contains do something if (tablesReopeningRegions.containsKey(tableName)) { tablesReopeningRegions.remove(tableName); + + //The table was locked by the master before the alter table command + //was issued. Now that the reopen of the regions is done the table should + //be unlocked. 
+ try { + master.unlockTable(Bytes.toBytes(tableName)); + } catch (IllegalStateException ise) { + LOG.warn("RegionManager tried to unlock " + tableName + " and it was not locked."); + } catch (IOException e) { + LOG.warn("An exception was encountered while RegionManager was trying to unlock " + tableName + + ". " + e.getStackTrace()); + } LOG.debug("Removed throttler for " + tableName); } else { LOG.debug("Tried to delete a throttled reopener, but it does not exist."); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java Tue Jun 25 18:51:49 2013 @@ -79,40 +79,52 @@ public class TableLockManager { } /** - * Lock a table, given a purpose. + * Try to lock a table, given a purpose. 
* @param tableName Table to lock * @param purpose Human readable reason for locking the table + * @return True if the table was locked before the timeout false otherwise * @throws TableLockTimeoutException If unable to acquire a lock within a * specified time period (if any) * @throws IOException If unrecoverable ZooKeeper error occurs */ - public void lockTable(byte[] tableName, String purpose) - throws IOException { + public boolean tryLockTable(byte[] tableName, String purpose, long timeout) + throws IOException { + boolean result; String tableNameStr = Bytes.toString(tableName); HLock lock = createTableLock(tableNameStr, purpose); try { - if (lockTimeoutMs == -1) { - // Wait indefinitely - lock.acquire(); - } else { - if (!lock.tryAcquire(lockTimeoutMs)) { - throw new TableLockTimeoutException("Timed out acquiring " + - "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms."); - } - } + result = lock.tryAcquire(timeout); } catch (InterruptedException e) { LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e); - Thread.currentThread().interrupt(); throw new InterruptedIOException("Interrupted acquiring a lock"); } - if (acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) { + if (result && + acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) { // This should never execute if DistributedLock is implemented // correctly. LOG.error("Lock for " + tableNameStr + " acquired by multiple owners!"); LOG.error("Currently held locks: " + acquiredTableLocks); throw new IllegalStateException("Lock for " + tableNameStr + - " was acquired by multiple owners!"); + " was acquired by multiple owners!"); + } + + return result; + } + + /** + * Lock a table, given a purpose. 
+ * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @throws TableLockTimeoutException If unable to acquire a lock within a + * specified time period (if any) + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public void lockTable(byte[] tableName, String purpose) + throws IOException { + if(!tryLockTable(tableName, purpose, lockTimeoutMs)) { + throw new TableLockTimeoutException("Timed out acquiring " + + "lock for " + Bytes.toString(tableName) + " after " + lockTimeoutMs + " ms."); } } @@ -145,4 +157,14 @@ public class TableLockManager { throw new InterruptedIOException(); } } + + /** + * Used to determine if a table is currently locked from this table lock manager. + * + * @param tableName + * @return True if the table was locked by this table manager false otherwise. + */ + public boolean isTableLocked(byte[] tableName) { + return acquiredTableLocks.containsKey(Bytes.toString(tableName)); + } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java Tue Jun 25 18:51:49 2013 @@ -4,6 +4,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -12,6 +13,8 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Pair; public class ThrottledRegionReopener { @@ -22,13 +25,20 @@ public class ThrottledRegionReopener { private String tableName; private int totalNoOfRegionsToReopen = 0; private int regionCloseWaitInterval = 1000; - private long timeAllowedToProceed = 0; - private int numOfConcurrentClose = 0; + private int numOfConcurrentClose = 5; - ThrottledRegionReopener(String tn, HMaster m, RegionManager regMgr) { + private Closer closer; + + ThrottledRegionReopener(String tn, HMaster m, RegionManager regMgr, + int waitInterval, int maxConcurrentRegionsClosed) { this.tableName = tn; this.master = m; this.regionManager = regMgr; + this.regionCloseWaitInterval = waitInterval; + this.numOfConcurrentClose = maxConcurrentRegionsClosed; + //Create a Daemon thread to close regions at the specified rate. + this.closer = new Closer(); + this.closer.setDaemon(true); } /** @@ -87,65 +97,15 @@ public class ThrottledRegionReopener { if (regionsBeingReopened.contains(region)) { LOG.info("Region reopened: " + region.getRegionNameAsString()); regionsBeingReopened.remove(region); - - // Check if all the regions have reopened and log. - if (closeSomeRegions() == 0) { - if (regionsToBeReopened.size() == 0 && regionsBeingReopened.size() == 0) { - this.numOfConcurrentClose = 0; - LOG.info("All regions of " + tableName + " reopened successfully."); - } else { - LOG.error("All regions of " + tableName - + " could not be reopened. Retry the operation."); - } - regionManager.deleteThrottledReopener(tableName); - } } } /** - * Close some of the reopening regions. Used for throttling the percentage of - * regions of a table that may be reopened concurrently. 
This is configurable - * by a hbase config parameter hbase.regionserver.alterTable.concurrentReopen - * which defines the percentage of regions of a table that the master may - * reopen concurrently (defaults to 1). + * Close a region to be reopened. * + * @return True if a region was closed false otherwise. */ - public synchronized int closeSomeRegions() { - - float localNumOfConcurrentClose = this.numOfConcurrentClose; - - //Try to get the number from the config if class value is set to 0 - if (this.numOfConcurrentClose == 0) { - float percentConcurrentClose = this.master.getConfiguration().getFloat( - "hbase.regionserver.alterTable.concurrentReopen", 5); - // Find the number of regions you are allowed to close concurrently - float configNumOfConcurrentClose = (percentConcurrentClose / 100) - * totalNoOfRegionsToReopen; - // Close at least one region at a time - if (configNumOfConcurrentClose < 1 && configNumOfConcurrentClose > 0) { - configNumOfConcurrentClose = 1; - } - - localNumOfConcurrentClose = configNumOfConcurrentClose; - } - - localNumOfConcurrentClose -= regionsBeingReopened.size(); - if (localNumOfConcurrentClose <= 0) { - return 0; - } - - //If true we are not yet at the time when we can proceed - if ( this.timeAllowedToProceed > System.currentTimeMillis()) { - //Wait until we are allowed to try and close more regions - try { - long sleepLength = this.timeAllowedToProceed - System.currentTimeMillis(); - Thread.sleep(sleepLength); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - int cnt = 0; + private synchronized boolean doRegionClose() { for (Iterator iter = regionsToBeReopened.iterator(); iter .hasNext();) { HRegionInfo region = iter.next(); @@ -163,20 +123,10 @@ public class ThrottledRegionReopener { regionManager.setClosing(serverName, region, false); iter.remove(); // Remove from regionsToBeReopened regionsBeingReopened.add(region); - cnt++; - // Close allowed number of regions, exit - if (cnt == localNumOfConcurrentClose) 
{ - break; - } + return true; } - - //If we closed some regions set the time we are allowed to close more regions - if (cnt > 0) { - this.timeAllowedToProceed = System.currentTimeMillis() + regionCloseWaitInterval; - } - - return cnt; + return false; } /** @@ -217,7 +167,7 @@ public class ThrottledRegionReopener { * @param serverInfo */ public synchronized void addPreferredAssignmentForReopen(HRegionInfo region, - HServerInfo serverInfo) { + HServerInfo serverInfo) { if (regionsBeingReopened.contains(region) && !master.isServerBlackListed(serverInfo.getHostnamePort())) { regionManager.getAssignmentManager().addTransientAssignment( @@ -230,14 +180,10 @@ public class ThrottledRegionReopener { * * @throws IOException */ - public synchronized void reOpenRegionsThrottle() throws IOException { + public void startRegionsReopening() throws IOException { if (HTable.isTableEnabled(master.getConfiguration(), tableName)) { LOG.info("Initiating reopen for all regions of " + tableName); - if (closeSomeRegions() == 0) { - regionManager.deleteThrottledReopener(tableName); - throw new IOException("Could not reopen regions of the table, " - + "retry the alter operation"); - } + closer.start(); } } @@ -245,17 +191,109 @@ public class ThrottledRegionReopener { return regionCloseWaitInterval; } - public int getNumConcurrentCloseRegions() { + public int getMaxConcurrentRegionsClosed() { return numOfConcurrentClose; } - public synchronized void setRegionCloseWaitInterval(int regionCloseWaitInterval) { - LOG.info("ThrottledRegionReopener: Setting wait interval " + regionCloseWaitInterval + " !!!"); - //Wait interval is in milliseconds - this.regionCloseWaitInterval = regionCloseWaitInterval; + public synchronized int getNumRegionsCurrentlyClosed() { + return regionsBeingReopened.size(); + } + + public synchronized int getNumRegionsLeftToReOpen() { + return regionsToBeReopened.size(); } - public synchronized void setNumConcurrentCloseRegions(int numConcurrentClose) { - 
this.numOfConcurrentClose = numConcurrentClose; + private void failedToProgress() { + LOG.error("Could not reopen regions of the table, " + + tableName + ", retry the alter operation"); + regionManager.deleteThrottledReopener(tableName); + this.closer.interrupt(); + } + + private void finished() { + LOG.info("All regions of " + tableName + " reopened successfully."); + regionManager.deleteThrottledReopener(tableName); + this.closer.running.set(false); + } + + private class Closer extends HasThread { + + private long timeAllowedToClose = EnvironmentEdgeManager.currentTimeMillis(); + private long closeInterval = 1000; + private long timeOfLastRegionClose = EnvironmentEdgeManager.currentTimeMillis(); // measure progress from when the Closer is constructed, not epoch 0, so a first failed close attempt is not an instant timeout + + //Get progress timeout. Default is 5 min. + private long progressTimeout = master.getConfiguration() + .getLong("hbase.regionserver.alterTable.progressTimeout", 5 * 60 * 1000); + + private AtomicBoolean running = new AtomicBoolean(true); + + private void checkFinished() { + if (getNumRegionsCurrentlyClosed() == 0 + && getNumRegionsLeftToReOpen() == 0) { + running.set(false); + finished(); + } + } + + private void checkTimeout() { // no-op once stopped, so a reopen that just finished is not also reported as failed + if (running.get() && EnvironmentEdgeManager.currentTimeMillis() >= + timeOfLastRegionClose + progressTimeout) { + running.set(false); + failedToProgress(); + } + } + + @Override + public void run() { + + closeInterval = Math.max(getRegionCloseWaitInterval() / + Math.max(getMaxConcurrentRegionsClosed(), 1), 1); // guard the divisor: a configured max of 0 would throw ArithmeticException + + LOG.debug("Closer started for ThrottledRegionReopener of table " + tableName); + + while (running.get()) { + try { + long sleepTime = Math.max(timeAllowedToClose - EnvironmentEdgeManager.currentTimeMillis(), closeInterval); // wait until closing is allowed, and at least one interval so the loop never busy-spins + if (sleepTime > 0) { + LOG.debug("Closer running and about to wait.
Currently Closed: " + + getNumRegionsCurrentlyClosed() + " Left to Close: " + + getNumRegionsLeftToReOpen()); + sleep(sleepTime); + } + + //Not allowed to try yet + if (EnvironmentEdgeManager.currentTimeMillis() < timeAllowedToClose) { + continue; + } + + //Max concurrent regions already closed + if (getNumRegionsCurrentlyClosed() >= getMaxConcurrentRegionsClosed()) { + checkTimeout(); + continue; + } + + //Looks like we can close a region now + if (doRegionClose()) { + //We closed a region so update the time we can try again. + timeAllowedToClose = EnvironmentEdgeManager.currentTimeMillis() + closeInterval; + timeOfLastRegionClose = EnvironmentEdgeManager.currentTimeMillis(); + } + + //Are we done? + checkFinished(); + + //Did we timeout before making any progress? + checkTimeout(); + + } catch (InterruptedException e) { + LOG.warn("Closer of the ThrottledRegionReopener for table: " + tableName + + " was interrupted. There are " + getNumRegionsLeftToReOpen() + " regions left to reopen."); + running.set(false); + } + } + + LOG.debug("Closer for ThrottledRegionReopener of table " + tableName + " is stopping."); + } } } Modified: hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb (original) +++ hbase/branches/0.89-fb/src/main/ruby/hbase/admin.rb Tue Jun 25 18:51:49 2013 @@ -329,6 +329,12 @@ module Hbase columnsToAdd = ArrayList.new() columnsToMod = ArrayList.new() columnsToDel = ArrayList.new() + + waitInterval = @conf.getInt(HConstants::MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS, + HConstants::DEFAULT_MASTER_SCHEMA_CHANGES_WAIT_INTERVAL_MS); + numConcurrentRegionsClosed = @conf.getInt(HConstants::MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE,
HConstants::DEFAULT_MASTER_SCHEMA_CHANGES_MAX_CONCURRENT_REGION_CLOSE); + args.each do |arg| # Normalize args to support column name only alter specs arg = { NAME => arg } if arg.kind_of?(String) @@ -338,12 +344,12 @@ module Hbase # Now set regionCloseWaitInterval if specified if arg[WAIT_INTERVAL] - @admin.setCloseRegionWaitInterval(table_name, arg[WAIT_INTERVAL]) + waitInterval = arg[WAIT_INTERVAL] end # Now set the NumConcurrentCloseRegions if specified if arg[NUM_CONCURRENT_CLOSE] - @admin.setNumConcurrentCloseRegions(table_name, arg[NUM_CONCURRENT_CLOSE]) + numConcurrentRegionsClosed = arg[NUM_CONCURRENT_CLOSE] end # No method parameter, try to use the args as a column definition @@ -391,7 +397,7 @@ module Hbase end # now batch process alter requests - @admin.alterTable(table_name, columnsToAdd, columnsToMod, columnsToDel) + @admin.alterTable(table_name, columnsToAdd, columnsToMod, columnsToDel, waitInterval, numConcurrentRegionsClosed) if wait == true puts "Updating all regions with the new schema..." 
alter_status(table_name) Modified: hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb (original) +++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter.rb Tue Jun 25 18:51:49 2013 @@ -50,7 +50,7 @@ module Shell hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'} - You can also specify the wait interval, in milliseconds, to pause for in between region restarts: + You can also specify the wait interval, in milliseconds, to pause for in-between region restarts: hbase> alter 't1', NAME => 'f1', METHOD => 'delete', WAIT_INTERVAL => 1000, NUM_CONCURRENT_CLOSE => 1 EOF Modified: hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb?rev=1496585&r1=1496584&r2=1496585&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb (original) +++ hbase/branches/0.89-fb/src/main/ruby/shell/commands/alter_async.rb Tue Jun 25 18:51:49 2013 @@ -52,7 +52,7 @@ module Shell hbase> alter_async 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'} - You can also specify the wait interval, in milliseconds, to pause for in between region restarts + You can also specify the wait interval, in milliseconds, to pause for in-between region restarts and the simultaneous number of regions to close at a time: hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete', WAIT_INTERVAL => 1000, NUM_CONCURRENT_CLOSE => 1 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java?rev=1496585&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestAlterTableLocking.java Tue Jun 25 18:51:49 2013 @@ -0,0 +1,110 @@ +package org.apache.hadoop.hbase.master; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class TestAlterTableLocking { + + private final Log LOG = LogFactory.getLog(TestAlterTableLocking.class); + + private final HBaseTestingUtility testingUtility = new HBaseTestingUtility(); + + private static final byte[] TABLENAME = Bytes.toBytes("TestTable"); + private static final byte[] FAMILYNAME = Bytes.toBytes("cf"); + private static final int NUM_REGIONS = 20; + private static final int NUM_VERSIONS = 1; + private static final byte[] START_KEY = Bytes.toBytes(0); + private static final byte[] END_KEY = Bytes.toBytes(200); + private static final int WAIT_INTERVAL = 1000; + private static final int MAX_CONCURRENT_REGIONS_CLOSED = 2; + private static final int WAIT_FOR_TABLE = 30 * 1000; + private static final Pair ALTER_DONE = Pair.newPair(0,0); + + @Before + public void setup() throws IOException, InterruptedException { + //Start mini cluster + testingUtility.startMiniCluster(1); + testingUtility.createTable( + TABLENAME, + new byte[][]{FAMILYNAME}, + NUM_VERSIONS, + START_KEY, +
END_KEY, + NUM_REGIONS + ); + testingUtility.waitTableAvailable(TABLENAME, WAIT_FOR_TABLE); + } + + @After + public void tearDown() throws IOException { + testingUtility.shutdownMiniCluster(); + } + + @Test + public void testTryLock() { + HBaseAdmin admin = null; + try { + admin = testingUtility.getHBaseAdmin(); + } catch (MasterNotRunningException e) { + LOG.error("Master Not running. ", e); + Assert.fail("Master for test cluster is not running."); + } + + try { + admin.alterTable(TABLENAME,null,null,null, WAIT_INTERVAL, MAX_CONCURRENT_REGIONS_CLOSED); + + boolean threwIOException = false; + try { + //Should throw exception if locking is working. + admin.alterTable(TABLENAME, null, null, null, WAIT_INTERVAL, MAX_CONCURRENT_REGIONS_CLOSED); + } catch (IOException e) { + //No simultaneous alterTable allowed. + threwIOException = true; + } + Assert.assertTrue("TableLock Failed to stop a second " + + "Alter Table op from starting.", threwIOException); + + } catch (MasterNotRunningException e) { + LOG.error("Master for test cluster is not running. ", e); + Assert.fail("Master for test cluster is not running."); + } catch (IOException e) { + LOG.error("An exception was thrown while trying to alter a table. ", e); + Assert.fail("An exception was encountered while trying to run the Alter Table command."); + } + + try { + while (!admin.getAlterStatus(TABLENAME).equals(ALTER_DONE)) { + try { + Assert.assertTrue(tableIsLocked(TABLENAME) || admin.getAlterStatus(TABLENAME).equals(ALTER_DONE)); // the alter may finish (and release the lock) after the loop's status check + Thread.sleep(WAIT_INTERVAL); + } catch (InterruptedException e) { + LOG.error("Interrupted while sleeping after table edit. ", e); + } + } + } catch (IOException e) { + LOG.error("An IOException was encountered while waiting for the " + + "Alter Table command to finish reopening regions.",
e); + Assert.fail("An IOException was encountered while reopening regions."); + } + + Assert.assertFalse("The table lock was not released after the " + + "Alter Table command was finished.", tableIsLocked(TABLENAME)); + + } + + private boolean tableIsLocked(byte[] tableName) { + return testingUtility.getHBaseCluster().getMaster().isTableLocked(tableName); + } +}