hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL
Date Sun, 14 Jun 2015 18:26:28 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 71d39bb06 -> 31e81829b


HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/31e81829
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/31e81829
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/31e81829

Branch: refs/heads/branch-1.1
Commit: 31e81829bf21b2c44499372c1ac5a17f562377af
Parents: 71d39bb
Author: Enis Soztutar <enis@apache.org>
Authored: Sun Jun 14 11:26:18 2015 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Sun Jun 14 11:26:18 2015 -0700

----------------------------------------------------------------------
 .../test/IntegrationTestBigLinkedList.java      | 16 +++++---
 .../procedure/flush/FlushTableSubprocedure.java |  1 -
 .../RegionServerFlushTableProcedureManager.java | 22 ++++++++---
 .../hadoop/hbase/regionserver/HRegion.java      | 40 ++++++++++++++++++--
 .../hbase/regionserver/RSRpcServices.java       | 10 ++++-
 .../hadoop/hbase/regionserver/Region.java       | 21 +++++-----
 .../hbase/regionserver/RegionMergeRequest.java  |  7 +++-
 .../hadoop/hbase/regionserver/SplitRequest.java |  7 +++-
 .../snapshot/RegionServerSnapshotManager.java   | 18 ++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  1 +
 .../hbase/regionserver/wal/TestWALReplay.java   |  2 +
 11 files changed, 110 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 43bc5d4..1e53c72 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
@@ -86,7 +85,6 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
 import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -222,7 +220,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
 
-  private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
 
   private static final int WIDTH_DEFAULT = 1000000;
   private static final int WRAP_DEFAULT = 25;
