hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [2/2] hbase git commit: HBASE-15389 Write out multiple files when compaction
Date Fri, 25 Mar 2016 07:24:00 GMT
HBASE-15389 Write out multiple files when compaction


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11d11d3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11d11d3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11d11d3f

Branch: refs/heads/master
Commit: 11d11d3fcc591227cccf3531b911e46c68774501
Parents: e9c4f12
Author: zhangduo <zhangduo@apache.org>
Authored: Fri Mar 25 15:07:54 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Fri Mar 25 15:07:54 2016 +0800

----------------------------------------------------------------------
 .../regionserver/AbstractMultiFileWriter.java   | 120 +++++++
 .../regionserver/DateTieredMultiFileWriter.java |  83 +++++
 .../hadoop/hbase/regionserver/StoreFile.java    |   9 +-
 .../regionserver/StripeMultiFileWriter.java     | 239 ++++++--------
 .../hbase/regionserver/StripeStoreFlusher.java  |  30 +-
 .../AbstractMultiOutputCompactor.java           | 161 +++++++++
 .../regionserver/compactions/Compactor.java     |  10 +-
 .../compactions/DateTieredCompactor.java        |  86 +++++
 .../compactions/DefaultCompactor.java           |   4 +-
 .../compactions/StripeCompactionPolicy.java     |  13 +-
 .../compactions/StripeCompactor.java            | 169 +++-------
 .../hbase/regionserver/TestStripeCompactor.java | 325 -------------------
 .../regionserver/compactions/TestCompactor.java | 212 ++++++++++++
 .../compactions/TestDateTieredCompactor.java    | 169 ++++++++++
 .../compactions/TestStripeCompactionPolicy.java |  24 +-
 .../compactions/TestStripeCompactor.java        | 223 +++++++++++++
 16 files changed, 1272 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
