Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 20068200C01 for ; Wed, 4 Jan 2017 08:39:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1E8FE160B4A; Wed, 4 Jan 2017 07:39:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F26FF160B47 for ; Wed, 4 Jan 2017 08:39:11 +0100 (CET) Received: (qmail 91605 invoked by uid 500); 4 Jan 2017 07:39:07 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 86936 invoked by uid 99); 4 Jan 2017 07:39:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Jan 2017 07:39:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A55DDDFB86; Wed, 4 Jan 2017 07:39:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: syuanjiang@apache.org To: commits@hbase.apache.org Date: Wed, 04 Jan 2017 07:39:30 -0000 Message-Id: <98acae55ad474f90bf74e66b6d289205@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [28/50] [abbrv] hbase git commit: HBASE-17081 [Recommit]Flush the entire CompactingMemStore content to disk (Anastasia) archived-at: Wed, 04 Jan 2017 07:39:14 -0000 HBASE-17081 [Recommit]Flush the entire CompactingMemStore content to disk (Anastasia) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/463ffa79 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/463ffa79 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/463ffa79 Branch: refs/heads/hbase-12439 Commit: 463ffa792a23799d8cf2406321d1c8a3acacded1 Parents: 8fa5b0b Author: Ramkrishna Authored: Mon Dec 26 22:05:13 2016 +0530 Committer: Ramkrishna Committed: Mon Dec 26 22:05:13 2016 +0530 ---------------------------------------------------------------------- .../hbase/regionserver/AbstractMemStore.java | 35 +- .../hbase/regionserver/CompactingMemStore.java | 83 +++-- .../hbase/regionserver/CompactionPipeline.java | 34 +- .../regionserver/CompositeImmutableSegment.java | 352 +++++++++++++++++++ .../hbase/regionserver/DefaultMemStore.java | 23 +- .../hadoop/hbase/regionserver/HRegion.java | 5 +- .../hbase/regionserver/ImmutableSegment.java | 23 +- .../hbase/regionserver/MemStoreCompactor.java | 4 +- .../hadoop/hbase/regionserver/MemstoreSize.java | 25 +- .../hadoop/hbase/regionserver/Segment.java | 21 +- .../hbase/regionserver/SegmentFactory.java | 10 + .../regionserver/TestCompactingMemStore.java | 8 +- .../hbase/regionserver/TestDefaultMemStore.java | 12 +- .../TestWalAndCompactingMemStoreFlush.java | 238 +++++++------ 14 files changed, 698 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 225dd73..8564045 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -159,14 +159,12 @@ public abstract class AbstractMemStore implements MemStore { public String toString() { StringBuffer buf = new StringBuffer(); int i = 1; - try { - for (Segment segment : getSegments()) { - buf.append("Segment (" + i + ") " + segment.toString() + "; "); - i++; - } - } catch (IOException e){ - return e.toString(); + + for (Segment segment : getSegments()) { + buf.append("Segment (" + i + ") " + segment.toString() + "; "); + i++; } + return buf.toString(); } @@ -232,6 +230,7 @@ public abstract class AbstractMemStore implements MemStore { * @return Next row or null if none found. If one found, will be a new * KeyValue -- can be destroyed by subsequent calls to this method. */ + @VisibleForTesting protected Cell getNextRow(final Cell key, final NavigableSet set) { Cell result = null; @@ -249,6 +248,26 @@ public abstract class AbstractMemStore implements MemStore { return result; } + /** + * @param cell Find the row that comes after this one. If null, we return the + * first. + * @return Next row or null if none found. + */ + @VisibleForTesting + Cell getNextRow(final Cell cell) { + Cell lowest = null; + List segments = getSegments(); + for (Segment segment : segments) { + if (lowest == null) { + //TODO: we may want to move the getNextRow ability to the segment + lowest = getNextRow(cell, segment.getCellSet()); + } else { + lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); + } + } + return lowest; + } + private Cell maybeCloneWithAllocator(Cell cell) { return active.maybeCloneWithAllocator(cell); } @@ -307,6 +326,6 @@ public abstract class AbstractMemStore implements MemStore { /** * @return an ordered list of segments from most recent to oldest in memstore */ - protected abstract List getSegments() throws IOException; + protected abstract List getSegments(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index f8192a2..1cd30dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -72,6 +72,7 @@ public class CompactingMemStore extends AbstractMemStore { private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + private boolean compositeSnapshot = true; public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, @@ -160,7 +161,12 @@ public class CompactingMemStore extends AbstractMemStore { stopCompaction(); pushActiveToPipeline(this.active); snapshotId = EnvironmentEdgeManager.currentTime(); - pushTailToSnapshot(); + // in both cases whatever is pushed to snapshot is cleared from the pipeline + if (compositeSnapshot) { + pushPipelineToSnapshot(); + } else { + pushTailToSnapshot(); + } } return new MemStoreSnapshot(snapshotId, this.snapshot); } @@ -173,8 +179,13 @@ public class CompactingMemStore extends AbstractMemStore { public MemstoreSize getFlushableSize() { MemstoreSize snapshotSize = getSnapshotSize(); if (snapshotSize.getDataSize() == 0) { - // if snapshot is empty the tail of the pipeline is flushed - snapshotSize = pipeline.getTailSize(); + // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed + if (compositeSnapshot) { + snapshotSize = pipeline.getPipelineSize(); + snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead()); + } else { + snapshotSize = pipeline.getTailSize(); + } } return snapshotSize.getDataSize() > 0 ? snapshotSize : new MemstoreSize(this.active.keySize(), this.active.heapOverhead()); @@ -213,16 +224,28 @@ public class CompactingMemStore extends AbstractMemStore { } } + // the getSegments() method is used for tests only + @VisibleForTesting @Override public List getSegments() { List pipelineList = pipeline.getSegments(); List list = new ArrayList(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); - list.add(this.snapshot); + list.addAll(this.snapshot.getAllSegments()); + return list; } + // the following three methods allow to manipulate the settings of composite snapshot + public void setCompositeSnapshot(boolean useCompositeSnapshot) { + this.compositeSnapshot = useCompositeSnapshot; + } + + public boolean isCompositeSnapshot() { + return this.compositeSnapshot; + } + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { return pipeline.swap(versionedList, result, !merge); @@ -262,18 +285,20 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List pipelineList = pipeline.getSegments(); - long order = pipelineList.size(); - // The list of elements in pipeline + the active element + the snapshot segment - // TODO : This will change when the snapshot is made of more than one element - List list = new ArrayList(pipelineList.size() + 2); - list.add(this.active.getScanner(readPt, order + 1)); - for (Segment item : pipelineList) { - list.add(item.getScanner(readPt, order)); - order--; - } - list.add(this.snapshot.getScanner(readPt, order)); - return Collections. singletonList(new MemStoreScanner(getComparator(), list)); + + int order = 1; // for active segment + order += pipeline.size(); // for all segments in the pipeline + order += snapshot.getNumOfSegments(); // for all segments in the snapshot + // TODO: check alternatives to using this order + // The list of elements in pipeline + the active element + the snapshot segments + // The order is the Segment ordinal + List list = new ArrayList(order); + list.add(this.active.getScanner(readPt, order)); + order--; + list.addAll(pipeline.getScanners(readPt,order)); + order -= pipeline.size(); + list.addAll(snapshot.getScanners(readPt,order)); + return Collections.singletonList(new MemStoreScanner(getComparator(), list)); } /** @@ -380,6 +405,14 @@ public class CompactingMemStore extends AbstractMemStore { } } + private void pushPipelineToSnapshot() { + List segments = pipeline.drain(); + if (!segments.isEmpty()) { + this.snapshot = + SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments); + } + } + private RegionServicesForStores getRegionServices() { return regionServices; } @@ -427,24 +460,6 @@ public class CompactingMemStore extends AbstractMemStore { compactor.initiateAction(compactionType); } - /** - * @param cell Find the row that comes after this one. If null, we return the - * first. - * @return Next row or null if none found. - */ - Cell getNextRow(final Cell cell) { - Cell lowest = null; - List segments = getSegments(); - for (Segment segment : segments) { - if (lowest == null) { - lowest = getNextRow(cell, segment.getCellSet()); - } else { - lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); - } - } - return lowest; - } - // debug method public void debug() { String msg = "active size=" + this.active.keySize(); http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6676170..2fd2a14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -77,6 +78,19 @@ public class CompactionPipeline { } } + public List drain() { + int drainSize = pipeline.size(); + List result = new ArrayList(drainSize); + synchronized (pipeline){ + version++; + for(int i=0; i segmentList = new LinkedList(pipeline); @@ -193,8 +207,7 @@ public class CompactionPipeline { public List getSegments() { synchronized (pipeline){ - List res = new LinkedList(pipeline); - return res; + return new LinkedList(pipeline); } } @@ -202,6 +215,18 @@ public class CompactionPipeline { return pipeline.size(); } + public List getScanners(long readPoint, long order) { + List scanners = new ArrayList(this.pipeline.size()); + for (Segment segment : this.pipeline) { + scanners.add(segment.getScanner(readPoint, order)); + // The order is the Segment ordinal + order--; + assert order>=0; // order should never be negative so this is just a sanity check + } + return scanners; + } + + public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; if (!isEmpty()) { @@ -215,6 +240,11 @@ public class CompactionPipeline { return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); } + public MemstoreSize getPipelineSize() { + if (isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline)); + } + private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { version++; http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java new file mode 100644 index 0000000..4fdd2d0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -0,0 +1,352 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.SortedSet; + +/** + * The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports + * the interface of a single ImmutableSegments. + * The CompositeImmutableSegments is planned to be used only as a snapshot, + * thus only relevant interfaces are supported + */ +@InterfaceAudience.Private +public class CompositeImmutableSegment extends ImmutableSegment { + + private final List segments; + private final CellComparator comparator; + // CompositeImmutableSegment is used for snapshots and snapshot should + // support getTimeRangeTracker() interface. + // Thus we hold a constant TRT build in the construction time from TRT of the given segments. + private final TimeRangeTracker timeRangeTracker; + private long keySize = 0; + + // This scanner need to be remembered in order to close it when the snapshot is cleared. + // Initially CollectionBackedScanner didn't raise the scanner counters thus there was no + // need to close it. Now when MemStoreScanner is used instead we need to decrease the + // scanner counters. + private KeyValueScanner flushingScanner = null; + + public CompositeImmutableSegment(CellComparator comparator, List segments) { + super(comparator); + this.comparator = comparator; + this.segments = segments; + this.timeRangeTracker = new TimeRangeTracker(); + for (ImmutableSegment s : segments) { + this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax()); + this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin()); + this.keySize += s.keySize(); + } + } + + @VisibleForTesting + public List getAllSegments() { + return new LinkedList(segments); + } + + public long getNumOfSegments() { + return segments.size(); + } + + /** + * Builds a special scanner for the MemStoreSnapshot object that is different than the + * general segment scanner. + * @return a special scanner for the MemStoreSnapshot object + */ + public KeyValueScanner getKeyValueScanner() { + KeyValueScanner scanner; + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + list.add(s.getScanner(Long.MAX_VALUE)); + } + + try { + scanner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + flushingScanner = scanner; + return scanner; + } + + @Override + public List getScanners(long readPoint, long order) { + List scanners = new ArrayList(this.segments.size()); + for (Segment segment : this.segments) { + scanners.add(segment.getScanner(readPoint, order)); + // The order is the Segment ordinal + order--; + // order should never be negative so this is just a sanity check + order = (order<0) ? 0 : order; + } + return scanners; + } + + /** + * @return whether the segment has any cells + */ + public boolean isEmpty() { + for (ImmutableSegment s : segments) { + if (!s.isEmpty()) return false; + } + return true; + } + + /** + * @return number of cells in segment + */ + public int getCellsCount() { + int result = 0; + for (ImmutableSegment s : segments) { + result += s.getCellsCount(); + } + return result; + } + + /** + * @return the first cell in the segment that has equal or greater key than the given cell + */ + public Cell getFirstAfter(Cell cell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Closing a segment before it is being discarded + */ + public void close() { + if (flushingScanner != null) { + flushingScanner.close(); + flushingScanner = null; + } + for (ImmutableSegment s : segments) { + s.close(); + } + } + + /** + * If the segment has a memory allocator the cell is being cloned to this space, and returned; + * otherwise the given cell is returned + * @return either the given cell or its clone + */ + public Cell maybeCloneWithAllocator(Cell cell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){ + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public long getMinTimestamp(){ + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Creates the scanner for the given read point + * @return a scanner for the given read point + */ + public KeyValueScanner getScanner(long readPoint) { + KeyValueScanner resultScanner; + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + list.add(s.getScanner(readPoint)); + } + + try { + resultScanner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + return resultScanner; + } + + /** + * Creates the scanner for the given read point, and a specific order in a list + * @return a scanner for the given read point + */ + public KeyValueScanner getScanner(long readPoint, long order) { + KeyValueScanner resultScanner; + List list = new ArrayList(segments.size()); + for (ImmutableSegment s : segments) { + list.add(s.getScanner(readPoint,order)); + } + + try { + resultScanner = new MemStoreScanner(getComparator(), list); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + + return resultScanner; + } + + public boolean isTagsPresent() { + for (ImmutableSegment s : segments) { + if (s.isTagsPresent()) return true; + } + return false; + } + + public void incScannerCount() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public void decScannerCount() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Setting the CellSet of the segment - used only for flat immutable segment for setting + * immutable CellSet after its creation in immutable segment constructor + * @return this object + */ + + protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * @return Sum of all cell's size. + */ + public long keySize() { + return this.keySize; + } + + /** + * @return The heap overhead of this segment. + */ + public long heapOverhead() { + long result = 0; + for (ImmutableSegment s : segments) { + result += s.heapOverhead(); + } + return result; + } + + /** + * Updates the heap size counter of the segment by the given delta + */ + protected void incSize(long delta, long heapOverhead) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected void incHeapOverheadSize(long delta) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public long getMinSequenceId() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public TimeRangeTracker getTimeRangeTracker() { + return this.timeRangeTracker; + } + + //*** Methods for SegmentsScanner + public Cell last() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public Iterator iterator() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public SortedSet headSet(Cell firstKeyOnRow) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public int compare(Cell left, Cell right) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + public int compareRows(Cell left, Cell right) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * @return a set of all cells in the segment + */ + protected CellSet getCellSet() { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Returns the Cell comparator used by this segment + * @return the Cell comparator used by this segment + */ + protected CellComparator getComparator() { + return comparator; + } + + protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, + MemstoreSize memstoreSize) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + protected long heapOverheadChange(Cell cell, boolean succ) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + /** + * Returns a subset of the segment cell set, which starts with the given cell + * @param firstCell a cell in the segment + * @return a subset of the segment cell set, which starts with the given cell + */ + protected SortedSet tailSet(Cell firstCell) { + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } + + // Debug methods + /** + * Dumps all cells of the segment into the given log + */ + void dump(Log log) { + for (ImmutableSegment s : segments) { + s.dump(log); + } + } + + @Override + public String toString() { + StringBuilder sb = + new StringBuilder("This is CompositeImmutableSegment and those are its segments:: "); + for (ImmutableSegment s : segments) { + sb.append(s.toString()); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index d4e6e12..76442e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -127,30 +128,20 @@ public class DefaultMemStore extends AbstractMemStore { public List getScanners(long readPt) throws IOException { List list = new ArrayList(2); list.add(this.active.getScanner(readPt, 1)); - list.add(this.snapshot.getScanner(readPt, 0)); - return Collections. singletonList( - new MemStoreScanner(getComparator(), list)); + list.addAll(this.snapshot.getScanners(readPt, 0)); + return Collections. singletonList(new MemStoreScanner(getComparator(), list)); } + // the getSegments() method is used for tests only + @VisibleForTesting @Override - protected List getSegments() throws IOException { + protected List getSegments() { List list = new ArrayList(2); list.add(this.active); - list.add(this.snapshot); + list.addAll(this.snapshot.getAllSegments()); return list; } - /** - * @param cell Find the row that comes after this one. If null, we return the - * first. - * @return Next row or null if none found. - */ - Cell getNextRow(final Cell cell) { - return getLowest( - getNextRow(cell, this.active.getCellSet()), - getNextRow(cell, this.snapshot.getCellSet())); - } - @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e11a31c..b664a4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -6483,8 +6484,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final Configuration conf, final HTableDescriptor hTableDescriptor, final WAL wal, final boolean initialize) throws IOException { - LOG.info("creating HRegion " + info.getTable().getNameAsString() - + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + + LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor + + " RootDir = " + rootDir + " Table name == " + info.getTable().getNameAsString()); FileSystem fs = FileSystem.get(conf); Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 4cdb29d..547d332 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.CollectionBackedScanner; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, @@ -69,6 +73,14 @@ public class ImmutableSegment extends Segment { ///////////////////// CONSTRUCTORS ///////////////////// /**------------------------------------------------------------------------ + * Empty C-tor to be used only for CompositeImmutableSegment + */ + protected ImmutableSegment(CellComparator comparator) { + super(comparator); + this.timeRange = null; + } + + /**------------------------------------------------------------------------ * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one. * This C-tor should be used when active MutableSegment is pushed into the compaction * pipeline and becomes an ImmutableSegment. @@ -142,6 +154,15 @@ public class ImmutableSegment extends Segment { return this.timeRange.getMin(); } + public long getNumOfSegments() { + return 1; + } + + public List getAllSegments() { + List res = new ArrayList(Arrays.asList(this)); + return res; + } + /**------------------------------------------------------------------------ * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one * based on CellArrayMap. @@ -232,7 +253,7 @@ public class ImmutableSegment extends Segment { Cell curCell; int idx = 0; // create this segment scanner with maximal possible read point, to go over all Cells - SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE); + KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE); try { while ((curCell = segmentScanner.next()) != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 84f88f0..29fd78a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -56,7 +56,7 @@ public class MemStoreCompactor { // The upper bound for the number of segments we store in the pipeline prior to merging. // This constant is subject to further experimentation. - private static final int THRESHOLD_PIPELINE_SEGMENTS = 1; + private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); @@ -276,6 +276,8 @@ public class MemStoreCompactor { case NONE: action = Action.NOOP; break; case BASIC: action = Action.MERGE; + // if multiple segments appear in the pipeline flush them to the disk later together + compactingMemStore.setCompositeSnapshot(true); break; case EAGER: action = Action.COMPACT; break; http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java index 77cea51..fa7c342 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java @@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class MemstoreSize { - static final MemstoreSize EMPTY_SIZE = new MemstoreSize(); - private long dataSize; private long heapOverhead; + final private boolean isEmpty; + + static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true); public MemstoreSize() { dataSize = 0; heapOverhead = 0; + isEmpty = false; + } + + public MemstoreSize(boolean isEmpty) { + dataSize = 0; + heapOverhead = 0; + this.isEmpty = isEmpty; + } + + public boolean isEmpty() { + return isEmpty; } public MemstoreSize(long dataSize, long heapOverhead) { this.dataSize = dataSize; this.heapOverhead = heapOverhead; + this.isEmpty = false; } public void incMemstoreSize(long dataSize, long heapOverhead) { @@ -61,11 +74,13 @@ public class MemstoreSize { } public long getDataSize() { - return dataSize; + + return isEmpty ? 0 : dataSize; } public long getHeapOverhead() { - return heapOverhead; + + return isEmpty ? 0 : heapOverhead; } @Override @@ -74,7 +89,7 @@ public class MemstoreSize { return false; } MemstoreSize other = (MemstoreSize) obj; - return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead; + return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index afdfe6f..8581517 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +66,15 @@ public abstract class Segment { protected final TimeRangeTracker timeRangeTracker; protected volatile boolean tagsPresent; + // Empty constructor to be used when Segment is used as interface, + // and there is no need in true Segments state + protected Segment(CellComparator comparator) { + this.comparator = comparator; + this.dataSize = new AtomicLong(0); + this.heapOverhead = new AtomicLong(0); + this.timeRangeTracker = new TimeRangeTracker(); + } + // This constructor is used to create empty Segments. protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { this.cellSet.set(cellSet); @@ -91,7 +102,7 @@ public abstract class Segment { * Creates the scanner for the given read point * @return a scanner for the given read point */ - public SegmentScanner getScanner(long readPoint) { + public KeyValueScanner getScanner(long readPoint) { return new SegmentScanner(this, readPoint); } @@ -99,10 +110,16 @@ public abstract class Segment { * Creates the scanner for the given read point, and a specific order in a list * @return a scanner for the given read point */ - public SegmentScanner getScanner(long readPoint, long order) { + public KeyValueScanner getScanner(long readPoint, long order) { return new SegmentScanner(this, readPoint, order); } + public List getScanners(long readPoint, long order) { + List scanners = new ArrayList(1); + scanners.add(getScanner(readPoint, order)); + return scanners; + } + /** * @return whether the segment has any cells */ http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 01e07ef..7e53026 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -47,6 +47,13 @@ public final class SegmentFactory { return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf)); } + // create composite immutable segment from a list of segments + public CompositeImmutableSegment createCompositeImmutableSegment( + final CellComparator comparator, List segments) { + return new CompositeImmutableSegment(comparator, segments); + + } + // create new flat immutable segment from compacting old immutable segments public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, @@ -102,6 +109,9 @@ public final class SegmentFactory { private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List segments) { List mslabs = new ArrayList(); + if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { + return null; + } for (ImmutableSegment segment : segments) { mslabs.add(segment.getMemStoreLAB()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index b0b63a9..0c1880c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -137,6 +137,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR, store, regionServicesForStores, HColumnDescriptor.MemoryCompaction.EAGER); + this.memstore.add(kv1.clone(), null); // As compaction is starting in the background the repetition // of the k1 might be removed BUT the scanners created earlier @@ -177,6 +178,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Add more versions to make it a little more interesting. Thread.sleep(1); addRows(this.memstore); + ((CompactingMemStore)this.memstore).setCompositeSnapshot(true); + + Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty, new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); @@ -277,7 +281,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.memstore.upsert(l, 2, null);// readpoint is 2 MemstoreSize newSize = this.memstore.size(); - assert (newSize.getDataSize() > oldSize.getDataSize()); + assertTrue("\n<<< The old size is " + oldSize.getDataSize() + " and the new size is " + + newSize.getDataSize() + "\n", + newSize.getDataSize() > oldSize.getDataSize()); //The kv1 should be removed. assert (memstore.getActive().getCellsCount() == 2); http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 27ed295..93d28d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -65,8 +65,6 @@ import org.junit.rules.TestName; import org.junit.rules.TestRule; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -180,6 +178,10 @@ public class TestDefaultMemStore { // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; + +// assertTrue("\n<<< The memstore scanners without snapshot are: \n" + memstorescanners +// + "\n",false); + try { while (s.next(result)) { LOG.info(result); @@ -207,8 +209,10 @@ public class TestDefaultMemStore { s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; int snapshotIndex = 5; + try { while (s.next(result)) { + LOG.info(result); // Assert the stuff is coming out in right order. assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count))); @@ -216,6 +220,7 @@ public class TestDefaultMemStore { assertEquals("count=" + count + ", result=" + result, rowCount, result.size()); count++; if (count == snapshotIndex) { + MemStoreSnapshot snapshot = this.memstore.snapshot(); this.memstore.clearSnapshot(snapshot.getId()); // Added more rows into kvset. But the scanner wont see these rows. @@ -227,7 +232,8 @@ public class TestDefaultMemStore { } finally { s.close(); } - assertEquals(rowCount, count); + assertEquals("\n<<< The row count is " + rowCount + " and the iteration count is " + count, + rowCount, count); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/463ffa79/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 133c53b..332a125 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -22,13 +22,7 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -38,6 +32,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,40 +50,48 @@ public class TestWalAndCompactingMemStoreFlush { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); - public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush", - "t1"); + public static final TableName TABLENAME = + TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1"); - public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), - Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; + public static final byte[][] FAMILIES = + { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; public static final byte[] FAMILY1 = FAMILIES[0]; public static final byte[] FAMILY2 = FAMILIES[1]; public static final byte[] FAMILY3 = FAMILIES[2]; private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { - int i=0; + MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE; + assertEquals(memstrsize1.getDataSize(), 0); + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); + int i = 0; HTableDescriptor htd = new HTableDescriptor(TABLENAME); for (byte[] family : FAMILIES) { HColumnDescriptor hcd = new HColumnDescriptor(family); // even column families are going to have compacted memstore + if(i%2 == 0) { hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.valueOf( conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); } else { hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.NONE); } + htd.addFamily(hcd); i++; } - + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); Path path = new Path(DIR, callingMethod); - return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); + HRegion result = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); + return result; } // A helper function to create puts. private Put createPut(int familyNum, int putNum) { - byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] qf = Bytes.toBytes("q" + familyNum); byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); Put p = new Put(row); @@ -98,7 +101,7 @@ public class TestWalAndCompactingMemStoreFlush { // A helper function to create double puts, so something can be compacted later. private Put createDoublePut(int familyNum, int putNum) { - byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] qf = Bytes.toBytes("q" + familyNum); byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); Put p = new Put(row); @@ -122,16 +125,21 @@ public class TestWalAndCompactingMemStoreFlush { byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), - r.getFamilyMap(family).get(qf)); + r.getFamilyMap(family).get(qf)); assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), - Arrays.equals(r.getFamilyMap(family).get(qf), val)); + Arrays.equals(r.getFamilyMap(family).get(qf), val)); } + @Before public void setUp() throws Exception { + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); + } + + // test selective flush with data-compaction @Test(timeout = 180000) public void testSelectiveFlushWithEager() throws IOException { - // Set up the configuration Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class.getName()); @@ -175,17 +183,14 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore(); // Get the overall smallest LSN in the region's memstores. - long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseI = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); String s = "\n\n----------------------------------\n" - + "Upon initial insert and before any flush, size of CF1 is:" - + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" - + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" - + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" - + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" - + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" - + region.getStore(FAMILY3).isSloppyMemstore() + "\n"; + + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" + + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" + + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI + + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemstore() + "\n"; // The overall smallest LSN in the region's memstores should be the same as // the LSN of the smallest edit in CF1 @@ -200,12 +205,12 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - String msg = "totalMemstoreSize="+totalMemstoreSize + - " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + - " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + - " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() - + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); + String msg = "totalMemstoreSize=" + totalMemstoreSize + + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI + + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; + assertEquals(msg, totalMemstoreSize, + cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); // Flush!!!!!!!!!!!!!!!!!!!!!! // We have big compacting memstore CF1 and two small memstores: @@ -225,8 +230,8 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseII = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // Find the smallest LSNs for edits wrt to each CF. long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); @@ -260,16 +265,20 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:" + + smallestSeqCF3PhaseII + "\n"; // How much does the CF1 memstore occupy? Will be used later. MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore(); long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII - + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; - + + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" + + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region + .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4]) + .getFlushedCellsSize() + "; the entire region size is: " + region.getMemstoreSize() + "\n"; + ; // Flush!!!!!!!!!!!!!!!!!!!!!! // Flush again, CF1 is flushed to disk @@ -282,21 +291,22 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseIV = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" - + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV - + "\n"; + + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region + .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4]) + .getFlushedCellsSize() + "\n"; s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV - + "\n"; + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" + + smallestSeqCF3PhaseIV + "\n" + "the entire region size is: " + region.getMemstoreSize() + "\n"; // CF1's pipeline component (inserted before first flush) should be flushed to disk // CF2 should be flushed to disk @@ -321,13 +331,21 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseV = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - assertEquals(MemstoreSize.EMPTY_SIZE , cf1MemstoreSizePhaseV); + assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV); assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV); assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV); + s = s + "----AFTER THIRD FLUSH, the entire region size is:" + region.getMemstoreSize() + + " (empty memstore size is " + MemstoreSize.EMPTY_SIZE + + "), while the sizes of each memstore are as following \ncf1: " + cf1MemstoreSizePhaseV + + ", cf2: " + cf2MemstoreSizePhaseV + ", cf3: " + cf3MemstoreSizePhaseV + ", cf4: " + region + .getStore(FAMILIES[4]).getSizeOfMemStore() + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize() + + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize() + + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n"; + // What happens when we hit the memstore limit, but we are not able to find // any Column Family above the threshold? // In that case, we should flush all the CFs. @@ -345,24 +363,22 @@ public class TestWalAndCompactingMemStoreFlush { region.flush(false); - s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " + s = s + "----AFTER FORTH FLUSH, The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseV + ". After additional inserts and last flush, the entire region size is:" + region - .getMemstoreSize() - + "\n----------------------------------\n"; + .getMemstoreSize() + "\n----------------------------------\n"; // Since we won't find any CF above the threshold, and hence no specific // store to flush, we should flush all the memstores // Also compacted memstores are flushed to disk. - assertEquals(0, region.getMemstoreSize()); + assertEquals(s, 0, region.getMemstoreSize()); System.out.println(s); HBaseTestingUtility.closeRegionAndWAL(region); } /*------------------------------------------------------------------------------*/ /* Check the same as above but for index-compaction type of compacting memstore */ - @Test(timeout = 180000) - public void testSelectiveFlushWithIndexCompaction() throws IOException { + @Test(timeout = 180000) public void testSelectiveFlushWithIndexCompaction() throws IOException { /*------------------------------------------------------------------------------*/ /* SETUP */ @@ -379,7 +395,7 @@ public class TestWalAndCompactingMemStoreFlush { // Initialize the region Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); - + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); /*------------------------------------------------------------------------------*/ /* PHASE I - insertions */ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 @@ -410,8 +426,8 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore(); // Get the overall smallest LSN in the region's memstores. - long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseI = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); /*------------------------------------------------------------------------------*/ /* PHASE I - validation */ @@ -427,8 +443,8 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() - + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); + assertEquals(totalMemstoreSizePhaseI, + cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); /*------------------------------------------------------------------------------*/ /* PHASE I - Flush */ @@ -459,8 +475,8 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseII = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // Find the smallest LSNs for edits wrt to each CF. long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); long totalMemstoreSizePhaseII = region.getMemstoreSize(); @@ -468,13 +484,13 @@ public class TestWalAndCompactingMemStoreFlush { /*------------------------------------------------------------------------------*/ /* PHASE II - validation */ // CF1 was flushed to memory, should be flattened and take less space - assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize()); + assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize()); assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead()); // CF2 should become empty assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII); // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) // if compacted CF# should be at least twice less because its every key was duplicated - assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize()); + assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize()); assertTrue( cf3MemstoreSizePhaseI.getHeapOverhead() / 2 < cf3MemstoreSizePhaseII.getHeapOverhead()); @@ -484,8 +500,8 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline // items in CF1/2 - assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() - + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); + assertEquals(totalMemstoreSizePhaseII, + cf1MemstoreSizePhaseII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); /*------------------------------------------------------------------------------*/ /*------------------------------------------------------------------------------*/ @@ -513,8 +529,8 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline // items in CF1/2 - assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() - + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); + assertEquals(totalMemstoreSizePhaseIII, + cf1MemstoreSizePhaseIII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); /*------------------------------------------------------------------------------*/ /* PHASE III - Flush */ @@ -530,8 +546,8 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseIV = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); /*------------------------------------------------------------------------------*/ @@ -561,8 +577,8 @@ public class TestWalAndCompactingMemStoreFlush { MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore(); MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore(); - long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseV = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); long totalMemstoreSizePhaseV = region.getMemstoreSize(); /*------------------------------------------------------------------------------*/ @@ -617,22 +633,30 @@ public class TestWalAndCompactingMemStoreFlush { HBaseTestingUtility.closeRegionAndWAL(region); } - @Test(timeout = 180000) - public void testSelectiveFlushAndWALinDataCompaction() throws IOException { + // test WAL behavior together with selective flush while data-compaction + @Test(timeout = 180000) public void testDCwithWAL() throws IOException { + + MemstoreSize checkSize = MemstoreSize.EMPTY_SIZE; + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class - .getName()); - conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * - 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); // set memstore to do data compaction and not to use the speculative scan conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(HColumnDescriptor.MemoryCompaction.EAGER)); + MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE; + assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0); // Intialize the HRegion HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + + MemstoreSize cf2MemstoreSizePhase0 = region.getStore(FAMILY2).getSizeOfMemStore(); + MemstoreSize cf1MemstoreSizePhase0 = region.getStore(FAMILY1).getSizeOfMemStore(); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); @@ -652,6 +676,7 @@ public class TestWalAndCompactingMemStoreFlush { // Find the sizes of the memstores of each CF. MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore(); + //boolean oldCF2 = region.getStore(FAMILY2).isSloppyMemstore(); MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore(); MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore(); @@ -662,16 +687,20 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - String msg = "totalMemstoreSize="+totalMemstoreSize + - " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + - " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + - " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() - + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); + String msg = "\n<<< totalMemstoreSize=" + totalMemstoreSize + + " DefaultMemStore.DEEP_OVERHEAD=" + DefaultMemStore.DEEP_OVERHEAD + + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI + + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; + assertEquals(msg, totalMemstoreSize, + cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + + cf3MemstoreSizePhaseI.getDataSize()); // Flush! CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; + MemStore cms2 = ((HStore) region.getStore(FAMILY2)).memstore; + MemstoreSize memstrsize2 = cms2.getSnapshotSize(); + MemstoreSize flshsize2 = cms2.getFlushableSize(); CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; cms1.flushInMemory(); cms3.flushInMemory(); @@ -684,15 +713,22 @@ public class TestWalAndCompactingMemStoreFlush { long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + MemstoreSize newSize = new MemstoreSize(); // CF2 should have been cleared - assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII); - - String s = "\n\n----------------------------------\n" - + "Upon initial insert and flush, LSN of CF1 is:" - + smallestSeqCF1PhaseII + ". LSN of CF2 is:" - + smallestSeqCF2PhaseII + ". LSN of CF3 is:" - + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:" + assertEquals( + msg + "\n<<< CF2 is compacting " + ((HStore) region.getStore(FAMILY2)).memstore.isSloppy() + + ", snapshot and flushable size BEFORE flush " + memstrsize2 + "; " + flshsize2 + + ", snapshot and flushable size AFTER flush " + cms2.getSnapshotSize() + "; " + cms2 + .getFlushableSize() + "\n<<< cf2 size " + cms2.size() + "; the checked size " + + cf2MemstoreSizePhaseII + "; memstore empty size " + MemstoreSize.EMPTY_SIZE + + "; check size " + checkSize + "\n<<< first first first CF2 size " + + cf2MemstoreSizePhase0 + "; first first first CF1 size " + cf1MemstoreSizePhase0 + + "; new new new size " + newSize + "\n", MemstoreSize.EMPTY_SIZE, + cf2MemstoreSizePhaseII); + + String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:" + + smallestSeqCF1PhaseII + ". LSN of CF2 is:" + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; // Add same entries to compact them later @@ -718,8 +754,8 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n"; + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:" + + smallestSeqCF3PhaseIII + "\n"; // Flush! cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; @@ -736,20 +772,22 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n"; + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" + + smallestSeqCF3PhaseIV + "\n"; // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction - assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV > - smallestSeqInRegionCurrentMemstorePhaseIII); + assertTrue(s, + smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); HBaseTestingUtility.closeRegionAndWAL(region); } + // test WAL behavior together with selective flush while index-compaction @Test(timeout = 180000) - public void testSelectiveFlushAndWALinIndexCompaction() throws IOException { + public void tstICwithWAL() throws IOException { + // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);