@@ -672,6 +670,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
        */
       public static class WALMapperSearcher extends WALMapper {
         private SortedSet<byte []> keysToFind;
+        private AtomicInteger rows = new AtomicInteger(0);
 
         @Override
         public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
@@ -693,8 +692,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           boolean b = this.keysToFind.contains(row);
           if (b) {
             String keyStr = Bytes.toStringBinary(row);
-            LOG.info("Found cell=" + cell);
-            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            try {
+              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+            } catch (IOException|InterruptedException e) {
+              LOG.warn(e);
+            }
+            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            }
+            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
           }
           return b;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 8d64f2a..5723919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
 
 /**
  * This flush region implementation uses the distributed procedure framework to flush

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 7664dee..a441a6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
@@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
 
     FlushTableSubprocedurePool taskManager =
-        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
+        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
       timeoutMillis, involvedRegions, table, taskManager);
   }
@@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * failures.
    */
   static class FlushTableSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    FlushTableSubprocedurePool(String name, Configuration conf) {
+    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
@@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in FlushSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
+     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
+     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
      * @throws InterruptedException
      */
     void cancelTasks() throws InterruptedException {
@@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * Abruptly shutdown the thread pool.  Call when exiting a region server.
+     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
+     * interrupted (see HBASE-13877)
      */
     void stop() {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0e83dd3..4fe9741 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -478,6 +478,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
      * @return true if the memstores were flushed, else false.
      */
+    @Override
     public boolean isFlushSucceeded() {
       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
           .FLUSHED_COMPACTION_NEEDED;
@@ -487,6 +488,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
      */
+    @Override
     public boolean isCompactionNeeded() {
       return result == Result.FLUSHED_COMPACTION_NEEDED;
     }
@@ -1110,7 +1112,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public long getNumMutationsWithoutWAL() {
     return numMutationsWithoutWAL.get();
   }
-  
+
   @Override
   public long getDataInMemoryWithoutWAL() {
     return dataInMemoryWithoutWAL.get();
@@ -1277,6 +1279,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * vector if already closed and null if judged that it should not close.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close() throws IOException {
     return close(false);
@@ -1314,6 +1319,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * we are not to close at this time or we are already closed.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
     // Only allow one thread to close at a time. Serialize them so dual
@@ -1332,6 +1340,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * Exposed for some very specific unit tests.
+   */
+  @VisibleForTesting
+  public void setClosing(boolean closing) {
+    this.closing.set(closing);
+  }
+
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
       throws IOException {
     if (isClosed()) {
@@ -1832,7 +1848,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
-   * because a Snapshot was not properly persisted.
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
       throws IOException {
@@ -2330,6 +2347,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           Bytes.toStringBinary(getRegionInfo().getRegionName()));
       dse.initCause(t);
       status.abort("Flush failed: " + StringUtils.stringifyException(t));
+
+      // Callers for flushcache() should catch DroppedSnapshotException and abort the region server.
+      // However, since we may have the region read lock, we cannot call close(true) here since
+      // we cannot promote to a write lock. Instead we are setting closing so that all other region
+      // operations except for close will be rejected.
+      this.closing.set(true);
+
+      if (rsServices != null) {
+        // This is a safeguard against the case where the caller fails to explicitly handle aborting
+        rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
+      }
+
       throw dse;
     }
 
@@ -2367,7 +2396,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info(msg);
     status.setStatus(msg);
 
-    return new FlushResultImpl(compactionRequested ? 
+    return new FlushResultImpl(compactionRequested ?
         FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
           FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
         flushOpSeqId);
@@ -5354,7 +5383,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -6475,6 +6504,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return results;
   }
 
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     // Don't need nonces here - RowMutations only supports puts and deletes
     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
@@ -6501,6 +6531,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
    * @throws IOException
    */
+  @Override
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
@@ -7499,6 +7530,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
 
   /** @return the coprocessor host */
+  @Override
   public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 504ad03..615efb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -956,7 +956,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   Configuration getConfiguration() {
     return regionServer.getConfiguration();
   }
-  
+
   private RegionServerQuotaManager getQuotaManager() {
     return regionServer.getRegionServerQuotaManager();
   }
@@ -1330,6 +1330,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
       return MergeRegionsResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
@@ -1765,6 +1768,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       ((HRegion)region).forceSplit(splitPoint);
       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
       return SplitRegionResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
@@ -2438,7 +2444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
             }
           }
-          
+
           quota.addScanResult(results);
 
           // If the scanner's filter - if any - is done with the scan

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 8910042..e61a186 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver {
    */
   long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
 
-  /** 
+  /**
    * @return map of column family names to max sequence id that was read from storage when this
    * region was opened
    */
@@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver {
 
   ///////////////////////////////////////////////////////////////////////////
   // Metrics
-  
+
   /** @return read requests count for this region */
   long getReadRequestsCount();
 
@@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver {
 
   /** @return the number of mutations processed bypassing the WAL */
   long getNumMutationsWithoutWAL();
-  
+
   /** @return the size of data processed bypassing the WAL, in bytes */
   long getDataInMemoryWithoutWAL();
 
@@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * This method needs to be called before any public call that reads or
-   * modifies data. 
+   * modifies data.
    * Acquires a read lock and checks if the region is closing or closed.
    * <p>{@link #closeRegionOperation} MUST then always be called after
    * the operation has completed, whether it succeeded or failed.
@@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * This method needs to be called before any public call that reads or
-   * modifies data. 
+   * modifies data.
    * Acquires a read lock and checks if the region is closing or closed.
    * <p>{@link #closeRegionOperation} MUST then always be called after
    * the operation has completed, whether it succeeded or failed.
@@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * Perform atomic mutations within the region.
-   * 
+   *
    * @param mutations The list of mutations to perform.
    * <code>mutations</code> can contain operations for multiple rows.
    * Caller has to ensure that all rows are contained in this region.
@@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver {
       CANNOT_FLUSH_MEMSTORE_EMPTY,
       CANNOT_FLUSH
     }
-    
+
     /** @return the detailed result code */
     Result getResult();
 
     /** @return true if the memstores were flushed, else false */
     boolean isFlushSucceeded();
-    
+
     /** @return True if the flush requested a compaction, else false */
     boolean isCompactionNeeded();
   }
@@ -638,7 +638,8 @@ public interface Region extends ConfigurationObserver {
    * the region needs compacting
    *
    * @throws IOException general io exceptions
-   * @throws DroppedSnapshotException Thrown when abort is required
+   * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this
+   * exception and MUST abort. Any further operation to the region may cause data loss.
    * because a snapshot was not properly persisted.
    */
   FlushResult flush(boolean force) throws IOException;
@@ -647,7 +648,7 @@ public interface Region extends ConfigurationObserver {
    * Synchronously compact all stores in the region.
    * <p>This operation could block for a long time, so don't call it from a
    * time-sensitive thread.
-   * <p>Note that no locks are taken to prevent possible conflicts between 
+   * <p>Note that no locks are taken to prevent possible conflicts between
    * compaction and splitting activities. The regionserver does not normally compact
    * and split in parallel. However by calling this method you may introduce
    * unexpected and unhandled concurrency. Don't do this unless you know what

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index a9d5863..62990b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.warn("Running rollback/cleanup of failed merge of "
                   + region_a +" and "+ region_b + "; " + e.getMessage(), e);
@@ -132,7 +137,7 @@ class RegionMergeRequest implements Runnable {
       try {
         this.tableLock.release();
       } catch (IOException ex) {
-        LOG.error("Could not release the table lock (something is really wrong). " 
+        LOG.error("Could not release the table lock (something is really wrong). "
            + "Aborting this server to avoid holding the lock forever.");
         this.server.abort("Abort; we got an error when releasing the table lock "
                          + "on " + region_a.getRegionInfo().getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index 8c73a1b..4003f1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -91,6 +92,10 @@ class SplitRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.info("Running rollback/cleanup of failed split of " +
             parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e);
@@ -147,7 +152,7 @@ class SplitRequest implements Runnable {
       try {
         this.tableLock.release();
       } catch (IOException ex) {
-        LOG.error("Could not release the table lock (something is really wrong). " 
+        LOG.error("Could not release the table lock (something is really wrong). "
            + "Aborting this server to avoid holding the lock forever.");
         this.server.abort("Abort; we got an error when releasing the table lock "
                          + "on " + parent.getRegionInfo().getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 021c16f..f04feb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
     switch (snapshot.getType()) {
     case FLUSH:
       SnapshotSubprocedurePool taskManager =
-        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
       return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
           timeoutMillis, involvedRegions, snapshot, taskManager);
     case SKIPFLUSH:
@@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
          * To minimized the code change, class name is not changed.
          */
         SnapshotSubprocedurePool taskManager2 =
-            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
         return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
             timeoutMillis, involvedRegions, snapshot, taskManager2);
 
@@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
    * operations such as compactions and replication  sinks.
    */
   static class SnapshotSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    SnapshotSubprocedurePool(String name, Configuration conf) {
+    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
@@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 680e57f..3b023f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -363,6 +363,7 @@ public class TestHRegion {
             Assert.fail("Didn't bubble up IOE!");
           } catch (DroppedSnapshotException dse) {
             // What we are expecting
+            region.closing.set(false); // this is needed for the rest of the test to work
           }
           // Make it so all writes succeed from here on out
           ffs.fault.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/31e81829/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 3535ba2..a8aa4d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -718,6 +718,8 @@ public class TestWALReplay {
           + t.getMessage());
       // simulated to abort server
       Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
     }
     // writing more data
     int moreRow = 10;


Mime
View raw message