flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1174783 - in /incubator/flume/branches/flume-728/flume-ng-channels: ./ flume-file-channel/ flume-file-channel/src/ flume-file-channel/src/main/ flume-file-channel/src/main/java/ flume-file-channel/src/main/java/org/ flume-file-channel/src/...
Date Fri, 23 Sep 2011 14:07:24 GMT
Author: esammer
Date: Fri Sep 23 14:07:23 2011
New Revision: 1174783

URL: http://svn.apache.org/viewvc?rev=1174783&view=rev
Log:
- Skeleton of a file based durable channel.

Added:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/pom.xml
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/log4j.properties
Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/pom.xml

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/pom.xml?rev=1174783&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/pom.xml Fri Sep
23 14:07:23 2011
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-ng-channels</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-file-channel</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG file-based channel</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>0.9.5-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1174783&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
(added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
Fri Sep 23 14:07:23 2011
@@ -0,0 +1,226 @@
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class FileChannel implements Channel {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(FileChannel.class);
+
+  private static ThreadLocal<FileBackedTransaction> currentTransaction = new ThreadLocal<FileBackedTransaction>();
+
+  private File directory;
+
+  private File openDirectory;
+  private File completeDirectory;
+  private boolean isInitialized;
+
+  private File currentOutputFile;
+  private boolean shouldRotate;
+
+  private void initialize() {
+    Preconditions.checkState(directory != null, "Directory must not be null");
+    Preconditions.checkState(directory.getParentFile().exists(),
+        "Directory %s must exist", directory.getParentFile());
+
+    logger.info("Initializing file channel directory:{}", directory);
+
+    openDirectory = new File(directory, "open");
+    completeDirectory = new File(directory, "complete");
+
+    if (!openDirectory.mkdirs()) {
+      throw new ChannelException("Unable to create open file directory:"
+          + openDirectory);
+    }
+
+    if (!completeDirectory.mkdirs()) {
+      throw new ChannelException("Unable to create complete file directory:"
+          + completeDirectory);
+    }
+
+    shouldRotate = false;
+    isInitialized = true;
+  }
+
+  @Override
+  public void put(Event event) throws ChannelException {
+    Preconditions.checkState(currentTransaction.get() != null,
+        "No transaction currently in progress");
+
+    FileBackedTransaction tx = currentTransaction.get();
+
+    tx.events.add(event);
+  }
+
+  @Override
+  public Event take() throws ChannelException {
+    return null;
+  }
+
+  @Override
+  public synchronized Transaction getTransaction() {
+    if (!isInitialized) {
+      initialize();
+    }
+
+    FileBackedTransaction tx = currentTransaction.get();
+
+    if (shouldRotate) {
+      currentOutputFile = null;
+    }
+
+    if (tx == null || tx.state.equals(FileBackedTransaction.State.CLOSED)) {
+      FileBackedTransaction transaction = new FileBackedTransaction();
+
+      if (currentOutputFile == null) {
+        currentOutputFile = new File(openDirectory, Thread.currentThread()
+            .getId() + "-" + System.currentTimeMillis());
+
+        logger.debug("Using new output file:{}", currentOutputFile);
+      }
+
+      transaction.currentOutputFile = currentOutputFile;
+
+      currentTransaction.set(transaction);
+
+      logger.debug("Created transaction:{} for channel:{}", transaction, this);
+    }
+
+    return currentTransaction.get();
+  }
+
+  public File getDirectory() {
+    return directory;
+  }
+
+  public void setDirectory(File directory) {
+    this.directory = directory;
+  }
+
+  public File getOpenDirectory() {
+    return openDirectory;
+  }
+
+  public File getCompleteDirectory() {
+    return completeDirectory;
+  }
+
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  public static class FileBackedTransaction implements Transaction {
+
+    private List<Event> events;
+
+    private File currentOutputFile;
+
+    private FileOutputStream outputStream;
+    private FileInputStream inputStream;
+
+    private State state;
+
+    public FileBackedTransaction() {
+      events = new LinkedList<Event>();
+      state = State.NEW;
+    }
+
+    @Override
+    public void begin() {
+      if (state.equals(State.CLOSED)) {
+        throw new IllegalStateException(
+            "Illegal to begin a transaction with state:" + state);
+      }
+
+      logger.debug("Beginning a new transaction");
+
+      try {
+        outputStream = new FileOutputStream(currentOutputFile, true);
+      } catch (FileNotFoundException e) {
+        throw new ChannelException("Unable to open new output file:"
+            + currentOutputFile, e);
+      }
+
+      state = State.OPEN;
+    }
+
+    @Override
+    public void commit() {
+      Preconditions.checkState(state.equals(State.OPEN),
+          "Attempt to commit a transaction that isn't open (state:" + state
+              + ")");
+
+      logger.debug("Commiting current transaction (size:{})", events.size());
+
+      try {
+        for (Event event : events) {
+          // TODO: Serialize event properly (avro?)
+          outputStream.write((event.toString() + "\n").getBytes());
+          outputStream.flush();
+        }
+
+        // TODO: Write checksum.
+        outputStream.write("---\n".getBytes());
+
+        events.clear();
+      } catch (IOException e) {
+        throw new ChannelException("Unable to write to output file", e);
+      }
+
+      state = State.COMPLETED;
+    }
+
+    @Override
+    public void rollback() {
+      Preconditions.checkState(state.equals(State.OPEN),
+          "Attempt to rollback a transaction that isn't open (state:" + state
+              + ")");
+
+      logger.debug("Rolling back current transaction (size:{})", events.size());
+
+      events.clear();
+
+      state = State.COMPLETED;
+    }
+
+    @Override
+    public void close() {
+      Preconditions
+          .checkState(
+              state.equals(State.COMPLETED),
+              "Attempt to close a transaction that isn't completed - you must either commit
or rollback (state:"
+                  + state + ")");
+
+      logger.debug("Closing current transaction");
+
+      try {
+        outputStream.close();
+      } catch (IOException e) {
+        throw new ChannelException("Unable to close current output file", e);
+      }
+
+      state = State.CLOSED;
+    }
+
+    private static enum State {
+      NEW, OPEN, COMPLETED, CLOSED
+    }
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1174783&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
(added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
Fri Sep 23 14:07:23 2011
@@ -0,0 +1,124 @@
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileChannel {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestFileChannel.class);
+
+  private FileChannel channel;
+
+  @Before
+  public void setUp() {
+    channel = new FileChannel();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoDirectory() {
+    Event event = EventBuilder.withBody("Test event".getBytes());
+
+    channel.put(event);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNonExistantParent() {
+    Event event = EventBuilder.withBody("Test event".getBytes());
+
+    channel.setDirectory(new File("/i/do/not/exist"));
+    channel.put(event);
+  }
+
+  @Test
+  public void testGetTransaction() throws IOException {
+    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
+    FileUtils.forceDeleteOnExit(tmpDir);
+
+    channel.setDirectory(tmpDir);
+
+    Transaction tx1 = channel.getTransaction();
+    Assert.assertNotNull(tx1);
+
+    Transaction tx2 = channel.getTransaction();
+    Assert.assertNotNull(tx2);
+
+    Assert.assertEquals(tx1, tx2);
+
+    tx2.begin();
+    Assert.assertEquals(tx2, channel.getTransaction());
+
+    tx2.rollback();
+    Assert.assertEquals(tx2, channel.getTransaction());
+
+    tx2.close();
+    Assert.assertFalse(tx2.equals(channel.getTransaction()));
+  }
+
+  @Test
+  public void testPut() throws IOException {
+    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
+    FileUtils.forceDeleteOnExit(tmpDir);
+
+    if (!tmpDir.mkdirs()) {
+      throw new IOException("Unable to create test directory:" + tmpDir);
+    }
+
+    channel.setDirectory(tmpDir);
+
+    /* Issue five one record transactions. */
+    for (int i = 0; i < 5; i++) {
+      Transaction transaction = channel.getTransaction();
+
+      Assert.assertNotNull(transaction);
+
+      try {
+        transaction.begin();
+
+        Event event = EventBuilder.withBody(("Test event" + i).getBytes());
+        channel.put(event);
+
+        transaction.commit();
+      } catch (Exception e) {
+        logger.error(
+            "Failed to put event into file channel. Exception follows.", e);
+        transaction.rollback();
+        Assert.fail();
+      } finally {
+        transaction.close();
+      }
+    }
+
+    /* Issue one five record transaction. */
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      for (int i = 0; i < 5; i++) {
+        Event event = EventBuilder.withBody(("Test event" + i).getBytes());
+        channel.put(event);
+      }
+
+      transaction.commit();
+    } catch (Exception e) {
+      logger.error("Failed to put event into file channel. Exception follows.",
+          e);
+      transaction.rollback();
+      Assert.fail();
+    } finally {
+      transaction.close();
+    }
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/log4j.properties?rev=1174783&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/log4j.properties
(added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/resources/log4j.properties
Fri Sep 23 14:07:23 2011
@@ -0,0 +1,7 @@
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG

Modified: incubator/flume/branches/flume-728/flume-ng-channels/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/pom.xml?rev=1174783&r1=1174782&r2=1174783&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/pom.xml Fri Sep 23 14:07:23 2011
@@ -34,5 +34,6 @@ limitations under the License.
 
   <modules>
     <module>flume-jdbc-channel</module>
+    <module>flume-file-channel</module>
   </modules>
 </project>



Mime
View raw message