flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject svn commit: r1353841 - /incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
Date Tue, 26 Jun 2012 07:16:23 GMT
Author: mpercy
Date: Tue Jun 26 07:16:22 2012
New Revision: 1353841

URL: http://svn.apache.org/viewvc?rev=1353841&view=rev
Log:
FLUME-1304. Allow for faster allocation of checkpoint file.

(Arvind Prabhakar via Mike Percy)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java?rev=1353841&r1=1353840&r2=1353841&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java	(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java	Tue Jun 26 07:16:22 2012
@@ -51,7 +51,7 @@ import com.google.common.collect.Maps;
 class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
   .getLogger(FlumeEventQueue.class);
-  private static final int VERSION = 2;
+  private static final long VERSION = 2;
   private static final int EMPTY = 0;
   private static final int INDEX_VERSION = 0;
   private static final int INDEX_TIMESTAMP = 1;
@@ -60,6 +60,7 @@ class FlumeEventQueue {
   private static final int INDEX_ACTIVE_LOG = 4;
   private static final int MAX_ACTIVE_LOGS = 1024;
   private static final int HEADER_SIZE = 1028;
+  private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
   private final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
   private final MappedByteBuffer mappedBuffer;
   private final LongBuffer elementsBuffer;
@@ -92,11 +93,21 @@ class FlumeEventQueue {
       // Allocate
       LOG.info("Event queue has zero allocation. Initializing to capacity. "
           + "Please wait...");
-      checkpointFile.writeLong(VERSION);
-      int absoluteCapacity = capacity + HEADER_SIZE;
-      for (int i = 1; i < absoluteCapacity; i++) {
-        checkpointFile.writeLong(EMPTY);
+      int totalBytes = (capacity + HEADER_SIZE)*8;
+      if (totalBytes <= MAX_ALLOC_BUFFER_SIZE) {
+        checkpointFile.write(new byte[totalBytes]);
+      } else {
+        byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE];
+        int remainingBytes = totalBytes;
+        while (remainingBytes >= MAX_ALLOC_BUFFER_SIZE) {
+          checkpointFile.write(initBuffer);
+          remainingBytes -= MAX_ALLOC_BUFFER_SIZE;
+        }
+        if (remainingBytes > 0) {
+          checkpointFile.write(initBuffer, 0, remainingBytes);
+        }
       }
+
       LOG.info("Event queue allocation complete");
       freshlyAllocated = true;
     } else {



Mime
View raw message