flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1172868 - in /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume: conf/file/ node/ node/nodemanager/
Date Mon, 19 Sep 2011 22:38:08 GMT
Author: esammer
Date: Mon Sep 19 22:38:08 2011
New Revision: 1172868

URL: http://svn.apache.org/viewvc?rev=1172868&view=rev
Log:
- Manage source and sink runners as first class citizens (rather than wrapped in logical nodes).
- Wired the JsonFileConfigurationProvider into the node manager so it receives config updates
(WORKS).
- JsonFlumeConfiguration renamed to SimpleNodeConfiguration (there was nothing json-ish about
it).
- Retroactively made NodeConfiguration the interface for SimpleNodeConfiguration.

Added:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/SimpleNodeConfiguration.java
      - copied, changed from r1172867, incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java
Removed:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/DefaultNodeConfiguration.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java?rev=1172868&r1=1172867&r2=1172868&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
Mon Sep 19 22:38:08 2011
@@ -20,6 +20,7 @@ import org.apache.flume.SourceFactory;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.ConfigurationProvider;
+import org.apache.flume.node.nodemanager.NodeConfigurationAware;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@ public class JsonFileConfigurationProvid
   private ChannelFactory channelFactory;
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
+  private NodeConfigurationAware configurationAware;
 
   private LifecycleState lifecycleState;
   private ScheduledExecutorService executorService;
@@ -98,7 +100,7 @@ public class JsonFileConfigurationProvid
     logger.debug("JSON configuration provider stopped");
   }
 
-  private void loadSources(JsonFlumeConfiguration conf,
+  private void loadSources(SimpleNodeConfiguration conf,
       List<Map<String, Object>> defs) throws InstantiationException {
 
     logger.debug("Loading sources");
@@ -127,7 +129,7 @@ public class JsonFileConfigurationProvid
     }
   }
 
-  private void loadSinks(JsonFlumeConfiguration conf,
+  private void loadSinks(SimpleNodeConfiguration conf,
       List<Map<String, Object>> defs) throws InstantiationException {
 
     logger.debug("Loading sinks");
@@ -153,7 +155,7 @@ public class JsonFileConfigurationProvid
     }
   }
 
