hbase-commits mailing list archives

From mberto...@apache.org
Subject svn commit: r1452257 [4/14] - in /hbase/branches/0.94: security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/ src/main/jamon/org/apache/hadoop/hbase/tmpl/master/ src/main/java/org...
Date Mon, 04 Mar 2013 11:24:53 GMT
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,916 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.snapshot;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SnapshotSentinel;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
+import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
+import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
+import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
+import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class manages the procedure of taking and restoring snapshots. There is only one
+ * SnapshotManager for the master.
+ * <p>
+ * The class provides methods for monitoring in-progress snapshot actions.
+ * <p>
+ * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
+ * simplification in the current implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SnapshotManager implements Stoppable {
+  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
+
+  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
+  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
+
+  /** Enable or disable snapshot support */
+  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
+
+  /**
+   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
+   * completion.
+   */
+  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
+
+  /** By default, time out waiting for a snapshot to complete after this many ms */
+  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
+
+  /**
+   * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
+   * completion.
+   */
+  private static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
+
+  /** Name of the operation to use in the controller */
+  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
+
+  // TODO - enable having multiple snapshots with multiple monitors/threads
+  // this needs to be configuration based when running multiple snapshots is implemented
+  /** number of concurrent snapshot operations allowed on the master */
+  private static final int opThreads = 1;
+
+  private boolean stopped;
+  private final long wakeFrequency;
+  private final MasterServices master;  // Needed by TableEventHandlers
+  private final ProcedureCoordinator coordinator;
+
+  // Is snapshot feature enabled?
+  private boolean isSnapshotSupported = false;
+
+  // A reference to a handler.  If the handler is non-null, then it is assumed that a snapshot is
+  // in progress currently
+  // TODO: this is a bad smell;  likely replace with a collection in the future.  Also this gets
+  // reset by every operation.
+  private TakeSnapshotHandler handler;
+
+  private final Path rootDir;
+  private final ExecutorService executorService;
+
+  // Restore Sentinels map, with table name as key
+  private Map<String, SnapshotSentinel> restoreHandlers = new HashMap<String, SnapshotSentinel>();
+
+  /**
+   * Construct a snapshot manager.
+   * @param master services for the master where the manager is running
+   */
+  public SnapshotManager(final MasterServices master) throws KeeperException, IOException,
+    UnsupportedOperationException {
+    this.master = master;
+    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+
+    // get the configuration for the coordinator
+    Configuration conf = master.getConfiguration();
+    this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
+    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
+    this.rootDir = master.getMasterFileSystem().getRootDir();
+    this.executorService = master.getExecutorService();
+    resetTempDir();
+  }
+
+  /**
+   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
+   * @param master services for the master where the manager is running
+   * @param coordinator procedure coordinator instance.  exposed for testing.
+   * @param pool HBase ExecutorService instance, exposed for testing.
+   */
+  public SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator, ExecutorService pool)
+      throws IOException, UnsupportedOperationException {
+    this.master = master;
+    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+
+    this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
+      SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    this.coordinator = coordinator;
+    this.rootDir = master.getMasterFileSystem().getRootDir();
+    this.executorService = pool;
+    resetTempDir();
+  }
+
+  /**
+   * Gets the list of all completed snapshots.
+   * @return list of SnapshotDescriptions
+   * @throws IOException File system exception
+   */
+  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
+    List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
+    // first create the snapshot root path and check to see if it exists
+    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+    // if there are no snapshots, return an empty list
+    if (!fs.exists(snapshotDir)) {
+      return snapshotDescs;
+    }
+
+    // ignore all the snapshots in progress
+    FileStatus[] snapshots = fs.listStatus(snapshotDir,
+      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
+    // loop through all the completed snapshots
+    for (FileStatus snapshot : snapshots) {
+      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
+      // if the snapshot is bad
+      if (!fs.exists(info)) {
+        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
+        continue;
+      }
+      FSDataInputStream in = null;
+      try {
+        in = fs.open(info);
+        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
+        snapshotDescs.add(desc);
+      } catch (IOException e) {
+        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+      }
+    }
+    return snapshotDescs;
+  }
+
+  /**
+   * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
+   * snapshot attempts.
+   *
+   * @throws IOException if we can't reach the filesystem
+   */
+  void resetTempDir() throws IOException {
+    // cleanup any existing snapshots.
+    Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
+    if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
+      LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
+    }
+  }
+
+  /**
+   * Delete the specified snapshot
+   * @param snapshot
+   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
+   * @throws IOException For filesystem IOExceptions
+   */
+  public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
+
+    // call coproc pre hook
+    MasterCoprocessorHost cpHost = master.getCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preDeleteSnapshot(snapshot);
+    }
+
+    // check to see if it is completed
+    if (!isSnapshotCompleted(snapshot)) {
+      throw new SnapshotDoesNotExistException(snapshot);
+    }
+
+    String snapshotName = snapshot.getName();
+    LOG.debug("Deleting snapshot: " + snapshotName);
+    // first create the snapshot description and check to see if it exists
+    MasterFileSystem fs = master.getMasterFileSystem();
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+    // delete the existing snapshot
+    if (!fs.getFileSystem().delete(snapshotDir, true)) {
+      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
+    }
+
+    // call coproc post hook
+    if (cpHost != null) {
+      cpHost.postDeleteSnapshot(snapshot);
+    }
+
+  }
+
+  /**
+   * Return the handler if it is currently running and has the same snapshot target name.
+   * @param snapshot
+   * @return null if it doesn't match, else a live handler.
+   */
+  private synchronized TakeSnapshotHandler getTakeSnapshotHandler(SnapshotDescription snapshot) {
+    TakeSnapshotHandler h = this.handler;
+    if (h == null) {
+      return null;
+    }
+
+    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
+      // the specified snapshot is not the one currently running
+      return null;
+    }
+
+    return h;
+  }
+
+  /**
+   * Check if the specified snapshot is done
+   * @param expected
+   * @return true if snapshot is ready to be restored, false if it is still being taken.
+   * @throws IOException if there is an error from HDFS or RPC
+   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
+   */
+  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
+    // check the request to make sure it has a snapshot
+    if (expected == null) {
+      throw new UnknownSnapshotException(
+         "No snapshot name passed in request, can't figure out which snapshot you want to check.");
+    }
+
+    String ssString = SnapshotDescriptionUtils.toString(expected);
+
+    // check to see if the sentinel exists
+    TakeSnapshotHandler handler = getTakeSnapshotHandler(expected);
+    if (handler == null) {
+      // doesn't exist, check if it is already completely done.
+      if (!isSnapshotCompleted(expected)) {
+        throw new UnknownSnapshotException("Snapshot " + ssString
+            + " is not currently running or one of the known completed snapshots.");
+      }
+      // was done, return true;
+      return true;
+    }
+
+    // pass on any failure we find in the sentinel
+    try {
+      handler.rethrowException();
+    } catch (ForeignException e) {
+      // Give some procedure info on an exception.
+      String status;
+      Procedure p = coordinator.getProcedure(expected.getName());
+      if (p != null) {
+        status = p.getStatus();
+      } else {
+        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
+      }
+      throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
+          expected);
+    }
+
+    // check to see if we are done
+    if (handler.isFinished()) {
+      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
+      return true;
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
+    }
+    return false;
+  }
+
+  /**
+   * Check to see if there are any snapshots in progress.  Currently we have a
+   * limitation allowing only a single snapshot attempt at a time.
+   * @return <tt>true</tt> if there are any snapshots in progress, <tt>false</tt> otherwise
+   * @throws SnapshotCreationException if the snapshot failed
+   */
+  synchronized boolean isTakingSnapshot() throws SnapshotCreationException {
+    // TODO later when we handle multiple there would be a map with ssname to handler.
+    return handler != null && !handler.isFinished();
+  }
+
+  /**
+   * Check to see if the specified table has a snapshot in progress.  Currently we have a
+   * limitation only allowing a single snapshot attempt at a time.
+   * @param tableName name of the table being snapshotted.
+   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
+   */
+  private boolean isTakingSnapshot(final String tableName) {
+    if (handler != null && handler.getSnapshot().getTable().equals(tableName)) {
+      return !handler.isFinished();
+    }
+    return false;
+  }
+
+  /**
+   * Check to make sure that we are OK to run the passed snapshot: verify that we aren't
+   * already running a snapshot or a restore on the same table, and prepare the working directory.
+   * @param snapshot description of the snapshot we want to start
+   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
+   */
+  private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
+      throws HBaseSnapshotException {
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+
+    // make sure we aren't already running a snapshot
+    if (isTakingSnapshot()) {
+      throw new SnapshotCreationException("Rejected taking "
+          + SnapshotDescriptionUtils.toString(snapshot)
+          + " because we are already running another snapshot "
+          + SnapshotDescriptionUtils.toString(this.handler.getSnapshot()), snapshot);
+    }
+
+    // make sure we aren't running a restore on the same table
+    if (isRestoringTable(snapshot.getTable())) {
+      throw new SnapshotCreationException("Rejected taking "
+          + SnapshotDescriptionUtils.toString(snapshot)
+          + " because we are already have a restore in progress on the same snapshot "
+          + SnapshotDescriptionUtils.toString(this.handler.getSnapshot()), snapshot);
+    }
+
+    try {
+      // delete the working directory, since we aren't running the snapshot. Likely leftovers
+      // from a failed attempt.
+      fs.delete(workingDir, true);
+
+      // recreate the working directory for the snapshot
+      if (!fs.mkdirs(workingDir)) {
+        throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
+            + ") for snapshot" , snapshot);
+      }
+    } catch (HBaseSnapshotException e) {
+      throw e;
+    } catch (IOException e) {
+      throw new SnapshotCreationException(
+          "Exception while checking to see if snapshot could be started.", e, snapshot);
+    }
+  }
+
+  /**
+   * Take a snapshot of an enabled table.
+   * <p>
+   * The thread limitation on the executorService's thread pool for snapshots ensures the
+   * snapshot won't be started if there is another snapshot already running. Does
+   * <b>not</b> check to see if another snapshot of the same name already exists.
+   * @param snapshot description of the snapshot to take.
+   * @throws HBaseSnapshotException if the snapshot could not be started
+   */
+  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
+      throws HBaseSnapshotException {
+    TakeSnapshotHandler handler;
+    try {
+      handler = new EnabledTableSnapshotHandler(snapshot, master, this);
+      this.executorService.submit(handler);
+      this.handler = handler;
+    } catch (IOException e) {
+      // cleanup the working directory by trying to delete it from the fs.
+      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+      try {
+        if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
+          LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:"
+              + SnapshotDescriptionUtils.toString(snapshot));
+        }
+      } catch (IOException e1) {
+        LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:" +
+            SnapshotDescriptionUtils.toString(snapshot));
+      }
+      // fail the snapshot
+      throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
+    }
+  }
+
+  /**
+   * Take a snapshot based on the enabled/disabled state of the table.
+   *
+   * @param snapshot
+   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
+   * @throws IOException when some sort of generic IO exception occurs.
+   */
+  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
+    // check to see if we already completed the snapshot
+    if (isSnapshotCompleted(snapshot)) {
+      throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
+          + "' already stored on the filesystem.", snapshot);
+    }
+
+    LOG.debug("No existing snapshot, attempting snapshot...");
+
+    // check to see if the table exists
+    HTableDescriptor desc = null;
+    try {
+      desc = master.getTableDescriptors().get(snapshot.getTable());
+    } catch (FileNotFoundException e) {
+      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
+      LOG.error(msg);
+      throw new SnapshotCreationException(msg, e, snapshot);
+    } catch (IOException e) {
+      throw new SnapshotCreationException("Error while geting table description for table "
+          + snapshot.getTable(), e, snapshot);
+    }
+    if (desc == null) {
+      throw new SnapshotCreationException("Table '" + snapshot.getTable()
+          + "' doesn't exist, can't take snapshot.", snapshot);
+    }
+
+    // set the snapshot version, now that we are ready to take it
+    snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
+        .build();
+
+    // call pre coproc hook
+    MasterCoprocessorHost cpHost = master.getCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preSnapshot(snapshot, desc);
+    }
+
+    // setup the snapshot
+    prepareToTakeSnapshot(snapshot);
+
+    // if the table is enabled, have the region servers actually run the snapshot work
+    AssignmentManager assignmentMgr = master.getAssignmentManager();
+    if (assignmentMgr.getZKTable().isEnabledTable(snapshot.getTable())) {
+      LOG.debug("Table enabled, starting distributed snapshot.");
+      snapshotEnabledTable(snapshot);
+      LOG.debug("Started snapshot: " + SnapshotDescriptionUtils.toString(snapshot));
+    }
+    // For disabled table, snapshot is created by the master
+    else if (assignmentMgr.getZKTable().isDisabledTable(snapshot.getTable())) {
+      LOG.debug("Table is disabled, running snapshot entirely on master.");
+      snapshotDisabledTable(snapshot);
+      LOG.debug("Started snapshot: " + SnapshotDescriptionUtils.toString(snapshot));
+    } else {
+      LOG.error("Can't snapshot table '" + snapshot.getTable()
+          + "', isn't open or closed, we don't know what to do!");
+      TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
+          + " isn't fully open.");
+      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
+    }
+
+    // call post coproc hook
+    if (cpHost != null) {
+      cpHost.postSnapshot(snapshot, desc);
+    }
+  }
+
+  /**
+   * Take a snapshot of a disabled table.
+   * <p>
+   * The thread limitation on the executorService's thread pool for snapshots ensures the
+   * snapshot won't be started if there is another snapshot already running. Does
+   * <b>not</b> check to see if another snapshot of the same name already exists.
+   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
+   * @throws HBaseSnapshotException if the snapshot could not be started
+   */
+  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
+      throws HBaseSnapshotException {
+
+    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
+    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
+
+    DisabledTableSnapshotHandler handler;
+    try {
+      handler = new DisabledTableSnapshotHandler(snapshot, this.master);
+      this.executorService.submit(handler);
+      this.handler = handler;
+    } catch (IOException e) {
+      // cleanup the working directory by trying to delete it from the fs.
+      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+      try {
+        if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
+          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
+              SnapshotDescriptionUtils.toString(snapshot));
+        }
+      } catch (IOException e1) {
+        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
+            SnapshotDescriptionUtils.toString(snapshot));
+      }
+      // fail the snapshot
+      throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
+    }
+  }
+
+  /**
+   * Set the handler for the current snapshot
+   * <p>
+   * Exposed for TESTING
+   * @param handler handler the master should use
+   *
+   * TODO get rid of this if possible, repackaging, modify tests.
+   */
+  public synchronized void setSnapshotHandlerForTesting(TakeSnapshotHandler handler) {
+    this.handler = handler;
+  }
+
+  /**
+   * @return distributed commit coordinator for all running snapshots
+   */
+  ProcedureCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  /**
+   * Check to see if the snapshot is one of the currently completed snapshots
+   * @param expected snapshot to check
+   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if it is
+   *         not stored
+   * @throws IOException if the filesystem throws an unexpected exception,
+   * @throws IllegalArgumentException if snapshot name is invalid.
+   */
+  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
+    try {
+      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+      FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+      // check to see if the snapshot already exists
+      return fs.exists(snapshotDir);
+    } catch (IllegalArgumentException iae) {
+      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
+    }
+  }
+
+  /**
+   * Clone the specified snapshot into a new table.
+   * The operation will fail if the destination table has a snapshot or restore in progress.
+   *
+   * @param snapshot Snapshot Descriptor
+   * @param hTableDescriptor Table Descriptor of the table to create
+   * @param waitTime timeout before considering the clone failed
+   */
+  synchronized void cloneSnapshot(final SnapshotDescription snapshot,
+      final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
+    String tableName = hTableDescriptor.getNameAsString();
+
+    // make sure we aren't running a snapshot on the same table
+    if (isTakingSnapshot(tableName)) {
+      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
+    }
+
+    // make sure we aren't running a restore on the same table
+    if (isRestoringTable(tableName)) {
+      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
+    }
+
+    try {
+      CloneSnapshotHandler handler =
+        new CloneSnapshotHandler(master, snapshot, hTableDescriptor);
+      this.executorService.submit(handler);
+      restoreHandlers.put(tableName, handler);
+    } catch (Exception e) {
+      String msg = "Couldn't clone the snapshot=" + SnapshotDescriptionUtils.toString(snapshot) +
+        " on table=" + tableName;
+      LOG.error(msg, e);
+      throw new RestoreSnapshotException(msg, e);
+    }
+  }
+
+  /**
+   * Restore the specified snapshot
+   * @param reqSnapshot
+   * @throws IOException
+   */
+  public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
+    FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
+    MasterCoprocessorHost cpHost = master.getCoprocessorHost();
+
+    // check if the snapshot exists
+    if (!fs.exists(snapshotDir)) {
+      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
+      throw new SnapshotDoesNotExistException(reqSnapshot);
+    }
+
+    // read snapshot information
+    SnapshotDescription fsSnapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    HTableDescriptor snapshotTableDesc = FSTableDescriptors.getTableDescriptor(fs, snapshotDir);
+    String tableName = reqSnapshot.getTable();
+
+    // stop tracking completed restores
+    cleanupRestoreSentinels();
+
+    // Execute the restore/clone operation
+    if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) {
+      if (master.getAssignmentManager().getZKTable().isEnabledTable(fsSnapshot.getTable())) {
+        throw new UnsupportedOperationException("Table '" +
+          fsSnapshot.getTable() + "' must be disabled in order to perform a restore operation.");
+      }
+
+      // call coproc pre hook
+      if (cpHost != null) {
+        cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
+      }
+      restoreSnapshot(fsSnapshot, snapshotTableDesc);
+      LOG.info("Restore snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
+
+      if (cpHost != null) {
+        cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
+      }
+    } else {
+      HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc,
+                                                         Bytes.toBytes(tableName));
+      if (cpHost != null) {
+        cpHost.preCloneSnapshot(reqSnapshot, htd);
+      }
+      cloneSnapshot(fsSnapshot, htd);
+      LOG.info("Clone snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
+
+      if (cpHost != null) {
+        cpHost.postCloneSnapshot(reqSnapshot, htd);
+      }
+    }
+  }
+
+  /**
+   * Restore the specified snapshot.
+   * The restore will fail if the destination table has a snapshot or restore in progress.
+   *
+   * @param snapshot Snapshot Descriptor
+   * @param hTableDescriptor Table Descriptor
+   */
+  private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
+      final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
+    String tableName = hTableDescriptor.getNameAsString();
+
+    // make sure we aren't running a snapshot on the same table
+    if (isTakingSnapshot(tableName)) {
+      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
+    }
+
+    // make sure we aren't running a restore on the same table
+    if (isRestoringTable(tableName)) {
+      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
+    }
+
+    try {
+      RestoreSnapshotHandler handler =
+        new RestoreSnapshotHandler(master, snapshot, hTableDescriptor);
+      this.executorService.submit(handler);
+      restoreHandlers.put(hTableDescriptor.getNameAsString(), handler);
+    } catch (Exception e) {
+      String msg = "Couldn't restore the snapshot=" + SnapshotDescriptionUtils.toString(snapshot)  +
+          " on table=" + tableName;
+      LOG.error(msg, e);
+      throw new RestoreSnapshotException(msg, e);
+    }
+  }
+
+  /**
+   * Verify if the restore of the specified table is in progress.
+   *
+   * @param tableName table under restore
+   * @return <tt>true</tt> if there is a restore in progress of the specified table.
+   */
+  private boolean isRestoringTable(final String tableName) {
+    SnapshotSentinel sentinel = restoreHandlers.get(tableName);
+    return (sentinel != null && !sentinel.isFinished());
+  }
+
+  /**
+   * Returns status of a restore request, specifically comparing source snapshot and target table
+   * names.  Throws exception if not a known snapshot.
+   * @param snapshot
+   * @return true if in progress, false if snapshot is completed.
+   * @throws UnknownSnapshotException if the specified source snapshot does not exist.
+   * @throws IOException if there was some sort of IO failure
+   */
+  public boolean isRestoringTable(final SnapshotDescription snapshot) throws IOException {
+    // check to see if the snapshot is already on the fs
+    if (!isSnapshotCompleted(snapshot)) {
+      throw new UnknownSnapshotException("Snapshot:" + snapshot.getName()
+          + " is not one of the known completed snapshots.");
+    }
+
+    SnapshotSentinel sentinel = getRestoreSnapshotSentinel(snapshot.getTable());
+    if (sentinel == null) {
+      // there is no sentinel so restore is not in progress.
+      return false;
+    }
+    if (!sentinel.getSnapshot().getName().equals(snapshot.getName())) {
+      // another handler is trying to restore to the table, but it isn't the same snapshot source.
+      return false;
+    }
+
+    LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
+        + sentinel.getSnapshot().getName() + " table=" + snapshot.getTable());
+    ForeignException e = sentinel.getExceptionIfFailed();
+    if (e != null) throw e;
+
+    // check to see if we are done
+    if (sentinel.isFinished()) {
+      LOG.debug("Restore snapshot=" + SnapshotDescriptionUtils.toString(snapshot) +
+          " has completed. Notifying the client.");
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
+          SnapshotDescriptionUtils.toString(snapshot));
+    }
+    return true;
+  }
+
+  /**
+   * Get the restore snapshot sentinel for the specified table
+   * @param tableName table under restore
+   * @return the restore snapshot handler
+   */
+  private synchronized SnapshotSentinel getRestoreSnapshotSentinel(final String tableName) {
+    try {
+      return restoreHandlers.get(tableName);
+    } finally {
+      cleanupRestoreSentinels();
+    }
+  }
+
+  /**
+   * Scan the restore handlers and remove the finished ones.
+   */
+  private synchronized void cleanupRestoreSentinels() {
+    Iterator<Map.Entry<String, SnapshotSentinel>> it = restoreHandlers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, SnapshotSentinel> entry = it.next();
+      SnapshotSentinel sentinel = entry.getValue();
+      if (sentinel.isFinished()) {
+        it.remove();
+      }
+    }
+  }
+
+  //
+  // Implementing Stoppable interface
+  //
+
+  @Override
+  public void stop(String why) {
+    // short circuit
+    if (this.stopped) return;
+    // make sure we get stop
+    this.stopped = true;
+    // pass the stop onto take snapshot handlers
+    if (this.handler != null) this.handler.cancel(why);
+
+    // pass the stop onto all the restore handlers
+    for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
+      restoreHandler.cancel(why);
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
+   * Called at the beginning of snapshot() and restoreSnapshot() methods.
+   * @throws UnsupportedOperationException if snapshots are not supported
+   */
+  public void checkSnapshotSupport() throws UnsupportedOperationException {
+    if (!this.isSnapshotSupported) {
+      throw new UnsupportedOperationException(
+        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
+          HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
+    }
+  }
+
+  /**
+   * Called at startup to verify whether snapshot operations are supported, and to avoid
+   * starting the master if there are snapshots present but the cleaners needed are missing.
+   * Otherwise we can end up with snapshot data loss.
+   * @param conf The {@link Configuration} object to use
+   * @param mfs The MasterFileSystem to use
+   * @throws IOException in case of file-system operation failure
+   * @throws UnsupportedOperationException in case cleaners are missing and
+   *         there are snapshots in the system
+   */
+  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
+      throws IOException, UnsupportedOperationException {
+    // Verify if snapshot is disabled by the user
+    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
+    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
+    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
+
+    // Extract cleaners from conf
+    Set<String> hfileCleaners = new HashSet<String>();
+    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
+
+    Set<String> logCleaners = new HashSet<String>();
+    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
+    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
+
+    // If the user has enabled snapshots, we force the cleaners to be present;
+    // otherwise we still need to check whether the cleaners are enabled and verify
+    // that there are no snapshots in the .snapshot folder.
+    if (snapshotEnabled) {
+      // Inject the snapshot cleaners, since hbase.snapshot.enabled is true
+      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
+      hfileCleaners.add(HFileLinkCleaner.class.getName());
+      logCleaners.add(SnapshotLogCleaner.class.getName());
+
+      // Set cleaners conf
+      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
+        hfileCleaners.toArray(new String[hfileCleaners.size()]));
+      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
+        logCleaners.toArray(new String[logCleaners.size()]));
+    } else {
+      // Verify if cleaners are present
+      snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
+        hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
+        hfileCleaners.contains(HFileLinkCleaner.class.getName());
+
+      // Warn if the cleaners are enabled but the hbase.snapshot.enabled property is false/not set.
+      if (snapshotEnabled) {
+        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
+          "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
+          (userDisabled ? "is set to 'false'." : "is not set."));
+      }
+    }
+
+    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
+    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
+
+    // If cleaners are not enabled, verify that there are no snapshots in the .snapshot folder,
+    // otherwise we end up with snapshot data loss.
+    if (!snapshotEnabled) {
+      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
+      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
+      FileSystem fs = mfs.getFileSystem();
+      if (fs.exists(snapshotDir)) {
+        FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
+          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
+        if (snapshots != null) {
+          LOG.error("Snapshots are present, but cleaners are not enabled.");
+          checkSnapshotSupport();
+        }
+      }
+    }
+  }
+}
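
For context, the checkSnapshotSupport() logic above keys off a handful of configuration
properties. Below is a minimal sketch (not part of this commit; the class name
SnapshotConfigExample is made up for illustration) of setting those properties
programmatically, using only the keys defined in SnapshotManager:

    import org.apache.hadoop.conf.Configuration;

    public class SnapshotConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enable the snapshot feature; with this set, checkSnapshotSupport() injects the
        // SnapshotHFileCleaner, HFileLinkCleaner and SnapshotLogCleaner plugins.
        conf.setBoolean("hbase.snapshot.enabled", true);
        // Optional tuning knobs read by SnapshotManager (values shown are the defaults).
        conf.setInt("hbase.snapshot.master.wakeMillis", 500);
        conf.setLong("hbase.snapshot.master.timeoutMillis", 5000);
        System.out.println("snapshot support requested: "
            + conf.getBoolean("hbase.snapshot.enabled", false));
      }
    }

In a real deployment these properties would normally live in the master's hbase-site.xml
rather than being set in code.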

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.snapshot;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SnapshotSentinel;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.TableInfoCopyTask;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A handler for taking snapshots from the master.
+ *
+ * This is not a subclass of TableEventHandler because using that would incur an extra META scan.
+ *
+ * The {@link #snapshotRegions(List)} call should be implemented by each snapshot flavor.
+ */
+@InterfaceAudience.Private
+public abstract class TakeSnapshotHandler extends EventHandler implements SnapshotSentinel,
+    ForeignExceptionSnare {
+  private static final Log LOG = LogFactory.getLog(TakeSnapshotHandler.class);
+
+  private volatile boolean finished;
+
+  // none of these should ever be null
+  protected final MasterServices master;
+  protected final SnapshotDescription snapshot;
+  protected final Configuration conf;
+  protected final FileSystem fs;
+  protected final Path rootDir;
+  private final Path snapshotDir;
+  protected final Path workingDir;
+  private final MasterSnapshotVerifier verifier;
+  protected final ForeignExceptionDispatcher monitor;
+
+  /**
+   * @param snapshot descriptor of the snapshot to take
+   * @param masterServices master services provider
+   * @throws IOException on unexpected error
+   */
+  public TakeSnapshotHandler(SnapshotDescription snapshot,
+      final MasterServices masterServices) throws IOException {
+    super(masterServices, EventType.C_M_SNAPSHOT_TABLE);
+    assert snapshot != null : "SnapshotDescription must not be null";
+    assert masterServices != null : "MasterServices must not be null";
+
+    this.master = masterServices;
+    this.snapshot = snapshot;
+    this.conf = this.master.getConfiguration();
+    this.fs = this.master.getMasterFileSystem().getFileSystem();
+    this.rootDir = this.master.getMasterFileSystem().getRootDir();
+    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+    this.monitor =  new ForeignExceptionDispatcher();
+
+    loadTableDescriptor(); // check that .tableinfo is present
+
+    // prepare the snapshot verifier
+    this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
+  }
+
+  private HTableDescriptor loadTableDescriptor()
+      throws FileNotFoundException, IOException {
+    final String name = snapshot.getTable();
+    HTableDescriptor htd =
+      this.master.getTableDescriptors().get(name);
+    if (htd == null) {
+      throw new IOException("HTableDescriptor missing for " + name);
+    }
+    return htd;
+  }
+
+  /**
+   * Execute the core common portions of taking a snapshot. The {@link #snapshotRegions(List)}
+   * call should be implemented by each snapshot flavor.
+   */
+  @Override
+  public void process() {
+    LOG.info("Running table snapshot operation " + eventType + " on table " + snapshot.getTable());
+    try {
+      // If regions move after this meta scan, the region specific snapshot should fail, triggering
+      // an external exception that gets captured here.
+
+      // write down the snapshot info in the working directory
+      SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
+      new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
+      monitor.rethrowException();
+
+      List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
+          MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
+            Bytes.toBytes(snapshot.getTable()), true);
+
+      // run the snapshot
+      snapshotRegions(regionsAndLocations);
+
+      // extract each pair to separate lists
+      Set<String> serverNames = new HashSet<String>();
+      for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
+        serverNames.add(p.getSecond().toString());
+      }
+
+      // verify the snapshot is valid
+      verifier.verifySnapshot(this.workingDir, serverNames);
+
+      // complete the snapshot, atomically moving from tmp to .snapshot dir.
+      completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
+    } catch (Exception e) {
+      String reason = "Failed taking snapshot " + SnapshotDescriptionUtils.toString(snapshot)
+          + " due to exception:" + e.getMessage();
+      LOG.error(reason, e);
+      ForeignException ee = new ForeignException(reason, e);
+      monitor.receive(ee);
+      // need to mark this completed to close off and allow cleanup to happen.
+      cancel("Failed to take snapshot '" + SnapshotDescriptionUtils.toString(snapshot)
+          + "' due to exception");
+    } finally {
+      LOG.debug("Launching cleanup of working dir:" + workingDir);
+      try {
+        // if the working dir is still present, the snapshot has failed.  If it is
+        // present, we delete it.
+        if (fs.exists(workingDir) && !this.fs.delete(workingDir, true)) {
+          LOG.error("Couldn't delete snapshot working directory:" + workingDir);
+        }
+      } catch (IOException e) {
+        LOG.error("Couldn't delete snapshot working directory:" + workingDir);
+      }
+    }
+  }
+
+  /**
+   * Complete the snapshot by moving it from the working directory to the final snapshot directory.
+   *
+   * @param snapshotDir final path of the snapshot
+   * @param workingDir directory where the in progress snapshot was built
+   * @param fs {@link FileSystem} where the snapshot was built
+   * @throws SnapshotCreationException if the snapshot could not be moved
+   * @throws IOException the filesystem could not be reached
+   */
+  public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs)
+      throws SnapshotCreationException, IOException {
+    LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+        + snapshotDir);
+    if (!fs.rename(workingDir, snapshotDir)) {
+      throw new SnapshotCreationException("Failed to move working directory(" + workingDir
+          + ") to completed directory(" + snapshotDir + ").");
+    }
+    finished = true;
+  }
+
+  /**
+   * Snapshot the specified regions
+   */
+  protected abstract void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
+      throws IOException, KeeperException;
+
+  @Override
+  public void cancel(String why) {
+    if (finished) return;
+
+    this.finished = true;
+    LOG.info("Stop taking snapshot=" + SnapshotDescriptionUtils.toString(snapshot) + " because: "
+        + why);
+    CancellationException ce = new CancellationException(why);
+    monitor.receive(new ForeignException(master.getServerName().toString(), ce));
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished;
+  }
+
+  @Override
+  public SnapshotDescription getSnapshot() {
+    return snapshot;
+  }
+
+  @Override
+  public ForeignException getExceptionIfFailed() {
+    return monitor.getException();
+  }
+
+  @Override
+  public void rethrowException() throws ForeignException {
+    monitor.rethrowException();
+  }
+
+  @Override
+  public boolean hasException() {
+    return monitor.hasException();
+  }
+
+  @Override
+  public ForeignException getException() {
+    return monitor.getException();
+  }
+
+}
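
To make the contract concrete: the abstract snapshotRegions(List) hook above is the only
piece a snapshot flavor has to supply (the flavors added by this commit are
EnabledTableSnapshotHandler and DisabledTableSnapshotHandler). A hypothetical, do-nothing
flavor might look like the sketch below; NoopSnapshotHandler is invented purely to
illustrate the extension point and is not part of this commit:

    package org.apache.hadoop.hbase.master.snapshot;

    import java.io.IOException;
    import java.util.List;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.MasterServices;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.zookeeper.KeeperException;

    /** Hypothetical snapshot flavor; only illustrates the TakeSnapshotHandler extension point. */
    public class NoopSnapshotHandler extends TakeSnapshotHandler {
      private static final Log LOG = LogFactory.getLog(NoopSnapshotHandler.class);

      public NoopSnapshotHandler(SnapshotDescription snapshot, MasterServices master)
          throws IOException {
        super(snapshot, master);
      }

      @Override
      protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
          throws IOException, KeeperException {
        // A real flavor captures region state here (online via the procedure framework,
        // offline directly on the master) and reports problems through the inherited
        // 'monitor' ForeignExceptionDispatcher. This sketch only logs what it was given.
        for (Pair<HRegionInfo, ServerName> region : regions) {
          LOG.debug("Would snapshot region " + region.getFirst().getRegionNameAsString()
              + " on " + region.getSecond());
        }
      }
    }

The base class's process() method handles the common work (writing the snapshot info,
copying .tableinfo, verification, and the atomic move into the completed directory), so a
subclass only has to describe how the region data itself is captured.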

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A globally-barriered distributed procedure.  This class encapsulates state and methods for
+ * tracking and managing a distributed procedure, as well as aborting if any member encounters
+ * a problem or if a cancellation is requested.
+ * <p>
+ * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
+ * method.  The procedure contacts all members and waits for all subprocedures to execute
+ * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
+ * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
+ * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
+ * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
+ * {@link Subprocedure#insideBarrier} executions complete at the members.  When
+ * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
+ * the coordinator.  Once all members complete, the coordinator calls
+ * {@link #sendGlobalBarrierComplete()}.
+ * <p>
+ * If errors are encountered remotely, they are forwarded to the coordinator, and
+ * {@link Subprocedure#cleanup(Exception)} is called.
+ * <p>
+ * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
+ * limit expires before the procedure completes, the {@link TimeoutExceptionInjector} will trigger
+ * a {@link ForeignException} to abort the procedure.  This is particularly useful for situations
+ * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
+ * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
+ * <p>
+ * Users should generally not directly create or subclass instances of this.  They are created
+ * implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
+ * String, byte[], List)}}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Procedure implements Callable<Void>, ForeignExceptionListener {
+  private static final Log LOG = LogFactory.getLog(Procedure.class);
+
+  //
+  // Arguments and naming
+  //
+
+  // Name of the procedure
+  final private String procName;
+  // Arguments for this procedure execution
+  final private byte[] args;
+
+  //
+  // Execution State
+  //
+  /** latch for waiting until all members have acquired the in-barrier state */
+  final CountDownLatch acquiredBarrierLatch;
+  /** latch for waiting until all members have executed and released their in barrier state */
+  final CountDownLatch releasedBarrierLatch;
+  /** latch for waiting until a procedure has completed */
+  final CountDownLatch completedLatch;
+  /** monitor to check for errors */
+  private final ForeignExceptionDispatcher monitor;
+
+  //
+  // Execution Timeout Handling.
+  //
+
+  /** frequency to check for errors (ms) */
+  protected final long wakeFrequency;
+  protected final TimeoutExceptionInjector timeoutInjector;
+
+  //
+  // Members' and Coordinator's state
+  //
+
+  /** lock to prevent nodes from acquiring and then releasing before we can track them */
+  private Object joinBarrierLock = new Object();
+  private final List<String> acquiringMembers;
+  private final List<String> inBarrierMembers;
+  private ProcedureCoordinator coord;
+
+  /**
+   * Creates a procedure. (FOR TESTING)
+   *
+   * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
+   * @param coord coordinator to call back to for general errors (e.g.
+   *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
+   * @param monitor error monitor to check for external errors
+   * @param wakeFreq frequency to check for errors while waiting
+   * @param timeout amount of time to allow the procedure to run before cancelling
+   * @param procName name of the procedure instance
+   * @param args argument data associated with the procedure instance
+   * @param expectedMembers names of the expected members
+   */
+  public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
+      long timeout, String procName, byte[] args, List<String> expectedMembers) {
+    this.coord = coord;
+    this.acquiringMembers = new ArrayList<String>(expectedMembers);
+    this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
+    this.procName = procName;
+    this.args = args;
+    this.monitor = monitor;
+    this.wakeFrequency = wakeFreq;
+
+    int count = expectedMembers.size();
+    this.acquiredBarrierLatch = new CountDownLatch(count);
+    this.releasedBarrierLatch = new CountDownLatch(count);
+    this.completedLatch = new CountDownLatch(1);
+    this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
+  }
+
+  /**
+   * Create a procedure.
+   *
+   * Users should generally not directly create instances of this.  They are created
+   * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
+   * String, byte[], List)}}
+   *
+   * @param coord coordinator to call back to for general errors (e.g.
+   *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
+   * @param wakeFreq frequency to check for errors while waiting
+   * @param timeout amount of time to allow the procedure to run before cancelling
+   * @param procName name of the procedure instance
+   * @param args argument data associated with the procedure instance
+   * @param expectedMembers names of the expected members
+   */
+  public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
+      String procName, byte[] args, List<String> expectedMembers) {
+    this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
+        expectedMembers);
+  }
+
+  public String getName() {
+    return procName;
+  }
+
+  /**
+   * @return a String listing the members still trying to enter the barrier and those already in the barrier
+   */
+  public String getStatus() {
+    String waiting, done;
+    synchronized (joinBarrierLock) {
+      waiting = acquiringMembers.toString();
+      done = inBarrierMembers.toString();
+    }
+    return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
+  }
+  
+  /**
+   * Get the ForeignExceptionDispatcher
+   * @return the Procedure's monitor.
+   */
+  public ForeignExceptionDispatcher getErrorMonitor() {
+    return monitor;
+  }
+
+  /**
+   * This call is the main execution thread of the barriered procedure.  It sends the barrier
+   * messages and essentially blocks until all procedure members have acquired and then released
+   * the barrier, waking periodically to check for foreign exceptions.
+   */
+  @Override
+  @SuppressWarnings("finally")
+  final public Void call() {
+    LOG.info("Starting procedure '" + procName + "'");
+    // start the timer
+    timeoutInjector.start();
+
+    // run the procedure
+    try {
+      // start by checking for error first
+      monitor.rethrowException();
+      LOG.debug("Procedure '" + procName + "' starting 'acquire'");
+      sendGlobalBarrierStart();
+
+      // wait for all the members to report acquisition
+      LOG.debug("Waiting for all members to 'acquire'");
+      waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
+      monitor.rethrowException();
+
+      LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
+      sendGlobalBarrierReached();
+
+      // wait for all members to report barrier release
+      waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
+
+      // make sure we didn't get an error during in barrier execution and release
+      monitor.rethrowException();
+      LOG.info("Procedure '" + procName + "' execution completed");
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      String msg = "Procedure '" + procName +"' execution failed!";
+      LOG.error(msg, e);
+      receive(new ForeignException(getName(), e));
+    } finally {
+      LOG.debug("Running finish phase.");
+      sendGlobalBarrierComplete();
+      completedLatch.countDown();
+
+      // tell the timer we are done, if we get here successfully
+      timeoutInjector.complete();
+      return null;
+    }
+  }
+
+  /**
+   * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
+   * the {@link Subprocedure#acquireBarrier} step.
+   * @throws ForeignException
+   */
+  public void sendGlobalBarrierStart() throws ForeignException {
+    // start the procedure
+    LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
+    try {
+      // send procedure barrier start to specified list of members. cloning the list to avoid
+      // concurrent modification from the controller setting the prepared nodes
+      coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
+    } catch (IOException e) {
+      coord.rpcConnectionFailure("Can't reach controller.", e);
+    } catch (IllegalArgumentException e) {
+      throw new ForeignException(getName(), e);
+    }
+  }
+
+  /**
+   * Sends a message to all members that the global barrier condition has been satisfied.  This
+   * should only be executed after all members have completed their
+   * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
+   * {@link Subprocedure#insideBarrier} method.
+   * @throws ForeignException
+   */
+  public void sendGlobalBarrierReached() throws ForeignException {
+    try {
+      // trigger to have member run {@link Subprocedure#insideBarrier}
+      coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
+    } catch (IOException e) {
+      coord.rpcConnectionFailure("Can't reach controller.", e);
+    }
+  }
+
+  /**
+   * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
+   * After this executes, the coordinator can assume that any state or resources associated with
+   * this barrier procedure have been released.
+   */
+  public void sendGlobalBarrierComplete() {
+    LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
+    try {
+      coord.getRpcs().resetMembers(this);
+    } catch (IOException e) {
+      coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
+    }
+  }
+
+  //
+  // Call backs from other external processes.
+  //
+
+  /**
+   * Call back triggered by an individual member upon successful local barrier acquisition
+   * @param member name of the member that acquired its part of the barrier
+   */
+  public void barrierAcquiredByMember(String member) {
+    LOG.debug("member: '" + member + "' joining prepared barrier for procedure '" + procName
+        + "' on coordinator");
+    if (this.acquiringMembers.contains(member)) {
+      synchronized (joinBarrierLock) {
+        if (this.acquiringMembers.remove(member)) {
+          this.inBarrierMembers.add(member);
+          acquiredBarrierLatch.countDown();
+        }
+      }
+      LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
+    } else {
+      LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
+          " Continuing on.");
+    }
+  }
+
+  /**
+   * Call back triggered by an individual member upon successful local in-barrier execution and
+   * release
+   * @param member name of the member that executed and released its part of the barrier
+   */
+  public void barrierReleasedByMember(String member) {
+    boolean removed = false;
+    synchronized (joinBarrierLock) {
+      removed = this.inBarrierMembers.remove(member);
+      if (removed) {
+        releasedBarrierLatch.countDown();
+      }
+    }
+    if (removed) {
+      LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
+          + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
+          + " more");
+    } else {
+      LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
+          + "', but we weren't waiting on it to release!");
+    }
+  }
+
+  /**
+   * Waits until the entire procedure has globally completed, or has been aborted.
+   * @throws ForeignException
+   * @throws InterruptedException
+   */
+  public void waitForCompleted() throws ForeignException, InterruptedException {
+    waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
+  }
+
+  /**
+   * A callback that handles incoming ForeignExceptions.
+   */
+  @Override
+  public void receive(ForeignException e) {
+    monitor.receive(e);
+  }
+
+  /**
+   * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
+   * check for errors
+   * @param latch latch to wait on
+   * @param monitor monitor to check for errors while waiting
+   * @param wakeFrequency frequency to wake up and check for errors (in
+   *          {@link TimeUnit#MILLISECONDS})
+   * @param latchDescription description of the latch, for logging
+   * @throws ForeignException type of error the monitor can throw, if the task fails
+   * @throws InterruptedException if we are interrupted while waiting on latch
+   */
+  public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
+      long wakeFrequency, String latchDescription) throws ForeignException,
+      InterruptedException {
+    boolean released = false;
+    while (!released) {
+      if (monitor != null) {
+        monitor.rethrowException();
+      }
+      /*
+      ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
+          + wakeFrequency + " ms)"); */
+      released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
+    }
+  }
+}
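The waitForLatch() helper above captures the wait-but-keep-checking pattern used throughout the
procedure framework: block on a latch in short intervals and rethrow any foreign exception that
arrived in the meantime.  A minimal standalone sketch of the same idea, where the ErrorCheck
interface is a hypothetical stand-in for ForeignExceptionSnare and not part of this commit:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class LatchWaitSketch {
      /** Hypothetical stand-in for ForeignExceptionSnare. */
      interface ErrorCheck {
        void rethrowIfError() throws Exception;
      }

      /** Wait for the latch, waking every wakeFrequencyMs to check for errors. */
      static void waitForLatch(CountDownLatch latch, ErrorCheck monitor, long wakeFrequencyMs)
          throws Exception {
        boolean released = false;
        while (!released) {
          if (monitor != null) {
            monitor.rethrowIfError();  // fail fast if an error was reported elsewhere
          }
          // await() returns false on timeout, so the loop also tolerates spurious wake-ups
          released = latch.await(wakeFrequencyMs, TimeUnit.MILLISECONDS);
        }
      }
    }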

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+
+import com.google.common.collect.MapMaker;
+
+/**
+ * This is the master side of a distributed complex procedure execution.
+ * <p>
+ * The {@link Procedure} is generic and subclassing or customization shouldn't be
+ * necessary -- any customization should happen just in {@link Subprocedure}s.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureCoordinator {
+  private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
+
+  final static long TIMEOUT_MILLIS_DEFAULT = 60000;
+  final static long WAKE_MILLIS_DEFAULT = 500;
+
+  private final ProcedureCoordinatorRpcs rpcs;
+  private final ExecutorService pool;
+
+  // Running procedure table.  Maps procedure name to running procedure reference
+  private final ConcurrentMap<String, Procedure> procedures =
+      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
+
+  /**
+   * Create and start a ProcedureCoordinator.
+   *
+   * The rpc object registers the ProcedureCoordinator and starts any threads in this
+   * constructor.
+   *
+   * @param rpcs the rpc implementation used to reach the procedure members
+   * @param pool Used for executing procedures.
+   */
+  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
+    this.rpcs = rpcs;
+    this.pool = pool;
+    this.rpcs.start(this);
+  }
+
+  /**
+   * Default thread pool for the procedure
+   */
+  public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads,
+      long wakeFrequency) {
+    return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
+  }
+
+  /**
+   * Shutdown the thread pools and release rpc resources
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    // have to use shutdown now to break any latch waiting
+    pool.shutdownNow();
+    rpcs.close();
+  }
+
+  /**
+   * Submit a procedure to kick off its dependent subprocedures.
+   * @param proc Procedure to execute
+   * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
+   *         procedure or any subprocedures could not be started.  Failure could be due to
+   *         submitting a procedure multiple times (or one with the same name), or some sort
+   *         of IO problem.  On errors, the procedure's monitor holds a reference to the exception
+   *         that caused the failure.
+   */
+  boolean submitProcedure(Procedure proc) {
+    // if the submitted procedure was null, then we don't want to run it
+    if (proc == null) {
+      return false;
+    }
+    String procName = proc.getName();
+
+    // make sure we aren't already running a procedure of that name
+    synchronized (procedures) {
+      Procedure oldProc = procedures.get(procName);
+      if (oldProc != null) {
+        // procedures are always eventually completed on both successful and failed execution
+        if (oldProc.completedLatch.getCount() != 0) {
+          LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
+          return false;
+        }
+        LOG.debug("Procedure " + procName + " was in running list but was completed.  Accepting new attempt.");
+        procedures.remove(procName);
+      }
+    }
+
+    // kick off the procedure's execution in a separate thread
+    Future<Void> f = null;
+    try {
+      synchronized (procedures) {
+        f = this.pool.submit(proc);
+        // if everything got started properly, we can add it to the known running procedures
+        this.procedures.put(procName, proc);
+      }
+      return true;
+    } catch (RejectedExecutionException e) {
+      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error and " +
+          "cancelling operation.", e);
+      // the thread pool is full and we can't run the procedure
+      proc.receive(new ForeignException(procName, e));
+
+      // cancel procedure proactively
+      if (f != null) {
+        f.cancel(true);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * The connection to the rest of the procedure group (members and coordinator) has been
+   * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
+   * members since we cannot reach them anymore.
+   * @param message description of the error
+   * @param cause the actual cause of the failure
+   */
+  void rpcConnectionFailure(final String message, final IOException cause) {
+    Collection<Procedure> toNotify = procedures.values();
+
+    for (Procedure proc : toNotify) {
+      if (proc == null) {
+        continue;
+      }
+      // notify the elements, if they aren't null
+      proc.receive(new ForeignException(proc.getName(), cause));
+    }
+  }
+
+  /**
+   * Abort the procedure with the given name
+   * @param procName name of the procedure to abort
+   * @param reason serialized information about the abort
+   */
+  public void abortProcedure(String procName, ForeignException reason) {
+    // if we know about the Procedure, notify it
+    synchronized(procedures) {
+      Procedure proc = procedures.get(procName);
+      if (proc == null) {
+        return;
+      }
+      proc.receive(reason);
+    }
+  }
+
+  /**
+   * Exposed for hooking with unit tests.
+   * @param procName name of the procedure instance
+   * @param procArgs argument data associated with the procedure instance
+   * @param expectedMembers names of the expected members
+   * @return a new {@link Procedure} ready to be submitted
+   */
+  Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
+      List<String> expectedMembers) {
+    // build the procedure
+    return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT,
+        procName, procArgs, expectedMembers);
+  }
+
+  /**
+   * Kick off the named procedure
+   * @param procName name of the procedure to start
+   * @param procArgs arguments for the procedure
+   * @param expectedMembers expected members to start
+   * @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise
+   * @throws RejectedExecutionException if there are no more available threads to run the procedure
+   */
+  public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
+      List<String> expectedMembers) throws RejectedExecutionException {
+    Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
+    if (!this.submitProcedure(proc)) {
+      LOG.error("Failed to submit procedure '" + procName + "'");
+      return null;
+    }
+    return proc;
+  }
+
+  /**
+   * Notification that the specified member has acquired its part of the barrier for the procedure
+   * via {@link Subprocedure#acquireBarrier()}.
+   * @param procName name of the procedure that acquired
+   * @param member name of the member that acquired
+   */
+  void memberAcquiredBarrier(String procName, final String member) {
+    Procedure proc = procedures.get(procName);
+    if (proc != null) {
+      proc.barrierAcquiredByMember(member);
+    }
+  }
+
+  /**
+   * Notification that another member has finished executing its in-barrier subprocedure for the
+   * procedure via {@link Subprocedure#insideBarrier()}.
+   * @param procName name of the subprocedure that finished
+   * @param member name of the member that executed and released its barrier
+   */
+  void memberFinishedBarrier(String procName, final String member) {
+    Procedure proc = procedures.get(procName);
+    if (proc != null) {
+      proc.barrierReleasedByMember(member);
+    }
+  }
+
+  /**
+   * @return the rpcs implementation for all current procedures
+   */
+  ProcedureCoordinatorRpcs getRpcs() {
+    return rpcs;
+  }
+
+  /**
+   * Returns the procedure.  This Procedure is a live instance so should not be modified but can
+   * be inspected.
+   * @param name Name of the procedure
+   * @return Procedure or null if not present any more
+   */
+  public Procedure getProcedure(String name) {
+    return procedures.get(name);
+  }
+
+  /**
+   * @return the set of all currently known procedure names.
+   */
+  public Set<String> getProcedureNames() {
+    return new HashSet<String>(procedures.keySet());
+  }
+}
\ No newline at end of file
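Taken together, the coordinator API above is driven by building a pool with defaultPool(),
constructing the ProcedureCoordinator (which starts its rpcs), then calling startProcedure() and
waiting on the returned handle.  A minimal usage sketch, assuming some concrete
ProcedureCoordinatorRpcs implementation is supplied by the caller (none is defined in this file),
with example procedure and member names:

    import java.util.Arrays;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
    import org.apache.hadoop.hbase.procedure.Procedure;
    import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
    import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;

    public class CoordinatorUsageSketch {
      // 'rpcs' stands in for a concrete ProcedureCoordinatorRpcs implementation.
      static void runOnce(ProcedureCoordinatorRpcs rpcs) throws Exception {
        // keepAliveTime=60s, up to 2 procedure threads, 500ms wake frequency
        ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool("coord", 60, 2, 500);
        ProcedureCoordinator coordinator = new ProcedureCoordinator(rpcs, pool);
        try {
          Procedure proc = coordinator.startProcedure(new ForeignExceptionDispatcher(),
              "example-proc", new byte[0], Arrays.asList("member1", "member2"));
          if (proc != null) {
            proc.waitForCompleted();  // blocks until all members acquire, execute and release
          }
        } finally {
          coordinator.close();        // shuts the pool down and releases rpc resources
        }
      }
    }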

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * RPCs for the coordinator to run a barriered procedure with subprocedures executed at
+ * distributed members.
+ * @see ProcedureCoordinator
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureCoordinatorRpcs extends Closeable {
+
+  /**
+   * Initialize and start threads necessary to connect an implementation's rpc mechanisms.
+   * @param listener the coordinator that receives member notifications
+   * @return true if the rpcs started successfully, false if initialization errors were encountered.
+   */
+  public boolean start(final ProcedureCoordinator listener);
+
+  /**
+   * Notify the members that the coordinator has aborted the procedure and that it should release
+   * barrier resources.
+   *
+   * @param procName name of the procedure that was aborted
+   * @param cause the reason why the procedure needs to be aborted
+   * @throws IOException if the rpcs can't reach the other members of the procedure (and can't
+   *           recover).
+   */
+  public void sendAbortToMembers(Procedure procName, ForeignException cause) throws IOException;
+
+  /**
+   * Notify the members to acquire barrier for the procedure
+   *
+   * @param procName name of the procedure to start
+   * @param info information that should be passed to all members
+   * @param members names of the members requested to reach the acquired phase
+   * @throws IllegalArgumentException if the procedure was already marked as failed
+   * @throws IOException if we can't reach the remote notification mechanism
+   */
+  public void sendGlobalBarrierAcquire(Procedure procName, byte[] info, List<String> members)
+      throws IOException, IllegalArgumentException;
+
+  /**
+   * Notify members that all members have acquired their parts of the barrier and that they can
+   * now execute under the global barrier.
+   *
+   * Must come after calling {@link #sendGlobalBarrierAcquire(Procedure, byte[], List)}
+   *
+   * @param procName name of the procedure to start
+   * @param members members to tell we have reached in-barrier phase
+   * @throws IOException if we can't reach the remote notification mechanism
+   */
+  public void sendGlobalBarrierReached(Procedure procName, List<String> members) throws IOException;
+
+  /**
+   * Notify Members to reset the distributed state for procedure
+   * @param procName name of the procedure to reset
+   * @throws IOException if the remote notification mechanism cannot be reached
+   */
+  public void resetMembers(Procedure procName) throws IOException;
+}
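For local, single-process testing, this interface can be satisfied without any remote transport by
invoking the public Procedure callbacks directly.  The class below is a hypothetical in-memory
sketch, not part of this commit; it acknowledges every barrier phase immediately and ignores aborts:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.errorhandling.ForeignException;
    import org.apache.hadoop.hbase.procedure.Procedure;
    import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
    import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;

    public class InMemoryCoordinatorRpcsSketch implements ProcedureCoordinatorRpcs {
      @Override
      public boolean start(ProcedureCoordinator listener) {
        return true;                      // nothing to connect to
      }

      @Override
      public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> members) {
        for (String member : members) {   // pretend every member acquired immediately
          proc.barrierAcquiredByMember(member);
        }
      }

      @Override
      public void sendGlobalBarrierReached(Procedure proc, List<String> members) {
        for (String member : members) {   // pretend every member executed and released
          proc.barrierReleasedByMember(member);
        }
      }

      @Override
      public void sendAbortToMembers(Procedure proc, ForeignException cause) {
        // no remote members to notify
      }

      @Override
      public void resetMembers(Procedure proc) {
        // no distributed state to reset
      }

      @Override
      public void close() throws IOException {
        // nothing to release
      }
    }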

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+import com.google.common.collect.MapMaker;
+
+/**
+ * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
+ * specialized part of a {@link Procedure} that actually does procedure type-specific work
+ * and reports back to the coordinator as it completes each phase.
+ * <p>
+ * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
+ * currently running subprocedures are notified of the failure since there is no longer a way to
+ * reach any other members or the coordinator once the rpcs are down.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureMember implements Closeable {
+  private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
+
+  private final SubprocedureFactory builder;
+  private final ProcedureMemberRpcs rpcs;
+
+  private final ConcurrentMap<String,Subprocedure> subprocs =
+      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
+  private final ExecutorService pool;
+
+  /**
+   * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
+   *
+   * @param rpcs controller used to send notifications to the procedure coordinator
+   * @param pool thread pool to submit subprocedures
+   * @param factory class that creates instances of a subprocedure.
+   */
+  public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
+      SubprocedureFactory factory) {
+    this.pool = pool;
+    this.rpcs = rpcs;
+    this.builder = factory;
+  }
+
+  public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive,
+      int procThreads, String memberName) {
+    return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
+  }
+
+  /**
+   * Package exposed.  Not for public use.
+   *
+   * @return reference to the Procedure member's rpcs object
+   */
+  ProcedureMemberRpcs getRpcs() {
+     return rpcs;
+  }
+
+
+  /**
+   * This is separated from execution so that we can detect and handle the case where the
+   * subprocedure is invalid and cannot be acted on due to bad info (like a DISABLED snapshot type
+   * being sent here).
+   * @param opName name of the operation for which to build the subprocedure
+   * @param data serialized information about the operation
+   * @return the newly built subprocedure
+   */
+  public Subprocedure createSubprocedure(String opName, byte[] data) {
+    return builder.buildSubprocedure(opName, data);
+  }
+
+  /**
+   * Submit a subprocedure for execution.  This starts the local acquire phase.
+   * @param subproc the subprocedure to execute.
+   * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
+   *         could not be started. In the latter case, the subprocedure holds a reference to
+   *         the exception that caused the failure.
+   */
+  public boolean submitSubprocedure(Subprocedure subproc) {
+     // if the submitted subprocedure was null, bail.
+    if (subproc == null) {
+      LOG.warn("Submitted null subprocedure, nothing to run here.");
+      return false;
+    }
+
+    String procName = subproc.getName();
+    if (procName == null || procName.length() == 0) {
+      LOG.error("Subproc name cannot be null or the empty string");
+      return false;
+    }
+
+    // make sure we aren't already running a subprocedure of that name
+    Subprocedure rsub;
+    synchronized (subprocs) {
+      rsub = subprocs.get(procName);
+    }
+    if (rsub != null) {
+      if (!rsub.isComplete()) {
+        LOG.error("Subproc '" + procName + "' is already running. Bailing out");
+        return false;
+      }
+      LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
+      subprocs.remove(procName);
+    }
+
+    LOG.debug("Submitting new Subprocedure:" + procName);
+
+    // kick off the subprocedure
+    Future<Void> future = null;
+    try {
+      future = this.pool.submit(subproc);
+      synchronized (subprocs) {
+        subprocs.put(procName, subproc);
+      }
+      return true;
+    } catch (RejectedExecutionException e) {
+      // the thread pool is full and we can't run the subprocedure
+      String msg = "Subprocedure pool is full!";
+      subproc.cancel(msg, e.getCause());
+
+      // cancel all subprocedures proactively
+      if (future != null) {
+        future.cancel(true);
+      }
+    }
+
+    LOG.error("Failed to start subprocedure '" + procName + "'");
+    return false;
+  }
+
+   /**
+    * Notification that the procedure coordinator has reached the global barrier
+    * @param procName name of the subprocedure that should start running the in-barrier phase
+    */
+   public void receivedReachedGlobalBarrier(String procName) {
+     Subprocedure subproc = subprocs.get(procName);
+     if (subproc == null) {
+       LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
+       return;
+     }
+     subproc.receiveReachedGlobalBarrier();
+   }
+
+  /**
+   * Best effort attempt to close the threadpool via Thread.interrupt.
+   */
+  @Override
+  public void close() throws IOException {
+    // have to use shutdown now to break any latch waiting
+    pool.shutdownNow();
+  }
+
+  /**
+   * Shutdown the threadpool, and wait for up to timeoutMs millis before bailing
+   * @param timeoutMs timeout limit in millis
+   * @return true if the pool shut down cleanly, false if we bailed due to timeout.
+   * @throws InterruptedException
+   */
+  boolean closeAndWait(long timeoutMs) throws InterruptedException {
+    pool.shutdown();
+    return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * The connection to the rest of the procedure group (member and coordinator) has been
+   * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
+   * other members since we cannot reach them anymore.
+   * @param message description of the error
+   * @param cause the actual cause of the failure
+   *
+   * TODO i'm tempted to just remove this code completely and treat it like any other abort.
+   * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
+   */
+  public void controllerConnectionFailure(final String message, final IOException cause) {
+    Collection<Subprocedure> toNotify = subprocs.values();
+    LOG.error(message, cause);
+    for (Subprocedure sub : toNotify) {
+      // TODO notify the elements, if they aren't null
+      sub.cancel(message, cause);
+    }
+  }
+
+  /**
+   * Send abort to the specified procedure
+   * @param procName name of the procedure to abort
+   * @param ee exception information about the abort
+   */
+  public void receiveAbortProcedure(String procName, ForeignException ee) {
+    LOG.debug("Request received to abort procedure " + procName, ee);
+    // if we know about the procedure, notify it
+    Subprocedure sub = subprocs.get(procName);
+    if (sub == null) {
+      LOG.info("Received abort on procedure with no local subprocedure " + procName +
+          ", ignoring it.", ee);
+      return; // Procedure has already completed
+    }
+    LOG.error("Propagating foreign exception to subprocedure " + sub.getName(), ee);
+    sub.monitor.receive(ee);
+  }
+}
\ No newline at end of file
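The member side mirrors the coordinator: build a pool with defaultPool(), wire the rpcs and a
SubprocedureFactory into a ProcedureMember, and submit the subprocedures that the rpcs layer hands
in.  A minimal sketch, assuming concrete ProcedureMemberRpcs and SubprocedureFactory
implementations exist (neither is shown in this file); in practice the rpcs implementation itself
makes these calls when the coordinator's acquire message arrives:

    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.hadoop.hbase.procedure.ProcedureMember;
    import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
    import org.apache.hadoop.hbase.procedure.Subprocedure;
    import org.apache.hadoop.hbase.procedure.SubprocedureFactory;

    public class MemberUsageSketch {
      // 'rpcs' and 'factory' stand in for concrete implementations supplied by the caller.
      static void runOnce(ProcedureMemberRpcs rpcs, SubprocedureFactory factory,
          String opName, byte[] data) {
        // wakeFrequency=500ms, keepAlive=60s, up to 2 subprocedure threads
        ThreadPoolExecutor pool = ProcedureMember.defaultPool(500, 60, 2, "member1");
        ProcedureMember member = new ProcedureMember(rpcs, pool, factory);
        Subprocedure subproc = member.createSubprocedure(opName, data);
        boolean started = member.submitSubprocedure(subproc);
        // on failure the subprocedure holds a reference to the exception that caused it
      }
    }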


