hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1445862 - in /hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot: FlushSnapshotSubprocedure.java RegionServerSnapshotManager.java
Date Wed, 13 Feb 2013 19:05:56 GMT
Author: jmhsieh
Date: Wed Feb 13 19:05:55 2013
New Revision: 1445862

URL: http://svn.apache.org/r1445862
Log:
HBASE-7651 RegionServerSnapshotManager fails with CancellationException if previous snapshot
fails in per region task


Modified:
    hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
    hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java?rev=1445862&r1=1445861&r2=1445862&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java	(original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java	Wed Feb 13 19:05:55 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.procedure
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 
 /**
  * This online snapshot implementation forces uses the distributed procedure framework to force a
@@ -74,6 +75,7 @@ public class FlushSnapshotSubprocedure e
       // snapshots that involve multiple regions and regionservers.  It is still possible to have
       // an interleaving such that globally regions are missing, so we still need the verification
       // step.
+      LOG.debug("Starting region operation on " + region);
       region.startRegionOperation();
       try {
         LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
@@ -81,6 +83,7 @@ public class FlushSnapshotSubprocedure e
         region.addRegionToSnapshot(snapshot, monitor);
         LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
       } finally {
+        LOG.debug("Closing region operation on " + region);
         region.closeRegionOperation();
       }
       return null;
@@ -95,6 +98,13 @@ public class FlushSnapshotSubprocedure e
 
     monitor.rethrowException();
 
+    // assert that the taskManager is empty.
+    if (taskManager.hasTasks()) {
+      throw new IllegalStateException("Attempting to take snapshot "
+          + SnapshotDescriptionUtils.toString(snapshot)
+          + " but we have currently have outstanding tasks");
+    }
+    
     // Add all hfiles already existing in region.
     for (HRegion region : regions) {
       // submit one task per region for parallelize by region.
@@ -104,7 +114,11 @@ public class FlushSnapshotSubprocedure e
 
     // wait for everything to complete.
     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
-    taskManager.waitForOutstandingTasks();
+    try {
+      taskManager.waitForOutstandingTasks();
+    } catch (InterruptedException e) {
+      throw new ForeignException(getMemberName(), e);
+    }
   }
 
   /**
@@ -128,9 +142,13 @@ public class FlushSnapshotSubprocedure e
    */
   @Override
   public void cleanup(Exception e) {
-    LOG.info("Aborting all log roll online snapshot subprocedure task threads for '"
+    LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
         + snapshot.getName() + "' due to error", e);
-    taskManager.cancelTasks();
+    try {
+      taskManager.cancelTasks();
+    } catch (InterruptedException e1) {
+      Thread.currentThread().interrupt();
+    }
   }
 
   /**

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1445862&r1=1445861&r2=1445862&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java	(original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java	Wed Feb 13 19:05:55 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
@@ -66,7 +66,7 @@ import com.google.protobuf.InvalidProtoc
  * <p>
  * On startup, requires {@link #start()} to be called.
  * <p>
- * On shutdown, requires {@link #stop(boolean)} to be called
+ * On shutdown, requires {@link #stop()} to be called
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -95,26 +95,23 @@ public class RegionServerSnapshotManager
   private final RegionServerServices rss;
   private final ProcedureMemberRpcs memberRpcs;
   private final ProcedureMember member;
-  private final SnapshotSubprocedurePool taskManager;
 
   /**
    * Exposed for testing.
-   * @param conf
+   * @param conf HBase configuration.
    * @param parent parent running the snapshot handler
-   * @param controller use a custom snapshot controller
-   * @param cohortMember use a custom cohort member
+   * @param memberRpc use specified memberRpc instance
+   * @param procMember use specified ProcedureMember
    */
    RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
-      ProcedureMemberRpcs controller, ProcedureMember cohortMember) {
+      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
     this.rss = parent;
-    this.memberRpcs = controller;
-    this.member = cohortMember;
-    // read in the snapshot request configuration properties
-    taskManager = new SnapshotSubprocedurePool(parent, conf);
+    this.memberRpcs = memberRpc;
+    this.member = procMember;
   }
 
   /**
-   * Create a default snapshot handler - uses a zookeeper based cohort controller.
+   * Create a default snapshot handler - uses a zookeeper based member controller.
    * @param rss region server running the handler
    * @throws KeeperException if the zookeeper cluster cannot be reached
    */
@@ -135,9 +132,6 @@ public class RegionServerSnapshotManager
     // create the actual snapshot procedure member
     ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
     this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
