flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1917: FileChannel group commit (coalesce fsync)
Date Sun, 23 Jun 2013 19:51:06 GMT
Updated Branches:
  refs/heads/trunk 15f280f75 -> b8c4b003a


FLUME-1917: FileChannel group commit (coalesce fsync)

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b8c4b003
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b8c4b003
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b8c4b003

Branch: refs/heads/trunk
Commit: b8c4b003a8273b17e8641a9e5bcafd2357fbd370
Parents: 15f280f
Author: Brock Noland <brock@apache.org>
Authored: Sun Jun 23 14:50:51 2013 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Sun Jun 23 14:50:51 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Log.java | 11 ++++-
 .../org/apache/flume/channel/file/LogFile.java  | 52 +++++++++++++++++---
 .../apache/flume/channel/file/TestLogFile.java  | 51 +++++++++++++++++++
 3 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b8c4b003/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 8dc0ff8..8a8cb7f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -869,14 +869,21 @@ public class Log {
     boolean error = true;
     try {
       try {
-        logFiles.get(logFileIndex).commit(buffer);
+        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
+        // If multiple transactions are committing at the same time,
+        // this ensures that the number of actual fsyncs is small and a
+        // number of them are grouped together into one.
+        logFileWriter.commit(buffer);
+        logFileWriter.sync();
         error = false;
       } catch (LogFileRetryableIOException e) {
         if(!open) {
           throw e;
         }
         roll(logFileIndex, buffer);
-        logFiles.get(logFileIndex).commit(buffer);
+        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
+        logFileWriter.commit(buffer);
+        logFileWriter.sync();
         error = false;
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/flume/blob/b8c4b003/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index bb8ce1a..62f68c6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -167,6 +167,11 @@ public abstract class LogFile {
     private final CipherProvider.Encryptor encryptor;
     private final CachedFSUsableSpace usableSpace;
     private volatile boolean open;
+    private long lastCommitPosition;
+    private long lastSyncPosition;
+
+    // Number of fsyncs actually performed; exposed so tests can verify it.
+    private long syncCount;
 
 
     Writer(File file, int logFileID, long maxFileSize,
@@ -207,9 +212,28 @@ public abstract class LogFile {
     long getMaxSize() {
       return maxFileSize;
     }
+
+    @VisibleForTesting
+    long getLastCommitPosition(){
+      return lastCommitPosition;
+    }
+
+    @VisibleForTesting
+    long getLastSyncPosition() {
+      return lastSyncPosition;
+    }
+
+    @VisibleForTesting
+    long getSyncCount() {
+      return syncCount;
+    }
     synchronized long position() throws IOException {
       return getFileChannel().position();
     }
+
+    // The encrypt and write calls used by the methods below may not be
+    // thread safe, so all of these methods need to be synchronized.
+
     synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
       if(encryptor != null) {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
@@ -229,14 +253,17 @@ public abstract class LogFile {
       }
       write(buffer);
     }
+
     synchronized void commit(ByteBuffer buffer) throws IOException {
-      if(encryptor != null) {
+      if (encryptor != null) {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
       }
       write(buffer);
-      sync();
+      lastCommitPosition = position();
     }
-    private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
+
+    private Pair<Integer, Integer> write(ByteBuffer buffer)
+      throws IOException {
       if(!isOpen()) {
         throw new LogFileRetryableIOException("File closed " + file);
       }
@@ -260,14 +287,27 @@ public abstract class LogFile {
       Preconditions.checkState(wrote == toWrite.limit());
       return Pair.of(getLogFileID(), offset);
     }
+
     synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException {
       return isOpen() && position() + (long) buffer.limit() > getMaxSize();
     }
-    private void sync() throws IOException {
-      if(!isOpen()) {
+
+    /**
+     * Sync the underlying log file to disk. Expensive call,
+     * should be used only on commits. If a sync has already happened after
+     * the last commit, this method is a no-op.
+     * @throws IOException if forcing the file channel to disk fails
+     * @throws LogFileRetryableIOException if this log file is closed
+     */
+    synchronized void sync() throws IOException {
+      if (!isOpen()) {
         throw new LogFileRetryableIOException("File closed " + file);
       }
-      getFileChannel().force(false);
+      if (lastSyncPosition < lastCommitPosition) {
+        getFileChannel().force(false);
+        lastSyncPosition = position();
+        syncCount++;
+      }
     }
 
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b8c4b003/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 4da6ac1..e5d830e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -29,10 +29,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.channel.file.proto.ProtosFactory;
@@ -285,6 +289,7 @@ public class TestLogFile {
     FlumeEventPointer ptr = logFileWriter.put(bytes);
     logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
       (transactionID, WriteOrderOracle.next())));
+    logFileWriter.sync();
     final int offset = ptr.getOffset();
     RandomAccessFile writer = new RandomAccessFile(dataFile, "rw");
     writer.seek(offset + 1500);
@@ -309,6 +314,7 @@ public class TestLogFile {
     FlumeEventPointer ptr = logFileWriter.put(bytes);
     logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
       (transactionID, WriteOrderOracle.next())));
+    logFileWriter.sync();
     final int offset = ptr.getOffset();
     LogFile.OperationRecordUpdater updater = new LogFile
       .OperationRecordUpdater(dataFile);
@@ -354,6 +360,7 @@ public class TestLogFile {
     FlumeEventPointer ptr = logFileWriter.put(bytes);
     logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
       (transactionID, WriteOrderOracle.next())));
+    logFileWriter.sync();
     final int offset = ptr.getOffset();
     LogFile.OperationRecordUpdater updater = new LogFile
       .OperationRecordUpdater(dataFile);
@@ -361,4 +368,48 @@ public class TestLogFile {
     RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw");
     Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte());
   }
+
+  @Test
+  public void testGroupCommit() throws Exception {
+    final FlumeEvent eventIn = TestUtils.newPersistableEvent(250);
+    final CyclicBarrier barrier = new CyclicBarrier(20);
+    ExecutorService executorService = Executors.newFixedThreadPool(20);
+    ExecutorCompletionService<Void> completionService = new
+      ExecutorCompletionService<Void>(executorService);
+    final LogFile.Writer writer = logFileWriter;
+    final AtomicLong txnId = new AtomicLong(++transactionID);
+    for (int i = 0; i < 20; i++) {
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() {
+          try {
+            Put put = new Put(txnId.incrementAndGet(),
+              WriteOrderOracle.next(), eventIn);
+            ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+            writer.put(bytes);
+            writer.commit(TransactionEventRecord.toByteBuffer(
+              new Commit(txnId.get(), WriteOrderOracle.next())));
+            barrier.await();
+            writer.sync();
+          } catch (Exception ex) {
+            Throwables.propagate(ex);
+          }
+          return null;
+        }
+      });
+    }
+
+    for(int i = 0; i < 20; i++) {
+      completionService.take().get();
+    }
+
+    //At least 250*20, but can be higher due to serialization overhead
+    Assert.assertTrue(logFileWriter.position() >= 5000);
+    Assert.assertEquals(1, writer.getSyncCount());
+    Assert.assertTrue(logFileWriter.getLastCommitPosition() ==
+      logFileWriter.getLastSyncPosition());
+
+    executorService.shutdown();
+
+  }
 }


Mime
View raw message