new file mode 100644
index 0000000..4987c59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
+
+/**
+ * Base class for cell sink that separates the provided cells into multiple files.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMultiFileWriter implements CellSink {
+
+  private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class);
+
+  /** Factory that is used to produce single StoreFile.Writer-s */
+  protected WriterFactory writerFactory;
+
+  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
+  protected StoreScanner sourceScanner;
+
+  public interface WriterFactory {
+    public StoreFile.Writer createWriter() throws IOException;
+  }
+
+  /**
+   * Initializes multi-writer before usage.
+   * @param sourceScanner Optional store scanner to obtain the information about read progress.
+   * @param factory Factory used to produce individual file writers.
+   */
+  public void init(StoreScanner sourceScanner, WriterFactory factory) {
+    this.writerFactory = factory;
+    this.sourceScanner = sourceScanner;
+  }
+
+  /**
+   * Commit all writers.
+   * <p>
+   * Notice that here we use the same <code>maxSeqId</code> for all output files since we have not
+   * found an easy way to find enough sequence ids for different output files in some corner cases.
+   * See comments in HBASE-15400 for more details.
+   */
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
+    preCommitWriters();
+    Collection<StoreFile.Writer> writers = this.writers();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+          + ", majorCompaction=" + majorCompaction);
+    }
+    List<Path> paths = new ArrayList<Path>();
+    for (Writer writer : writers) {
+      if (writer == null) {
+        continue;
+      }
+      writer.appendMetadata(maxSeqId, majorCompaction);
+      preCloseWriter(writer);
+      paths.add(writer.getPath());
+      writer.close();
+    }
+    return paths;
+  }
+
+  /**
+   * Close all writers without throwing any exceptions. This is usually used when a compaction
+   * fails.
+  public List<Path> abortWriters() {
+    List<Path> paths = new ArrayList<Path>();
+    for (StoreFile.Writer writer : writers()) {
+      try {
+        if (writer != null) {
+          paths.add(writer.getPath());
+          writer.close();
+        }
+      } catch (Exception ex) {
+        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+      }
+    }
+    return paths;
+  }
+
+  protected abstract Collection<StoreFile.Writer> writers();
+
+  /**
+   * Subclasses override this method to be called at the end of a successful sequence of append; all
+   * appends are processed before this method is called.
+   */
+  protected void preCommitWriters() throws IOException {
+  }
+
+  /**
+   * Subclasses override this method to be called before we close the given writer. Usually you
+   * can append extra metadata to the writer.
+   */
+  protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
new file mode 100644
index 0000000..f0bd444
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+
+/**
+ * Class for cell sink that separates the provided cells into multiple files for date tiered
+ * compaction.
+ */
+@InterfaceAudience.Private
+public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
+
+  private final NavigableMap<Long, StoreFile.Writer> lowerBoundary2Writer
+    = new TreeMap<Long, StoreFile.Writer>();
+
+  private final boolean needEmptyFile;
+
+  /**
+   * @param needEmptyFile whether need to create an empty store file if we haven't written out
+   *          anything.
+   */
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
+    for (Long lowerBoundary : lowerBoundaries) {
+      lowerBoundary2Writer.put(lowerBoundary, null);
+    }
+    this.needEmptyFile = needEmptyFile;
+  }
+
+  @Override
+  public void append(Cell cell) throws IOException {
+    Map.Entry<Long, StoreFile.Writer> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+    StoreFile.Writer writer = entry.getValue();
+    if (writer == null) {
+      writer = writerFactory.createWriter();
+      lowerBoundary2Writer.put(entry.getKey(), writer);
+    }
+    writer.append(cell);
+  }
+
+  @Override
+  protected Collection<Writer> writers() {
+    return lowerBoundary2Writer.values();
+  }
+
+  @Override
+  protected void preCommitWriters() throws IOException {
+    if (!needEmptyFile) {
+      return;
+    }
+    for (StoreFile.Writer writer : lowerBoundary2Writer.values()) {
+      if (writer != null) {
+        return;
+      }
+    }
+    // we haven't written out any data, create an empty file to retain metadata
+    lowerBoundary2Writer.put(lowerBoundary2Writer.firstKey(), writerFactory.createWriter());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index b6164b2..868bee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -261,6 +261,13 @@ public class StoreFile {
   }
 
   /**
+   * Clone a StoreFile for opening private reader.
+   */
+  public StoreFile cloneForReader() {
+    return new StoreFile(this);
+  }
+
+  /**
    * @return the StoreFile object associated to this StoreFile.
    *         null if the StoreFile is not a reference.
    */
@@ -294,7 +301,7 @@ public class StoreFile {
    * @return True if this is HFile.
    */
   public boolean isHFile() {
-    return this.fileInfo.isHFile(this.fileInfo.getPath());
+    return StoreFileInfo.isHFile(this.fileInfo.getPath());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index 651b863..1c3f14c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -20,52 +20,36 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Base class for cell sink that separates the provided cells into multiple files.
+ * Base class for cell sink that separates the provided cells into multiple files for stripe
+ * compaction.
  */
 @InterfaceAudience.Private
-public abstract class StripeMultiFileWriter implements Compactor.CellSink {
-  private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
+public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
 
-  /** Factory that is used to produce single StoreFile.Writer-s */
-  protected WriterFactory writerFactory;
-  protected CellComparator comparator;
+  private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
 
+  protected final CellComparator comparator;
   protected List<StoreFile.Writer> existingWriters;
   protected List<byte[]> boundaries;
-  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
-  protected StoreScanner sourceScanner;
 
   /** Whether to write stripe metadata */
   private boolean doWriteStripeMetadata = true;
 
-  public interface WriterFactory {
-    public StoreFile.Writer createWriter() throws IOException;
-  }
-
-  /**
-   * Initializes multi-writer before usage.
-   * @param sourceScanner Optional store scanner to obtain the information about read progress.
-   * @param factory Factory used to produce individual file writers.
-   * @param comparator Comparator used to compare rows.
-   */
-  public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator)
-      throws IOException {
-    this.writerFactory = factory;
-    this.sourceScanner = sourceScanner;
+  public StripeMultiFileWriter(CellComparator comparator) {
     this.comparator = comparator;
   }
 
@@ -73,41 +57,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
     this.doWriteStripeMetadata = false;
   }
 
-  public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
+  @Override
+  protected Collection<Writer> writers() {
+    return existingWriters;
+  }
+
+  protected abstract void preCommitWritersInternal() throws IOException;
+
+  @Override
+  protected final void preCommitWriters() throws IOException {
+    // do some sanity check here.
     assert this.existingWriters != null;
-    commitWritersInternal();
+    preCommitWritersInternal();
     assert this.boundaries.size() == (this.existingWriters.size() + 1);
-    LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
-      + "riting out metadata for " + this.existingWriters.size() + " writers");
-    List<Path> paths = new ArrayList<Path>();
-    for (int i = 0; i < this.existingWriters.size(); ++i) {
-      StoreFile.Writer writer = this.existingWriters.get(i);
-      if (writer == null) continue; // writer was skipped due to 0 KVs
-      if (doWriteStripeMetadata) {
-        writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
-        writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
-      }
-      writer.appendMetadata(maxSeqId, isMajor);
-      paths.add(writer.getPath());
-      writer.close();
-    }
-    this.existingWriters = null;
-    return paths;
   }
 
-  public List<Path> abortWriters() {
-    assert this.existingWriters != null;
-    List<Path> paths = new ArrayList<Path>();
-    for (StoreFile.Writer writer : this.existingWriters) {
-      try {
-        paths.add(writer.getPath());
-        writer.close();
-      } catch (Exception ex) {
-        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+  @Override
+  protected void preCloseWriter(Writer writer) throws IOException {
+    if (doWriteStripeMetadata) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Write stripe metadata for " + writer.getPath().toString());
+      }
+      int index = existingWriters.indexOf(writer);
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index));
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString());
       }
     }