-  private void loadChannels(JsonFlumeConfiguration conf,
+  private void loadChannels(SimpleNodeConfiguration conf,
       List<Map<String, Object>> defs) throws InstantiationException {
 
     logger.debug("Loading channels");
@@ -171,7 +173,7 @@ public class JsonFileConfigurationProvid
   }
 
   private synchronized void load() {
-    JsonFlumeConfiguration flumeConf = new JsonFlumeConfiguration();
+    SimpleNodeConfiguration flumeConf = new SimpleNodeConfiguration();
     ObjectMapper mapper = new ObjectMapper();
 
     try {
@@ -194,6 +196,8 @@ public class JsonFileConfigurationProvid
       }
 
       logger.debug("Loaded conf:{}", flumeConf);
+
+      configurationAware.onNodeConfigurationChanged(flumeConf);
     } catch (JsonParseException e) {
       logger.error("Unable to parse json file:" + file + " Exception follows.",
           e);
@@ -244,6 +248,14 @@ public class JsonFileConfigurationProvid
     this.sinkFactory = sinkFactory;
   }
 
+  public NodeConfigurationAware getConfigurationAware() {
+    return configurationAware;
+  }
+
+  public void setConfigurationAware(NodeConfigurationAware configurationAware) {
+    this.configurationAware = configurationAware;
+  }
+
   public class FileWatcherRunnable implements Runnable {
 
     private File file;

Copied: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/SimpleNodeConfiguration.java
(from r1172867, incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java)
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/SimpleNodeConfiguration.java?p2=incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/SimpleNodeConfiguration.java&p1=incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java&r1=1172867&r2=1172868&rev=1172868&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/SimpleNodeConfiguration.java
Mon Sep 19 22:38:08 2011
@@ -6,14 +6,15 @@ import java.util.Map;
 import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
+import org.apache.flume.node.NodeConfiguration;
 
-public class JsonFlumeConfiguration {
+public class SimpleNodeConfiguration implements NodeConfiguration {
 
   private Map<String, Channel> channels;
   private Map<String, SourceRunner> sourceRunners;
   private Map<String, SinkRunner> sinkRunners;
 
-  public JsonFlumeConfiguration() {
+  public SimpleNodeConfiguration() {
     channels = new HashMap<String, Channel>();
     sourceRunners = new HashMap<String, SourceRunner>();
     sinkRunners = new HashMap<String, SinkRunner>();
@@ -25,6 +26,7 @@ public class JsonFlumeConfiguration {
         + " channels:" + channels + " }";
   }
 
+  @Override
   public Map<String, Channel> getChannels() {
     return channels;
   }
@@ -33,6 +35,7 @@ public class JsonFlumeConfiguration {
     this.channels = channels;
   }
 
+  @Override
   public Map<String, SourceRunner> getSourceRunners() {
     return sourceRunners;
   }
@@ -41,6 +44,7 @@ public class JsonFlumeConfiguration {
     this.sourceRunners = sourceRunners;
   }
 
+  @Override
   public Map<String, SinkRunner> getSinkRunners() {
     return sinkRunners;
   }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1172868&r1=1172867&r2=1172868&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Mon Sep 19 22:38:08 2011
@@ -1,10 +1,6 @@
 package org.apache.flume.node;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -12,19 +8,11 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.flume.Channel;
 import org.apache.flume.ChannelFactory;
-import org.apache.flume.Context;
-import org.apache.flume.LogicalNode;
-import org.apache.flume.Sink;
 import org.apache.flume.SinkFactory;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
-import org.apache.flume.SourceRunner;
 import org.apache.flume.channel.DefaultChannelFactory;
 import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.file.JsonFileConfigurationProvider;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
@@ -48,8 +36,6 @@ public class Application {
       .getLogger(Application.class);
 
   private String[] args;
-  private Set<NodeConfiguration> nodeConfigs;
-  private Map<String, Context> contexts;
   private File configurationFile;
 
   private SourceFactory sourceFactory;
@@ -74,11 +60,9 @@ public class Application {
   }
 
   public Application() {
-    nodeConfigs = new HashSet<NodeConfiguration>();
     sourceFactory = new DefaultSourceFactory();
     sinkFactory = new DefaultSinkFactory();
     channelFactory = new DefaultChannelFactory();
-    contexts = new HashMap<String, Context>();
   }
 
   public void loadPlugins() {
@@ -106,47 +90,7 @@ public class Application {
 
     CommandLine commandLine = parser.parse(options, args);
 
-    if (commandLine.hasOption("node")) {
-      String[] values = commandLine.getOptionValues("node");
-
-      for (String value : values) {
-        String[] parts = value.split(":");
-
-        if (parts.length < 3) {
-          throw new ParseException(
-              "Node definition must be in the format <name>:<source>:<sink>:<context
params>");
-        }
-
-        DefaultNodeConfiguration nodeConfiguration = new DefaultNodeConfiguration();
-
-        nodeConfiguration.setName(parts[0]);
-        nodeConfiguration.setSourceDefinition(parts[1]);
-        nodeConfiguration.setSinkDefinition(parts[2]);
-
-        Context context = new Context();
-
-        if (parts.length >= 4) {
-          logger.debug("Parsing context parameters:{}", parts[3]);
-
-          String[] contextParts = parts[3].split("\\|");
-
-          for (String contextPart : contextParts) {
-            logger.debug("parsing kv pair:{}", contextPart);
-
-            String[] strings = contextPart.split("=");
-            context.put(strings[0], strings[1]);
-          }
-
-        }
-
-        logger.debug("Created nodeConfig:{} context:{}", nodeConfiguration,
-            context);
-
-        contexts.put(nodeConfiguration.getName(), context);
-
-        nodeConfigs.add(nodeConfiguration);
-      }
-    } else if (commandLine.hasOption('f')) {
+    if (commandLine.hasOption('f')) {
       configurationFile = new File(commandLine.getOptionValue('f'));
     }
   }
@@ -155,7 +99,7 @@ public class Application {
       InstantiationException {
 
     final FlumeNode node = new FlumeNode();
-    NodeManager nodeManager = new DefaultLogicalNodeManager();
+    DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
     JsonFileConfigurationProvider configurationProvider = new JsonFileConfigurationProvider();
 
     configurationProvider.setChannelFactory(channelFactory);
@@ -165,6 +109,7 @@ public class Application {
     Preconditions.checkState(configurationFile != null,
         "Configuration file not specified");
 
+    configurationProvider.setConfigurationAware(nodeManager);
     configurationProvider.setFile(configurationFile);
 
     node.setName("node");
@@ -183,31 +128,30 @@ public class Application {
     node.start();
     LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
 
-    if (node.getLifecycleState().equals(LifecycleState.START)) {
-      for (NodeConfiguration nodeConf : nodeConfigs) {
-        Source source = sourceFactory.create(nodeConf.getSourceDefinition());
-        Sink sink = sinkFactory.create(nodeConf.getSinkDefinition());
-        Channel channel = new MemoryChannel();
-
-        Configurables.configure(source, contexts.get(nodeConf.getName()));
-        Configurables.configure(sink, contexts.get(nodeConf.getName()));
-
-        source.setChannel(channel);
-        sink.setChannel(channel);
-
-        LogicalNode logicalNode = new LogicalNode();
-
-        logicalNode.setName(nodeConf.getName());
-
-        SourceRunner sourceRunner = SourceRunner.forSource(source);
-        SinkRunner sinkRunner = SinkRunner.forSink(sink);
-
-        logicalNode.setSourceRunner(sourceRunner);
-        logicalNode.setSinkRunner(sinkRunner);
-
-        nodeManager.add(logicalNode);
-      }
-    }
+    /*
+     * if (node.getLifecycleState().equals(LifecycleState.START)) { for
+     * (NodeConfiguration nodeConf : nodeConfigs) { Source source =
+     * sourceFactory.create(nodeConf.getSourceDefinition()); Sink sink =
+     * sinkFactory.create(nodeConf.getSinkDefinition()); Channel channel = new
+     * MemoryChannel();
+     * 
+     * Configurables.configure(source, contexts.get(nodeConf.getName()));
+     * Configurables.configure(sink, contexts.get(nodeConf.getName()));
+     * 
+     * source.setChannel(channel); sink.setChannel(channel);
+     * 
+     * LogicalNode logicalNode = new LogicalNode();
+     * 
+     * logicalNode.setName(nodeConf.getName());
+     * 
+     * SourceRunner sourceRunner = SourceRunner.forSource(source); SinkRunner
+     * sinkRunner = SinkRunner.forSink(sink);
+     * 
+     * logicalNode.setSourceRunner(sourceRunner);
+     * logicalNode.setSinkRunner(sinkRunner);
+     * 
+     * nodeManager.add(logicalNode); } }
+     */
 
     LifecycleController.waitForOneOf(node, LifecycleState.STOP_OR_ERROR);
   }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java?rev=1172868&r1=1172867&r2=1172868&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
Mon Sep 19 22:38:08 2011
@@ -1,13 +1,17 @@
 package org.apache.flume.node;
 
-public interface NodeConfiguration {
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
 
-  public long getVersion();
+public interface NodeConfiguration {
 
-  public String getName();
+  public Map<String, SourceRunner> getSourceRunners();
 
-  public String getSourceDefinition();
+  public Map<String, SinkRunner> getSinkRunners();
 
-  public String getSinkDefinition();
+  public Map<String, Channel> getChannels();
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1172868&r1=1172867&r2=1172868&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Mon Sep 19 22:38:08 2011
@@ -1,17 +1,14 @@
 package org.apache.flume.node.nodemanager;
 
-import org.apache.flume.Sink;
-import org.apache.flume.Source;
+import java.util.Map.Entry;
+
 import org.apache.flume.LogicalNode;
-import org.apache.flume.PollableSource;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.SourceFactory;
+import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.node.NodeConfiguration;
-import org.apache.flume.source.PollableSourceRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,9 +20,6 @@ public class DefaultLogicalNodeManager e
   private static final Logger logger = LoggerFactory
       .getLogger(DefaultLogicalNodeManager.class);
 
-  private SourceFactory sourceFactory;
-  private SinkFactory sinkFactory;
-
   private LifecycleSupervisor nodeSupervisor;
   private LifecycleState lifecycleState;
 
@@ -38,42 +32,19 @@ public class DefaultLogicalNodeManager e
   public void onNodeConfigurationChanged(NodeConfiguration nodeConfiguration) {
     logger.info("Node configuration change:{}", nodeConfiguration);
 
-    /*
-     * FIXME: Decide if nodeConfiguration is worth applying. We can't trust the
-     * caller to know our config.
-     */
-
-    Source source = null;
-    Sink sink = null;
+    for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
+        .entrySet()) {
 
-    try {
-      source = sourceFactory.create(nodeConfiguration.getSourceDefinition());
-    } catch (InstantiationException e) {
-      logger
-          .error(
-              "Failed to apply configuration:{} because of source failure:{} - retaining
old configuration",
-              nodeConfiguration, e.getMessage());
-      return;
-    }
-
-    try {
-      sink = sinkFactory.create(nodeConfiguration.getSinkDefinition());
-    } catch (InstantiationException e) {
-      logger
-          .error(
-              "Failed to apply configuration:{} because of sink failure:{} - retaining old
configuration",
-              nodeConfiguration, e.getMessage());
-      return;
+      nodeSupervisor.supervise(entry.getValue(),
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
     }
 
-    LogicalNode newLogicalNode = new LogicalNode();
+    for (Entry<String, SourceRunner> entry : nodeConfiguration
+        .getSourceRunners().entrySet()) {
 
-    if (source instanceof PollableSource) {
-      SourceRunner channelAdapter = new PollableSourceRunner();
-      newLogicalNode.setSourceRunner(channelAdapter);
+      nodeSupervisor.supervise(entry.getValue(),
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
     }
-
-    add(newLogicalNode);
   }
 
   @Override



Mime
View raw message