hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [2/2] hbase git commit: HBASE-11580 Failover handling for secondary region replicas
Date Tue, 03 Mar 2015 21:06:04 GMT
HBASE-11580 Failover handling for secondary region replicas


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9899aab1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9899aab1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9899aab1

Branch: refs/heads/master
Commit: 9899aab12b419144f7f8a8280bedbccc68ee7452
Parents: ce1b81c
Author: Enis Soztutar <enis@apache.org>
Authored: Tue Mar 3 11:48:12 2015 -0800
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue Mar 3 11:48:12 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientSmallScanner.java |   8 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |   1 +
 .../hbase/client/FlushRegionCallable.java       | 102 +++++
 .../client/RegionAdminServiceCallable.java      |  63 +++-
 .../apache/hadoop/hbase/executor/EventType.java |  31 +-
 .../hadoop/hbase/executor/ExecutorType.java     |   3 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |  12 +
 .../apache/hadoop/hbase/util/RetryCounter.java  |   4 +-
 ...IntegrationTestRegionReplicaReplication.java |   5 -
 .../hbase/chaos/actions/RemoveColumnAction.java |   2 +-
 .../hbase/protobuf/generated/AdminProtos.java   | 335 ++++++++++++++---
 .../hbase/protobuf/generated/WALProtos.java     |  58 ++-
 hbase-protocol/src/main/protobuf/Admin.proto    |   2 +
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../hadoop/hbase/regionserver/HRegion.java      | 246 +++++++++---
 .../hbase/regionserver/HRegionServer.java       |  50 ++-
 .../hadoop/hbase/regionserver/HStore.java       |   7 +-
 .../hbase/regionserver/RSRpcServices.java       |  15 +-
 .../handler/RegionReplicaFlushHandler.java      | 187 ++++++++++
 .../RegionReplicaReplicationEndpoint.java       | 129 +++----
 .../hbase/util/ServerRegionReplicaUtil.java     |  25 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  20 +
 .../hbase/io/encoding/TestPrefixTree.java       |   4 +-
 .../regionserver/TestHRegionReplayEvents.java   | 209 +++++++++++
 .../regionserver/TestRegionReplicaFailover.java | 373 +++++++++++++++++++
 .../hbase/regionserver/wal/TestWALReplay.java   |   7 +-
 ...egionReplicaReplicationEndpointNoMaster.java |   1 +
 27 files changed, 1637 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 9fc9cc6..8c51e70 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -104,14 +104,14 @@ public class ClientSmallScanner extends ClientScanner {
       if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
           || checkScanStopRow(endKey) || done) {
         close();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Finished with small scan at " + this.currentRegion);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Finished with small scan at " + this.currentRegion);
         }
         return false;
       }
       localStartKey = endKey;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Finished with region " + this.currentRegion);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Finished with region " + this.currentRegion);
       }
     } else if (this.lastResult != null) {
       localStartKey = this.lastResult.getRow();

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index dae2499..a035e2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -94,6 +94,7 @@ public final class ConnectionUtils {
    */
   public static void setServerSideHConnectionRetriesConfig(
       final Configuration c, final String sn, final Log log) {
+    // TODO: Fix this. Not all connections from server side should have 10 times the retries.
     int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     // Go big.  Multiply by 10.  If we can't get to meta after this many retries

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
new file mode 100644
index 0000000..b2c4a57
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.mortbay.log.Log;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * A Callable for the flushRegion() RPC against a region (typically invoked from a
+ * secondary region replica to trigger a flush on the primary replica).
+ */
+@InterfaceAudience.Private
+public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
+
+  // FIX: the class accidentally imported org.mortbay.log.Log (Jetty's static logger) and
+  // logged through it. Use HBase's standard commons-logging logger instead; the type is
+  // fully qualified so it does not clash with the stray org.mortbay.log.Log import.
+  private static final org.apache.commons.logging.Log LOG =
+      org.apache.commons.logging.LogFactory.getLog(FlushRegionCallable.class);
+
+  /** Encoded name of the region to flush. */
+  private final byte[] regionName;
+  /** Whether the server should write a flush marker to the WAL even if nothing was flushed. */
+  private final boolean writeFlushWalMarker;
+  /** Whether the last prepare() call forced a location-cache reload. */
+  private boolean reload;
+
+  public FlushRegionCallable(ClusterConnection connection,
+      RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] regionName,
+      byte[] regionStartKey, boolean writeFlushWalMarker) {
+    super(connection, rpcControllerFactory, tableName, regionStartKey);
+    this.regionName = regionName;
+    this.writeFlushWalMarker = writeFlushWalMarker;
+  }
+
+  public FlushRegionCallable(ClusterConnection connection,
+      RpcControllerFactory rpcControllerFactory, HRegionInfo regionInfo,
+      boolean writeFlushWalMarker) {
+    this(connection, rpcControllerFactory, regionInfo.getTable(), regionInfo.getRegionName(),
+      regionInfo.getStartKey(), writeFlushWalMarker);
+  }
+
+  @Override
+  public FlushRegionResponse call(int callTimeout) throws Exception {
+    return flushRegion();
+  }
+
+  @Override
+  public void prepare(boolean reload) throws IOException {
+    super.prepare(reload);
+    this.reload = reload;
+  }
+
+  /**
+   * Performs the flush RPC. If the cached location no longer names the requested region
+   * (e.g. after a split or merge), either forces a retry with a fresh lookup (first
+   * attempt) or reports a successful no-op flush (after a reload already happened).
+   */
+  private FlushRegionResponse flushRegion() throws IOException {
+    // check whether we should still do the flush to this region. If the regions are changed due
+    // to splits or merges, etc return success
+    if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
+      if (!reload) {
+        // location came from cache; throw so the retrying caller re-prepares with reload=true
+        throw new IOException("Cached location seems to be different than requested region.");
+      }
+      // FIX: original message concatenated "... is different than " + " requested region ",
+      // producing a double space; the message is now a single clean sentence.
+      LOG.info("Skipping flush region, because the located region "
+          + Bytes.toStringBinary(location.getRegionInfo().getRegionName())
+          + " is different than requested region " + Bytes.toStringBinary(regionName));
+      return FlushRegionResponse.newBuilder()
+          .setLastFlushTime(EnvironmentEdgeManager.currentTime())
+          .setFlushed(false)
+          .setWroteFlushWalMarker(false)
+          .build();
+    }
+
+    FlushRegionRequest request =
+        RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
+
+    try {
+      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+      controller.setPriority(tableName);
+      return stub.flushRegion(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 66dcdce..189dbaa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -23,13 +23,17 @@ import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
@@ -42,25 +46,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
 
   protected final ClusterConnection connection;
 
+  protected final RpcControllerFactory rpcControllerFactory;
+
   protected AdminService.BlockingInterface stub;
 
   protected HRegionLocation location;
 
   protected final TableName tableName;
   protected final byte[] row;
+  protected final int replicaId;
 
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
-  public RegionAdminServiceCallable(ClusterConnection connection, TableName tableName, byte[] row) {
-    this(connection, null, tableName, row);
+  public RegionAdminServiceCallable(ClusterConnection connection,
+      RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
+    this(connection, rpcControllerFactory, null, tableName, row);
   }
 
-  public RegionAdminServiceCallable(ClusterConnection connection, HRegionLocation location,
+  public RegionAdminServiceCallable(ClusterConnection connection,
+      RpcControllerFactory rpcControllerFactory, HRegionLocation location,
       TableName tableName, byte[] row) {
+    this(connection, rpcControllerFactory, location,
+      tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+  }
+
+  public RegionAdminServiceCallable(ClusterConnection connection,
+      RpcControllerFactory rpcControllerFactory, HRegionLocation location,
+      TableName tableName, byte[] row, int replicaId) {
     this.connection = connection;
+    this.rpcControllerFactory = rpcControllerFactory;
     this.location = location;
     this.tableName = tableName;
     this.row = row;
+    this.replicaId = replicaId;
   }
 
   @Override
@@ -85,7 +103,18 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     this.stub = stub;
   }
 
-  public abstract HRegionLocation getLocation(boolean useCache) throws IOException;
+  /**
+   * Resolves the location of the target replica for this callable's (tableName, row).
+   * No longer abstract: delegates to getRegionLocations() with the configured replicaId.
+   *
+   * @param useCache whether a cached location may be used
+   * @return the location of replica {@code replicaId}; never null
+   * @throws IOException if the lookup fails or no location is known for the replica
+   */
+  public HRegionLocation getLocation(boolean useCache) throws IOException {
+    RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
+    // NOTE(review): getRegionLocations() already throws rather than return null, so this
+    // null check appears redundant — kept as defensive belt-and-suspenders.
+    if (rl == null) {
+      throw new HBaseIOException(getExceptionMessage());
+    }
+    HRegionLocation location = rl.getRegionLocation(replicaId);
+    if (location == null) {
+      // the lookup returned locations, but none for the requested replica
+      throw new HBaseIOException(getExceptionMessage());
+    }
+
+    return location;
+  }
 
   @Override
   public void throwable(Throwable t, boolean retrying) {
@@ -115,7 +144,8 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
 
   //subclasses can override this.
   protected String getExceptionMessage() {
-    return "There is no location";
+    return "There is no location" + " table=" + tableName
+        + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
   }
 
   @Override
@@ -132,4 +162,27 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     }
     return sleep;
   }
+
+  /**
+   * Locates the region (all replicas) holding the given row of the given table.
+   *
+   * @param connection connection to use for the lookup
+   * @param tableName table to locate
+   * @param row row key to locate
+   * @param useCache whether a cached location may be returned
+   * @param replicaId replica id passed through to the lookup
+   * @return the RegionLocations for the row; never null
+   * @throws RetriesExhaustedException if the lookup fails or yields no locations
+   * @throws DoNotRetryIOException propagated unchanged from the lookup
+   * @throws InterruptedIOException propagated unchanged from the lookup
+   */
+  public static RegionLocations getRegionLocations(
+      ClusterConnection connection, TableName tableName, byte[] row,
+      boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
+    RegionLocations rl;
+    try {
+      rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
+    } catch (DoNotRetryIOException | RetriesExhaustedException | InterruptedIOException e) {
+      // these already carry the right retry semantics; rethrow unchanged (precise rethrow)
+      throw e;
+    } catch (IOException e) {
+      // any other I/O failure is terminal for this lookup: wrap so callers stop retrying
+      throw new RetriesExhaustedException("Can't get the location", e);
+    }
+    if (rl == null) {
+      throw new RetriesExhaustedException("Can't get the locations");
+    }
+
+    return rl;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 9764efd..ac76edb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -37,22 +37,22 @@ public enum EventType {
   // Messages originating from RS (NOTE: there is NO direct communication from
   // RS to Master). These are a result of RS updates into ZK.
   // RS_ZK_REGION_CLOSING    (1),   // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
-  
+
   /**
    * RS_ZK_REGION_CLOSED<br>
-   * 
+   *
    * RS has finished closing a region.
    */
   RS_ZK_REGION_CLOSED       (2, ExecutorType.MASTER_CLOSE_REGION),
   /**
    * RS_ZK_REGION_OPENING<br>
-   * 
+   *
    * RS is in process of opening a region.
    */
   RS_ZK_REGION_OPENING      (3, null),
   /**
    * RS_ZK_REGION_OPENED<br>
-   * 
+   *
    * RS has finished opening a region.
    */
   RS_ZK_REGION_OPENED       (4, ExecutorType.MASTER_OPEN_REGION),
@@ -70,7 +70,7 @@ public enum EventType {
   RS_ZK_REGION_SPLIT        (6, ExecutorType.MASTER_SERVER_OPERATIONS),
   /**
    * RS_ZK_REGION_FAILED_OPEN<br>
-   * 
+   *
    * RS failed to open a region.
    */
   RS_ZK_REGION_FAILED_OPEN  (7, ExecutorType.MASTER_CLOSE_REGION),
@@ -217,7 +217,7 @@ public enum EventType {
    * Master adds this region as closing in ZK
    */
   M_ZK_REGION_CLOSING       (51, null),
-  
+
   /**
    * Master controlled events to be executed on the master
    * M_SERVER_SHUTDOWN
@@ -232,14 +232,14 @@ public enum EventType {
   M_META_SERVER_SHUTDOWN    (72, ExecutorType.MASTER_META_SERVER_OPERATIONS),
   /**
    * Master controlled events to be executed on the master.<br>
-   * 
+   *
    * M_MASTER_RECOVERY<br>
    * Master is processing recovery of regions found in ZK RIT
    */
   M_MASTER_RECOVERY         (73, ExecutorType.MASTER_SERVER_OPERATIONS),
   /**
    * Master controlled events to be executed on the master.<br>
-   * 
+   *
    * M_LOG_REPLAY<br>
    * Master is processing log replay of failed region server
    */
@@ -247,18 +247,25 @@ public enum EventType {
 
   /**
    * RS controlled events to be executed on the RS.<br>
-   * 
+   *
    * RS_PARALLEL_SEEK
    */
   RS_PARALLEL_SEEK          (80, ExecutorType.RS_PARALLEL_SEEK),
-  
+
   /**
    * RS wal recovery work items(either creating recover.edits or directly replay wals)
    * to be executed on the RS.<br>
-   * 
+   *
    * RS_LOG_REPLAY
    */
-  RS_LOG_REPLAY             (81, ExecutorType.RS_LOG_REPLAY_OPS);
+  RS_LOG_REPLAY             (81, ExecutorType.RS_LOG_REPLAY_OPS),
+
+  /**
+   * RS flush triggering from secondary region replicas to primary region replica. <br>
+   *
+   * RS_REGION_REPLICA_FLUSH
+   */
+  RS_REGION_REPLICA_FLUSH   (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
 
   private final int code;
   private final ExecutorType executor;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 5590b0a..d0f6bee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -45,7 +45,8 @@ public enum ExecutorType {
   RS_CLOSE_ROOT              (24),
   RS_CLOSE_META              (25),
   RS_PARALLEL_SEEK           (26),
-  RS_LOG_REPLAY_OPS          (27);
+  RS_LOG_REPLAY_OPS          (27),
+  RS_REGION_REPLICA_FLUSH_OPS  (28);
 
   ExecutorType(int value) {}
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index d23aa02..508cf39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -739,10 +739,22 @@ public final class RequestConverter {
   */
  public static FlushRegionRequest
      buildFlushRegionRequest(final byte[] regionName) {
+   return buildFlushRegionRequest(regionName, false);
+ }
+
+ /**
+  * Create a protocol buffer FlushRegionRequest for a given region name
+  *
+  * @param regionName the name of the region to get info
+  * @return a protocol buffer FlushRegionRequest
+  */
+ public static FlushRegionRequest
+     buildFlushRegionRequest(final byte[] regionName, boolean writeFlushWALMarker) {
    FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
+   builder.setWriteFlushWalMarker(writeFlushWALMarker);
    return builder.build();
  }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
index c88cae3..73512fa 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
@@ -152,7 +152,9 @@ public class RetryCounter {
   public void sleepUntilNextRetry() throws InterruptedException {
     int attempts = getAttemptTimes();
     long sleepTime = retryConfig.backoffPolicy.getBackoffTime(retryConfig, attempts);
-    LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
+    }
     retryConfig.getTimeUnit().sleep(sleepTime);
     useRetry();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
index 30da5c0..33b2554 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java
@@ -112,11 +112,6 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge
     runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
   }
 
-  @Override
-  protected void startMonkey() throws Exception {
-    // TODO: disabled for now
-  }
-
   /**
    * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
    * threads to become available to the MultiThradedReader threads. We add this delay because of

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java
index efb4413..c083d9c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java
@@ -54,7 +54,7 @@ public class RemoveColumnAction extends Action {
     HTableDescriptor tableDescriptor = admin.getTableDescriptor(tableName);
     HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();
 
-    if (columnDescriptors.length <= 1) {
+    if (columnDescriptors.length <= (protectedColumns == null ? 1 : protectedColumns.size())) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index 3828742..ea022b5 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -7996,6 +7996,24 @@ public final class AdminProtos {
      * <code>optional uint64 if_older_than_ts = 2;</code>
      */
     long getIfOlderThanTs();
+
+    // optional bool write_flush_wal_marker = 3;
+    /**
+     * <code>optional bool write_flush_wal_marker = 3;</code>
+     *
+     * <pre>
+     * whether to write a marker to WAL even if not flushed
+     * </pre>
+     */
+    boolean hasWriteFlushWalMarker();
+    /**
+     * <code>optional bool write_flush_wal_marker = 3;</code>
+     *
+     * <pre>
+     * whether to write a marker to WAL even if not flushed
+     * </pre>
+     */
+    boolean getWriteFlushWalMarker();
   }
   /**
    * Protobuf type {@code FlushRegionRequest}
@@ -8073,6 +8091,11 @@ public final class AdminProtos {
               ifOlderThanTs_ = input.readUInt64();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              writeFlushWalMarker_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8151,9 +8174,34 @@ public final class AdminProtos {
       return ifOlderThanTs_;
     }
 
+    // optional bool write_flush_wal_marker = 3;
+    public static final int WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3;
+    private boolean writeFlushWalMarker_;
+    /**
+     * <code>optional bool write_flush_wal_marker = 3;</code>
+     *
+     * <pre>
+     * whether to write a marker to WAL even if not flushed
+     * </pre>
+     */
+    public boolean hasWriteFlushWalMarker() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional bool write_flush_wal_marker = 3;</code>
+     *
+     * <pre>
+     * whether to write a marker to WAL even if not flushed
+     * </pre>
+     */
+    public boolean getWriteFlushWalMarker() {
+      return writeFlushWalMarker_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       ifOlderThanTs_ = 0L;
+      writeFlushWalMarker_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8181,6 +8229,9 @@ public final class AdminProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeUInt64(2, ifOlderThanTs_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(3, writeFlushWalMarker_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -8198,6 +8249,10 @@ public final class AdminProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(2, ifOlderThanTs_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, writeFlushWalMarker_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -8231,6 +8286,11 @@ public final class AdminProtos {
         result = result && (getIfOlderThanTs()
             == other.getIfOlderThanTs());
       }
+      result = result && (hasWriteFlushWalMarker() == other.hasWriteFlushWalMarker());
+      if (hasWriteFlushWalMarker()) {
+        result = result && (getWriteFlushWalMarker()
+            == other.getWriteFlushWalMarker());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8252,6 +8312,10 @@ public final class AdminProtos {
         hash = (37 * hash) + IF_OLDER_THAN_TS_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getIfOlderThanTs());
       }
+      if (hasWriteFlushWalMarker()) {
+        hash = (37 * hash) + WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getWriteFlushWalMarker());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -8377,6 +8441,8 @@ public final class AdminProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         ifOlderThanTs_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
+        writeFlushWalMarker_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -8417,6 +8483,10 @@ public final class AdminProtos {
           to_bitField0_ |= 0x00000002;
         }
         result.ifOlderThanTs_ = ifOlderThanTs_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.writeFlushWalMarker_ = writeFlushWalMarker_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -8439,6 +8509,9 @@ public final class AdminProtos {
         if (other.hasIfOlderThanTs()) {
           setIfOlderThanTs(other.getIfOlderThanTs());
         }
+        if (other.hasWriteFlushWalMarker()) {
+          setWriteFlushWalMarker(other.getWriteFlushWalMarker());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8624,6 +8697,55 @@ public final class AdminProtos {
         return this;
       }
 
+      // optional bool write_flush_wal_marker = 3;
+      private boolean writeFlushWalMarker_ ;
+      /**
+       * <code>optional bool write_flush_wal_marker = 3;</code>
+       *
+       * <pre>
+       * whether to write a marker to WAL even if not flushed
+       * </pre>
+       */
+      public boolean hasWriteFlushWalMarker() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional bool write_flush_wal_marker = 3;</code>
+       *
+       * <pre>
+       * whether to write a marker to WAL even if not flushed
+       * </pre>
+       */
+      public boolean getWriteFlushWalMarker() {
+        return writeFlushWalMarker_;
+      }
+      /**
+       * <code>optional bool write_flush_wal_marker = 3;</code>
+       *
+       * <pre>
+       * whether to write a marker to WAL even if not flushed
+       * </pre>
+       */
+      public Builder setWriteFlushWalMarker(boolean value) {
+        bitField0_ |= 0x00000004;
+        writeFlushWalMarker_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool write_flush_wal_marker = 3;</code>
+       *
+       * <pre>
+       * whether to write a marker to WAL even if not flushed
+       * </pre>
+       */
+      public Builder clearWriteFlushWalMarker() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        writeFlushWalMarker_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:FlushRegionRequest)
     }
 
@@ -8657,6 +8779,16 @@ public final class AdminProtos {
      * <code>optional bool flushed = 2;</code>
      */
     boolean getFlushed();
+
+    // optional bool wrote_flush_wal_marker = 3;
+    /**
+     * <code>optional bool wrote_flush_wal_marker = 3;</code>
+     */
+    boolean hasWroteFlushWalMarker();
+    /**
+     * <code>optional bool wrote_flush_wal_marker = 3;</code>
+     */
+    boolean getWroteFlushWalMarker();
   }
   /**
    * Protobuf type {@code FlushRegionResponse}
@@ -8719,6 +8851,11 @@ public final class AdminProtos {
               flushed_ = input.readBool();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              wroteFlushWalMarker_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8791,9 +8928,26 @@ public final class AdminProtos {
       return flushed_;
     }
 
+    // optional bool wrote_flush_wal_marker = 3;
+    public static final int WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3;
+    private boolean wroteFlushWalMarker_;
+    /**
+     * <code>optional bool wrote_flush_wal_marker = 3;</code>
+     */
+    public boolean hasWroteFlushWalMarker() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional bool wrote_flush_wal_marker = 3;</code>
+     */
+    public boolean getWroteFlushWalMarker() {
+      return wroteFlushWalMarker_;
+    }
+
     private void initFields() {
       lastFlushTime_ = 0L;
       flushed_ = false;
+      wroteFlushWalMarker_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8817,6 +8971,9 @@ public final class AdminProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBool(2, flushed_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(3, wroteFlushWalMarker_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -8834,6 +8991,10 @@ public final class AdminProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(2, flushed_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, wroteFlushWalMarker_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -8867,6 +9028,11 @@ public final class AdminProtos {
         result = result && (getFlushed()
             == other.getFlushed());
       }
+      result = result && (hasWroteFlushWalMarker() == other.hasWroteFlushWalMarker());
+      if (hasWroteFlushWalMarker()) {
+        result = result && (getWroteFlushWalMarker()
+            == other.getWroteFlushWalMarker());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8888,6 +9054,10 @@ public final class AdminProtos {
         hash = (37 * hash) + FLUSHED_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getFlushed());
       }
+      if (hasWroteFlushWalMarker()) {
+        hash = (37 * hash) + WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getWroteFlushWalMarker());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -9001,6 +9171,8 @@ public final class AdminProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         flushed_ = false;
         bitField0_ = (bitField0_ & ~0x00000002);
+        wroteFlushWalMarker_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -9037,6 +9209,10 @@ public final class AdminProtos {
           to_bitField0_ |= 0x00000002;
         }
         result.flushed_ = flushed_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.wroteFlushWalMarker_ = wroteFlushWalMarker_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -9059,6 +9235,9 @@ public final class AdminProtos {
         if (other.hasFlushed()) {
           setFlushed(other.getFlushed());
         }
+        if (other.hasWroteFlushWalMarker()) {
+          setWroteFlushWalMarker(other.getWroteFlushWalMarker());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9156,6 +9335,39 @@ public final class AdminProtos {
         return this;
       }
 
+      // optional bool wrote_flush_wal_marker = 3;
+      private boolean wroteFlushWalMarker_ ;
+      /**
+       * <code>optional bool wrote_flush_wal_marker = 3;</code>
+       */
+      public boolean hasWroteFlushWalMarker() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional bool wrote_flush_wal_marker = 3;</code>
+       */
+      public boolean getWroteFlushWalMarker() {
+        return wroteFlushWalMarker_;
+      }
+      /**
+       * <code>optional bool wrote_flush_wal_marker = 3;</code>
+       */
+      public Builder setWroteFlushWalMarker(boolean value) {
+        bitField0_ |= 0x00000004;
+        wroteFlushWalMarker_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool wrote_flush_wal_marker = 3;</code>
+       */
+      public Builder clearWroteFlushWalMarker() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        wroteFlushWalMarker_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:FlushRegionResponse)
     }
 
@@ -22073,66 +22285,67 @@ public final class AdminProtos {
       "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" +
       " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
       "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(",
-      "\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
+      "\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
       "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
-      "\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" +
-      "_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" +
-      "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
-      "fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" +
-      "Response\"W\n\024CompactRegionRequest\022 \n\006regi" +
-      "on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" +
-      "\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" +
-      "se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda",
-      "te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" +
-      "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" +
-      "\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" +
-      "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" +
-      "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" +
-      "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" +
-      "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" +
-      "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" +
-      "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" +
-      "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
-      "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" +
-      "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" +
-      "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" +
-      "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" +
-      "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" +
-      "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" +
-      "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" +
-      " \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" +
-      "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " +
-      "\n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upda",
-      "teConfigurationRequest\"\035\n\033UpdateConfigur" +
-      "ationResponse2\230\010\n\014AdminService\022>\n\rGetReg" +
-      "ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" +
-      "onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" +
-      "eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" +
-      "etOnlineRegion\022\027.GetOnlineRegionRequest\032" +
-      "\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" +
-      "\022.OpenRegionRequest\032\023.OpenRegionResponse" +
-      "\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" +
-      "loseRegionResponse\0228\n\013FlushRegion\022\023.Flus",
-      "hRegionRequest\032\024.FlushRegionResponse\0228\n\013" +
-      "SplitRegion\022\023.SplitRegionRequest\032\024.Split" +
-      "RegionResponse\022>\n\rCompactRegion\022\025.Compac" +
-      "tRegionRequest\032\026.CompactRegionResponse\022;" +
-      "\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" +
-      "ergeRegionsResponse\022J\n\021ReplicateWALEntry" +
-      "\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" +
-      "ALEntryResponse\022?\n\006Replay\022\031.ReplicateWAL" +
-      "EntryRequest\032\032.ReplicateWALEntryResponse" +
-      "\022>\n\rRollWALWriter\022\025.RollWALWriterRequest",
-      "\032\026.RollWALWriterResponse\022>\n\rGetServerInf" +
-      "o\022\025.GetServerInfoRequest\032\026.GetServerInfo" +
-      "Response\0225\n\nStopServer\022\022.StopServerReque" +
-      "st\032\023.StopServerResponse\022M\n\022UpdateFavored" +
-      "Nodes\022\032.UpdateFavoredNodesRequest\032\033.Upda" +
-      "teFavoredNodesResponse\022P\n\023UpdateConfigur" +
-      "ation\022\033.UpdateConfigurationRequest\032\034.Upd" +
-      "ateConfigurationResponseBA\n*org.apache.h" +
-      "adoop.hbase.protobuf.generatedB\013AdminPro" +
-      "tosH\001\210\001\001\240\001\001"
+      "\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" +
+      "lushRegionResponse\022\027\n\017last_flush_time\030\001 " +
+      "\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" +
+      "marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" +
+      "gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" +
+      "int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp" +
+      "actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
+      "nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(",
+      "\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" +
+      "oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." +
+      "UpdateFavoredNodesRequest.RegionUpdateIn" +
+      "fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" +
+      ".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" +
+      "verName\".\n\032UpdateFavoredNodesResponse\022\020\n" +
+      "\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" +
+      "\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re" +
+      "gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" +
+      "le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"",
+      "X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" +
+      "_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" +
+      "unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" +
+      "\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" +
+      "ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" +
+      "RollWALWriterResponse\022\027\n\017region_to_flush" +
+      "\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " +
+      "\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn" +
+      "foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" +
+      " \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n",
+      "\025GetServerInfoResponse\022 \n\013server_info\030\001 " +
+      "\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" +
+      "equest\"\035\n\033UpdateConfigurationResponse2\230\010" +
+      "\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" +
+      "ionInfoRequest\032\026.GetRegionInfoResponse\022;" +
+      "\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" +
+      "etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" +
+      ".GetOnlineRegionRequest\032\030.GetOnlineRegio" +
+      "nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" +
+      "est\032\023.OpenRegionResponse\0228\n\013CloseRegion\022",
+      "\023.CloseRegionRequest\032\024.CloseRegionRespon" +
+      "se\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024" +
+      ".FlushRegionResponse\0228\n\013SplitRegion\022\023.Sp" +
+      "litRegionRequest\032\024.SplitRegionResponse\022>" +
+      "\n\rCompactRegion\022\025.CompactRegionRequest\032\026" +
+      ".CompactRegionResponse\022;\n\014MergeRegions\022\024" +
+      ".MergeRegionsRequest\032\025.MergeRegionsRespo" +
+      "nse\022J\n\021ReplicateWALEntry\022\031.ReplicateWALE" +
+      "ntryRequest\032\032.ReplicateWALEntryResponse\022" +
+      "?\n\006Replay\022\031.ReplicateWALEntryRequest\032\032.R",
+      "eplicateWALEntryResponse\022>\n\rRollWALWrite" +
+      "r\022\025.RollWALWriterRequest\032\026.RollWALWriter" +
+      "Response\022>\n\rGetServerInfo\022\025.GetServerInf" +
+      "oRequest\032\026.GetServerInfoResponse\0225\n\nStop" +
+      "Server\022\022.StopServerRequest\032\023.StopServerR" +
+      "esponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFa" +
+      "voredNodesRequest\032\033.UpdateFavoredNodesRe" +
+      "sponse\022P\n\023UpdateConfiguration\022\033.UpdateCo" +
+      "nfigurationRequest\032\034.UpdateConfiguration" +
+      "ResponseBA\n*org.apache.hadoop.hbase.prot",
+      "obuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -22210,13 +22423,13 @@ public final class AdminProtos {
           internal_static_FlushRegionRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_FlushRegionRequest_descriptor,
-              new java.lang.String[] { "Region", "IfOlderThanTs", });
+              new java.lang.String[] { "Region", "IfOlderThanTs", "WriteFlushWalMarker", });
           internal_static_FlushRegionResponse_descriptor =
             getDescriptor().getMessageTypes().get(11);
           internal_static_FlushRegionResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_FlushRegionResponse_descriptor,
-              new java.lang.String[] { "LastFlushTime", "Flushed", });
+              new java.lang.String[] { "LastFlushTime", "Flushed", "WroteFlushWalMarker", });
           internal_static_SplitRegionRequest_descriptor =
             getDescriptor().getMessageTypes().get(12);
           internal_static_SplitRegionRequest_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 35192cc..fa73077 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -5695,6 +5695,14 @@ public final class WALProtos {
        * <code>ABORT_FLUSH = 2;</code>
        */
       ABORT_FLUSH(2, 2),
+      /**
+       * <code>CANNOT_FLUSH = 3;</code>
+       *
+       * <pre>
+       * marker for indicating that a flush has been requested but cannot complete
+       * </pre>
+       */
+      CANNOT_FLUSH(3, 3),
       ;
 
       /**
@@ -5709,6 +5717,14 @@ public final class WALProtos {
        * <code>ABORT_FLUSH = 2;</code>
        */
       public static final int ABORT_FLUSH_VALUE = 2;
+      /**
+       * <code>CANNOT_FLUSH = 3;</code>
+       *
+       * <pre>
+       * marker for indicating that a flush has been requested but cannot complete
+       * </pre>
+       */
+      public static final int CANNOT_FLUSH_VALUE = 3;
 
 
       public final int getNumber() { return value; }
@@ -5718,6 +5734,7 @@ public final class WALProtos {
           case 0: return START_FLUSH;
           case 1: return COMMIT_FLUSH;
           case 2: return ABORT_FLUSH;
+          case 3: return CANNOT_FLUSH;
           default: return null;
         }
       }
@@ -11848,7 +11865,7 @@ public final class WALProtos {
       "n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
       "paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
       "\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
-      "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
+      "_name\030\007 \001(\014\"\222\003\n\017FlushDescriptor\022,\n\006actio" +
       "n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
       "\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
       "\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
@@ -11856,25 +11873,26 @@ public final class WALProtos {
       "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
       "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
       "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
-      "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
-      "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
-      "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
-      "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
-      "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
-      "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
-      "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
-      "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
-      "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
-      "nEventDescriptor.EventType\022\022\n\ntable_name" +
-      "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
-      "og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
-      "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
-      "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
-      "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
-      "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
-      "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
-      "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
-      "eneratedB\tWALProtosH\001\210\001\000\240\001\001"
+      "utput\030\003 \003(\t\"S\n\013FlushAction\022\017\n\013START_FLUS" +
+      "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020" +
+      "\n\014CANNOT_FLUSH\020\003\"R\n\017StoreDescriptor\022\023\n\013f" +
+      "amily_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t",
+      "\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescrip" +
+      "tor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023e" +
+      "ncoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(\013" +
+      "2\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030\004" +
+      " \002(\003\"\237\002\n\025RegionEventDescriptor\0224\n\nevent_" +
+      "type\030\001 \002(\0162 .RegionEventDescriptor.Event" +
+      "Type\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_regi" +
+      "on_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001" +
+      "(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n\006" +
+      "server\030\006 \001(\0132\013.ServerName\022\023\n\013region_name",
+      "\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014" +
+      "REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeTyp" +
+      "e\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLIC" +
+      "ATION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoo" +
+      "p.hbase.protobuf.generatedB\tWALProtosH\001\210" +
+      "\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index fcc4e1d..5f0572a 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -115,11 +115,13 @@ message CloseRegionResponse {
 message FlushRegionRequest {
   required RegionSpecifier region = 1;
   optional uint64 if_older_than_ts = 2;
+  optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
 }
 
 message FlushRegionResponse {
   required uint64 last_flush_time = 1;
   optional bool flushed = 2;
+  optional bool wrote_flush_wal_marker = 3;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 3fd6025..9853e36 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -109,6 +109,7 @@ message FlushDescriptor {
     START_FLUSH = 0;
     COMMIT_FLUSH = 1;
     ABORT_FLUSH = 2;
+    CANNOT_FLUSH = 3; // marker for indicating that a flush has been requested but cannot complete
   }
 
   message StoreFlushDescriptor {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9899aab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8f73af5..ac13382 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -480,6 +480,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     final Result result;
     final String failureReason;
     final long flushSequenceId;
+    final boolean wroteFlushWalMarker;
 
     /**
      * Convenience constructor to use when the flush is successful, the failure message is set to
@@ -489,7 +490,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
      *                        memstores.
      */
     FlushResult(Result result, long flushSequenceId) {
-      this(result, flushSequenceId, null);
+      this(result, flushSequenceId, null, false);
       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
           .FLUSHED_COMPACTION_NEEDED;
     }
@@ -499,8 +500,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
      * @param failureReason Reason why we couldn't flush.
      */
-    FlushResult(Result result, String failureReason) {
-      this(result, -1, failureReason);
+    FlushResult(Result result, String failureReason, boolean wroteFlushMarker) {
+      this(result, -1, failureReason, wroteFlushMarker);
       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
     }
 
@@ -510,10 +511,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
      * @param failureReason Reason why we couldn't flush, or null.
      */
-    FlushResult(Result result, long flushSequenceId, String failureReason) {
+    FlushResult(Result result, long flushSequenceId, String failureReason,
+      boolean wroteFlushMarker) {
       this.result = result;
       this.flushSequenceId = flushSequenceId;
       this.failureReason = failureReason;
+      this.wroteFlushWalMarker = wroteFlushMarker;
     }
 
     /**
@@ -1787,7 +1790,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @throws IOException
    */
   public FlushResult flushcache() throws IOException {
-    return flushcache(true);
+    return flushcache(true, false);
   }
 
   /**
@@ -1811,11 +1814,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * because a Snapshot was not properly persisted.
    */
   public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
+    return flushcache(forceFlushAllStores, false);
+  }
+
+
+  /**
+   * Flush the cache.
+   *
+   * When this method is called the cache will be flushed unless:
+   * <ol>
+   *   <li>the cache is empty</li>
+   *   <li>the region is closed.</li>
+   *   <li>a flush is already in progress</li>
+   *   <li>writes are disabled</li>
+   * </ol>
+   *
+   * <p>This method may block for some time, so it should not be called from a
+   * time-sensitive thread.
+   * @param forceFlushAllStores whether we want to flush all stores
+   * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
+   * @return whether the flush succeeded and whether the region needs compacting
+   *
+   * @throws IOException general io exceptions
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted.
+   */
+  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
+      throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
       LOG.debug(msg);
-      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
@@ -1826,7 +1856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         String msg = "Skipping flush on " + this + " because closed";
         LOG.debug(msg);
         status.abort(msg);
-        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor pre-flush hooks");
@@ -1851,14 +1881,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
               + (writestate.flushing ? "already flushing"
               : "writes not enabled");
           status.abort(msg);
-          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
         }
       }
 
       try {
         Collection<Store> specificStoresToFlush =
             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
-        FlushResult fs = internalFlushcache(specificStoresToFlush, status);
+        FlushResult fs = internalFlushcache(specificStoresToFlush,
+          status, writeFlushRequestWalMarker);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -1955,7 +1986,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   private FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(stores.values(), status);
+    return internalFlushcache(stores.values(), status, false);
   }
 
   /**
@@ -1964,9 +1995,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
    */
   private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
-        status);
+        status, writeFlushWalMarker);
   }
 
   /**
@@ -1998,9 +2029,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    *           properly persisted.
    */
   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-      final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
+      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+          throws IOException {
     PrepareFlushResult result
-      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
+      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
     if (result.result == null) {
       return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
     } else {
@@ -2010,7 +2042,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
   protected PrepareFlushResult internalPrepareFlushCache(
       final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
-      MonitoredTask status, boolean isReplay)
+      MonitoredTask status, boolean writeFlushWalMarker)
           throws IOException {
 
     if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -2036,14 +2068,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             w = mvcc.beginMemstoreInsert();
             long flushSeqId = getNextSequenceId(wal);
             FlushResult flushResult = new FlushResult(
-              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
+              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush",
+              writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
             w.setWriteNumber(flushSeqId);
             mvcc.waitForPreviousTransactionsComplete(w);
             w = null;
             return new PrepareFlushResult(flushResult, myseqid);
           } else {
             return new PrepareFlushResult(
-              new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
+              new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush",
+                false),
               myseqid);
           }
         }
@@ -2110,7 +2144,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             String msg = "Flush will not be started for ["
                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
             status.setStatus(msg);
-            return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
+            return new PrepareFlushResult(
+              new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false),
               myseqid);
           }
           flushOpSeqId = getNextSequenceId(wal);
@@ -2198,6 +2233,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       flushedSeqId, totalFlushableSizeOfFlushableStores);
   }
 
+  /**
+   * Writes a marker to the WAL indicating that a flush was requested but could not be completed
+   * for some reason (for example, an empty memstore). Ignores exceptions raised while appending to
+   * the WAL; a failed marker write must not fail the flush request itself.
+   * @param wal the WAL to append the CANNOT_FLUSH marker to; no-op when null
+   * @param writeFlushWalMarker whether the caller actually wants the marker written
+   * @return true if the marker was successfully written to the WAL, false otherwise (including
+   *         when the region is read-only or the marker was not requested)
+   */
+  private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
+    if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
+      // -1 flush sequence number: no actual flush happened, this is only a request marker
+      FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
+        getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
+      try {
+        // NOTE(review): the trailing 'true' presumably asks WALUtil to sync the marker before
+        // returning — confirm against WALUtil.writeFlushMarker's signature
+        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+          desc, sequenceId, true);
+        return true;
+      } catch (IOException e) {
+        // Best effort only; log and report failure via the return value.
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Received exception while trying to write the flush request to wal", e);
+      }
+    }
+    return false;
+  }
+
   protected FlushResult internalFlushCacheAndCommit(
         final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
         final Collection<Store> storesToFlush)
@@ -2267,8 +2324,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
             desc, sequenceId, false);
         } catch (Throwable ex) {
-          LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
-              StringUtils.stringifyException(ex));
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
+              + StringUtils.stringifyException(ex));
           // ignore this since we will be aborting the RS with DSE.
         }
         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@@ -3546,7 +3604,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
   protected void checkReadsEnabled() throws IOException {
     if (!this.writestate.readsEnabled) {
-      throw new IOException("The region's reads are disabled. Cannot serve the request");
+      throw new IOException(getRegionInfo().getEncodedName()
+        + ": The region's reads are disabled. Cannot serve the request");
     }
   }
 
@@ -3835,7 +3894,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid, stores.values(), status);
+      internalFlushcache(null, seqid, stores.values(), status, false);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
@@ -3937,7 +3996,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           if (currentEditSeqId > key.getLogSeqNum()) {
             // when this condition is true, it means we have a serious defect because we need to
             // maintain increasing SeqId for WAL edits per region
-            LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
+            LOG.error(getRegionInfo().getEncodedName() + " : "
+                 + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
                 + "; edit=" + val);
           } else {
             currentEditSeqId = key.getLogSeqNum();
@@ -4001,7 +4061,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             editsCount++;
           }
           if (flush) {
-            internalFlushcache(null, currentEditSeqId, stores.values(), status);
+            internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
           }
 
           if (coprocessorHost != null) {
@@ -4060,18 +4120,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       "Compaction marker from WAL ", compaction);
 
     if (replaySeqId < lastReplayedOpenRegionSeqId) {
-      LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
-        + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
-        + " of " + lastReplayedOpenRegionSeqId);
+      LOG.warn(getRegionInfo().getEncodedName() + " : "
+          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+          + " of " + lastReplayedOpenRegionSeqId);
       return;
     }
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getRegionInfo().getEncodedName() + " : "
+          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction));
+    }
+
     startRegionOperation(Operation.REPLAY_EVENT);
     try {
       Store store = this.getStore(compaction.getFamilyName().toByteArray());
       if (store == null) {
-        LOG.warn("Found Compaction WAL edit for deleted family:" +
-            Bytes.toString(compaction.getFamilyName().toByteArray()));
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Found Compaction WAL edit for deleted family:"
+            + Bytes.toString(compaction.getFamilyName().toByteArray()));
         return;
       }
       store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
@@ -4080,7 +4147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
-  void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
+  void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
     checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
       "Flush marker from WAL ", flush);
 
@@ -4089,7 +4156,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
+      LOG.debug(getRegionInfo().getEncodedName() + " : "
+          + "Replaying flush marker " + TextFormat.shortDebugString(flush));
     }
 
     startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
@@ -4105,9 +4173,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       case ABORT_FLUSH:
         replayWALFlushAbortMarker(flush);
         break;
+      case CANNOT_FLUSH:
+        replayWALFlushCannotFlushMarker(flush, replaySeqId);
+        break;
       default:
-        LOG.warn("Received a flush event with unknown action, ignoring. "
-            + TextFormat.shortDebugString(flush));
+        LOG.warn(getRegionInfo().getEncodedName() + " : " +
+          "Received a flush event with unknown action, ignoring. " +
+          TextFormat.shortDebugString(flush));
         break;
       }
     } finally {
@@ -4128,7 +4200,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       byte[] family = storeFlush.getFamilyName().toByteArray();
       Store store = getStore(family);
       if (store == null) {
-        LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+          + "Received a flush start marker from primary, but the family is not found. Ignoring"
           + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
         continue;
       }
@@ -4142,9 +4215,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     synchronized (writestate) {
       try {
         if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
-          LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
-            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
-            + " of " + lastReplayedOpenRegionSeqId);
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+              + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+              + " of " + lastReplayedOpenRegionSeqId);
           return null;
         }
         if (numMutationsWithoutWAL.get() > 0) {
@@ -4158,7 +4232,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
           // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
           PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
-            flushSeqId, storesToFlush, status, true);
+            flushSeqId, storesToFlush, status, false);
           if (prepareResult.result == null) {
             // save the PrepareFlushResult so that we can use it later from commit flush
             this.writestate.flushing = true;
@@ -4169,6 +4243,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
             }
           } else {
+            // special case empty memstore. We will still save the flush result in this case, since
+            // our memstore is empty, but the primary is still flushing
+            if (prepareResult.result.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+              this.writestate.flushing = true;
+              this.prepareFlushResult = prepareResult;
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(getRegionInfo().getEncodedName() + " : "
+                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
+              }
+            }
             status.abort("Flush prepare failed with " + prepareResult.result);
             // nothing much to do. prepare flush failed because of some reason.
           }
