flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pras...@apache.org
Subject svn commit: r1233005 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/agent/durability/ main/java/com/cloudera/flume/handlers/debug/ test/java/com/cloudera/flume/agent/diskfailover/ test/java/com/cloudera/flume/agent/durability/
Date Wed, 18 Jan 2012 19:10:37 GMT
Author: prasadm
Date: Wed Jan 18 19:10:37 2012
New Revision: 1233005

URL: http://svn.apache.org/viewvc?rev=1233005&view=rev
Log:
FLUME-927
When the WAL decorator starts its subsink, it should wait for the subsink to become active;
only an exception in the subsink will abort the wait.

Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALDeco.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/DelayDecorator.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverAgent.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALDeco.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALDeco.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALDeco.java?rev=1233005&r1=1233004&r2=1233005&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALDeco.java
(original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALDeco.java
Wed Jan 18 19:10:37 2012
@@ -248,10 +248,7 @@ public class NaiveFileWALDeco extends Ev
     walConsumerDriver = new DirectDriver("naive file wal consumer",
         walConsumer, drainSink);
     walConsumerDriver.start();
-    boolean success = walConsumerDriver.waitForAtLeastState(DriverState.ACTIVE,
-        1000);
-    if (!success) {
-      walConsumerDriver.stop();
+    while (!walConsumerDriver.waitForAtLeastState(DriverState.ACTIVE, 1000)) {
       attemptToForwardException();
     }
     LOG.debug("Opened NaiveFileWALDeco");

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/DelayDecorator.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/DelayDecorator.java?rev=1233005&r1=1233004&r2=1233005&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/DelayDecorator.java
(original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/DelayDecorator.java
Wed Jan 18 19:10:37 2012
@@ -33,10 +33,24 @@ import com.google.common.base.Preconditi
 public class DelayDecorator<S extends EventSink> extends EventSinkDecorator<S> {
 
   final int millis;
+  final boolean delayOpen;
 
-  public DelayDecorator(S s, int millis) {
+  public DelayDecorator(S s, int millis, boolean delayOpen) {
     super(s);
     this.millis = millis;
+    this.delayOpen = delayOpen;
+  }
+
+  @Override
+  public void open() throws IOException, InterruptedException {
+    if (delayOpen) {
+      try {
+        Thread.sleep(millis);
+      } catch (InterruptedException e1) {
+        throw e1;
+      }
+    }
+    super.open();
   }
 
   @Override
@@ -57,11 +71,15 @@ public class DelayDecorator<S extends Ev
       public EventSinkDecorator<EventSink> build(Context context,
           String... argv) {
         Preconditions
-            .checkArgument(argv.length <= 2, "usage: delay(init=1000)");
+            .checkArgument(argv.length <= 3, "usage: delay(init=1000 [,delayOpen])");
         int delaymillis = 1000;
+        boolean delayOpen = false;
         if (argv.length >= 1)
           delaymillis = Integer.parseInt(argv[0]);
-        return new DelayDecorator<EventSink>(null, delaymillis);
+        if (argv.length >= 2) {
+          delayOpen = true;
+        }
+        return new DelayDecorator<EventSink>(null, delaymillis, delayOpen);
       }
 
     };

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverAgent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverAgent.java?rev=1233005&r1=1233004&r2=1233005&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverAgent.java
(original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverAgent.java
Wed Jan 18 19:10:37 2012
@@ -207,11 +207,10 @@ public class TestDiskFailoverAgent {
     master.getSpecMan().addLogicalNode(NetUtils.localhost(), lnode);
     liveMan.heartbeatChecks();
 
-    // TODO It we only wait for opening state, this test can hang
     LogicalNode n = node.getLogicalNodeManager().get(lnode);
     Driver d = n.getDriver();
     assertTrue("Attempting to start driver timed out",
-        d.waitForAtLeastState(DriverState.ACTIVE, 10000));
+        d.waitForAtLeastState(DriverState.OPENING, 10000));
 
     // update config node to something that will be interrupted.
     LOG.info("!!! decommissioning node on master");
@@ -253,11 +252,10 @@ public class TestDiskFailoverAgent {
     master.getSpecMan().addLogicalNode(NetUtils.localhost(), lnode);
     liveMan.heartbeatChecks();
 
-    // TODO It we only wait for opening state, this test can hang
     LogicalNode n = node.getLogicalNodeManager().get(lnode);
     Driver d = n.getDriver();
     assertTrue("Attempting to start driver timed out",
-        d.waitForAtLeastState(DriverState.ACTIVE, 15000));
+        d.waitForAtLeastState(DriverState.OPENING, 15000));
 
     // update config node to something that will be interrupted.
     LOG.info("!!! decommissioning node on master");

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALDeco.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALDeco.java?rev=1233005&r1=1233004&r2=1233005&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALDeco.java
(original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALDeco.java
Wed Jan 18 19:10:37 2012
@@ -353,4 +353,28 @@ public class TestNaiveFileWALDeco {
     b.build(LogicalNodeContext.testingContext(), "foo", "bar");
   }
 
+  /**
+   * force the 1500ms delay in open, make sure that the events
+   * passed successfully to sink.
+   */
+  @Test
+  public void testSlowOpenSubsink() throws FlumeSpecException,
+      IOException, InterruptedException {
+    FlumeTestHarness.setupLocalWriteDir();
+    EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(
+        LogicalNodeContext.testingContext()),
+        "{ ackedWriteAhead => { ackChecker => { delay(1500, \"delayOpen\") => counter(\"count\") } } }");
+    EventSource src =  MemorySinkSource.cannedData("foo foo foo ", 5);
+    snk.open();
+    src.open();
+    EventUtil.dumpAll(src, snk);
+    src.close();
+    snk.close();
+
+    CounterSink cnt = (CounterSink) ReportManager.get().getReportable("count");
+
+    assertEquals(5, cnt.getCount());
+    FlumeTestHarness.cleanupLocalWriteDir();
+  }
+
 }



Mime
View raw message