hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/2] hbase git commit: HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)
Date Mon, 09 Jan 2017 21:25:06 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 9cbeba6c3 -> dd1ae3714


HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15762691
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15762691
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15762691

Branch: refs/heads/master
Commit: 1576269123f18c9eb21b04a800e81952ec52c04d
Parents: 9cbeba6
Author: Michael Stack <stack@apache.org>
Authored: Mon Jan 9 10:46:34 2017 -0800
Committer: Michael Stack <stack@apache.org>
Committed: Mon Jan 9 10:46:34 2017 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  6 +-
 .../hbase/regionserver/CompactionPipeline.java  | 76 ++++++++++++--------
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  2 +
 3 files changed, 51 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
   @VisibleForTesting
   @Override
   protected List<Segment> getSegments() {
-    List<Segment> pipelineList = pipeline.getSegments();
-    List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+    List<? extends Segment> pipelineList = pipeline.getSegments();
+    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
     list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-    List<Segment> pipelineList = pipeline.getSegments();
+    List<? extends Segment> pipelineList = pipeline.getSegments();
     long order = pipelineList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element

http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..ebc8c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -25,50 +25,63 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
- * It supports pushing a segment at the head of the pipeline and pulling a segment from the
- * tail to flush to disk.
- * It also supports swap operation to allow the compactor swap a subset of the segments with
a new
- * (compacted) one. This swap succeeds only if the version number passed with the list of
segments
- * to swap is the same as the current version of the pipeline.
- * The pipeline version is updated whenever swapping segments or pulling the segment at the
tail.
+ * It supports pushing a segment at the head of the pipeline and removing a segment from
the
+ * tail when it is flushed to disk.
+ * It also supports swap method to allow the in-memory compaction swap a subset of the segments
+ * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the
version
+ * number passed with the list of segments to swap is the same as the current version of
the
+ * pipeline.
+ * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
+ * and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
+ * The pipeline version is updated by swap(); it allows to identify conflicting operations
at the
+ * suffix of the pipeline.
+ *
+ * The synchronization model is copy-on-write. Methods which change the structure of the
+ * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also
make
+ * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a
read
+ * method accesses the read-only copy more than once it makes a local copy of it
+ * to ensure it accesses the same copy.
+ *
+ * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * protected by a lock since they need to have a consistent (atomic) view of the pipeline
list
+ * and version number.
  */
 @InterfaceAudience.Private
 public class CompactionPipeline {
   private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
 
   public final static long FIXED_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
-  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+      .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
 
   private final RegionServicesForStores region;
-  private LinkedList<ImmutableSegment> pipeline;
-  private long version;
+  private LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
+  private LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
+  private volatile long version = 0;
 
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<>();
-    this.version = 0;
   }
 
   public boolean pushHead(MutableSegment segment) {
     ImmutableSegment immutableSegment = SegmentFactory.instance().
         createImmutableSegment(segment);
     synchronized (pipeline){
-      return addFirst(immutableSegment);
+      boolean res = addFirst(immutableSegment);
+      readOnlyCopy = new LinkedList<>(pipeline);
+      return res;
     }
   }
 
   public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
-      return new VersionedSegmentsList(segmentList, version);
+      return new VersionedSegmentsList(readOnlyCopy, version);
     }
   }
 
@@ -93,8 +106,10 @@ public class CompactionPipeline {
    *        During index merge op this will be false and for compaction it will be true.
    * @return true iff swapped tail with new segment
    */
-  public boolean swap(
-      VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix)
{
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
+        justification="Increment is done under a synchronize block so safe")
+  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
+      boolean closeSuffix) {
     if (versionedList.getVersion() != version) {
       return false;
     }
@@ -115,6 +130,8 @@ public class CompactionPipeline {
             + ", and the number of cells in new segment is:" + count);
       }
       swapSuffix(suffix, segment, closeSuffix);
+      readOnlyCopy = new LinkedList<>(pipeline);
+      version++;
     }
     if (closeSuffix && region != null) {
       // update the global memstore size counter
@@ -193,35 +210,34 @@ public class CompactionPipeline {
   }
 
   public boolean isEmpty() {
-    return pipeline.isEmpty();
+    return readOnlyCopy.isEmpty();
   }
 
-  public List<Segment> getSegments() {
-    synchronized (pipeline){
-      return new LinkedList<>(pipeline);
-    }
+  public List<? extends Segment> getSegments() {
+    return readOnlyCopy;
   }
 
   public long size() {
-    return pipeline.size();
+    return readOnlyCopy.size();
   }
 
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
-    if (!isEmpty()) {
-      minSequenceId = pipeline.getLast().getMinSequenceId();
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
+    if (!localCopy.isEmpty()) {
+      minSequenceId = localCopy.getLast().getMinSequenceId();
     }
     return minSequenceId;
   }
 
   public MemstoreSize getTailSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
+    if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
   }
 
-  private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
+  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
-    version++;
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there wont be any
data copy
     // from old MSLABs. So the new cells in new segment also refers to same chunks. In case
of data

http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 6e8f831..ceaadbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -325,6 +325,7 @@ public class TestHeapSize  {
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     expected += ClassSize.estimateBase(CompactionPipeline.class, false);
     expected += ClassSize.estimateBase(LinkedList.class, false);
+    expected += ClassSize.estimateBase(LinkedList.class, false);
     expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     if (expected != actual) {
@@ -333,6 +334,7 @@ public class TestHeapSize  {
       ClassSize.estimateBase(AtomicBoolean.class, true);
       ClassSize.estimateBase(CompactionPipeline.class, true);
       ClassSize.estimateBase(LinkedList.class, true);
+      ClassSize.estimateBase(LinkedList.class, true);
       ClassSize.estimateBase(MemStoreCompactor.class, true);
       ClassSize.estimateBase(AtomicBoolean.class, true);
       assertEquals(expected, actual);


Mime
View raw message