-    this.existingWriters = null;
-    return paths;
   }
 
   /**
@@ -115,13 +93,12 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
    * @param left The left boundary of the writer.
    * @param cell The cell whose row has to be checked.
    */
-  protected void sanityCheckLeft(
-      byte[] left, Cell cell) throws IOException {
-    if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) &&
-        comparator.compareRows(cell, left, 0, left.length) < 0) {
-      String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
-          + "]: [" + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
-          + "]";
+  protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException {
+    if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left)
+        && comparator.compareRows(cell, left, 0, left.length) < 0) {
+      String error =
+          "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: ["
+              + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
       LOG.error(error);
       throw new IOException(error);
     }
@@ -131,28 +108,22 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
    * Subclasses can call this method to make sure the last KV is within multi-writer range.
    * @param right The right boundary of the writer.
    */
-  protected void sanityCheckRight(
-      byte[] right, Cell cell) throws IOException {
-    if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) &&
-        comparator.compareRows(cell, right, 0, right.length) >= 0) {
-      String error = "The last row is higher or equal than the right boundary of ["
-          + Bytes.toString(right) + "]: ["
-          + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
+  protected void sanityCheckRight(byte[] right, Cell cell) throws IOException {
+    if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right)
+        && comparator.compareRows(cell, right, 0, right.length) >= 0) {
+      String error =
+          "The last row is higher or equal than the right boundary of [" + Bytes.toString(right)
+              + "]: ["
+              + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
       LOG.error(error);
       throw new IOException(error);
     }
   }
 
   /**
-   * Subclasses override this method to be called at the end of a successful sequence of
-   * append; all appends are processed before this method is called.
-   */
-  protected abstract void commitWritersInternal() throws IOException;
-
-  /**
-   * MultiWriter that separates the cells based on fixed row-key boundaries.
-   * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
-   * will end up in one file, and separate from all other such pairs.
+   * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between
+   * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and
+   * separate from all other such pairs.
    */
   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
     private StoreFile.Writer currentWriter;
@@ -165,31 +136,28 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
 
     /**
      * @param targetBoundaries The boundaries on which writers/files are separated.
-     * @param majorRangeFrom Major range is the range for which at least one file should be
-     *                       written (because all files are included in compaction).
-     *                       majorRangeFrom is the left boundary.
+     * @param majorRangeFrom Major range is the range for which at least one file should be written
+     *          (because all files are included in compaction). majorRangeFrom is the left boundary.
      * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
      */
-    public BoundaryMultiWriter(List<byte[]> targetBoundaries,
+    public BoundaryMultiWriter(CellComparator comparator, List<byte[]> targetBoundaries,
         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
-      super();
+      super(comparator);
       this.boundaries = targetBoundaries;
       this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
       // "major" range (range for which all files are included) boundaries, if any,
       // must match some target boundaries, let's find them.
-      assert  (majorRangeFrom == null) == (majorRangeTo == null);
+      assert (majorRangeFrom == null) == (majorRangeTo == null);
       if (majorRangeFrom != null) {
-        majorRangeFromIndex = Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY)
-                                ? 0
-                                : Collections.binarySearch(boundaries, majorRangeFrom,
-                                                           Bytes.BYTES_COMPARATOR);
-        majorRangeToIndex = Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY)
-                              ? boundaries.size()
-                              : Collections.binarySearch(boundaries, majorRangeTo,
-                                                         Bytes.BYTES_COMPARATOR);
+        majorRangeFromIndex =
+            Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) ? 0 : Collections
+                .binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
+        majorRangeToIndex =
+            Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
+                : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
-          throw new IOException("Major range does not match writer boundaries: [" +
-              Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+          throw new IOException("Major range does not match writer boundaries: ["
+              + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
               + majorRangeFromIndex + " to " + majorRangeToIndex);
         }
       }
@@ -199,8 +167,7 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
     public void append(Cell cell) throws IOException {
       if (currentWriter == null && existingWriters.isEmpty()) {
         // First append ever, do a sanity check.
-        sanityCheckLeft(this.boundaries.get(0),
-            cell);
+        sanityCheckLeft(this.boundaries.get(0), cell);
       }
       prepareWriterFor(cell);
       currentWriter.append(cell);
@@ -209,19 +176,18 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
     }
 
     private boolean isCellAfterCurrentWriter(Cell cell) {
-      return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) &&
-            (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0);
+      return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY)
+        && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0);
     }
 
     @Override
-    protected void commitWritersInternal() throws IOException {
+    protected void preCommitWritersInternal() throws IOException {
       stopUsingCurrentWriter();
       while (existingWriters.size() < boundaries.size() - 1) {
         createEmptyWriter();
       }
       if (lastCell != null) {
-        sanityCheckRight(boundaries.get(boundaries.size() - 1),
-            lastCell);
+        sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell);
       }
     }
 
