hbase-commits mailing list archives

From: zhang...@apache.org
Subject: hbase git commit: HBASE-15527 Refactor Compactor related classes
Date: Sun, 10 Apr 2016 02:02:53 GMT
Repository: hbase
Updated Branches:
  refs/heads/master a4dcf5141 -> f7d44e929


HBASE-15527 Refactor Compactor related classes


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f7d44e92
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f7d44e92
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f7d44e92

Branch: refs/heads/master
Commit: f7d44e929fd2a8dac5f15c50c2eea1d448e92eb7
Parents: a4dcf51
Author: zhangduo <zhangduo@apache.org>
Authored: Sat Apr 9 16:18:08 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Sun Apr 10 09:26:28 2016 +0800

----------------------------------------------------------------------
 .../hbase/mob/DefaultMobStoreCompactor.java     |  76 +++++---
 .../AbstractMultiOutputCompactor.java           | 131 +++----------
 .../regionserver/compactions/Compactor.java     | 189 +++++++++++++++----
 .../compactions/DateTieredCompactor.java        |  33 ++--
 .../compactions/DefaultCompactor.java           | 171 +++++------------
 .../compactions/StripeCompactor.java            |  60 +++---
 6 files changed, 317 insertions(+), 343 deletions(-)
----------------------------------------------------------------------
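
In short, this refactor hoists the per-file compaction loop that DefaultCompactor and AbstractMultiOutputCompactor each carried into the shared Compactor base class, makes Compactor generic over its output sink (Compactor<T extends CellSink>), and leaves subclasses to supply only an InternalScannerFactory, a CellSinkFactory<T>, and the commitWriter/abortWriter pair. A minimal sketch of the resulting extension pattern (hypothetical class name, imports elided; essentially a condensed DefaultCompactor as it appears further down in this diff):

    public class SingleFileCompactor extends Compactor<StoreFile.Writer> {

      public SingleFileCompactor(Configuration conf, Store store) {
        super(conf, store);
      }

      public List<Path> compact(CompactionRequest request,
          ThroughputController throughputController, User user) throws IOException {
        // Reuse the shared scanner factory; write everything to one tmp StoreFile.Writer.
        return compact(request, defaultScannerFactory, new CellSinkFactory<StoreFile.Writer>() {
          @Override
          public StoreFile.Writer createWriter(InternalScanner scanner, FileDetails fd,
              boolean shouldDropBehind) throws IOException {
            return createTmpWriter(fd, shouldDropBehind);
          }
        }, throughputController, user);
      }

      @Override
      protected List<Path> commitWriter(StoreFile.Writer writer, FileDetails fd,
          CompactionRequest request) throws IOException {
        // Record the max sequence id and whether this was a full compaction, then close.
        writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
        writer.close();
        return Collections.singletonList(writer.getPath());
      }

      @Override
      protected void abortWriter(StoreFile.Writer writer) throws IOException {
        // Close and remove the partial output left by an interrupted compaction.
        writer.close();
        store.getFileSystem().delete(writer.getPath(), false);
      }
    }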


http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 33eb7b9..fe640c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -42,11 +42,12 @@ import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -58,6 +59,45 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
   private long mobSizeThreshold;
   private HMobStore mobStore;
+
+  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
+
+    @Override
+    public ScanType getScanType(CompactionRequest request) {
+      return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
+          : ScanType.COMPACT_DROP_DELETES;
+    }
+
+    @Override
+    public InternalScanner createScanner(List<StoreFileScanner> scanners,
+        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
+      Scan scan = new Scan();
+      scan.setMaxVersions(store.getFamily().getMaxVersions());
+      if (scanType == ScanType.COMPACT_DROP_DELETES) {
+        // In major compaction, we need to write the delete markers to del files, so we have to
+        // retain the them in scanning.
+        scanType = ScanType.COMPACT_RETAIN_DELETES;
+        return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+            scanType, smallestReadPoint, fd.earliestPutTs, true);
+      } else {
+        return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
+            scanType, smallestReadPoint, fd.earliestPutTs, false);
+      }
+    }
+  };
+
+  private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
+
+    @Override
+    public Writer createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind) throws IOException {
+      // make this writer with tags always because of possible new cells with tags.
+      return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
+        shouldDropBehind);
+    }
+  };
+
   public DefaultMobStoreCompactor(Configuration conf, Store store) {
     super(conf, store);
     // The mob cells reside in the mob-enabled column family which is held by HMobStore.
@@ -71,36 +111,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     mobSizeThreshold = store.getFamily().getMobThreshold();
   }
 
-  /**
-   * Creates a writer for a new file in a temporary directory.
-   * @param fd The file details.
-   * @param shouldDropBehind Should the writer drop behind.
-   * @return Writer for a new StoreFile in the tmp dir.
-   * @throws IOException
-   */
-  @Override
-  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
-    // make this writer with tags always because of possible new cells with tags.
-    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-      true, true, true, shouldDropBehind);
-    return writer;
-  }
-
   @Override
-  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxVersions(store.getFamily().getMaxVersions());
-    if (scanType == ScanType.COMPACT_DROP_DELETES) {
-      // In major compaction, we need to write the delete markers to del files, so we have to
-      // retain the them in scanning.
-      scanType = ScanType.COMPACT_RETAIN_DELETES;
-      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
-          scanType, smallestReadPoint, earliestPutTs, true);
-    } else {
-      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
-          scanType, smallestReadPoint, earliestPutTs, false);
-    }
+  public List<Path> compact(CompactionRequest request, ThroughputController throughputController,
+      User user) throws IOException {
+    return compact(request, scannerFactory, writerFactory, throughputController, user);
   }
 
   // TODO refactor to take advantage of the throughput controller.
@@ -166,7 +180,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     boolean hasMore;
     Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
     byte[] fileName = null;
-    StoreFile.Writer mobFileWriter = null, delFileWriter = null;
+    Writer mobFileWriter = null, delFileWriter = null;
     long mobCells = 0, deleteMarkersCount = 0;
     Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
         store.getTableName().getName());
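
With this change DefaultMobStoreCompactor no longer overrides createTmpWriter and createScanner; instead it packages the mob-specific MobCompactionStoreScanner and the always-tagged writer into the two factory fields added above and delegates compact(...) to the shared Compactor implementation. A hedged usage sketch (conf, store, request and user are placeholder variables, not part of this commit):

    DefaultMobStoreCompactor compactor = new DefaultMobStoreCompactor(conf, store);
    // Same entry point as any other Compactor subclass after the refactor.
    List<Path> newFiles = compactor.compact(request, NoLimitThroughputController.INSTANCE, user);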

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 29d8561..ef39a6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -18,11 +18,6 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,16 +28,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.security.User;
-
-import com.google.common.io.Closeables;
 
 /**
  * Base class for implementing a Compactor which will generate multiple output files after
@@ -50,7 +38,7 @@ import com.google.common.io.Closeables;
  */
 @InterfaceAudience.Private
 public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
-    extends Compactor {
+    extends Compactor<T> {
 
   private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
 
@@ -58,104 +46,31 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
     super(conf, store);
   }
 
-  protected interface InternalScannerFactory {
-
-    ScanType getScanType(CompactionRequest request);
-
-    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-        FileDetails fd, long smallestReadPoint) throws IOException;
+  protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
+      final FileDetails fd, final boolean shouldDropBehind) {
+    WriterFactory writerFactory = new WriterFactory() {
+      @Override
+      public Writer createWriter() throws IOException {
+        return createTmpWriter(fd, shouldDropBehind);
+      }
+    };
+    // Prepare multi-writer, and perform the compaction using scanner and writer.
+    // It is ok here if storeScanner is null.
+    StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
+    writer.init(storeScanner, writerFactory);
   }
 
-  protected List<Path> compact(T writer, final CompactionRequest request,
-      InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
-          throws IOException {
-    final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = getSmallestReadPoint();
-
-    List<StoreFileScanner> scanners;
-    Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
-      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFiles, and their readers
-      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
-      }
-      scanners = createFileScanners(readersToClose, smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    } else {
-      readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
-        store.throttleCompaction(request.getSize()));
-    }
-    InternalScanner scanner = null;
-    boolean finished = false;
-    try {
-      /* Include deletes, unless we are doing a major compaction */
-      ScanType scanType = scannerFactory.getScanType(request);
-      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
-      if (scanner == null) {
-        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
-      }
-      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal processing.
-        return new ArrayList<Path>();
-      }
-      boolean cleanSeqId = false;
-      if (fd.minSeqIdToKeep > 0) {
-        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-        cleanSeqId = true;
-      }
-      // Create the writer factory for compactions.
-      final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
-      WriterFactory writerFactory = new WriterFactory() {
-        @Override
-        public Writer createWriter() throws IOException {
-          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
-            fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
-        }
-      };
-      // Prepare multi-writer, and perform the compaction using scanner and writer.
-      // It is ok here if storeScanner is null.
-      StoreScanner storeScanner
-        = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
-      writer.init(storeScanner, writerFactory);
-      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
-        throughputController, request.isAllFiles());
-      if (!finished) {
-        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
-            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
-      }
-    } finally {
-      Closeables.close(scanner, true);
-      for (StoreFile f : readersToClose) {
-        try {
-          f.closeReader(true);
-        } catch (IOException e) {
-          LOG.warn("Exception closing " + f, e);
-        }
-      }
-      if (!finished) {
-        FileSystem fs = store.getFileSystem();
-        for (Path leftoverFile : writer.abortWriters()) {
-          try {
-            fs.delete(leftoverFile, false);
-          } catch (IOException e) {
-            LOG.error("Failed to delete the leftover file " + leftoverFile
-                + " after an unfinished compaction.",
-              e);
-          }
-        }
+  @Override
+  protected void abortWriter(T writer) throws IOException {
+    FileSystem fs = store.getFileSystem();
+    for (Path leftoverFile : writer.abortWriters()) {
+      try {
+        fs.delete(leftoverFile, false);
+      } catch (IOException e) {
+        LOG.warn(
+          "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
+          e);
       }
     }
-    assert finished : "We should have exited the method on all error paths";
-    return commitMultiWriter(writer, fd, request);
   }
-
-  protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
-      CompactionRequest request) throws IOException;
 }
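
AbstractMultiOutputCompactor is thus reduced to two helpers: initMultiWriter, which wires a WriterFactory and the optional StoreScanner into an AbstractMultiFileWriter, and an abortWriter that deletes whatever partial files the multi-writer left behind. Subclasses now build their writer inside a CellSinkFactory, roughly as DateTieredCompactor does later in this diff (boundaries here is a hypothetical placeholder for the subclass-specific constructor arguments):

    return compact(request, defaultScannerFactory,
      new CellSinkFactory<DateTieredMultiFileWriter>() {
        @Override
        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
            boolean shouldDropBehind) throws IOException {
          DateTieredMultiFileWriter writer =
              new DateTieredMultiFileWriter(boundaries, needEmptyFile(request));
          // Hooks the tmp-writer factory and the StoreScanner into the multi-file writer.
          initMultiWriter(writer, scanner, fd, shouldDropBehind);
          return writer;
        }
      }, throughputController, user);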

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 9125684..f32e60a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -22,12 +22,14 @@ import java.io.InterruptedIOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -55,15 +59,17 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
+import com.google.common.io.Closeables;
+
 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
  */
 @InterfaceAudience.Private
-public abstract class Compactor {
+public abstract class Compactor<T extends CellSink> {
   private static final Log LOG = LogFactory.getLog(Compactor.class);
   private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
-  protected CompactionProgress progress;
+  protected volatile CompactionProgress progress;
   protected final Configuration conf;
   protected final Store store;
 
@@ -89,6 +95,11 @@ public abstract class Compactor {
     void append(Cell cell) throws IOException;
   }
 
+  protected interface CellSinkFactory<S> {
+    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
+        throws IOException;
+  }
+
   public CompactionProgress getProgress() {
     return this.progress;
   }
@@ -145,7 +156,7 @@ public abstract class Compactor {
       fd.maxKeyCount += keyCount;
       // calculate the latest MVCC readpoint in any of the involved store files
       Map<byte[], byte[]> fileInfo = r.loadFileInfo();
-      byte tmp[] = null;
+      byte[] tmp = null;
       // Get and set the real MVCCReadpoint for bulk loaded files, which is the
       // SeqId number.
       if (r.isBulkLoaded()) {
@@ -217,6 +228,120 @@ public abstract class Compactor {
     return store.getSmallestReadPoint();
   }
 
+  protected interface InternalScannerFactory {
+
+    ScanType getScanType(CompactionRequest request);
+
+    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException;
+  }
+
+  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
+
+    @Override
+    public ScanType getScanType(CompactionRequest request) {
+      return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
+          : ScanType.COMPACT_DROP_DELETES;
+    }
+
+    @Override
+    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException {
+      return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+        fd.earliestPutTs);
+    }
+  };
+
+  /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException if creation failed
+   */
+  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
+    // When all MVCC readpoints are 0, don't write them.
+    // See HBASE-8166, HBASE-12600, and HBASE-13389.
+    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+    /* isCompaction = */true,
+    /* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
+    /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
+  }
+
+  protected List<Path> compact(final CompactionRequest request,
+      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
+      ThroughputController throughputController, User user) throws IOException {
+    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
+
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = getSmallestReadPoint();
+
+    List<StoreFileScanner> scanners;
+    Collection<StoreFile> readersToClose;
+    T writer = null;
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
+      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
+      // HFiles, and their readers
+      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
+      for (StoreFile f : request.getFiles()) {
+        readersToClose.add(f.cloneForReader());
+      }
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    } else {
+      readersToClose = Collections.emptyList();
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    }
+    InternalScanner scanner = null;
+    boolean finished = false;
+    try {
+      /* Include deletes, unless we are doing a major compaction */
+      ScanType scanType = scannerFactory.getScanType(request);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+      if (scanner == null) {
+        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
+      }
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
+      if (scanner == null) {
+        // NULL scanner returned from coprocessor hooks means skip normal processing.
+        return new ArrayList<Path>();
+      }
+      boolean cleanSeqId = false;
+      if (fd.minSeqIdToKeep > 0) {
+        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+        cleanSeqId = true;
+      }
+      writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
+      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request.isAllFiles());
+      if (!finished) {
+        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
+      }
+    } finally {
+      Closeables.close(scanner, true);
+      for (StoreFile f : readersToClose) {
+        try {
+          f.closeReader(true);
+        } catch (IOException e) {
+          LOG.warn("Exception closing " + f, e);
+        }
+      }
+      if (!finished && writer != null) {
+        abortWriter(writer);
+      }
+    }
+    assert finished : "We should have exited the method on all error paths";
+    assert writer != null : "Writer should be non-null if no error";
+    return commitWriter(writer, fd, request);
+  }
+
+  protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
+      throws IOException;
+
+  protected abstract void abortWriter(T writer) throws IOException;
+
   /**
    * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
    * @param request Compaction request.
@@ -233,7 +358,9 @@ public abstract class Compactor {
   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
       User user) throws IOException {
-    if (store.getCoprocessorHost() == null) return null;
+    if (store.getCoprocessorHost() == null) {
+      return null;
+    }
     if (user == null) {
       return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
         earliestPutTs, request);
@@ -261,25 +388,27 @@ public abstract class Compactor {
    * @param scanner The default scanner created for compaction.
   * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
    */
-   protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
+  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
-     if (store.getCoprocessorHost() == null) return scanner;
-     if (user == null) {
-       return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
-     } else {
-       try {
-         return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
-           @Override
-           public InternalScanner run() throws Exception {
-             return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
-           }
-         });
-       } catch (InterruptedException ie) {
-         InterruptedIOException iioe = new InterruptedIOException();
-         iioe.initCause(ie);
-         throw iioe;
-       }
-     }
+    if (store.getCoprocessorHost() == null) {
+      return scanner;
+    }
+    if (user == null) {
+      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }
 
   /**
@@ -288,7 +417,8 @@ public abstract class Compactor {
    * @param scanner Where to read from.
    * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
-   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;= smallestReadPoint
+   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
+   *          smallestReadPoint
    * @param major Is a major compaction.
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
@@ -421,17 +551,4 @@ public abstract class Compactor {
     return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
-
-  /**
-   * Appends the metadata and closes the writer.
-   * @param writer The current store writer.
-   * @param fd The file details.
-   * @param isMajor Is a major compaction.
-   * @throws IOException
-   */
-  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
-      boolean isMajor) throws IOException {
-    writer.appendMetadata(fd.maxSeqId, isMajor);
-    writer.close();
-  }
 }
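
The generic compact(...) added here is now the single driver for every compactor: it gathers FileDetails, opens scanners (over cloned readers when hbase.regionserver.compaction.private.readers is on), runs the pre/post coprocessor hooks, asks the CellSinkFactory for the output sink, calls performCompaction, and finishes with either commitWriter or abortWriter. A custom InternalScannerFactory plugs into that seam exactly like the defaultScannerFactory above; a hedged sketch, assuming it is declared inside a Compactor subclass (the retain-deletes behaviour is illustrative only, not part of this commit):

    private final InternalScannerFactory retainDeletesFactory = new InternalScannerFactory() {

      @Override
      public ScanType getScanType(CompactionRequest request) {
        // Always keep delete markers, regardless of whether this is a major compaction.
        return ScanType.COMPACT_RETAIN_DELETES;
      }

      @Override
      public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
          FileDetails fd, long smallestReadPoint) throws IOException {
        // Delegates to the protected helper the Compactor base class already provides.
        return createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
      }
    };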

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 413b29c..b1203c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -27,10 +27,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
@@ -52,34 +50,29 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
     return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
   }
 
-  public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
+  public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + lowerBoundaries.size()
           + "windows, lower boundaries: " + lowerBoundaries);
     }
 
-    DateTieredMultiFileWriter writer =
-        new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
-    return compact(writer, request, new InternalScannerFactory() {
+    return compact(request, defaultScannerFactory,
+      new CellSinkFactory<DateTieredMultiFileWriter>() {
 
-      @Override
-      public ScanType getScanType(CompactionRequest request) {
-        return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
-            : ScanType.COMPACT_DROP_DELETES;
-      }
-
-      @Override
-      public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-          FileDetails fd, long smallestReadPoint) throws IOException {
-        return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
-          fd.earliestPutTs);
-      }
-    }, throughputController, user);
+        @Override
+        public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
+              needEmptyFile(request));
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
   @Override
-  protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
       CompactionRequest request) throws IOException {
     return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
   }
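
DateTieredCompactor now constructs its DateTieredMultiFileWriter inside the CellSinkFactory, so the writer only comes into existence once the coprocessor hooks have had their chance to bypass the compaction, and it implements commitWriter in place of the old commitMultiWriter. A hedged call sketch (the boundary values and local variables are illustrative only):

    // One output window per pair of adjacent lower boundaries (values are made up).
    List<Long> lowerBoundaries = Arrays.asList(0L, 86400000L, 172800000L);
    List<Path> newFiles = dateTieredCompactor.compact(request, lowerBoundaries,
        NoLimitThroughputController.INSTANCE, user);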

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 22a45b1..9759d2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -18,166 +18,59 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
+import com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files. Create an instance and then call
  * {@link #compact(CompactionRequest, ThroughputController, User)}
  */
 @InterfaceAudience.Private
-public class DefaultCompactor extends Compactor {
+public class DefaultCompactor extends Compactor<Writer> {
   private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
 
   public DefaultCompactor(final Configuration conf, final Store store) {
     super(conf, store);
   }
 
-  /**
-   * Do a minor/major compaction on an explicit set of storefiles from a Store.
-   */
-  public List<Path> compact(final CompactionRequest request,
-      ThroughputController throughputController, User user) throws IOException {
-    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = getSmallestReadPoint();
+  private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
 
-    List<StoreFileScanner> scanners;
-    Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
-      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFiles, and their readers
-      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
-      }
-      scanners = createFileScanners(readersToClose, smallestReadPoint,
-          store.throttleCompaction(request.getSize()));
-    } else {
-      readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
-          store.throttleCompaction(request.getSize()));
+    @Override
+    public Writer createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind) throws IOException {
+      return createTmpWriter(fd, shouldDropBehind);
     }
-
-    StoreFile.Writer writer = null;
-    List<Path> newFiles = new ArrayList<Path>();
-    boolean cleanSeqId = false;
-    IOException e = null;
-    try {
-      InternalScanner scanner = null;
-      try {
-        /* Include deletes, unless we are doing a compaction of all files */
-          ScanType scanType =
-                  request.isRetainDeleteMarkers() ?
-                          ScanType.COMPACT_RETAIN_DELETES :
-                          ScanType.COMPACT_DROP_DELETES;
-        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
-        if (scanner == null) {
-          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
-        }
-        scanner = postCreateCoprocScanner(request, scanType, scanner, user);
-        if (scanner == null) {
-          // NULL scanner returned from coprocessor hooks means skip normal processing.
-          return newFiles;
-        }
-        // Create the writer even if no kv(Empty store file is also ok),
-        // because we need record the max seq id for the store file, see HBASE-6059
-        if(fd.minSeqIdToKeep > 0) {
-          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
-          cleanSeqId = true;
-        }
-
-
-        writer = createTmpWriter(fd, store.throttleCompaction(request.getSize()));
-        boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
-          throughputController, request.isAllFiles());
-
-        if (!finished) {
-          writer.close();
-          store.getFileSystem().delete(writer.getPath(), false);
-          writer = null;
-          throw new InterruptedIOException("Aborting compaction of store " + store +
-              " in region " + store.getRegionInfo().getRegionNameAsString() +
-              " because it was interrupted.");
-         }
-       } finally {
-         if (scanner != null) {
-           scanner.close();
-         }
-      }
-    } catch (IOException ioe) {
-      e = ioe;
-      // Throw the exception
-      throw ioe;
-    }
-    finally {
-      try {
-        if (writer != null) {
-          if (e != null) {
-            writer.close();
-          } else {
-            writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
-            writer.close();
-            newFiles.add(writer.getPath());
-          }
-        }
-      } finally {
-        for (StoreFile f : readersToClose) {
-          try {
-            f.closeReader(true);
-          } catch (IOException ioe) {
-            LOG.warn("Exception closing " + f, ioe);
-          }
-        }
-      }
-    }
-    return newFiles;
-  }
+  };
 
   /**
-   * Creates a writer for a new file in a temporary directory.
-   * @param fd The file details.
-   * @return Writer for a new StoreFile in the tmp dir.
-   * @throws IOException
+   * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
-  protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
-    throws IOException {
-
-      // When all MVCC readpoints are 0, don't write them.
-      // See HBASE-8166, HBASE-12600, and HBASE-13389.
-
-      return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
-            /* isCompaction = */ true,
-            /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
-            /* includesTags = */ fd.maxTagsLength > 0,
-            /* shouldDropBehind = */ shouldDropBehind);
+  public List<Path> compact(final CompactionRequest request,
+      ThroughputController throughputController, User user) throws IOException {
+    return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
   }
 
-
   /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
    * {@link #compact(CompactionRequest, ThroughputController, User)};
-   * @param filesToCompact the files to compact. These are used as the compactionSelection for
-   *          the generated {@link CompactionRequest}.
+   * @param filesToCompact the files to compact. These are used as the compactionSelection for the
+   *          generated {@link CompactionRequest}.
    * @param isMajor true to major compact (prune all deletes, max versions, etc)
   * @return Product of compaction or an empty list if all cells expired or deleted and nothing \
    *         made it through the compaction.
@@ -187,6 +80,32 @@ public class DefaultCompactor extends Compactor {
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
     cr.setIsMajor(isMajor, isMajor);
-    return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
+    return compact(cr, NoLimitThroughputController.INSTANCE, null);
+  }
+
+  @Override
+  protected List<Path> commitWriter(Writer writer, FileDetails fd,
+      CompactionRequest request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+    writer.close();
+    return newFiles;
+  }
+
+  @Override
+  protected void abortWriter(Writer writer) throws IOException {
+    Path leftoverFile = writer.getPath();
+    try {
+      writer.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close the writer after an unfinished compaction.", e);
+    }
+    try {
+      store.getFileSystem().delete(leftoverFile, false);
+    } catch (IOException e) {
+      LOG.warn(
+        "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
+        e);
+    }
   }
 }
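
DefaultCompactor shrinks to its writer factory plus the commitWriter/abortWriter callbacks; the long hand-rolled loop removed above now lives once in Compactor. The testing helper keeps its old shape, roughly (a sketch assuming the surrounding method is the existing test-only helper and filesToCompact is a caller-supplied selection of store files):

    CompactionRequest cr = new CompactionRequest(filesToCompact);
    cr.setIsMajor(true, true); // force a major compaction for the test
    // Unthrottled, no User context: identical to the delegation in the hunk above.
    List<Path> products = compact(cr, NoLimitThroughputController.INSTANCE, null);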

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d44e92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 1364ce0..5e796ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -68,15 +68,17 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
     @Override
    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
         FileDetails fd, long smallestReadPoint) throws IOException {
-      return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
-        scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
-        scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+      return (majorRangeFromRow == null)
+          ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+            fd.earliestPutTs)
+          : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
+            majorRangeFromRow, majorRangeToRow);
     }
   }
 
-  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
-      User user) throws IOException {
+  public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
+      final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
+      ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -85,30 +87,44 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
       }
       LOG.debug(sb.toString());
     }
-    StripeMultiFileWriter writer =
-        new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
-            majorRangeFromRow, majorRangeToRow);
-    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
-        majorRangeToRow), throughputController, user);
+    return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+      new CellSinkFactory<StripeMultiFileWriter>() {
+
+        @Override
+        public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
+              store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
-  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
-      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
+  public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
+      final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
-          + targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
-          + "] range");
+      LOG.debug(
+        "Executing compaction with " + targetSize + " target file size, no more than " +
targetCount
+            + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right) + "]
range");
     }
-    StripeMultiFileWriter writer =
-        new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
-            left, right);
-    return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
-        majorRangeToRow), throughputController, user);
+    return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+      new CellSinkFactory<StripeMultiFileWriter>() {
+
+        @Override
+        public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+            boolean shouldDropBehind) throws IOException {
+          StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
+              store.getComparator(), targetCount, targetSize, left, right);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind);
+          return writer;
+        }
+      }, throughputController, user);
   }
 
   @Override
-  protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
       CompactionRequest request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";

