hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17407: Correct update of maxFlushedSeqId in HRegion
Date Mon, 23 Jan 2017 01:23:40 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 3abd13dac -> f254e278e


HBASE-17407: Correct update of maxFlushedSeqId in HRegion

Signed-off-by: zhangduo <zhangduo@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f254e278
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f254e278
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f254e278

Branch: refs/heads/master
Commit: f254e278ece751e67c92570aef4b15fddab22a94
Parents: 3abd13d
Author: eshcar <eshcar@yahoo-inc.com>
Authored: Thu Jan 19 01:11:58 2017 +0200
Committer: zhangduo <zhangduo@apache.org>
Committed: Mon Jan 23 09:22:51 2017 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  | 24 ++++++++++++++++----
 .../hbase/regionserver/CompactionPipeline.java  |  8 +++++++
 .../hbase/regionserver/DefaultMemStore.java     |  4 +++-
 .../hadoop/hbase/regionserver/HRegion.java      | 10 ++++----
 .../hadoop/hbase/regionserver/HStore.java       |  4 ++--
 .../hadoop/hbase/regionserver/MemStore.java     | 10 ++++----
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  9 ++++++++
 .../regionserver/wal/SequenceIdAccounting.java  | 21 +++++++++++++----
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  6 +++++
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  2 ++
 10 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index ed7d274..48dc880 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -124,13 +125,20 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the WAL with sequence number that is known only at the store level.
+   * This method is called before the flush is executed.
+   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
    */
   @Override
-  public void finalizeFlush() {
-    updateLowestUnflushedSequenceIdInWAL(false);
+  public long preFlushSeqIDEstimation() {
+    if(compositeSnapshot) {
+      return HConstants.NO_SEQNUM;
+    }
+    Segment segment = getLastSegment();
+    if(segment == null) {
+      return HConstants.NO_SEQNUM;
+    }
+    return segment.getMinSequenceId();
   }
 
   @Override
@@ -364,6 +372,12 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  private Segment getLastSegment() {
+    Segment localActive = getActive();
+    Segment tail = pipeline.getTail();
+    return tail == null ? localActive : tail;
+  }
+
   private byte[] getFamilyNameInBytes() {
     return store.getFamily().getName();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e533bd0..9a844e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -267,6 +267,14 @@ public class CompactionPipeline {
     if(segment != null) pipeline.addLast(segment);
   }
 
+  public Segment getTail() {
+    List<? extends Segment> localCopy = getSegments();
+    if(localCopy.isEmpty()) {
+      return null;
+    }
+    return localCopy.get(localCopy.size()-1);
+  }
+
   private boolean addFirst(ImmutableSegment segment) {
     pipeline.addFirst(segment);
     return true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index d4e6e12..63af570 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -169,7 +170,8 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  public void finalizeFlush() {
+  public long preFlushSeqIDEstimation() {
+    return HConstants.NO_SEQNUM;
   }
 
   @Override public boolean isSloppy() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f35d788..ef6239d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2412,9 +2412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
     MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
-    Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+    Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
     for (Store store: storesToFlush) {
-      flushedFamilyNames.add(store.getFamily().getName());
+      flushedFamilyNamesToSeq.put(store.getFamily().getName(),
+          ((HStore) store).preFlushSeqIDEstimation());
     }
 
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs
@@ -2434,7 +2435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       if (wal != null) {
         Long earliestUnflushedSequenceIdForTheRegion =
-            wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+            wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
         if (earliestUnflushedSequenceIdForTheRegion == null) {
           // This should never happen. This is how startCacheFlush signals flush cannot proceed.
           String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
@@ -2677,9 +2678,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     // If we get to here, the HStores have been written.
-    for(Store storeToFlush :storesToFlush) {
-      ((HStore) storeToFlush).finalizeFlush();
-    }
     if (wal != null) {
       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 425667a..ad23ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2509,8 +2509,8 @@ public class HStore implements Store {
     }
   }
 
-  public void finalizeFlush() {
-    memstore.finalizeFlush();
+  public Long preFlushSeqIDEstimation() {
+    return memstore.preFlushSeqIDEstimation();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index b094476..38d3e44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -119,12 +119,12 @@ public interface MemStore {
   MemstoreSize size();
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the wal with sequence number that is known only at the store level.
+   * This method is called before the flush is executed.
+   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
    */
-  void finalizeFlush();
+  long preFlushSeqIDEstimation();
 
-  /* Return true if the memstore may need some extra memory space*/
+  /* Return true if the memstore may use some extra memory space*/
   boolean isSloppy();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 316e2f6..7e3bd59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -429,6 +429,15 @@ public abstract class AbstractFSWAL<W> implements WAL {
   }
 
   @Override
+  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
+    if (!closeBarrier.beginOp()) {
+      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
+      return null;
+    }
+    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
+  }
+
+  @Override
   public void completeCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
     closeBarrier.endOp();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 6e7ad9b..8226b82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -264,6 +264,14 @@ class SequenceIdAccounting {
    * oldest/lowest outstanding edit.
    */
   Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
+    Map<byte[],Long> familytoSeq = new HashMap<>();
+    for (byte[] familyName : families){
+      familytoSeq.put(familyName,HConstants.NO_SEQNUM);
+    }
+    return startCacheFlush(encodedRegionName,familytoSeq);
+  }
+
+  Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
     Map<ImmutableByteArray, Long> oldSequenceIds = null;
     Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
     synchronized (tieLock) {
@@ -273,9 +281,14 @@ class SequenceIdAccounting {
         // circumstance because another concurrent thread now may add sequenceids for this family
         // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
         // is fine because updates are blocked when this method is called. Make sure!!!
-        for (byte[] familyName : families) {
-          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
-          Long seqId = m.remove(familyNameWrapper);
+        for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
+          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());
+          Long seqId = null;
+          if(entry.getValue() == HConstants.NO_SEQNUM) {
+            seqId = m.remove(familyNameWrapper);
+          } else {
+            seqId = m.replace(familyNameWrapper, entry.getValue());
+          }
           if (seqId != null) {
             if (oldSequenceIds == null) {
               oldSequenceIds = new HashMap<>();
@@ -344,7 +357,7 @@ class SequenceIdAccounting {
     if (flushing != null) {
       for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
         Long currentId = tmpMap.get(e.getKey());
-        if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
+        if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
           String errorStr = Bytes.toString(encodedRegionName) + " family "
               + e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
               + currentId + ", previous oldest unflushed id=" + e.getValue();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 337f2b4..8f224fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,6 +196,11 @@ class DisabledWALProvider implements WALProvider {
       sync();
     }
 
+    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
+        flushedFamilyNamesToSeq) {
+      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
+    }
+
     @Override
     public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
       if (closed.get()) return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f254e278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 030d8b6..b7adc60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -161,6 +161,8 @@ public interface WAL extends Closeable {
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
 
+  Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq);
+
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.


Mime
View raw message