@@ -4177,20 +4261,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           // we already have an active snapshot.
           if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
             // They define the same flush. Log and continue.
-            LOG.warn("Received a flush prepare marker with the same seqId: " +
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a flush prepare marker with the same seqId: " +
                 + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                 + prepareFlushResult.flushOpSeqId + ". Ignoring");
             // ignore
           } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
             // We received a flush with a smaller seqNum than what we have prepared. We can only
             // ignore this prepare flush request.
-            LOG.warn("Received a flush prepare marker with a smaller seqId: " +
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a flush prepare marker with a smaller seqId: " +
                 + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                 + prepareFlushResult.flushOpSeqId + ". Ignoring");
             // ignore
           } else {
             // We received a flush with a larger seqNum than what we have prepared
-            LOG.warn("Received a flush prepare marker with a larger seqId: " +
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a flush prepare marker with a larger seqId: " +
                 + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                 + prepareFlushResult.flushOpSeqId + ". Ignoring");
             // We do not have multiple active snapshots in the memstore or a way to merge current
@@ -4225,7 +4312,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     synchronized (writestate) {
       try {
         if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
-          LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
             + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
             + " of " + lastReplayedOpenRegionSeqId);
           return;
@@ -4253,7 +4341,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // we received a flush commit with a smaller seqId than what we have prepared
             // we will pick the flush file up from this commit (if we have not seen it), but we
             // will not drop the memstore
-            LOG.warn("Received a flush commit marker with smaller seqId: "
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a flush commit marker with smaller seqId: "
                 + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
                 + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
                 +"  prepared memstore snapshot");
@@ -4267,7 +4356,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // we will pick the flush file for this. We will also obtain the updates lock and
             // look for contents of the memstore to see whether we have edits after this seqId.
             // If not, we will drop all the memstore edits and the snapshot as well.
-            LOG.warn("Received a flush commit marker with larger seqId: "
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a flush commit marker with larger seqId: "
                 + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
                 prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
                 +" memstore snapshot");
@@ -4284,6 +4374,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             this.prepareFlushResult = null;
             writestate.flushing = false;
           }
+          // If we were waiting for observing a flush or region opening event for not showing
+          // partial data after a secondary region crash, we can allow reads now. We can only make
+          // sure that we are not showing partial data (for example skipping some previous edits)
+          // until we observe a full flush start and flush commit. So if we were not able to find
+          // a previous flush we will not enable reads now.
+          this.setReadsEnabled(true);
         } else {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
@@ -4337,14 +4433,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       byte[] family = storeFlush.getFamilyName().toByteArray();
       Store store = getStore(family);
       if (store == null) {
-        LOG.warn("Received a flush commit marker from primary, but the family is not found." +
-            "Ignoring StoreFlushDescriptor:" + storeFlush);
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Received a flush commit marker from primary, but the family is not found."
+            + "Ignoring StoreFlushDescriptor:" + storeFlush);
         continue;
       }
       List<String> flushFiles = storeFlush.getFlushOutputList();
       StoreFlushContext ctx = null;
       long startTime = EnvironmentEdgeManager.currentTime();
