flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1172841 - in /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node: Application.java FlumeNode.java
Date Mon, 19 Sep 2011 21:25:10 GMT
Author: esammer
Date: Mon Sep 19 21:25:09 2011
New Revision: 1172841

URL: http://svn.apache.org/viewvc?rev=1172841&view=rev
Log:
- FlumeNode now supports (and requires) having a ConfigurationProvider injected.
- Our bare-bones flume node command-line tool now takes -f <conffile> and expects it
to be a JSON configuration file.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1172841&r1=1172840&r2=1172841&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Mon Sep 19 21:25:09 2011
@@ -1,5 +1,6 @@
 package org.apache.flume.node;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -12,6 +13,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
 import org.apache.flume.Context;
 import org.apache.flume.LogicalNode;
 import org.apache.flume.Sink;
@@ -20,8 +22,10 @@ import org.apache.flume.SinkRunner;
 import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
 import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.DefaultChannelFactory;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.file.JsonFileConfigurationProvider;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -36,6 +40,8 @@ import org.apache.flume.source.SequenceG
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class Application {
 
   private static final Logger logger = LoggerFactory
@@ -44,9 +50,11 @@ public class Application {
   private String[] args;
   private Set<NodeConfiguration> nodeConfigs;
   private Map<String, Context> contexts;
+  private File configurationFile;
 
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
+  private ChannelFactory channelFactory;
 
   public static void main(String[] args) {
     Application application = new Application();
@@ -69,10 +77,13 @@ public class Application {
     nodeConfigs = new HashSet<NodeConfiguration>();
     sourceFactory = new DefaultSourceFactory();
     sinkFactory = new DefaultSinkFactory();
+    channelFactory = new DefaultChannelFactory();
     contexts = new HashMap<String, Context>();
   }
 
   public void loadPlugins() {
+    channelFactory.register("memory", MemoryChannel.class);
+
     sourceFactory.register("seq", SequenceGeneratorSource.class);
     sourceFactory.register("netcat", NetcatSource.class);
 
@@ -88,6 +99,9 @@ public class Application {
     option.setValueSeparator(',');
     options.addOption(option);
 
+    option = new Option("f", "conf-file", true, "specify a conf file");
+    options.addOption(option);
+
     CommandLineParser parser = new GnuParser();
 
     CommandLine commandLine = parser.parse(options, args);
@@ -132,6 +146,8 @@ public class Application {
 
         nodeConfigs.add(nodeConfiguration);
       }
+    } else if (commandLine.hasOption('f')) {
+      configurationFile = new File(commandLine.getOptionValue('f'));
     }
   }
 
@@ -140,9 +156,20 @@ public class Application {
 
     final FlumeNode node = new FlumeNode();
     NodeManager nodeManager = new DefaultLogicalNodeManager();
+    JsonFileConfigurationProvider configurationProvider = new JsonFileConfigurationProvider();
+
+    configurationProvider.setChannelFactory(channelFactory);
+    configurationProvider.setSourceFactory(sourceFactory);
+    configurationProvider.setSinkFactory(sinkFactory);
+
+    Preconditions.checkState(configurationFile != null,
+        "Configuration file not specified");
+
+    configurationProvider.setFile(configurationFile);
 
     node.setName("node");
     node.setNodeManager(nodeManager);
+    node.setConfigurationProvider(configurationProvider);
 
     Runtime.getRuntime().addShutdownHook(new Thread("node-shutdownHook") {
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java?rev=1172841&r1=1172840&r2=1172841&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java Mon Sep 19 21:25:09 2011
@@ -16,7 +16,7 @@ public class FlumeNode implements Lifecy
   private String name;
   private LifecycleState lifecycleState;
   private NodeManager nodeManager;
-  private NodeConfigurationClient configurationClient;
+  private ConfigurationProvider configurationProvider;
   private LifecycleSupervisor supervisor;
 
   public FlumeNode() {
@@ -36,6 +36,8 @@ public class FlumeNode implements Lifecy
 
     supervisor.supervise(nodeManager,
         new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+    supervisor.supervise(configurationProvider,
+        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
 
     lifecycleState = LifecycleState.START;
   }
@@ -52,7 +54,8 @@ public class FlumeNode implements Lifecy
 
   @Override
   public String toString() {
-    return "{ name:" + name + " nodeManager:" + nodeManager + " }";
+    return "{ name:" + name + " nodeManager:" + nodeManager
+        + " configurationProvider:" + configurationProvider + " }";
   }
 
   public String getName() {
@@ -71,6 +74,15 @@ public class FlumeNode implements Lifecy
     this.nodeManager = nodeManager;
   }
 
+  public ConfigurationProvider getConfigurationProvider() {
+    return configurationProvider;
+  }
+
+  public void setConfigurationProvider(
+      ConfigurationProvider configurationProvider) {
+    this.configurationProvider = configurationProvider;
+  }
+
   @Override
   public LifecycleState getLifecycleState() {
     return lifecycleState;



Mime
View raw message