flume-commits mailing list archives

From br...@apache.org
Subject svn commit: r1352615 - in /incubator/flume/trunk: flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/ flume-ng-core/src/main/java/org/apache...
Date Thu, 21 Jun 2012 16:58:24 GMT
Author: brock
Date: Thu Jun 21 16:58:23 2012
New Revision: 1352615

URL: http://svn.apache.org/viewvc?rev=1352615&view=rev
Log:
FLUME-1232: Cannot start agent a 3rd time when using FileChannel 

(Arvind Prabhakar via Brock Noland)

Removed:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java
Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Thu Jun 21 16:58:23 2012
@@ -33,11 +33,11 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.flume.channel.file.Log.Builder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 
 /**
  * <p>
@@ -79,10 +79,11 @@ public class FileChannel extends BasicCh
   private Log log;
   private boolean shutdownHookAdded;
   private Thread shutdownHook;
-	private volatile boolean open;
+  private volatile boolean open;
   private Semaphore queueRemaining;
   private final ThreadLocal<FileBackedTransaction> transactions =
       new ThreadLocal<FileBackedTransaction>();
+  private int logWriteTimeout;
 
   /**
    * Transaction IDs should be unique within a file channel
@@ -162,6 +163,19 @@ public class FileChannel extends BasicCh
             FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
             FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
 
+    logWriteTimeout = context.getInteger(
+        FileChannelConfiguration.LOG_WRITE_TIMEOUT,
+        FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT);
+
+    if (logWriteTimeout < 0) {
+      LOG.warn("Log write time out is invalid: " + logWriteTimeout
+          + ", using default: "
+          + FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT);
+
+      logWriteTimeout = FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
+    }
+
+
     if(queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
@@ -175,15 +189,19 @@ public class FileChannel extends BasicCh
   public synchronized void start() {
     LOG.info("Starting FileChannel with dataDir "  + Arrays.toString(dataDirs));
     try {
-      log = new Log(checkpointInterval, maxFileSize, capacity,
-          checkpointDir, dataDirs);
+      Builder builder = new Log.Builder();
+      builder.setCheckpointInterval(checkpointInterval);
+      builder.setMaxFileSize(maxFileSize);
+      builder.setQueueSize(capacity);
+      builder.setLogWriteTimeout(logWriteTimeout);
+      builder.setCheckpointDir(checkpointDir);
+      builder.setLogDirs(dataDirs);
+
+      log = builder.build();
+
       log.replay();
-    } catch (IOException e) {
-      Throwables.propagate(e);
-    }
-    open = true;
-    boolean error = true;
-    try {
+      open = true;
+
       int depth = getDepth();
       Preconditions.checkState(queueRemaining.tryAcquire(depth),
           "Unable to acquire " + depth + " permits");
@@ -208,11 +226,9 @@ public class FileChannel extends BasicCh
         };
         Runtime.getRuntime().addShutdownHook(shutdownHook);
       }
-      error = false;
-    } finally {
-      if(error) {
-        open = false;
-      }
+    } catch (Exception ex) {
+      open = false;
+      LOG.error("Failed to start the file channel", ex);
     }
     super.start();
   }
@@ -226,6 +242,8 @@ public class FileChannel extends BasicCh
         shutdownHookAdded = false;
         shutdownHook = null;
       }
+    } catch (Exception ex) {
+      LOG.debug("Failed to remove shutdown hook", ex);
     } finally {
       close();
     }
@@ -252,7 +270,7 @@ public class FileChannel extends BasicCh
     Preconditions.checkNotNull(log, "log");
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Preconditions.checkNotNull(queue, "queue");
-    return queue.size();
+    return queue.getSize();
   }
   void close() {
     if(open) {
@@ -263,6 +281,10 @@ public class FileChannel extends BasicCh
     }
   }
 
+  boolean isOpen() {
+    return open;
+  }
+
   /**
    * Transaction backed by a file. This transaction supports either puts
    * or takes but not both.

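For quick reference, the new construction path shown in the FileChannel.start() hunk above reduces to the following sketch. The Builder setters are exactly those added in this commit and each returns the builder, so they chain; the paths and sizes here are illustrative only:

    // Sketch: constructing the Log via the new Log.Builder (values illustrative).
    Log log = new Log.Builder()
        .setCheckpointInterval(30L * 1000L)                // milliseconds
        .setMaxFileSize(1024L * 1024L * 1024L)             // bytes
        .setQueueSize(1000000)                             // max events
        .setLogWriteTimeout(3)                             // seconds
        .setCheckpointDir(new File("/flume/checkpoint"))   // illustrative path
        .setLogDirs(new File[] { new File("/flume/data") })
        .build();                                          // throws IOException
    log.replay();
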
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java Thu Jun 21 16:58:23 2012
@@ -35,10 +35,10 @@ public class FileChannelConfiguration {
   public static final String TRANSACTION_CAPACITY = "transactionCapacity";
   public static final int DEFAULT_TRANSACTION_CAPACITY = 1000;
   /**
-   * Interval at which checkpoints should be taken. Default 5 mins (ms)
+   * Interval, in milliseconds, at which checkpoints should be taken.
+   * Default: 30 seconds.
    */
   public static final String CHECKPOINT_INTERVAL = "checkpointInterval";
-  public static final long DEFAULT_CHECKPOINT_INTERVAL = 5L * 60L * 1000L;
+  public static final long DEFAULT_CHECKPOINT_INTERVAL = 30L * 1000L;
   /**
    * Max file size for data files, cannot exceed the default. Default: 2GB
    */
@@ -57,4 +57,11 @@ public class FileChannelConfiguration {
    */
   public static final String KEEP_ALIVE = "keep-alive";
   public static final int DEFAULT_KEEP_ALIVE = 3;
+
+  /**
+   * The amount of time, in seconds, a writer will wait before failing when
+   * a checkpoint is enqueued or in progress.
+   */
+  public static final String LOG_WRITE_TIMEOUT = "write-timeout";
+  public static final int DEFAULT_WRITE_TIMEOUT = 3;
 }

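In agent terms, the new write-timeout key sits next to the channel's existing knobs. A hypothetical properties-file configuration (the agent and channel names and paths are made up; the checkpointDir and dataDirs key names follow the channel's other configuration constants; checkpointInterval is in milliseconds, write-timeout in seconds):

    agent1.channels = ch1
    agent1.channels.ch1.type = file
    agent1.channels.ch1.checkpointDir = /flume/checkpoint
    agent1.channels.ch1.dataDirs = /flume/data
    agent1.channels.ch1.transactionCapacity = 1000
    agent1.channels.ch1.checkpointInterval = 30000
    agent1.channels.ch1.write-timeout = 3
    agent1.channels.ch1.keep-alive = 3
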
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java Thu Jun 21 16:58:23 2012
@@ -18,18 +18,21 @@
  */
 package org.apache.flume.channel.file;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.RandomAccessFile;
 import java.nio.LongBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,49 +43,166 @@ import com.google.common.collect.Maps;
  * Queue of events in the channel. This queue stores only
  * {@link FlumeEventPointer} objects which are represented
  * as 8 byte longs internally. Additionally the queue itself
- * of longs is stored as a {@link LongBuffer} in DirectMemory
- * (off heap).
+ * of longs is stored as a memory mapped file with a fixed
+ * header and circular queue semantics. The header of the queue
+ * contains the timestamp of last sync, the queue size and
+ * the head position.
  */
-class FlumeEventQueue implements Writable {
-  // XXX  We use % heavily which can be CPU intensive.
-  @SuppressWarnings("unused")
+class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
   .getLogger(FlumeEventQueue.class);
-  protected static final int VERSION = 1;
-  protected static final int SIZE_OF_LONG = 8;
-  protected static final int EMPTY = 0;
-  protected final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
-  protected final LongBuffer elements;
-  protected final ByteBuffer backingBuffer;
-  // both fields will be modified by multiple threads
-  protected volatile int size;
-  protected volatile int next;
+  private static final int VERSION = 2;
+  private static final int EMPTY = 0;
+  private static final int INDEX_VERSION = 0;
+  private static final int INDEX_TIMESTAMP = 1;
+  private static final int INDEX_SIZE = 2;
+  private static final int INDEX_HEAD = 3;
+  private static final int INDEX_ACTIVE_LOG = 4;
+  private static final int MAX_ACTIVE_LOGS = 1024;
+  private static final int HEADER_SIZE = 1028;
+  private final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
+  private final MappedByteBuffer mappedBuffer;
+  private final LongBuffer elementsBuffer;
+  private LongBufferWrapper elements;
+  private final RandomAccessFile checkpointFile;
+  private final java.nio.channels.FileChannel checkpointFileHandle;
+  private final int queueCapacity;
+
+  private int queueSize;
+  private int queueHead;
+  private long timestamp;
+
   /**
    * @param capacity max event capacity of queue
+   * @throws IOException
    */
-  FlumeEventQueue(int capacity) {
-    Preconditions.checkArgument(capacity > 0, "Capacity must be greater than zero");
-    backingBuffer = DirectMemoryUtils.allocate(capacity * SIZE_OF_LONG);
-    elements = backingBuffer.asLongBuffer();
-    for (int index = 0; index < elements.capacity(); index++) {
-      elements.put(index, EMPTY);
+  FlumeEventQueue(int capacity, File file) throws IOException {
+    Preconditions.checkArgument(capacity > 0,
+        "Capacity must be greater than zero");
+    this.queueCapacity = capacity;
+
+    if (!file.exists()) {
+      Preconditions.checkState(file.createNewFile(), "Unable to create file: "
+          + file);
+    }
+
+    boolean freshlyAllocated = false;
+    checkpointFile = new RandomAccessFile(file, "rw");
+    if (checkpointFile.length() == 0) {
+      // Allocate
+      LOG.info("Event queue has zero allocation. Initializing to capacity. "
+          + "Please wait...");
+      checkpointFile.writeLong(VERSION);
+      int absoluteCapacity = capacity + HEADER_SIZE;
+      for (int i = 1; i < absoluteCapacity; i++) {
+        checkpointFile.writeLong(EMPTY);
+      }
+      LOG.info("Event queue allocation complete");
+      freshlyAllocated = true;
+    } else {
+      int fileCapacity = (int) checkpointFile.length() / 8;
+      int expectedCapacity = capacity + HEADER_SIZE;
+
+      Preconditions.checkState(fileCapacity == expectedCapacity,
+          "Capacity cannot be reduced once the channel is initialized");
+    }
+
+    checkpointFileHandle = checkpointFile.getChannel();
+
+    mappedBuffer = checkpointFileHandle.map(MapMode.READ_WRITE, 0,
+        file.length());
+
+    elementsBuffer = mappedBuffer.asLongBuffer();
+    if (freshlyAllocated) {
+      elementsBuffer.put(INDEX_VERSION, VERSION);
+    } else {
+      int version = (int) elementsBuffer.get(INDEX_VERSION);
+      Preconditions.checkState(version == VERSION,
+          "Invalid version: " + version);
+      timestamp = elementsBuffer.get(INDEX_TIMESTAMP);
+      queueSize = (int) elementsBuffer.get(INDEX_SIZE);
+      queueHead = (int) elementsBuffer.get(INDEX_HEAD);
+
+      int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS;
+      for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) {
+        long nextFileCode = elementsBuffer.get(i);
+        if (nextFileCode != EMPTY) {
+          Pair<Integer, Integer> idAndCount =
+              decodeActiveLogCounter(nextFileCode);
+          fileIDCounts.put(idAndCount.getLeft(),
+              new AtomicInteger(idAndCount.getRight()));
+        }
+      }
     }
+
+    elements = new LongBufferWrapper(elementsBuffer);
+  }
+
+  private Pair<Integer, Integer> deocodeActiveLogCounter(long value) {
+    int fileId = (int) (value >>> 32);
+    int count = (int) value;
+
+    return Pair.of(fileId, count);
+  }
+
+  private long encodeActiveLogCounter(int fileId, int count) {
+    long result = (long) fileId << 32;
+    result += (long) count;
+    return result;
   }
+
+  synchronized long getTimestamp() {
+    return timestamp;
+  }
+
+  synchronized boolean checkpoint(boolean force) {
+    if (!elements.syncRequired() && !force) {
+      LOG.debug("Checkpoint not required");
+      return false;
+    }
+
+    updateHeaders();
+
+    List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
+    for (Integer fileId : fileIDCounts.keySet()) {
+      Integer count = fileIDCounts.get(fileId).get();
+      long value = encodeActiveLogCounter(fileId, count);
+      fileIdAndCountEncoded.add(value);
+    }
+
+    int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
+    for (int i = 0; i < emptySlots; i++)  {
+      fileIdAndCountEncoded.add(0L);
+    }
+    for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
+      elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
+    }
+
+    elements.sync();
+    mappedBuffer.force();
+
+    return true;
+  }
+
   /**
    * Retrieve and remove the head of the queue.
    *
    * @return FlumeEventPointer or null if queue is empty
    */
   synchronized FlumeEventPointer removeHead() {
-    if(size() == 0) {
+    if(queueSize == 0) {
       return null;
     }
+
     long value = remove(0);
     Preconditions.checkState(value != EMPTY);
+
     FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
     decrementFileID(ptr.getFileID());
     return ptr;
   }
+
   /**
    * Add a FlumeEventPointer to the head of the queue
    * @param FlumeEventPointer to be added
@@ -90,14 +210,19 @@ class FlumeEventQueue implements Writabl
    * added to the queue
    */
   synchronized boolean addHead(FlumeEventPointer e) {
+    if (queueSize == queueCapacity) {
+      return false;
+    }
+
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    if(add(0, value)) {
-      incrementFileID(e.getFileID());
-      return true;
-    }
-    return false;
+    incrementFileID(e.getFileID());
+
+    add(0, value);
+    return true;
   }
+
+
   /**
    * Add a FlumeEventPointer to the tail of the queue
    * this will normally be used when recovering from a
@@ -107,13 +232,16 @@ class FlumeEventQueue implements Writabl
    * was added to the queue
    */
   synchronized boolean addTail(FlumeEventPointer e) {
+    if (queueSize == queueCapacity) {
+      return false;
+    }
+
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    if(add(size(), value)) {
-      incrementFileID(e.getFileID());
-      return true;
-    }
-    return false;
+    incrementFileID(e.getFileID());
+
+    add(queueSize, value);
+    return true;
   }
 
   /**
@@ -126,7 +254,7 @@ class FlumeEventQueue implements Writabl
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    for (int i = 0; i < size; i++) {
+    for (int i = 0; i < queueSize; i++) {
       if(get(i) == value) {
         remove(i);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
@@ -144,15 +272,12 @@ class FlumeEventQueue implements Writabl
   synchronized Set<Integer> getFileIDs() {
     return new HashSet<Integer>(fileIDCounts.keySet());
   }
-  /**
-   * @return current size of the queue, not the capacity
-   */
-  synchronized int size() {
-    return size;
-  }
+
   protected void incrementFileID(int fileID) {
     AtomicInteger counter = fileIDCounts.get(fileID);
     if(counter == null) {
+      Preconditions.checkState(fileIDCounts.size() < MAX_ACTIVE_LOGS,
+          "Too many active logs");
       counter = new AtomicInteger(0);
       fileIDCounts.put(fileID, counter);
     }
@@ -169,82 +294,148 @@ class FlumeEventQueue implements Writabl
   }
 
   protected long get(int index) {
-    if (index < 0 || index > size - 1) {
+    if (index < 0 || index > queueSize - 1) {
       throw new IndexOutOfBoundsException(String.valueOf(index));
     }
-    return elements.get(convert(index));
+
+    return elements.get(getPhysicalIndex(index));
+  }
+
+  private void set(int index, long value) {
+    if (index < 0 || index > queueSize - 1) {
+      throw new IndexOutOfBoundsException(String.valueOf(index));
+    }
+
+    elements.put(getPhysicalIndex(index), value);
   }
 
   protected boolean add(int index, long value) {
-    if (index < 0 || index > size) {
+    if (index < 0 || index > queueSize) {
       throw new IndexOutOfBoundsException(String.valueOf(index));
     }
-    if (size + 1 > elements.capacity()) {
+
+    if (queueSize == queueCapacity) {
       return false;
     }
-    // shift right if element is not added at the right
-    // edge of the array. the common case, add(size-1, value)
-    // will result in no copy operations
-    for (int k = size; k > index; k--) {
-      elements.put(convert(k),
-          elements.get(convert(k - 1)));
+
+    queueSize++;
+
+    if (index <= queueSize/2) {
+      // Shift left
+      queueHead--;
+      if (queueHead < 0) {
+        queueHead = queueCapacity - 1;
+      }
+      for (int i = 0; i < index; i++) {
+        set(i, get(i+1));
+      }
+    } else {
+      // Shift right
+      for (int i = queueSize - 1; i > index; i--) {
+        set(i, get(i-1));
+      }
     }
-    elements.put(convert(index), value);
-    size++;
+    set(index, value);
     return true;
   }
 
-  protected long remove(int index) {
-    if (index < 0 || index > size - 1) {
+  protected synchronized long remove(int index) {
+    if (index < 0 || index > queueSize - 1) {
       throw new IndexOutOfBoundsException(String.valueOf(index));
     }
-    long value = elements.get(convert(index));
-    // shift left if element removed is not on the left
-    // edge of the array. the common case, remove(0)
-    // will result in no copy operations
-    for (int k = index; k > 0; k--) {
-      elements.put(convert(k),
-          elements.get(convert(k - 1)));
-    }
-    elements.put(next % elements.capacity(), EMPTY);
-    next = (next + 1) % elements.capacity();
-    size--;
+    long value = get(index);
+
+    if (index > queueSize/2) {
+      // Move tail part to left
+      for (int i = index; i < queueSize - 1; i++) {
+        long rightValue = get(i+1);
+        set(i, rightValue);
+      }
+      set(queueSize - 1, EMPTY);
+    } else {
+      // Move head part to right
+      for (int i = index - 1; i >= 0; i--) {
+        long leftValue = get(i);
+        set(i+1, leftValue);
+      }
+      set(0, EMPTY);
+      queueHead++;
+      if (queueHead == queueCapacity) {
+        queueHead = 0;
+      }
+    }
+
+    queueSize--;
     return value;
   }
 
-  protected int convert(int index) {
-    return (next + index % elements.capacity()) % elements.capacity();
+  private synchronized void updateHeaders() {
+    timestamp = System.currentTimeMillis();
+    elementsBuffer.put(INDEX_TIMESTAMP, timestamp);
+    elementsBuffer.put(INDEX_SIZE, queueSize);
+    elementsBuffer.put(INDEX_HEAD, queueHead);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating checkpoint headers: ts: " + timestamp + ", qs: "
+          + queueSize + ", qh: " + queueHead);
+    }
   }
 
-  @Override
-  public synchronized void readFields(DataInput input) throws IOException {
-    int version = input.readInt();
-    if(version != VERSION) {
-      throw new IOException("Bad Version " + Integer.toHexString(version));
-    }
-    int length = input.readInt();
-    for (int index = 0; index < length; index++) {
-      long value = input.readLong();
-      FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
-      Preconditions.checkState(value != EMPTY);
-      Preconditions.checkState(addHead(ptr), "Unable to add to queue");
-    }
+
+  private int getPhysicalIndex(int index) {
+    return HEADER_SIZE + (queueHead + index) % queueCapacity;
   }
 
-  @Override
-  public synchronized void write(DataOutput output) throws IOException {
-    output.writeInt(VERSION);
-    output.writeInt(size);
-    for (int index = 0; index < size; index++) {
-      long value = elements.get(convert(index));
-      Preconditions.checkState(value != EMPTY);;
-      output.writeLong(value);
-    }
+  protected synchronized int getSize() {
+    return queueSize;
   }
+
   /**
    * @return max capacity of the queue
    */
   public int getCapacity() {
-    return elements.capacity();
+    return queueCapacity;
+  }
+
+  static class LongBufferWrapper {
+    private final LongBuffer buffer;
+
+    Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
+
+    LongBufferWrapper(LongBuffer lb) {
+      buffer = lb;
+    }
+
+    long get(int index) {
+      long result = EMPTY;
+      if (overwriteMap.containsKey(index)) {
+        result = overwriteMap.get(index);
+      } else {
+        result = buffer.get(index);
+      }
+
+      return result;
+    }
+
+    void put(int index, long value) {
+      overwriteMap.put(index, value);
+    }
+
+    boolean syncRequired() {
+      return overwriteMap.size() > 0;
+    }
+
+    void sync() {
+      Iterator<Integer> it = overwriteMap.keySet().iterator();
+      while (it.hasNext()) {
+        int index = it.next();
+        long value = overwriteMap.get(index);
+
+        buffer.put(index, value);
+        it.remove();
+      }
+
+      Preconditions.checkState(overwriteMap.size() == 0,
+          "concurrent update detected");
+    }
   }
 }

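The new checkpoint file is compact enough to sketch end to end: 1028 header longs (version, timestamp, queue size, queue head, then up to 1024 encoded active-log counters) followed by one long per queue slot. A minimal, self-contained illustration of the header arithmetic and fileID/count packing used above (constants copied from the hunk; the temp file and values are illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.LongBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel.MapMode;

    class CheckpointLayoutSketch {
      static final int HEADER_SIZE = 1028;     // 4 fields + MAX_ACTIVE_LOGS
      static final int MAX_ACTIVE_LOGS = 1024;

      // Logical queue index -> physical slot, honoring the circular head.
      static int physicalIndex(int head, int index, int capacity) {
        return HEADER_SIZE + (head + index) % capacity;
      }

      // fileID in the high 32 bits, per-file event count in the low 32.
      static long encode(int fileId, int count) {
        return ((long) fileId << 32) + (long) count;
      }
      static int fileIdOf(long value) { return (int) (value >>> 32); }
      static int countOf(long value)  { return (int) value; }

      public static void main(String[] args) throws IOException {
        int capacity = 16;                                  // illustrative
        File f = File.createTempFile("checkpoint", null);
        RandomAccessFile raf = new RandomAccessFile(f, "rw");
        raf.setLength(8L * (HEADER_SIZE + capacity));       // 8 bytes per long
        MappedByteBuffer mapped =
            raf.getChannel().map(MapMode.READ_WRITE, 0, raf.length());
        LongBuffer longs = mapped.asLongBuffer();
        longs.put(physicalIndex(0, 0, capacity), encode(7, 42));
        mapped.force();                                     // as checkpoint() does
        raf.close();
      }
    }
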
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java Thu Jun 21 16:58:23 2012
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel.file;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -29,19 +28,24 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.flume.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -62,23 +66,71 @@ class Log {
   private final File checkpointDir;
   private final File[] logDirs;
   private final BackgroundWorker worker;
-  private final int queueSize;
+  private final int queueCapacity;
   private final AtomicReferenceArray<LogFile.Writer> logFiles;
 
   private volatile boolean open;
-  private AtomicReference<Checkpoint> checkpoint;
-  private Checkpoint checkpointA;
-  private Checkpoint checkpointB;
   private FlumeEventQueue queue;
   private long checkpointInterval;
   private long maxFileSize;
   private final Map<String, FileLock> locks;
+  private final ReentrantReadWriteLock checkpointLock =
+      new ReentrantReadWriteLock(true);
+  private final ReadLock checkpointReadLock = checkpointLock.readLock();
+  private final WriteLock checkpointWriterLock = checkpointLock.writeLock();
+  private int logWriteTimeout;
 
-  Log(long checkpointInterval, long maxFileSize, int queueSize,
-      File checkpointDir, File... logDirs) throws IOException {
+  static class Builder {
+    private long bCheckpointInterval;
+    private long bMaxFileSize;
+    private int bQueueCapacity;
+    private File bCheckpointDir;
+    private File[] bLogDirs;
+    private int bLogWriteTimeout =
+        FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
+
+    Builder setCheckpointInterval(long interval) {
+      bCheckpointInterval = interval;
+      return this;
+    }
+
+    Builder setMaxFileSize(long maxSize) {
+      bMaxFileSize = maxSize;
+      return this;
+    }
+
+    Builder setQueueSize(int capacity) {
+      bQueueCapacity = capacity;
+      return this;
+    }
+
+    Builder setCheckpointDir(File cpDir) {
+      bCheckpointDir = cpDir;
+      return this;
+    }
+
+    Builder setLogDirs(File[] dirs) {
+      bLogDirs = dirs;
+      return this;
+    }
+
+    Builder setLogWriteTimeout(int timeout) {
+      bLogWriteTimeout = timeout;
+      return this;
+    }
+
+    Log build() throws IOException {
+      return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
+          bLogWriteTimeout, bCheckpointDir, bLogDirs);
+    }
+  }
+
+  private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
+      int logWriteTimeout, File checkpointDir, File... logDirs)
+          throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
-    Preconditions.checkArgument(queueSize > 0, "queueSize <= 0");
+    Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
     Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
     Preconditions.checkNotNull(checkpointDir, "checkpointDir");
     Preconditions.checkArgument(
@@ -106,9 +158,10 @@ class Log {
     open = false;
     this.checkpointInterval = checkpointInterval;
     this.maxFileSize = maxFileSize;
-    this.queueSize = queueSize;
+    this.queueCapacity = queueCapacity;
     this.checkpointDir = checkpointDir;
     this.logDirs = logDirs;
+    this.logWriteTimeout = logWriteTimeout;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     worker = new BackgroundWorker(this);
     worker.setName("Log-BackgroundWorker");
@@ -123,6 +176,9 @@ class Log {
    */
   synchronized void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log has been opened");
+
+    checkpointWriterLock.lock();
+
     try {
       /*
        * First we are going to look through the data directories
@@ -157,63 +213,40 @@ class Log {
        * Read the checkpoint (in memory queue) from one of two alternating
        * locations. We will read the last one written to disk.
        */
-      checkpointA = new Checkpoint(new File(checkpointDir, "chkpt-A"),
-          queueSize);
-      checkpointB = new Checkpoint(new File(checkpointDir, "chkpt-B"),
-          queueSize);
-      if (checkpointA.getTimestamp() > checkpointB.getTimestamp()) {
-        try {
-          LOGGER.info("Reading checkpoint A " + checkpointA.getFile());
-          // read from checkpoint A, write to B
-          queue = checkpointA.read();
-          checkpoint = new AtomicReference<Checkpoint>(checkpointB);
-        } catch (EOFException e) {
-          LOGGER.info("EOF reading from A", e);
-          // read from checkpoint B, write to A
-          queue = checkpointB.read();
-          checkpoint = new AtomicReference<Checkpoint>(checkpointA);
-        }
-      } else if (checkpointB.getTimestamp() > checkpointA.getTimestamp()) {
-        try {
-          LOGGER.info("Reading checkpoint B " + checkpointB.getFile());
-          // read from checkpoint B, write to A
-          queue = checkpointB.read();
-          checkpoint = new AtomicReference<Checkpoint>(checkpointA);
-        } catch (EOFException e) {
-          LOGGER.info("EOF reading from B", e);
-          // read from checkpoint A, write to B
-          queue = checkpointA.read();
-          checkpoint = new AtomicReference<Checkpoint>(checkpointB);
-        }
-      } else {
-        LOGGER.info("Starting checkpoint from scratch");
-        queue = new FlumeEventQueue(queueSize);
-        checkpoint = new AtomicReference<Checkpoint>(checkpointA);
-      }
+      queue = new FlumeEventQueue(queueCapacity,
+                        new File(checkpointDir, "checkpoint"));
 
-      long ts = checkpoint.get().getTimestamp();
+      long ts = queue.getTimestamp();
       LOGGER.info("Last Checkpoint " + new Date(ts) +
-          ", queue depth = " + queue.size());
+          ", queue depth = " + queue.getSize());
 
       /*
        * We now have everything we need to actually replay the log files
        * the queue, the timestamp the queue was written to disk, and
        * the list of data files.
        */
-      ReplayHandler replayHandler = new ReplayHandler(queue,
-          checkpoint.get().getTimestamp());
+      ReplayHandler replayHandler = new ReplayHandler(queue);
       replayHandler.replayLog(dataFiles);
+
       for (int index = 0; index < logDirs.length; index++) {
         LOGGER.info("Rolling " + logDirs[index]);
         roll(index);
       }
+
       /*
        * Now that we have replayed, write the current queue to disk
        */
-      writeCheckpoint();
+      writeCheckpoint(true);
+
       open = true;
     } catch (Exception ex) {
       LOGGER.error("Failed to initialize Log", ex);
+      if (ex instanceof IOException) {
+        throw (IOException) ex;
+      }
+      Throwables.propagate(ex);
+    } finally {
+      checkpointWriterLock.unlock();
     }
   }
 
@@ -258,22 +291,44 @@ class Log {
   FlumeEventPointer put(long transactionID, Event event)
       throws IOException {
     Preconditions.checkState(open, "Log is closed");
-    FlumeEvent flumeEvent = new FlumeEvent(event.getHeaders(), event.getBody());
-    Put put = new Put(transactionID, flumeEvent);
-    put.setTimestamp(System.currentTimeMillis());
-    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
-    int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
-    boolean error = true;
-    try {
-      FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
-      error = false;
-      return ptr;
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
+    }
+
+    try {
+      FlumeEvent flumeEvent = new FlumeEvent(
+                    event.getHeaders(), event.getBody());
+      Put put = new Put(transactionID, flumeEvent);
+      put.setTimestamp(System.currentTimeMillis());
+      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
+      int logFileIndex = nextLogWriter(transactionID);
+      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+        roll(logFileIndex, buffer);
+      }
+      boolean error = true;
+      try {
+        FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+        error = false;
+        return ptr;
+      } finally {
+        if (error) {
+          roll(logFileIndex);
+        }
+      }
     } finally {
-      if (error) {
-        roll(logFileIndex);
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
       }
     }
   }
@@ -289,21 +344,42 @@ class Log {
   void take(long transactionID, FlumeEventPointer pointer)
       throws IOException {
     Preconditions.checkState(open, "Log is closed");
-    Take take = new Take(transactionID, pointer.getOffset(),
-        pointer.getFileID());
-    take.setTimestamp(System.currentTimeMillis());
-    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
-    int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
     }
-    boolean error = true;
+
     try {
-      logFiles.get(logFileIndex).take(buffer);
-      error = false;
+      Take take = new Take(transactionID, pointer.getOffset(),
+          pointer.getFileID());
+      take.setTimestamp(System.currentTimeMillis());
+      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
+      int logFileIndex = nextLogWriter(transactionID);
+      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+        roll(logFileIndex, buffer);
+      }
+      boolean error = true;
+      try {
+        logFiles.get(logFileIndex).take(buffer);
+        error = false;
+      } finally {
+        if (error) {
+          roll(logFileIndex);
+        }
+      }
     } finally {
-      if (error) {
-        roll(logFileIndex);
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
       }
     }
   }
@@ -317,23 +393,45 @@ class Log {
    */
   void rollback(long transactionID) throws IOException {
     Preconditions.checkState(open, "Log is closed");
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
+    }
+
     if(LOGGER.isDebugEnabled()) {
       LOGGER.debug("Rolling back " + transactionID);
     }
-    Rollback rollback = new Rollback(transactionID);
-    rollback.setTimestamp(System.currentTimeMillis());
-    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
-    int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
-    boolean error = true;
+
     try {
-      logFiles.get(logFileIndex).rollback(buffer);
-      error = false;
+      Rollback rollback = new Rollback(transactionID);
+      rollback.setTimestamp(System.currentTimeMillis());
+      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
+      int logFileIndex = nextLogWriter(transactionID);
+      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+        roll(logFileIndex, buffer);
+      }
+      boolean error = true;
+      try {
+        logFiles.get(logFileIndex).rollback(buffer);
+        error = false;
+      } finally {
+        if (error) {
+          roll(logFileIndex);
+        }
+      }
     } finally {
-      if (error) {
-        roll(logFileIndex);
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
       }
     }
   }
@@ -380,6 +478,7 @@ class Log {
     open = false;
     if (worker != null) {
       worker.shutdown();
+      worker.interrupt();
     }
     if (logFiles != null) {
       for (int index = 0; index < logFiles.length(); index++) {
@@ -427,23 +526,47 @@ class Log {
    * @throws IOException
    */
   private void commit(long transactionID, short type) throws IOException {
-    Commit commit = new Commit(transactionID, type);
-    commit.setTimestamp(System.currentTimeMillis());
-    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
-    int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
+
+    Preconditions.checkState(open, "Log is closed");
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
     }
-    boolean error = true;
+
     try {
-      logFiles.get(logFileIndex).commit(buffer);
-      error = false;
+      Commit commit = new Commit(transactionID, type);
+      commit.setTimestamp(System.currentTimeMillis());
+      ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
+      int logFileIndex = nextLogWriter(transactionID);
+      if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+        roll(logFileIndex, buffer);
+      }
+      boolean error = true;
+      try {
+        logFiles.get(logFileIndex).commit(buffer);
+        error = false;
+      } finally {
+        if (error) {
+          roll(logFileIndex);
+        }
+      }
     } finally {
-      if (error) {
-        roll(logFileIndex);
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
       }
     }
   }
+
   /**
    * Atomic so not synchronization required.
    * @return
@@ -474,54 +597,103 @@ class Log {
    */
   private synchronized void roll(int index, ByteBuffer buffer)
       throws IOException {
-    LogFile.Writer oldLogFile = logFiles.get(index);
-    // check to make sure a roll is actually required due to
-    // the possibility of multiple writes waiting on lock
-    if(oldLogFile == null || buffer == null ||
-        oldLogFile.isRollRequired(buffer)) {
-      try {
-        LOGGER.info("Roll start " + logDirs[index]);
-        int fileID = nextFileID.incrementAndGet();
-        File file = new File(logDirs[index], PREFIX + fileID);
-        Preconditions.checkState(!file.exists(), "File alread exists "  + file);
-        Preconditions.checkState(file.createNewFile(), "File could not be created " + file);
-        idLogFileMap.put(fileID, new LogFile.RandomReader(file));
-        // writer from this point on will get new reference
-        logFiles.set(index, new LogFile.Writer(file, fileID, maxFileSize));
-        // close out old log
-        if (oldLogFile != null) {
-          oldLogFile.close();
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
+    }
+
+    try {
+      LogFile.Writer oldLogFile = logFiles.get(index);
+      // check to make sure a roll is actually required due to
+      // the possibility of multiple writes waiting on lock
+      if(oldLogFile == null || buffer == null ||
+          oldLogFile.isRollRequired(buffer)) {
+        try {
+          LOGGER.info("Roll start " + logDirs[index]);
+          int fileID = nextFileID.incrementAndGet();
+          File file = new File(logDirs[index], PREFIX + fileID);
+          Preconditions.checkState(!file.exists(),
+              "File already exists "  + file);
+          Preconditions.checkState(file.createNewFile(),
+              "File could not be created " + file);
+          idLogFileMap.put(fileID, new LogFile.RandomReader(file));
+          // writer from this point on will get new reference
+          logFiles.set(index, new LogFile.Writer(file, fileID, maxFileSize));
+          // close out old log
+          if (oldLogFile != null) {
+            oldLogFile.close();
+          }
+        } finally {
+          LOGGER.info("Roll end");
         }
-      } finally {
-        LOGGER.info("Roll end");
+      }
+    } finally {
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
       }
     }
   }
+
+  private synchronized void writeCheckpoint() throws IOException {
+    writeCheckpoint(false);
+  }
+
   /**
    * Write the current checkpoint object and then swap objects so that
    * the next checkpoint occurs on the other checkpoint directory.
    *
    * Synchronization required since both synchronized and unsynchronized
+   * @param force  a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
-  private synchronized void writeCheckpoint() throws IOException {
-    synchronized (queue) {
-      checkpoint.get().write(queue);
-      if (!checkpoint.compareAndSet(checkpointA, checkpointB)) {
-        Preconditions.checkState(checkpoint.compareAndSet(checkpointB,
-            checkpointA));
+  private synchronized void writeCheckpoint(boolean force) throws IOException {
+    checkpointWriterLock.lock();
+    try {
+      if (queue.checkpoint(force) || force) {
+        long ts = queue.getTimestamp();
+
+        Set<Integer> idSet = queue.getFileIDs();
+
+        int numFiles = logFiles.length();
+        for (int i = 0; i < numFiles; i++) {
+          LogFile.Writer writer = logFiles.get(i);
+          writer.markCheckpoint(ts);
+          int id = writer.getFileID();
+          idSet.remove(id);
+          LOGGER.debug("Updated checkpoint for file: " + writer.getFile());
+        }
+
+        // Update any inactive data files as well
+        Iterator<Integer> idIterator = idSet.iterator();
+        while (idIterator.hasNext()) {
+          int id = idIterator.next();
+          LogFile.RandomReader reader = idLogFileMap.remove(id);
+          File file = reader.getFile();
+          reader.close();
+          LogFile.Writer writer = new LogFile.Writer(file, id, maxFileSize);
+          writer.markCheckpoint(ts);
+          writer.close();
+          reader = new LogFile.RandomReader(file);
+          idLogFileMap.put(id, reader);
+          LOGGER.debug("Updated checkpoint for file: " + file);
+          idIterator.remove();
+        }
+        Preconditions.checkState(idSet.size() == 0,
+            "Could not update all data file timestamps: " + idSet);
       }
+    } finally {
+      checkpointWriterLock.unlock();
     }
   }
-  /**
-   * Synchronization not required as this is atomic
-   * @return last time we successfully check pointed
-   * @throws IOException if there is an io error reading the ts from disk
-   */
-  private long getLastCheckpoint() throws IOException {
-    Preconditions.checkState(open, "Log is closed");
-    return checkpoint.get().getTimestamp();
-  }
 
   private void removeOldLogs() {
     Preconditions.checkState(open, "Log is closed");
@@ -634,12 +806,12 @@ class Log {
     void shutdown() {
       if(run) {
         run = false;
-        interrupt();
       }
     }
 
     @Override
     public void run() {
+      long lastCheckTime = 0L;
       while (run) {
         try {
           try {
@@ -650,9 +822,11 @@ class Log {
           }
           if(log.open) {
             // check to see if we should do a checkpoint
-            long elapsed = System.currentTimeMillis() - log.getLastCheckpoint();
+            long currentTime = System.currentTimeMillis();
+            long elapsed = currentTime - lastCheckTime;
             if (elapsed > log.checkpointInterval) {
               log.writeCheckpoint();
+              lastCheckTime = currentTime;
             }
           }
           if(log.open) {

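The tryLock/unlock boilerplate repeated in put, take, rollback, commit, and roll above follows a single pattern: writers share the read lock, and the checkpoint writer briefly excludes them with the write lock. A stripped-down sketch of that gate (the timeout value and task bodies are placeholders):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CheckpointGateSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
      private final int logWriteTimeout = 3; // seconds, placeholder

      // Many writers may hold the read lock concurrently.
      void write(Runnable work) throws IOException {
        boolean lockAcquired = false;
        try {
          lockAcquired = lock.readLock().tryLock(logWriteTimeout, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
        if (!lockAcquired) {
          throw new IOException("Failed to obtain lock for writing to the log.");
        }
        try {
          work.run();
        } finally {
          lock.readLock().unlock();
        }
      }

      // The checkpointer takes the write lock, stalling writers for at most
      // logWriteTimeout seconds before they give up with an IOException.
      void checkpoint(Runnable work) {
        lock.writeLock().lock();
        try {
          work.run();
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
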
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java Thu Jun 21 16:58:23 2012
@@ -61,7 +61,7 @@ class LogFile {
       FILL.put(OP_EOF);
     }
   }
-  private static final int VERSION = 1;
+  private static final int VERSION = 2;
 
 
   static class Writer {
@@ -70,6 +70,7 @@ class LogFile {
     private final long maxFileSize;
     private final RandomAccessFile writeFileHandle;
     private final FileChannel writeFileChannel;
+    private final long checkpointPositionMarker;
 
     private volatile boolean open;
 
@@ -80,22 +81,42 @@ class LogFile {
       writeFileHandle = new RandomAccessFile(file, "rw");
       writeFileHandle.writeInt(VERSION);
       writeFileHandle.writeInt(fileID);
+      checkpointPositionMarker = writeFileHandle.getFilePointer();
+      // checkpoint marker
+      writeFileHandle.writeLong(0L);
+      // timestamp placeholder
+      writeFileHandle.writeLong(0L);
       writeFileChannel = writeFileHandle.getChannel();
       writeFileChannel.force(true);
       LOG.info("Opened " + file);
       open = true;
     }
 
+    File getFile() {
+      return file;
+    }
+
+    synchronized void markCheckpoint(long timestamp) throws IOException {
+      long currentPosition = writeFileChannel.size();
+      writeFileHandle.seek(checkpointPositionMarker);
+      writeFileHandle.writeLong(currentPosition);
+      writeFileHandle.writeLong(timestamp);
+      writeFileChannel.position(currentPosition);
+      LOG.info("Noted checkpoint for file: " + file + ", id: " + fileID
+          + ", checkpoint position: " + currentPosition);
+    }
+
     String getParent() {
       return file.getParent();
     }
+
     synchronized void close() {
       if(open) {
         open = false;
         if(writeFileChannel.isOpen()) {
           LOG.info("Closing " + file);
           try {
-            writeFileChannel.force(false);
+            writeFileChannel.force(true);
           } catch (IOException e) {
             LOG.warn("Unable to flush to disk", e);
           }
@@ -175,6 +196,11 @@ class LogFile {
       readFileHandles.add(open());
       open = true;
     }
+
+    File getFile() {
+      return file;
+    }
+
     FlumeEvent get(int offset) throws IOException, InterruptedException {
       Preconditions.checkState(open, "File closed");
       RandomAccessFile fileHandle = checkOut();
@@ -232,7 +258,7 @@ class LogFile {
         close(fileHandle);
       }
     }
-    private RandomAccessFile checkOut() 
+    private RandomAccessFile checkOut()
         throws IOException, InterruptedException {
       RandomAccessFile fileHandle = readFileHandles.poll();
       if(fileHandle != null) {
@@ -260,6 +286,8 @@ class LogFile {
     private final FileChannel fileChannel;
     private final int version;
     private final int logFileID;
+    private final long lastCheckpointPosition;
+    private final long lastCheckpointTimestamp;
 
     /**
      * Construct a Sequential Log Reader object
@@ -276,6 +304,9 @@ class LogFile {
             " expected " + Integer.toHexString(VERSION));
       }
       logFileID = fileHandle.readInt();
+      lastCheckpointPosition = fileHandle.readLong();
+      lastCheckpointTimestamp = fileHandle.readLong();
+
       Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
           + Integer.toHexString(logFileID));
     }
@@ -285,6 +316,19 @@ class LogFile {
     int getLogFileID() {
       return logFileID;
     }
+    void skipToLastCheckpointPosition(long checkpointTimestamp)
+        throws IOException {
+      if (lastCheckpointPosition > 0L
+          && lastCheckpointTimestamp == checkpointTimestamp) {
+        LOG.info("fast-forward to checkpoint position: "
+                  + lastCheckpointPosition);
+        fileChannel.position(lastCheckpointPosition);
+      } else {
+        LOG.warn("Checkpoint was not done or did not match."
+            + "Replaying the entire log: file = " + lastCheckpointTimestamp
+            + ", queue: " + checkpointTimestamp);
+      }
+    }
     Pair<Integer, TransactionEventRecord> next() throws IOException {
       try {
         long position = fileChannel.position();

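The marker that markCheckpoint writes and SequentialReader later consumes lives at a fixed offset just past the version and fileID ints. A hedged sketch of the write side (a bare RandomAccessFile stands in for the Writer's channel bookkeeping):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class CheckpointMarkerSketch {
      // Header: int version, int fileID, long checkpointPosition, long timestamp.
      static final long MARKER_OFFSET = 4 + 4;

      static void mark(File file, long timestamp) throws IOException {
        RandomAccessFile handle = new RandomAccessFile(file, "rw");
        try {
          long current = handle.length();      // where writes would resume
          handle.seek(MARKER_OFFSET);
          handle.writeLong(current);           // position to fast-forward to
          handle.writeLong(timestamp);         // must equal the queue timestamp
          handle.seek(current);                // restore the write position
        } finally {
          handle.close();
        }
      }
    }

On replay, skipToLastCheckpointPosition fast-forwards only when the stored timestamp equals the queue's checkpoint timestamp; otherwise the whole file is replayed.
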
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java Thu Jun 21 16:58:23 2012
@@ -57,9 +57,9 @@ class ReplayHandler {
    */
   private final List<Long> pendingTakes;
 
-  ReplayHandler(FlumeEventQueue queue, long lastCheckpoint) {
+  ReplayHandler(FlumeEventQueue queue) {
     this.queue = queue;
-    this.lastCheckpoint = lastCheckpoint;
+    this.lastCheckpoint = queue.getTimestamp();
     pendingTakes = Lists.newArrayList();
   }
 
@@ -73,36 +73,41 @@ class ReplayHandler {
       LogFile.SequentialReader reader = null;
       try {
         reader = new LogFile.SequentialReader(log);
+        reader.skipToLastCheckpointPosition(queue.getTimestamp());
         Pair<Integer, TransactionEventRecord> entry;
         FlumeEventPointer ptr;
         // for puts the fileId is the fileID of the file they exist in
         // for takes the fileId and offset are pointers to a put
         int fileId = reader.getLogFileID();
+        int readCount = 0;
+        int putCount = 0;
+        int takeCount = 0;
+        int rollbackCount = 0;
+        int commitCount = 0;
+        int skipCount = 0;
         while ((entry = reader.next()) != null) {
           int offset = entry.getLeft();
           TransactionEventRecord record = entry.getRight();
           short type = record.getRecordType();
           long trans = record.getTransactionID();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("record.getTimestamp() = " + record.getTimestamp()
-                + ", lastCheckpoint = " + lastCheckpoint + ", fileId = "
-                + fileId + ", offset = " + offset + ", type = "
-                + TransactionEventRecord.getName(type) + ", transaction "
-                + trans);
-          }
+          readCount++;
           if (record.getTimestamp() > lastCheckpoint) {
             if (type == TransactionEventRecord.Type.PUT.get()) {
+              putCount++;
               ptr = new FlumeEventPointer(fileId, offset);
               transactionMap.put(trans, ptr);
             } else if (type == TransactionEventRecord.Type.TAKE.get()) {
+              takeCount++;
               Take take = (Take) record;
               ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
               transactionMap.put(trans, ptr);
             } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
+              rollbackCount++;
               transactionMap.remove(trans);
             } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
+              commitCount++;
               @SuppressWarnings("unchecked")
-              Collection<FlumeEventPointer> pointers = 
+              Collection<FlumeEventPointer> pointers =
                 (Collection<FlumeEventPointer>) transactionMap.remove(trans);
               if (pointers != null && pointers.size() > 0) {
                 processCommit(((Commit) record).getType(), pointers);
@@ -113,9 +118,16 @@ class ReplayHandler {
                   + Integer.toHexString(type));
             }
 
+          } else {
+            skipCount++;
           }
         }
         LOG.info("Replayed " + count + " from " + log);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("read: " + readCount + ", put: " + putCount + ", take: "
+            + takeCount + ", rollback: " + rollbackCount + ", commit: "
+            + commitCount + ", skipp: " + skipCount);
+        }
       } catch (EOFException e) {
         LOG.warn("Hit EOF on " + log);
       } finally {
@@ -134,7 +146,6 @@ class ReplayHandler {
         for (Long pointer : pendingTakes) {
           LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
         }
-        Preconditions.checkState(false, msg);
       } else {
         LOG.error(msg + ". Duplicate messages will exist in destination.");
       }
@@ -143,9 +154,6 @@ class ReplayHandler {
   }
 
   private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing commit of " + TransactionEventRecord.getName(type));
-    }
     if (type == TransactionEventRecord.Type.PUT.get()) {
       for (FlumeEventPointer pointer : pointers) {
         Preconditions.checkState(queue.addTail(pointer), "Unable to add "
@@ -154,8 +162,6 @@ class ReplayHandler {
           Preconditions.checkState(queue.remove(pointer),
               "Take was pending and pointer was successfully added to the"
                   + " queue but could not be removed: " + pointer);
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("Commited Put " + pointer);
         }
       }
     } else if (type == TransactionEventRecord.Type.TAKE.get()) {
@@ -163,9 +169,6 @@ class ReplayHandler {
         boolean removed = queue.remove(pointer);
         if (!removed) {
           pendingTakes.add(pointer.toLong());
-          if (LOG.isDebugEnabled()) {
-            LOG.info("Unable to remove " + pointer + " added to pending list");
-          }
         }
       }
     } else {

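An aside for readers following the fileId/offset comments at the top of this hunk: the pendingTakes list above stores each pointer as a single long via FlumeEventPointer.toLong()/fromLong(), so a take record replays as a lookup of the put it consumed. A minimal sketch of that encoding, assuming (FlumeEventPointer itself is not part of this diff) that the log file ID occupies the high 32 bits and the offset the low 32 bits:

    // Hypothetical stand-in for FlumeEventPointer's long encoding; the real
    // class is not shown in this commit, so the bit layout is an assumption.
    final class PointerPacking {
      static long toLong(int fileID, int offset) {
        return (((long) fileID) << 32) | (offset & 0xFFFFFFFFL);
      }
      static int fileID(long pointer) {
        return (int) (pointer >>> 32); // high word: log file ID
      }
      static int offset(long pointer) {
        return (int) pointer;          // low word: offset of the put
      }
      public static void main(String[] args) {
        long p = toLong(10, 20);
        assert fileID(p) == 10 && offset(p) == 20;
      }
    }

Under that layout the replay pass only ever needs (fileID, offset) pairs, which is why the counters added above can summarize an entire log file in one debug line.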
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java Thu Jun 21 16:58:23 2012
@@ -43,14 +43,14 @@ public class TestCheckpoint {
   @Test
   public void testSerialization() throws IOException {
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
-    FlumeEventQueue queueIn = new FlumeEventQueue(1);
+    FlumeEventQueue queueIn = new FlumeEventQueue(1, file);
     queueIn.addHead(ptrIn);
-    Checkpoint checkpoint = new Checkpoint(file, 1);
-    Assert.assertEquals(0, checkpoint.getTimestamp());
-    checkpoint.write(queueIn);
-    FlumeEventQueue queueOut = checkpoint.read();
-    FlumeEventPointer ptrOut = queueOut.removeHead();
+    FlumeEventQueue queueOut = new FlumeEventQueue(1, file);
+    Assert.assertEquals(0, queueOut.getTimestamp());
+    queueIn.checkpoint(false);
+    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file);
+    FlumeEventPointer ptrOut = queueOut2.removeHead();
     Assert.assertEquals(ptrIn, ptrOut);
-    Assert.assertTrue(checkpoint.getTimestamp() > 0);
+    Assert.assertTrue(queueOut2.getTimestamp() > 0);
   }
 }

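With Checkpoint.java removed, the write/read cycle the old test exercised now lives entirely inside FlumeEventQueue: the constructor takes the checkpoint file and rebuilds state from it, and checkpoint(false) persists the current state. A condensed sketch of the new round trip, using only the calls visible in this test (temp-file handling assumed as in the fixture; same package as the channel classes assumed):

    import java.io.File;

    public class CheckpointRoundTrip {
      public static void main(String[] args) throws Exception {
        File file = File.createTempFile("Checkpoint", "");
        FlumeEventQueue writer = new FlumeEventQueue(1, file); // capacity 1, backing file
        writer.addHead(new FlumeEventPointer(10, 20));
        writer.checkpoint(false);                              // persist queue state

        FlumeEventQueue reader = new FlumeEventQueue(1, file); // rebuilds from the file
        System.out.println(reader.removeHead());               // the pointer survives
        System.out.println(reader.getTimestamp() > 0);         // stamped by the checkpoint
      }
    }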
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java Thu Jun 21 16:58:23 2012
@@ -71,22 +71,22 @@ public class TestFileChannel {
       dataDir += dataDirs[i].getAbsolutePath() + ",";
     }
     dataDir = dataDir.substring(0, dataDir.length() - 1);
-    channel = createFileChannel(1000);
+    channel = createFileChannel();
 
   }
   private FileChannel createFileChannel() {
-    return createFileChannel(FileChannelConfiguration.DEFAULT_CAPACITY);
-  }
-  private FileChannel createFileChannel(int capacity) {
     FileChannel channel = new FileChannel();
     context.put(FileChannelConfiguration.CHECKPOINT_DIR,
         checkpointDir.getAbsolutePath());
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
-    context.put(FileChannelConfiguration.CAPACITY, String.valueOf(capacity));
+    context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    // Set checkpoint interval to 5 seconds, otherwise the test will run out of memory
+    context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");
     Configurables.configure(channel, context);
     channel.start();
     return channel;
   }
+
   @After
   public void teardown() {
     if(channel != null) {
@@ -180,7 +180,7 @@ public class TestFileChannel {
   @Test
   public void testCapacity() throws Exception {
     channel.close();
-    channel = createFileChannel(5);
+    channel = createFileChannel();
     try {
       putEvents(channel, "capacity", 1, 6);
     } catch (ChannelException e) {
@@ -337,20 +337,10 @@ public class TestFileChannel {
     Collections.sort(actual);
     Assert.assertEquals(expected, actual);
   }
-  @Test(expected=IOException.class)
+  @Test
   public void testLocking() throws IOException {
-    try {
-      createFileChannel();
-    } catch (RuntimeException e) {
-      Throwable cause = e.getCause();
-      Assert.assertNotNull(cause);
-      Assert.assertTrue(cause.getClass().getSimpleName(),
-          cause instanceof IOException);
-      String msg = cause.getMessage();
-      Assert.assertNotNull(msg);
-      Assert.assertTrue(msg.endsWith("The directory is already locked."));
-      throw (IOException)cause;
-    }
+    FileChannel fc = createFileChannel();
+    Assert.assertFalse(fc.isOpen());
   }
   @Test
   public void testIntegration() throws IOException, InterruptedException {
@@ -458,4 +448,4 @@ public class TestFileChannel {
     }
     return result;
   }
-}
\ No newline at end of file
+}

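Since the per-test capacity override is gone, capacity and checkpoint interval are set like any other FileChannel property. A minimal configuration sketch built from the keys this test uses (paths are placeholders; the millisecond unit for CHECKPOINT_INTERVAL follows the "5 seconds"/"5000" comment above, and the explicit setName call is an assumption, not shown in the test):

    import org.apache.flume.Context;
    import org.apache.flume.channel.file.FileChannel;
    import org.apache.flume.channel.file.FileChannelConfiguration;
    import org.apache.flume.conf.Configurables;

    public class FileChannelSetup {
      public static void main(String[] args) {
        Context context = new Context();
        context.put(FileChannelConfiguration.CHECKPOINT_DIR,
            "/tmp/flume/checkpoint");                   // placeholder path
        context.put(FileChannelConfiguration.DATA_DIRS,
            "/tmp/flume/data1,/tmp/flume/data2");       // placeholder paths
        context.put(FileChannelConfiguration.CAPACITY, "10000");
        context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000"); // 5 s

        FileChannel channel = new FileChannel();
        channel.setName("fileChannel");                 // assumed naming step
        Configurables.configure(channel, context);
        channel.start();
      }
    }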
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java Thu Jun 21 16:58:23 2012
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -28,39 +30,43 @@ import com.google.common.collect.Sets;
 
 public class TestFlumeEventQueue {
 
+  File file;
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
   FlumeEventQueue queue;
   @Before
-  public void setup() {
-    queue = new FlumeEventQueue(1000);
+  public void setup() throws Exception {
+    file = File.createTempFile("Checkpoint", "");
   }
   @Test
-  public void testQueueIsEmptyAfterCreation() {
+  public void testQueueIsEmptyAfterCreation() throws IOException {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertNull(queue.removeHead());
   }
   @Test
-  public void testCapacity() {
-    queue = new FlumeEventQueue(1);
+  public void testCapacity() throws Exception {
+    queue = new FlumeEventQueue(1, file);
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
   @Test(expected=IllegalArgumentException.class)
-  public void testInvalidCapacityZero() {
-    queue = new FlumeEventQueue(0);
+  public void testInvalidCapacityZero() throws Exception {
+    queue = new FlumeEventQueue(0, file);
   }
   @Test(expected=IllegalArgumentException.class)
-  public void testInvalidCapacityNegative() {
-    queue = new FlumeEventQueue(-1);
+  public void testInvalidCapacityNegative() throws Exception {
+    queue = new FlumeEventQueue(-1, file);
   }
   @Test
-  public void addTail1() {
+  public void addTail1() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead());
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
-  public void addTail2() {
+  public void addTail2() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -68,7 +74,8 @@ public class TestFlumeEventQueue {
     Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
   }
   @Test
-  public void addTailLarge() {
+  public void addTailLarge() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -84,14 +91,16 @@ public class TestFlumeEventQueue {
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
-  public void addHead1() {
+  public void addHead1() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead());
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
-  public void addHead2() {
+  public void addHead2() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -99,7 +108,8 @@ public class TestFlumeEventQueue {
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
   }
   @Test
-  public void addHeadLarge() {
+  public void addHeadLarge() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -115,7 +125,8 @@ public class TestFlumeEventQueue {
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
-  public void addTailRemove1() {
+  public void addTailRemove1() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
@@ -125,7 +136,8 @@ public class TestFlumeEventQueue {
   }
 
   @Test
-  public void addTailRemove2() {
+  public void addTailRemove2() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -133,27 +145,30 @@ public class TestFlumeEventQueue {
   }
 
   @Test
-  public void addHeadRemove1() {
+  public void addHeadRemove1() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead());
   }
   @Test
-  public void addHeadRemove2() {
+  public void addHeadRemove2() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertEquals(pointer2, queue.removeHead());
   }
   @Test
-  public void testWrappingCorrectly() {
+  public void testWrappingCorrectly() throws Exception {
+    queue = new FlumeEventQueue(1000, file);
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
         break;
       }
     }
-    for (int i = queue.size()/2; i > 0; i--) {
+    for (int i = queue.getSize()/2; i > 0; i--) {
       Assert.assertNotNull(queue.removeHead());
     }
     // addHead below would throw an IndexOOBounds with

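Every test above now goes through the two-argument constructor, so a queue is bound to its checkpoint file from construction. A short behavioral sketch of the head/tail operations the tests exercise (same package assumed, since the classes are package-local; temp-file handling mirrors setup()):

    import java.io.File;

    public class QueueBehavior {
      public static void main(String[] args) throws Exception {
        File file = File.createTempFile("Checkpoint", "");
        FlumeEventQueue queue = new FlumeEventQueue(2, file); // capacity 2

        queue.addTail(new FlumeEventPointer(1, 1)); // normal put path: back of queue
        queue.addHead(new FlumeEventPointer(2, 2)); // replay/rollback path: front

        System.out.println(queue.removeHead()); // pointer (2,2): head entries drain first
        System.out.println(queue.removeHead()); // pointer (1,1)
        System.out.println(queue.removeHead()); // null: empty again
      }
    }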
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java Thu Jun 21 16:58:23 2012
@@ -31,12 +31,14 @@ import com.google.common.io.Files;
 
 public class TestLog {
   private static final long MAX_FILE_SIZE = 1000;
+  private static final int CAPACITY = 10000;
   private Log log;
   private File checkpointDir;
   private File[] dataDirs;
   private long transactionID;
   @Before
   public void setup() throws IOException {
+    transactionID = 0;
     checkpointDir = Files.createTempDir();
     Assert.assertTrue(checkpointDir.isDirectory());
     dataDirs = new File[3];
@@ -44,8 +46,9 @@ public class TestLog {
       dataDirs[i] = Files.createTempDir();
       Assert.assertTrue(dataDirs[i].isDirectory());
     }
-    log = new Log(1L, MAX_FILE_SIZE, 10000,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
   }
   @After
@@ -77,6 +80,7 @@ public class TestLog {
   @Test
   public void testRoll() throws IOException, InterruptedException {
     log.shutdownWorker();
+    Thread.sleep(1000);
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
       long transactionID = ++this.transactionID;
@@ -109,8 +113,10 @@ public class TestLog {
     FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn);
     log.commitPut(transactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
+                dataDirs).build();
     log.replay();
     System.out.println(log.getNextFileID());
     takeAndVerify(eventPointerIn, eventIn);
@@ -126,8 +132,10 @@ public class TestLog {
     log.put(transactionID, eventIn);
     log.rollback(transactionID); // rolled back so it should not be replayed
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
+                dataDirs).build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Assert.assertNull(queue.removeHead());
@@ -147,8 +155,9 @@ public class TestLog {
     log.take(takeTransactionID, eventPointer);
     log.commitTake(takeTransactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Assert.assertNull(queue.removeHead());
@@ -168,8 +177,9 @@ public class TestLog {
     log.take(takeTransactionID, eventPointerIn);
     log.rollback(takeTransactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
     takeAndVerify(eventPointerIn, eventIn);
   }
@@ -179,8 +189,9 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.commitPut(putTransactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead();
@@ -192,8 +203,9 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.commitTake(putTransactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead();
@@ -205,8 +217,9 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.rollback(putTransactionID);
     log.close();
-    log = new Log(Long.MAX_VALUE, LogFile.MAX_FILE_SIZE, 1,
-        checkpointDir, dataDirs);
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(LogFile.MAX_FILE_SIZE).setQueueSize(
+            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs).build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead();

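The positional Log constructor is gone; every site above now uses the fluent builder, whose setters in this diff are setCheckpointInterval, setMaxFileSize, setQueueSize, setCheckpointDir and setLogDirs. A minimal construction sketch (values are placeholders; the millisecond unit for the checkpoint interval is assumed, matching the channel configuration; same package assumed):

    import java.io.File;
    import com.google.common.io.Files;

    public class LogSetup {
      public static void main(String[] args) throws Exception {
        File checkpointDir = Files.createTempDir();
        File[] dataDirs = { Files.createTempDir(), Files.createTempDir() };

        Log log = new Log.Builder()
            .setCheckpointInterval(1000L)   // assumed milliseconds
            .setMaxFileSize(1000L)          // roll data files at ~1 KB
            .setQueueSize(10000)            // queue capacity
            .setCheckpointDir(checkpointDir)
            .setLogDirs(dataDirs)
            .build();
        log.replay();                       // required before first use, as in setup()
      }
    }

Note that build() alone does not replay: the log built by the builder must be assigned and have replay() called on it before use, which is exactly what each test above does.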
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1352615&r1=1352614&r2=1352615&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Thu Jun 21 16:58:23 2012
@@ -76,8 +76,7 @@ public class LoggerSink extends Abstract
       transaction.commit();
     } catch (Exception ex) {
       transaction.rollback();
-      throw new EventDeliveryException("Failed to log event: " +
-          EventHelper.dumpEvent(event), ex);
+      throw new EventDeliveryException("Failed to log event: " + event, ex);
     } finally {
       transaction.close();
     }


