Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 274B7796C for ; Fri, 12 Aug 2011 00:49:25 +0000 (UTC) Received: (qmail 11573 invoked by uid 500); 12 Aug 2011 00:49:25 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 11554 invoked by uid 500); 12 Aug 2011 00:49:24 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 11546 invoked by uid 99); 12 Aug 2011 00:49:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2011 00:49:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2011 00:49:21 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5275D2388ADA; Fri, 12 Aug 2011 00:49:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1156921 - in /incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source: NetcatSource.java TestNetcatSource.java Date: Fri, 12 Aug 2011 00:49:01 -0000 To: flume-commits@incubator.apache.org From: esammer@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110812004901.5275D2388ADA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: esammer Date: Fri Aug 12 00:49:00 2011 New Revision: 1156921 URL: http://svn.apache.org/viewvc?rev=1156921&view=rev Log: - Added a netcat-style source and test for testing / debugging / playing around. Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java?rev=1156921&view=auto ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java (added) +++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java Fri Aug 12 00:49:00 2011 @@ -0,0 +1,115 @@ +package org.apache.flume.source; + +import java.io.IOException; +import java.io.Reader; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.CharBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import org.apache.flume.Context; +import org.apache.flume.CounterGroup; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetcatSource extends AbstractEventSource { + + private static final Logger logger = LoggerFactory + .getLogger(NetcatSource.class); + + private int port; + private ServerSocketChannel serverSocket; + private CounterGroup counterGroup; + + public NetcatSource() { + port = 0; + counterGroup = new CounterGroup(); + } + + @Override + public void open(Context context) { + counterGroup.incrementAndGet("open.attempts"); + + try { + SocketAddress bindPoint = new InetSocketAddress(port); + + serverSocket = ServerSocketChannel.open(); + serverSocket.socket().setReuseAddress(true); + serverSocket.socket().bind(bindPoint); + + logger.info("Created serverSocket:{}", serverSocket); + } catch (IOException e) { + counterGroup.incrementAndGet("open.errors"); + logger.error("Unable to bind to socket. Exception follows.", e); + } + } + + @Override + public Event next(Context context) throws InterruptedException, + EventDeliveryException { + + Event event = null; + + counterGroup.incrementAndGet("next.calls"); + + try { + SocketChannel channel = serverSocket.accept(); + + logger.debug("Received a connection:{}", channel); + + Reader reader = Channels.newReader(channel, "utf-8"); + CharBuffer buffer = CharBuffer.allocate(512); + StringBuilder builder = new StringBuilder(); + + while (reader.read(buffer) != -1) { + buffer.flip(); + logger.debug("read {} characters", buffer.remaining()); + builder.append(buffer.array(), buffer.position(), buffer.length()); + } + + if (builder.charAt(builder.length() - 1) == '\n') { + builder.deleteCharAt(builder.length() - 1); + } + + logger.debug("end of message"); + + event = EventBuilder.withBody(builder.toString()); + + channel.close(); + + counterGroup.incrementAndGet("events.success"); + } catch (IOException e) { + counterGroup.incrementAndGet("events.failed"); + + throw new EventDeliveryException("Unable to process event due to " + + e.getMessage(), e); + } + + return event; + } + + @Override + public void close(Context context) { + if (serverSocket != null) { + try { + serverSocket.close(); + } catch (IOException e) { + logger.error("Unable to close socket. Exception follows.", e); + } + } + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + +} Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1156921&view=auto ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (added) +++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Aug 12 00:49:00 2011 @@ -0,0 +1,82 @@ +package org.apache.flume.source; + +import java.io.IOException; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.nio.channels.Channels; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.EventSource; +import org.apache.flume.lifecycle.LifecycleException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestNetcatSource { + + private EventSource source; + + @Before + public void setUp() { + source = new NetcatSource(); + } + + @Test(timeout = 5000) + public void testLifecycle() throws InterruptedException, LifecycleException, + EventDeliveryException { + + ExecutorService executor = Executors.newFixedThreadPool(3); + Context context = new Context(); + + /* FIXME: Use a random port for testing. */ + ((NetcatSource) source).setPort(41414); + + source.open(context); + + Runnable clientRequestRunnable = new Runnable() { + + @Override + public void run() { + try { + SocketChannel clientChannel = SocketChannel + .open(new InetSocketAddress(41414)); + + Writer writer = Channels.newWriter(clientChannel, "utf-8"); + + writer.write("Test message"); + + writer.flush(); + clientChannel.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + }; + + for (int i = 0; i < 100; i++) { + executor.submit(clientRequestRunnable); + + Event event = source.next(context); + + Assert.assertNotNull(event); + Assert.assertEquals("Test message", event.getBody()); + } + + executor.shutdown(); + + while (!executor.isTerminated()) { + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + source.close(context); + } + +}