@@ -241,14 +207,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
     }
 
     /**
-     * Called if there are no cells for some stripe.
-     * We need to have something in the writer list for this stripe, so that writer-boundary
-     * list indices correspond to each other. We can insert null in the writer list for that
-     * purpose, except in the following cases where we actually need a file:
-     * 1) If we are in range for which we are compacting all the files, we need to create an
-     * empty file to preserve stripe metadata.
-     * 2) If we have not produced any file at all for this compactions, and this is the
-     * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
+     * Called if there are no cells for some stripe. We need to have something in the writer list
+     * for this stripe, so that writer-boundary list indices correspond to each other. We can insert
+     * null in the writer list for that purpose, except in the following cases where we actually
+     * need a file: 1) If we are in range for which we are compacting all the files, we need to
+     * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all
+     * for this compactions, and this is the last chance (the last stripe), we need to preserve last
+     * seqNum (see also HBASE-6059).
      */
     private void createEmptyWriter() throws IOException {
       int index = existingWriters.size();
@@ -258,12 +223,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
       boolean needEmptyFile = isInMajorRange || isLastWriter;
       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
       hasAnyWriter |= needEmptyFile;
-      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
-          ? null : boundaries.get(existingWriters.size() + 1);
+      currentWriterEndKey =
+          (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
+              .size() + 1);
     }
 
     private void checkCanCreateWriter() throws IOException {
-      int maxWriterCount =  boundaries.size() - 1;
+      int maxWriterCount = boundaries.size() - 1;
       assert existingWriters.size() <= maxWriterCount;
       if (existingWriters.size() >= maxWriterCount) {
         throw new IOException("Cannot create any more writers (created " + existingWriters.size()
@@ -280,16 +246,16 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
         cellsInCurrentWriter = 0;
       }
       currentWriter = null;
-      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
-          ? null : boundaries.get(existingWriters.size() + 1);
+      currentWriterEndKey =
+          (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
+              .size() + 1);
     }
   }
 
   /**
-   * MultiWriter that separates the cells based on target cell number per file and file count.
-   * New file is started every time the target number of KVs is reached, unless the fixed
-   * count of writers has already been created (in that case all the remaining KVs go into
-   * the last writer).
+   * MultiWriter that separates the cells based on target cell number per file and file count. New
+   * file is started every time the target number of KVs is reached, unless the fixed count of
+   * writers has already been created (in that case all the remaining KVs go into the last writer).
    */
   public static class SizeMultiWriter extends StripeMultiFileWriter {
     private int targetCount;
@@ -310,8 +276,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
      * @param left The left boundary of the first writer.
      * @param right The right boundary of the last writer.
      */
-    public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
-      super();
+    public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left,
+        byte[] right) {
+      super(comparator);
       this.targetCount = targetCount;
       this.targetCells = targetKvs;
       this.left = left;
@@ -331,11 +298,11 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
         sanityCheckLeft(left, cell);
         doCreateWriter = true;
       } else if (lastRowInCurrentWriter != null
-          && !CellUtil.matchingRow(cell,
-              lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
+          && !CellUtil.matchingRow(cell, lastRowInCurrentWriter, 0,
+            lastRowInCurrentWriter.length)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
-              + "] row; wrote out "  + cellsInCurrentWriter + " kvs");
+              + "] row; wrote out " + cellsInCurrentWriter + " kvs");
         }
         lastRowInCurrentWriter = null;
         cellsInCurrentWriter = 0;
@@ -343,7 +310,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
         doCreateWriter = true;
       }
       if (doCreateWriter) {
-        byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); // make a copy
+        // make a copy
+        byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
         }
@@ -357,34 +325,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
       ++cellsInCurrentWriter;
       cellsSeen = cellsInCurrentWriter;
       if (this.sourceScanner != null) {
-        cellsSeen = Math.max(cellsSeen,
-            this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
+        cellsSeen =
+            Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned()
+                - cellsSeenInPrevious);
       }
 
       // If we are not already waiting for opportunity to close, start waiting if we can
       // create any more writers and if the current one is too big.
-      if (lastRowInCurrentWriter == null
-          && existingWriters.size() < targetCount
+      if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount
           && cellsSeen >= targetCells) {
         lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
-              lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
-              + cellsInCurrentWriter + " kvs");
+          LOG.debug("Preparing to start a new writer after ["
+              + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen
+              + " kvs and wrote out " + cellsInCurrentWriter + " kvs");
         }
       }
     }
 
     @Override
