flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From juha...@apache.org
Subject [9/10] git commit: NetcatSource and test fixes
Date Wed, 15 Aug 2012 07:00:19 GMT
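NetcatSource and test fixes

NetcatSource.start() now throws a FlumeException when it cannot bind to the
socket (instead of logging and returning), and super.start()/super.stop() are
called at the end of start()/stop() rather than at the beginning.
TestNetcatSource retries configuring and starting the source (up to 100
attempts) when a FlumeException is thrown.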


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd098cee
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd098cee
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd098cee

Branch: refs/heads/trunk
Commit: bd098cee15e867f87f83656fe76a78f2fe61deb3
Parents: 9f52b5f
Author: Juhani Connolly <juhanic@cyberagent.co.jp>
Authored: Wed Jul 11 18:22:42 2012 +0900
Committer: Juhani Connolly <juhanic@cyberagent.co.jp>
Committed: Wed Jul 11 18:22:42 2012 +0900

----------------------------------------------------------------------
 .../java/org/apache/flume/source/NetcatSource.java |    9 +++---
 .../org/apache/flume/source/TestNetcatSource.java  |   20 ++++++++++----
 2 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bd098cee/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
index 9d28cda..37c09fe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
@@ -40,6 +40,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
@@ -145,8 +146,6 @@ public class NetcatSource extends AbstractSource implements Configurable,
 
     logger.info("Source starting");
 
-    super.start();
-
     counterGroup.incrementAndGet("open.attempts");
 
     handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
@@ -163,7 +162,7 @@ public class NetcatSource extends AbstractSource implements Configurable,
     } catch (IOException e) {
       counterGroup.incrementAndGet("open.errors");
       logger.error("Unable to bind to socket. Exception follows.", e);
-      return;
+      throw new FlumeException(e);
     }
 
     AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
@@ -179,14 +178,13 @@ public class NetcatSource extends AbstractSource implements Configurable,
     acceptThread.start();
 
     logger.debug("Source started");
+    super.start();
   }
 
   @Override
   public void stop() {
     logger.info("Source stopping");
 
-    super.stop();
-
     acceptThreadShouldStop.set(true);
 
     if (acceptThread != null) {
@@ -238,6 +236,7 @@ public class NetcatSource extends AbstractSource implements Configurable,
     }
 
     logger.debug("Source stopped. Event metrics:{}", counterGroup);
+    super.stop();
   }
 
   private static class AcceptHandler implements Runnable {

http://git-wip-us.apache.org/repos/asf/flume/blob/bd098cee/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
index c195db7..3c17d3d 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
@@ -37,6 +37,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -79,15 +80,22 @@ public class TestNetcatSource {
       EventDeliveryException {
 
     ExecutorService executor = Executors.newFixedThreadPool(3);
-    Context context = new Context();
+    boolean bound = false;
 
-    /* FIXME: Use a random port for testing. */
-    context.put("bind", "0.0.0.0");
-    context.put("port", "41414");
+    for(int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+        context.put("bind", "0.0.0.0");
+        context.put("port", "41414");
 
-    Configurables.configure(source, context);
+        Configurables.configure(source, context);
 
-    source.start();
+        source.start();
+        bound = true;
+      } catch (FlumeException e) {
+        // assume port in use and retry (note: the port is still fixed at 41414)
+      }
+    }
 
     Runnable clientRequestRunnable = new Runnable() {
 


Mime
View raw message