flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject [1/24] git commit: FLUME-1603: Flume configuration code could be improved
Date Tue, 30 Oct 2012 22:59:39 GMT
Updated Branches:
  refs/heads/FLUME-1502 34cbe3243 -> 3f3a888fd (forced update)


FLUME-1603: Flume configuration code could be improved


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3f3a888f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3f3a888f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3f3a888f

Branch: refs/heads/FLUME-1502
Commit: 3f3a888fdf069758008624e48fb6953df38b87b8
Parents: e47fd1e
Author: Brock Noland <brock@apache.org>
Authored: Mon Oct 15 09:53:20 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Tue Oct 30 17:57:23 2012 -0500

----------------------------------------------------------------------
 .../org/apache/flume/annotations/Disposable.java   |   27 ++
 .../org/apache/flume/annotations/Recyclable.java   |   27 ++
 .../flume/node/AbstractConfigurationProvider.java  |  352 +++++++++++++++
 .../flume/node/MaterializedConfiguration.java      |   36 ++
 ...PollingPropertiesFileConfigurationProvider.java |  158 +++++++
 .../node/PropertiesFileConfigurationProvider.java  |  221 +++++++++
 .../node/SimpleMaterializedConfiguration.java      |   62 +++
 .../node/TestAbstractConfigurationProvider.java    |  193 ++++++++
 .../org/apache/flume/node/TestApplication.java     |  123 +++++
 ...PollingPropertiesFileConfigurationProvider.java |  101 ++++
 .../TestPropertiesFileConfigurationProvider.java   |  111 +++++
 11 files changed, 1411 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java b/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java
