hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1181953 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/master/ main/ruby/ main/ruby/hbase/ main/ruby/s...
Date Tue, 11 Oct 2011 17:44:02 GMT
Author: nspiegelberg
Date: Tue Oct 11 17:44:01 2011
New Revision: 1181953

URL: http://svn.apache.org/viewvc?rev=1181953&view=rev
Log:
Port online schema changes to 89

Summary:
Enables changing the hbase schema without disabling the table.
The schema update takes place in the following steps:
1. Update Meta
2. Close all regions of the table.
3. Open the regions (preferably on the same region server) so that they have
the updated schema.

This is achieved with the help of in-memory data structures on the Master. The
sequence of steps is:
1. Client issues an addColumn
2. Master updates the .META. and creates a mapping of all the regions of that
table that need to be updated.
3. The master then issues a close to the region servers - a configurable number
of regions may be closed per region server.
4. When the master receives a notification that one of the reopening regions
has opened, it removes this region from the in-memory data structure, and closes
the next set of regions on this region server.

Test Plan: run unit tests

Reviewers: kranganathan, kannan, nspiegelberg, bogdan

Reviewed By: kranganathan

CC: hbase@lists, gqchen, kranganathan, liyintang, nileema, bogdan, kannan

Differential Revision: 299930

Task ID: 668176

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java
    hbase/branches/0.89/src/main/ruby/shell/commands/alter_async.rb
      - copied, changed from r1181952, hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb
    hbase/branches/0.89/src/main/ruby/shell/commands/alter_status.rb
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/AddColumn.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/DeleteColumn.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ModifyColumn.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TableOperation.java
    hbase/branches/0.89/src/main/ruby/hbase/admin.rb
    hbase/branches/0.89/src/main/ruby/shell.rb
    hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 17:44:01 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.Writable;