-      if (prepareFlushResult == null) {
+      if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
         ctx = store.createFlushContext(flush.getFlushSequenceNumber());
       } else {
         ctx = prepareFlushResult.storeFlushCtxs.get(family);
@@ -4352,7 +4449,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
 
       if (ctx == null) {
-        LOG.warn("Unexpected: flush commit marker received from store "
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Unexpected: flush commit marker received from store "
             + Bytes.toString(family) + " but no associated flush context. Ignoring");
         continue;
       }
@@ -4376,7 +4474,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       long currentSeqId = getSequenceId().get();
       if (seqId >= currentSeqId) {
         // then we can drop the memstore contents since everything is below this seqId
-        LOG.info("Dropping memstore contents as well since replayed flush seqId: "
+        LOG.info(getRegionInfo().getEncodedName() + " : "
+            + "Dropping memstore contents as well since replayed flush seqId: "
             + seqId + " is greater than current seqId:" + currentSeqId);
 
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
@@ -4388,7 +4487,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           dropStoreMemstoreContentsForSeqId(store, currentSeqId);
         }
       } else {
-        LOG.info("Not dropping memstore contents since replayed flush seqId: "
+        LOG.info(getRegionInfo().getEncodedName() + " : "
+            + "Not dropping memstore contents since replayed flush seqId: "
             + seqId + " is smaller than current seqId:" + currentSeqId);
       }
     } finally {
@@ -4409,6 +4509,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // that will drop the snapshot
   }
 
+  /**
+   * Replays a CANNOT_FLUSH marker observed in the replicated WAL stream. The primary replica
+   * writes this marker when a requested flush could not run (e.g. its memstore was empty), so
+   * seeing it allows a recovering secondary to re-enable reads without waiting for a full
+   * flush-start/flush-commit cycle.
+   * @param flush the flush descriptor carried by the WAL marker (used only for logging here)
+   * @param replaySeqId sequence id at which the marker is being replayed, used to drop events
+   *          that predate the last replayed region-open
+   */
+  private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
+    synchronized (writestate) {
+      if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
+        // Stale event from before the most recent region open; ignore it.
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+          + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+          + " because its sequence id " + replaySeqId + " is smaller than this regions "
+          + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
+        return;
+      }
+
+      // If we were waiting for observing a flush or region opening event for not showing partial
+      // data after a secondary region crash, we can allow reads now. This event means that the
+      // primary was not able to flush because memstore is empty when we requested flush. By the
+      // time we observe this, we are guaranteed to have up to date seqId with our previous
+      // assignment.
+      this.setReadsEnabled(true);
+    }
+  }
+
   @VisibleForTesting
   PrepareFlushResult getPrepareFlushResult() {
     return prepareFlushResult;
@@ -4429,13 +4548,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         return;
       }
       if (regionEvent.getEventType() != EventType.REGION_OPEN) {
-        LOG.warn("Unknown region event received, ignoring :"
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Unknown region event received, ignoring :"
             + TextFormat.shortDebugString(regionEvent));
         return;
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
+        LOG.debug(getRegionInfo().getEncodedName() + " : "
+          + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
       }
 
       // we will use writestate as a coarse-grain lock for all the replay events
@@ -4446,10 +4567,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         // region open event's seqid. Since this is the first event that the region puts (after
         // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
         // smaller than this seqId
-        if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
+        if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
           this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
         } else {
-          LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
             + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
             + " of " + lastReplayedOpenRegionSeqId);
           return;
