hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL
Date Sun, 14 Jun 2015 18:24:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 0a0ff3354 -> 10772d3af


HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/10772d3a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/10772d3a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/10772d3a

Branch: refs/heads/branch-1
Commit: 10772d3af9d0e4e9a2bdd73730dbc7548c461ccb
Parents: 0a0ff33
Author: Enis Soztutar <enis@apache.org>
Authored: Sun Jun 14 11:24:48 2015 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Sun Jun 14 11:24:48 2015 -0700

----------------------------------------------------------------------
 .../test/IntegrationTestBigLinkedList.java      | 16 ++++++---
 .../procedure/flush/FlushTableSubprocedure.java |  1 -
 .../RegionServerFlushTableProcedureManager.java | 22 +++++++++----
 .../hadoop/hbase/regionserver/HRegion.java      | 34 +++++++++++++++++++-
 .../hbase/regionserver/RSRpcServices.java       | 10 ++++--
 .../hadoop/hbase/regionserver/Region.java       |  2 ++
 .../hbase/regionserver/RegionMergeRequest.java  |  7 +++-
 .../hadoop/hbase/regionserver/SplitRequest.java |  7 +++-
 .../snapshot/RegionServerSnapshotManager.java   | 18 ++++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  1 +
 .../hbase/regionserver/wal/TestWALReplay.java   |  2 ++
 11 files changed, 98 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 43bc5d4..1e53c72 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
@@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
 import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -222,7 +220,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
 
-  private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
 
   private static final int WIDTH_DEFAULT = 1000000;
   private static final int WRAP_DEFAULT = 25;
