Subject: svn commit: r1187586 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-node/src/test/java/org/apache/flume/source/
Date: Fri, 21 Oct 2011 22:43:06 -0000
To: flume-commits@incubator.apache.org
From: esammer@apache.org

Author: esammer
Date: Fri Oct 21 22:43:05 2011
New Revision: 1187586

URL: http://svn.apache.org/viewvc?rev=1187586&view=rev
Log: FLUME-786:
Write javadoc for builtin sources

- Changed netcat name param to bind to be consistent with other server-ish sources. It's now possible to bind to a specific IP:port rather than just a port.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Oct 21 22:43:05 2011
@@ -17,6 +17,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
@@ -26,6 +27,55 @@ import org.apache.flume.source.avro.Stat
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+/**
+ *

+ * A {@link Source} implementation that receives Avro events from clients that
+ * implement {@link AvroSourceProtocol}.
+ *
+ * This source forms one half of Flume's tiered collection support. Internally,
+ * this source uses Avro's NettyTransceiver to listen for and handle events.
+ * It can be paired with the builtin AvroSink to create tiered collection
+ * topologies. Of course, nothing prevents one from using this source to
+ * receive data from other custom built infrastructure that uses the same
+ * Avro protocol (specifically {@link AvroSourceProtocol}).
+ *
+ * Events may be received from the client either singly or in batches.
+ * Generally, larger batches are far more efficient, but introduce a slight
+ * delay (measured in milliseconds) in delivery. A batch is submitted to the
+ * configured {@link Channel} atomically (i.e. either all events make it into
+ * the channel or none do).

+ *
+ * Configuration options
+ *
+ * Parameter | Description                                                   | Unit / Type             | Default
+ * bind      | The hostname or IP to which the source will bind.             | Hostname or IP / String | none (required)
+ * port      | The port to which the source will bind and listen for events. | TCP port / int          | none (required)
+ *

+ * Metrics
+ *
+ * TODO
+ *

+ */
 public class AvroSource extends AbstractSource implements EventDrivenSource,
     Configurable, AvroSourceProtocol {

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Oct 21 22:43:05 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
@@ -21,6 +22,72 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
+/**
+ *

+ * A {@link Source} implementation that executes a Unix process and turns each
+ * line of text into an event.
+ *
+ * This source runs a given Unix command on startup and expects that process to
+ * continuously produce data on standard out (stderr is simply discarded). If
+ * the process exits for any reason, the source also exits and will produce no
+ * further data. This means configurations such as cat [named pipe] or
+ * tail -F [file] are going to produce the desired results, whereas date
+ * will probably not: the former two commands produce streams of data,
+ * whereas the latter produces a single event and exits.
+ *
+ * The ExecSource is meant for situations where one must integrate with
+ * existing systems without modifying code. It is a compatibility gateway built
+ * to allow simple, stop-gap integration and doesn't necessarily offer all of
+ * the benefits or guarantees of native integration with Flume. If one has the
+ * option of using the AvroSource, for instance, that would be greatly
+ * preferred to this source as it (and similarly implemented sources) can
+ * maintain the transactional guarantees that exec cannot.
+ *
+ * Why doesn't ExecSource offer transactional guarantees?
+ *
+ * The problem with ExecSource and other asynchronous sources is that the
+ * source cannot guarantee that the client learns of a failure to put an
+ * event into the {@link Channel}. For instance, one of the most commonly
+ * requested features is the tail -F [file]-like use case, where an
+ * application writes to a log file on disk and Flume tails the file, sending
+ * each line as an event. While this is possible, there's an obvious problem:
+ * what happens if the channel fills up and Flume can't send an event? Flume
+ * has no way of indicating to the application writing the log file that it
+ * needs to retain the log or that the event hasn't been sent for some reason.
+ * If this doesn't make sense, you need only know this: your application can
+ * never guarantee data has been received when using a unidirectional
+ * asynchronous interface such as ExecSource! As an extension of this warning,
+ * and to be completely clear, there is absolutely zero guarantee of event
+ * delivery when using this source. You have been warned.

+ *
+ * Configuration options
+ *
+ * Parameter | Description            | Unit / Type | Default
+ * command   | The command to execute | String      | none (required)
+ *

+ * Metrics
+ *
+ * TODO
+ *

+ */
 public class ExecSource extends AbstractSource implements EventDrivenSource,
     Configurable {

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Fri Oct 21 22:43:05 2011
@@ -29,12 +29,59 @@ import org.slf4j.LoggerFactory;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+/**
+ *

+ * A netcat-like source that listens on a given port and turns each line of text
+ * into an event.
+ *
+ * This source, primarily built for testing and exceedingly simple systems, acts
+ * like nc -k -l [host] [port]. In other words, it opens a specified
+ * port and listens for data. The expectation is that the supplied data is
+ * newline-separated text. Each line of text is turned into a Flume event and
+ * sent via the connected channel.
+ *
+ * Most testing has been done by using the nc client, but other similarly
+ * implemented clients should work just fine.

+ *
+ * Configuration options
+ *
+ * Parameter | Description                                                   | Unit / Type             | Default
+ * bind      | The hostname or IP to which the source will bind.             | Hostname or IP / String | none (required)
+ * port      | The port to which the source will bind and listen for events. | TCP port / int          | none (required)
+ *

+ * Metrics
+ *
+ * TODO
+ *

+ */
 public class NetcatSource extends AbstractSource implements Configurable,
     EventDrivenSource {
   private static final Logger logger = LoggerFactory
       .getLogger(NetcatSource.class);
+  private String hostName;
   private int port;
   private CounterGroup counterGroup;
@@ -53,8 +100,9 @@ public class NetcatSource extends Abstra
   @Override
   public void configure(Context context) {
-    Configurables.ensureRequiredNonNull(context, "name", "port");
+    Configurables.ensureRequiredNonNull(context, "bind", "port");
+    hostName = context.get("bind", String.class);
     port = Integer.parseInt(context.get("port", String.class));
   }
@@ -71,7 +119,7 @@ public class NetcatSource extends Abstra
         .setNameFormat("netcat-handler-%d").build());
     try {
-      SocketAddress bindPoint = new InetSocketAddress(port);
+      SocketAddress bindPoint = new InetSocketAddress(hostName, port);
       serverSocket = ServerSocketChannel.open();
       serverSocket.socket().setReuseAddress(true);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Oct 21 22:43:05 2011
@@ -48,7 +48,7 @@ public class TestNetcatSource {
     Context context = new Context();
     /* FIXME: Use a random port for testing. */
-    context.put("name", "test");
+    context.put("bind", "0.0.0.0");
     context.put("port", "41414");
     Configurables.configure(source, context);
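
Note on the bind change: the effect of switching from `new InetSocketAddress(port)` to `new InetSocketAddress(hostName, port)` can be tried in isolation with plain JDK classes. The sketch below is not part of the commit and uses no Flume code. The class name `BindSketch`, the helper `open`, and the use of port 0 (an ephemeral port chosen by the OS) are assumptions made for illustration; the explicit `bind` call is also assumed, since the hunk above only shows the address being constructed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical stand-alone class; it only mirrors the socket setup
// visible in the NetcatSource hunk.
class BindSketch {

  /** Opens a listening channel bound to the given host and port. */
  static ServerSocketChannel open(String host, int port) throws IOException {
    ServerSocketChannel channel = ServerSocketChannel.open();
    try {
      channel.socket().setReuseAddress(true);
      // The two-argument constructor restricts the listener to one address;
      // the one-argument form (port only) uses the wildcard address.
      channel.socket().bind(new InetSocketAddress(host, port));
      return channel;
    } catch (IOException e) {
      channel.close();
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    // Port 0 asks the OS for any free port (an assumption for this sketch;
    // the test in the commit uses the fixed port 41414).
    try (ServerSocketChannel loopbackOnly = open("127.0.0.1", 0);
         ServerSocketChannel allInterfaces = open("0.0.0.0", 0)) {
      System.out.println("loopback: " + loopbackOnly.socket().getLocalSocketAddress());
      System.out.println("wildcard: " + allInterfaces.socket().getLocalSocketAddress());
    }
  }
}
```

Binding to `0.0.0.0`, as the updated test does, should behave like the old port-only form on an IPv4 host, while a specific address such as `127.0.0.1` limits which interface accepts connections.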