flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2401. Optionally compress backup checkpoint.
Date Wed, 09 Jul 2014 00:21:49 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk f15f20785 -> 69fd6b3ad


FLUME-2401. Optionally compress backup checkpoint.

(Abraham Fine via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/69fd6b3a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/69fd6b3a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/69fd6b3a

Branch: refs/heads/trunk
Commit: 69fd6b3ad5e5b9ae6f1293b3d8e57ed57fd6701c
Parents: f15f207
Author: Hari Shreedharan <harishreedharan@gmail.com>
Authored: Tue Jul 8 17:20:36 2014 -0700
Committer: Hari Shreedharan <harishreedharan@gmail.com>
Committed: Tue Jul 8 17:20:36 2014 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml    |   5 +
 .../file/EventQueueBackingStoreFactory.java     |  15 +-
 .../file/EventQueueBackingStoreFile.java        |  28 +++-
 .../file/EventQueueBackingStoreFileV3.java      |   8 +-
 .../apache/flume/channel/file/FileChannel.java  |   7 +
 .../channel/file/FileChannelConfiguration.java  |   5 +
 .../java/org/apache/flume/channel/file/Log.java |  31 ++--
 .../flume/channel/file/Serialization.java       | 146 ++++++++++++++++++-
 .../flume/channel/file/TestFileChannelBase.java |   5 +
 .../channel/file/TestFileChannelRestart.java    |  98 ++++++++++++-
 pom.xml                                         |   6 +
 11 files changed, 323 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml
index 3113938..7b8114c 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -107,6 +107,11 @@
       <artifactId>mapdb</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index 07a3781..456df34 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -38,11 +38,12 @@ class EventQueueBackingStoreFactory {
 
   static EventQueueBackingStore get(File checkpointFile, int capacity,
       String name, boolean upgrade) throws Exception {
-    return get(checkpointFile, null, capacity, name, upgrade, false);
+    return get(checkpointFile, null, capacity, name, upgrade, false, false);
   }
   static EventQueueBackingStore get(File checkpointFile,
       File backupCheckpointDir, int capacity,String name,
-      boolean upgrade, boolean shouldBackup) throws Exception {
+      boolean upgrade, boolean shouldBackup, boolean compressBackup)
+      throws Exception {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     RandomAccessFile checkpointFileHandle = null;
     try {
@@ -68,19 +69,19 @@ class EventQueueBackingStoreFactory {
           throw new IOException("Cannot create " + checkpointFile);
         }
         return new EventQueueBackingStoreFileV3(checkpointFile,
-            capacity, name, backupCheckpointDir, shouldBackup);
+            capacity, name, backupCheckpointDir, shouldBackup, compressBackup);
       }
       // v3 due to meta file, version will be checked by backing store
       if(metaDataExists) {
         return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
-          name, backupCheckpointDir, shouldBackup);
+          name, backupCheckpointDir, shouldBackup, compressBackup);
       }
       checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
       int version = (int)checkpointFileHandle.readLong();
       if(Serialization.VERSION_2 == version) {
         if(upgrade) {
           return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
-            shouldBackup);
+            shouldBackup, compressBackup);
         }
         return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
       }
@@ -101,7 +102,7 @@ class EventQueueBackingStoreFactory {
 
   private static EventQueueBackingStore upgrade(File checkpointFile,
     int capacity, String name, File backupCheckpointDir,
-    boolean shouldBackup)
+    boolean shouldBackup, boolean compressBackup)
           throws Exception {
     LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
     EventQueueBackingStoreFileV2 backingStoreV2 =
@@ -114,7 +115,7 @@ class EventQueueBackingStoreFactory {
     EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
         metaDataFile);
     return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
-      backupCheckpointDir, shouldBackup);
+      backupCheckpointDir, shouldBackup, compressBackup);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 113dcd2..2b0987b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -56,6 +56,8 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
   protected static final int CHECKPOINT_COMPLETE = 0;
   protected static final int CHECKPOINT_INCOMPLETE = 1;
 
+  protected static final String COMPRESSED_FILE_EXTENSION = ".snappy";
+
   protected LongBuffer elementsBuffer;
   protected final Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
   protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();
@@ -64,22 +66,24 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
   protected final File checkpointFile;
   private final Semaphore backupCompletedSema = new Semaphore(1);
   protected final boolean shouldBackup;