-
-    // setup the task manager to run all the snapshots tasks
-    taskManager = new SnapshotSubprocedurePool(rss, conf);
   }
 
   /**
@@ -156,12 +150,6 @@ public class RegionServerSnapshotManager
     String mode = force ? "abruptly" : "gracefully";
     LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
 
-    if (force) {
-      this.taskManager.stop();
-    } else {
-      this.taskManager.shutdown();
-    }
-
     try {
       this.member.close();
     } finally {
@@ -201,14 +189,19 @@ public class RegionServerSnapshotManager
     // expects participation in the procedure and without sending message the snapshot attempt
     // will hang and fail.
 
-    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable());
+    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
+        + snapshot.getTable());
     ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
     Configuration conf = rss.getConfiguration();
-    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
-    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
+    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
+        SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
+        SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
 
     switch (snapshot.getType()) {
     case FLUSH:
+      SnapshotSubprocedurePool taskManager =
+        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
       return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
           timeoutMillis, involvedRegions, snapshot, taskManager);
     default:
@@ -274,24 +267,28 @@ public class RegionServerSnapshotManager
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    SnapshotSubprocedurePool(Server parent, Configuration conf) {
+    SnapshotSubprocedurePool(String name, Configuration conf) {
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
       int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
-      this.name = parent.getServerName().toString();
+      this.name = name;
       executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
               + name + ")-snapshot-pool"));
       taskPool = new ExecutorCompletionService<Void>(executor);
     }
 
+    boolean hasTasks() {
+      return futures.size() != 0;
+    }
+    
     /**
      * Submit a task to the pool.
      *
      * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
-     * version does not support issuing tasks from multiple concurrnty table snapshots requests.
+     * version does not support issuing tasks from multiple concurrent table snapshots requests.
      */
     void submitTask(final Callable<Void> task) {
       Future<Void> f = this.taskPool.submit(task);
@@ -302,18 +299,16 @@ public class RegionServerSnapshotManager
      * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
      * This *must* be called to after all tasks are submitted via submitTask.
      *
-     * TODO: to support multiple concurrent snapshots this code needs to be rewritten.  The common
-     * pool of futures not being thread safe is part of the problem.
-     *
      * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+     * @throws InterruptedException
      * @throws SnapshotCreationException if the snapshot failed while we were waiting
      */
-    boolean waitForOutstandingTasks() throws ForeignException {
+    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
       LOG.debug("Waiting for local region snapshots to finish.");
 
+      int sz = futures.size();
       try {
         // Using the completion service to process the futures that finish first first.
-        int sz = futures.size();
         for (int i = 0; i < sz; i++) {
           Future<Void> f = taskPool.take();
           f.get();
@@ -325,48 +320,48 @@ public class RegionServerSnapshotManager
         LOG.debug("Completed " + sz +  " local region snapshots.");
         return true;
       } catch (InterruptedException e) {
-        // TODO this isn't completely right and needs to be revisited.
         LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
-        if (stopped) {
+        if (!stopped) {
+          Thread.currentThread().interrupt();
           throw new ForeignException("SnapshotSubprocedurePool", e);
         }
-        Thread.currentThread().interrupt();
+        // we are stopped so we can just exit.
       } catch (ExecutionException e) {
         if (e.getCause() instanceof ForeignException) {
+          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
           throw (ForeignException)e.getCause();
         }
+        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
       } finally {
-        // close off remaining tasks
-        for (Future<Void> f: futures) {
-          if (!f.isDone()){
-            LOG.warn("cancelling region task");
-            f.cancel(true);
-          }
-        }
-        futures.clear();
+        cancelTasks();
       }
       return false;
     }
 
     /**
      * This attempts to cancel out all pending and in progress tasks (interruptions issues)
+     * @throws InterruptedException
      */
-    void cancelTasks() {
-      for (Future<Void> f: futures) {
+    void cancelTasks() throws InterruptedException {
+      Collection<Future<Void>> tasks = futures;
+      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
+      for (Future<Void> f: tasks) {
         f.cancel(true);
       }
-    }
 
-    /**
-     * This politely shutsdown the thread pool.  Call when gracefully exiting a region server.
-     */
-    void shutdown() {
-      this.executor.shutdown();
+      // evict remaining tasks and futures from taskPool.
+      LOG.debug(taskPool);
+      while (!futures.isEmpty()) {
+        // block to remove cancelled futures;
+        LOG.warn("Removing cancelled elements from taskPool");
+        futures.remove(taskPool.take());
+      }
+      stop();
     }
 
     /**
-     * This abruptly shutsdown the thread pool.  Call when exiting a region server.
+     * Abruptly shutdown the thread pool.  Call when exiting a region server.
      */
     void stop() {
       if (this.stopped) return;



Mime
View raw message