From: zhangduo@apache.org
To: commits@hbase.apache.org
Date: Fri, 25 Mar 2016 07:24:00 -0000
Subject: [2/2] hbase git commit: HBASE-15389 Write out multiple files when compaction

HBASE-15389 Write out multiple files when compaction

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11d11d3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11d11d3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11d11d3f

Branch: refs/heads/master
Commit: 11d11d3fcc591227cccf3531b911e46c68774501
Parents: e9c4f12
Author: zhangduo
Authored: Fri Mar 25 15:07:54 2016 +0800
Committer: zhangduo
Committed: Fri Mar 25 15:07:54 2016 +0800

----------------------------------------------------------------------
 .../regionserver/AbstractMultiFileWriter.java   | 120 +++++
 .../regionserver/DateTieredMultiFileWriter.java |  83 +++++
 .../hadoop/hbase/regionserver/StoreFile.java    |   9 +-
 .../regionserver/StripeMultiFileWriter.java     | 239 ++++++--------
 .../hbase/regionserver/StripeStoreFlusher.java  |  30 +-
 .../AbstractMultiOutputCompactor.java           | 161 +++++++++
 .../regionserver/compactions/Compactor.java     |  10 +-
 .../compactions/DateTieredCompactor.java        |  86 +++++
 .../compactions/DefaultCompactor.java           |   4 +-
 .../compactions/StripeCompactionPolicy.java     |  13 +-
 .../compactions/StripeCompactor.java            | 169 +++-------
 .../hbase/regionserver/TestStripeCompactor.java | 325 -------------------
 .../regionserver/compactions/TestCompactor.java | 212 ++++++++++++
 .../compactions/TestDateTieredCompactor.java    | 169 ++++++++++
 .../compactions/TestStripeCompactionPolicy.java |  24 +-
 .../compactions/TestStripeCompactor.java        | 223 +++++++++++++
 16 files changed, 1272 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java new file mode 100644 index 0000000..4987c59 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink; + +/** + * Base class for cell sink that separates the provided cells into multiple files. + */ +@InterfaceAudience.Private +public abstract class AbstractMultiFileWriter implements CellSink { + + private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class); + + /** Factory that is used to produce single StoreFile.Writer-s */ + protected WriterFactory writerFactory; + + /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ + protected StoreScanner sourceScanner; + + public interface WriterFactory { + public StoreFile.Writer createWriter() throws IOException; + } + + /** + * Initializes multi-writer before usage. + * @param sourceScanner Optional store scanner to obtain the information about read progress. + * @param factory Factory used to produce individual file writers. + */ + public void init(StoreScanner sourceScanner, WriterFactory factory) { + this.writerFactory = factory; + this.sourceScanner = sourceScanner; + } + + /** + * Commit all writers. + *

+   * Notice that here we use the same maxSeqId for all output files since we have not found an
+   * easy way to obtain enough sequence ids for different output files in some corner cases. See
+   * comments in HBASE-15400 for more details.
+   */
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
+    preCommitWriters();
+    Collection<Writer> writers = this.writers();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+          + ", majorCompaction=" + majorCompaction);
+    }
+    List<Path> paths = new ArrayList<Path>();
+    for (Writer writer : writers) {
+      if (writer == null) {
+        continue;
+      }
+      writer.appendMetadata(maxSeqId, majorCompaction);
+      preCloseWriter(writer);
+      paths.add(writer.getPath());
+      writer.close();
+    }
+    return paths;
+  }
+
+  /**
+   * Close all writers without throwing any exceptions. This is usually used when the compaction
+   * fails.
+   */
+  public List<Path> abortWriters() {
+    List<Path> paths = new ArrayList<Path>();
+    for (StoreFile.Writer writer : writers()) {
+      try {
+        if (writer != null) {
+          paths.add(writer.getPath());
+          writer.close();
+        }
+      } catch (Exception ex) {
+        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+      }
+    }
+    return paths;
+  }
+
+  protected abstract Collection<Writer> writers();
+
+  /**
+   * Subclasses override this method to be called at the end of a successful sequence of appends;
+   * all appends are processed before this method is called.
+   */
+  protected void preCommitWriters() throws IOException {
+  }
+
+  /**
+   * Subclasses override this method to be called before we close the given writer. Usually you
+   * can append extra metadata to the writer.
+   */
+  protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
new file mode 100644
index 0000000..f0bd444
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; + +/** + * class for cell sink that separates the provided cells into multiple files for date tiered + * compaction. + */ +@InterfaceAudience.Private +public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { + + private final NavigableMap lowerBoundary2Writer + = new TreeMap(); + + private final boolean needEmptyFile; + + /** + * @param needEmptyFile whether need to create an empty store file if we haven't written out + * anything. + */ + public DateTieredMultiFileWriter(List lowerBoundaries, boolean needEmptyFile) { + for (Long lowerBoundary : lowerBoundaries) { + lowerBoundary2Writer.put(lowerBoundary, null); + } + this.needEmptyFile = needEmptyFile; + } + + @Override + public void append(Cell cell) throws IOException { + Map.Entry entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp()); + StoreFile.Writer writer = entry.getValue(); + if (writer == null) { + writer = writerFactory.createWriter(); + lowerBoundary2Writer.put(entry.getKey(), writer); + } + writer.append(cell); + } + + @Override + protected Collection writers() { + return lowerBoundary2Writer.values(); + } + + @Override + protected void preCommitWriters() throws IOException { + if (!needEmptyFile) { + return; + } + for (StoreFile.Writer writer : lowerBoundary2Writer.values()) { + if (writer != null) { + return; + } + } + // we haven't written out any data, create an empty file to retain metadata + lowerBoundary2Writer.put(lowerBoundary2Writer.firstKey(), writerFactory.createWriter()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index b6164b2..868bee0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -261,6 +261,13 @@ public class StoreFile { } /** + * Clone a StoreFile for opening private reader. + */ + public StoreFile cloneForReader() { + return new StoreFile(this); + } + + /** * @return the StoreFile object associated to this StoreFile. * null if the StoreFile is not a reference. */ @@ -294,7 +301,7 @@ public class StoreFile { * @return True if this is HFile. 
*/ public boolean isHFile() { - return this.fileInfo.isHFile(this.fileInfo.getPath()); + return StoreFileInfo.isHFile(this.fileInfo.getPath()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index 651b863..1c3f14c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -20,52 +20,36 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.util.Bytes; /** - * Base class for cell sink that separates the provided cells into multiple files. + * Base class for cell sink that separates the provided cells into multiple files for stripe + * compaction. */ @InterfaceAudience.Private -public abstract class StripeMultiFileWriter implements Compactor.CellSink { - private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); +public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { - /** Factory that is used to produce single StoreFile.Writer-s */ - protected WriterFactory writerFactory; - protected CellComparator comparator; + private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); + protected final CellComparator comparator; protected List existingWriters; protected List boundaries; - /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ - protected StoreScanner sourceScanner; /** Whether to write stripe metadata */ private boolean doWriteStripeMetadata = true; - public interface WriterFactory { - public StoreFile.Writer createWriter() throws IOException; - } - - /** - * Initializes multi-writer before usage. - * @param sourceScanner Optional store scanner to obtain the information about read progress. - * @param factory Factory used to produce individual file writers. - * @param comparator Comparator used to compare rows. 
- */ - public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator) - throws IOException { - this.writerFactory = factory; - this.sourceScanner = sourceScanner; + public StripeMultiFileWriter(CellComparator comparator) { this.comparator = comparator; } @@ -73,41 +57,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { this.doWriteStripeMetadata = false; } - public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { + @Override + protected Collection writers() { + return existingWriters; + } + + protected abstract void preCommitWritersInternal() throws IOException; + + @Override + protected final void preCommitWriters() throws IOException { + // do some sanity check here. assert this.existingWriters != null; - commitWritersInternal(); + preCommitWritersInternal(); assert this.boundaries.size() == (this.existingWriters.size() + 1); - LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w") - + "riting out metadata for " + this.existingWriters.size() + " writers"); - List paths = new ArrayList(); - for (int i = 0; i < this.existingWriters.size(); ++i) { - StoreFile.Writer writer = this.existingWriters.get(i); - if (writer == null) continue; // writer was skipped due to 0 KVs - if (doWriteStripeMetadata) { - writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i)); - writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1)); - } - writer.appendMetadata(maxSeqId, isMajor); - paths.add(writer.getPath()); - writer.close(); - } - this.existingWriters = null; - return paths; } - public List abortWriters() { - assert this.existingWriters != null; - List paths = new ArrayList(); - for (StoreFile.Writer writer : this.existingWriters) { - try { - paths.add(writer.getPath()); - writer.close(); - } catch (Exception ex) { - LOG.error("Failed to close the writer after an unfinished compaction.", ex); + @Override + protected void preCloseWriter(Writer writer) throws IOException { + if (doWriteStripeMetadata) { + if (LOG.isDebugEnabled()) { + LOG.debug("Write stripe metadata for " + writer.getPath().toString()); + } + int index = existingWriters.indexOf(writer); + writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index)); + writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString()); } } - this.existingWriters = null; - return paths; } /** @@ -115,13 +93,12 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { * @param left The left boundary of the writer. * @param cell The cell whose row has to be checked. 
*/ - protected void sanityCheckLeft( - byte[] left, Cell cell) throws IOException { - if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) && - comparator.compareRows(cell, left, 0, left.length) < 0) { - String error = "The first row is lower than the left boundary of [" + Bytes.toString(left) - + "]: [" + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - + "]"; + protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException { + if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) + && comparator.compareRows(cell, left, 0, left.length) < 0) { + String error = + "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: [" + + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; LOG.error(error); throw new IOException(error); } @@ -131,28 +108,22 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { * Subclasses can call this method to make sure the last KV is within multi-writer range. * @param right The right boundary of the writer. */ - protected void sanityCheckRight( - byte[] right, Cell cell) throws IOException { - if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) && - comparator.compareRows(cell, right, 0, right.length) >= 0) { - String error = "The last row is higher or equal than the right boundary of [" - + Bytes.toString(right) + "]: [" - + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; + protected void sanityCheckRight(byte[] right, Cell cell) throws IOException { + if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) + && comparator.compareRows(cell, right, 0, right.length) >= 0) { + String error = + "The last row is higher or equal than the right boundary of [" + Bytes.toString(right) + + "]: [" + + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]"; LOG.error(error); throw new IOException(error); } } /** - * Subclasses override this method to be called at the end of a successful sequence of - * append; all appends are processed before this method is called. - */ - protected abstract void commitWritersInternal() throws IOException; - - /** - * MultiWriter that separates the cells based on fixed row-key boundaries. - * All the KVs between each pair of neighboring boundaries from the list supplied to ctor - * will end up in one file, and separate from all other such pairs. + * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between + * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and + * separate from all other such pairs. */ public static class BoundaryMultiWriter extends StripeMultiFileWriter { private StoreFile.Writer currentWriter; @@ -165,31 +136,28 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { /** * @param targetBoundaries The boundaries on which writers/files are separated. - * @param majorRangeFrom Major range is the range for which at least one file should be - * written (because all files are included in compaction). - * majorRangeFrom is the left boundary. + * @param majorRangeFrom Major range is the range for which at least one file should be written + * (because all files are included in compaction). majorRangeFrom is the left boundary. * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom). 
*/ - public BoundaryMultiWriter(List targetBoundaries, + public BoundaryMultiWriter(CellComparator comparator, List targetBoundaries, byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException { - super(); + super(comparator); this.boundaries = targetBoundaries; this.existingWriters = new ArrayList(this.boundaries.size() - 1); // "major" range (range for which all files are included) boundaries, if any, // must match some target boundaries, let's find them. - assert (majorRangeFrom == null) == (majorRangeTo == null); + assert (majorRangeFrom == null) == (majorRangeTo == null); if (majorRangeFrom != null) { - majorRangeFromIndex = Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) - ? 0 - : Collections.binarySearch(boundaries, majorRangeFrom, - Bytes.BYTES_COMPARATOR); - majorRangeToIndex = Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) - ? boundaries.size() - : Collections.binarySearch(boundaries, majorRangeTo, - Bytes.BYTES_COMPARATOR); + majorRangeFromIndex = + Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) ? 0 : Collections + .binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR); + majorRangeToIndex = + Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) ? boundaries.size() + : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR); if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) { - throw new IOException("Major range does not match writer boundaries: [" + - Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from " + throw new IOException("Major range does not match writer boundaries: [" + + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from " + majorRangeFromIndex + " to " + majorRangeToIndex); } } @@ -199,8 +167,7 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { public void append(Cell cell) throws IOException { if (currentWriter == null && existingWriters.isEmpty()) { // First append ever, do a sanity check. - sanityCheckLeft(this.boundaries.get(0), - cell); + sanityCheckLeft(this.boundaries.get(0), cell); } prepareWriterFor(cell); currentWriter.append(cell); @@ -209,19 +176,18 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { } private boolean isCellAfterCurrentWriter(Cell cell) { - return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) && - (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0); + return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) + && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0); } @Override - protected void commitWritersInternal() throws IOException { + protected void preCommitWritersInternal() throws IOException { stopUsingCurrentWriter(); while (existingWriters.size() < boundaries.size() - 1) { createEmptyWriter(); } if (lastCell != null) { - sanityCheckRight(boundaries.get(boundaries.size() - 1), - lastCell); + sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell); } } @@ -241,14 +207,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { } /** - * Called if there are no cells for some stripe. - * We need to have something in the writer list for this stripe, so that writer-boundary - * list indices correspond to each other. 
We can insert null in the writer list for that - * purpose, except in the following cases where we actually need a file: - * 1) If we are in range for which we are compacting all the files, we need to create an - * empty file to preserve stripe metadata. - * 2) If we have not produced any file at all for this compactions, and this is the - * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059). + * Called if there are no cells for some stripe. We need to have something in the writer list + * for this stripe, so that writer-boundary list indices correspond to each other. We can insert + * null in the writer list for that purpose, except in the following cases where we actually + * need a file: 1) If we are in range for which we are compacting all the files, we need to + * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all + * for this compactions, and this is the last chance (the last stripe), we need to preserve last + * seqNum (see also HBASE-6059). */ private void createEmptyWriter() throws IOException { int index = existingWriters.size(); @@ -258,12 +223,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { boolean needEmptyFile = isInMajorRange || isLastWriter; existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null); hasAnyWriter |= needEmptyFile; - currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) - ? null : boundaries.get(existingWriters.size() + 1); + currentWriterEndKey = + (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters + .size() + 1); } private void checkCanCreateWriter() throws IOException { - int maxWriterCount = boundaries.size() - 1; + int maxWriterCount = boundaries.size() - 1; assert existingWriters.size() <= maxWriterCount; if (existingWriters.size() >= maxWriterCount) { throw new IOException("Cannot create any more writers (created " + existingWriters.size() @@ -280,16 +246,16 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { cellsInCurrentWriter = 0; } currentWriter = null; - currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) - ? null : boundaries.get(existingWriters.size() + 1); + currentWriterEndKey = + (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters + .size() + 1); } } /** - * MultiWriter that separates the cells based on target cell number per file and file count. - * New file is started every time the target number of KVs is reached, unless the fixed - * count of writers has already been created (in that case all the remaining KVs go into - * the last writer). + * MultiWriter that separates the cells based on target cell number per file and file count. New + * file is started every time the target number of KVs is reached, unless the fixed count of + * writers has already been created (in that case all the remaining KVs go into the last writer). */ public static class SizeMultiWriter extends StripeMultiFileWriter { private int targetCount; @@ -310,8 +276,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { * @param left The left boundary of the first writer. * @param right The right boundary of the last writer. 
*/ - public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) { - super(); + public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left, + byte[] right) { + super(comparator); this.targetCount = targetCount; this.targetCells = targetKvs; this.left = left; @@ -331,11 +298,11 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { sanityCheckLeft(left, cell); doCreateWriter = true; } else if (lastRowInCurrentWriter != null - && !CellUtil.matchingRow(cell, - lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) { + && !CellUtil.matchingRow(cell, lastRowInCurrentWriter, 0, + lastRowInCurrentWriter.length)) { if (LOG.isDebugEnabled()) { LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter) - + "] row; wrote out " + cellsInCurrentWriter + " kvs"); + + "] row; wrote out " + cellsInCurrentWriter + " kvs"); } lastRowInCurrentWriter = null; cellsInCurrentWriter = 0; @@ -343,7 +310,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { doCreateWriter = true; } if (doCreateWriter) { - byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); // make a copy + // make a copy + byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); if (LOG.isDebugEnabled()) { LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]"); } @@ -357,34 +325,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { ++cellsInCurrentWriter; cellsSeen = cellsInCurrentWriter; if (this.sourceScanner != null) { - cellsSeen = Math.max(cellsSeen, - this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious); + cellsSeen = + Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned() + - cellsSeenInPrevious); } // If we are not already waiting for opportunity to close, start waiting if we can // create any more writers and if the current one is too big. - if (lastRowInCurrentWriter == null - && existingWriters.size() < targetCount + if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount && cellsSeen >= targetCells) { lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy if (LOG.isDebugEnabled()) { - LOG.debug("Preparing to start a new writer after [" + Bytes.toString( - lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out " - + cellsInCurrentWriter + " kvs"); + LOG.debug("Preparing to start a new writer after [" + + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen + + " kvs and wrote out " + cellsInCurrentWriter + " kvs"); } } } @Override - protected void commitWritersInternal() throws IOException { + protected void preCommitWritersInternal() throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" + - ((this.sourceScanner == null) ? "" : ("; observed estimated " + LOG.debug("Stopping with " + + cellsInCurrentWriter + + " kvs in last writer" + + ((this.sourceScanner == null) ? 
"" : ("; observed estimated " + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total"))); } if (lastCell != null) { - sanityCheckRight( - right, lastCell); + sanityCheckRight(right, lastCell); } // When expired stripes were going to be merged into one, and if no writer was created during http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 9a06a88..34e8497 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -69,7 +70,8 @@ public class StripeStoreFlusher extends StoreFlusher { } // Let policy select flush method. - StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount); + StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes, + cellsCount); boolean success = false; StripeMultiFileWriter mw = null; @@ -78,7 +80,7 @@ public class StripeStoreFlusher extends StoreFlusher { StripeMultiFileWriter.WriterFactory factory = createWriterFactory( snapshot.getTimeRangeTracker(), cellsCount); StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; - mw.init(storeScanner, factory, store.getComparator()); + mw.init(storeScanner, factory); synchronized (flushLock) { performFlush(scanner, mw, smallestReadPoint, throughputController); @@ -123,10 +125,17 @@ public class StripeStoreFlusher extends StoreFlusher { /** Stripe flush request wrapper that writes a non-striped file. */ public static class StripeFlushRequest { + + protected final CellComparator comparator; + + public StripeFlushRequest(CellComparator comparator) { + this.comparator = comparator; + } + @VisibleForTesting public StripeMultiFileWriter createWriter() throws IOException { - StripeMultiFileWriter writer = - new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY); + StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1, + Long.MAX_VALUE, OPEN_KEY, OPEN_KEY); writer.setNoStripeMetadata(); return writer; } @@ -137,13 +146,15 @@ public class StripeStoreFlusher extends StoreFlusher { private final List targetBoundaries; /** @param targetBoundaries New files should be written with these boundaries. 
*/ - public BoundaryStripeFlushRequest(List targetBoundaries) { + public BoundaryStripeFlushRequest(CellComparator comparator, List targetBoundaries) { + super(comparator); this.targetBoundaries = targetBoundaries; } @Override public StripeMultiFileWriter createWriter() throws IOException { - return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null); + return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null, + null); } } @@ -157,15 +168,16 @@ public class StripeStoreFlusher extends StoreFlusher { * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than * total number of kvs, all the overflow data goes into the last stripe. */ - public SizeStripeFlushRequest(int targetCount, long targetKvs) { + public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) { + super(comparator); this.targetCount = targetCount; this.targetKvs = targetKvs; } @Override public StripeMultiFileWriter createWriter() throws IOException { - return new StripeMultiFileWriter.SizeMultiWriter( - this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY); + return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs, + OPEN_KEY, OPEN_KEY); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java new file mode 100644 index 0000000..29d8561 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; + +import com.google.common.io.Closeables; + +/** + * Base class for implementing a Compactor which will generate multiple output files after + * compaction. + */ +@InterfaceAudience.Private +public abstract class AbstractMultiOutputCompactor + extends Compactor { + + private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class); + + public AbstractMultiOutputCompactor(Configuration conf, Store store) { + super(conf, store); + } + + protected interface InternalScannerFactory { + + ScanType getScanType(CompactionRequest request); + + InternalScanner createScanner(List scanners, ScanType scanType, + FileDetails fd, long smallestReadPoint) throws IOException; + } + + protected List compact(T writer, final CompactionRequest request, + InternalScannerFactory scannerFactory, ThroughputController throughputController, User user) + throws IOException { + final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); + this.progress = new CompactionProgress(fd.maxKeyCount); + + // Find the smallest read point across all the Scanners. 
+ long smallestReadPoint = getSmallestReadPoint(); + + List scanners; + Collection readersToClose; + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { + // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, + // HFiles, and their readers + readersToClose = new ArrayList(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(f.cloneForReader()); + } + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); + } + InternalScanner scanner = null; + boolean finished = false; + try { + /* Include deletes, unless we are doing a major compaction */ + ScanType scanType = scannerFactory.getScanType(request); + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); + if (scanner == null) { + scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); + } + scanner = postCreateCoprocScanner(request, scanType, scanner, user); + if (scanner == null) { + // NULL scanner returned from coprocessor hooks means skip normal processing. + return new ArrayList(); + } + boolean cleanSeqId = false; + if (fd.minSeqIdToKeep > 0) { + smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); + cleanSeqId = true; + } + // Create the writer factory for compactions. + final boolean needMvcc = fd.maxMVCCReadpoint >= 0; + WriterFactory writerFactory = new WriterFactory() { + @Override + public Writer createWriter() throws IOException { + return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc, + fd.maxTagsLength > 0, store.throttleCompaction(request.getSize())); + } + }; + // Prepare multi-writer, and perform the compaction using scanner and writer. + // It is ok here if storeScanner is null. + StoreScanner storeScanner + = (scanner instanceof StoreScanner) ? 
(StoreScanner) scanner : null; + writer.init(storeScanner, writerFactory); + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isAllFiles()); + if (!finished) { + throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); + } + } finally { + Closeables.close(scanner, true); + for (StoreFile f : readersToClose) { + try { + f.closeReader(true); + } catch (IOException e) { + LOG.warn("Exception closing " + f, e); + } + } + if (!finished) { + FileSystem fs = store.getFileSystem(); + for (Path leftoverFile : writer.abortWriters()) { + try { + fs.delete(leftoverFile, false); + } catch (IOException e) { + LOG.error("Failed to delete the leftover file " + leftoverFile + + " after an unfinished compaction.", + e); + } + } + } + } + assert finished : "We should have exited the method on all error paths"; + return commitMultiWriter(writer, fd, request); + } + + protected abstract List commitMultiWriter(T writer, FileDetails fd, + CompactionRequest request) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 0e6ab05..9125684 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -64,14 +64,14 @@ public abstract class Compactor { private static final Log LOG = LogFactory.getLog(Compactor.class); private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; protected CompactionProgress progress; - protected Configuration conf; - protected Store store; + protected final Configuration conf; + protected final Store store; - protected int compactionKVMax; - protected Compression.Algorithm compactionCompression; + protected final int compactionKVMax; + protected final Compression.Algorithm compactionCompression; /** specify how many days to keep MVCC values during major compaction **/ - protected int keepSeqIdPeriod; + protected final int keepSeqIdPeriod; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(final Configuration conf, final Store store) { http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java new file mode 100644 index 0000000..413b29c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; + +/** + * This compactor will generate StoreFile for different time ranges. + */ +@InterfaceAudience.Private +public class DateTieredCompactor extends AbstractMultiOutputCompactor { + + private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class); + + public DateTieredCompactor(Configuration conf, Store store) { + super(conf, store); + } + + private boolean needEmptyFile(CompactionRequest request) { + // if we are going to compact the last N files, then we need to emit an empty file to retain the + // maxSeqId if we haven't written out anything. + return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId(); + } + + public List compact(final CompactionRequest request, List lowerBoundaries, + ThroughputController throughputController, User user) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing compaction with " + lowerBoundaries.size() + + "windows, lower boundaries: " + lowerBoundaries); + } + + DateTieredMultiFileWriter writer = + new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request)); + return compact(writer, request, new InternalScannerFactory() { + + @Override + public ScanType getScanType(CompactionRequest request) { + return request.isRetainDeleteMarkers() ? 
ScanType.COMPACT_RETAIN_DELETES + : ScanType.COMPACT_DROP_DELETES; + } + + @Override + public InternalScanner createScanner(List scanners, ScanType scanType, + FileDetails fd, long smallestReadPoint) throws IOException { + return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint, + fd.earliestPutTs); + } + }, throughputController, user); + } + + @Override + protected List commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd, + CompactionRequest request) throws IOException { + return writer.commitWriters(fd.maxSeqId, request.isAllFiles()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index e7e0cca..22a45b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -65,10 +65,10 @@ public class DefaultCompactor extends Compactor { Collection readersToClose; if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, - // HFileFiles, and their readers + // HFiles, and their readers readersToClose = new ArrayList(request.getFiles().size()); for (StoreFile f : request.getFiles()) { - readersToClose.add(new StoreFile(f)); + readersToClose.add(f.cloneForReader()); } scanners = createFileScanners(readersToClose, smallestReadPoint, store.throttleCompaction(request.getSize())); http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 2bb8fc8..e8a4340 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -27,9 +27,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; @@ -84,18 +85,20 @@ public class StripeCompactionPolicy extends CompactionPolicy { request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst()); } - public StripeStoreFlusher.StripeFlushRequest selectFlush( + public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator, StripeInformationProvider si, int kvCount) { if 
(this.config.isUsingL0Flush()) { - return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request. + // L0 is used, return dumb request. + return new StripeStoreFlusher.StripeFlushRequest(comparator); } if (si.getStripeCount() == 0) { // No stripes - start with the requisite count, derive KVs per stripe. int initialCount = this.config.getInitialCount(); - return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount); + return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount, + kvCount / initialCount); } // There are stripes - do according to the boundaries. - return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries()); + return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries()); } public StripeCompactionRequest selectCompaction(StripeInformationProvider si, http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index fd0e2b2..1364ce0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -18,50 +18,65 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; -import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; /** - * This is the placeholder for stripe compactor. The implementation, - * as well as the proper javadoc, will be added in HBASE-7967. + * This is the placeholder for stripe compactor. The implementation, as well as the proper javadoc, + * will be added in HBASE-7967. 
*/ @InterfaceAudience.Private -public class StripeCompactor extends Compactor { +public class StripeCompactor extends AbstractMultiOutputCompactor { private static final Log LOG = LogFactory.getLog(StripeCompactor.class); + public StripeCompactor(Configuration conf, Store store) { super(conf, store); } - public List compact(CompactionRequest request, List targetBoundaries, - byte[] majorRangeFromRow, byte[] majorRangeToRow, - ThroughputController throughputController) throws IOException { - return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, - throughputController, null); + private final class StripeInternalScannerFactory implements InternalScannerFactory { + + private final byte[] majorRangeFromRow; + + private final byte[] majorRangeToRow; + + public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] majorRangeToRow) { + this.majorRangeFromRow = majorRangeFromRow; + this.majorRangeToRow = majorRangeToRow; + } + + @Override + public ScanType getScanType(CompactionRequest request) { + // If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return + // value to create InternalScanner. See the createScanner method below. The return value is + // also used when calling coprocessor hooks. + return ScanType.COMPACT_RETAIN_DELETES; + } + + @Override + public InternalScanner createScanner(List scanners, ScanType scanType, + FileDetails fd, long smallestReadPoint) throws IOException { + return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners, + scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store, + scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow); + } } public List compact(CompactionRequest request, List targetBoundaries, - byte[] majorRangeFromRow, byte[] majorRangeToRow, - ThroughputController throughputController, User user) throws IOException { + byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController, + User user) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -70,116 +85,32 @@ public class StripeCompactor extends Compactor { } LOG.debug(sb.toString()); } - StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( - targetBoundaries, majorRangeFromRow, majorRangeToRow); - return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController, user); - } - - public List compact(CompactionRequest request, int targetCount, long targetSize, - byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - ThroughputController throughputController) throws IOException { - return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, - majorRangeToRow, throughputController, null); + StripeMultiFileWriter writer = + new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries, + majorRangeFromRow, majorRangeToRow); + return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow, + majorRangeToRow), throughputController, user); } public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Executing compaction with " + targetSize - + " 
-          + " target file size, no more than " + targetCount + " files, in ["
-          + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
+      LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+          + targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+          + "] range");
     }
-    StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
-        targetCount, targetSize, left, right);
-    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController, user);
+    StripeMultiFileWriter writer =
+        new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
+            left, right);
+    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
+      majorRangeToRow), throughputController, user);
   }
 
-  private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      ThroughputController throughputController, User user) throws IOException {
-    final Collection filesToCompact = request.getFiles();
-    final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    long smallestReadPoint = getSmallestReadPoint();
-    List scanners = createFileScanners(filesToCompact,
-      smallestReadPoint, store.throttleCompaction(request.getSize()));
-
-    boolean finished = false;
-    InternalScanner scanner = null;
-    boolean cleanSeqId = false;
-    try {
-      // Get scanner to use.
-      ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
-      if (scanner == null) {
-        scanner = (majorRangeFromRow == null)
-            ? createScanner(store, scanners,
-                ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
-            : createScanner(store, scanners,
-                smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
-      }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal processing.
-        return new ArrayList();
-      }
-
-      // Create the writer factory for compactions.
-      if(fd.minSeqIdToKeep > 0) {
-        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-        cleanSeqId = true;
-      }
-
-      final boolean needMvcc = fd.maxMVCCReadpoint > 0;
-
-      final Compression.Algorithm compression = store.getFamily().getCompactionCompressionType();
-      StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
-        @Override
-        public Writer createWriter() throws IOException {
-          return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
-              store.throttleCompaction(request.getSize()));
-        }
-      };
-
-      // Prepare multi-writer, and perform the compaction using scanner and writer.
-      // It is ok here if storeScanner is null.
-      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
-      mw.init(storeScanner, factory, store.getComparator());
-      finished =
-          performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController,
-            request.isMajor());
-      if (!finished) {
-        throw new InterruptedIOException( "Aborting compaction of store " + store +
-            " in region " + store.getRegionInfo().getRegionNameAsString() +
-            " because it was interrupted.");
-      }
-    } finally {
-      if (scanner != null) {
-        try {
-          scanner.close();
-        } catch (Throwable t) {
-          // Don't fail the compaction if this fails.
-          LOG.error("Failed to close scanner after compaction.", t);
-        }
-      }
-      if (!finished) {
-        for (Path leftoverFile : mw.abortWriters()) {
-          try {
-            store.getFileSystem().delete(leftoverFile, false);
-          } catch (Exception ex) {
-            LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
-          }
-        }
-      }
-    }
-
-    assert finished : "We should have exited the method on all error paths";
-    List newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
+  @Override
+  protected List commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
+      CompactionRequest request) throws IOException {
+    List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
deleted file mode 100644
index cb586f3..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestStripeCompactor {
-  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
-  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
-
-  private static final byte[] KEY_B = Bytes.toBytes("bbb");
-  private static final byte[] KEY_C = Bytes.toBytes("ccc");
-  private static final byte[] KEY_D = Bytes.toBytes("ddd");
-
-  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
-  private static final KeyValue KV_B = kvAfter(KEY_B);
-  private static final KeyValue KV_C = kvAfter(KEY_C);
-  private static final KeyValue KV_D = kvAfter(KEY_D);
-
-  private static KeyValue kvAfter(byte[] key) {
-    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
-  }
-
-  private static T[] a(T... a) {
-    return a;
-  }
-
-  private static KeyValue[] e() {
-    return TestStripeCompactor.a();
-  }
-
-  @Test
-  public void testBoundaryCompactions() throws Exception {
-    // General verification
-    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
-        a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
-  }
-
-  @Test
-  public void testBoundaryCompactionEmptyFiles() throws Exception {
-    // No empty file if there're already files.
-    verifyBoundaryCompaction(
-        a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
-    verifyBoundaryCompaction(a(KV_A, KV_C),
-        a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
-    // But should be created if there are no file.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
-    // In major range if there's major range.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
-    // Major range should have files regardless of KVs.
-    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
-        a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
-    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
-        a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
-
-  }
-
-  public static void verifyBoundaryCompaction(
-      KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
-    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
-  }
-
-  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
-      KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
-      throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List paths =
-        sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
-          NoLimitThroughputController.INSTANCE);
-    writers.verifyKvs(output, allFiles, true);
-    if (allFiles) {
-      assertEquals(output.length, paths.size());
-      writers.verifyBoundaries(boundaries);
-    }
-  }
-
-  @Test
-  public void testSizeCompactions() throws Exception {
-    // General verification with different sizes.
-    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
-    // Verify row boundaries are preserved.
-    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
-    // Too much data, count limits the number of files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
-        new KeyValue[][] { a(KV_A, KV_B, KV_C) });
-    // Too little data/large count, no extra files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_B), a(KV_C, KV_D)));
-  }
-
-  public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
-      byte[] left, byte[] right, KeyValue[][] output) throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List paths =
-        sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
-          NoLimitThroughputController.INSTANCE);
-    assertEquals(output.length, paths.size());
-    writers.verifyKvs(output, true, true);
-    List boundaries = new ArrayList();
-    boundaries.add(left);
-    for (int i = 1; i < output.length; ++i) {
-      boundaries.add(CellUtil.cloneRow(output[i][0]));
-    }
-    boundaries.add(right);
-    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
-  }
-
-  private static StripeCompactor createCompactor(
-      StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    final Scanner scanner = new Scanner(input);
-
-    // Create store mock that is satisfactory for compactor.
-    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
-    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR);
-    Store store = mock(Store.class);
-    when(store.getFamily()).thenReturn(col);
-    when(store.getScanInfo()).thenReturn(si);
-    when(store.areWritesEnabled()).thenReturn(true);
-    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
-    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
-    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.getComparator()).thenReturn(CellComparator.COMPARATOR);
-
-    return new StripeCompactor(conf, store) {
-      @Override
-      protected InternalScanner createScanner(Store store, List scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
-        return scanner;
-      }
-
-      @Override
-      protected InternalScanner createScanner(Store store, List scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-        return scanner;
-      }
-    };
-  }
-
-  private static CompactionRequest createDummyRequest() throws Exception {
-    // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
-    // But compaction depends on everything under the sun, so stub everything with dummies.
-    StoreFile sf = mock(StoreFile.class);
-    StoreFile.Reader r = mock(StoreFile.Reader.class);
-    when(r.length()).thenReturn(1L);
-    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
-    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
-      .thenReturn(mock(StoreFileScanner.class));
-    when(sf.getReader()).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    return new CompactionRequest(Arrays.asList(sf));
-  }
-
-  private static class Scanner implements InternalScanner {
-    private final ArrayList kvs;
-    public Scanner(KeyValue... kvs) {
-      this.kvs = new ArrayList(Arrays.asList(kvs));
-    }
-
-    @Override
-    public boolean next(List results) throws IOException {
-      if (kvs.isEmpty()) return false;
-      results.add(kvs.remove(0));
-      return !kvs.isEmpty();
-    }
-
-    @Override
-    public boolean next(List result, ScannerContext scannerContext)
-        throws IOException {
-      return next(result);
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
-  public static class StoreFileWritersCapture implements
-    Answer, StripeMultiFileWriter.WriterFactory {
-    public static class Writer {
-      public ArrayList kvs = new ArrayList();
-      public TreeMap data = new TreeMap(Bytes.BYTES_COMPARATOR);
-    }
-
-    private List writers = new ArrayList();
-
-    @Override
-    public StoreFile.Writer createWriter() throws IOException {
-      final Writer realWriter = new Writer();
-      writers.add(realWriter);
-      StoreFile.Writer writer = mock(StoreFile.Writer.class);
-      doAnswer(new Answer() {
-        public Object answer(InvocationOnMock invocation) {
-          return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
-        }}).when(writer).append(any(KeyValue.class));
-      doAnswer(new Answer() {
-        public Object answer(InvocationOnMock invocation) {
-          Object[] args = invocation.getArguments();
-          return realWriter.data.put((byte[])args[0], (byte[])args[1]);
-        }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
-      return writer;
-    }
-
-    @Override
-    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
-      return createWriter();
-    }
-
-    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
-      if (allFiles) {
-        assertEquals(kvss.length, writers.size());
-      }
-      int skippedWriters = 0;
-      for (int i = 0; i < kvss.length; ++i) {
-        KeyValue[] kvs = kvss[i];
-        if (kvs != null) {
-          Writer w = writers.get(i - skippedWriters);
-          if (requireMetadata) {
-            assertNotNull(w.data.get(STRIPE_START_KEY));
-            assertNotNull(w.data.get(STRIPE_END_KEY));
-          } else {
-            assertNull(w.data.get(STRIPE_START_KEY));
-            assertNull(w.data.get(STRIPE_END_KEY));
-          }
-          assertEquals(kvs.length, w.kvs.size());
-          for (int j = 0; j < kvs.length; ++j) {
-            assertEquals(kvs[j], w.kvs.get(j));
-          }
-        } else {
-          assertFalse(allFiles);
-          ++skippedWriters;
-        }
-      }
-    }
-
-    public void verifyBoundaries(byte[][] boundaries) {
-      assertEquals(boundaries.length - 1, writers.size());
-      for (int i = 0; i < writers.size(); ++i) {
-        assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
-        assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
-      }
-    }
-  }
-}
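
For readers skimming the diff, the shape of the refactoring is easier to see in isolation: StripeCompactor loses its hand-rolled compactInternal loop and keeps only two jobs, choosing the scanner (StripeInternalScannerFactory) and finalizing the multi-writer's output (commitMultiWriter), while the shared compaction driver moves into the new abstract base. The sketch below mirrors that split with illustrative stand-in types; none of these interfaces are the real HBase classes, they only model the division of labour the commit introduces.

// Minimal sketch of the multi-output compactor split, using stand-in types (not HBase APIs).
import java.io.IOException;
import java.util.List;

interface ScannerFactorySketch {
  // Stands in for InternalScannerFactory: decides how cells are read for this request.
  Iterable<String> createScanner() throws IOException;
}

interface MultiFileWriterSketch {
  void append(String cell) throws IOException;     // route one cell to the right output file
  List<String> commitWriters() throws IOException;  // finalize and return the new "files"
  List<String> abortWriters();                       // leftover partial outputs to discard
}

abstract class MultiOutputCompactorSketch {
  // Shared driver (the role AbstractMultiOutputCompactor plays): scan, write, then commit.
  List<String> compact(MultiFileWriterSketch writer, ScannerFactorySketch factory)
      throws IOException {
    boolean finished = false;
    try {
      for (String cell : factory.createScanner()) {
        writer.append(cell);
      }
      finished = true;
      return commitMultiWriter(writer);
    } finally {
      if (!finished) {
        // Mirrors the old compactInternal cleanup: abandon partially written outputs.
        writer.abortWriters();
      }
    }
  }

  // Concrete compactors (stripe, date-tiered) only decide how the output set is committed.
  protected abstract List<String> commitMultiWriter(MultiFileWriterSketch writer)
      throws IOException;
}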
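
The deleted test's StoreFileWritersCapture also illustrates a reusable trick: it is both a Mockito Answer and a writer factory, so every writer the compactor requests is a mock that records appended KeyValues and file-info metadata for later assertions, with no real HFiles involved. Below is a stripped-down sketch of that capture pattern; FakeSink is an illustrative stand-in for StoreFile.Writer, not an HBase type.

// Capture pattern sketch: mock "writers" that record what was appended to them.
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.List;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class WriterCaptureSketch {
  public interface FakeSink {
    void append(String value);  // stand-in for StoreFile.Writer.append(Cell)
  }

  // One list of captured values per writer handed out, in creation order.
  final List<List<String>> captured = new ArrayList<List<String>>();

  public FakeSink newCapturingWriter() {
    final List<String> cells = new ArrayList<String>();
    captured.add(cells);
    FakeSink sink = mock(FakeSink.class);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) {
        // Record the appended value instead of writing anything.
        cells.add((String) invocation.getArguments()[0]);
        return null;
      }
    }).when(sink).append(any(String.class));
    return sink;
  }
}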