hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1536570 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/
Date Tue, 29 Oct 2013 00:40:23 GMT
Author: sershe
Date: Tue Oct 29 00:40:22 2013
New Revision: 1536570

URL: http://svn.apache.org/r1536570
Log:
HBASE-7967 implement compactor for stripe compactions

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1536570&r1=1536569&r2=1536570&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 29 00:40:22 2013
@@ -81,6 +81,7 @@ public class StoreScanner extends NonLaz
    * KVs skipped via seeking to next row/column. TODO: estimate them?
    */
   private long kvsScanned = 0;
+  private KeyValue prevKV = null;
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
@@ -411,7 +412,6 @@ public class StoreScanner extends NonLaz
     }
 
     KeyValue kv;
-    KeyValue prevKV = null;
 
     // Only do a sanity-check if store and comparator are available.
     KeyValue.KVComparator comparator =
@@ -419,7 +419,7 @@ public class StoreScanner extends NonLaz
 
     int count = 0;
     LOOP: while((kv = this.heap.peek()) != null) {
-      ++kvsScanned;
+      if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
       // Check that the heap gives us KVs in an increasing order.
       assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
         "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java?rev=1536570&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java Tue Oct 29 00:40:22 2013
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Base class for cell sink that separates the provided cells into multiple files.
+ */
+public abstract class StripeMultiFileWriter implements Compactor.CellSink {
+  private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
+
+  /** Factory that is used to produce single StoreFile.Writer-s */
+  protected WriterFactory writerFactory;
+  protected KVComparator comparator;
+
+  protected List<StoreFile.Writer> existingWriters;
+  protected List<byte[]> boundaries;
+  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
+  protected StoreScanner sourceScanner;
+
+  public interface WriterFactory {
+    public StoreFile.Writer createWriter() throws IOException;
+  }
+
+  /**
+   * Initializes multi-writer before usage.
+   * @param sourceScanner Optional store scanner to obtain the information about read progress.
+   * @param factory Factory used to produce individual file writers.
+   * @param comparator Comparator used to compare rows.
+   */
+  public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
+      throws IOException {
+    this.writerFactory = factory;
+    this.sourceScanner = sourceScanner;
+    this.comparator = comparator;
+  }
+
+  public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
+    assert this.existingWriters != null;
+    commitWritersInternal();
+    assert this.boundaries.size() == (this.existingWriters.size() + 1);
+    LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers");
+
+    List<Path> paths = new ArrayList<Path>();
+    for (int i = 0; i < this.existingWriters.size(); ++i) {
+      StoreFile.Writer writer = this.existingWriters.get(i);
+      if (writer == null) continue; // writer was skipped due to 0 KVs
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
+      writer.appendMetadata(maxSeqId, isMajor);
+      paths.add(writer.getPath());
+      writer.close();
+    }
+    this.existingWriters = null;
+    return paths;
+  }
+
+  public List<Path> abortWriters() {
+    assert this.existingWriters != null;
+    List<Path> paths = new ArrayList<Path>();
+    for (StoreFile.Writer writer : this.existingWriters) {
+      try {
+        paths.add(writer.getPath());
+        writer.close();
+      } catch (Exception ex) {
+        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+      }
+    }
+    this.existingWriters = null;
+    return paths;
+  }
+
+  /**
+   * Subclasses can call this method to make sure the first KV is within multi-writer range.
+   * @param left The left boundary of the writer.
+   * @param row The row to check.
+   * @param rowOffset Offset for row.
+   * @param rowLength Length for row.
+   */
+  protected void sanityCheckLeft(
+      byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
+    if (StripeStoreFileManager.OPEN_KEY != left &&
+        comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
+      String error = "The first row is lower than the left boundary of ["
+        + Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
+      LOG.error(error);
+      throw new IOException(error);
+    }
+  }
+
+  /**
+   * Subclasses can call this method to make sure the last KV is within multi-writer range.
+   * @param right The right boundary of the writer.
+   * @param row The row to check.
+   * @param rowOffset Offset for row.
+   * @param rowLength Length for row.
+   */
+  protected void sanityCheckRight(
+      byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
+    if (StripeStoreFileManager.OPEN_KEY != right &&
+        comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
+      String error = "The last row is higher or equal than the right boundary of ["
+          + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
+      LOG.error(error);
+      throw new IOException(error);
+    }
+  }
+
+  /**
+   * Subclasses override this method to be called at the end of a successful sequence of
+   * appends; all appends are processed before this method is called.
+   */
+  protected abstract void commitWritersInternal() throws IOException;
+
+  /**
+   * MultiWriter that separates the cells based on fixed row-key boundaries.
+   * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
+   * will end up in one file, and separate from all other such pairs.
+   */
+  public static class BoundaryMultiWriter extends StripeMultiFileWriter {
+    private StoreFile.Writer currentWriter;
+    private byte[] currentWriterEndKey;
+
+    private KeyValue lastKv;
+    private long kvsInCurrentWriter = 0;
+    private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
+    private boolean hasAnyWriter = false;
+
+    /**
+     * @param targetBoundaries The boundaries on which writers/files are separated.
+     * @param majorRangeFrom Major range is the range for which at least one file should be
+     *                       written (because all files are included in compaction).
+     *                       majorRangeFrom is the left boundary.
+     * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
+     */
+    public BoundaryMultiWriter(List<byte[]> targetBoundaries,
+        byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
+      super();
+      this.boundaries = targetBoundaries;
+      this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
+      // "major" range (range for which all files are included) boundaries, if any,
+      // must match some target boundaries, let's find them.
+      assert  (majorRangeFrom == null) == (majorRangeTo == null);
+      if (majorRangeFrom != null) {
+        majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
+          : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
+        majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
+          : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
+        if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
+          throw new IOException("Major range does not match writer boundaries: [" +
+              Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+              + majorRangeFromIndex + " to " + majorRangeToIndex);
+        }
+      }
+    }
+
+    @Override
+    public void append(KeyValue kv) throws IOException {
+      if (currentWriter == null && existingWriters.isEmpty()) {
+        // First append ever, do a sanity check.
+        sanityCheckLeft(this.boundaries.get(0),
+            kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+      }
+      prepareWriterFor(kv);
+      currentWriter.append(kv);
+      lastKv = kv; // for the sanity check
+      ++kvsInCurrentWriter;
+    }
+
+    private boolean isKvAfterCurrentWriter(KeyValue kv) {
+      return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
+            (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+                currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
+    }
+
+    @Override
+    protected void commitWritersInternal() throws IOException {
+      stopUsingCurrentWriter();
+      while (existingWriters.size() < boundaries.size() - 1) {
+        createEmptyWriter();
+      }
+      if (lastKv != null) {
+        sanityCheckRight(boundaries.get(boundaries.size() - 1),
+            lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
+      }
+    }
+
+    private void prepareWriterFor(KeyValue kv) throws IOException {
+      if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return; // Use same writer.
+
+      stopUsingCurrentWriter();
+      // See if KV will be past the writer we are about to create; need to add another one.
+      while (isKvAfterCurrentWriter(kv)) {
+        checkCanCreateWriter();
+        createEmptyWriter();
+      }
+      checkCanCreateWriter();
+      hasAnyWriter = true;
+      currentWriter = writerFactory.createWriter();
+      existingWriters.add(currentWriter);
+    }
+
+    /**
+     * Called if there are no cells for some stripe.
+     * We need to have something in the writer list for this stripe, so that writer-boundary
+     * list indices correspond to each other. We can insert null in the writer list for that
+     * purpose, except in the following cases where we actually need a file:
+     * 1) If we are in range for which we are compacting all the files, we need to create an
+     * empty file to preserve stripe metadata.
+     * 2) If we have not produced any file at all for this compaction, and this is the
+     * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
+     */
+    private void createEmptyWriter() throws IOException {
+      int index = existingWriters.size();
+      boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
+      // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
+      boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
+      boolean needEmptyFile = isInMajorRange || isLastWriter;
+      existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
+      hasAnyWriter |= needEmptyFile;
+      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
+          ? null : boundaries.get(existingWriters.size() + 1);
+    }
+
+    private void checkCanCreateWriter() throws IOException {
+      int maxWriterCount =  boundaries.size() - 1;
+      assert existingWriters.size() <= maxWriterCount;
+      if (existingWriters.size() >= maxWriterCount) {
+        throw new IOException("Cannot create any more writers (created " + existingWriters.size()
+            + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
+      }
+    }
+
+    private void stopUsingCurrentWriter() {
+      if (currentWriter != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
+              + "] row; wrote out " + kvsInCurrentWriter + " kvs");
+        }
+        kvsInCurrentWriter = 0;
+      }
+      currentWriter = null;
+      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
+          ? null : boundaries.get(existingWriters.size() + 1);
+    }
+  }
+
+  /**
+   * MultiWriter that separates the cells based on target cell number per file and file count.
+   * New file is started every time the target number of KVs is reached, unless the fixed
+   * count of writers has already been created (in that case all the remaining KVs go into
+   * the last writer).
+   */
+  public static class SizeMultiWriter extends StripeMultiFileWriter {
+    private int targetCount;
+    private long targetKvs;
+    private byte[] left;
+    private byte[] right;
+
+    private KeyValue lastKv;
+    private StoreFile.Writer currentWriter;
+    protected byte[] lastRowInCurrentWriter = null;
+    private long kvsInCurrentWriter = 0;
+    private long kvsSeen = 0;
+    private long kvsSeenInPrevious = 0;
+
+    /**
+     * @param targetCount The maximum count of writers that can be created.
+     * @param targetKvs The number of KVs to read from source before starting each new writer.
+     * @param left The left boundary of the first writer.
+     * @param right The right boundary of the last writer.
+     */
+    public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
+      super();
+      this.targetCount = targetCount;
+      this.targetKvs = targetKvs;
+      this.left = left;
+      this.right = right;
+      int preallocate = Math.min(this.targetCount, 64);
+      this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
+      this.boundaries = new ArrayList<byte[]>(preallocate + 1);
+    }
+
+    @Override
+    public void append(KeyValue kv) throws IOException {
+      // If we are waiting for an opportunity to close and have started writing a different
+      // row, discard the writer and stop waiting.
+      boolean doCreateWriter = false;
+      if (currentWriter == null) {
+        // First append ever, do a sanity check.
+        sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+        doCreateWriter = true;
+      } else if (lastRowInCurrentWriter != null
+          && !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+              lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
+              + "] row; wrote out "  + kvsInCurrentWriter + " kvs");
+        }
+        lastRowInCurrentWriter = null;
+        kvsInCurrentWriter = 0;
+        kvsSeenInPrevious += kvsSeen;
+        doCreateWriter = true;
+      }
+      if (doCreateWriter) {
+        byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow(); // make a copy
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
+        }
+        currentWriter = writerFactory.createWriter();
+        boundaries.add(boundary);
+        existingWriters.add(currentWriter);
+      }
+
+      currentWriter.append(kv);
+      lastKv = kv; // for the sanity check
+      ++kvsInCurrentWriter;
+      kvsSeen = kvsInCurrentWriter;
+      if (this.sourceScanner != null) {
+        kvsSeen = Math.max(kvsSeen,
+            this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
+      }
+
+      // If we are not already waiting for an opportunity to close, start waiting if we can
+      // create more writers and the current one is too big.
+      if (lastRowInCurrentWriter == null
+          && existingWriters.size() < targetCount
+          && kvsSeen >= targetKvs) {
+        lastRowInCurrentWriter = kv.getRow(); // make a copy
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
+              lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
+              + kvsInCurrentWriter + " kvs");
+        }
+      }
+    }
+
+    @Override
+    protected void commitWritersInternal() throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Stopping with "  + kvsInCurrentWriter + " kvs in last writer" +
+            ((this.sourceScanner == null) ? "" : ("; observed estimated "
+                + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
+      }
+      if (lastKv != null) {
+        sanityCheckRight(
+            right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
+      }
+      this.boundaries.add(right);
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1536570&r1=1536569&r2=1536570&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Oct 29 00:40:22 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.CellOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
@@ -97,6 +96,12 @@ public abstract class Compactor {
     public int maxTagsLength = 0;
   }
 
+  /**
+   * Extracts some details about the files to compact that are commonly needed by compactors.
+   * @param filesToCompact Files.
+   * @param calculatePutTs Whether earliest put TS is needed.
+   * @return The result.
+   */
   protected FileDetails getFileDetails(
       Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
     FileDetails fd = new FileDetails();
@@ -151,6 +156,11 @@ public abstract class Compactor {
     return fd;
   }
 
+  /**
+   * Creates file scanners for compaction.
+   * @param filesToCompact Files.
+   * @return Scanners.
+   */
   protected List<StoreFileScanner> createFileScanners(
       final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
     return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
@@ -161,6 +171,14 @@ public abstract class Compactor {
     return store.getSmallestReadPoint();
   }
 
+  /**
+   * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
+   * @param request Compaction request.
+   * @param scanType Scan type.
+   * @param earliestPutTs Earliest put ts.
+   * @param scanners File scanners for compaction files.
+   * @return Scanner override by coprocessor; null if not overriding.
+   */
   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
       ScanType scanType, long earliestPutTs,  List<StoreFileScanner> scanners) throws IOException {
     if (store.getCoprocessorHost() == null) return null;
@@ -168,13 +186,27 @@ public abstract class Compactor {
         .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
   }
 
-  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
+  /**
+   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
+   * @param request Compaction request.
+   * @param scanType Scan type.
+   * @param scanner The default scanner created for compaction.
+   * @return Scanner to use (usually the default); null if compaction should not proceed.
+   */
+   protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
       ScanType scanType, InternalScanner scanner) throws IOException {
     if (store.getCoprocessorHost() == null) return scanner;
     return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
   }
 
   @SuppressWarnings("deprecation")
+  /**
+   * Performs the compaction.
+   * @param scanner Where to read from.
+   * @param writer Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @return Whether compaction ended; false if it was interrupted for some reason.
+   */
   protected boolean performCompaction(InternalScanner scanner,
       CellSink writer, long smallestReadPoint) throws IOException {
     int bytesWritten = 0;
@@ -213,12 +245,8 @@ public abstract class Compactor {
     return true;
   }
 
-  protected void abortWriter(final StoreFile.Writer writer) throws IOException {
-    writer.close();
-    store.getFileSystem().delete(writer.getPath(), false);
-  }
-
   /**
+   * @param store store
    * @param scanners Store file scanners.
    * @param scanType Scan type.
    * @param smallestReadPoint Smallest MVCC read point.
@@ -232,4 +260,20 @@ public abstract class Compactor {
     return new StoreScanner(store, store.getScanInfo(), scan, scanners,
         scanType, smallestReadPoint, earliestPutTs);
   }
+
+  /**
+   * @param scanners Store file scanners.
+   * @param scanType Scan type.
+   * @param smallestReadPoint Smallest MVCC read point.
+   * @param earliestPutTs Earliest put across all files.
+   * @return A compaction scanner.
+   */
+  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+     long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+     byte[] dropDeletesToRow) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getFamily().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
+        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1536570&r1=1536569&r2=1536570&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Tue Oct 29 00:40:22 2013
@@ -75,7 +75,8 @@ public class DefaultCompactor extends Co
             fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
         boolean finished = performCompaction(scanner, writer, smallestReadPoint);
         if (!finished) {
-          abortWriter(writer);
+          writer.close();
+          store.getFileSystem().delete(writer.getPath(), false);
           writer = null;
           throw new InterruptedIOException( "Aborting compaction of store " + store +
               " in region " + store.getRegionInfo().getRegionNameAsString() +

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java?rev=1536570&r1=1536569&r2=1536570&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java Tue Oct 29 00:40:22 2013
@@ -380,7 +380,7 @@ public class StripeCompactionPolicy exte
      * @param compactor Compactor.
      * @return result of compact(...)
      */
-    public abstract List<Path> execute(StripeCompactor compactor);
+    public abstract List<Path> execute(StripeCompactor compactor) throws IOException;
 
     public StripeCompactionRequest(CompactionRequest request) {
       this.request = request;
@@ -431,7 +431,7 @@ public class StripeCompactionPolicy exte
     }
 
     @Override
-    public List<Path> execute(StripeCompactor compactor) {
+    public List<Path> execute(StripeCompactor compactor) throws IOException {
       return compactor.compact(
           this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
     }
@@ -481,7 +481,7 @@ public class StripeCompactionPolicy exte
     }
 
     @Override
-    public List<Path> execute(StripeCompactor compactor) {
+    public List<Path> execute(StripeCompactor compactor) throws IOException {
       return compactor.compact(this.request, this.targetCount, this.targetKvs,
           this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java?rev=1536570&r1=1536569&r2=1536570&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java Tue Oct 29 00:40:22 2013
@@ -17,30 +17,140 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
 
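 /**
  * Compactor for stripe compactions (HBASE-7967). Writes the compaction output through a
  * {@link StripeMultiFileWriter}, configured either with explicit row boundaries or with a
  * target file count and size, and returns the paths of the files that writer committed.
  */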
 public class StripeCompactor extends Compactor {
-
-  public StripeCompactor(Configuration conf, final Store store) {
+  private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
+  /**
+   * Creates a stripe compactor; both arguments are handed unchanged to the base class.
+   * @param conf Configuration.
+   * @param store Store passed to the {@link Compactor} constructor.
+   */
+  public StripeCompactor(Configuration conf, Store store) {
     super(conf, store);
   }
 
+  /**
+   * Compacts the request's files using a {@link StripeMultiFileWriter.BoundaryMultiWriter}
+   * built from the given boundaries. When debug logging is enabled, logs the number of
+   * boundaries and each boundary rendered with {@link Bytes#toString(byte[])}.
+   * @param request Compaction request; supplies the files to compact.
+   * @param targetBoundaries Boundaries handed to the multi-writer.
+   * @param majorRangeFromRow Start of the major range; given both to the multi-writer and to
+   *   compactInternal(...), which uses a range-aware scanner when this is non-null.
+   * @param majorRangeToRow End of the major range; see majorRangeFromRow.
+   * @return The paths returned by compactInternal(...).
+   * @throws IOException Propagated from compactInternal(...).
+   */
   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
-    throw new NotImplementedException();
+      byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
+      for (byte[] tb : targetBoundaries) {
+        sb.append(" [").append(Bytes.toString(tb)).append("]");
+      }
+      LOG.debug(sb.toString());
+    }
+    StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
+        targetBoundaries, majorRangeFromRow, majorRangeToRow);
+    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
   }
 
+  /**
+   * Compacts the request's files using a {@link StripeMultiFileWriter.SizeMultiWriter}
+   * configured with a target file count and target size over the [left, right] row range.
+   * NOTE(review): StripeCompactionPolicy passes its targetKvs value as targetSize, and the
+   * tests added with this change treat it as a per-file KV count, so "size" here appears to
+   * mean number of KVs rather than bytes - confirm, and consider aligning the log message.
+   * @param request Compaction request; supplies the files to compact.
+   * @param targetCount Target file count; the debug message describes it as an upper bound.
+   * @param targetSize Target size per file (see the note above about its unit).
+   * @param left Left end of the range handed to the multi-writer.
+   * @param right Right end of the range handed to the multi-writer.
+   * @param majorRangeFromRow Start of the major range; only forwarded to compactInternal(...),
+   *   which uses a range-aware scanner when this is non-null.
+   * @param majorRangeToRow End of the major range; see majorRangeFromRow.
+   * @return The paths returned by compactInternal(...).
+   * @throws IOException Propagated from compactInternal(...).
+   */
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
-      byte[] left, byte[] right, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
-    throw new NotImplementedException();
+      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing compaction with " + targetSize
+          + " target file size, no more than " + targetCount + " files, in ["
+          + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
+    }
+    StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
+        targetCount, targetSize, left, right);
+    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
+  }
+
+  /**
+   * Shared implementation behind both compact(...) overloads: builds the compaction scanner
+   * (giving coprocessor hooks a chance to supply or replace it), initializes the multi-writer,
+   * runs performCompaction(...), and commits the writers on success.
+   * @param mw Multi-writer that receives the compaction output; init()-ed here.
+   * @param request Compaction request; supplies the files and the major flag.
+   * @param majorRangeFromRow When null, a COMPACT_RETAIN_DELETES scanner is created; otherwise
+   *   the createScanner overload taking a row range is used with this as the range start.
+   * @param majorRangeToRow Range end for the range-aware scanner; unused when
+   *   majorRangeFromRow is null.
+   * @return Paths from mw.commitWriters(...), or an empty list when the coprocessor hooks
+   *   return a null scanner.
+   * @throws IOException On failure; an InterruptedIOException when performCompaction(...)
+   *   returns false.
+   */
+  private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+      byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
+    final Collection<StoreFile> filesToCompact = request.getFiles();
+    final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
+
+    long smallestReadPoint = getSmallestReadPoint();
+    List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+
+    // Stays false until performCompaction reports success; drives the cleanup in finally.
+    boolean finished = false;
+    InternalScanner scanner = null;
+    try {
+      // Get scanner to use.
+      ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
+      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
+      if (scanner == null) {
+        // The pre-create hook supplied nothing; build the scanner ourselves, range-aware
+        // only when a major range was given.
+        scanner = (majorRangeFromRow == null)
+            ? createScanner(store, scanners,
+                ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
+            : createScanner(store, scanners,
+                smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+      }
+      scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
+      if (scanner == null) {
+        // NULL scanner returned from coprocessor hooks means skip normal processing.
+        // NOTE(review): finished is still false here, so the finally block below calls
+        // mw.abortWriters() on a multi-writer that was never init()-ed - confirm that
+        // abortWriters() tolerates this.
+        return new ArrayList<Path>();
+      }
+
+      // Create the writer factory for compactions.
+      final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
+      final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
+      StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
+        @Override
+        public Writer createWriter() throws IOException {
+          return store.createWriterInTmp(
+              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+        }
+      };
+
+      // Prepare multi-writer, and perform the compaction using scanner and writer.
+      // It is ok here if storeScanner is null.
+      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+      mw.init(storeScanner, factory, store.getComparator());
+      finished = performCompaction(scanner, mw, smallestReadPoint);
+      if (!finished) {
+        throw new InterruptedIOException( "Aborting compaction of store " + store +
+            " in region " + store.getRegionInfo().getRegionNameAsString() +
+            " because it was interrupted.");
+      }
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (Throwable t) {
+          // Don't fail the compaction if this fails.
+          LOG.error("Failed to close scanner after compaction.", t);
+        }
+      }
+      if (!finished) {
+        // Best-effort cleanup: delete whatever the aborted writers left behind, logging
+        // (not rethrowing) individual delete failures.
+        for (Path leftoverFile : mw.abortWriters()) {
+          try {
+            store.getFileSystem().delete(leftoverFile, false);
+          } catch (Exception ex) {
+            LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
+          }
+        }
+      }
+    }
+
+    assert finished : "We should have exited the method on all error paths";
+    List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
+    assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
+    return newFiles;
   }
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java?rev=1536570&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java Tue Oct 29 00:40:22 2013
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+@Category(SmallTests.class)
+public class TestStripeCompactor {
+  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
+  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
+
+  private static final byte[] KEY_B = Bytes.toBytes("bbb");
+  private static final byte[] KEY_C = Bytes.toBytes("ccc");
+  private static final byte[] KEY_D = Bytes.toBytes("ddd");
+
+  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
+  private static final KeyValue KV_B = kvAfter(KEY_B);
+  private static final KeyValue KV_C = kvAfter(KEY_C);
+  private static final KeyValue KV_D = kvAfter(KEY_D);
+
+  /** Builds a KeyValue at timestamp 0 whose row is {@code key} plus one trailing zero byte. */
+  private static KeyValue kvAfter(byte[] key) {
+    byte[] paddedRow = Arrays.copyOf(key, key.length + 1);
+    return new KeyValue(paddedRow, 0L);
+  }
+
+  /** Shorthand for building an array literal from varargs; returns the varargs array itself. */
+  private static <T> T[] a(T... a) {
+    return a;
+  }
+
+  /** Returns an empty KeyValue array, built by calling a() with no arguments. */
+  private static KeyValue[] e() {
+    return TestStripeCompactor.<KeyValue>a();
+  }
+
+  @Test
+  public void testBoundaryCompactions() throws Exception {
+    // General verification: four boundaries, three output files.
+    KeyValue[] input = a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D);
+    byte[][] boundaries = a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY);
+    KeyValue[][] expected = a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D));
+    verifyBoundaryCompaction(input, boundaries, expected);
+
+    // Three boundaries, two output files.
+    input = a(KV_B, KV_C);
+    boundaries = a(KEY_B, KEY_C, KEY_D);
+    expected = a(a(KV_B), a(KV_C));
+    verifyBoundaryCompaction(input, boundaries, expected);
+
+    // Two boundaries, a single output file.
+    input = a(KV_B, KV_C);
+    boundaries = a(KEY_B, KEY_D);
+    expected = new KeyValue[][] { a(KV_B, KV_C) };
+    verifyBoundaryCompaction(input, boundaries, expected);
+  }
+
+  @Test
+  public void testBoundaryCompactionEmptyFiles() throws Exception {
+    final byte[] noMajor = null;
+    byte[][] boundaries;
+    KeyValue[][] expected;
+
+    // No empty files are expected when some file is already produced.
+    boundaries = a(KEY_B, KEY_C, KEY_D, OPEN_KEY);
+    expected = a(a(KV_B), null, null);
+    verifyBoundaryCompaction(a(KV_B), boundaries, expected, noMajor, noMajor, false);
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, KEY_D);
+    expected = a(a(KV_A), null, a(KV_C));
+    verifyBoundaryCompaction(a(KV_A, KV_C), boundaries, expected, noMajor, noMajor, false);
+
+    // But an empty file should be created if there are no files at all.
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY);
+    expected = a(null, null, e());
+    verifyBoundaryCompaction(e(), boundaries, expected, noMajor, noMajor, false);
+
+    // With a major range, the empty files go into the major range.
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY);
+    expected = a(null, e(), null);
+    verifyBoundaryCompaction(e(), boundaries, expected, KEY_B, KEY_C, false);
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY);
+    expected = a(e(), e(), null);
+    verifyBoundaryCompaction(e(), boundaries, expected, OPEN_KEY, KEY_C, false);
+
+    // The major range should have files regardless of KVs.
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY);
+    expected = a(a(KV_A), e(), e(), null);
+    verifyBoundaryCompaction(a(KV_A), boundaries, expected, KEY_B, KEY_D, false);
+    boundaries = a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY);
+    expected = a(null, null, a(KV_C), e());
+    verifyBoundaryCompaction(a(KV_C), boundaries, expected, KEY_C, OPEN_KEY, false);
+  }
+
+  /**
+   * Convenience overload: verifies a boundary compaction with no major range (null, null)
+   * and with allFiles set to true.
+   */
+  public static void verifyBoundaryCompaction(
+      KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
+    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
+  }
+
+  /**
+   * Runs a boundary-based compaction over the given KVs and checks what the captured writers
+   * received.
+   * @param input KVs the fake scanner feeds to the compactor.
+   * @param boundaries Boundaries passed to the compactor.
+   * @param output Expected KVs per file; a null entry means no writer is expected there.
+   * @param majorFrom Major range start passed to the compactor, may be null.
+   * @param majorTo Major range end passed to the compactor, may be null.
+   * @param allFiles When true, the path count and the writers' stripe boundaries are
+   *   verified as well.
+   */
+  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
+      KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
+          throws Exception {
+    StoreFileWritersCapture capture = new StoreFileWritersCapture();
+    StripeCompactor compactor = createCompactor(capture, input);
+    CompactionRequest dummyRequest = createDummyRequest();
+    List<byte[]> boundaryList = Arrays.asList(boundaries);
+    List<Path> resultPaths = compactor.compact(dummyRequest, boundaryList, majorFrom, majorTo);
+    capture.verifyKvs(output, allFiles);
+    if (!allFiles) {
+      return;
+    }
+    assertEquals(output.length, resultPaths.size());
+    capture.verifyBoundaries(boundaries);
+  }
+
+  @Test
+  public void testSizeCompactions() throws Exception {
+    KeyValue[][] expected;
+
+    // General verification with different sizes.
+    expected = a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D));
+    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, expected);
+    expected = a(a(KV_A), a(KV_B), a(KV_C), a(KV_D));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY, expected);
+    expected = a(a(KV_B), a(KV_C));
+    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, expected);
+
+    // Verify row boundaries are preserved.
+    expected = a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D));
+    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, expected);
+    expected = a(a(KV_A), a(KV_B, KV_B), a(KV_C));
+    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY, expected);
+
+    // Too much data, count limits the number of files.
+    expected = a(a(KV_A), a(KV_B, KV_C, KV_D));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY, expected);
+    expected = new KeyValue[][] { a(KV_A, KV_B, KV_C) };
+    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D, expected);
+
+    // Too little data/large count, no extra files.
+    expected = a(a(KV_A, KV_B), a(KV_C, KV_D));
+    verifySizeCompaction(
+        a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY, expected);
+  }
+
+  /**
+   * Runs a count/size-based compaction over the given KVs (with no major range) and checks
+   * the produced files. The expected stripe boundaries are derived from the expected output:
+   * {@code left}, then the first row of every file after the first, then {@code right}.
+   */
+  public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
+      byte[] left, byte[] right, KeyValue[][] output) throws Exception {
+    StoreFileWritersCapture capture = new StoreFileWritersCapture();
+    StripeCompactor compactor = createCompactor(capture, input);
+    CompactionRequest dummyRequest = createDummyRequest();
+    List<Path> resultPaths =
+        compactor.compact(dummyRequest, targetCount, targetSize, left, right, null, null);
+    assertEquals(output.length, resultPaths.size());
+    capture.verifyKvs(output, true);
+
+    List<byte[]> expectedBoundaries = new ArrayList<byte[]>();
+    expectedBoundaries.add(left);
+    for (int fileIdx = 1; fileIdx < output.length; ++fileIdx) {
+      KeyValue firstKvInFile = output[fileIdx][0];
+      expectedBoundaries.add(firstKvInFile.getRow());
+    }
+    expectedBoundaries.add(right);
+    byte[][] asArray = expectedBoundaries.toArray(new byte[expectedBoundaries.size()][]);
+    capture.verifyBoundaries(asArray);
+  }
+
+  /**
+   * Creates a StripeCompactor over a mocked Store. Both createScanner overloads are overridden
+   * to return a fake scanner over {@code input}, and every writer the store is asked to create
+   * is answered by {@code writers}.
+   */
+  private static StripeCompactor createCompactor(
+      StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    final Scanner fakeScanner = new Scanner(input);
+
+    HColumnDescriptor family = new HColumnDescriptor(NAME_OF_THINGS);
+    ScanInfo scanInfo = new ScanInfo(family, Long.MAX_VALUE, 0, new KVComparator());
+
+    // Stub only what the compactor needs from the store.
+    Store mockStore = mock(Store.class);
+    when(mockStore.getFamily()).thenReturn(family);
+    when(mockStore.getScanInfo()).thenReturn(scanInfo);
+    when(mockStore.areWritesEnabled()).thenReturn(true);
+    when(mockStore.getFileSystem()).thenReturn(mock(FileSystem.class));
+    when(mockStore.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
+    when(mockStore.getComparator()).thenReturn(new KVComparator());
+    when(mockStore.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
+        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+
+    return new StripeCompactor(conf, mockStore) {
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+        return fakeScanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return fakeScanner;
+      }
+    };
+  }
+
+  /**
+   * Builds a compaction request over a single mocked store file. The file contents are never
+   * read (the fake Scanner supplies the KVs); the stubs exist only because the compaction code
+   * touches the file and its reader.
+   */
+  private static CompactionRequest createDummyRequest() throws Exception {
+    StoreFile.Reader mockReader = mock(StoreFile.Reader.class);
+    when(mockReader.length()).thenReturn(1L);
+    when(mockReader.getBloomFilterType()).thenReturn(BloomType.NONE);
+    when(mockReader.getHFileReader()).thenReturn(mock(HFile.Reader.class));
+    when(mockReader.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
+      .thenReturn(mock(StoreFileScanner.class));
+
+    StoreFile mockFile = mock(StoreFile.class);
+    when(mockFile.getReader()).thenReturn(mockReader);
+    when(mockFile.createReader()).thenReturn(mockReader);
+
+    return new CompactionRequest(Arrays.asList(mockFile));
+  }
+
+  /**
+   * Fake scanner that hands out the given KVs one per next() call, in order. next() returns
+   * true while more KVs remain after the one just handed out.
+   */
+  private static class Scanner implements InternalScanner {
+    private final ArrayList<KeyValue> remaining;
+
+    public Scanner(KeyValue... kvs) {
+      this.remaining = new ArrayList<KeyValue>(Arrays.asList(kvs));
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+      boolean hadData = !remaining.isEmpty();
+      if (hadData) {
+        results.add(remaining.remove(0));
+      }
+      return hadData && !remaining.isEmpty();
+    }
+
+    /** The limit is ignored; this behaves exactly like {@link #next(List)}. */
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+      return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to release.
+    }
+  }
+
+  // StoreFile.Writer has private ctor and is unwieldy, so each writer handed to the compactor
+  // is a mock whose append/appendFileInfo calls are recorded into a plain holder object.
+  private static class StoreFileWritersCapture implements Answer<StoreFile.Writer> {
+    /** Everything one mocked StoreFile.Writer received: appended KVs and file-info entries. */
+    public static class Writer {
+      public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+      public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    }
+
+    private List<Writer> writers = new ArrayList<Writer>();
+
+    /** Creates a new recording mock writer and remembers its holder, in creation order. */
+    @Override
+    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+      final Writer captured = new Writer();
+      writers.add(captured);
+
+      Answer<Object> recordKv = new Answer<Object>() {
+        public Object answer(InvocationOnMock call) {
+          KeyValue kv = (KeyValue)call.getArguments()[0];
+          return captured.kvs.add(kv);
+        }
+      };
+      Answer<Object> recordFileInfo = new Answer<Object>() {
+        public Object answer(InvocationOnMock call) {
+          Object[] keyAndValue = call.getArguments();
+          return captured.data.put((byte[])keyAndValue[0], (byte[])keyAndValue[1]);
+        }
+      };
+
+      StoreFile.Writer mockWriter = mock(StoreFile.Writer.class);
+      doAnswer(recordKv).when(mockWriter).append(any(KeyValue.class));
+      doAnswer(recordFileInfo).when(mockWriter)
+          .appendFileInfo(any(byte[].class), any(byte[].class));
+      return mockWriter;
+    }
+
+    /**
+     * Checks the KVs each writer received against the expectation. A null entry in
+     * {@code kvss} means no writer is expected for that position (only legal when
+     * {@code allFiles} is false); non-null entries are matched to writers in creation order.
+     */
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles) {
+      if (allFiles) {
+        assertEquals(kvss.length, writers.size());
+      }
+      int nextWriter = 0;
+      for (KeyValue[] expectedKvs : kvss) {
+        if (expectedKvs == null) {
+          assertFalse(allFiles);
+          continue;
+        }
+        Writer w = writers.get(nextWriter);
+        ++nextWriter;
+        assertNotNull(w.data.get(STRIPE_START_KEY));
+        assertNotNull(w.data.get(STRIPE_END_KEY));
+        assertEquals(expectedKvs.length, w.kvs.size());
+        for (int j = 0; j < expectedKvs.length; ++j) {
+          assertEquals(expectedKvs[j], w.kvs.get(j));
+        }
+      }
+    }
+
+    /**
+     * Checks that writer i recorded boundaries[i] as its stripe start key and boundaries[i + 1]
+     * as its stripe end key, and that there is exactly one writer per adjacent boundary pair.
+     */
+    public void verifyBoundaries(byte[][] boundaries) {
+      assertEquals(boundaries.length - 1, writers.size());
+      int i = 0;
+      for (Writer w : writers) {
+        assertArrayEquals("i = " + i, boundaries[i], w.data.get(STRIPE_START_KEY));
+        assertArrayEquals("i = " + i, boundaries[i + 1], w.data.get(STRIPE_END_KEY));
+        ++i;
+      }
+    }
+  }
+}



Mime
View raw message