flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1175666 - in /incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src: main/java/org/apache/flume/channel/file/FileChannel.java test/java/org/apache/flume/channel/file/TestFileChannel.java
Date Mon, 26 Sep 2011 04:30:01 GMT
Author: esammer
Date: Mon Sep 26 04:30:00 2011
New Revision: 1175666

URL: http://svn.apache.org/viewvc?rev=1175666&view=rev
Log:
- Added a test to ensure different threads receive different transactions.
- Each FileBackedTransaction now gets a (mostly) unique ID for debugging purposes (only).

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1175666&r1=1175665&r2=1175666&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java	(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java	Mon Sep 26 04:30:00 2011
@@ -145,6 +145,8 @@ public class FileChannel implements Chan
    */
   public static class FileBackedTransaction implements Transaction {
 
+    private String transactionId;
+
     private List<Event> readEvents;
     private List<Event> writeEvents;
 
@@ -158,6 +160,9 @@ public class FileChannel implements Chan
     private boolean writeInitialized;
 
     public FileBackedTransaction() {
+      transactionId = Thread.currentThread().getId() + "-"
+          + System.currentTimeMillis();
+
       state = State.NEW;
       readInitialized = false;
       writeInitialized = false;
@@ -317,8 +322,9 @@ public class FileChannel implements Chan
 
     @Override
     public String toString() {
-      StringBuilder builder = new StringBuilder("FileTransaction: { state:")
-          .append(state);
+      StringBuilder builder = new StringBuilder(
+          "FileTransaction: { transactionId:").append(transactionId)
+          .append(" state:").append(state);
 
       if (readInitialized) {
         builder.append(" read-enabled: { readBuffer:")
@@ -327,7 +333,8 @@ public class FileChannel implements Chan
 
       if (writeInitialized) {
         builder.append(" write-enabled: { writeBuffer:")
-            .append(writeEvents.size()).append(" }");
+            .append(writeEvents.size()).append(" currentOutputFile:")
+            .append(currentOutputFile).append(" }");
       }
 
       builder.append(" }");

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1175666&r1=1175665&r2=1175666&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java	(original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java	Mon Sep 26 04:30:00 2011
@@ -2,10 +2,18 @@ package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,6 +73,66 @@ public class TestFileChannel {
     Assert.assertFalse(tx2.equals(channel.getTransaction()));
   }
 
+  /**
+   * <p>
+   * Ensure two threads calling {@link FileChannel#getTransaction()} get
+   * different transactions back.
+   * </p>
+   * 
+   */
+  @Test
+  public void testConcurrentGetTransaction() throws IOException,
+      InterruptedException, ExecutionException {
+    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
+    FileUtils.forceDeleteOnExit(tmpDir);
+    final CyclicBarrier latch = new CyclicBarrier(2);
+
+    channel.setDirectory(tmpDir);
+
+    Callable<FileBackedTransaction> getTransRunnable = new Callable<FileBackedTransaction>() {
+
+      @Override
+      public FileBackedTransaction call() {
+        Transaction tx = null;
+
+        try {
+          /*
+           * Wait for all threads to pile up to prevent thread reuse in the
+           * pool.
+           */
+          latch.await();
+          tx = channel.getTransaction();
+          /*
+           * This await isn't strictly necessary but it guarantees both threads
+           * entered and exited getTransaction() in lock step which simplifies
+           * debugging.
+           */
+          latch.await();
+        } catch (InterruptedException e) {
+          logger.error("Interrupted while waiting for threads", e);
+          Assert.fail();
+        } catch (BrokenBarrierException e) {
+          logger.error("Barrier broken", e);
+          Assert.fail();
+        }
+        // Purposefully serialize to force things to occur in different threads.
+        return (FileBackedTransaction) tx;
+      }
+
+    };
+
+    ExecutorService pool = Executors.newFixedThreadPool(2);
+
+    Future<FileBackedTransaction> f1 = pool.submit(getTransRunnable);
+    Future<FileBackedTransaction> f2 = pool.submit(getTransRunnable);
+
+    FileBackedTransaction t1 = f1.get();
+    FileBackedTransaction t2 = f2.get();
+
+    Assert.assertNotSame("Transactions from different threads are different",
+        t1, t2);
+  }
+
   @Test
   public void testPut() throws IOException {
     File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());



Mime
View raw message