-    protected void commitWritersInternal() throws IOException {
+    protected void preCommitWritersInternal() throws IOException {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Stopping with "  + cellsInCurrentWriter + " kvs in last writer" +
-            ((this.sourceScanner == null) ? "" : ("; observed estimated "
+        LOG.debug("Stopping with "
+            + cellsInCurrentWriter
+            + " kvs in last writer"
+            + ((this.sourceScanner == null) ? "" : ("; observed estimated "
                 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
       }
       if (lastCell != null) {
-        sanityCheckRight(
-            right, lastCell);
+        sanityCheckRight(right, lastCell);
       }
 
       // When expired stripes were going to be merged into one, and if no writer was created during

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 9a06a88..34e8497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -69,7 +70,8 @@ public class StripeStoreFlusher extends StoreFlusher {
     }
 
     // Let policy select flush method.
-    StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
+    StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
+      cellsCount);
 
     boolean success = false;
     StripeMultiFileWriter mw = null;
@@ -78,7 +80,7 @@ public class StripeStoreFlusher extends StoreFlusher {
       StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
           snapshot.getTimeRangeTracker(), cellsCount);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
-      mw.init(storeScanner, factory, store.getComparator());
+      mw.init(storeScanner, factory);
 
       synchronized (flushLock) {
         performFlush(scanner, mw, smallestReadPoint, throughputController);
@@ -123,10 +125,17 @@ public class StripeStoreFlusher extends StoreFlusher {
 
   /** Stripe flush request wrapper that writes a non-striped file. */
   public static class StripeFlushRequest {
+
+    protected final CellComparator comparator;
+
+    public StripeFlushRequest(CellComparator comparator) {
+      this.comparator = comparator;
+    }
+
     @VisibleForTesting
     public StripeMultiFileWriter createWriter() throws IOException {
-      StripeMultiFileWriter writer =
-          new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
+      StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
+          Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
       writer.setNoStripeMetadata();
       return writer;
     }
@@ -137,13 +146,15 @@ public class StripeStoreFlusher extends StoreFlusher {
     private final List<byte[]> targetBoundaries;
 
     /** @param targetBoundaries New files should be written with these boundaries. */
-    public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
+    public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
+      super(comparator);
       this.targetBoundaries = targetBoundaries;
     }
 
     @Override
     public StripeMultiFileWriter createWriter() throws IOException {
-      return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
+      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
+          null);
     }
   }
 
@@ -157,15 +168,16 @@ public class StripeStoreFlusher extends StoreFlusher {
      * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
      *                  total number of kvs, all the overflow data goes into the last stripe.
      */
-    public SizeStripeFlushRequest(int targetCount, long targetKvs) {
+    public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
+      super(comparator);
       this.targetCount = targetCount;
       this.targetKvs = targetKvs;
     }
 
     @Override
     public StripeMultiFileWriter createWriter() throws IOException {
-      return new StripeMultiFileWriter.SizeMultiWriter(
-          this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
+      return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
+          OPEN_KEY, OPEN_KEY);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
new file mode 100644
index 0000000..29d8561
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Base class for implementing a Compactor which will generate multiple output files after
+ * compaction.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
+    extends Compactor {
+
+  private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
+
+  public AbstractMultiOutputCompactor(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  protected interface InternalScannerFactory {
+
+    ScanType getScanType(CompactionRequest request);
+
+    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException;
+  }
+
+  protected List<Path> compact(T writer, final CompactionRequest request,
+      InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
+          throws IOException {
+    final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
+
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = getSmallestReadPoint();
+
+    List<StoreFileScanner> scanners;
+    Collection<StoreFile> readersToClose;
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
+      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
+      // HFiles, and their readers
+      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
+      for (StoreFile f : request.getFiles()) {
+        readersToClose.add(f.cloneForReader());
+      }
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    } else {
+      readersToClose = Collections.emptyList();
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    }
+    InternalScanner scanner = null;
+    boolean finished = false;
+    try {
+      /* Include deletes, unless we are doing a major compaction */
+      ScanType scanType = scannerFactory.getScanType(request);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+      if (scanner == null) {
+        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
+      }
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
+      if (scanner == null) {
+        // NULL scanner returned from coprocessor hooks means skip normal processing.
+        return new ArrayList<Path>();
+      }
+      boolean cleanSeqId = false;
+      if (fd.minSeqIdToKeep > 0) {
+        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+        cleanSeqId = true;
+      }
+      // Create the writer factory for compactions.
+      final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
+      WriterFactory writerFactory = new WriterFactory() {
+        @Override
+        public Writer createWriter() throws IOException {
+          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
+            fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+        }
+      };
+      // Prepare multi-writer, and perform the compaction using scanner and writer.
+      // It is ok here if storeScanner is null.
+      StoreScanner storeScanner
+        = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
+      writer.init(storeScanner, writerFactory);
+      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request.isAllFiles());
+      if (!finished) {
+        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
+      }
+    } finally {
+      Closeables.close(scanner, true);
+      for (StoreFile f : readersToClose) {
+        try {
+          f.closeReader(true);
+        } catch (IOException e) {
+          LOG.warn("Exception closing " + f, e);
+        }
+      }
+      if (!finished) {
+        FileSystem fs = store.getFileSystem();
+        for (Path leftoverFile : writer.abortWriters()) {
+          try {
+            fs.delete(leftoverFile, false);
+          } catch (IOException e) {
+            LOG.error("Failed to delete the leftover file " + leftoverFile
+                + " after an unfinished compaction.",
+              e);
+          }
+        }
+      }
+    }
+    assert finished : "We should have exited the method on all error paths";
+    return commitMultiWriter(writer, fd, request);
+  }
+
+  protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
+      CompactionRequest request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 0e6ab05..9125684 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -64,14 +64,14 @@ public abstract class Compactor {
   private static final Log LOG = LogFactory.getLog(Compactor.class);
   private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
   protected CompactionProgress progress;
-  protected Configuration conf;
-  protected Store store;
+  protected final Configuration conf;
+  protected final Store store;
 
-  protected int compactionKVMax;
-  protected Compression.Algorithm compactionCompression;
+  protected final int compactionKVMax;
+  protected final Compression.Algorithm compactionCompression;
 
   /** specify how many days to keep MVCC values during major compaction **/ 
-  protected int keepSeqIdPeriod;
+  protected final int keepSeqIdPeriod;
 
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(final Configuration conf, final Store store) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
new file mode 100644
index 0000000..413b29c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * This compactor will generate StoreFile for different time ranges.
+ */
+@InterfaceAudience.Private
+public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
+
+  private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class);
+
+  public DateTieredCompactor(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  private boolean needEmptyFile(CompactionRequest request) {
+    // if we are going to compact the last N files, then we need to emit an empty file to retain the
+    // maxSeqId if we haven't written out anything.
+    return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
+  }
+
+  public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
+      ThroughputController throughputController, User user) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing compaction with " + lowerBoundaries.size()
+          + " windows, lower boundaries: " + lowerBoundaries);
+    }
+
+    DateTieredMultiFileWriter writer =
+        new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
+    return compact(writer, request, new InternalScannerFactory() {
+
+      @Override
+      public ScanType getScanType(CompactionRequest request) {
+        return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
+            : ScanType.COMPACT_DROP_DELETES;
+      }
+
+      @Override
+      public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+          FileDetails fd, long smallestReadPoint) throws IOException {
+        return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+          fd.earliestPutTs);
+      }
+    }, throughputController, user);
+  }
+
+  @Override
+  protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+      CompactionRequest request) throws IOException {
+    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index e7e0cca..22a45b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -65,10 +65,10 @@ public class DefaultCompactor extends Compactor {
     Collection<StoreFile> readersToClose;
     if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFileFiles, and their readers
+      // HFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
-        readersToClose.add(new StoreFile(f));
+        readersToClose.add(f.cloneForReader());
       }
       scanners = createFileScanners(readersToClose, smallestReadPoint,
           store.throttleCompaction(request.getSize()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 2bb8fc8..e8a4340 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -27,9 +27,10 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
@@ -84,18 +85,20 @@ public class StripeCompactionPolicy extends CompactionPolicy {
         request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
   }
 
-  public StripeStoreFlusher.StripeFlushRequest selectFlush(
+  public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator,
       StripeInformationProvider si, int kvCount) {
     if (this.config.isUsingL0Flush()) {
-      return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
+      // L0 is used, return dumb request.
+      return new StripeStoreFlusher.StripeFlushRequest(comparator);
     }
     if (si.getStripeCount() == 0) {
       // No stripes - start with the requisite count, derive KVs per stripe.
       int initialCount = this.config.getInitialCount();
-      return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
+      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
+          kvCount / initialCount);
     }
     // There are stripes - do according to the boundaries.
-    return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
+    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
   }
 
   public StripeCompactionRequest selectCompaction(StripeInformationProvider si,

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index fd0e2b2..1364ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -18,50 +18,65 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * This is the placeholder for stripe compactor. The implementation,
- * as well as the proper javadoc, will be added in HBASE-7967.
+ * This is the placeholder for stripe compactor. The implementation, as well as the proper javadoc,
+ * will be added in HBASE-7967.
  */
 @InterfaceAudience.Private
-public class StripeCompactor extends Compactor {
+public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter> {
   private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
+
   public StripeCompactor(Configuration conf, Store store) {
     super(conf, store);
   }
 
-  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
-    byte[] majorRangeFromRow, byte[] majorRangeToRow,
-    ThroughputController throughputController) throws IOException {
-    return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
-      throughputController, null);
+  private final class StripeInternalScannerFactory implements InternalScannerFactory {
+
+    private final byte[] majorRangeFromRow;
+
+    private final byte[] majorRangeToRow;
+
+    public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] majorRangeToRow) {
+      this.majorRangeFromRow = majorRangeFromRow;
+      this.majorRangeToRow = majorRangeToRow;
+    }
+
+    @Override
+    public ScanType getScanType(CompactionRequest request) {
+      // If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return
+      // value to create InternalScanner. See the createScanner method below. The return value is
+      // also used when calling coprocessor hooks.
+      return ScanType.COMPACT_RETAIN_DELETES;
+    }
+
+    @Override
+    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException {
+      return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
+        scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
+        scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+    }
   }
 
   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      ThroughputController throughputController, User user) throws IOException {
+      byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
+      User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -70,116 +85,32 @@ public class StripeCompactor extends Compactor {
       }
       LOG.debug(sb.toString());
     }
-    StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
-        targetBoundaries, majorRangeFromRow, majorRangeToRow);
-    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-      throughputController, user);
-  }
-
-  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
-    byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
-    ThroughputController throughputController) throws IOException {
-    return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
-      majorRangeToRow, throughputController, null);
+    StripeMultiFileWriter writer =
+        new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
+            majorRangeFromRow, majorRangeToRow);
+    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
+        majorRangeToRow), throughputController, user);
   }
 
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
       byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing compaction with " + targetSize
