flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [2/3] FLUME-1630. Flume configuration code could be improved.
Date Fri, 30 Nov 2012 21:18:36 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 405afa3..c066a1a 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -21,6 +21,11 @@ package org.apache.flume.node;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -29,150 +34,278 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.flume.ChannelFactory;
+import org.apache.flume.Channel;
 import org.apache.flume.Constants;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.SourceFactory;
-import org.apache.flume.channel.DefaultChannelFactory;
-import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
-import org.apache.flume.conf.properties.PropertiesFileConfigurationProvider;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.Context;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.flume.source.DefaultSourceFactory;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 
-public class Application {
+public class Application  {
 
   private static final Logger logger = LoggerFactory
       .getLogger(Application.class);
 
-  private String[] args;
-  private File configurationFile;
-  private String nodeName;
+  public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+  public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
 
-  private SourceFactory sourceFactory;
-  private SinkFactory sinkFactory;
-  private ChannelFactory channelFactory;
+  private final List<LifecycleAware> components;
+  private final LifecycleSupervisor supervisor;
+  private MaterializedConfiguration materializedConfiguration;
+  private MonitorService monitorServer;
 
-  public static void main(String[] args) {
-    Application application = new Application();
-
-    application.setArgs(args);
+  public Application() {
+    this(new ArrayList<LifecycleAware>(0));
+  }
+  public Application(List<LifecycleAware> components) {
+    this.components = components;
+    supervisor = new LifecycleSupervisor();
+  }
 
-    try {
-      if (application.parseOptions()) {
-        application.run();
-      }
-    } catch (ParseException e) {
-      logger.error(e.getMessage());
-    } catch (Exception e) {
-      logger.error("A fatal error occurred while running. Exception follows.",
-          e);
+  public void start() {
+    for(LifecycleAware component : components) {
+      supervisor.supervise(component,
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
     }
   }
 
-  public Application() {
-    sourceFactory = new DefaultSourceFactory();
-    sinkFactory = new DefaultSinkFactory();
-    channelFactory = new DefaultChannelFactory();
+
+  @Subscribe
+  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
+    stopAllComponents();
+    startAllComponents(conf);
+  }
+
+  public void stop() {
+    supervisor.stop();
+    if(monitorServer != null) {
+      monitorServer.stop();
+    }
   }
 
-  public boolean parseOptions() throws ParseException {
-    Options options = new Options();
 
-    Option option = new Option("n", "name", true, "the name of this node");
-    options.addOption(option);
+  private void stopAllComponents() {
+    if (this.materializedConfiguration != null) {
+      logger.info("Shutting down configuration: {}", this.materializedConfiguration);
+      for (Entry<String, SourceRunner> entry : this.materializedConfiguration
+          .getSourceRunners().entrySet()) {
+        try{
+          logger.info("Stopping Source " + entry.getKey());
+          supervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
+
+      for (Entry<String, SinkRunner> entry :
+        this.materializedConfiguration.getSinkRunners().entrySet()) {
+        try{
+          logger.info("Stopping Sink " + entry.getKey());
+          supervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
 
-    option = new Option("f", "conf-file", true, "specify a conf file");
-    options.addOption(option);
+      for (Entry<String, Channel> entry :
+        this.materializedConfiguration.getChannels().entrySet()) {
+        try{
+          logger.info("Stopping Channel " + entry.getKey());
+          supervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
+    }
+    if(monitorServer != null) {
+      monitorServer.stop();
+    }
+  }
 
-    option = new Option("h", "help", false, "display help text");
-    options.addOption(option);
+  private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+    logger.info("Starting new configuration:{}", materializedConfiguration);
 
-    CommandLineParser parser = new GnuParser();
-    CommandLine commandLine = parser.parse(options, args);
+    this.materializedConfiguration = materializedConfiguration;
 
-    if (commandLine.hasOption('f')) {
-      configurationFile = new File(commandLine.getOptionValue('f'));
+    for (Entry<String, Channel> entry :
+      materializedConfiguration.getChannels().entrySet()) {
+      try{
+        logger.info("Starting Channel " + entry.getKey());
+        supervisor.supervise(entry.getValue(),
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e){
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
+    }
 
-      if (!configurationFile.exists()) {
-        // If command line invocation, then need to fail fast
-        if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
-          String path = configurationFile.getPath();
-          try {
-            path = configurationFile.getCanonicalPath();
-          } catch (IOException ex) {
-            logger.error("Failed to read canonical path for file: " + path, ex);
-          }
-          throw new ParseException(
-              "The specified configuration file does not exist: " + path);
+    /*
+     * Wait for all channels to start.
+     */
+    for(Channel ch: materializedConfiguration.getChannels().values()){
+      while(ch.getLifecycleState() != LifecycleState.START
+          && !supervisor.isComponentInErrorState(ch)){
+        try {
+          logger.info("Waiting for channel: " + ch.getName() +
+              " to start. Sleeping for 500 ms");
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          logger.error("Interrupted while waiting for channel to start.", e);
+          Throwables.propagate(e);
         }
       }
     }
 
-    if (commandLine.hasOption('n')) {
-      nodeName = commandLine.getOptionValue('n');
+    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
+        .entrySet()) {
+      try{
+        logger.info("Starting Sink " + entry.getKey());
+        supervisor.supervise(entry.getValue(),
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
     }
 
-    if (commandLine.hasOption('h')) {
-      new HelpFormatter().printHelp("flume-ng node", options, true);
+    for (Entry<String, SourceRunner> entry : materializedConfiguration
+        .getSourceRunners().entrySet()) {
+      try{
+        logger.info("Starting Source " + entry.getKey());
+        supervisor.supervise(entry.getValue(),
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
+    }
+
+    this.loadMonitoring();
+  }
+
 
-      return false;
+  @SuppressWarnings("unchecked")
+  private void loadMonitoring() {
+    Properties systemProps = System.getProperties();
+    Set<String> keys = systemProps.stringPropertyNames();
+    try {
+      if (keys.contains(CONF_MONITOR_CLASS)) {
+        String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+        Class<? extends MonitorService> klass;
+        try {
+          //Is it a known type?
+          klass = MonitoringType.valueOf(
+                  monitorType.toUpperCase()).getMonitorClass();
+        } catch (Exception e) {
+          //Not a known type, use FQCN
+          klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+        }
+        this.monitorServer = klass.newInstance();
+        Context context = new Context();
+        for (String key : keys) {
+          if (key.startsWith(CONF_MONITOR_PREFIX)) {
+            context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+                    systemProps.getProperty(key));
+          }
+        }
+        monitorServer.configure(context);
+        monitorServer.start();
+      }
+    } catch (Exception e) {
+      logger.warn("Error starting monitoring. "
+              + "Monitoring might not be available.", e);
     }
 
-    return true;
   }
 
-  public void run() throws LifecycleException, InterruptedException,
-      InstantiationException {
+  public static void main(String[] args) {
 
-    final FlumeNode node = new FlumeNode();
-    DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
-    AbstractFileConfigurationProvider configurationProvider =
-        new PropertiesFileConfigurationProvider();
+    try {
 
-    configurationProvider.setChannelFactory(channelFactory);
-    configurationProvider.setSourceFactory(sourceFactory);
-    configurationProvider.setSinkFactory(sinkFactory);
+      Options options = new Options();
 
-    configurationProvider.setNodeName(nodeName);
-    configurationProvider.setConfigurationAware(nodeManager);
-    configurationProvider.setFile(configurationFile);
+      Option option = new Option("n", "name", true, "the name of this agent");
+      option.setRequired(true);
+      options.addOption(option);
 
-    Preconditions.checkState(configurationFile != null,
-        "Configuration file not specified");
-    Preconditions.checkState(nodeName != null, "Node name not specified");
+      option = new Option("f", "conf-file", true, "specify a conf file");
+      option.setRequired(true);
+      options.addOption(option);
 
-    node.setName(nodeName);
-    node.setNodeManager(nodeManager);
-    node.setConfigurationProvider(configurationProvider);
+      option = new Option(null, "no-reload-conf", false, "do not reload " +
+        "conf file if changed");
+      options.addOption(option);
 
-    Runtime.getRuntime().addShutdownHook(new Thread("node-shutdownHook") {
+      option = new Option("h", "help", false, "display help text");
+      options.addOption(option);
 
-      @Override
-      public void run() {
-        node.stop();
-      }
+      CommandLineParser parser = new GnuParser();
+      CommandLine commandLine = parser.parse(options, args);
 
-    });
+      File configurationFile = new File(commandLine.getOptionValue('f'));
+      String agentName = commandLine.getOptionValue('n');
+      boolean reload = !commandLine.hasOption("no-reload-conf"); // must match the long option name registered above
 
-    node.start();
-    LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
-    LifecycleController.waitForOneOf(node, LifecycleState.STOP_OR_ERROR);
-  }
+      if (commandLine.hasOption('h')) {
+        new HelpFormatter().printHelp("flume-ng agent", options, true);
+        return;
+      }
+      /*
+       * The following is to ensure that by default the agent
+       * will fail on startup if the file does not exist.
+       */
+      if (!configurationFile.exists()) {
+        // If command line invocation, then need to fail fast
+        if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
+          String path = configurationFile.getPath();
+          try {
+            path = configurationFile.getCanonicalPath();
+          } catch (IOException ex) {
+            logger.error("Failed to read canonical path for file: " + path, ex);
+          }
+          throw new ParseException(
+              "The specified configuration file does not exist: " + path);
+        }
+      }
+      List<LifecycleAware> components = Lists.newArrayList();
+      Application application;
+      if(reload) {
+        EventBus eventBus = new EventBus(agentName + "-event-bus");
+        PollingPropertiesFileConfigurationProvider configurationProvider =
+            new PollingPropertiesFileConfigurationProvider(agentName,
+                configurationFile, eventBus, 30);
+        components.add(configurationProvider);
+        application = new Application(components);
+        eventBus.register(application);
+      } else {
+        PropertiesFileConfigurationProvider configurationProvider =
+            new PropertiesFileConfigurationProvider(agentName,
+                configurationFile);
+        application = new Application();
+        application.handleConfigurationEvent(configurationProvider.getConfiguration());
+      }
+      application.start();
 
-  public String[] getArgs() {
-    return args;
-  }
+      final Application appReference = application;
+      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
+        @Override
+        public void run() {
+          appReference.stop();
+        }
+      });
 
-  public void setArgs(String[] args) {
-    this.args = args;
+    } catch (Exception e) {
+      logger.error("A fatal error occurred while running. Exception follows.",
+          e);
+    }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
index ae732aa..6a27898 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
@@ -19,8 +19,11 @@
 
 package org.apache.flume.node;
 
-import org.apache.flume.lifecycle.LifecycleAware;
 
-public interface ConfigurationProvider extends LifecycleAware {
+public interface ConfigurationProvider {
+
+
+  public MaterializedConfiguration getConfiguration();
+
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java b/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
deleted file mode 100644
index cee022d..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class FlumeNode implements LifecycleAware {
-
-  private static final Logger logger = LoggerFactory.getLogger(FlumeNode.class);
-
-  private String name;
-  private LifecycleState lifecycleState;
-  private NodeManager nodeManager;
-  private ConfigurationProvider configurationProvider;
-  private LifecycleSupervisor supervisor;
-
-  public FlumeNode() {
-    supervisor = new LifecycleSupervisor();
-  }
-
-  @Override
-  public void start() {
-
-    Preconditions.checkState(name != null, "Node name can not be null");
-    Preconditions.checkState(nodeManager != null,
-        "Node manager can not be null");
-
-    supervisor.start();
-
-    logger.info("Flume node starting - {}", name);
-
-    supervisor.supervise(nodeManager,
-        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-    supervisor.supervise(configurationProvider,
-        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-
-    lifecycleState = LifecycleState.START;
-  }
-
-  @Override
-  public void stop() {
-
-    logger.info("Flume node stopping - {}", name);
-
-    supervisor.stop();
-
-    lifecycleState = LifecycleState.STOP;
-  }
-
-  @Override
-  public String toString() {
-    return "{ name:" + name + " nodeManager:" + nodeManager
-        + " configurationProvider:" + configurationProvider + " }";
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public NodeManager getNodeManager() {
-    return nodeManager;
-  }
-
-  public void setNodeManager(NodeManager nodeManager) {
-    this.nodeManager = nodeManager;
-  }
-
-  public ConfigurationProvider getConfigurationProvider() {
-    return configurationProvider;
-  }
-
-  public void setConfigurationProvider(
-      ConfigurationProvider configurationProvider) {
-    this.configurationProvider = configurationProvider;
-  }
-
-  @Override
-  public LifecycleState getLifecycleState() {
-    return lifecycleState;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
new file mode 100644
index 0000000..674fb38
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+public interface MaterializedConfiguration {
+
+  public void addSourceRunner(String name, SourceRunner sourceRunner);
+
+  public void addSinkRunner(String name, SinkRunner sinkRunner);
+
+  public void addChannel(String name, Channel channel);
+
+  public ImmutableMap<String, SourceRunner> getSourceRunners();
+
+  public ImmutableMap<String, SinkRunner> getSinkRunners();
+
+  public ImmutableMap<String, Channel> getChannels();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
deleted file mode 100644
index a24c939..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.Map;
-
-import org.apache.flume.Channel;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-
-public interface NodeConfiguration {
-
-  public Map<String, SourceRunner> getSourceRunners();
-
-  public Map<String, SinkRunner> getSinkRunners();
-
-  public Map<String, Channel> getChannels();
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java b/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
deleted file mode 100644
index 7457d87..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.Set;
-
-import org.apache.flume.lifecycle.LifecycleAware;
-
-public interface NodeManager extends LifecycleAware {
-
-  public boolean add(LifecycleAware node);
-
-  public boolean remove(LifecycleAware node);
-
-  public Set<LifecycleAware> getNodes();
-
-  public void setNodes(Set<LifecycleAware> nodes);
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..857c8a5
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PollingPropertiesFileConfigurationProvider extends
+ PropertiesFileConfigurationProvider implements LifecycleAware {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PollingPropertiesFileConfigurationProvider.class);
+
+  private final EventBus eventBus;
+  private final File file;
+  private final int interval;
+  private final CounterGroup counterGroup;
+  private LifecycleState lifecycleState;
+
+  private ScheduledExecutorService executorService;
+
+  public PollingPropertiesFileConfigurationProvider(String agentName,
+      File file, EventBus eventBus, int interval) {
+    super(agentName, file);
+    this.eventBus = eventBus;
+    this.file = file;
+    this.interval = interval;
+    counterGroup = new CounterGroup();
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Configuration provider starting");
+
+    Preconditions.checkState(file != null,
+        "The parameter file must not be null");
+
+    executorService = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+                .build());
+
+    FileWatcherRunnable fileWatcherRunnable =
+        new FileWatcherRunnable(file, counterGroup);
+
+    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
+        TimeUnit.SECONDS);
+
+    lifecycleState = LifecycleState.START;
+
+    LOGGER.debug("Configuration provider started");
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("Configuration provider stopping");
+
+    executorService.shutdown();
+    try{
+      while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+        LOGGER.debug("Waiting for file watcher to terminate");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+      Thread.currentThread().interrupt();
+    }
+    lifecycleState = LifecycleState.STOP;
+    LOGGER.debug("Configuration provider stopped");
+  }
+
+  @Override
+  public synchronized  LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+
+  @Override
+  public String toString() {
+    return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
+        + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
+  }
+
+  public class FileWatcherRunnable implements Runnable {
+
+    private final File file;
+    private final CounterGroup counterGroup;
+
+    private long lastChange;
+
+    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
+      super();
+      this.file = file;
+      this.counterGroup = counterGroup;
+      this.lastChange = 0L;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.debug("Checking file:{} for changes", file);
+
+      counterGroup.incrementAndGet("file.checks");
+
+      long lastModified = file.lastModified();
+
+      if (lastModified > lastChange) {
+        LOGGER.info("Reloading configuration file:{}", file);
+
+        counterGroup.incrementAndGet("file.loads");
+
+        lastChange = lastModified;
+
+        try {
+          eventBus.post(getConfiguration());
+        } catch (Exception e) {
+          LOGGER.error("Failed to load configuration data. Exception follows.",
+              e);
+        } catch (NoClassDefFoundError e) {
+          LOGGER.error("Failed to start agent because dependencies were not " +
+              "found in classpath. Error follows.", e);
+        } catch (Throwable t) {
+          // caught because the caller does not handle or log Throwables
+          LOGGER.error("Unhandled error", t);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..d7438d9
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * <p>
+ * A configuration provider that uses a properties file for specifying
+ * configuration. The configuration files follow the Java properties file syntax
+ * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties file is prefixed by an
+ * <em>Agent Name</em> which helps isolate an individual agent&apos;s namespace.
+ * </p>
+ * <p>
+ * Valid configuration files must observe the following rules for every agent
+ * namespace.
+ * <ul>
+ * <li>For every &lt;agent name&gt; there must be three lists specified that
+ * include <tt>&lt;agent name&gt;.sources</tt>,
+ * <tt>&lt;agent name&gt;.sinks</tt>, and <tt>&lt;agent name&gt;.channels</tt>.
+ * Each of these lists must contain a space separated list of names
+ * corresponding to that particular entity.</li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of source
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.type = event</tt></li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a space-separated list of channel names that the source will associate
+ * with during runtime. Each of these names must be contained in the channels
+ * list specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.channels =
+ * &lt;channel-1 name&gt; &lt;channel-2 name&gt;</tt></li>
+ * <li>For each source named in the <tt>&lt;agent name&gt;.sources</tt>, there
+ * must be a <tt>runner</tt> namespace of configuration that configures the
+ * associated source runner. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.type = avro</tt>.
+ * This namespace can also be used to configure other configuration of the
+ * source runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt>
+ * </li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt> there can
+ * be an optional <tt>selector.type</tt> specified that identifies the type
+ * of channel selector associated with the source. If not specified, the
+ * default replicating channel selector is used.
+ * </li><li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
+ * <tt>&lt;agent name&gt;.channels.&lt;channel name&gt;.type = mem</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.type = hdfs</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty single-valued channel name specified as the value of the
+ * <tt>channel</tt> attribute. This value must be contained in the channels list
+ * specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.channel =
+ * &lt;channel name&gt;</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a <tt>runner</tt> namespace of configuration that configures the
+ * associated sink runner. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.type = polling</tt>.
+ * This namespace can also be used to configure other configuration of the sink
+ * runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
+ * 60</tt></li>
+ * <li>A fourth optional list <tt>&lt;agent name&gt;.sinkgroups</tt>
+ * may be added to each agent, consisting of unique space separated names
+ * for groups</li>
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks
+ * belonging to it. These cannot be shared by multiple groups.
+ * Further, one can set a processor and behavioral parameters to determine
+ * how sink selection is made via <tt>&lt;agent name&gt;.sinkgroups.&lt;
+ * group name&gt;.processor</tt>. For further detail refer to individual processor
+ * documentation</li>
+ * <li>Sinks not assigned to a group will be assigned to default single sink
+ * groups.</li>
+ * </ul>
+ *
+ * Apart from the above required configuration values, each source, sink or
+ * channel can have its own set of arbitrary configuration as required by the
+ * implementation. Each of these configuration values is expressed by a fully
+ * namespace-qualified configuration key. For example, the configuration
+ * property called <tt>capacity</tt> for a channel called <tt>ch1</tt> for the
+ * agent named <tt>host1</tt> with value <tt>1000</tt> will be expressed as:
+ * <tt>host1.channels.ch1.capacity = 1000</tt>.
+ * </p>
+ * <p>
+ * Any information contained in the configuration file other than what pertains
+ * to the configured agents, sources, sinks and channels via the explicitly
+ * enumerated list of sources, sinks and channels per agent name is ignored by
+ * this provider. Moreover, if any of the required configuration values are not
+ * present in the configuration file for the configured entities, that entity
+ * and anything that depends upon it is considered invalid and consequently not
+ * configured. For example, if a channel is missing its <tt>type</tt> attribute,
+ * it is considered misconfigured. Also, any sources or sinks that depend upon
+ * this channel are also considered misconfigured and not initialized.
+ * </p>
+ * <p>
+ * Example configuration file:
+ *
+ * <pre>
+ * #
+ * # Flume Configuration
+ * # This file contains configuration for one Agent identified as host1.
+ * #
+ *
+ * host1.sources = avroSource thriftSource
+ * host1.channels = jdbcChannel
+ * host1.sinks = hdfsSink
+ *
+ * # avroSource configuration
+ * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
+ * host1.sources.avroSource.runner.type = avro
+ * host1.sources.avroSource.runner.port = 11001
+ * host1.sources.avroSource.channels = jdbcChannel
+ * host1.sources.avroSource.selector.type = replicating
+ *
+ * # thriftSource configuration
+ * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
+ * host1.sources.thriftSource.runner.type = thrift
+ * host1.sources.thriftSource.runner.port = 12001
+ * host1.sources.thriftSource.channels = jdbcChannel
+ *
+ * # jdbcChannel configuration
+ * host1.channels.jdbcChannel.type = jdbc
+ * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
+ * host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
+ * host1.channels.jdbcChannel.jdbc.username = flume
+ * host1.channels.jdbcChannel.jdbc.password = flume
+ *
+ * # hdfsSink configuration
+ * host1.sinks.hdfsSink.type = hdfs
+ * host1.sinks.hdfsSink.hdfs.path = hdfs://localhost/
+ * host1.sinks.hdfsSink.batchsize = 1000
+ * host1.sinks.hdfsSink.runner.type = polling
+ * host1.sinks.hdfsSink.runner.polling.interval = 60
+ * </pre>
+ *
+ * </p>
+ *
+ * @see java.util.Properties#load(java.io.Reader)
+ */
+public class PropertiesFileConfigurationProvider extends
+    AbstractConfigurationProvider {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PropertiesFileConfigurationProvider.class);
+
+  private final File file;
+
+  public PropertiesFileConfigurationProvider(String agentName, File file) {
+    super(agentName);
+    this.file = file;
+  }
+
+  @Override
+  public FlumeConfiguration getFlumeConfiguration() {
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new FileReader(file));
+      Properties properties = new Properties();
+      properties.load(reader);
+      return new FlumeConfiguration(toMap(properties));
+    } catch (IOException ex) {
+      LOGGER.error("Unable to load file:" + file
+          + " (I/O failure) - Exception follows.", ex);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException ex) {
+          LOGGER.warn(
+              "Unable to close file reader for file: " + file, ex);
+        }
+      }
+    }
+    return new FlumeConfiguration(new HashMap<String, String>());
+  }
+
+  private Map<String, String> toMap(Properties properties) {
+    Map<String, String> result = Maps.newHashMap();
+    Enumeration<?> propertyNames = properties.propertyNames();
+    while (propertyNames.hasMoreElements()) {
+      String name = (String) propertyNames.nextElement();
+      String value = properties.getProperty(name);
+      result.put(name, value);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
new file mode 100644
index 0000000..536dcc4
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleMaterializedConfiguration implements MaterializedConfiguration {
+
+  private final Map<String, Channel> channels;
+  private final Map<String, SourceRunner> sourceRunners;
+  private final Map<String, SinkRunner> sinkRunners;
+
+  public SimpleMaterializedConfiguration() {
+    channels = new HashMap<String, Channel>();
+    sourceRunners = new HashMap<String, SourceRunner>();
+    sinkRunners = new HashMap<String, SinkRunner>();
+  }
+
+  @Override
+  public String toString() {
+    return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners
+        + " channels:" + channels + " }";
+  }
+  @Override
+  public void addSourceRunner(String name, SourceRunner sourceRunner) {
+    sourceRunners.put(name, sourceRunner);
+  }
+
+  @Override
+  public void addSinkRunner(String name, SinkRunner sinkRunner) {
+    sinkRunners.put(name, sinkRunner);
+  }
+
+  @Override
+  public void addChannel(String name, Channel channel){
+    channels.put(name, channel);
+  }
+
+  @Override
+  public ImmutableMap<String, Channel> getChannels() {
+    return ImmutableMap.copyOf(channels);
+  }
+
+  @Override
+  public ImmutableMap<String, SourceRunner> getSourceRunners() {
+    return ImmutableMap.copyOf(sourceRunners);
+  }
+
+  @Override
+  public ImmutableMap<String, SinkRunner> getSinkRunners() {
+    return ImmutableMap.copyOf(sinkRunners);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
deleted file mode 100644
index 1fda07b..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node.nodemanager;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.node.NodeManager;
-
-import com.google.common.base.Preconditions;
-
-abstract public class AbstractLogicalNodeManager implements NodeManager {
-
-  private Set<LifecycleAware> nodes;
-
-  public AbstractLogicalNodeManager() {
-    nodes = new HashSet<LifecycleAware>();
-  }
-
-  @Override
-  public boolean add(LifecycleAware node) {
-    Preconditions.checkNotNull(node);
-
-    return nodes.add(node);
-  }
-
-  @Override
-  public boolean remove(LifecycleAware node) {
-    Preconditions.checkNotNull(node);
-
-    return nodes.remove(node);
-  }
-
-  @Override
-  public Set<LifecycleAware> getNodes() {
-    return nodes;
-  }
-
-  @Override
-  public void setNodes(Set<LifecycleAware> nodes) {
-    Preconditions.checkNotNull(nodes);
-
-    this.nodes = new HashSet<LifecycleAware>(nodes);
-  }
-
-  @Override
-  public String toString() {
-    return "{ nodes:" + nodes + " }";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
deleted file mode 100644
index 60bfd5e..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node.nodemanager;
-
-import java.util.Map.Entry;
-
-import org.apache.flume.Channel;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
-import org.apache.flume.node.NodeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.util.Properties;
-import java.util.Set;
-import org.apache.flume.Context;
-import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.MonitoringType;
-
-
-public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
-    implements NodeConfigurationAware {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(DefaultLogicalNodeManager.class);
-
-  private LifecycleSupervisor nodeSupervisor;
-  private LifecycleState lifecycleState;
-  private NodeConfiguration nodeConfiguration;
-
-  private MonitorService monitorServer;
-
-  public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
-  public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
-
-  public DefaultLogicalNodeManager() {
-    nodeSupervisor = new LifecycleSupervisor();
-    lifecycleState = LifecycleState.IDLE;
-    nodeConfiguration = null;
-  }
-
-  @Override
-  public void stopAllComponents() {
-    if (this.nodeConfiguration != null) {
-      logger.info("Shutting down configuration: {}", this.nodeConfiguration);
-      for (Entry<String, SourceRunner> entry : this.nodeConfiguration
-          .getSourceRunners().entrySet()) {
-        try{
-          logger.info("Stopping Source " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-
-      for (Entry<String, SinkRunner> entry :
-        this.nodeConfiguration.getSinkRunners().entrySet()) {
-        try{
-          logger.info("Stopping Sink " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-
-      for (Entry<String, Channel> entry :
-        this.nodeConfiguration.getChannels().entrySet()) {
-        try{
-          logger.info("Stopping Channel " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-    }
-    if(monitorServer != null) {
-      monitorServer.stop();
-    }
-  }
-
-  @Override
-  public void startAllComponents(NodeConfiguration nodeConfiguration) {
-    logger.info("Starting new configuration:{}", nodeConfiguration);
-
-    this.nodeConfiguration = nodeConfiguration;
-
-    for (Entry<String, Channel> entry :
-      nodeConfiguration.getChannels().entrySet()) {
-      try{
-        logger.info("Starting Channel " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e){
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    /*
-     * Wait for all channels to start.
-     */
-    for(Channel ch: nodeConfiguration.getChannels().values()){
-      while(ch.getLifecycleState() != LifecycleState.START
-          && !nodeSupervisor.isComponentInErrorState(ch)){
-        try {
-          logger.info("Waiting for channel: " + ch.getName() +
-              " to start. Sleeping for 500 ms");
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          logger.error("Interrupted while waiting for channel to start.", e);
-          Throwables.propagate(e);
-        }
-      }
-    }
-
-    for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
-        .entrySet()) {
-      try{
-        logger.info("Starting Sink " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e) {
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    for (Entry<String, SourceRunner> entry : nodeConfiguration
-        .getSourceRunners().entrySet()) {
-      try{
-        logger.info("Starting Source " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e) {
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    this.loadMonitoring();
-  }
-
-  @Override
-  public boolean add(LifecycleAware node) {
-    /*
-     * FIXME: This type of overriding worries me. There should be a better
-     * separation of addition of nodes and management. (i.e. state vs. function)
-     */
-    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
-        "You can not add nodes to a manager that hasn't been started");
-
-    if (super.add(node)) {
-      nodeSupervisor.supervise(node,
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean remove(LifecycleAware node) {
-    /*
-     * FIXME: This type of overriding worries me. There should be a better
-     * separation of addition of nodes and management. (i.e. state vs. function)
-     */
-    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
-        "You can not remove nodes from a manager that hasn't been started");
-
-    if (super.remove(node)) {
-      nodeSupervisor.unsupervise(node);
-
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public void start() {
-
-    logger.info("Node manager starting");
-
-    nodeSupervisor.start();
-
-    logger.debug("Node manager started");
-
-    lifecycleState = LifecycleState.START;
-  }
-
-  @Override
-  public void stop() {
-
-    logger.info("Node manager stopping");
-
-    stopAllComponents();
-
-    nodeSupervisor.stop();
-
-    logger.debug("Node manager stopped");
-
-    lifecycleState = LifecycleState.STOP;
-  }
-
-  @Override
-  public LifecycleState getLifecycleState() {
-    return lifecycleState;
-  }
-
-  private void loadMonitoring() {
-    Properties systemProps = System.getProperties();
-    Set<String> keys = systemProps.stringPropertyNames();
-    try {
-      if (keys.contains(CONF_MONITOR_CLASS)) {
-        String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
-        Class<? extends MonitorService> klass;
-        try {
-          //Is it a known type?
-          klass = MonitoringType.valueOf(
-                  monitorType.toUpperCase()).getMonitorClass();
-        } catch (Exception e) {
-          //Not a known type, use FQCN
-          klass = (Class<? extends MonitorService>) Class.forName(monitorType);
-        }
-        this.monitorServer = klass.newInstance();
-        Context context = new Context();
-        for (String key : keys) {
-          if (key.startsWith(CONF_MONITOR_PREFIX)) {
-            context.put(key.substring(CONF_MONITOR_PREFIX.length()),
-                    systemProps.getProperty(key));
-          }
-        }
-        monitorServer.configure(context);
-        monitorServer.start();
-      }
-    } catch (Exception e) {
-      logger.warn("Error starting monitoring. "
-              + "Monitoring might not be available.", e);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
deleted file mode 100644
index c20bf9b..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node.nodemanager;
-
-import org.apache.flume.node.NodeConfiguration;
-
-public interface NodeConfigurationAware {
-
-  /**
-   * Stop all components currently running.
-   */
-  public void stopAllComponents();
-
-  /**
-   * Start components with the configuration provided.
-   * @param nodeConfiguration
-   */
-  public void startAllComponents(NodeConfiguration nodeConfiguration);
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
deleted file mode 100644
index d43aed6..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.conf.properties;
-
-import java.io.File;
-
-import org.apache.flume.channel.DefaultChannelFactory;
-import org.apache.flume.node.NodeConfiguration;
-import org.apache.flume.node.nodemanager.NodeConfigurationAware;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.flume.source.DefaultSourceFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPropertiesFileConfigurationProvider {
-
-  private static final File TESTFILE = new File(
-      TestPropertiesFileConfigurationProvider.class.getClassLoader()
-          .getResource("flume-conf.properties").getFile());
-
-  @SuppressWarnings("unused")
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(TestPropertiesFileConfigurationProvider.class);
-
-  @Before
-  public void setUp() throws Exception {
-    File tmpDir = new File("target/test");
-    tmpDir.mkdirs();
-
-    File derbyLogFile = new File(tmpDir, "derbytest.log");
-    String derbyLogFilePath = derbyLogFile.getCanonicalPath();
-
-    System.setProperty("derby.stream.error.file", derbyLogFilePath);
-  }
-
-  @Test
-  public void testPropertyRead() throws Exception {
-    PropertiesFileConfigurationProvider provider =
-        new PropertiesFileConfigurationProvider();
-
-    provider.setNodeName("host1");
-    provider.setConfigurationAware(new DummyNodeConfigurationAware());
-
-    provider.setChannelFactory(new DefaultChannelFactory());
-    provider.setSourceFactory(new DefaultSourceFactory());
-    provider.setSinkFactory(new DefaultSinkFactory());
-
-    provider.setFile(TESTFILE);
-    provider.load();
-  }
-
-  private static class DummyNodeConfigurationAware implements
-    NodeConfigurationAware {
-
-    @Override
-    public void stopAllComponents(){
-
-    }
-    @Override
-    public void startAllComponents(NodeConfiguration config) {
-       // no handling necessary
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
new file mode 100644
index 0000000..25001b1
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.annotations.Recyclable;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestAbstractConfigurationProvider {
+  // Expects a @Disposable channel to be a different instance on each getConfiguration() call.
+  @Test
+  public void testDispoableChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChannel(agentName,
+        DisposableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof DisposableChannel);
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof DisposableChannel);
+    Assert.assertNotSame(channel1, channel2);
+  }
+  // Expects a @Recyclable channel to be the same instance across getConfiguration() calls.
+  @Test
+  public void testReusableChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChannel(agentName,
+        RecyclableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof RecyclableChannel);
+
+    Assert.assertSame(channel1, channel2);
+  }
+  // Expects a channel carrying neither annotation to be the same instance across calls.
+  @Test
+  public void testUnspecifiedChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChannel(agentName,
+        UnspecifiedChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof UnspecifiedChannel);
+
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof UnspecifiedChannel);
+
+    Assert.assertSame(channel1, channel2);
+  }
+  // Recyclable -> Disposable -> Recyclable: expects the third channel not to be the first one.
+  @Test
+  public void testReusableChannelNotReusedLater() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> propertiesReusable = getPropertiesForChannel(agentName,
+        RecyclableChannel.class.getName());
+    Map<String, String> propertiesDispoable = getPropertiesForChannel(agentName,
+        DisposableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, propertiesReusable);
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+    provider.setProperties(propertiesDispoable);
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof DisposableChannel);
+
+    provider.setProperties(propertiesReusable);
+    MaterializedConfiguration config3 = provider.getConfiguration();
+    Channel channel3 = config3.getChannels().values().iterator().next();
+    Assert.assertTrue(channel3 instanceof RecyclableChannel);
+
+    Assert.assertNotSame(channel1, channel3);
+  }
+
+  // Builds agent properties: source1 ("seq"), channel1 (channelType, capacity 100), sink1 ("null").
+  private Map<String, String> getPropertiesForChannel(String agentName, String channelType) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(agentName + ".sources", "source1");
+    properties.put(agentName + ".channels", "channel1");
+    properties.put(agentName + ".sinks", "sink1");
+    properties.put(agentName + ".sources.source1.type", "seq");
+    properties.put(agentName + ".sources.source1.channels", "channel1");
+    properties.put(agentName + ".channels.channel1.type", channelType);
+    properties.put(agentName + ".channels.channel1.capacity", "100");
+    properties.put(agentName + ".sinks.sink1.type", "null");
+    properties.put(agentName + ".sinks.sink1.channel", "channel1");
+    return properties;
+  }
+  // Provider that builds its FlumeConfiguration from an in-memory map replaceable via setProperties().
+  public static class MemoryConfigurationProvider extends AbstractConfigurationProvider {
+    private Map<String, String> properties;
+    public MemoryConfigurationProvider(String agentName, Map<String, String> properties) {
+      super(agentName);
+      this.properties = properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+      this.properties = properties;
+    }
+
+    @Override
+    protected FlumeConfiguration getFlumeConfiguration() {
+      return new FlumeConfiguration(properties);
+    }
+  }
+  @Disposable // stub channel: put/take/getTransaction all throw UnsupportedOperationException
+  public static class DisposableChannel extends AbstractChannel {
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  @Recyclable // stub channel: put/take/getTransaction all throw UnsupportedOperationException
+  public static class RecyclableChannel extends AbstractChannel {
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  public static class UnspecifiedChannel extends AbstractChannel { // stub with neither annotation
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
deleted file mode 100644
index 1cbc269..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkProcessor;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.Source;
-import org.apache.flume.SourceRunner;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
-import org.apache.flume.sink.DefaultSinkProcessor;
-import org.apache.flume.sink.NullSink;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestAbstractLogicalNodeManager {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(TestAbstractLogicalNodeManager.class);
-
-  private AbstractLogicalNodeManager nodeManager;
-
-  @Before
-  public void setUp() {
-    nodeManager = new AbstractLogicalNodeManager() {
-
-      private LifecycleState lifecycleState = LifecycleState.IDLE;
-
-      @Override
-      public void stop() {
-
-        for (LifecycleAware node : getNodes()) {
-          node.stop();
-
-          boolean reached = false;
-
-          try {
-            reached = LifecycleController.waitForOneOf(node,
-                LifecycleState.STOP_OR_ERROR);
-          } catch (InterruptedException e) {
-            // Do nothing.
-          }
-
-          if (!reached) {
-            logger.error(
-                "Unable to stop logical node:{} This *will* cause failures.",
-                node);
-          }
-
-          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
-            lifecycleState = LifecycleState.ERROR;
-          }
-        }
-
-        lifecycleState = LifecycleState.STOP;
-      }
-
-      @Override
-      public void start() {
-
-        for (LifecycleAware node : getNodes()) {
-          node.start();
-
-          boolean reached = false;
-
-          try {
-            reached = LifecycleController.waitForOneOf(node,
-                LifecycleState.START_OR_ERROR);
-          } catch (InterruptedException e) {
-            // Do nothing.
-          }
-
-          if (!reached) {
-            logger.error(
-                "Unable to stop logical node:{} This *will* cause failures.",
-                node);
-          }
-
-          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
-            lifecycleState = LifecycleState.ERROR;
-          }
-        }
-
-        lifecycleState = LifecycleState.START;
-      }
-
-      @Override
-      public LifecycleState getLifecycleState() {
-        return lifecycleState;
-      }
-    };
-  }
-
-  @Test
-  public void testEmptyLifecycle() throws LifecycleException,
-  InterruptedException {
-
-    nodeManager.start();
-    boolean reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.START_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-    nodeManager.stop();
-    reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.STOP_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-  }
-
-  @Test
-  public void testLifecycle() throws LifecycleException, InterruptedException {
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-
-    Source generatorSource = new SequenceGeneratorSource();
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    generatorSource.setChannelProcessor(new ChannelProcessor(rcs));
-
-    NullSink nullSink = new NullSink();
-    nullSink.configure(new Context());
-    nullSink.setChannel(channel);
-
-    nodeManager.add(SourceRunner.forSource(generatorSource));
-    SinkProcessor processor = new DefaultSinkProcessor();
-    List<Sink> sinks = new ArrayList<Sink>();
-    sinks.add(nullSink);
-    processor.setSinks(sinks);
-    nodeManager.add(new SinkRunner(processor));
-
-    nodeManager.start();
-    boolean reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.START_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-    nodeManager.stop();
-    reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.STOP_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-  }
-
-  @Test
-  public void testRapidLifecycleFlapping() throws LifecycleException,
-  InterruptedException {
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-
-    Source source = new SequenceGeneratorSource();
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
-
-    NullSink sink = new NullSink();
-    sink.configure(new Context());
-    sink.setChannel(channel);
-
-    nodeManager.add(SourceRunner.forSource(source));
-    SinkProcessor processor = new DefaultSinkProcessor();
-    List<Sink> sinks = new ArrayList<Sink>();
-    sinks.add(sink);
-    processor.setSinks(sinks);
-    nodeManager.add(new SinkRunner(processor));
-
-    for (int i = 0; i < 10; i++) {
-      nodeManager.start();
-      boolean reached = LifecycleController.waitForOneOf(nodeManager,
-          LifecycleState.START_OR_ERROR);
-
-      Assert.assertTrue(reached);
-      Assert
-      .assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-      nodeManager.stop();
-      reached = LifecycleController.waitForOneOf(nodeManager,
-          LifecycleState.STOP_OR_ERROR);
-
-      Assert.assertTrue(reached);
-      Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
new file mode 100644
index 0000000..d326312
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.eventbus.EventBus;
+
+public class TestApplication {
+
+
+  @Before
+  public void setup() throws Exception {
+    // currently empty: no per-test fixture is created here
+  }
+  // Mocks klass so getLifecycleState() reports IDLE, then START / STOP once start() / stop() ran.
+  private <T extends LifecycleAware> T mockLifeCycle(Class<T> klass) {
+
+    T lifeCycleAware = mock(klass);
+
+    final AtomicReference<LifecycleState> state =
+        new AtomicReference<LifecycleState>();
+
+    state.set(LifecycleState.IDLE);
+
+    when(lifeCycleAware.getLifecycleState()).then(new Answer<LifecycleState>() {
+      @Override
+      public LifecycleState answer(InvocationOnMock invocation)
+          throws Throwable {
+        return state.get();
+      }
+    });
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        state.set(LifecycleState.START);
+        return null;
+      }
+    }).when(lifeCycleAware).start();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        state.set(LifecycleState.STOP);
+        return null;
+      }
+    }).when(lifeCycleAware).stop();
+
+    return lifeCycleAware;
+  }
+  // Posts a configuration on the event bus, then verifies start() and stop() on each mocked component.
+  @Test
+  public void testBasicConfiguration() throws Exception {
+
+    EventBus eventBus = new EventBus("test-event-bus");
+
+    MaterializedConfiguration materializedConfiguration = new
+        SimpleMaterializedConfiguration();
+
+    SourceRunner sourceRunner = mockLifeCycle(SourceRunner.class);
+    materializedConfiguration.addSourceRunner("test", sourceRunner);
+
+    SinkRunner sinkRunner = mockLifeCycle(SinkRunner.class);
+    materializedConfiguration.addSinkRunner("test", sinkRunner);
+
+    Channel channel = mockLifeCycle(Channel.class);
+    materializedConfiguration.addChannel("test", channel);
+
+    // NOTE(review): this provider mock is never handed to Application below; it looks unused.
+    ConfigurationProvider configurationProvider = mock(ConfigurationProvider.class);
+    when(configurationProvider.getConfiguration()).thenReturn(materializedConfiguration);
+
+    Application application = new Application();
+    eventBus.register(application);
+    eventBus.post(materializedConfiguration);
+    application.start();
+    // NOTE(review): fixed 1s sleeps presumably wait for asynchronous start/stop; confirm.
+    Thread.sleep(1000L);
+
+    verify(sourceRunner).start();
+    verify(sinkRunner).start();
+    verify(channel).start();
+
+    application.stop();
+
+    Thread.sleep(1000L);
+
+    verify(sourceRunner).stop();
+    verify(sinkRunner).stop();
+    verify(channel).stop();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ecd5062b/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
deleted file mode 100644
index 530b299..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
-import org.apache.flume.source.PollableSourceRunner;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestDefaultLogicalNodeManager {
-
-  private NodeManager nodeManager;
-
-  @Before
-  public void setUp() {
-    nodeManager = new DefaultLogicalNodeManager();
-  }
-
-  @Test
-  public void testLifecycle() throws LifecycleException, InterruptedException {
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testLifecycleWithNodes() throws LifecycleException,
-      InterruptedException {
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (int i = 0; i < 3; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      nodeManager.add(sourceRunner);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testNodeStartStops() throws LifecycleException,
-      InterruptedException {
-
-    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
-
-    for (int i = 0; i < 30; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      testNodes.add(sourceRunner);
-    }
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (LifecycleAware node : testNodes) {
-      nodeManager.add(node);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testErrorNode() throws LifecycleException, InterruptedException {
-
-    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
-
-    for (int i = 0; i < 30; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      testNodes.add(sourceRunner);
-    }
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (LifecycleAware node : testNodes) {
-      nodeManager.add(node);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-}


Mime
View raw message