+  protected final boolean compressBackup;
   private final File backupDir;
   private final ExecutorService checkpointBackUpExecutor;
 
   protected EventQueueBackingStoreFile(int capacity, String name,
       File checkpointFile) throws IOException,
       BadCheckpointException {
-    this(capacity, name, checkpointFile, null, false);
+    this(capacity, name, checkpointFile, null, false, false);
   }
 
   protected EventQueueBackingStoreFile(int capacity, String name,
       File checkpointFile, File checkpointBackupDir,
-      boolean backupCheckpoint) throws IOException,
-      BadCheckpointException {
+      boolean backupCheckpoint, boolean compressBackup)
+    throws IOException, BadCheckpointException {
     super(capacity, name);
     this.checkpointFile = checkpointFile;
     this.shouldBackup = backupCheckpoint;
+    this.compressBackup = compressBackup;
     this.backupDir = checkpointBackupDir;
     checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
     long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
@@ -169,8 +173,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
       if(Log.EXCLUDES.contains(origFile.getName())) {
         continue;
       }
-      Serialization.copyFile(origFile, new File(backupDirectory,
-        origFile.getName()));
+      if (compressBackup && origFile.equals(checkpointFile)) {
+        Serialization.compressFile(origFile, new File(backupDirectory,
+          origFile.getName() + COMPRESSED_FILE_EXTENSION));
+      } else {
+        Serialization.copyFile(origFile, new File(backupDirectory,
+          origFile.getName()));
+      }
     }
     Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
       "while it is not supposed to. Are multiple channels configured to use " +
@@ -202,7 +211,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
         String fileName = backupFile.getName();
         if (!fileName.equals(BACKUP_COMPLETE_FILENAME) &&
           !fileName.equals(Log.FILE_LOCK)) {
-          Serialization.copyFile(backupFile, new File(checkpointDir, fileName));
+          if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)){
+            Serialization.decompressFile(
+              backupFile, new File(checkpointDir,
+              fileName.substring(0, fileName.lastIndexOf("."))));
+          } else {
+            Serialization.copyFile(backupFile, new File(checkpointDir,
+              fileName));
+          }
         }
       }
       return true;

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index c153558..9dfa0d1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -40,13 +40,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile
{
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
       String name) throws IOException, BadCheckpointException {
-    this(checkpointFile, capacity, name, null, false);
+    this(checkpointFile, capacity, name, null, false, false);
   }
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
       String name, File checkpointBackupDir,