@@ -4462,7 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           byte[] family = storeDescriptor.getFamilyName().toByteArray();
           Store store = getStore(family);
           if (store == null) {
-            LOG.warn("Received a region open marker from primary, but the family is not found. "
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                + "Received a region open marker from primary, but the family is not found. "
                 + "Ignoring. StoreDescriptor:" + storeDescriptor);
             continue;
           }
@@ -4478,7 +4601,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           if (writestate.flushing) {
             // only drop memstore snapshots if they are smaller than last flush for the store
             if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
-              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
+              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
+                  null : this.prepareFlushResult.storeFlushCtxs.get(family);
               if (ctx != null) {
                 long snapshotSize = store.getFlushableSize();
                 ctx.abort();
@@ -4524,6 +4648,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         // either greater than flush seq number or they were already dropped via flush.
         getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
 
+        // If we were waiting for observing a flush or region opening event for not showing partial
+        // data after a secondary region crash, we can allow reads now.
+        this.setReadsEnabled(true);
+
         // C. Finally notify anyone waiting on memstore to clear:
         // e.g. checkResources().
         synchronized (this) {
@@ -4865,7 +4993,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
       // a sequence id that we can be sure is beyond the last hfile written).
       if (assignSeqId) {
-        FlushResult fs = this.flushcache(true);
+        FlushResult fs = this.flushcache();
         if (fs.isFlushSucceeded()) {
           seqId = fs.flushSequenceId;
         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -5832,8 +5960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     FileSystem fs = a.getRegionFileSystem().getFileSystem();
     // Make sure each region's cache is empty
-    a.flushcache(true);
-    b.flushcache(true);
+    a.flushcache();
+    b.flushcache();
 
     // Compact each region so we only have one store file per family
     a.compactStores(true);


Mime
View raw message