hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'
Date Wed, 12 Jul 2017 01:19:43 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 d11b5d1fd -> 2857b75c2


HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2857b75c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2857b75c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2857b75c

Branch: refs/heads/branch-1.2
Commit: 2857b75c2750f2cc0bd29a3d100794021dec509d
Parents: d11b5d1
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Jul 11 18:19:37 2017 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Jul 11 18:19:37 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 30 +++++++++++++++++++
 .../snapshot/FlushSnapshotSubprocedure.java     | 31 +++++++++++++++++++-
 2 files changed, 60 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2857b75c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d0d457f..43bb046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1572,6 +1572,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  public void waitForFlushes() {
+    synchronized (writestate) {
+      if (this.writestate.readOnly) {
+        // we should not wait for replayed flushed if we are read only (for example in case the
+        // region is a secondary replica).
+        return;
+      }
+      if (!writestate.flushing) return;
+      long start = System.currentTimeMillis();
+      boolean interrupted = false;
+      try {
+        while (writestate.flushing) {
+          LOG.debug("waiting for cache flush to complete for region " + this);
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+            // essentially ignore and propagate the interrupt back up
+            LOG.warn("Interrupted while waiting");
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      long duration = System.currentTimeMillis() - start;
+      LOG.debug("Waited " + duration + " ms for flush to complete");
+    }
+  }
   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
       final String threadNamePrefix) {
     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/2857b75c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index f083601..5669452 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.snapshot;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -24,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -52,6 +55,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
   private final SnapshotSubprocedurePool taskManager;
   private boolean snapshotSkipFlush = false;
 
+  // the maximum number of attempts we flush
+  final static int MAX_RETRIES = 3;
+
   public FlushSnapshotSubprocedure(ProcedureMember member,
       ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
       List<Region> regions, SnapshotDescription snapshot,
@@ -96,7 +102,30 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          region.flush(true);
+          boolean succeeded = false;
+          long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+          for (int i = 0; i < MAX_RETRIES; i++) {
+            FlushResult res = region.flush(true);
+            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+              if (region instanceof HRegion) {
+                HRegion hreg = (HRegion) region;
+                // CANNOT_FLUSH may mean that a flush is already on-going
+                // we need to wait for that flush to complete
+                hreg.waitForFlushes();
+              }
+              if (region.getMaxFlushedSeqId() >= readPt) {
+                // writes at the start of the snapshot have been persisted
+                succeeded = true;
+                break;
+              }
+            } else {
+              succeeded = true;
+              break;
+            }
+          }
+          if (!succeeded) {
+            throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
+          }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
         if (snapshotSkipFlush) {


Mime
View raw message