-      boolean backupCheckpoint) throws IOException, BadCheckpointException {
-    super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint);
+      boolean backupCheckpoint, boolean compressBackup)
+      throws IOException, BadCheckpointException {
+    super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
+      compressBackup);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
     metaDataFile = Serialization.getMetaDataFile(checkpointFile);

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 0f242d2..413bfbc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -95,6 +95,7 @@ public class FileChannel extends BasicChannelSemantics {
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
   private boolean useDualCheckpoints;
+  private boolean compressBackupCheckpoint;
   private boolean fsyncPerTransaction;
   private int fsyncInterval;
 
@@ -110,6 +111,11 @@ public class FileChannel extends BasicChannelSemantics {
     useDualCheckpoints = context.getBoolean(
         FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
         FileChannelConfiguration.DEFAULT_USE_DUAL_CHECKPOINTS);
+
+    compressBackupCheckpoint = context.getBoolean(
+        FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
+        FileChannelConfiguration.DEFAULT_COMPRESS_BACKUP_CHECKPOINT);
+
     String homePath = System.getProperty("user.home").replace('\\', '/');
 
     String strCheckpointDir =
@@ -272,6 +278,7 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setEncryptionKeyAlias(encryptionActiveKey);
       builder.setEncryptionCipherProvider(encryptionCipherProvider);
       builder.setUseDualCheckpoints(useDualCheckpoints);
+      builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
       builder.setBackupCheckpointDir(backupCheckpointDir);
       builder.setFsyncPerTransaction(fsyncPerTransaction);
       builder.setFsyncInterval(fsyncInterval);

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 87dc653..f8c0378 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -87,6 +87,11 @@ public class FileChannelConfiguration {
   public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
   public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
 
+  public static final String COMPRESS_BACKUP_CHECKPOINT =
+    "compressBackupCheckpoint";
+  public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT
+    = false;
+
   public static final String FSYNC_PER_TXN = "fsyncPerTransaction";
   public static final boolean DEFAULT_FSYNC_PRE_TXN = true;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 5bac0f4..5b581e1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -123,6 +123,7 @@ public class Log {
   private boolean didFastReplay = false;
   private boolean didFullReplayDueToBadCheckpointException = false;
   private final boolean useDualCheckpoints;
+  private final boolean compressBackupCheckpoint;
   private volatile boolean backupRestored = false;
 
   private final boolean fsyncPerTransaction;
@@ -151,6 +152,7 @@ public class Log {
     private String bEncryptionCipherProvider;
     private long bUsableSpaceRefreshInterval = 15L * 1000L;
     private boolean bUseDualCheckpoints = false;
+    private boolean bCompressBackupCheckpoint = false;
     private File bBackupCheckpointDir = null;
 
     private boolean fsyncPerTransaction = true;
@@ -242,6 +244,11 @@ public class Log {
       return this;
     }
 
+    Builder setCompressBackupCheckpoint(boolean compressBackupCheckpoint) {
+      this.bCompressBackupCheckpoint = compressBackupCheckpoint;
+      return this;
+    }
+
     Builder setBackupCheckpointDir(File backupCheckpointDir) {
       this.bBackupCheckpointDir = backupCheckpointDir;
       return this;
@@ -249,16 +256,17 @@ public class Log {
 
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-        bUseDualCheckpoints, bCheckpointDir, bBackupCheckpointDir, bName,
-        useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
-        bEncryptionKeyProvider, bEncryptionKeyAlias,
+        bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir,
+        bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
+        bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
         bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
         fsyncPerTransaction, fsyncInterval, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-    boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
+    boolean useDualCheckpoints, boolean compressBackupCheckpoint,
+    File checkpointDir, File backupCheckpointDir,
     String name, boolean useLogReplayV1, boolean useFastReplay,
     long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
     @Nullable String encryptionKeyAlias,
@@ -338,6 +346,7 @@ public class Log {
     this.maxFileSize = maxFileSize;
     this.queueCapacity = queueCapacity;
     this.useDualCheckpoints = useDualCheckpoints;
+    this.compressBackupCheckpoint = compressBackupCheckpoint;
     this.checkpointDir = checkpointDir;
     this.backupCheckpointDir = backupCheckpointDir;
     this.logDirs = logDirs;
@@ -415,9 +424,10 @@ public class Log {
 
       try {
         backingStore =
-            EventQueueBackingStoreFactory.get(checkpointFile,
-                backupCheckpointDir, queueCapacity, channelNameDescriptor,
-                true, this.useDualCheckpoints);
+          EventQueueBackingStoreFactory.get(checkpointFile,
+            backupCheckpointDir, queueCapacity, channelNameDescriptor,
+            true, this.useDualCheckpoints,
+            this.compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
                 inflightPutsFile, queueSetDir);
         LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
@@ -451,9 +461,10 @@ public class Log {
                 "directory to recover from a corrupt or incomplete checkpoint");
           }
         }
-        backingStore = EventQueueBackingStoreFactory.get(checkpointFile,
-            backupCheckpointDir,
-            queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
+        backingStore = EventQueueBackingStoreFactory.get(
+          checkpointFile, backupCheckpointDir, queueCapacity,
+          channelNameDescriptor, true, useDualCheckpoints,
+          compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
                 inflightPutsFile, queueSetDir);
         // If the checkpoint was deleted due to BadCheckpointException, then

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index d55660d..a6eda75 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -25,11 +25,14 @@ import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 import javax.annotation.Nullable;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Collections;
@@ -52,8 +55,8 @@ public class Serialization {
   public static final String OLD_METADATA_FILENAME = METADATA_FILENAME +
     ".old";
 
-  // 64 K buffer to copy files.
-  private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
+  // 64 K buffer to copy and compress files.
+  private static final int FILE_BUFFER_SIZE = 64 * 1024;
 
   public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
 
@@ -140,7 +143,7 @@ public class Serialization {
     try {
       in = new BufferedInputStream(new FileInputStream(from));
       out = new RandomAccessFile(to, "rw");
-      byte[] buf = new byte[FILE_COPY_BUFFER_SIZE];
+      byte[] buf = new byte[FILE_BUFFER_SIZE];
       int total = 0;
       while(true) {
         int read = in.read(buf);
@@ -184,4 +187,141 @@ public class Serialization {
     throw new IOException("Copying file: " + from.toString() + " to: " + to
       .toString() + " may have failed.");
   }
+
+  /**
+   * Compress a file with Snappy and fsync the compressed output to disk.
+   * @param uncompressed File to compress - this file should exist
+   * @param compressed Compressed file - this file should not exist
+   * @return true on success (failures rethrown via Throwables.propagate)
+   */
+  public static boolean compressFile(File uncompressed, File compressed)
+    throws IOException {
+    Preconditions.checkNotNull(uncompressed,
+      "Source file is null, compression failed.");
+    Preconditions.checkNotNull(compressed,
+      "Destination file is null, compression failed.");
+    Preconditions.checkState(uncompressed.exists(), "Source file: " +
+      uncompressed.toString() + " does not exist.");
+    Preconditions.checkState(!compressed.exists(),
+      "Compressed file: " + compressed.toString() + " unexpectedly " +
+        "exists.");
+
+    BufferedInputStream in = null;
+    FileOutputStream out = null;
+    SnappyOutputStream snappyOut = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(uncompressed));
+      out = new FileOutputStream(compressed);
+      snappyOut = new SnappyOutputStream(out);
+      byte[] buf = new byte[FILE_BUFFER_SIZE];
+      while(true) {
+        int read = in.read(buf);
+        if (read == -1) {
+          break;
+        }
+        snappyOut.write(buf, 0, read);
+      }
+      snappyOut.flush(); // flush Snappy's internal buffer into out first
+      out.getFD().sync();
+      return true;
+    } catch (Exception ex) {
+      LOG.error("Error while attempting to compress " +
+        uncompressed.toString() + " to " + compressed.toString() + ".", ex);
+      Throwables.propagate(ex);
+    } finally {
+      Throwable th = null;
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while closing input file.", ex);
+        th = ex;
+      }
+      try {
+        if (out != null) {
+          // snappyOut.close() also closes out; close out itself if unset
+          (snappyOut != null ? snappyOut : out).close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Error while closing output file.", ex);
+        Throwables.propagate(ex);
+      }
+      if (th != null) {
+        Throwables.propagate(th);
+      }
+    }
+    // Should never reach here.
+    throw new IOException("Compressing file: " + uncompressed.toString()
+      + " to: " + compressed.toString() + " may have failed.");
+  }
+
+  /**
+   * Decompress a Snappy-compressed file and fsync the result to disk.
+   * @param compressed File to decompress - this file should exist
+   * @param decompressed Decompressed file - this file should not exist
+   * @return true on success (failures rethrown via Throwables.propagate)
+   */
+  public static boolean decompressFile(File compressed, File decompressed)
+    throws IOException {
+    Preconditions.checkNotNull(compressed,
+      "Source file is null, decompression failed.");
+    Preconditions.checkNotNull(decompressed, "Destination file is " +
+      "null, decompression failed.");
+    Preconditions.checkState(compressed.exists(), "Source file: " +
+      compressed.toString() + " does not exist.");
+    Preconditions.checkState(!decompressed.exists(),
+      "Decompressed file: " + decompressed.toString() +
+        " unexpectedly exists.");
+
+    BufferedInputStream in = null;
+    SnappyInputStream snappyIn = null;
+    FileOutputStream out = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(compressed));
+      snappyIn = new SnappyInputStream(in);
+      out = new FileOutputStream(decompressed);
+
+      byte[] buf = new byte[FILE_BUFFER_SIZE];
+      while(true) {
+        int read = snappyIn.read(buf);
+        if (read == -1) {
+          break;
+        }
+        out.write(buf, 0, read);
+      }
+      out.getFD().sync();
+      return true;
+    } catch (Exception ex) {
+      LOG.error("Error while attempting to decompress " +
+        compressed.toString() + " to " + decompressed.toString() + ".", ex);
+      Throwables.propagate(ex);
+    } finally {
+      Throwable th = null;
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while closing input file.", ex);
+        th = ex;
+      }
+      try {
+        // snappyIn just wraps in (closed above); close the output stream here
+        if (out != null) {
+          out.close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Error while closing output file.", ex);
+        Throwables.propagate(ex);
+      }
+      if (th != null) {
+        Throwables.propagate(th);
+      }
+    }
+    // Should never reach here.
+    throw new IOException("Decompressing file: " +
+      compressed.toString() + " to: " + decompressed.toString() +
+      " may have failed.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
index 1ee5320..9901b69 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -38,12 +38,17 @@ public class TestFileChannelBase {
   protected File[] dataDirs;
   protected String dataDir;
   protected File backupDir;
+  protected File uncompressedBackupCheckpoint;
+  protected File compressedBackupCheckpoint;
 
   @Before
   public void setup() throws Exception {
     baseDir = Files.createTempDir();
     checkpointDir = new File(baseDir, "chkpt");
     backupDir = new File(baseDir, "backup");
+    uncompressedBackupCheckpoint = new File(backupDir, "checkpoint");
+    compressedBackupCheckpoint = new File(backupDir,
+      "checkpoint.snappy");
     Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
     Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory());
     dataDirs = new File[3];

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index d16f3d5..0c6afc4 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -693,13 +693,27 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   // Make sure the entire channel was not replayed, only the events from the
   // backup.
   @Test
-  public void testBackupUsedEnsureNoFullReplay() throws Exception {
+  public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws
+    Exception {
+    testBackupUsedEnsureNoFullReplay(false);
+  }
+  @Test
+  public void testBackupUsedEnsureNoFullReplayWithCompression() throws
+    Exception {
+    testBackupUsedEnsureNoFullReplay(true);
+  }
+
+  private void testBackupUsedEnsureNoFullReplay(boolean compressedBackup)
+    throws Exception {
     File dataDir = Files.createTempDir();
     File tempBackup = Files.createTempDir();
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.DATA_DIRS,
       dataDir.getAbsolutePath());
-    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
+      "true");
+    overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
+      String.valueOf(compressedBackup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -831,6 +845,86 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     }
   }
 
+  @Test
+  public void testCompressBackup() throws Throwable {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
+      "true");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+    overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
+      "true");
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    putEvents(channel, "restart", 10, 100);
+    forceCheckpoint(channel);
+
+    //Wait for the backup checkpoint
+    Thread.sleep(2000);
+
+    Assert.assertTrue(compressedBackupCheckpoint.exists());
+
+    Serialization.decompressFile(compressedBackupCheckpoint,
+      uncompressedBackupCheckpoint);
+
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    Assert.assertTrue(FileUtils.contentEquals(checkpoint,
+      uncompressedBackupCheckpoint));
+
+    channel.stop();
+  }
+
+  @Test
+  public void testToggleCheckpointCompressionFromTrueToFalse()
+    throws Exception {
+    restartToggleCompression(true);
+  }
+
+  @Test
+  public void testToggleCheckpointCompressionFromFalseToTrue()
+    throws Exception {
+    restartToggleCompression(false);
+  }
+
+  // Backs up with compression = originalCheckpointCompressed, deletes the
+  // live checkpoint, then restarts with the compression setting flipped.
+  private void restartToggleCompression(boolean originalCheckpointCompressed)
+    throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+    overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
+      String.valueOf(originalCheckpointCompressed));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = fillChannel(channel, "restart");
+    forceCheckpoint(channel);
+    Thread.sleep(2000);
+    Assert.assertEquals(originalCheckpointCompressed,
+      compressedBackupCheckpoint.exists());
+    Assert.assertEquals(!originalCheckpointCompressed,
+      uncompressedBackupCheckpoint.exists());
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    Assert.assertTrue(checkpoint.delete());
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.delete());
+    overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
+      String.valueOf(!originalCheckpointCompressed));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+    forceCheckpoint(channel);
+    Thread.sleep(2000);
+    Assert.assertEquals(!originalCheckpointCompressed,
+      compressedBackupCheckpoint.exists());
+    Assert.assertEquals(originalCheckpointCompressed,
+      uncompressedBackupCheckpoint.exists());
+  }
+
   private static void slowdownBackup(FileChannel channel) {
     Log log = field("log").ofType(Log.class).in(channel).get();
 

http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d31d4c..541548f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1270,6 +1270,12 @@ limitations under the License.
         <version>${kite.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>1.1.0</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 


Mime
View raw message