Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34E1F968C for ; Mon, 19 Sep 2011 21:25:35 +0000 (UTC) Received: (qmail 20467 invoked by uid 500); 19 Sep 2011 21:25:34 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 20437 invoked by uid 500); 19 Sep 2011 21:25:33 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 20401 invoked by uid 99); 19 Sep 2011 21:25:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Sep 2011 21:25:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Sep 2011 21:25:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2EC962388AC8; Mon, 19 Sep 2011 21:25:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1172841 - in /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node: Application.java FlumeNode.java Date: Mon, 19 Sep 2011 21:25:10 -0000 To: flume-commits@incubator.apache.org From: esammer@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110919212510.2EC962388AC8@eris.apache.org> Author: esammer Date: Mon Sep 19 21:25:09 2011 New Revision: 1172841 URL: http://svn.apache.org/viewvc?rev=1172841&view=rev Log: - FlumeNode now supports (and requires) having a ConfigurationProvider injected. - Our bare-bones flume node command line tool now takes -f and expects it to be a json conf. Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1172841&r1=1172840&r2=1172841&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original) +++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Mon Sep 19 21:25:09 2011 @@ -1,5 +1,6 @@ package org.apache.flume.node; +import java.io.File; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -12,6 +13,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.flume.Channel; +import org.apache.flume.ChannelFactory; import org.apache.flume.Context; import org.apache.flume.LogicalNode; import org.apache.flume.Sink; @@ -20,8 +22,10 @@ import org.apache.flume.SinkRunner; import org.apache.flume.Source; import org.apache.flume.SourceFactory; import org.apache.flume.SourceRunner; +import org.apache.flume.channel.DefaultChannelFactory; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; +import org.apache.flume.conf.file.JsonFileConfigurationProvider; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleException; import org.apache.flume.lifecycle.LifecycleState; @@ -36,6 +40,8 @@ import org.apache.flume.source.SequenceG import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class Application { private static final Logger logger = LoggerFactory @@ -44,9 +50,11 @@ public class Application { private String[] args; private Set nodeConfigs; private Map contexts; + private File configurationFile; private SourceFactory sourceFactory; private SinkFactory sinkFactory; + private ChannelFactory channelFactory; public static void main(String[] args) { Application application = new Application(); @@ -69,10 +77,13 @@ public class Application { nodeConfigs = new HashSet(); sourceFactory = new DefaultSourceFactory(); sinkFactory = new DefaultSinkFactory(); + channelFactory = new DefaultChannelFactory(); contexts = new HashMap(); } public void loadPlugins() { + channelFactory.register("memory", MemoryChannel.class); + sourceFactory.register("seq", SequenceGeneratorSource.class); sourceFactory.register("netcat", NetcatSource.class); @@ -88,6 +99,9 @@ public class Application { option.setValueSeparator(','); options.addOption(option); + option = new Option("f", "conf-file", true, "specify a conf file"); + options.addOption(option); + CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); @@ -132,6 +146,8 @@ public class Application { nodeConfigs.add(nodeConfiguration); } + } else if (commandLine.hasOption('f')) { + configurationFile = new File(commandLine.getOptionValue('f')); } } @@ -140,9 +156,20 @@ public class Application { final FlumeNode node = new FlumeNode(); NodeManager nodeManager = new DefaultLogicalNodeManager(); + JsonFileConfigurationProvider configurationProvider = new JsonFileConfigurationProvider(); + + configurationProvider.setChannelFactory(channelFactory); + configurationProvider.setSourceFactory(sourceFactory); + configurationProvider.setSinkFactory(sinkFactory); + + Preconditions.checkState(configurationFile != null, + "Configuration file not specified"); + + configurationProvider.setFile(configurationFile); node.setName("node"); node.setNodeManager(nodeManager); + node.setConfigurationProvider(configurationProvider); Runtime.getRuntime().addShutdownHook(new Thread("node-shutdownHook") { Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java?rev=1172841&r1=1172840&r2=1172841&view=diff ============================================================================== --- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java (original) +++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java Mon Sep 19 21:25:09 2011 @@ -16,7 +16,7 @@ public class FlumeNode implements Lifecy private String name; private LifecycleState lifecycleState; private NodeManager nodeManager; - private NodeConfigurationClient configurationClient; + private ConfigurationProvider configurationProvider; private LifecycleSupervisor supervisor; public FlumeNode() { @@ -36,6 +36,8 @@ public class FlumeNode implements Lifecy supervisor.supervise(nodeManager, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + supervisor.supervise(configurationProvider, + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); lifecycleState = LifecycleState.START; } @@ -52,7 +54,8 @@ public class FlumeNode implements Lifecy @Override public String toString() { - return "{ name:" + name + " nodeManager:" + nodeManager + " }"; + return "{ name:" + name + " nodeManager:" + nodeManager + + " configurationProvider:" + configurationProvider + " }"; } public String getName() { @@ -71,6 +74,15 @@ public class FlumeNode implements Lifecy this.nodeManager = nodeManager; } + public ConfigurationProvider getConfigurationProvider() { + return configurationProvider; + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + this.configurationProvider = configurationProvider; + } + @Override public LifecycleState getLifecycleState() { return lifecycleState;