@@ -672,6 +670,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
        */
       public static class WALMapperSearcher extends WALMapper {
         private SortedSet<byte []> keysToFind;
+        private AtomicInteger rows = new AtomicInteger(0);
 
         @Override
         public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context
context)
@@ -693,8 +692,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           boolean b = this.keysToFind.contains(row);
           if (b) {
             String keyStr = Bytes.toStringBinary(row);
-            LOG.info("Found cell=" + cell);
-            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            try {
+              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+            } catch (IOException|InterruptedException e) {
+              LOG.warn(e);
+            }
+            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            }
+            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
           }
           return b;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 8d64f2a..5723919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
 
 /**
  * This flush region implementation uses the distributed procedure framework to flush

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 7664dee..a441a6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
@@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
 
     FlushTableSubprocedurePool taskManager =
-        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
+        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
       timeoutMillis, involvedRegions, table, taskManager);
   }
@@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * failures.
    */
   static class FlushTableSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    FlushTableSubprocedurePool(String name, Configuration conf) {
+    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
@@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in FlushSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
+     * This attempts to cancel out all pending and in progress tasks. Does not interrupt
the running
+     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
      * @throws InterruptedException
      */
     void cancelTasks() throws InterruptedException {
@@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * Abruptly shutdown the thread pool.  Call when exiting a region server.
+     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
+     * interrupted (see HBASE-13877)
      */
     void stop() {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 271a6eb..c1666e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -477,6 +477,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
      * @return true if the memstores were flushed, else false.
      */
+    @Override
     public boolean isFlushSucceeded() {
       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
           .FLUSHED_COMPACTION_NEEDED;
@@ -486,6 +487,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
      * @return True if the flush requested a compaction, else false (doesn't even mean it
flushed).
      */
+    @Override
     public boolean isCompactionNeeded() {
       return result == Result.FLUSHED_COMPACTION_NEEDED;
     }
@@ -1277,6 +1279,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * vector if already closed and null if judged that it should not close.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and
the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close() throws IOException {
     return close(false);
@@ -1314,6 +1319,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * we are not to close at this time or we are already closed.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and
the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException
{
     // Only allow one thread to close at a time. Serialize them so dual
@@ -1332,6 +1340,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * Exposed for some very specific unit tests.
+   */
+  @VisibleForTesting
+  public void setClosing(boolean closing) {
+    this.closing.set(closing);
+  }
+
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask
status)
       throws IOException {
     if (isClosed()) {
@@ -1831,7 +1847,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
-   * because a Snapshot was not properly persisted.
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and
the
+   * caller MUST abort after this.
    */
   public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
       throws IOException {
@@ -2342,6 +2359,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           Bytes.toStringBinary(getRegionInfo().getRegionName()));
       dse.initCause(t);
       status.abort("Flush failed: " + StringUtils.stringifyException(t));
+
+      // Callers for flushcache() should catch DroppedSnapshotException and abort the region
server.
+      // However, since we may have the region read lock, we cannot call close(true) here
since
+      // we cannot promote to a write lock. Instead we are setting closing so that all other
region
+      // operations except for close will be rejected.
+      this.closing.set(true);
+
+      if (rsServices != null) {
+        // This is a safeguard against the case where the caller fails to explicitly handle
aborting
+        rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
+      }
+
       throw dse;
     }
 
@@ -6541,6 +6570,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return results;
   }
 
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     // Don't need nonces here - RowMutations only supports puts and deletes
     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
@@ -6567,6 +6597,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
    * @throws IOException
    */
+  @Override
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException
{
     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
@@ -7565,6 +7596,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
 
   /** @return the coprocessor host */
+  @Override
   public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index f593e3e..78b11de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -960,7 +960,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Configuration getConfiguration() {
     return regionServer.getConfiguration();
   }
-  
+
   private RegionServerQuotaManager getQuotaManager() {
     return regionServer.getRegionServerQuotaManager();
   }
@@ -1334,6 +1334,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
       return MergeRegionsResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
@@ -1769,6 +1772,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       ((HRegion)region).forceSplit(splitPoint);
       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
       return SplitRegionResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
@@ -2462,7 +2468,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
             }
           }
-          
+
           quota.addScanResult(results);
 
           // If the scanner's filter - if any - is done with the scan

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index d2d661e..4e54252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -649,6 +649,8 @@ public interface Region extends ConfigurationObserver {
    *
    * @throws IOException general io exceptions
    * because a snapshot was not properly persisted.
+   * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch
this
+   * exception and MUST abort. Any further operation to the region may cause data loss.
    */
   FlushResult flush(boolean force) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index 320f2a8..50040c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.warn("Running rollback/cleanup of failed merge of "
                   + region_a +" and "+ region_b + "; " + e.getMessage(), e);
@@ -132,7 +137,7 @@ class RegionMergeRequest implements Runnable {
       try {
         this.tableLock.release();
       } catch (IOException ex) {
-        LOG.error("Could not release the table lock (something is really wrong). " 
+        LOG.error("Could not release the table lock (something is really wrong). "
            + "Aborting this server to avoid holding the lock forever.");
         this.server.abort("Abort; we got an error when releasing the table lock "
                          + "on " + region_a.getRegionInfo().getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index 3762e47..f3e0a20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -91,6 +92,10 @@ class SplitRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.info("Running rollback/cleanup of failed split of " +
             parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e);
@@ -147,7 +152,7 @@ class SplitRequest implements Runnable {
       try {
         this.tableLock.release();
       } catch (IOException ex) {
-        LOG.error("Could not release the table lock (something is really wrong). " 
+        LOG.error("Could not release the table lock (something is really wrong). "
            + "Aborting this server to avoid holding the lock forever.");
         this.server.abort("Abort; we got an error when releasing the table lock "
                          + "on " + parent.getRegionInfo().getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 021c16f..f04feb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
     switch (snapshot.getType()) {
     case FLUSH:
       SnapshotSubprocedurePool taskManager =
-        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
       return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
           timeoutMillis, involvedRegions, snapshot, taskManager);
     case SKIPFLUSH:
@@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
          * To minimized the code change, class name is not changed.
          */
         SnapshotSubprocedurePool taskManager2 =
-            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
         return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
             timeoutMillis, involvedRegions, snapshot, taskManager2);
 
@@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
    * operations such as compactions and replication  sinks.
    */
   static class SnapshotSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    SnapshotSubprocedurePool(String name, Configuration conf) {
+    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
@@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 6b342d7..3bc7a12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -392,6 +392,7 @@ public class TestHRegion {
             Assert.fail("Didn't bubble up IOE!");
           } catch (DroppedSnapshotException dse) {
             // What we are expecting
+            region.closing.set(false); // this is needed for the rest of the test to work
           }
           // Make it so all writes succeed from here on out
           ffs.fault.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/10772d3a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 959b1c2..ba30262 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -718,6 +718,8 @@ public class TestWALReplay {
           + t.getMessage());
       // simulated to abort server
       Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
     }
     // writing more data
     int moreRow = 10;


Mime
View raw message