flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1606. Rollbacks of Put transactions do not clear the transaction from inflight puts.
Date Tue, 25 Sep 2012 05:52:49 GMT
Updated Branches:
  refs/heads/flume-1.3.0 a221a8e99 -> 892521fd5


FLUME-1606. Rollbacks of Put transactions do not clear the transaction from inflight puts.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/892521fd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/892521fd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/892521fd

Branch: refs/heads/flume-1.3.0
Commit: 892521fd5b216d9691cec235fa12822e9dfa7e87
Parents: a221a8e
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Sep 24 22:51:04 2012 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Sep 24 22:52:30 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |    4 +-
 .../apache/flume/channel/file/FlumeEventQueue.java |    7 ++-
 .../apache/flume/channel/file/TestFileChannel.java |   64 ++++++++++++++-
 3 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index ca7db70..bdc9f04 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -387,7 +387,7 @@ public class FileChannel extends BasicChannelSemantics {
       // this does not need to be in the critical section as it does not
       // modify the structure of the log or queue.
       if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
-        throw new ChannelException("The channel has reached it's capacity. " 
+        throw new ChannelException("The channel has reached it's capacity. "
             + "This might be the result of a sink on the channel having too "
             + "low of batch size, a downstream system running slower than "
             + "normal, or that the channel capacity is just too low. "
@@ -537,11 +537,11 @@ public class FileChannel extends BasicChannelSemantics {
                   "Queue add failed, this shouldn't be able to happen "
                       + channelNameDescriptor);
             }
-            queue.completeTransaction(transactionID);
           }
         }
         putList.clear();
         takeList.clear();
+        queue.completeTransaction(transactionID);
         channelCounter.setChannelSize(queue.getSize());
       } catch (IOException e) {
         throw new ChannelException("Commit failed due to IO error "

http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index a8df042..36553c5 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -331,7 +331,7 @@ final class FlumeEventQueue {
    * None of the methods are thread safe, and should be called from thread
    * safe methods only.
    */
-  private class InflightEventWrapper {
+  class InflightEventWrapper {
     private SetMultimap<Long, Long> inflightEvents = HashMultimap.create();
     private RandomAccessFile file;
     private volatile java.nio.channels.FileChannel fileChannel;
@@ -516,5 +516,10 @@ final class FlumeEventQueue {
     public Collection<Integer> getFileIDs(){
       return inflightFileIDs.values();
     }
+
+    //Needed for testing.
+    public Collection<Long> getInFlightPointers() {
+      return inflightEvents.values();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 41b1fbb..87a0a3f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -19,6 +19,7 @@
 package org.apache.flume.channel.file;
 
 import static org.apache.flume.channel.file.TestUtils.*;
+import static org.fest.reflect.core.Reflection.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,6 +50,9 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
+import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper;
+import org.apache.flume.event.EventBuilder;
 
 public class TestFileChannel extends TestFileChannelBase {
 
@@ -171,7 +175,7 @@ public class TestFileChannel extends TestFileChannelBase {
         in.addAll(putEvents(channel, "reconfig", 1, 1));
       }
     } catch (ChannelException e) {
-      Assert.assertEquals("The channel has reached it's capacity. " 
+      Assert.assertEquals("The channel has reached it's capacity. "
           + "This might be the result of a sink on the channel having too "
           + "low of batch size, a downstream system running slower than "
           + "normal, or that the channel capacity is just too low. [channel="
@@ -476,4 +480,62 @@ public class TestFileChannel extends TestFileChannelBase {
     }).get();
     Assert.assertEquals(15, takenEvents.size());
   }
+
+  // This test will fail without FLUME-1606.
+  @Test
+  public void testRollbackIncompleteTransaction() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+            String.valueOf(Integer.MAX_VALUE));
+    final FileChannel channel = createFileChannel(overrides);
+    channel.start();
+    FileBackedTransaction tx = (FileBackedTransaction) channel.getTransaction();
+
+    InflightEventWrapper inflightPuts =
+            field("inflightPuts").ofType(InflightEventWrapper.class).in(
+            field("queue").ofType(FlumeEventQueue.class).in(tx).get()).get();
+
+    tx.begin();
+
+    for (int i = 0; i < 100; i++) {
+      channel.put(EventBuilder.withBody("TestEvent".getBytes()));
+    }
+
+    Assert.assertFalse(inflightPuts.getFileIDs().isEmpty());
+    Assert.assertFalse(inflightPuts.getInFlightPointers().isEmpty());
+
+    tx.rollback();
+    tx.close();
+
+    Assert.assertTrue(inflightPuts.getFileIDs().isEmpty());
+    Assert.assertTrue(inflightPuts.getInFlightPointers().isEmpty());
+    Assert.assertTrue(channel.getDepth() == 0);
+
+    Set<String> in = putEvents(channel, "testing-rollbacks", 100, 100);
+
+    tx = (FileBackedTransaction) channel.getTransaction();
+
+    InflightEventWrapper inflightTakes =
+            field("inflightTakes").ofType(InflightEventWrapper.class).in(
+            field("queue").ofType(FlumeEventQueue.class).in(tx).get()).get();
+
+    tx.begin();
+
+    for (int i = 0; i < 100; i++) {
+      channel.take();
+    }
+
+    Assert.assertFalse(inflightTakes.getFileIDs().isEmpty());
+    Assert.assertFalse(inflightTakes.getInFlightPointers().isEmpty());
+
+    tx.rollback();
+    tx.close();
+
+
+    Assert.assertTrue(inflightTakes.getFileIDs().isEmpty());
+    Assert.assertTrue(inflightTakes.getInFlightPointers().isEmpty());
+    Assert.assertTrue(channel.getDepth() == in.size());
+
+  }
+
 }


Mime
View raw message