flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1165430 [2/2] - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/durability/ flume-ng-core/src/main/java/org/apache/flume/durability/file/ flume-ng-core/src/...
Date Mon, 05 Sep 2011 21:51:28 GMT
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1165430&r1=1165429&r2=1165430&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Mon Sep  5 21:51:26 2011
@@ -1,6 +1,5 @@
 package org.apache.flume.source;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
@@ -10,13 +9,13 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.EventSource;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
-import org.apache.flume.durability.file.FileBasedWALManager;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -24,14 +23,18 @@ import org.junit.Test;
 
 public class TestNetcatSource {
 
-  private EventSource source;
+  private Channel channel;
+  private EventDrivenSource source;
 
   @Before
   public void setUp() {
+    channel = new MemoryChannel();
     source = new NetcatSource();
+
+    source.setChannel(channel);
   }
 
-  @Test(timeout = 5000)
+  @Test
   public void testLifecycle() throws InterruptedException, LifecycleException,
       EventDeliveryException {
 
@@ -44,68 +47,9 @@ public class TestNetcatSource {
 
     Configurables.configure(source, context);
 
-    source.open(context);
-
-    Runnable clientRequestRunnable = new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          SocketChannel clientChannel = SocketChannel
-              .open(new InetSocketAddress(41414));
-
-          Writer writer = Channels.newWriter(clientChannel, "utf-8");
-
-          writer.write("Test message");
-
-          writer.flush();
-          clientChannel.close();
-        } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-      }
-
-    };
+    source.start(context);
 
-    for (int i = 0; i < 100; i++) {
-      executor.submit(clientRequestRunnable);
-
-      Event event = source.next(context);
-
-      Assert.assertNotNull(event);
-      Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
-    }
-
-    executor.shutdown();
-
-    while (!executor.isTerminated()) {
-      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
-    }
-
-    source.close(context);
-  }
-
-  @Test
-  public void testDurability() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
-
-    FileBasedWALManager walManager = new FileBasedWALManager();
-
-    walManager.setDirectory(new File("/tmp/flume-ncs-tests", "wal-test"));
-    walManager.getDirectory().mkdirs();
-
-    ((NetcatSource) source).setWALManager(walManager);
-
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    Context context = new Context();
-
-    context.put("logicalNode.name", "test");
-    context.put("source.port", "41414");
-
-    Configurables.configure(source, context);
-
-    source.open(context);
+    /* FIXME: Ensure proper send / received semantics. */
 
     Runnable clientRequestRunnable = new Runnable() {
 
@@ -132,7 +76,7 @@ public class TestNetcatSource {
     for (int i = 0; i < 100; i++) {
       executor.submit(clientRequestRunnable);
 
-      Event event = source.next(context);
+      Event event = channel.take();
 
       Assert.assertNotNull(event);
       Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
@@ -144,12 +88,7 @@ public class TestNetcatSource {
       executor.awaitTermination(500, TimeUnit.MILLISECONDS);
     }
 
-    source.close(context);
-
-    FileUtils.deleteDirectory(walManager.getDirectory());
-
-    /* Only delete the parent if it's empty. */
-    walManager.getDirectory().getParentFile().delete();
+    source.stop(context);
   }
 
 }



Mime
View raw message