flume-commits mailing list archives

From arv...@apache.org
Subject svn commit: r1334488 [1/3] - in /incubator/flume/trunk: flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/...
Date Sat, 05 May 2012 20:34:59 GMT
Author: arvind
Date: Sat May  5 20:34:58 2012
New Revision: 1334488

URL: http://svn.apache.org/viewvc?rev=1334488&view=rev
Log:
FLUME-1085. Implement a durable File Channel.

(Brock Noland via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java   (with props)
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java   (with props)
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
    incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml?rev=1334488&r1=1334487&r2=1334488&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml Sat May  5 20:34:58 2012
@@ -64,6 +64,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java Sat May  5 20:34:58 2012
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a copy of FlumeEventQueue written out to a specific
+ * directory. Two checkpoints will be used at all times; writes
+ * will be alternated between the two Checkpoint objects/files.
+ */
+class Checkpoint {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(Checkpoint.class);
+  private static final long OFFSET_TIMESTAMP = 0;
+  private final File file;
+  private final int queueCapacity;
+
+  private long timestamp;
+  Checkpoint(File file, int queueCapacity) {
+    this.file = file;
+    this.queueCapacity = queueCapacity;
+    timestamp = -1;
+  }
+
+  void write(FlumeEventQueue queue) throws IOException {
+    LOG.info("Writing checkoint to " + file + ", size = " + queue.size());
+    RandomAccessFile dataOutput = new RandomAccessFile(file, "rw");
+    try {
+      // write an invalid timestamp first so an incomplete checkpoint is detectable
+      dataOutput.writeLong(Long.MIN_VALUE);
+      // write the queue capacity and then the queue itself
+      dataOutput.writeInt(queue.getCapacity());
+      queue.write(dataOutput);
+      // force all changes to disk
+      dataOutput.getChannel().force(true);
+      // now update the timestamp to mark the checkpoint as successful
+      dataOutput.seek(OFFSET_TIMESTAMP);
+      dataOutput.writeLong(timestamp = System.currentTimeMillis());
+      dataOutput.getChannel().force(true);
+      dataOutput.close();
+    } finally {
+      if(dataOutput != null) {
+        try {
+          dataOutput.close();
+        } catch (IOException e) {}
+      }
+    }
+  }
+
+  File getFile() {
+    return file;
+  }
+
+  FlumeEventQueue read() throws IOException {
+    FileInputStream fileInput = new FileInputStream(file);
+    try {
+      DataInputStream dataInput = new DataInputStream(fileInput);
+      long timestamp = dataInput.readLong();
+      Preconditions.checkState(timestamp > 0, "Timestamp is invalid " + timestamp);
+      int capacity = Math.max(dataInput.readInt(), queueCapacity);
+      FlumeEventQueue queue = new FlumeEventQueue(capacity);
+      queue.readFields(dataInput);
+      return queue;
+    } finally {
+      if(fileInput != null) {
+        try {
+          fileInput.close();
+        } catch (IOException e) {}
+      }
+    }
+  }
+
+  long getTimestamp() throws IOException {
+    if(timestamp > 0) {
+      return timestamp;
+    }
+    timestamp = 0;
+    RandomAccessFile fileHandle = null;
+    try {
+      fileHandle = new RandomAccessFile(file, "r");
+      fileHandle.seek(OFFSET_TIMESTAMP);
+      timestamp = fileHandle.readLong();
+    } catch(EOFException e) {
+      // no checkpoint taken
+    } catch (FileNotFoundException e) {
+      // no checkpoint taken
+    } finally {
+      if(fileHandle != null) {
+        try {
+          fileHandle.close();
+        } catch (IOException x) {}
+      }
+    }
+    return timestamp;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Checkpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native
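
A note on the write() protocol above: the checkpoint starts with an invalid
timestamp (Long.MIN_VALUE), is forced to disk, and only then gets its real
timestamp written and forced. A crash mid-write therefore leaves a file whose
timestamp is not positive, and the sibling checkpoint stays authoritative. A
minimal sketch of selecting between the two alternating checkpoints (the
helper name is hypothetical, not part of this commit):

    // Prefer the checkpoint with the newer timestamp; a partially written
    // file reports Long.MIN_VALUE and a missing file reports 0 from
    // getTimestamp(), so either one loses the comparison.
    static Checkpoint pickNewest(Checkpoint a, Checkpoint b) throws IOException {
      return a.getTimestamp() >= b.getTimestamp() ? a : b;
    }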

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java Sat May  5 20:34:58 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Commit on disk
+ */
+class Commit extends TransactionEventRecord {
+  /**
+   * Type of transaction being committed: Take or Put
+   */
+  private short type;
+  Commit(Long transactionID) {
+    super(transactionID);
+  }
+  Commit(Long transactionID, short type) {
+    this(transactionID);
+    this.type = type;
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    type = in.readShort();
+  }
+
+  short getType() {
+    return type;
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeShort(type);
+  }
+  @Override
+  short getRecordType() {
+    return Type.COMMIT.get();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
------------------------------------------------------------------------------
    svn:eol-style = native
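
For context, a Commit record finalizes a transaction's puts or its takes, and
the type field records which of the two. A sketch of constructing one during
a put-commit (Type is the enum referenced by getRecordType() above, defined
elsewhere in this commit; PUT is assumed to be one of its values):

    // Durably mark transaction 42's puts as committed; the record is
    // later written to the log via write(DataOutput).
    Commit commit = new Commit(42L, Type.PUT.get());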

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1334488&r1=1334487&r2=1334488&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Sat May  5 20:34:58 2012
@@ -20,371 +20,375 @@
 package org.apache.flume.channel.file;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.channel.BasicChannelSemantics;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 
 /**
  * <p>
  * A durable {@link Channel} implementation that uses the local file system for
  * its storage.
  * </p>
+ * <p>
+ * FileChannel works by writing all transactions to a set of directories
+ * specified in the configuration. Additionally, when a commit occurs
+ * the transaction is synced to disk. Pointers to events put on the
+ * channel are stored in memory. As such, each event on the queue
+ * will require 8 bytes of DirectMemory (non-heap). However, the channel
+ * will only allow a configurable number of messages into the channel.
+ * The appropriate amount of direct memory for said capacity
+ * must be allocated to the JVM via the JVM property: -XX:MaxDirectMemorySize
+ * </p>
+ * <br>
+ * <p>
+ * Memory Consumption:
+ * <ol>
+ * <li>200GB of data in queue at 100 byte messages: 16GB</li>
+ * <li>200GB of data in queue at 500 byte messages: 3.2GB</li>
+ * <li>200GB of data in queue at 1000 byte messages: 1.6GB</li>
+ * </ol>
+ * </p>
  */
-public class FileChannel extends AbstractChannel {
+public class FileChannel extends BasicChannelSemantics {
 
-  private static final Logger logger = LoggerFactory
+  private static final Logger LOG = LoggerFactory
       .getLogger(FileChannel.class);
 
-  private static ThreadLocal<FileBackedTransaction> currentTransaction = new ThreadLocal<FileBackedTransaction>();
-
-  private File directory;
-
-  private File openDirectory;
-  private File completeDirectory;
-  private boolean isInitialized;
-
-  private File currentOutputFile;
-  private boolean shouldRotate;
+  private int capacity;
+  private int keepAlive;
+  private int transactionCapacity;
+  private long checkpointInterval;
+  private long maxFileSize;
+  private File checkpointDir;
+  private File[] dataDirs;
+  private Log log;
+  private boolean shutdownHookAdded;
+  private Thread shutdownHook;
+  private volatile boolean open;
+  private Semaphore queueRemaining;
+  private final ThreadLocal<FileBackedTransaction> transactions =
+      new ThreadLocal<FileBackedTransaction>();
 
-  private void initialize() {
-    Preconditions.checkState(directory != null, "Directory must not be null");
-    Preconditions.checkState(directory.getParentFile().exists(),
-        "Directory %s must exist", directory.getParentFile());
+  /**
+   * Transaction IDs should be unique within a file channel
+   * across JVM restarts.
+   */
+  private static final AtomicLong TRANSACTION_ID =
+      new AtomicLong(System.currentTimeMillis());
 
-    logger.info("Initializing file channel directory:{}", directory);
+  @Override
+  public void configure(Context context) {
 
-    openDirectory = new File(directory, "open");
-    completeDirectory = new File(directory, "complete");
+    String homePath = System.getProperty("user.home").replace('\\', '/');
 
-    if (!openDirectory.mkdirs()) {
-      throw new ChannelException("Unable to create open file directory:"
-          + openDirectory);
+    String strCheckpointDir =
+        context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
+            homePath + "/.flume/file-channel/checkpoint");
+
+    String[] strDataDirs = context.getString(FileChannelConfiguration.DATA_DIRS,
+        homePath + "/.flume/file-channel/data").split(",");
+
+    if(checkpointDir == null) {
+      checkpointDir = new File(strCheckpointDir);
+    } else if(!checkpointDir.getAbsolutePath().
+        equals(new File(strCheckpointDir).getAbsolutePath())) {
+      LOG.warn("An attempt was made to change the checkpoint " +
+          "directory after start, this is not supported.");
+    }
+    if(dataDirs == null) {
+      dataDirs = new File[strDataDirs.length];
+      for (int i = 0; i < strDataDirs.length; i++) {
+        dataDirs[i] = new File(strDataDirs[i]);
+      }
+    } else {
+      boolean changed = false;
+      if(dataDirs.length != strDataDirs.length) {
+        changed = true;
+      } else {
+        for (int i = 0; i < strDataDirs.length; i++) {
+          if(!dataDirs[i].getAbsolutePath().
+              equals(new File(strDataDirs[i]).getAbsolutePath())) {
+            changed = true;
+            break;
+          }
+        }
+      }
+      if(changed) {
+        LOG.warn("An attempt was made to change the data " +
+            "directories after start, this is not supported.");
+      }
     }
 
-    if (!completeDirectory.mkdirs()) {
-      throw new ChannelException("Unable to create complete file directory:"
-          + completeDirectory);
+    int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
+        FileChannelConfiguration.DEFAULT_CAPACITY);
+    if(capacity > 0 && newCapacity != capacity) {
+      LOG.warn("Capacity of this channel cannot be sized on the fly due " +
+          "the requirement we have enough DirectMemory for the queue and " +
+          "downsizing of the queue cannot be guranteed due to the " +
+          "fact there maybe more items on the queue than the new capacity.");
+    } else {
+      capacity = newCapacity;
+    }
+
+    keepAlive =
+        context.getInteger(FileChannelConfiguration.KEEP_ALIVE,
+            FileChannelConfiguration.DEFAULT_KEEP_ALIVE);
+    transactionCapacity =
+        context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
+            FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
+
+    checkpointInterval =
+        context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+            FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
+
+    // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
+    maxFileSize = Math.min(
+        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
+
+    if(queueRemaining == null) {
+      queueRemaining = new Semaphore(capacity, true);
+    }
+    if(log != null) {
+      log.setCheckpointInterval(checkpointInterval);
+      log.setMaxFileSize(maxFileSize);
     }
-
-    shouldRotate = false;
-    isInitialized = true;
   }
 
   @Override
-  public void put(Event event) throws ChannelException {
-    Preconditions.checkState(currentTransaction.get() != null,
-        "No transaction currently in progress");
-
-    currentTransaction.get().put(event);
-  }
-
-  @Override
-  public Event take() throws ChannelException {
-    Preconditions.checkState(currentTransaction.get() != null,
-        "No transaction currently in progress");
-
-    return currentTransaction.get().take();
+  public synchronized void start() {
+    LOG.info("Starting FileChannel with dataDir "  + Arrays.toString(dataDirs));
+    try {
+      log = new Log(checkpointInterval, maxFileSize, capacity,
+          checkpointDir, dataDirs);
+      log.replay();
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+    open = true;
+    boolean error = true;
+    try {
+      int depth = getDepth();
+      Preconditions.checkState(queueRemaining.tryAcquire(depth),
+          "Unable to acquire " + depth + " permits");
+      LOG.info("Queue Size after replay: " + depth);
+      // shutdown hook flushes all data to disk and closes
+      // file descriptors along with setting all closed flags
+      if(!shutdownHookAdded) {
+        shutdownHookAdded = true;
+        final FileChannel fileChannel = this;
+        LOG.info("Adding shutdownhook for " + fileChannel);
+        shutdownHook = new Thread() {
+          @Override
+          public void run() {
+            String desc = Arrays.toString(fileChannel.dataDirs);
+            LOG.info("Closing FileChannel " + desc);
+            try {
+              fileChannel.close();
+            } catch (Exception e) {
+              LOG.error("Error closing fileChannel " + desc, e);
+            }
+          }
+        };
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+      }
+      error = false;
+    } finally {
+      if(error) {
+        open = false;
+      }
+    }
+    super.start();
   }
 
   @Override
-  public synchronized Transaction getTransaction() {
-    if (!isInitialized) {
-      /* This is a catch-all to ensure we initialize file system storage once. */
-      initialize();
-    }
-
-    FileBackedTransaction tx = currentTransaction.get();
-
-    if (shouldRotate) {
-      currentOutputFile = null;
-    }
-
-    /*
-     * If there's no current transaction (which is stored in a threadlocal) OR
-     * its current state is CLOSED - which indicates the transaction is in a
-     * final state and unusable - we create a new transaction with the current
-     * output file and set the thread-local transaction holder to it.
-     */
-    if (tx == null || tx.state.equals(FileBackedTransaction.State.CLOSED)) {
-      FileBackedTransaction transaction = new FileBackedTransaction();
-
-      if (currentOutputFile == null) {
-        currentOutputFile = new File(openDirectory, Thread.currentThread()
-            .getId() + "-" + System.currentTimeMillis());
-
-        logger.debug("Using new output file:{}", currentOutputFile);
+  public synchronized void stop() {
+    LOG.info("Stopping FileChannel with dataDir " +  Arrays.toString(dataDirs));
+    try {
+      if(shutdownHookAdded && shutdownHook != null) {
+        Runtime.getRuntime().removeShutdownHook(shutdownHook);
+        shutdownHookAdded = false;
+        shutdownHook = null;
       }
-
-      transaction.currentOutputFile = currentOutputFile;
-
-      currentTransaction.set(transaction);
-
-      logger.debug("Created transaction:{} for channel:{}", transaction, this);
+    } finally {
+      close();
     }
-
-    return currentTransaction.get();
-  }
-
-  public File getDirectory() {
-    return directory;
-  }
-
-  public void setDirectory(File directory) {
-    this.directory = directory;
-  }
-
-  public File getOpenDirectory() {
-    return openDirectory;
-  }
-
-  public File getCompleteDirectory() {
-    return completeDirectory;
-  }
-
-  public boolean isInitialized() {
-    return isInitialized;
+    super.stop();
   }
 
   @Override
-  public String getName() {
-    // TODO Auto-generated method stub
-    return null;
+  protected BasicTransactionSemantics createTransaction() {
+    Preconditions.checkState(open, "Channel closed");
+    FileBackedTransaction trans = transactions.get();
+    if(trans != null && !trans.isClosed()) {
+      Preconditions.checkState(false,
+          "Thread has transaction which is still open: " +
+              trans.getStateAsString());
+    }
+    trans = new FileBackedTransaction(log, TRANSACTION_ID.incrementAndGet(),
+        transactionCapacity, keepAlive, queueRemaining);
+    transactions.set(trans);
+    return trans;
+  }
+
+  int getDepth() {
+    Preconditions.checkState(open, "Channel closed");
+    Preconditions.checkNotNull(log, "log");
+    FlumeEventQueue queue = log.getFlumeEventQueue();
+    Preconditions.checkNotNull(queue, "queue");
+    return queue.size();
+  }
+  void close() {
+    if(open) {
+      open = false;
+      log.close();
+      log = null;
+      queueRemaining = null;
+    }
   }
 
   /**
-   * <p>
-   * An implementation of {@link Transaction} for {@link FileChannel}s.
-   * </p>
+   * Transaction backed by a file. This transaction supports either puts
+   * or takes but not both.
    */
-  public static class FileBackedTransaction implements Transaction {
-
-    private String transactionId;
-
-    private List<Event> readEvents;
-    private List<Event> writeEvents;
-
-    private File currentOutputFile;
-
-    private FileInputStream inputStream;
-    private FileOutputStream outputStream;
-
-    private State state;
-    private boolean readInitialized;
-    private boolean writeInitialized;
-
-    public FileBackedTransaction() {
-      transactionId = Thread.currentThread().getId() + "-"
-          + System.currentTimeMillis();
-
-      state = State.NEW;
-      readInitialized = false;
-      writeInitialized = false;
-    }
-
-    /**
-     * <p>
-     * Initializes the input (i.e. {@code take()}) support in this transaction.
-     * </p>
-     * <p>
-     * Any transaction may support reads, writes, or a combination thereof. In
-     * order to consume the least amount of resources possible, initialization
-     * of resources are deferred until the first read ({@code take()} or write (
-     * {@code put()}) is performed.
-     * </p>
-     * <p>
-     */
-    private void initializeInput() {
-      readEvents = new LinkedList<Event>();
-
-      readInitialized = true;
-    }
-
-    /**
-     * <p>
-     * Initializes the output (i.e. {@code put()}) support in this transaction.
-     * </p>
-     * <p>
-     * Any transaction may support reads, writes, or a combination thereof. In
-     * order to consume the least amount of resources possible, initialization
-     * of resources are deferred until the first read ({@code take()} or write (
-     * {@code put()}) is performed.
-     * </p>
-     * <p>
-     */
-    private void initializeOutput() {
-      writeEvents = new LinkedList<Event>();
-
-      try {
-        outputStream = new FileOutputStream(currentOutputFile, true);
-      } catch (FileNotFoundException e) {
-        throw new ChannelException("Unable to open new output file:"
-            + currentOutputFile, e);
-      }
-
-      writeInitialized = true;
+  static class FileBackedTransaction extends BasicTransactionSemantics {
+    private final LinkedBlockingDeque<FlumeEventPointer> takeList;
+    private final LinkedBlockingDeque<FlumeEventPointer> putList;
+    private final long transactionID;
+    private final int keepAlive;
+    private final Log log;
+    private final FlumeEventQueue queue;
+    private final Semaphore queueRemaining;
+    public FileBackedTransaction(Log log, long transactionID,
+        int transCapacity, int keepAlive, Semaphore queueRemaining) {
+      this.log = log;
+      queue = log.getFlumeEventQueue();
+      this.transactionID = transactionID;
+      this.keepAlive = keepAlive;
+      this.queueRemaining = queueRemaining;
+      putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
+      takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
     }
-
-    @Override
-    public void begin() {
-      if (state.equals(State.CLOSED)) {
-        throw new IllegalStateException(
-            "Illegal to begin a transaction with state:" + state);
-      }
-
-      logger.debug("Beginning a new transaction");
-
-      state = State.OPEN;
+    private boolean isClosed() {
+      return State.CLOSED.equals(getState());
+    }
+    private String getStateAsString() {
+      return String.valueOf(getState());
     }
-
     @Override
-    public void commit() {
-      Preconditions.checkState(state.equals(State.OPEN),
-          "Attempt to commit a transaction that isn't open (state:" + state
-              + ")");
-
-      logger.debug("Committing current transaction");
-
-      if (writeInitialized) {
-        logger.debug("Flushing {} writes", writeEvents.size());
-
-        try {
-          for (Event event : writeEvents) {
-            // TODO: Serialize event properly (avro?)
-            outputStream.write((event.toString() + "\n").getBytes());
-            outputStream.flush();
-          }
-
-          // TODO: Write checksum.
-          outputStream.write("---\n".getBytes());
-
-          writeEvents.clear();
-        } catch (IOException e) {
-          throw new ChannelException("Unable to write to output file", e);
-        }
+    protected void doPut(Event event) throws InterruptedException {
+      if(putList.remainingCapacity() == 0) {
+        throw new ChannelException("Put queue for FileBackedTransaction " +
+            "of capacity " + putList.size() + " full, consider " +
+            "committing more frequently, increasing capacity or " +
+            "increasing thread count");
       }
-
-      if (readInitialized) {
-        logger.debug("Freeing {} consumed events", readEvents.size());
-
-        // TODO: Implement me!
+      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
+        throw new ChannelException("Cannot acquire capacity");
+      }
+      try {
+        FlumeEventPointer ptr = log.put(transactionID, event);
+        Preconditions.checkState(putList.offer(ptr));
+      } catch (IOException e) {
+        throw new ChannelException("Put failed due to IO error", e);
       }
-
-      state = State.COMPLETED;
     }
 
     @Override
-    public void rollback() {
-      Preconditions.checkState(state.equals(State.OPEN),
-          "Attempt to rollback a transaction that isn't open (state:" + state
-              + ")");
-
-      logger.debug("Rolling back current transaction");
-
-      if (writeInitialized) {
-        writeEvents.clear();
+    protected Event doTake() throws InterruptedException {
+      if(takeList.remainingCapacity() == 0) {
+        throw new ChannelException("Take list for FileBackedTransaction, capacity " +
+            takeList.size() + " full, consider committing more frequently, " +
+            "increasing capacity, or increasing thread count");
       }
-
-      if (readInitialized) {
-        readEvents.clear();
+      FlumeEventPointer ptr = queue.removeHead();
+      if(ptr != null) {
+        try {
+          // first add to takeList so that if write to disk
+          // fails, rollback actually does its work
+          Preconditions.checkState(takeList.offer(ptr));
+          log.take(transactionID, ptr); // write take to disk
+          Event event = log.get(ptr);
+          return event;
+        } catch (IOException e) {
+          throw new ChannelException("Take failed due to IO error", e);
+        }
       }
-
-      state = State.COMPLETED;
+      return null;
     }
 
     @Override
-    public void close() {
-      Preconditions
-          .checkState(
-              state.equals(State.COMPLETED),
-              "Attempt to close a transaction that isn't completed - you must either commit or rollback (state:"
-                  + state + ")");
-
-      logger.debug("Closing current transaction:{}", this);
-
-      if (writeInitialized) {
+    protected void doCommit() throws InterruptedException {
+      int puts = putList.size();
+      int takes = takeList.size();
+      if(puts > 0) {
+        Preconditions.checkState(takes == 0);
+        synchronized (queue) {
+          while(!putList.isEmpty()) {
+            if(!queue.addTail(putList.removeFirst())) {
+              StringBuilder msg = new StringBuilder();
+              msg.append("Queue add failed, this shouldn't be able to ");
+              msg.append("happen. A portion of the transaction has been ");
+              msg.append("added to the queue but the remaining portion ");
+              msg.append("cannot be added. Those messages will be consumed ");
+              msg.append("despite this transaction failing. Please report.");
+              LOG.error(msg.toString());
+              Preconditions.checkState(false, msg.toString());
+            }
+          }
+        }
         try {
-          outputStream.close();
+          log.commitPut(transactionID);
         } catch (IOException e) {
-          throw new ChannelException("Unable to close current output file", e);
+          throw new ChannelException("Commit failed due to IO error", e);
         }
+      } else if(takes > 0) {
+        try {
+          log.commitTake(transactionID);
+        } catch (IOException e) {
+          throw new ChannelException("Commit failed due to IO error", e);
+        }
+        queueRemaining.release(takes);
       }
-
-      if (readInitialized) {
-        // TODO: Implement me!
-      }
-
-      state = State.CLOSED;
+      putList.clear();
+      takeList.clear();
     }
 
-    private void put(Event event) {
-      if (!writeInitialized) {
-        initializeOutput();
+    @Override
+    protected void doRollback() throws InterruptedException {
+      int puts = putList.size();
+      int takes = takeList.size();
+      if(takes > 0) {
+        Preconditions.checkState(puts == 0);
+        while(!takeList.isEmpty()) {
+          Preconditions.checkState(queue.addHead(takeList.removeLast()),
+              "Queue add failed, this shouldn't be able to happen");
+        }
       }
-
-      writeEvents.add(event);
-    }
-
-    private Event take() {
-      if (!readInitialized) {
-        initializeInput();
+      queueRemaining.release(puts);
+      try {
+        log.rollback(transactionID);
+      } catch (IOException e) {
+        throw new ChannelException("Commit failed due to IO error", e);
       }
-
-      // TODO: Implement me!
-      return null;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder builder = new StringBuilder(
-          "FileTransaction: { transactionId:").append(transactionId)
-          .append(" state:").append(state);
-
-      if (readInitialized) {
-        builder.append(" read-enabled: { readBuffer:")
-            .append(readEvents.size()).append(" }");
-      }
-
-      if (writeInitialized) {
-        builder.append(" write-enabled: { writeBuffer:")
-            .append(writeEvents.size()).append(" currentOutputFile:")
-            .append(currentOutputFile).append(" }");
-      }
-
-      builder.append(" }");
-
-      return builder.toString();
-    }
-
-    /**
-     * <p>
-     * The state of the {@link Transaction} to which it belongs.
-     * </p>
-     * <dl>
-     * <dt>NEW</dt>
-     * <dd>A newly created transaction that has not yet begun.</dd>
-     * <dt>OPEN</dt>
-     * <dd>A transaction that is open. It is permissible to commit or rollback.</dd>
-     * <dt>COMPLETED</dt>
-     * <dd>This transaction has been committed or rolled back. It is illegal to
-     * perform any further operations beyond closing it.</dd>
-     * <dt>CLOSED</dt>
-     * <dd>A closed transaction. No further operations are permitted.</dd>
-     */
-    private static enum State {
-      NEW, OPEN, COMPLETED, CLOSED
+      putList.clear();
+      takeList.clear();
     }
 
   }
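
Tying the new pieces together: callers use the standard Transaction API, and
BasicChannelSemantics routes each call to the doPut/doTake/doCommit/doRollback
methods of FileBackedTransaction above. A minimal usage sketch, assuming a
configured and started channel (EventBuilder is the existing helper in
org.apache.flume.event):

    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      // doPut(): acquires a queue permit, writes the event to the log,
      // and stashes the resulting FlumeEventPointer on putList
      channel.put(EventBuilder.withBody("hello".getBytes()));
      // doCommit(): moves putList onto the queue tail, then log.commitPut()
      tx.commit();
    } catch (ChannelException e) {
      // doRollback(): writes a rollback record and releases the permits
      tx.rollback();
      throw e;
    } finally {
      tx.close();
    }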

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java Sat May  5 20:34:58 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+public class FileChannelConfiguration {
+  /**
+   * Directory Checkpoints will be written in
+   */
+  public static final String CHECKPOINT_DIR = "checkpointDir";
+  /**
+   * Directories data files will be written in. Multiple directories
+   * can be specified as comma separated values. Writes will
+   * be written in a round robin fashion.
+   */
+  public static final String DATA_DIRS = "dataDirs";
+  /**
+   * Maximum number of put/take events in a transaction. Default: 1000
+   */
+  public static final String TRANSACTION_CAPACITY = "transactionCapacity";
+  public static final int DEFAULT_TRANSACTION_CAPACITY = 1000;
+  /**
+   * Interval, in milliseconds, at which checkpoints should be taken. Default: 5 minutes
+   */
+  public static final String CHECKPOINT_INTERVAL = "checkpointInterval";
+  public static final long DEFAULT_CHECKPOINT_INTERVAL = 5L * 60L * 1000L;
+  /**
+   * Max file size for data files, cannot exceed the default. Default: 2GB
+   */
+  public static final String MAX_FILE_SIZE = "maxFileSize";
+  public static final long DEFAULT_MAX_FILE_SIZE = LogFile.MAX_FILE_SIZE;
+  /**
+   * Maximum capacity of the channel. This number needs to be configured
+   * in line with -XX:MaxDirectMemorySize. {@link FileChannel}
+   * Default: 1,000,000 which will consume 8MB of direct memory
+   */
+  public static final String CAPACITY = "capacity";
+  public static final int DEFAULT_CAPACITY = 1000000;
+  /**
+   * The length of time we will wait for space available to do a Put.
+   * Default: 3 (seconds)
+   */
+  public static final String KEEP_ALIVE = "keep-alive";
+  public static final int DEFAULT_KEEP_ALIVE = 3;
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native
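
These keys are consumed by FileChannel.configure(Context) above. A minimal
sketch of wiring a channel up programmatically (the paths and the capacity
here are hypothetical examples, not defaults):

    Context context = new Context();
    context.put(FileChannelConfiguration.CHECKPOINT_DIR,
        "/var/flume/file-channel/checkpoint");
    context.put(FileChannelConfiguration.DATA_DIRS,
        "/disk1/flume/data,/disk2/flume/data");
    // 2,000,000 pointers * 8 bytes = 16MB of direct memory; size
    // -XX:MaxDirectMemorySize accordingly
    context.put(FileChannelConfiguration.CAPACITY, "2000000");

    FileChannel channel = new FileChannel();
    channel.configure(context);
    channel.start();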

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java Sat May  5 20:34:58 2012
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Persistable wrapper for Event
+ */
+class FlumeEvent implements Event, Writable {
+
+  private Map<String, String> headers;
+  private byte[] body;
+
+  private FlumeEvent() {
+    this(null, null);
+  }
+  FlumeEvent(Map<String, String> headers, byte[] body) {
+    this.headers = headers;
+    this.body = body;
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    this.headers = headers;
+  }
+
+  @Override
+  public byte[] getBody() {
+    return body;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+    this.body = body;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    MapWritable map = toMapWritable(getHeaders());
+    map.write(out);
+    byte[] body = getBody();
+    if(body == null) {
+      out.writeInt(-1);
+    } else {
+      out.writeInt(body.length);
+      out.write(body);
+    }
+  }
+
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    MapWritable map = new MapWritable();
+    map.readFields(in);
+    setHeaders(fromMapWritable(map));
+    byte[] body = null;
+    int bodyLength = in.readInt();
+    if(bodyLength != -1) {
+      body = new byte[bodyLength];
+      in.readFully(body);
+    }
+    setBody(body);
+  }
+  private MapWritable toMapWritable(Map<String, String> map) {
+    MapWritable result = new MapWritable();
+    if(map != null) {
+      for(Map.Entry<String, String> entry : map.entrySet()) {
+        result.put(new Text(entry.getKey()),new Text(entry.getValue()));
+      }
+    }
+    return result;
+  }
+  private Map<String, String> fromMapWritable(MapWritable map) {
+    Map<String, String> result = Maps.newHashMap();
+    if(map != null) {
+      for(Map.Entry<Writable, Writable> entry : map.entrySet()) {
+        result.put(entry.getKey().toString(),entry.getValue().toString());
+      }
+    }
+    return result;
+  }
+  static FlumeEvent from(DataInput in) throws IOException {
+    FlumeEvent event = new FlumeEvent();
+    event.readFields(in);
+    return event;
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native
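
The write()/readFields() pair above gives the event a stable on-disk form: a
Hadoop MapWritable of headers followed by a length-prefixed body, with -1
marking a null body. A round-trip sketch using plain java.io streams (the
header and body values are arbitrary; as a package-private class, this runs
from within org.apache.flume.channel.file):

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    FlumeEvent event = new FlumeEvent(
        Collections.singletonMap("host", "node1"), "hello".getBytes());
    event.write(new DataOutputStream(buffer));

    FlumeEvent copy = FlumeEvent.from(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray())));
    // copy.getHeaders() and copy.getBody() now match the original event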

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java Sat May  5 20:34:58 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Pointer to an Event on disk. This is represented in memory
+ * as a long. As such there are methods to convert from this
+ * object to a long and from a long to this object.
+ */
+class FlumeEventPointer {
+  private final int fileID;
+  private final int offset;
+  FlumeEventPointer(int fileID, int offset) {
+    this.fileID = fileID;
+    this.offset = offset;
+    Preconditions.checkArgument(offset > 0);
+  }
+  int getFileID() {
+    return fileID;
+  }
+  int getOffset() {
+    return offset;
+  }
+  public long toLong() {
+    // pack fileID into the high 32 bits, offset into the low 32 bits
+    long result = (long)fileID << 32;
+    result += (long)offset;
+    return result;
+  }
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + fileID;
+    result = prime * result + offset;
+    return result;
+  }
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    FlumeEventPointer other = (FlumeEventPointer) obj;
+    if (fileID != other.fileID) {
+      return false;
+    }
+    if (offset != other.offset) {
+      return false;
+    }
+    return true;
+  }
+  @Override
+  public String toString() {
+    return "FlumeEventPointer [fileID=" + fileID + ", offset=" + offset + "]";
+  }
+  public static FlumeEventPointer fromLong(long value) {
+    int fileID = (int)(value >>> 32);
+    int offset = (int)value;
+    return new FlumeEventPointer(fileID, offset);
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
------------------------------------------------------------------------------
    svn:eol-style = native
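
The packing places fileID in the high 32 bits and offset in the low 32 bits,
which is why each queued event costs exactly 8 bytes of direct memory in
FlumeEventQueue. A quick worked example:

    FlumeEventPointer ptr = new FlumeEventPointer(3, 100);
    long packed = ptr.toLong();  // (3L << 32) + 100 = 12884901988L
    // fromLong() reverses the packing
    assert FlumeEventPointer.fromLong(packed).equals(ptr);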

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java Sat May  5 20:34:58 2012
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Queue of events in the channel. This queue stores only
+ * {@link FlumeEventPointer} objects which are represented
+ * as 8 byte longs internally. Additionally the queue itself
+ * of longs is stored as a {@link LongBuffer} in DirectMemory
+ * (off heap).
+ */
+class FlumeEventQueue implements Writable {
+  // XXX  We use % heavily which can be CPU intensive.
+  @SuppressWarnings("unused")
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FlumeEventQueue.class);
+  protected static final int VERSION = 1;
+  protected static final int SIZE_OF_LONG = 8;
+  protected static final int EMPTY = 0;
+  protected final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
+  protected final LongBuffer elements;
+  protected final ByteBuffer backingBuffer;
+  // both fields will be modified by multiple threads
+  protected volatile int size;
+  protected volatile int next;
+  /**
+   * @param capacity max event capacity of queue
+   */
+  FlumeEventQueue(int capacity) {
+    Preconditions.checkArgument(capacity > 0, "Capacity must be greater than zero");
+    backingBuffer = DirectMemoryUtils.allocate(capacity * SIZE_OF_LONG);
+    elements = backingBuffer.asLongBuffer();
+    for (int index = 0; index < elements.capacity(); index++) {
+      elements.put(index, EMPTY);
+    }
+  }
+  /**
+   * Retrieve and remove the head of the queue.
+   *
+   * @return FlumeEventPointer or null if queue is empty
+   */
+  synchronized FlumeEventPointer removeHead() {
+    if(size() == 0) {
+      return null;
+    }
+    long value = remove(0);
+    Preconditions.checkState(value != EMPTY);
+    FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
+    decrementFileID(ptr.getFileID());
+    return ptr;
+  }
+  /**
+   * Add a FlumeEventPointer to the head of the queue
+   * @param e FlumeEventPointer to be added
+   * @return true if space was available and pointer was
+   * added to the queue
+   */
+  synchronized boolean addHead(FlumeEventPointer e) {
+    long value = e.toLong();
+    Preconditions.checkArgument(value != EMPTY);
+    if(add(0, value)) {
+      incrementFileID(e.getFileID());
+      return true;
+    }
+    return false;
+  }
+  /**
+   * Add a FlumeEventPointer to the tail of the queue
+   * this will normally be used when recovering from a
+   * crash
+   * @param e FlumeEventPointer to be added
+   * @return true if space was available and pointer
+   * was added to the queue
+   */
+  synchronized boolean addTail(FlumeEventPointer e) {
+    long value = e.toLong();
+    Preconditions.checkArgument(value != EMPTY);
+    if(add(size(), value)) {
+      incrementFileID(e.getFileID());
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Remove FlumeEventPointer from queue, will normally
+   * only be used when recovering from a crash
+   * @param e FlumeEventPointer to be removed
+   * @return true if the FlumeEventPointer was found
+   * and removed
+   */
+  synchronized boolean remove(FlumeEventPointer e) {
+    long value = e.toLong();
+    Preconditions.checkArgument(value != EMPTY);
+    for (int i = 0; i < size; i++) {
+      if(get(i) == value) {
+        remove(i);
+        FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
+        decrementFileID(ptr.getFileID());
+        return true;
+      }
+    }
+    return false;
+  }
+  /**
+   * @return the set of fileIDs which are currently on the queue
+   * will be normally be used when deciding which data files can
+   * be deleted
+   */
+  synchronized Set<Integer> getFileIDs() {
+    return new HashSet<Integer>(fileIDCounts.keySet());
+  }
+  /**
+   * @return current size of the queue, not the capacity
+   */
+  synchronized int size() {
+    return size;
+  }
+  protected void incrementFileID(int fileID) {
+    AtomicInteger counter = fileIDCounts.get(fileID);
+    if(counter == null) {
+      counter = new AtomicInteger(0);
+      fileIDCounts.put(fileID, counter);
+    }
+    counter.incrementAndGet();
+  }
+
+  protected void decrementFileID(int fileID) {
+    AtomicInteger counter = fileIDCounts.get(fileID);
+    Preconditions.checkState(counter != null);
+    int count = counter.decrementAndGet();
+    if(count == 0) {
+      fileIDCounts.remove(fileID);
+    }
+  }
+
+  protected long get(int index) {
+    if (index < 0 || index > size - 1) {
+      throw new IndexOutOfBoundsException(String.valueOf(index));
+    }
+    return elements.get(convert(index));
+  }
+
+  protected boolean add(int index, long value) {
+    if (index < 0 || index > size) {
+      throw new IndexOutOfBoundsException(String.valueOf(index));
+    }
+    if (size + 1 > elements.capacity()) {
+      return false;
+    }
+    // shift right if element is not added at the right
+    // edge of the array. the common case, add(size, value)
+    // will result in no copy operations
+    for (int k = size; k > index; k--) {
+      elements.put(convert(k),
+          elements.get(convert(k - 1)));
+    }
+    elements.put(convert(index), value);
+    size++;
+    return true;
+  }
+
+  protected long remove(int index) {
+    if (index < 0 || index > size - 1) {
+      throw new IndexOutOfBoundsException(String.valueOf(index));
+    }
+    long value = elements.get(convert(index));
+    // shift left if element removed is not on the left
+    // edge of the array. the common case, remove(0)
+    // will result in no copy operations
+    for (int k = index; k > 0; k--) {
+      elements.put(convert(k),
+          elements.get(convert(k - 1)));
+    }
+    elements.put(next % elements.capacity(), EMPTY);
+    next = (next + 1) % elements.capacity();
+    size--;
+    return value;
+  }
+
+  protected int convert(int index) {
+    return (next + index) % elements.capacity();
+  }
+
+  @Override
+  public synchronized void readFields(DataInput input) throws IOException {
+    int version = input.readInt();
+    if(version != VERSION) {
+      throw new IOException("Bad Version " + Integer.toHexString(version));
+    }
+    int length = input.readInt();
+    for (int index = 0; index < length; index++) {
+      long value = input.readLong();
+      FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
+      Preconditions.checkState(value != EMPTY);
+      Preconditions.checkState(addTail(ptr), "Unable to add to queue");
+    }
+  }
+
+  @Override
+  public synchronized void write(DataOutput output) throws IOException {
+    output.writeInt(VERSION);
+    output.writeInt(size);
+    for (int index = 0; index < size; index++) {
+      long value = elements.get(convert(index));
+      Preconditions.checkState(value != EMPTY);
+      output.writeLong(value);
+    }
+  }
+  /**
+   * @return max capacity of the queue
+   */
+  public int getCapacity() {
+    return elements.capacity();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native
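
As context for the Writable-style methods above, here is a minimal sketch of
how a FlumeEventQueue could be persisted and restored through a checkpoint
file. The temp-file path and the FlumeEventPointer(fileID, offset)
constructor are assumptions for illustration; only queue methods shown in
this patch are used.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class QueueRoundTripSketch {
      public static void main(String[] args) throws IOException {
        FlumeEventQueue queue = new FlumeEventQueue(100);
        // Hypothetical constructor matching getFileID()/getOffset().
        queue.addHead(new FlumeEventPointer(1, 0));

        File checkpoint = new File("/tmp/chkpt-sketch"); // illustrative path
        DataOutputStream out =
            new DataOutputStream(new FileOutputStream(checkpoint));
        queue.write(out); // writes VERSION, size, then each pointer as a long
        out.close();

        FlumeEventQueue restored = new FlumeEventQueue(100);
        DataInputStream in =
            new DataInputStream(new FileInputStream(checkpoint));
        restored.readFields(in); // validates VERSION, re-adds each pointer
        in.close();

        assert restored.size() == queue.size();
      }
    }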

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java Sat May  5 20:34:58 2012
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Stores FlumeEvents on disk and pointers to the events in an in-memory
+ * queue. Once a Log object is created, the replay method should be called
+ * to reconcile the on-disk write-ahead log with the last checkpoint of the
+ * queue.
+ */
+class Log {
+  public static final String PREFIX = "log-";
+  private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
+  private static final int MIN_NUM_LOGS = 2;
+  private static final String FILE_LOCK = "in_use.lock";
+  // for reader
+  private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
+      .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
+  private final AtomicInteger nextFileID = new AtomicInteger(0);
+  private final File checkpointDir;
+  private final File[] logDirs;
+  private final BackgroundWorker worker;
+  private final int queueSize;
+  private final AtomicReferenceArray<LogFile.Writer> logFiles;
+
+  private volatile boolean open;
+  private AtomicReference<Checkpoint> checkpoint;
+  private Checkpoint checkpointA;
+  private Checkpoint checkpointB;
+  private FlumeEventQueue queue;
+  private long checkpointInterval;
+  private long maxFileSize;
+  private final Map<String, FileLock> locks;
+
+  Log(long checkpointInterval, long maxFileSize, int queueSize,
+      File checkpointDir, File... logDirs) throws IOException {
+    Preconditions.checkArgument(checkpointInterval > 0,
+        "checkpointInterval <= 0");
+    Preconditions.checkArgument(queueSize > 0, "queueSize <= 0");
+    Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
+    Preconditions.checkNotNull(checkpointDir, "checkpointDir");
+    Preconditions.checkArgument(
+        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+            + checkpointDir + " could not be created");
+    Preconditions.checkNotNull(logDirs, "logDirs");
+    Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
+    for (File logDir : logDirs) {
+      Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
+          "LogDir " + logDir + " could not be created");
+    }
+    locks = Maps.newHashMap();
+    try {
+      lock(checkpointDir);
+      for (File logDir : logDirs) {
+        lock(logDir);
+      }
+    } catch(IOException e) {
+      unlock(checkpointDir);
+      for (File logDir : logDirs) {
+        unlock(logDir);
+      }
+      throw e;
+    }
+    open = false;
+    this.checkpointInterval = checkpointInterval;
+    this.maxFileSize = maxFileSize;
+    this.queueSize = queueSize;
+    this.checkpointDir = checkpointDir;
+    this.logDirs = logDirs;
+    logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
+    worker = new BackgroundWorker(this);
+    worker.setName("Log-BackgroundWorker");
+    worker.setDaemon(true);
+    worker.start();
+  }
+
+  /**
+   * Read checkpoint and data files from disk, replaying them to reconstruct
+   * the state immediately before the shutdown or crash.
+   * @throws IOException
+   */
+  synchronized void replay() throws IOException {
+    Preconditions.checkState(!open, "Cannot replay after Log has been opened");
+    open = true;
+    boolean error = true;
+    try {
+      /*
+       * First we are going to look through the data directories
+       * and find all log files. We will store the highest file id
+       * (at the end of the filename) we find and use that when we
+       * create additional log files.
+       *
+       * Also store up the list of files so we can replay them later.
+       */
+      LOGGER.info("Replay started");
+      nextFileID.set(0);
+      List<File> dataFiles = Lists.newArrayList();
+      for (File logDir : logDirs) {
+        for (File file : LogUtils.getLogs(logDir)) {
+          int id = LogUtils.getIDForFile(file);
+          dataFiles.add(file);
+          nextFileID.set(Math.max(nextFileID.get(), id));
+          idLogFileMap.put(id, new LogFile.RandomReader(file));
+        }
+      }
+      LOGGER.info("Found NextFileID " + nextFileID +
+          ", from " + Arrays.toString(logDirs));
+
+      /*
+       * sort the data files by file id so we can replay them in order,
+       * which should give us approximately sequential events
+       */
+      LogUtils.sort(dataFiles);
+
+      /*
+       * Read the checkpoint (in memory queue) from one of two alternating
+       * locations. We will read the last one written to disk.
+       */
+      checkpointA = new Checkpoint(new File(checkpointDir, "chkpt-A"),
+          queueSize);
+      checkpointB = new Checkpoint(new File(checkpointDir, "chkpt-B"), 
+          queueSize);
+      if (checkpointA.getTimestamp() > checkpointB.getTimestamp()) {
+        try {
+          LOGGER.info("Reading checkpoint A " + checkpointA.getFile());
+          // read from checkpoint A, write to B
+          queue = checkpointA.read();
+          checkpoint = new AtomicReference<Checkpoint>(checkpointB);
+        } catch (EOFException e) {
+          LOGGER.info("EOF reading from A", e);
+          // read from checkpoint B, write to A
+          queue = checkpointB.read();
+          checkpoint = new AtomicReference<Checkpoint>(checkpointA);
+        }
+      } else if (checkpointB.getTimestamp() > checkpointA.getTimestamp()) {
+        try {
+          LOGGER.info("Reading checkpoint B " + checkpointB.getFile());
+          // read from checkpoint B, write to A
+          queue = checkpointB.read();
+          checkpoint = new AtomicReference<Checkpoint>(checkpointA);
+        } catch (EOFException e) {
+          LOGGER.info("EOF reading from B", e);
+          // read from checkpoint A, write to B
+          queue = checkpointA.read();
+          checkpoint = new AtomicReference<Checkpoint>(checkpointB);
+        }
+      } else {
+        LOGGER.info("Starting checkpoint from scratch");
+        queue = new FlumeEventQueue(queueSize);
+        checkpoint = new AtomicReference<Checkpoint>(checkpointA);
+      }
+
+      long ts = checkpoint.get().getTimestamp();
+      LOGGER.info("Last Checkpoint " + new Date(ts) +
+          ", queue depth = " + queue.size());
+
+      /*
+       * We now have everything we need to actually replay the log files:
+       * the queue, the timestamp the queue was written to disk, and
+       * the list of data files.
+       */
+      ReplayHandler replayHandler = new ReplayHandler(queue,
+          checkpoint.get().getTimestamp());
+      replayHandler.replayLog(dataFiles);
+      for (int index = 0; index < logDirs.length; index++) {
+        LOGGER.info("Rolling " + logDirs[index]);
+        roll(index);
+      }
+      /*
+       * Now that we have replayed, write the current queue to disk
+       */
+      writeCheckpoint();
+      error = false;
+    } finally {
+      if (error) {
+        open = false;
+      }
+    }
+  }
+
+  int getNextFileID() {
+    Preconditions.checkState(open, "Log is closed");
+    return nextFileID.get();
+  }
+
+  FlumeEventQueue getFlumeEventQueue() {
+    Preconditions.checkState(open, "Log is closed");
+    return queue;
+  }
+
+  /**
+   * Return the FlumeEvent for an event pointer. This method is
+   * non-transactional. It is assumed the client has obtained this
+   * FlumeEventPointer via FlumeEventQueue.
+   *
+   * @param pointer
+   * @return FlumeEvent
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  FlumeEvent get(FlumeEventPointer pointer) throws IOException,
+  InterruptedException {
+    Preconditions.checkState(open, "Log is closed");
+    int id = pointer.getFileID();
+    LogFile.RandomReader logFile = idLogFileMap.get(id);
+    Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
+    return logFile.get(pointer.getOffset());
+  }
+
+  /**
+   * Log a put of an event
+   *
+   * Synchronization not required as this method is atomic
+   * @param transactionID
+   * @param event
+   * @return pointer to the event in the log
+   * @throws IOException
+   */
+  FlumeEventPointer put(long transactionID, Event event)
+      throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    FlumeEvent flumeEvent = new FlumeEvent(event.getHeaders(), event.getBody());
+    Put put = new Put(transactionID, flumeEvent);
+    put.setTimestamp(System.currentTimeMillis());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
+    try {
+      FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+      error = false;
+      return ptr;
+    } finally {
+      if (error) {
+        roll(logFileIndex);
+      }
+    }
+  }
+
+  /**
+   * Log a take of an event; the pointer points at the corresponding put
+   *
+   * Synchronization not required as this method is atomic
+   * @param transactionID
+   * @param pointer
+   * @throws IOException
+   */
+  void take(long transactionID, FlumeEventPointer pointer)
+      throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    Take take = new Take(transactionID, pointer.getOffset(),
+        pointer.getFileID());
+    take.setTimestamp(System.currentTimeMillis());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
+    try {
+      logFiles.get(logFileIndex).take(buffer);
+      error = false;
+    } finally {
+      if (error) {
+        roll(logFileIndex);
+      }
+    }
+  }
+
+  /**
+   * Log a rollback of a transaction
+   *
+   * Synchronization not required as this method is atomic
+   * @param transactionID
+   * @throws IOException
+   */
+  void rollback(long transactionID) throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    if(LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Rolling back " + transactionID);
+    }
+    Rollback rollback = new Rollback(transactionID);
+    rollback.setTimestamp(System.currentTimeMillis());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
+    try {
+      logFiles.get(logFileIndex).rollback(buffer);
+      error = false;
+    } finally {
+      if (error) {
+        roll(logFileIndex);
+      }
+    }
+  }
+
+  /**
+   * Log commit of a put. We need to know which type of commit
+   * it is so we know whether the pointers corresponding to the
+   * events should be added to or removed from the flume queue. We
+   * could infer the type, but it's best to be explicit.
+   *
+   * Synchronization not required as this method is atomic
+   * @param transactionID
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void commitPut(long transactionID) throws IOException,
+  InterruptedException {
+    Preconditions.checkState(open, "Log is closed");
+    commit(transactionID, TransactionEventRecord.Type.PUT.get());
+  }
+
+  /**
+   * Log commit of a take. We need to know which type of commit
+   * it is so we know whether the pointers corresponding to the
+   * events should be added to or removed from the flume queue. We
+   * could infer the type, but it's best to be explicit.
+   *
+   * Synchronization not required as this method is atomic
+   * @param transactionID
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void commitTake(long transactionID) throws IOException,
+  InterruptedException {
+    Preconditions.checkState(open, "Log is closed");
+    commit(transactionID, TransactionEventRecord.Type.TAKE.get());
+  }
+
+  /**
+   * Synchronization required since we do not want this
+   * to be called during a checkpoint.
+   */
+  synchronized void close() {
+    open = false;
+    if (worker != null) {
+      worker.shutdown();
+    }
+    if (logFiles != null) {
+      for (int index = 0; index < logFiles.length(); index++) {
+        LogFile.Writer writer = logFiles.get(index);
+        // guard against close() being called before replay() created writers
+        if (writer != null) {
+          writer.close();
+        }
+      }
+    }
+    try {
+      unlock(checkpointDir);
+    } catch(IOException ex) {
+      LOGGER.warn("Error unlocking " + checkpointDir, ex);
+    }
+    for (File logDir : logDirs) {
+      try {
+        unlock(logDir);
+      } catch(IOException ex) {
+        LOGGER.warn("Error unlocking " + logDir, ex);
+      }
+    }
+  }
+
+  synchronized void shutdownWorker() {
+    Preconditions.checkNotNull(worker, "worker");
+    worker.shutdown();
+  }
+  void setCheckpointInterval(long checkpointInterval) {
+    this.checkpointInterval = checkpointInterval;
+  }
+  void setMaxFileSize(long maxFileSize) {
+    this.maxFileSize = maxFileSize;
+  }
+
+  /**
+   * Synchronization not required as this method is atomic
+   *
+   * @param transactionID
+   * @param type
+   * @throws IOException
+   */
+  private void commit(long transactionID, short type) throws IOException {
+    Commit commit = new Commit(transactionID, type);
+    commit.setTimestamp(System.currentTimeMillis());
+    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
+    int logFileIndex = nextLogWriter(transactionID);
+    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
+      roll(logFileIndex, buffer);
+    }
+    boolean error = true;
+    try {
+      logFiles.get(logFileIndex).commit(buffer);
+      error = false;
+    } finally {
+      if (error) {
+        roll(logFileIndex);
+      }
+    }
+  }
+  /**
+   * Atomic, so no synchronization required. All records of a transaction
+   * map to the same log writer, since the index depends only on the
+   * transactionID.
+   * @return index of the log file writer to use for this transaction
+   */
+  private int nextLogWriter(long transactionID) {
+    return (int)Math.abs(transactionID % (long)logFiles.length());
+  }
+  /**
+   * Unconditionally roll
+   * Synchronization done internally
+   * @param index
+   * @throws IOException
+   */
+  private void roll(int index) throws IOException {
+    roll(index, null);
+  }
+  /**
+   * Roll a log if needed. A roll always occurs if the log at the index
+   * does not exist (typically on startup) or buffer is null. Otherwise
+   * LogFile.Writer.isRollRequired is checked again so that threads piling
+   * up on this log do not cause multiple successive rolls.
+   *
+   * Synchronization required since both synchronized and unsynchronized
+   * methods call this method.
+   * @param index
+   * @throws IOException
+   */
+  private synchronized void roll(int index, ByteBuffer buffer)
+      throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    LogFile.Writer oldLogFile = logFiles.get(index);
+    // check to make sure a roll is actually required due to
+    // the possibility of multiple writes waiting on lock
+    if(oldLogFile == null || buffer == null ||
+        oldLogFile.isRollRequired(buffer)) {
+      try {
+        LOGGER.info("Roll start " + logDirs[index]);
+        int fileID = nextFileID.incrementAndGet();
+        File file = new File(logDirs[index], PREFIX + fileID);
+        Preconditions.checkState(!file.exists(), "File already exists " + file);
+        Preconditions.checkState(file.createNewFile(), "File could not be created " + file);
+        idLogFileMap.put(fileID, new LogFile.RandomReader(file));
+        // writer from this point on will get new reference
+        logFiles.set(index, new LogFile.Writer(file, fileID, maxFileSize));
+        // close out old log
+        if (oldLogFile != null) {
+          oldLogFile.close();
+        }
+      } finally {
+        LOGGER.info("Roll end");
+      }
+    }
+  }
+  /**
+   * Write the current checkpoint object and then swap objects so that
+   * the next checkpoint occurs on the other checkpoint directory.
+   *
+   * Synchronization required since both synchronized and unsynchronized
+   * methods call this method.
+   * @throws IOException if we are unable to write the checkpoint out to disk
+   */
+  private synchronized void writeCheckpoint() throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    synchronized (queue) {
+      checkpoint.get().write(queue);
+      if (!checkpoint.compareAndSet(checkpointA, checkpointB)) {
+        Preconditions.checkState(checkpoint.compareAndSet(checkpointB,
+            checkpointA));
+      }
+    }
+  }
+  /**
+   * Synchronization not required as this is atomic
+   * @return last time we successfully checkpointed
+   * @throws IOException if there is an io error reading the ts from disk
+   */
+  private long getLastCheckpoint() throws IOException {
+    Preconditions.checkState(open, "Log is closed");
+    return checkpoint.get().getTimestamp();
+  }
+
+  private void removeOldLogs() {
+    // we will find the smallest fileID currently in use and
+    // only delete files with an id smaller than that min
+    Set<Integer> fileIDs = new TreeSet<Integer>(queue.getFileIDs());
+    for (int index = 0; index < logDirs.length; index++) {
+      fileIDs.add(logFiles.get(index).getFileID());
+    }
+    int minFileID = Collections.min(fileIDs);
+    LOGGER.debug("Files currently in use: " + fileIDs);
+    for(File logDir : logDirs) {
+      List<File> logs = LogUtils.getLogs(logDir);
+      // sort oldest to newest
+      LogUtils.sort(logs);
+      // ensure we always keep two logs per dir
+      int size = logs.size() - MIN_NUM_LOGS;
+      for (int index = 0; index < size; index++) {
+        File logFile = logs.get(index);
+        int logFileID = LogUtils.getIDForFile(logFile);
+        if(logFileID < minFileID) {
+          LogFile.RandomReader reader = idLogFileMap.remove(logFileID);
+          if(reader != null) {
+            reader.close();
+          }
+          LOGGER.info("Removing old log " + logFile +
+              ", result = " + logFile.delete() + ", minFileID "
+              + minFileID);
+        }
+      }
+    }
+  }
+  /**
+   * Lock storage to provide exclusive access.
+   *
+   * <p> Locking is not supported by all file systems.
+   * E.g., NFS does not consistently support exclusive locks.
+   *
+   * <p> If locking is supported we guarantee exclusive access to the
+   * storage directory. Otherwise, no guarantee is given.
+   *
+   * @throws IOException if locking fails
+   */
+  private void lock(File dir) throws IOException {
+    FileLock lock = tryLock(dir);
+    if (lock == null) {
+      String msg = "Cannot lock " + dir
+          + ". The directory is already locked.";
+      LOGGER.info(msg);
+      throw new IOException(msg);
+    }
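+    // On a file system that does not support locking (e.g. some NFS
+    // mounts) a second tryLock on the same file will also succeed;
+    // detect that case and warn rather than fail.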
+    FileLock secondLock = tryLock(dir);
+    if(secondLock != null) {
+      LOGGER.warn("Directory "+dir+" does not support locking");
+      secondLock.release();
+      secondLock.channel().close();
+    }
+    locks.put(dir.getAbsolutePath(), lock);
+  }
+
+  /**
+   * Attempts to acquire an exclusive lock on the directory.
+   *
+   * @return A lock object representing the newly-acquired lock or
+   * <code>null</code> if directory is already locked.
+   * @throws IOException if locking fails.
+   */
+  private FileLock tryLock(File dir) throws IOException {
+    File lockF = new File(dir, FILE_LOCK);
+    lockF.deleteOnExit();
+    RandomAccessFile file = new RandomAccessFile(lockF, "rws");
+    FileLock res = null;
+    try {
+      res = file.getChannel().tryLock();
+    } catch(OverlappingFileLockException oe) {
+      file.close();
+      return null;
+    } catch(IOException e) {
+      LOGGER.error("Cannot create lock on " + lockF, e);
+      file.close();
+      throw e;
+    }
+    return res;
+  }
+
+  /**
+   * Unlock directory.
+   *
+   * @throws IOException
+   */
+  private void unlock(File dir) throws IOException {
+    FileLock lock = locks.remove(dir.getAbsolutePath());
+    if(lock == null) {
+      return;
+    }
+    lock.release();
+    lock.channel().close();
+  }
+  static class BackgroundWorker extends Thread {
+    private static final Logger LOG = LoggerFactory
+        .getLogger(BackgroundWorker.class);
+    private final Log log;
+    private volatile boolean run = true;
+
+    public BackgroundWorker(Log log) {
+      this.log = log;
+    }
+
+    void shutdown() {
+      if(run) {
+        run = false;
+        interrupt();
+      }
+    }
+
+    @Override
+    public void run() {
+      while (run) {
+        try {
+          try {
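+            // Poll roughly ten times per checkpoint interval, but never
+            // more often than once per second.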
+            Thread.sleep(Math.max(1000L, log.checkpointInterval / 10L));
+          } catch (InterruptedException e) {
+            // recheck run flag
+            continue;
+          }
+          if(!log.open) {
+            continue;
+          }
+          // check to see if we should do a checkpoint
+          long elapsed = System.currentTimeMillis() - log.getLastCheckpoint();
+          if (elapsed > log.checkpointInterval) {
+            log.writeCheckpoint();
+          }
+          log.removeOldLogs();
+        } catch (IOException e) {
+          LOG.error("Error doing checkpoint", e);
+        } catch (Exception e) {
+          LOG.error("General error in checkpoint worker", e);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
------------------------------------------------------------------------------
    svn:eol-style = native
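
For orientation, a minimal sketch of the Log lifecycle as the methods above
define it: replay() must run before any put or take. The directories,
transaction IDs, and the EventBuilder helper (from flume-ng-core) are
illustrative assumptions:

    package org.apache.flume.channel.file; // Log is package-private

    import java.io.File;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class LogLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Log log = new Log(30 * 1000L,           // checkpointInterval, ms
            100L * 1024L * 1024L,               // maxFileSize
            1000,                               // queueSize
            new File("/tmp/flume-checkpoint"),  // illustrative paths
            new File("/tmp/flume-data"));
        log.replay(); // reconcile the WAL with the last checkpoint first

        long txId = 1L;
        Event event = EventBuilder.withBody("hello".getBytes());
        FlumeEventPointer ptr = log.put(txId, event);
        log.commitPut(txId);   // makes the put durable in the WAL

        long txId2 = 2L;
        log.take(txId2, ptr);
        log.commitTake(txId2); // makes the take durable in the WAL
        log.close();
      }
    }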


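The chkpt-A/chkpt-B handling in replay() and writeCheckpoint() is an
instance of double-buffered (alternating) checkpointing: always read the
newer slot, always write the other one, and stamp a slot only after a
complete write, so a crash mid-checkpoint leaves the previous checkpoint
intact. A minimal sketch of the pattern, with illustrative names:

    import java.util.concurrent.atomic.AtomicReference;

    public class PingPongCheckpointSketch {
      static final class Slot {
        long timestamp;  // time of last complete write
        String data = "";
      }

      private final Slot a = new Slot();
      private final Slot b = new Slot();
      // The next write goes to the slot that is NOT the most recent one.
      private final AtomicReference<Slot> writeSlot =
          new AtomicReference<Slot>();

      public PingPongCheckpointSketch() {
        writeSlot.set(a.timestamp > b.timestamp ? b : a);
      }

      public synchronized void checkpoint(String state) {
        Slot s = writeSlot.get();
        s.data = state;                           // overwrite the stale slot
        s.timestamp = System.currentTimeMillis(); // stamp only when complete
        writeSlot.set(s == a ? b : a);            // alternate next time
      }

      public synchronized String latest() {
        // A torn write never advanced its timestamp, so the other slot wins.
        return (a.timestamp >= b.timestamp ? a : b).data;
      }
    }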