@@ -575,6 +576,28 @@ public class HBaseAdmin {
   }
 
   /**
+   * Get the status of alter command - indicates how many regions have received
+   * the updated schema. Asynchronous operation.
+   *
+   * @param tableName
+   *          name of the table to get the status of
+   * @return Pair indicating the alter progress: getFirst() is the number of
+   *         regions that are yet to be updated, getSecond() is the total number
+   *         of regions of the table
+   * @throws IOException
+   *           if a remote or network exception occurs
+   */
+  public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
+      throws IOException {
+    HTableDescriptor.isLegalTableName(tableName);
+    try {
+      return this.master.getAlterStatus(tableName);
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+  }
+
+  /**
    * Add a column to an existing table.
    * Asynchronous operation.
    *

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Oct 11 17:44:01 2011
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This is a customized version of the polymorphic hadoop
@@ -175,6 +176,10 @@ public class HbaseObjectWritable impleme
 
     addToMap(KeyOnlyFilter.class, code++);
     addToMap(ColumnRangeFilter.class, code++);
+
+    // Online schema change
+    addToMap(Integer.class, code++);
+    addToMap(Pair.class, code++);
   }
 
   private Class<?> declaredClass;
@@ -274,6 +279,8 @@ public class HbaseObjectWritable impleme
     if (code == null ) {
       if ( List.class.isAssignableFrom(c)) {
         code = CLASS_TO_CODE.get(List.class);
+      } else if (Pair.class.isAssignableFrom(c)) {
+        code = CLASS_TO_CODE.get(Pair.class);
       }
     }
     if (code == null) {
@@ -356,6 +363,10 @@ public class HbaseObjectWritable impleme
         writeObject(out, list.get(i),
                   list.get(i).getClass(), conf);
       }
+    } else if (Pair.class.isAssignableFrom(declClass)) {
+      Pair pair = (Pair) instanceObj;
+      writeObject(out, pair.getFirst(), pair.getFirst().getClass(), conf);
+      writeObject(out, pair.getSecond(), pair.getSecond().getClass(), conf);
     } else if (declClass == String.class) {   // String
       Text.writeString(out, (String)instanceObj);
     } else if (declClass.isPrimitive()) {     // primitive type
@@ -379,6 +390,8 @@ public class HbaseObjectWritable impleme
       } else {
         throw new IllegalArgumentException("Not a primitive: "+declClass);
       }
+    } else if (declClass == Integer.class) { // Integer
+      out.writeInt(((Integer) instanceObj).intValue());
     } else if (declClass.isEnum()) {         // enum
       Text.writeString(out, ((Enum)instanceObj).name());
     } else if (Writable.class.isAssignableFrom(declClass)) { // Writable
@@ -447,6 +460,8 @@ public class HbaseObjectWritable impleme
       } else {
         throw new IllegalArgumentException("Not a primitive: "+declaredClass);
       }
+    } else if (declaredClass == Integer.class) { // Integer
+      instance = Integer.valueOf(in.readInt());
     } else if (declaredClass.isArray()) {              // array
       if (declaredClass.equals(byte [].class)) {
         instance = Bytes.readByteArray(in);
@@ -465,6 +480,10 @@ public class HbaseObjectWritable impleme
       for (int i = 0; i < length; i++) {
         ((ArrayList)instance).add(readObject(in, conf));
       }
+    } else if (Pair.class.isAssignableFrom(declaredClass)) { // Pair
+      instance = new Pair();
+      ((Pair) instance).setFirst(readObject(in, conf));
+      ((Pair) instance).setSecond(readObject(in, conf));
     } else if (declaredClass == String.class) {        // String
       instance = Text.readString(in);
     } else if (declaredClass.isEnum()) {         // enum

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Tue Oct 11 17:44:01 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
 
 import java.io.IOException;
@@ -135,4 +136,16 @@ public interface HMasterInterface extend
    * @param region region to clear from transition map
    */
   public void clearFromTransition(HRegionInfo region);
+
+  /**
+   * Used by the client to get the number of regions that have received the
+   * updated schema
+   *
+   * @param tableName
+   * @return Pair getFirst() is the number of regions pending an update
+   *              getSecond() total number of regions of the table
+   * @throws IOException
+   */
+  public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
+      throws IOException;
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/AddColumn.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/AddColumn.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/AddColumn.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/AddColumn.java Tue Oct 11 17:44:01 2011
@@ -22,12 +22,18 @@ package org.apache.hadoop.hbase.master;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 /** Instantiated to add a column family to a table */
 class AddColumn extends ColumnOperation {
   private final HColumnDescriptor newColumn;
+  private static final Log LOG = LogFactory.getLog(AddColumn.class);
 
   AddColumn(final HMaster master, final byte [] tableName,
     final HColumnDescriptor newColumn)
@@ -39,12 +45,31 @@ class AddColumn extends ColumnOperation 
   @Override
   protected void postProcessMeta(MetaRegion m, HRegionInterface server)
   throws IOException {
-    for (HRegionInfo i: unservedRegions) {
+    Set<HRegionInfo> regionsToReopen = new HashSet<HRegionInfo>();
+    for (HRegionInfo i : regionsToProcess) {
       // All we need to do to add a column is add it to the table descriptor.
       // When the region is brought on-line, it will find the column missing
       // and create it.
       i.getTableDesc().addFamily(newColumn);
       updateRegionInfo(server, m.getRegionName(), i);
+      // Ignore regions that are split or disabled,
+      // as we do not want to reopen them
+      if (!(i.isSplit() || i.isOffline())) {
+        regionsToReopen.add(i);
+      }
+    }
+    if (regionsToReopen.size() > 0) {
+      this.master.getRegionManager().getThrottledReopener(Bytes.toString(tableName)).
+                    addRegionsToReopen(regionsToReopen);
+    }
+  }
+
+  @Override
+  protected void processScanItem(String serverName, final HRegionInfo info)
+      throws IOException {
+    if (isEnabled(info)) {
+      LOG.debug("Performing online schema change (region not disabled): "
+          + info.getRegionNameAsString());
     }
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/DeleteColumn.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/DeleteColumn.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/DeleteColumn.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/DeleteColumn.java Tue Oct 11 17:44:01 2011
@@ -19,16 +19,22 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 /** Instantiated to remove a column family from a table */
 class DeleteColumn extends ColumnOperation {
   private final byte [] columnName;
+  private static final Log LOG = LogFactory.getLog(DeleteColumn.class);
 
   DeleteColumn(final HMaster master, final byte [] tableName,
     final byte [] columnName)
@@ -40,7 +46,8 @@ class DeleteColumn extends ColumnOperati
   @Override
   protected void postProcessMeta(MetaRegion m, HRegionInterface server)
   throws IOException {
-    for (HRegionInfo i: unservedRegions) {
+    Set<HRegionInfo> regionsToReopen = new HashSet<HRegionInfo>();
+    for (HRegionInfo i: regionsToProcess) {
       i.getTableDesc().removeFamily(columnName);
       updateRegionInfo(server, m.getRegionName(), i);
       // Delete the directories used by the column
@@ -49,6 +56,23 @@ class DeleteColumn extends ColumnOperati
       this.master.getFileSystem().
         delete(Store.getStoreHomedir(tabledir, i.getEncodedName(),
         this.columnName), true);
+      // Ignore regions that are split or disabled,
+      // as we do not want to reopen them
+      if (!(i.isSplit() || i.isOffline())) {
+        regionsToReopen.add(i);
+      }
+    }
+    if (regionsToReopen.size() > 0) {
+      this.master.getRegionManager().getThrottledReopener(Bytes.toString(tableName)).
+      addRegionsToReopen(regionsToReopen);
+    }
+  }
+  @Override
+  protected void processScanItem(String serverName, final HRegionInfo info)
+      throws IOException {
+    if (isEnabled(info)) {
+      LOG.debug("Performing online schema change (region not disabled): "
+          + info.getRegionNameAsString());
     }
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 17:44:01 2011
@@ -972,20 +972,44 @@ public class HMaster extends Thread impl
   @Override
   public void addColumn(byte [] tableName, HColumnDescriptor column)
   throws IOException {
+    ThrottledRegionReopener reopener = this.regionManager.
+            createThrottledReopener(Bytes.toString(tableName));
+    // Regions are added to the reopener in AddColumn
     new AddColumn(this, tableName, column).process();
+    reopener.reOpenRegionsThrottle();
+  }
+
+  public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
+      throws IOException {
+    Pair <Integer, Integer> p = new Pair<Integer, Integer>(0,0);
+    if (regionManager.getThrottledReopener(Bytes.toString(tableName)) != null) {
+      p = regionManager.getThrottledReopener(
+                    Bytes.toString(tableName)).getReopenStatus();
+    } else {
+      // Table is not reopening any regions return (0,0)
+    }
+    return p;
   }
 
   @Override
   public void modifyColumn(byte [] tableName, byte [] columnName,
     HColumnDescriptor descriptor)
   throws IOException {
+    ThrottledRegionReopener reopener = this.regionManager.
+                     createThrottledReopener(Bytes.toString(tableName));
+    // Regions are added to the reopener in ModifyColumn
     new ModifyColumn(this, tableName, columnName, descriptor).process();
+    reopener.reOpenRegionsThrottle();
   }
 
   @Override
   public void deleteColumn(final byte [] tableName, final byte [] c)
   throws IOException {
+    ThrottledRegionReopener reopener = this.regionManager.
+                    createThrottledReopener(Bytes.toString(tableName));
+    // Regions are added to the reopener in DeleteColumn
     new DeleteColumn(this, tableName, KeyValue.parseColumn(c)[0]).process();
+    reopener.reOpenRegionsThrottle();
   }
 
   @Override
@@ -1010,7 +1034,7 @@ public class HMaster extends Thread impl
    * currently deployed.
    * TODO: Redo so this method does not duplicate code with subsequent methods.
    */
-  List<Pair<HRegionInfo,HServerAddress>> getTableRegions(
+  public List<Pair<HRegionInfo,HServerAddress>> getTableRegions(
       final byte [] tableName)
   throws IOException {
     final ArrayList<Pair<HRegionInfo, HServerAddress>> result =

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ModifyColumn.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ModifyColumn.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ModifyColumn.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ModifyColumn.java Tue Oct 11 17:44:01 2011
@@ -19,17 +19,22 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.HashSet;
 
 /** Instantiated to modify an existing column family on a table */
 class ModifyColumn extends ColumnOperation {
   private final HColumnDescriptor descriptor;
   private final byte [] columnName;
+  private static final Log LOG = LogFactory.getLog(ModifyColumn.class);
 
   ModifyColumn(final HMaster master, final byte [] tableName,
     final byte [] columnName, HColumnDescriptor descriptor)
@@ -42,15 +47,34 @@ class ModifyColumn extends ColumnOperati
   @Override
   protected void postProcessMeta(MetaRegion m, HRegionInterface server)
   throws IOException {
-    for (HRegionInfo i: unservedRegions) {
+    Set<HRegionInfo> regionsToReopen = new HashSet<HRegionInfo>();
+    for (HRegionInfo i: regionsToProcess) {
       if (i.getTableDesc().hasFamily(columnName)) {
         i.getTableDesc().addFamily(descriptor);
         updateRegionInfo(server, m.getRegionName(), i);
+        // Ignore regions that are split or disabled,
+        // as we do not want to reopen them
+        if (!(i.isSplit() || i.isOffline())) {
+          regionsToReopen.add(i);
+        }
       } else { // otherwise, we have an error.
         throw new InvalidColumnNameException("Column family '" +
           Bytes.toString(columnName) +
           "' doesn't exist, so cannot be modified.");
       }
     }
+    if (regionsToReopen.size() > 0) {
+      this.master.getRegionManager().getThrottledReopener(Bytes.toString(tableName)).
+                    addRegionsToReopen(regionsToReopen);
+    }
+  }
+
+  @Override
+  protected void processScanItem(String serverName, final HRegionInfo info)
+      throws IOException {
+    if (isEnabled(info)) {
+      LOG.debug("Performing online schema change (region not disabled): "
+          + info.getRegionNameAsString());
+    }
   }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 17:44:01 2011
@@ -87,6 +87,40 @@ public class RegionManager {
   private static final byte [] META_REGION_PREFIX = Bytes.toBytes(".META.,");
 
   /**
+   * Preferred assignment map
+   * key -> Region server
+   * value -> set of regions to be assigned to this region server
+   *
+   */
+  private final Map<HServerAddress, Set<HRegionInfo>> preferredAssignmentMap =
+                 new ConcurrentHashMap<HServerAddress, Set<HRegionInfo>>();
+
+  /**
+   * Set of all regions that have a preferred assignment, used for quick lookup
+   */
+  private final Set<HRegionInfo> regionsWithPreferredAssignment =
+                                      new TreeSet<HRegionInfo>();
+
+  /**
+   * Thread to handle timeout of Regions that have a preferred assignment.
+   */
+  private final PreferredAssignmentHandler preferredAssignmentHandlerThread;
+
+  /**
+   * Delay queue for regions. Regions that have a "preferred assignment" are
+   * held for a particular timeout. Regions are removed from the queue after a
+   * timeout, and are assigned to the next available region server
+   */
+  private final DelayQueue<PreferredAssignment> preferredAssignmentTimeout =
+                                    new DelayQueue<PreferredAssignment>();
+  /**
+   * Map key -> tableName, value -> ThrottledRegionReopener
+   * An entry is created in the map before an alter operation is performed on the
+   * table. It is cleared when all the regions have reopened.
+   */
+  private final Map<String, ThrottledRegionReopener> tablesReopeningRegions =
+      new ConcurrentHashMap<String, ThrottledRegionReopener>();
+  /**
    * Map of region name to RegionState for regions that are in transition such as
    *
    * unassigned -> pendingOpen -> open
@@ -102,34 +136,6 @@ public class RegionManager {
    final SortedMap<String, RegionState> regionsInTransition =
     Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
 
-  // RESTARTING LOGIC
-
-  /**
-   * Thread to handle timeout of restarting Region Servers
-   */
-  private final RestartingServerHandler restartingServerHandlerThread;
-
-  /**
-   * Map from all the restarting servers to the set of non-META / non-ROOT
-   * regions they were serving
-   */
-  private final Map<HServerAddress, Set<HRegionInfo>>
-     restartingServersToRegions =
- new ConcurrentHashMap<HServerAddress, Set<HRegionInfo>>();
-
-  /**
-   * Set of all regions being held for restart. It is needed to maintain sync if
-   * multiple servers restart at the same time
-   */
-  private final Set<HRegionInfo> restartingRegions =
-    new TreeSet<HRegionInfo>();
-
-   /**
-    * Map from all restarting servers to their restart time.
-    */
-   private final DelayQueue<RestartingServer> restartingServers =
-     new DelayQueue<RestartingServer>();
-
    // regions in transition are also recorded in ZK using the zk wrapper
    final ZooKeeperWrapper zkWrapper;
 
@@ -204,8 +210,8 @@ public class RegionManager {
     // Scans the meta table
     metaScannerThread = new MetaScanner(master);
 
-    // scans for restarting regions timeout
-    this.restartingServerHandlerThread = new RestartingServerHandler();
+    // Scans for preferred assignment timeout
+    this.preferredAssignmentHandlerThread = new PreferredAssignmentHandler();
 
     zooKeeperNumRetries = conf.getInt(HConstants.ZOOKEEPER_RETRIES,
         HConstants.DEFAULT_ZOOKEEPER_RETRIES);
@@ -216,8 +222,8 @@ public class RegionManager {
   }
 
   void start() {
-    Threads.setDaemonThreadRunning(restartingServerHandlerThread,
-      "RegionManager.restartingServerHandler");
+    Threads.setDaemonThreadRunning(preferredAssignmentHandlerThread,
+    "RegionManager.preferredAssignmentHandler");
     Threads.setDaemonThreadRunning(rootScannerThread,
       "RegionManager.rootScanner");
     Threads.setDaemonThreadRunning(metaScannerThread,
@@ -302,11 +308,11 @@ public class RegionManager {
     // the server we are examining was registered as restarting and thus we
     // should assign all the regions to it directly; else, we should go through
     // the normal code path
-    MutableBoolean restaringServerAndOnTime = new MutableBoolean(false);
+    MutableBoolean preferredAssignment = new MutableBoolean(false);
 
     // get the region set to be assigned to this region server
     regionsToAssign = regionsAwaitingAssignment(info.getServerAddress(),
-        isSingleServer, restaringServerAndOnTime, assignmentByLocality,
+        isSingleServer, preferredAssignment, assignmentByLocality,
         holdRegionForBestRegionServer,
         quickStartRegionServerSet);
 
@@ -321,7 +327,7 @@ public class RegionManager {
       // if there's only one server or assign the region by locality,
       // just give the regions to this server
       if (isSingleServer || assignmentByLocality
-          || restaringServerAndOnTime.booleanValue()) {
+          || preferredAssignment.booleanValue()) {
         assignRegionsToOneServer(regionsToAssign, info, returnMsgs);
       } else {
         // otherwise, give this server a few regions taking into account the
@@ -495,7 +501,7 @@ public class RegionManager {
     return nRegions;
   }
 
-  /*
+  /**
    * Get the set of regions that should be assignable in this pass.
    *
    * Note that no synchronization on regionsInTransition is needed because the
@@ -503,46 +509,35 @@ public class RegionManager {
    * the monitor for RegionManager
    */
   private Set<RegionState> regionsAwaitingAssignment(HServerAddress addr,
-      boolean isSingleServer, MutableBoolean restaringServerAndOnTime,
+      boolean isSingleServer, MutableBoolean isPreferredAssignment,
       boolean assignmentByLocality, boolean holdRegionForBestRegionserver,
       Set<String> quickStartRegionServerSet) {
 
     // set of regions we want to assign to this server
     Set<RegionState> regionsToAssign = new HashSet<RegionState>();
 
-    // check if server is restarting
-    // take its information away as you this method is synchronized on
-    // regionsInTransition
-    Set<HRegionInfo> regions = unholdRestartingServer(addr);
+    Set<HRegionInfo> regions = preferredAssignmentMap.get(addr);
     if (null != regions) {
-      StringBuilder regionNames = new StringBuilder();
-      regionNames.append("[ ");
-      for (HRegionInfo region : regions) {
-        regionNames.append(region.getRegionNameAsString());
-        regionNames.append(" , ");
-      }
-      regionNames.append(" ]");
-      restaringServerAndOnTime.setValue(true);
-      LOG.debug("RegionServer " + addr.getHostname()
-          + " should receive regions " + regionNames.toString()
-          + " coming back from restart");
+      isPreferredAssignment.setValue(true);
       // One could use regionsInTransition.keySet().containsAll(regions) but
       // this provides more control and probably the same complexity. Also, this
       // gives direct logging of precise errors
-      for (HRegionInfo ri : regions) {
-        // no need for sync as caller owns monitor
+      HRegionInfo[] regionInfo = regions.toArray(new HRegionInfo[regions.size()]);
+      for (HRegionInfo ri : regionInfo) {
         RegionState state = regionsInTransition.get(ri.getRegionNameAsString());
-        if (null == state || !state.isUnassigned()) {
-          LOG.error("Region "
-              + ri
-              + (null == state ? " is not in transition" : " is now in state "
-                  + state)
-              + " and is no longer available for assigning to previously owning RS "
-              + addr.getHostname());
-        } else {
+        if (null != state && state.isUnassigned()) {
           regionsToAssign.add(state);
+          removeRegionFromPreferredAssignment(addr, ri);
         }
       }
+      StringBuilder regionNames = new StringBuilder();
+      regionNames.append("[ ");
+      for (RegionState regionState : regionsToAssign) {
+        regionNames.append(Bytes.toString(regionState.getRegionName()));
+        regionNames.append(" , ");
+      }
+      regionNames.append(" ]");
+      LOG.debug("Assigning regions to " + addr + " : " + regionNames);
       // return its initial regions ASAP
       return regionsToAssign;
     }
@@ -598,11 +593,12 @@ public class RegionManager {
           continue;
         }
 
-        // if we are holding it, don't give it away to any other server
-        if (restartingRegions.contains(s.getRegionInfo())) {
-          continue;
+        synchronized (regionsWithPreferredAssignment) {
+          // if we are holding it, don't give it away to any other server
+          if (regionsWithPreferredAssignment.contains(s.getRegionInfo())) {
+            continue;
+          }
         }
-
         if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
           String preferredHost =
             this.master.getPreferredRegionToRegionServerMapping().get(name);
@@ -1918,65 +1914,58 @@ public class RegionManager {
     }
   }
 
-  private class RestartingServerHandler extends Thread {
-    public RestartingServerHandler() {
+  private class PreferredAssignmentHandler extends Thread {
+    public PreferredAssignmentHandler() {
     }
 
     @Override
     public void run() {
-      LOG.debug("Started RestartingServerHandler");
+      LOG.debug("Started PreferredAssignmentHandler");
+      PreferredAssignment plan = null;
       while (!master.getClosed().get()) {
         try {
-          // check if any servers' waiting time expired
-          RestartingServer server = restartingServers.poll(
-              master.getConfiguration().getInt(
-                  HConstants.THREAD_WAKE_FREQUENCY, 30 * 1000),
+          // check if any regions waiting time expired
+          plan = preferredAssignmentTimeout.poll(master.getConfiguration()
+              .getInt(HConstants.THREAD_WAKE_FREQUENCY, 30 * 1000),
               TimeUnit.MILLISECONDS);
-          Set<HRegionInfo> regions;
-
-          if (null == server) {
-            continue;
-          }
-
-          regions = unholdRestartingServer(server.getServer()
-              .getServerAddress());
-          if (null != regions) {
-            LOG.info("RegionServer "
-                + server.getServer()
-                + " failed to report back after restart! Redistributing all of its regions "
-                + regions);
-          } else {
-            // the server came back and restarted properly
-          }
         } catch (InterruptedException e) {
           // no problem, just continue
           continue;
         }
+        if (null == plan) {
+          continue;
+        }
+        if (removeRegionFromPreferredAssignment(plan.getServer(),
+            plan.getRegionInfo())) {
+          LOG.info("Removed region from preferred assignment: " +
+              plan.getRegionInfo().getRegionNameAsString());
+        }
       }
     }
   }
 
-  private class RestartingServer implements Delayed {
+  private class PreferredAssignment implements Delayed {
     private long creationTime;
-    private HServerInfo server;
+    private HRegionInfo region;
+    private HServerAddress server;
     private long millisecondDelay;
 
-    RestartingServer(HServerInfo server, long creationTime,
-        long millisecondDelay) {
-      this.server = server;
+    PreferredAssignment(HRegionInfo region, HServerAddress addr,
+        long creationTime, long millisecondDelay) {
+      this.region = region;
+      this.server = addr;
       this.creationTime = creationTime;
       this.millisecondDelay = millisecondDelay;
     }
 
-    /**
-     * Method to get the server info back
-     *
-     * @return the server info for this respective regionserver
-     */
-    public HServerInfo getServer() {
+    public HServerAddress getServer() {
       return this.server;
     }
 
+    public HRegionInfo getRegionInfo() {
+      return this.region;
+    }
+
     @Override
     public int compareTo(Delayed arg0) {
       long delta = this.getDelay(TimeUnit.MILLISECONDS)
@@ -1993,10 +1982,12 @@ public class RegionManager {
 
     @Override
     public boolean equals(Object o) {
-      if (o instanceof RestartingServer) {
-        return ((RestartingServer) o).getServer().equals(this.server);
+      if (o instanceof PreferredAssignment) {
+        if (((PreferredAssignment) o).getServer().equals(this.getServer()) &&
+         ((PreferredAssignment) o).getRegionInfo().equals(this.getRegionInfo())) {
+          return true;
+        }
       }
-
       return false;
     }
   }
@@ -2012,59 +2003,104 @@ public class RegionManager {
    */
   public void addRegionServerForRestart(final HServerInfo regionServer,
       Set<HRegionInfo> regions) {
-    addRegionServerForRestart(regionServer, regions, master.getConfiguration()
-        .getLong("hbase.regionserver.restart.regionHoldPeriod", 60 * 1000));
+    LOG.debug("Holding regions of restartng server: " +
+        regionServer.getServerName());
+    HServerAddress addr = regionServer.getServerAddress();
+    addRegionToPreferredAssignment(addr, regions);
   }
 
-  /**
-   * Method used to do housekeeping for holding regions for a RegionServer going
-   * down for a restart
-   *
-   * @param regionServer
-   *          the RegionServer going down for a restart
-   * @param regions
-   *          the HRegions it was previously serving
-   * @param millisecondDelay
-   *          the delay to wait until redistributing the regions from holding
-   */
-  public void addRegionServerForRestart(final HServerInfo regionServer,
-      Set<HRegionInfo> regions, long millisecondDelay) {
-
-    LOG.debug("Holding for server  " + regionServer + " regions " + regions
-        + " for this much time " + millisecondDelay + " ms");
+  public boolean hasPreferredAssignment(final HServerAddress hsa) {
+    if (preferredAssignmentMap.containsKey(hsa)) {
+      return true;
+    }
+    return false;
+  }
 
-    restartingServersToRegions.put(regionServer.getServerAddress(), regions);
-    synchronized (restartingRegions) {
-      restartingRegions.addAll(regions);
+  private void addRegionToPreferredAssignment(HServerAddress server,
+      Set<HRegionInfo> regions) {
+    for (HRegionInfo region : regions) {
+      addRegionToPreferredAssignment(server, region);
     }
+  }
 
-    RestartingServer serv = new RestartingServer(regionServer,
-        System.currentTimeMillis(), millisecondDelay);
-    restartingServers.put(serv);
+  public void addRegionToPreferredAssignment(HServerAddress server,
+      HRegionInfo region) {
+    synchronized (regionsWithPreferredAssignment) {
+      if (!preferredAssignmentMap.containsKey(server)) {
+        Set<HRegionInfo> regions = new TreeSet<HRegionInfo>();
+        preferredAssignmentMap.put(server, regions);
+      }
+      preferredAssignmentMap.get(server).add(region);
+      regionsWithPreferredAssignment.add(region);
+    }
+    // Add to delay queue
+    long millisecondDelay = master.getConfiguration().getLong(
+        "hbase.regionserver.preferredAssignment.regionHoldPeriod", 60000);
+    preferredAssignmentTimeout.add(new PreferredAssignment(region, server,
+        System.currentTimeMillis(), millisecondDelay));
+  }
+
+  private boolean removeRegionFromPreferredAssignment(HServerAddress server,
+      HRegionInfo region) {
+    synchronized (regionsWithPreferredAssignment) {
+      if (preferredAssignmentMap.containsKey(server)) {
+        preferredAssignmentMap.get(server).remove(region);
+        // If no more regions are held for this region server
+        if (preferredAssignmentMap.get(server).size() == 0) {
+          preferredAssignmentMap.remove(server);
+        }
+        regionsWithPreferredAssignment.remove(region);
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
-   * Removes all the information being held for restart purposes for this
-   * particular regionserver
-   *
-   * @param addr
-   *          the address of the regionserver that went down for a restart
-   * @return the regions this regionserver was holding or null if this method
-   *         already got called before
+   * Create a reopener for this table; if one already exists, return the existing throttler.
+   * @param tableName
+   * @return
    */
-  private Set<HRegionInfo> unholdRestartingServer(final HServerAddress addr) {
-    Set<HRegionInfo> regions = restartingServersToRegions.remove(addr);
-    if (null != regions) {
-      // no longer hold the regions
-      synchronized (restartingRegions) {
-        restartingRegions.removeAll(regions);
-      }
+  public ThrottledRegionReopener createThrottledReopener(String tableName) {
+    if (!tablesReopeningRegions.containsKey(tableName)) {
+      ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(tableName, this.master, this);
+      tablesReopeningRegions.put(tableName, throttledReopener);
     }
+    return tablesReopeningRegions.get(tableName);
+  }
 
-    return regions;
+  /**
+   * Return the throttler for this table
+   * @param tableName
+   * @return
+   */
+  public ThrottledRegionReopener getThrottledReopener(String tableName) {
+    return tablesReopeningRegions.get(tableName);
   }
 
-  public boolean isServerRestarting(final HServerInfo hsi) {
-    return restartingServersToRegions.containsKey(hsi.getServerAddress());
+  /**
+   * Delete the throttler when the operation is complete
+   * @param tableName
+   */
+  public void deleteThrottledReopener(String tableName) {
+    // Only remove the throttler if one is registered for this table
+    if (tablesReopeningRegions.containsKey(tableName)) {
+      tablesReopeningRegions.remove(tableName);
+      LOG.debug("Removed throttler for " + tableName);
+    } else {
+      LOG.debug("Tried to delete a throttled reopener, but it does not exist.");
+    }
+  }
+
+  /**
+   * When the region is opened, check if it is reopening and notify the throttler
+   * for further processing.
+   * @param region
+   */
+  public void notifyRegionReopened(HRegionInfo region) {
+    String tableName = region.getTableDesc().getNameAsString();
+    if (tablesReopeningRegions.containsKey(tableName)) {
+      tablesReopeningRegions.get(tableName).notifyRegionOpened(region);
+    }
   }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Oct 11 17:44:01 2011
@@ -505,6 +505,9 @@ public class ServerManager {
 
         case MSG_REPORT_CLOSE:
           processRegionClose(region);
+            this.master.getRegionManager().getThrottledReopener(
+                  region.getTableDesc().getNameAsString()).
+                  addPreferredAssignmentForReopen(region, serverInfo);
           break;
 
         case MSG_REPORT_SPLIT:
@@ -666,6 +669,7 @@ public class ServerManager {
           this.master.getRegionServerOperationQueue().put(op);
         }
       }
+      this.master.getRegionManager().notifyRegionReopened(region);
     }
   }
 
@@ -768,11 +772,11 @@ public class ServerManager {
       for (Map.Entry<String, HServerLoad> entry : serversToLoad.entrySet()) {
         HServerInfo hsi = serversToServerInfo.get(entry.getKey());
         if (null != hsi) {
-          if (!this.master.getRegionManager().isServerRestarting(hsi)) {
+          if (!this.master.getRegionManager().hasPreferredAssignment(hsi.getServerAddress())) {
             totalLoad += entry.getValue().getNumberOfRegions();
           } else {
-            // server is being processed for a restart, ignore for loadbalancing
-            // purposes
+            // Master has held some regions for this server, ignore this server
+            // for loadbalancing purposes
           }
         } else {
           // this server has already been removed from the serversToServerInfo

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TableOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TableOperation.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TableOperation.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TableOperation.java Tue Oct 11 17:44:01 2011
@@ -19,8 +19,10 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -46,6 +48,7 @@ abstract class TableOperation {
   protected final byte [] tableName;
   // Do regions in order.
   protected final Set<HRegionInfo> unservedRegions = new TreeSet<HRegionInfo>();
+  protected final Set<HRegionInfo> regionsToProcess = new TreeSet<HRegionInfo>();
   protected HMaster master;
 
   protected TableOperation(final HMaster master, final byte [] tableName)
@@ -71,8 +74,10 @@ abstract class TableOperation {
   }
 
   private class ProcessTableOperation extends RetryableMetaOperation<Boolean> {
-    ProcessTableOperation(MetaRegion m, HMaster master) {
+    TableOperation tableOp;
+    ProcessTableOperation(MetaRegion m, HMaster master, TableOperation operation) {
       super(m, master);
+      tableOp = operation;
     }
 
     public Boolean call() throws IOException {
@@ -114,6 +119,10 @@ abstract class TableOperation {
           }
 
           tableExists = true;
+          if(tableOp instanceof AddColumn || tableOp instanceof ModifyColumn ||
+              tableOp instanceof DeleteColumn) {
+            regionsToProcess.add(info);
+          }
           if (!isBeingServed(serverName) || !isEnabled(info)) {
             unservedRegions.add(info);
           }
@@ -154,7 +163,7 @@ abstract class TableOperation {
     // Prevent meta scanner from running
     synchronized(master.getRegionManager().metaScannerThread.scannerLock) {
       for (MetaRegion m: metaRegions) {
-        new ProcessTableOperation(m, master).doWithRetries();
+        new ProcessTableOperation(m, master, this).doWithRetries();
       }
     }
   }

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java?rev=1181953&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java Tue Oct 11 17:44:01 2011
@@ -0,0 +1,202 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class ThrottledRegionReopener {
+  protected static final Log LOG = LogFactory
+      .getLog(ThrottledRegionReopener.class);
+  private HMaster master;
+  private RegionManager regionManager;
+  private String tableName;
+  private int totalNoOfRegionsToReopen = 0;
+
+  ThrottledRegionReopener(String tn, HMaster m, RegionManager regMgr) {
+    this.tableName = tn;
+    this.master = m;
+    this.regionManager = regMgr;
+  }
+
+  /**
+   * Set of regions that need to be processed for schema change. This is
+   * required for throttling. Regions are added to this set when the .META. is
+   * updated. Regions are held in this set until they can be closed. When a
+   * region is closed it is removed from this set and added to the
+   * regionsBeingReopened set.
+   */
+  private final Set<HRegionInfo> regionsToBeReopend = new HashSet<HRegionInfo>();
+
+  /**
+   * Set of regions that are currently being reopened by the master. Regions are
+   * added to this set when their status is changed to "CLOSING". A region is
+   * removed from this set when the master detects that the region has opened.
+   */
+  private final Set<HRegionInfo> regionsBeingReopened = new HashSet<HRegionInfo>();
+
+  public synchronized void addRegionsToReopen(Set<HRegionInfo> regions) {
+    regionsToBeReopend.addAll(regions);
+    totalNoOfRegionsToReopen = regions.size();
+  }
+
+  /**
+   * @return a pair of Integers (regions pending for reopen, total number of
+   *         regions in the table).
+   * @throws IOException
+   */
+  public synchronized Pair<Integer, Integer> getReopenStatus()
+      throws IOException {
+    int pending = regionsToBeReopend.size() + regionsBeingReopened.size();
+    return new Pair<Integer, Integer>(pending, totalNoOfRegionsToReopen);
+  }
+
+  /**
+   * When a reopening region changes state to OPEN, remove it from reopening
+   * regions.
+   *
+   * @param region
+   * @param serverName
+   *          on which the region reopened.
+   */
+  public synchronized void notifyRegionOpened(HRegionInfo region) {
+    if (regionsBeingReopened.contains(region)) {
+      LOG.info("Region reopened: " + region.getRegionNameAsString());
+      regionsBeingReopened.remove(region);
+
+      // Check if all the regions have reopened and log.
+      if (closeSomeRegions() == 0) {
+        if (regionsToBeReopend.size() == 0 && regionsBeingReopened.size() == 0) {
+          LOG.info("All regions of " + tableName + " reopened successfully.");
+        } else {
+          LOG.error("All regions of " + tableName
+              + " could not be reopened. Retry the operation.");
+        }
+        regionManager.deleteThrottledReopener(tableName);
+      }
+    }
+  }
+
+  /**
+   * Close some of the reopening regions. Used for throttling the percentage of
+   * regions of a table that may be reopened concurrently. This is configurable
+   * by a hbase config parameter hbase.regionserver.alterTable.concurrentReopen
+   * which defines the percentage of regions of a table that the master may
+   * reopen concurrently (defaults to 5).
+   *
+   * @param serverName
+   *          Region server on which to close regions
+   */
+  public synchronized int closeSomeRegions() {
+
+    float percentConcurrentClose = this.master.getConfiguration().getFloat(
+        "hbase.regionserver.alterTable.concurrentReopen", 5);
+    // Find the number of regions you are allowed to close concurrently
+    float numOfConcurrentClose = (percentConcurrentClose / 100)
+        * totalNoOfRegionsToReopen;
+    // Close at least one region at a time
+    if (numOfConcurrentClose < 1 && numOfConcurrentClose > 0) {
+      numOfConcurrentClose = 1;
+    }
+
+    numOfConcurrentClose -= regionsBeingReopened.size();
+    if (numOfConcurrentClose <= 0) {
+      return 0;
+    }
+    int cnt = 0;
+    for (Iterator<HRegionInfo> iter = regionsToBeReopend.iterator(); iter
+        .hasNext();) {
+      HRegionInfo region = iter.next();
+      // Get the server name on which this is currently deployed
+      String serverName = getRegionServerName(region);
+      // Skip this region, process it when it has a non-null entry in META
+      if (regionManager.regionIsInTransition(region.getRegionNameAsString())
+          || serverName == null) {
+        LOG.info("Skipping region in transition: "
+            + region.getRegionNameAsString());
+        continue;
+      }
+
+      LOG.debug("Closing region " + region.getRegionNameAsString());
+      regionManager.setClosing(serverName, region, false);
+      iter.remove(); // Remove from regionsToBeReopened
+      regionsBeingReopened.add(region);
+      cnt++;
+      // Close allowed number of regions, exit
+      if (cnt == (int) numOfConcurrentClose) {
+        break;
+      }
+    }
+    return cnt;
+  }
+
+  /**
+   * Get the name of the server serving this region from .META.
+   *
+   * @param region
+   * @return serverName (host, port and startcode)
+   */
+  private String getRegionServerName(HRegionInfo region) {
+    try {
+      //TODO: is there a better way to do this?
+      HTable metaTable = new HTable(this.master.getConfiguration(),
+          HConstants.META_TABLE_NAME);
+
+      Result result = metaTable.getRowOrBefore(region.getRegionName(),
+          HConstants.CATALOG_FAMILY);
+      HRegionInfo metaRegionInfo = this.master.getHRegionInfo(
+          region.getRegionName(), result);
+      if (metaRegionInfo.equals(region)) {
+        String serverAddr = BaseScanner.getServerAddress(result);
+        if (serverAddr != null && serverAddr.length() > 0) {
+          long startCode = BaseScanner.getStartCode(result);
+          String serverName = HServerInfo.getServerName(serverAddr, startCode);
+          return serverName;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not get the server name for : " + region.getRegionNameAsString());
+    }
+    return null;
+  }
+
+  /**
+   * Called when a region closes, if it is one of the reopening regions, add it
+   * to preferred assignment
+   *
+   * @param region
+   * @param serverInfo
+   */
+  public synchronized void addPreferredAssignmentForReopen(HRegionInfo region,
+      HServerInfo serverInfo) {
+    if (regionsBeingReopened.contains(region)) {
+      regionManager.addRegionToPreferredAssignment(
+          serverInfo.getServerAddress(), region);
+    }
+  }
+
+  /**
+   * Called to start reopening regions of tableName
+   *
+   * @throws IOException
+   */
+  public synchronized void reOpenRegionsThrottle() throws IOException {
+    if (HTable.isTableEnabled(tableName)) {
+      LOG.info("Initiating reopen for all regions of " + tableName);
+      if (closeSomeRegions() == 0) {
+        regionManager.deleteThrottledReopener(tableName);
+        throw new IOException("Could not reopen regions of the table, "
+            + "retry the alter operation");
+      }
+    }
+  }
+}

Modified: hbase/branches/0.89/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/hbase/admin.rb?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.89/src/main/ruby/hbase/admin.rb Tue Oct 11 17:44:01 2011
@@ -28,6 +28,7 @@ java_import org.apache.hadoop.hbase.io.h
 java_import org.apache.hadoop.hbase.regionserver.StoreFile
 java_import org.apache.hadoop.hbase.HRegionInfo
 java_import org.apache.zookeeper.ZooKeeper
+java_import org.apache.hadoop.hbase.util.Pair
 
 # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
 
@@ -207,8 +208,26 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
+    # Check the status of alter command (number of regions reopened)
+       def alter_status(table_name)
+         # Table name should be a string
+         raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String)
+
+         # Table should exist
+         raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
+
+         status = Pair.new()
+         begin
+           status = @admin.getAlterStatus(table_name.to_java_bytes)
+           puts "#{status.getSecond() - status.getFirst()}/#{status.getSecond()} regions updated."
+           sleep 1
+         end while status != nil && status.getFirst() != 0
+         puts "Done."
+       end
+
+    #----------------------------------------------------------------------------------------------
     # Change table structure or table options
-    def alter(table_name, *args)
+    def alter(table_name, wait = true, *args)
       # Table name should be a string
       raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String)
 
@@ -216,7 +235,7 @@ module Hbase
       raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
 
       # Table should be disabled
-      raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name)
+      # raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name)
 
       # There should be at least one argument
       raise(ArgumentError, "There should be at least one argument but the table name") if args.empty?
@@ -240,8 +259,16 @@ module Hbase
           # If column already exist, then try to alter it. Create otherwise.
           if htd.hasFamily(column_name.to_java_bytes)
             @admin.modifyColumn(table_name, column_name, descriptor)
+            if wait == true
+              puts "Updating all regions with the new schema..."
+              alter_status(table_name)
+            end
           else
             @admin.addColumn(table_name, descriptor)
+            if wait == true
+              puts "Updating all regions with the new schema..."
+              alter_status(table_name)
+            end
           end
           next
         end
@@ -250,11 +277,17 @@ module Hbase
         if method == "delete"
           raise(ArgumentError, "NAME parameter missing for delete method") unless arg[NAME]
           @admin.deleteColumn(table_name, arg[NAME])
+          if wait == true
+            puts "Updating all regions with the new schema..."
+            alter_status(table_name)
+          end
           next
         end
 
         # Change table attributes
         if method == "table_att"
+          # Table should be disabled
+          raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name)
           htd.setMaxFileSize(JLong.valueOf(arg[MAX_FILESIZE])) if arg[MAX_FILESIZE]
           htd.setReadOnly(JBoolean.valueOf(arg[READONLY])) if arg[READONLY]
           htd.setMemStoreFlushSize(JLong.valueOf(arg[MEMSTORE_FLUSHSIZE])) if arg[MEMSTORE_FLUSHSIZE]

Modified: hbase/branches/0.89/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell.rb?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell.rb Tue Oct 11 17:44:01 2011
@@ -219,6 +219,8 @@ Shell.load_command_group(
     list
     locate_regionservers
     show_filters
+    alter_status
+    alter_async
   ]
 )
 

Modified: hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb Tue Oct 11 17:44:01 2011
@@ -50,7 +50,7 @@ module Shell
 
       def command(table, *args)
         format_simple_command do
-          admin.alter(table, *args)
+          admin.alter(table, true, *args)
         end
       end
     end

Copied: hbase/branches/0.89/src/main/ruby/shell/commands/alter_async.rb (from r1181952, hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/alter_async.rb?p2=hbase/branches/0.89/src/main/ruby/shell/commands/alter_async.rb&p1=hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb&r1=1181952&r2=1181953&rev=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/alter.rb (original)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/alter_async.rb Tue Oct 11 17:44:01 2011
@@ -20,37 +20,45 @@
 
 module Shell
   module Commands
-    class Alter < Command
+    class AlterAsync < Command
       def help
         return <<-EOF
-          Alter column family schema;  pass table name and a dictionary
-          specifying new column family schema. Dictionaries are described
-          on the main help command output. Dictionary must include name
-          of column family to alter. For example,
+          Alter column family schema, does not wait for all regions to receive the
+          schema changes. Pass table name and a dictionary specifying new column
+          family schema. Dictionaries are described on the main help command output.
+          Dictionary must include name of column family to alter. For example,
 
           To change or add the 'f1' column family in table 't1' from defaults
           to instead keep a maximum of 5 cell VERSIONS, do:
-          hbase> alter 't1', NAME => 'f1', VERSIONS => 5
+
+            hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5
 
           To delete the 'f1' column family in table 't1', do:
-          hbase> alter 't1', NAME => 'f1', METHOD => 'delete'
+
+            hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete'
+
           or a shorter version:
-          hbase> alter 't1', 'delete' => 'f1'
+
+            hbase> alter_async 't1', 'delete' => 'f1'
 
           You can also change table-scope attributes like MAX_FILESIZE
           MEMSTORE_FLUSHSIZE, READONLY, and DEFERRED_LOG_FLUSH.
 
           For example, to change the max size of a family to 128MB, do:
-          hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'
+
+            hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'
 
           There could be more than one alteration in one command:
-          hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
+
+            hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
+
+          To check if all the regions have been updated, use alter_status <table_name>
         EOF
       end
 
       def command(table, *args)
         format_simple_command do
-          admin.alter(table, *args)
+          admin.alter(table, false, *args)
         end
       end
     end

Added: hbase/branches/0.89/src/main/ruby/shell/commands/alter_status.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/ruby/shell/commands/alter_status.rb?rev=1181953&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/ruby/shell/commands/alter_status.rb (added)
+++ hbase/branches/0.89/src/main/ruby/shell/commands/alter_status.rb Tue Oct 11 17:44:01 2011
@@ -0,0 +1,39 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class AlterStatus < Command
+      def help
+        return <<-EOF
+Get the status of the alter command. Indicates the number of regions of the
+table that have received the updated schema
+Pass table name.
+
+hbase> alter_status 't1'
+EOF
+      end
+
+      def command(table)
+        admin.alter_status(table)
+      end
+    end
+  end
+end
\ No newline at end of file

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1181953&r1=1181952&r2=1181953&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Tue Oct 11 17:44:01 2011
@@ -20,12 +20,14 @@
 package org.apache.hadoop.hbase.client;
 
 
+import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,11 +38,15 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -267,9 +273,11 @@ public class TestAdmin {
     } catch (RetriesExhaustedException e) {
       ok = true;
     }
+    // with online schema change it is possible to add column
+    // without disabling the table
     assertEquals(true, ok);
     this.admin.enableTable(table);
-
+    ok = true;
     //Test that table is enabled
     try {
       ht.get(get);
@@ -429,11 +437,6 @@ public class TestAdmin {
     }
     this.admin.addColumn(tableName, new HColumnDescriptor("col2"));
     this.admin.enableTable(tableName);
-    try {
-      this.admin.deleteColumn(tableName, Bytes.toBytes("col2"));
-    } catch(TableNotDisabledException e) {
-      // Expected
-    }
     this.admin.disableTable(tableName);
     this.admin.deleteColumn(tableName, Bytes.toBytes("col2"));
     this.admin.deleteTable(tableName);
@@ -622,5 +625,56 @@ public class TestAdmin {
 
     assertEquals(htd.compareTo(confirmedHtd), 0);
   }
+
+  @Test
+  public void testOnlineChangeTableSchema() throws IOException,
+      InterruptedException {
+    final byte[] tableName = Bytes.toBytes("changeTableSchemaOnline");
+    HTableDescriptor[] tables = admin.listTables();
+    int numTables = tables.length;
+    TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+
+    HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
+
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    HTable table = new HTable(master.getConfiguration(),
+        tableName);
+    Map<HRegionInfo, HServerAddress> hriToHsa = table.getRegionsInfo();
+
+    // Try adding a column
+    this.admin.enableTable(tableName);
+    assertFalse(this.admin.isTableDisabled(tableName));
+    final String xtracolName = "xtracol";
+    HColumnDescriptor xtracol = new HColumnDescriptor(xtracolName);
+    xtracol.setValue(xtracolName, xtracolName);
+    boolean expectedException = false;
+    try {
+      this.admin.addColumn(tableName, xtracol);
+    } catch (TableNotDisabledException re) {
+      expectedException = true;
+    }
+    // Add column should work even if the table is enabled
+    assertFalse(expectedException);
+
+    // wait for all regions to reopen
+    while (this.admin.getAlterStatus(tableName).getFirst() != 0) {
+      Thread.sleep(100);
+    }
+    // get the table descriptor from META
+    htd = this.admin.getTableDescriptor(tableName);
+    List<Pair<HRegionInfo,HServerAddress>> regionToRegionServer = master.getTableRegions(tableName);
+    // check that all regions have the correct (updated) schema.
+    for (Pair<HRegionInfo, HServerAddress> p : regionToRegionServer) {
+      HRegionInfo regionInfo = p.getFirst();
+      HTableDescriptor modifiedHtd = regionInfo.getTableDesc();
+      // ensure that the table descriptor fetched from META above matches
+      // the descriptor carried by the HRegionInfo of every region
+      assertTrue(htd.equals(modifiedHtd));
+    }
+
+  }
+
 }
 



Mime
View raw message