-          + " target file size, no more than " + targetCount + " files, in ["
-          + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
+      LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+          + targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+          + "] range");
     }
-    StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
-        targetCount, targetSize, left, right);
-    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-      throughputController, user);
+    StripeMultiFileWriter writer =
+        new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
+            left, right);
+    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
+        majorRangeToRow), throughputController, user);
   }
 
-  private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      ThroughputController throughputController, User user) throws IOException {
-    final Collection<StoreFile> filesToCompact = request.getFiles();
-    final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
-        smallestReadPoint, store.throttleCompaction(request.getSize()));
-
-    boolean finished = false;
-    InternalScanner scanner = null;
-    boolean cleanSeqId = false;
-    try {
-      // Get scanner to use.
-      ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
-      if (scanner == null) {
-        scanner = (majorRangeFromRow == null)
-            ? createScanner(store, scanners,
-                ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
-            : createScanner(store, scanners,
-                smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
-      }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal processing.
-        return new ArrayList<Path>();
-      }
-
-      // Create the writer factory for compactions.
-      if(fd.minSeqIdToKeep > 0) {
-        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-        cleanSeqId = true;
-      }
-
-      final boolean needMvcc = fd.maxMVCCReadpoint > 0;
-
-      final Compression.Algorithm compression = store.getFamily().getCompactionCompressionType();
-      StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
-        @Override
-        public Writer createWriter() throws IOException {
-          return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
-              store.throttleCompaction(request.getSize()));
-        }
-      };
-
-      // Prepare multi-writer, and perform the compaction using scanner and writer.
-      // It is ok here if storeScanner is null.
-      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
-      mw.init(storeScanner, factory, store.getComparator());
-      finished =
-          performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController,
-                  request.isMajor());
-      if (!finished) {
-        throw new InterruptedIOException( "Aborting compaction of store " + store +
-            " in region " + store.getRegionInfo().getRegionNameAsString() +
-            " because it was interrupted.");
-      }
-    } finally {
-      if (scanner != null) {
-        try {
-          scanner.close();
-        } catch (Throwable t) {
-          // Don't fail the compaction if this fails.
-          LOG.error("Failed to close scanner after compaction.", t);
-        }
-      }
-      if (!finished) {
-        for (Path leftoverFile : mw.abortWriters()) {
-          try {
-            store.getFileSystem().delete(leftoverFile, false);
-          } catch (Exception ex) {
-            LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
-          }
-        }
-      }
-    }
-
-    assert finished : "We should have exited the method on all error paths";
-    List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
+  @Override
+  protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
+      CompactionRequest request) throws IOException {
+    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
deleted file mode 100644
index cb586f3..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestStripeCompactor {
-  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
-  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
-
-  private static final byte[] KEY_B = Bytes.toBytes("bbb");
-  private static final byte[] KEY_C = Bytes.toBytes("ccc");
-  private static final byte[] KEY_D = Bytes.toBytes("ddd");
-
-  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
-  private static final KeyValue KV_B = kvAfter(KEY_B);
-  private static final KeyValue KV_C = kvAfter(KEY_C);
-  private static final KeyValue KV_D = kvAfter(KEY_D);
-
-  private static KeyValue kvAfter(byte[] key) {
-    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
-  }
-
-  private static <T> T[] a(T... a) {
-    return a;
-  }
-
-  private static KeyValue[] e() {
-    return TestStripeCompactor.<KeyValue>a();
-  }
-
-  @Test
-  public void testBoundaryCompactions() throws Exception {
-    // General verification
-    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
-        a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
-  }
-
-  @Test
-  public void testBoundaryCompactionEmptyFiles() throws Exception {
-    // No empty file if there're already files.
-    verifyBoundaryCompaction(
-        a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
-    verifyBoundaryCompaction(a(KV_A, KV_C),
-        a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
-    // But should be created if there are no file.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
-    // In major range if there's major range.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
-    // Major range should have files regardless of KVs.
-    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
-        a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
-    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
-        a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
-
-  }
-
-  public static void verifyBoundaryCompaction(
-      KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
-    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
-  }
-
-  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
-      KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
-          throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List<Path> paths =
-        sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
-          NoLimitThroughputController.INSTANCE);
-    writers.verifyKvs(output, allFiles, true);
-    if (allFiles) {
-      assertEquals(output.length, paths.size());
-      writers.verifyBoundaries(boundaries);
-    }
-  }
-
-  @Test
-  public void testSizeCompactions() throws Exception {
-    // General verification with different sizes.
-    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
-    // Verify row boundaries are preserved.
-    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
-    // Too much data, count limits the number of files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
-        new KeyValue[][] { a(KV_A, KV_B, KV_C) });
-    // Too little data/large count, no extra files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_B), a(KV_C, KV_D)));
-  }
-
-  public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
-      byte[] left, byte[] right, KeyValue[][] output) throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List<Path> paths =
-        sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
-          NoLimitThroughputController.INSTANCE);
-    assertEquals(output.length, paths.size());
-    writers.verifyKvs(output, true, true);
-    List<byte[]> boundaries = new ArrayList<byte[]>();
-    boundaries.add(left);
-    for (int i = 1; i < output.length; ++i) {
-      boundaries.add(CellUtil.cloneRow(output[i][0]));
-    }
-    boundaries.add(right);
-    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
-  }
-
-  private static StripeCompactor createCompactor(
-      StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    final Scanner scanner = new Scanner(input);
-
-    // Create store mock that is satisfactory for compactor.
-    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
-    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR);
-    Store store = mock(Store.class);
-    when(store.getFamily()).thenReturn(col);
-    when(store.getScanInfo()).thenReturn(si);
-    when(store.areWritesEnabled()).thenReturn(true);
-    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
-    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
-    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.getComparator()).thenReturn(CellComparator.COMPARATOR);
-
-    return new StripeCompactor(conf, store) {
-      @Override
-      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
-        return scanner;
-      }
-
-      @Override
-      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-        return scanner;
-      }
-    };
-  }
-
-  private static CompactionRequest createDummyRequest() throws Exception {
-    // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
-    // But compaction depends on everything under the sun, so stub everything with dummies.
-    StoreFile sf = mock(StoreFile.class);
-    StoreFile.Reader r = mock(StoreFile.Reader.class);
-    when(r.length()).thenReturn(1L);
-    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
-    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
-      .thenReturn(mock(StoreFileScanner.class));
-    when(sf.getReader()).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    return new CompactionRequest(Arrays.asList(sf));
-  }
-
-  private static class Scanner implements InternalScanner {
-    private final ArrayList<KeyValue> kvs;
-    public Scanner(KeyValue... kvs) {
-      this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-      if (kvs.isEmpty()) return false;
-      results.add(kvs.remove(0));
-      return !kvs.isEmpty();
-    }
-
-    @Override
-    public boolean next(List<Cell> result, ScannerContext scannerContext)
-        throws IOException {
-      return next(result);
-    }
-
-    @Override
-    public void close() throws IOException {}
-  }
-
-  // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
-  public static class StoreFileWritersCapture implements
-    Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
-    public static class Writer {
-      public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-      public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
-    }
-
-    private List<Writer> writers = new ArrayList<Writer>();
-
-    @Override
-    public StoreFile.Writer createWriter() throws IOException {
-      final Writer realWriter = new Writer();
-      writers.add(realWriter);
-      StoreFile.Writer writer = mock(StoreFile.Writer.class);
-      doAnswer(new Answer<Object>() {
-        public Object answer(InvocationOnMock invocation) {
-          return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
-        }}).when(writer).append(any(KeyValue.class));
-      doAnswer(new Answer<Object>() {
-        public Object answer(InvocationOnMock invocation) {
-          Object[] args = invocation.getArguments();
-          return realWriter.data.put((byte[])args[0], (byte[])args[1]);
-        }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
-      return writer;
-    }
-
-    @Override
-    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
-      return createWriter();
-    }
-
-    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
-      if (allFiles) {
-        assertEquals(kvss.length, writers.size());
-      }
-      int skippedWriters = 0;
-      for (int i = 0; i < kvss.length; ++i) {
-        KeyValue[] kvs = kvss[i];
-        if (kvs != null) {
-          Writer w = writers.get(i - skippedWriters);
-          if (requireMetadata) {
-            assertNotNull(w.data.get(STRIPE_START_KEY));
-            assertNotNull(w.data.get(STRIPE_END_KEY));
-          } else {
-            assertNull(w.data.get(STRIPE_START_KEY));
-            assertNull(w.data.get(STRIPE_END_KEY));
-          }
-          assertEquals(kvs.length, w.kvs.size());
-          for (int j = 0; j < kvs.length; ++j) {
-            assertEquals(kvs[j], w.kvs.get(j));
-          }
-        } else {
-          assertFalse(allFiles);
-          ++skippedWriters;
-        }
-      }
-    }
-
-    public void verifyBoundaries(byte[][] boundaries) {
-      assertEquals(boundaries.length - 1, writers.size());
-      for (int i = 0; i < writers.size(); ++i) {
-        assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
-        assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
-      }
-    }
-  }
-}


Mime
View raw message