new file mode 100644
index 0000000..d8db82c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.annotations;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.TYPE;
+
+@Target({ TYPE }) @Retention(RUNTIME)
+public @interface Disposable {}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java b/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java
new file mode 100644
index 0000000..a732c83
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.annotations;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.TYPE;
+
+@Target({ TYPE }) @Retention(RUNTIME)
+public @interface Recyclable {}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
new file mode 100644
index 0000000..fc371f6
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelFactory;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ChannelSelectorFactory;
+import org.apache.flume.channel.DefaultChannelFactory;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+public abstract class AbstractConfigurationProvider implements
+    ConfigurationProvider {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(AbstractConfigurationProvider.class);
+
+  private final String agentName;
+  private final SourceFactory sourceFactory;
+  private final SinkFactory sinkFactory;
+  private final ChannelFactory channelFactory;
+
+  private final Map<Class<? extends Channel>, Map<String, Channel>> channels;
+  private final ListMultimap<Class<? extends Channel>, String> unusedChannels;
+
+  public AbstractConfigurationProvider(String agentName) {
+    super();
+    this.agentName = agentName;
+    this.sourceFactory = new DefaultSourceFactory();
+    this.sinkFactory = new DefaultSinkFactory();
+    this.channelFactory = new DefaultChannelFactory();
+
+    channels = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
+    unusedChannels = ArrayListMultimap.create();
+
+  }
+
+  protected abstract FlumeConfiguration getFlumeConfiguration();
+
+  public MaterializedConfiguration getConfiguration() {
+    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
+    FlumeConfiguration fconfig = getFlumeConfiguration();
+    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
+    /*
+     * Assume every channel will not be used
+     */
+    unusedChannels.clear();
+    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channels.entrySet()) {
+      unusedChannels.get(entry.getKey()).addAll(entry.getValue().keySet());
+    }
+    if (agentConf != null) {
+      try {
+        loadChannels(agentConf, conf);
+        loadSources(agentConf, conf);
+        loadSinks(agentConf, conf);
+      } catch (InstantiationException ex) {
+        LOGGER.error("Failed to instantiate component", ex);
+      }
+    } else {
+      LOGGER.warn("No configuration found for this host:{}", getAgentName());
+    }
+    /*
+     * Any channel which was not used, will have it's reference removed
+     */
+    for (Class<? extends Channel> channelKlass : unusedChannels.keySet()) {
+      for (String channelName : unusedChannels.get(channelKlass)) {
+        Map<String, Channel> channelMap = channels.get(channelKlass);
+        if (channelMap != null) {
+          if(channelMap.remove(channelName) != null) {
+            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
+          }
+          if (channelMap.isEmpty()) {
+            channels.remove(channelKlass);
+          }
+        }
+      }
+    }
+    unusedChannels.clear();
+    return conf;
+  }
+
+  public String getAgentName() {
+    return agentName;
+  }
+
+
+  private void loadChannels(AgentConfiguration agentConf,
+      MaterializedConfiguration conf) throws InstantiationException {
+    LOGGER.info("Creating channels");
+    Set<String> channels = agentConf.getChannelSet();
+    Map<String, ComponentConfiguration> compMap =
+        agentConf.getChannelConfigMap();
+    for (String chName : channels) {
+      ComponentConfiguration comp = compMap.get(chName);
+      if(comp != null) {
+        Channel channel = getOrCreateChannel(comp.getComponentName(),
+            comp.getType());
+
+        Configurables.configure(channel, comp);
+
+        conf.getChannels().put(comp.getComponentName(), channel);
+      }
+    }
+
+    for (String ch : channels) {
+      Context context = agentConf.getChannelContext().get(ch);
+      if(context != null){
+        Channel channel =
+            getOrCreateChannel(ch, context.getString(
+                BasicConfigurationConstants.CONFIG_TYPE));
+        Configurables.configure(channel, context);
+        conf.getChannels().put(ch, channel);
+        LOGGER.info("created channel " + ch);
+      }
+    }
+
+  }
+  private void loadSources(AgentConfiguration agentConf, MaterializedConfiguration conf)
+      throws InstantiationException {
+
+    Set<String> sources = agentConf.getSourceSet();
+    Map<String, ComponentConfiguration> compMap =
+        agentConf.getSourceConfigMap();
+    for (String sourceName : sources) {
+      ComponentConfiguration comp = compMap.get(sourceName);
+      if(comp != null) {
+        SourceConfiguration config = (SourceConfiguration) comp;
+
+        Source source = sourceFactory.create(comp.getComponentName(),
+            comp.getType());
+
+        Configurables.configure(source, config);
+        Set<String> channelNames = config.getChannels();
+        List<Channel> channels = new ArrayList<Channel>();
+        for (String chName : channelNames) {
+          channels.add(conf.getChannels().get(chName));
+        }
+
+        ChannelSelectorConfiguration selectorConfig =
+            config.getSelectorConfiguration();
+
+        ChannelSelector selector = ChannelSelectorFactory.create(
+            channels, selectorConfig);
+
+        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+        Configurables.configure(channelProcessor, config);
+
+        source.setChannelProcessor(channelProcessor);
+        conf.getSourceRunners().put(comp.getComponentName(),
+            SourceRunner.forSource(source));
+      }
+    }
+    Map<String, Context> sourceContexts = agentConf.getSourceContext();
+
+    for (String src : sources) {
+      Context context = sourceContexts.get(src);
+      if(context != null){
+        Source source =
+            sourceFactory.create(src,
+                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+        List<Channel> channels = new ArrayList<Channel>();
+        Configurables.configure(source, context);
+        String[] channelNames = context.getString(
+            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+        for (String chName : channelNames) {
+          channels.add(conf.getChannels().get(chName));
+        }
+
+        Map<String, String> selectorConfig = context.getSubProperties(
+            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+        ChannelSelector selector = ChannelSelectorFactory.create(
+            channels, selectorConfig);
+
+        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+        Configurables.configure(channelProcessor, context);
+
+        source.setChannelProcessor(channelProcessor);
+        conf.getSourceRunners().put(src,
+            SourceRunner.forSource(source));
+
+      }
+    }
+  }
+
+  private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf)
+      throws InstantiationException {
+    Set<String> sinkNames = agentConf.getSinkSet();
+    Map<String, ComponentConfiguration> compMap =
+        agentConf.getSinkConfigMap();
+    Map<String, Sink> sinks = new HashMap<String, Sink>();
+    for (String sinkName : sinkNames) {
+      ComponentConfiguration comp = compMap.get(sinkName);
+      if(comp != null) {
+        SinkConfiguration config = (SinkConfiguration) comp;
+        Sink sink = sinkFactory.create(comp.getComponentName(),
+            comp.getType());
+
+        Configurables.configure(sink, config);
+
+        sink.setChannel(conf.getChannels().get(config.getChannel()));
+        sinks.put(comp.getComponentName(), sink);
+      }
+    }
+
+    Map<String, Context> sinkContexts = agentConf.getSinkContext();
+    for (String sinkName : sinkNames) {
+      Context context = sinkContexts.get(sinkName);
+      if(context != null) {
+        Sink sink = sinkFactory.create(sinkName, context.getString(
+            BasicConfigurationConstants.CONFIG_TYPE));
+        Configurables.configure(sink, context);
+
+        sink.setChannel(conf.getChannels().get(context.getString(
+            BasicConfigurationConstants.CONFIG_CHANNEL)));
+        sinks.put(sinkName, sink);
+      }
+    }
+
+    loadSinkGroups(agentConf, sinks, conf);
+  }
+
+  private void loadSinkGroups(AgentConfiguration agentConf,
+      Map<String, Sink> sinks, MaterializedConfiguration conf)
+          throws InstantiationException {
+    Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
+    Map<String, ComponentConfiguration> compMap =
+        agentConf.getSinkGroupConfigMap();
+    Map<String, String> usedSinks = new HashMap<String, String>();
+    for (String groupName: sinkgroupNames) {
+      ComponentConfiguration comp = compMap.get(groupName);
+      if(comp != null) {
+        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+        List<String> groupSinkList = groupConf.getSinks();
+        List<Sink> groupSinks = new ArrayList<Sink>();
+        for (String sink : groupSinkList) {
+          Sink s = sinks.remove(sink);
+          if (s == null) {
+            String sinkUser = usedSinks.get(sink);
+            if (sinkUser != null) {
+              throw new InstantiationException(String.format(
+                  "Sink %s of group %s already " +
+                      "in use by group %s", sink, groupName, sinkUser));
+            } else {
+              throw new InstantiationException(String.format(
+                  "Sink %s of group %s does "
+                      + "not exist or is not properly configured", sink,
+                      groupName));
+            }
+          }
+          groupSinks.add(s);
+          usedSinks.put(sink, groupName);
+        }
+        SinkGroup group = new SinkGroup(groupSinks);
+        Configurables.configure(group, groupConf);
+        conf.getSinkRunners().put(comp.getComponentName(),
+            new SinkRunner(group.getProcessor()));
+      }
+    }
+    // add any unassigned sinks to solo collectors
+    for(Entry<String, Sink> entry : sinks.entrySet()) {
+      if (!usedSinks.containsValue(entry.getKey())) {
+        SinkProcessor pr = new DefaultSinkProcessor();
+        List<Sink> sinkMap = new ArrayList<Sink>();
+        sinkMap.add(entry.getValue());
+        pr.setSinks(sinkMap);
+        Configurables.configure(pr, new Context());
+        conf.getSinkRunners().put(entry.getKey(),
+            new SinkRunner(pr));
+      }
+    }
+  }
+
+
+  private Channel getOrCreateChannel(String name, String type)
+      throws FlumeException {
+
+    Class<? extends Channel> channelClass = channelFactory.
+        getChannelClass(type);
+    /*
+     * Channel has requested a new instance on each re-configuration
+     */
+    if(channelClass.isAnnotationPresent(Disposable.class)) {
+      Channel channel = channelFactory.create(name, type);
+      channel.setName(name);
+      return channel;
+    }
+    Map<String, Channel> channelMap = channels.get(channelClass);
+    if (channelMap == null) {
+      channelMap = new HashMap<String, Channel>();
+      channels.put(channelClass, channelMap);
+    }
+    Channel channel = channelMap.get(name);
+    if(channel == null) {
+      channel = channelFactory.create(name, type);
+      channel.setName(name);
+      channelMap.put(name, channel);
+    }
+    unusedChannels.get(channelClass).remove(name);
+    return channel;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
new file mode 100644
index 0000000..7d9d7ec
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+public interface MaterializedConfiguration {
+
+  public Map<String, SourceRunner> getSourceRunners();
+
+  public Map<String, SinkRunner> getSinkRunners();
+
+  public Map<String, Channel> getChannels();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..925df83
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PollingPropertiesFileConfigurationProvider extends
+ PropertiesFileConfigurationProvider implements LifecycleAware {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PollingPropertiesFileConfigurationProvider.class);
+
+  private final EventBus eventBus;
+  private final File file;
+  private final int interval;
+  private final CounterGroup counterGroup;
+  private LifecycleState lifecycleState;
+
+  private ScheduledExecutorService executorService;
+
+  public PollingPropertiesFileConfigurationProvider(String agentName,
+      File file, EventBus eventBus, int interval) {
+    super(agentName, file);
+    this.eventBus = eventBus;
+    this.file = file;
+    this.interval = interval;
+    counterGroup = new CounterGroup();
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Configuration provider starting");
+
+    Preconditions.checkState(file != null,
+        "The parameter file must not be null");
+
+    executorService = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+                .build());
+
+    FileWatcherRunnable fileWatcherRunnable =
+        new FileWatcherRunnable(file, counterGroup);
+
+    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
+        TimeUnit.SECONDS);
+
+    lifecycleState = LifecycleState.START;
+
+    LOGGER.debug("Configuration provider started");
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("Configuration provider stopping");
+
+    executorService.shutdown();
+
+    while (!executorService.isTerminated()) {
+      try {
+        LOGGER.debug("Waiting for file watcher to terminate");
+        executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    lifecycleState = LifecycleState.STOP;
+    LOGGER.debug("Configuration provider stopped");
+  }
+
+  @Override
+  public synchronized  LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+
+  @Override
+  public String toString() {
+    return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
+        + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
+  }
+
+  public class FileWatcherRunnable implements Runnable {
+
+    private final File file;
+    private final CounterGroup counterGroup;
+
+    private long lastChange;
+
+    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
+      super();
+      this.file = file;
+      this.counterGroup = counterGroup;
+      this.lastChange = 0L;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.debug("Checking file:{} for changes", file);
+
+      counterGroup.incrementAndGet("file.checks");
+
+      long lastModified = file.lastModified();
+
+      if (lastModified > lastChange) {
+        LOGGER.info("Reloading configuration file:{}", file);
+
+        counterGroup.incrementAndGet("file.loads");
+
+        lastChange = lastModified;
+
+        try {
+          eventBus.post(getConfiguration());
+        } catch (Exception e) {
+          LOGGER.error("Failed to load configuration data. Exception follows.",
+              e);
+        } catch (NoClassDefFoundError e) {
+          LOGGER.error("Failed to start agent because dependencies were not " +
+              "found in classpath. Error follows.", e);
+        } catch (Throwable t) {
+          // caught because the caller does not handle or log Throwables
+          LOGGER.error("Unhandled error", t);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..b7ad9f0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * <p>
+ * A configuration provider that uses properties file for specifying
+ * configuration. The configuration files follow the Java properties file syntax
+ * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties file is prefixed by an
+ * <em>Agent Name</em> which helps isolate an individual agent&apos;s namespace.
+ * </p>
+ * <p>
+ * Valid configuration files must observe the following rules for every agent
+ * namespace.
+ * <ul>
+ * <li>For every &lt;agent name&gt; there must be three lists specified that
+ * include <tt>&lt;agent name&gt;.sources</tt>,
+ * <tt>&lt;agent name&gt;.sinks</tt>, and <tt>&lt;agent name&gt;.channels</tt>.
+ * Each of these lists must contain a space separated list of names
+ * corresponding to that particular entity.</li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of source
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.type = event</tt></li>
+ * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
+ * be a space-separated list of channel names that the source will associate
+ * with during runtime. Each of these names must be contained in the channels
+ * list specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.channels =
+ * &lt;channel-1 name&gt; &lt;channel-2 name&gt;</tt></li>
+ * <li>For each source named in the <tt>&lt;agent name&gt;.sources</tt>, there
+ * must be a <tt>runner</tt> namespace of configuration that configures the
+ * associated source runner. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.type = avro</tt>.
+ * This namespace can also be used to configure other configuration of the
+ * source runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt>
+ * </li>
+ * <li>For each source named in <tt>&lt;sources&gt;.sources</tt> there can
+ * be an optional <tt>selector.type</tt> specified that identifies the type
+ * of channel selector associated with the source. If not specified, the
+ * default replicating channel selector is used.
+ * </li><li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
+ * <tt>&lt;agent name&gt;.channels.&lt;channel name&gt;.type = mem</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
+ * types. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.type = hdfs</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a non-empty single-valued channel name specified as the value of the
+ * <tt>channel</tt> attribute. This value must be contained in the channels list
+ * specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.channel =
+ * &lt;channel name&gt;</tt></li>
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a <tt>runner</tt> namespace of configuration that configures the
+ * associated sink runner. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.type = polling</tt>.
+ * This namespace can also be used to configure other configuration of the sink
+ * runner as needed. For example:
+ * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
+ * 60</tt></li>
+ * <li>A fourth optional list <tt>&lt;agent name&gt;.sinkgroups</tt>
+ * may be added to each agent, consisting of unique space separated names
+ * for groups</li>
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks
+ * belonging to it. These cannot be shared by multiple groups.
+ * Further, one can set a processor and behavioral parameters to determine
+ * how sink selection is made via <tt>&lt;agent name&gt;.sinkgroups.&lt;
+ * group name&lt.processor</tt>. For further detail refer to inividual processor
+ * documentation</li>
+ * <li>Sinks not assigned to a group will be assigned to default single sink
+ * groups.</li>
+ * </ul>
+ *
+ * Apart from the above required configuration values, each source, sink or
+ * channel can have its own set of arbitrary configuration as required by the
+ * implementation. Each of these configuration values are expressed by fully
+ * namespace qualified configuration keys. For example, the configuration
+ * property called <tt>capacity</tt> for a channel called <tt>ch1</tt> for the
+ * agent named <tt>host1</tt> with value <tt>1000</tt> will be expressed as:
+ * <tt>host1.channels.ch1.capacity = 1000</tt>.
+ * </p>
+ * <p>
+ * Any information contained in the configuration file other than what pertains
+ * to the configured agents, sources, sinks and channels via the explicitly
+ * enumerated list of sources, sinks and channels per agent name are ignored by
+ * this provider. Moreover, if any of the required configuration values are not
+ * present in the configuration file for the configured entities, that entity
+ * and anything that depends upon it is considered invalid and consequently not
+ * configured. For example, if a channel is missing its <tt>type</tt> attribute,
+ * it is considered misconfigured. Also, any sources or sinks that depend upon
+ * this channel are also considered misconfigured and not initialized.
+ * </p>
+ * <p>
+ * Example configuration file:
+ *
+ * <pre>
+ * #
+ * # Flume Configuration
+ * # This file contains configuration for one Agent identified as host1.
+ * #
+ *
+ * host1.sources = avroSource thriftSource
+ * host1.channels = jdbcChannel
+ * host1.sinks = hdfsSink
+ *
+ * # avroSource configuration
+ * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
+ * host1.sources.avroSource.runner.type = avro
+ * host1.sources.avroSource.runner.port = 11001
+ * host1.sources.avroSource.channels = jdbcChannel
+ * host1.sources.avroSource.selector.type = replicating
+ *
+ * # thriftSource configuration
+ * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
+ * host1.sources.thriftSource.runner.type = thrift
+ * host1.sources.thriftSource.runner.port = 12001
+ * host1.sources.thriftSource.channels = jdbcChannel
+ *
+ * # jdbcChannel configuration
+ * host1.channels.jdbcChannel.type = jdbc
+ * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
+ * host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
+ * host1.channels.jdbcChannel.jdbc.username = flume
+ * host1.channels.jdbcChannel.jdbc.password = flume
+ *
+ * # hdfsSink configuration
+ * host1.sinks.hdfsSink.type = hdfs
+ * host1.sinks.hdfsSink.hdfs.path = hdfs://localhost/
+ * host1.sinks.hdfsSink.batchsize = 1000
+ * host1.sinks.hdfsSink.runner.type = polling
+ * host1.sinks.hdfsSink.runner.polling.interval = 60
+ * </pre>
+ *
+ * </p>
+ *
+ * @see java.util.Properties#load(java.io.Reader)
+ */
+public class PropertiesFileConfigurationProvider extends
+    AbstractConfigurationProvider {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PropertiesFileConfigurationProvider.class);
+
+  private final File file;
+
+  public PropertiesFileConfigurationProvider(String agentName, File file) {
+    super(agentName);
+    this.file = file;
+  }
+
+  @Override
+  public FlumeConfiguration getFlumeConfiguration() {
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new FileReader(file));
+      Properties properties = new Properties();
+      properties.load(reader);
+      // If there is an old configuration, shut it down.
+      return new FlumeConfiguration(toMap(properties));
+    } catch (IOException ex) {
+      LOGGER.error("Unable to load file:" + file
+          + " (I/O failure) - Exception follows.", ex);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException ex) {
+          LOGGER.warn(
+              "Unable to close file reader for file: " + file, ex);
+        }
+      }
+    }
+    return new FlumeConfiguration(new HashMap<String, String>());
+  }
+
+  private Map<String, String> toMap(Properties properties) {
+    Map<String, String> result = Maps.newHashMap();
+    Enumeration<?> propertyNames = properties.propertyNames();
+    while (propertyNames.hasMoreElements()) {
+      String name = (String) propertyNames.nextElement();
+      String value = properties.getProperty(name);
+      result.put(name, value);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
new file mode 100644
index 0000000..cb29607
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+
+public class SimpleMaterializedConfiguration implements MaterializedConfiguration {
+
+  private final Map<String, Channel> channels;
+  private final Map<String, SourceRunner> sourceRunners;
+  private final Map<String, SinkRunner> sinkRunners;
+
+  public SimpleMaterializedConfiguration() {
+    channels = new HashMap<String, Channel>();
+    sourceRunners = new HashMap<String, SourceRunner>();
+    sinkRunners = new HashMap<String, SinkRunner>();
+  }
+
+  @Override
+  public String toString() {
+    return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners
+        + " channels:" + channels + " }";
+  }
+
+  @Override
+  public Map<String, Channel> getChannels() {
+    return channels;
+  }
+
+  @Override
+  public Map<String, SourceRunner> getSourceRunners() {
+    return sourceRunners;
+  }
+
+  @Override
+  public Map<String, SinkRunner> getSinkRunners() {
+    return sinkRunners;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
new file mode 100644
index 0000000..d6bfed8
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.Disposable;
+import org.apache.flume.annotations.Recyclable;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestAbstractConfigurationProvider {
+
+  @Test
+  public void testDispoableChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChanne(agentName,
+        DisposableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof DisposableChannel);
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof DisposableChannel);
+    Assert.assertNotSame(channel1, channel2);
+  }
+
+  @Test
+  public void testReusableChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChanne(agentName,
+        RecyclableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof RecyclableChannel);
+
+    Assert.assertSame(channel1, channel2);
+  }
+  
+  @Test
+  public void testUnspecifiedChannel() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> properties = getPropertiesForChanne(agentName,
+        UnspecifiedChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof UnspecifiedChannel);
+
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof UnspecifiedChannel);
+
+    Assert.assertSame(channel1, channel2);
+  }
+
+  @Test
+  public void testReusableChannelNotReusedLater() throws Exception {
+    String agentName = "agent1";
+    Map<String, String> propertiesReusable = getPropertiesForChanne(agentName,
+        RecyclableChannel.class.getName());
+    Map<String, String> propertiesDispoable = getPropertiesForChanne(agentName,
+        DisposableChannel.class.getName());
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, propertiesReusable);
+    MaterializedConfiguration config1 = provider.getConfiguration();
+    Channel channel1 = config1.getChannels().values().iterator().next();
+    Assert.assertTrue(channel1 instanceof RecyclableChannel);
+
+    provider.setProperties(propertiesDispoable);
+    MaterializedConfiguration config2 = provider.getConfiguration();
+    Channel channel2 = config2.getChannels().values().iterator().next();
+    Assert.assertTrue(channel2 instanceof DisposableChannel);
+
+    provider.setProperties(propertiesReusable);
+    MaterializedConfiguration config3 = provider.getConfiguration();
+    Channel channel3 = config3.getChannels().values().iterator().next();
+    Assert.assertTrue(channel3 instanceof RecyclableChannel);
+
+    Assert.assertNotSame(channel1, channel3);
+  }
+
+
+  private Map<String, String> getPropertiesForChanne(String agentName, String channelType) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(agentName + ".sources", "source1");
+    properties.put(agentName + ".channels", "channel1");
+    properties.put(agentName + ".sinks", "sink1");
+    properties.put(agentName + ".sources.source1.type", "seq");
+    properties.put(agentName + ".sources.source1.channels", "channel1");
+    properties.put(agentName + ".channels.channel1.type", channelType);
+    properties.put(agentName + ".channels.channel1.capacity", "100");
+    properties.put(agentName + ".sinks.sink1.type", "null");
+    properties.put(agentName + ".sinks.sink1.channel", "channel1");
+    return properties;
+  }
+
+  public static class MemoryConfigurationProvider extends AbstractConfigurationProvider {
+    private Map<String, String> properties;
+    public MemoryConfigurationProvider(String agentName, Map<String, String> properties) {
+      super(agentName);
+      this.properties = properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+      this.properties = properties;
+    }
+
+    @Override
+    protected FlumeConfiguration getFlumeConfiguration() {
+      return new FlumeConfiguration(properties);
+    }
+  }
+  @Disposable
+  public static class DisposableChannel extends AbstractChannel {
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  @Recyclable
+  public static class RecyclableChannel extends AbstractChannel {
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  public static class UnspecifiedChannel extends AbstractChannel {
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
new file mode 100644
index 0000000..464901f
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.node;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.eventbus.EventBus;
+
+public class TestApplication {
+
+
+  @Before
+  public void setup() throws Exception {
+
+  }
+
+  private <T extends LifecycleAware> T mockLifeCycle(Class<T> klass) {
+
+    T lifeCycleAware = mock(klass);
+
+    final AtomicReference<LifecycleState> state =
+        new AtomicReference<LifecycleState>();
+
+    state.set(LifecycleState.IDLE);
+
+    when(lifeCycleAware.getLifecycleState()).then(new Answer<LifecycleState>() {
+      @Override
+      public LifecycleState answer(InvocationOnMock invocation)
+          throws Throwable {
+        return state.get();
+      }
+    });
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        state.set(LifecycleState.START);
+        return null;
+      }
+    }).when(lifeCycleAware).start();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        state.set(LifecycleState.STOP);
+        return null;
+      }
+    }).when(lifeCycleAware).stop();
+
+    return lifeCycleAware;
+  }
+
+  @Test
+  public void testBasicConfiguration() throws Exception {
+
+    EventBus eventBus = new EventBus("test-event-bus");
+
+    MaterializedConfiguration materializedConfiguration = new
+        SimpleMaterializedConfiguration();
+
+    SourceRunner sourceRunner = mockLifeCycle(SourceRunner.class);
+    materializedConfiguration.getSourceRunners().put("test", sourceRunner);
+
+    SinkRunner sinkRunner = mockLifeCycle(SinkRunner.class);
+    materializedConfiguration.getSinkRunners().put("test", sinkRunner);
+
+    Channel channel = mockLifeCycle(Channel.class);
+    materializedConfiguration.getChannels().put("test", channel);
+
+
+    ConfigurationProvider configurationProvider = mock(ConfigurationProvider.class);
+    when(configurationProvider.getConfiguration()).thenReturn(materializedConfiguration);
+
+    Application application = new Application();
+    eventBus.register(application);
+    eventBus.post(materializedConfiguration);
+    application.start();
+
+    Thread.sleep(1000L);
+
+    verify(sourceRunner).start();
+    verify(sinkRunner).start();
+    verify(channel).start();
+
+    application.stop();
+
+    Thread.sleep(1000L);
+
+    verify(sourceRunner).stop();
+    verify(sinkRunner).stop();
+    verify(channel).stop();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..eed22ee
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Files;
+
+public class TestPollingPropertiesFileConfigurationProvider  {
+
+
+  private static final File TESTFILE = new File(
+      TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
+          .getResource("flume-conf.properties").getFile());
+
+  private PollingPropertiesFileConfigurationProvider provider;
+  private File baseDir;
+  private File configFile;
+  private EventBus eventBus;
+
+  @Before
+  public void setUp() throws Exception {
+
+    baseDir = Files.createTempDir();
+
+    configFile = new File(baseDir, TESTFILE.getName());
+    Files.copy(TESTFILE, configFile);
+
+    eventBus = new EventBus("test");
+    provider =
+        new PollingPropertiesFileConfigurationProvider("host1",
+            configFile, eventBus, 1);
+    provider.start();
+    LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(baseDir);
+    provider.stop();
+  }
+
+  @Test
+  public void testPolling() throws Exception {
+
+    // let first event fire
+    Thread.sleep(2000L);
+
+    final List<MaterializedConfiguration> events = Lists.newArrayList();
+
+    Object eventHandler = new Object() {
+      @Subscribe
+      public synchronized void handleConfigurationEvent(MaterializedConfiguration event) {
+        events.add(event);
+      }
+    };
+    eventBus.register(eventHandler);
+    configFile.setLastModified(System.currentTimeMillis());
+
+    // now wait for second event to fire
+    Thread.sleep(2000L);
+
+    Assert.assertEquals(String.valueOf(events), 1, events.size());
+
+    MaterializedConfiguration materializedConfiguration = events.remove(0);
+
+    Assert.assertEquals(1, materializedConfiguration.getSourceRunners().size());
+    Assert.assertEquals(1, materializedConfiguration.getSinkRunners().size());
+    Assert.assertEquals(1, materializedConfiguration.getChannels().size());
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3f3a888f/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
new file mode 100644
index 0000000..84a8cfd
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class TestPropertiesFileConfigurationProvider  {
+
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(TestPropertiesFileConfigurationProvider.class);
+
+  private static final File TESTFILE = new File(
+      TestPropertiesFileConfigurationProvider.class.getClassLoader()
+          .getResource("flume-conf.properties").getFile());
+
+  private PropertiesFileConfigurationProvider provider;
+
+  @Before
+  public void setUp() throws Exception {
+    provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+
+  }
+
+  @Test
+  public void testPropertyRead() throws Exception {
+
+    FlumeConfiguration configuration = provider.getFlumeConfiguration();
+    Assert.assertNotNull(configuration);
+
+    /*
+     * Test the known errors in the file
+     */
+    List<String> expected = Lists.newArrayList();
+    expected.add("host5 CONFIG_ERROR");
+    expected.add("host5 INVALID_PROPERTY");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 AGENT_CONFIGURATION_INVALID");
+    expected.add("ch2 ATTRS_MISSING");
+    expected.add("host3 CONFIG_ERROR");
+    expected.add("host3 PROPERTY_VALUE_NULL");
+    expected.add("host3 AGENT_CONFIGURATION_INVALID");
+    expected.add("host2 PROPERTY_VALUE_NULL");
+    expected.add("host2 AGENT_CONFIGURATION_INVALID");
+    List<String> actual = Lists.newArrayList();
+    for(FlumeConfigurationError error : configuration.getConfigurationErrors()) {
+      actual.add(error.getComponentName() + " " + error.getErrorType().toString());
+    }
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+
+
+    AgentConfiguration agentConfiguration =
+        configuration.getConfigurationFor("host1");
+    Assert.assertNotNull(agentConfiguration);
+
+
+    LOGGER.info(agentConfiguration.getPrevalidationConfig());
+    LOGGER.info(agentConfiguration.getPostvalidationConfig());
+
+
+    Set<String> sources = Sets.newHashSet("source1");
+    Set<String> sinks = Sets.newHashSet("sink1");
+    Set<String> channels = Sets.newHashSet("channel1");
+
+    Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+    Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+    Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+  }
+}


Mime
View raw message