flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1329280 [2/2] - in /incubator/flume/trunk: ./ flume-ng-configuration/ flume-ng-configuration/src/ flume-ng-configuration/src/main/ flume-ng-configuration/src/main/java/ flume-ng-configuration/src/main/java/org/ flume-ng-configuration/src/m...
Date Mon, 23 Apr 2012 15:18:34 GMT
Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.apache.flume.conf.FlumeConfigurationErrorType;
+import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
+import org.apache.flume.conf.channel.ChannelConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+
+public class SinkConfiguration extends ComponentConfiguration {
+
+  protected String channel;
+
+  public SinkConfiguration(String componentName) {
+    super(componentName);
+  }
+
+  public String getChannel() {
+    return channel;
+  }
+
+  public void getChannel(String channel) {
+    this.channel = channel;
+  }
+
+  public void configure(Context context) throws ConfigurationException {
+    super.configure(context);
+    this.channel = context.getString("channel");
+    if (this.channel == null || this.channel.isEmpty()) {
+      errors
+          .add(new FlumeConfigurationError(componentName, "channel",
+              FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
+              ErrorOrWarning.ERROR));
+      throw new ConfigurationException("No channel configured for sink: "
+          + this.getComponentName());
+    }
+  }
+
+  public enum SinkConfigurationType {
+    /**
+     * Place holder for custom sinks not part of this enumeration.
+     */
+    OTHER(null),
+
+    /**
+     * Null sink
+     *
+     * @see NullSink
+     */
+    NULL(null),
+
+    /**
+     * Logger sink
+     *
+     * @see LoggerSink
+     */
+    LOGGER(null),
+
+    /**
+     * Rolling file sink
+     *
+     * @see RollingFileSink
+     */
+    FILE_ROLL("org.apache.flume.conf.sink.RollingFileSinkConfiguration"),
+
+    /**
+     * HDFS Sink provided by org.apache.flume.sink.hdfs.HDFSEventSink
+     */
+    HDFS("org.apache.flume.conf.sink.HDFSSinkConfiguration"),
+
+    /**
+     * IRC Sink provided by org.apache.flume.sink.irc.IRCSink
+     */
+    IRC("org.apache.flume.conf.sink.IRCSinkConfiguration"),
+
+    /**
+     * Avro sink
+     *
+     * @see AvroSink
+     */
+    AVRO("org.apache.flume.conf.sink.AvroSinkConfiguration");
+
+    private final String sinkConfigurationName;
+
+    private SinkConfigurationType(String type) {
+      this.sinkConfigurationName = type;
+    }
+
+    public String getSinkConfigurationType() {
+      return this.sinkConfigurationName;
+    }
+
+    @SuppressWarnings("unchecked")
+    public SinkConfiguration getConfiguration(String name)
+        throws ConfigurationException {
+
+      Class<? extends SinkConfiguration> clazz;
+      SinkConfiguration instance = null;
+      try {
+        if (sinkConfigurationName != null) {
+          clazz =
+              (Class<? extends SinkConfiguration>) Class
+                  .forName(sinkConfigurationName);
+          instance = clazz.getConstructor(String.class).newInstance(name);
+        } else {
+          return new SinkConfiguration(name);
+        }
+      } catch (ClassNotFoundException e) {
+        // Could not find the configuration stub, do basic validation
+        instance = new SinkConfiguration(name);
+        // Let the caller know that this was created because of this exception.
+        instance.setNotFoundConfigClass();
+      } catch (Exception e){
+        throw new ConfigurationException("Couldn't create configuration", e);
+      }
+      return instance;
+    }
+
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.sink;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ComponentConfigurationFactory;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.FlumeConfiguration;
+
+public class SinkGroupConfiguration extends ComponentConfiguration {
+  private static final String PROCESSOR_PREFIX = "processor.";
+  private Context processorContext;
+  private List<String> sinks;
+  private SinkProcessorConfiguration processorConf;
+
+  public SinkGroupConfiguration(String name) {
+    super(name);
+    setType(ComponentType.SINKGROUP.getComponentType());
+  }
+
+  public void setSinks(List<String> sinks) {
+    this.sinks = sinks;
+  }
+
+  public List<String> getSinks() {
+    return sinks;
+  }
+
+  @Override
+  public void configure(Context context) throws ConfigurationException {
+    super.configure(context);
+    sinks = Arrays.asList(context.getString("sinks").split("\\s+"));
+    Map<String, String> params = context.getSubProperties(PROCESSOR_PREFIX);
+    processorContext = new Context();
+    processorContext.putAll(params);
+    SinkProcessorType spType =
+        getKnownSinkProcessor(
+            processorContext.getString(FlumeConfiguration.CONF_TYPE));
+
+    if (spType != null) {
+      processorConf =
+          (SinkProcessorConfiguration) ComponentConfigurationFactory.create(
+              this.getComponentName() + "-processor",
+              spType.toString(),
+              ComponentType.SINK_PROCESSOR);
+      if (processorConf != null) {
+        processorConf.setSinks(new HashSet<String>(sinks));
+        processorConf.configure(processorContext);
+      }
+    }
+    setConfigured();
+  }
+
+  public Context getProcessorContext() {
+    return processorContext;
+  }
+
+  public void setProcessorContext(Context processorContext) {
+    this.processorContext = processorContext;
+  }
+
+  public SinkProcessorConfiguration getSinkProcessorConfiguration() {
+    return processorConf;
+  }
+
+  public void setSinkProcessorConfiguration(SinkProcessorConfiguration conf) {
+    this.processorConf = conf;
+  }
+
+  private SinkProcessorType getKnownSinkProcessor(String type) {
+    SinkProcessorType[] values = SinkProcessorType.values();
+    for (SinkProcessorType value : values) {
+      if (value.toString().equalsIgnoreCase(type)) return value;
+      String sinkProcessClassName = value.getSinkProcessorClassName();
+      if (sinkProcessClassName != null
+          && sinkProcessClassName.equalsIgnoreCase(type)){
+        return value;
+      }
+    }
+    return null;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.sink;
+
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType;
+import org.apache.flume.conf.source.SourceConfiguration;
+
+public class SinkProcessorConfiguration extends ComponentConfiguration {
+  protected Set<String> sinks;
+
+  protected SinkProcessorConfiguration(String componentName) {
+    super(componentName);
+    setType("default");
+  }
+
+  public void configure(Context context) throws ConfigurationException {
+
+  }
+
+  public Set<String> getSinks() {
+    return sinks;
+  }
+
+  public void setSinks(Set<String> sinks) {
+    this.sinks = sinks;
+  }
+
+  public enum SinkProcessorConfigurationType {
+    /**
+     * Failover processor
+     *
+     * @see FailoverSinkProcessor
+     */
+    FAILOVER("org.apache.flume.conf.sink.FailoverSinkProcessorConfiguration"),
+
+    /**
+     * Standard processor
+     *
+     * @see DefaultSinkProcessor
+     */
+    DEFAULT(null);
+    private final String processorClassName;
+
+    private SinkProcessorConfigurationType(String processorClassName) {
+      this.processorClassName = processorClassName;
+    }
+
+    public String getSinkProcessorConfigurationType() {
+      return processorClassName;
+    }
+
+    @SuppressWarnings("unchecked")
+    public SinkProcessorConfiguration getConfiguration(String name)
+        throws ConfigurationException {
+      Class<? extends SinkProcessorConfiguration> clazz;
+      SinkProcessorConfiguration instance = null;
+      try {
+        if (processorClassName != null) {
+          clazz =
+              (Class<? extends SinkProcessorConfiguration>) Class
+                  .forName(processorClassName);
+          instance = clazz.getConstructor(String.class).newInstance(name);
+
+        } else {
+          return new SinkProcessorConfiguration(name);
+        }
+      } catch (ClassNotFoundException e) {
+        // Could not find the configuration stub, do basic validation
+        instance = new SinkProcessorConfiguration(name);
+        // Let the caller know that this was created because of this exception.
+        instance.setNotFoundConfigClass();
+      } catch (Exception e) {
+        throw new ConfigurationException(
+            "Could not instantiate configuration!", e);
+      }
+      return instance;
+    }
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.conf.sink;
+
+public enum SinkProcessorType {
+  /**
+   * Failover processor
+   *
+   * @see FailoverSinkProcessor
+   */
+  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),
+
+  /**
+   * Standard processor
+   *
+   * @see DefaultSinkProcessor
+   */
+  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor");
+  private final String processorClassName;
+
+  private SinkProcessorType(String processorClassName) {
+    this.processorClassName = processorClassName;
+  }
+
+  public String getSinkProcessorClassName() {
+    return processorClassName;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.conf.sink;
+
+/**
+ * Enumeration of built in sink types available in the system.
+ */
+public enum SinkType {
+
+  /**
+   * Place holder for custom sinks not part of this enumeration.
+   */
+  OTHER(null),
+
+  /**
+   * Null sink
+   *
+   * @see NullSink
+   */
+  NULL("org.apache.flume.sink.NullSink"),
+
+  /**
+   * Logger sink
+   *
+   * @see LoggerSink
+   */
+  LOGGER("org.apache.flume.sink.LoggerSink"),
+
+  /**
+   * Rolling file sink
+   *
+   * @see RollingFileSink
+   */
+  FILE_ROLL("org.apache.flume.sink.RollingFileSink"),
+
+  /**
+   * HDFS Sink provided by org.apache.flume.sink.hdfs.HDFSEventSink
+   */
+  HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
+
+  /**
+   * IRC Sink provided by org.apache.flume.sink.irc.IRCSink
+   */
+  IRC("org.apache.flume.sink.irc.IRCSink"),
+
+  /**
+   * Avro sink
+   *
+   * @see AvroSink
+   */
+  AVRO("org.apache.flume.sink.AvroSink");
+
+  private final String sinkClassName;
+
+  private SinkType(String sinkClassName) {
+    this.sinkClassName = sinkClassName;
+  }
+
+  public String getSinkClassName() {
+    return sinkClassName;
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.source;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ComponentConfigurationFactory;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.apache.flume.conf.FlumeConfigurationErrorType;
+import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType;
+import org.apache.flume.conf.channel.ChannelSelectorType;
+import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType;
+
+public class SourceConfiguration extends ComponentConfiguration {
+  protected Set<String> channels;
+  protected ChannelSelectorConfiguration selectorConf;
+  private static final String CHANNELS_CONF = "channels";
+  public SourceConfiguration(String componentName) {
+    super(componentName);
+  }
+
+  public Set<String> getChannels() {
+    return channels;
+  }
+
+  public ChannelSelectorConfiguration getSelectorConfiguration() {
+    return selectorConf;
+  }
+
+  public void configure(Context context) throws ConfigurationException {
+    super.configure(context);
+    try {
+      String channelList = context.getString(CHANNELS_CONF);
+      if (channelList != null) {
+        this.channels =
+            new HashSet<String>(Arrays.asList(channelList.split("\\s+")));
+        if (channels.isEmpty()) {
+          errors.add(new FlumeConfigurationError(componentName,
+              ComponentType.CHANNEL.getComponentType(),
+              FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
+              ErrorOrWarning.ERROR));
+          throw new ConfigurationException("No channels set for "
+              + this.getComponentName());
+        }
+      }
+      Map<String, String> selectorParams =
+          context.getSubProperties("selector.");
+      String selType;
+      if (selectorParams != null && !selectorParams.isEmpty()) {
+        selType = selectorParams.get(FlumeConfiguration.CONF_TYPE);
+        System.out.println("Loading selector: " + selType);
+      } else {
+        selType = ChannelSelectorConfigurationType.REPLICATING.toString();
+      }
+
+      if (selType == null || selType.isEmpty()) {
+        selType = ChannelSelectorConfigurationType.REPLICATING.toString();
+
+      }
+      ChannelSelectorType selectorType =
+          this.getKnownChannelSelector(selType);
+      Context selectorContext = new Context();
+      selectorContext.putAll(selectorParams);
+      String config = null;
+      if (selectorType == null) {
+        config = selectorContext.getString(FlumeConfiguration.CONF_CONFIG);
+        if (config == null || config.isEmpty()) {
+          config = "OTHER";
+        }
+      } else {
+        config = selectorType.toString().toUpperCase();
+      }
+
+      this.selectorConf =
+          (ChannelSelectorConfiguration) ComponentConfigurationFactory
+              .create(ComponentType.CHANNELSELECTOR.getComponentType(), config,
+                  ComponentType.CHANNELSELECTOR);
+      selectorConf.setChannels(channels);
+      selectorConf.configure(selectorContext);
+    } catch (Exception e) {
+      errors.add(new FlumeConfigurationError(componentName,
+          ComponentType.CHANNELSELECTOR.getComponentType(),
+          FlumeConfigurationErrorType.CONFIG_ERROR,
+          ErrorOrWarning.ERROR));
+      throw new ConfigurationException("Failed to configure component!", e);
+    }
+  }
+
+  private ChannelSelectorType getKnownChannelSelector(String type) {
+    ChannelSelectorType[] values = ChannelSelectorType.values();
+    for (ChannelSelectorType value : values) {
+      if (value.toString().equalsIgnoreCase(type)) return value;
+      String clName = value.getChannelSelectorClassName();
+      if (clName != null && clName.equalsIgnoreCase(type)) return value;
+    }
+    return null;
+  }
+
+  public enum SourceConfigurationType {
+    OTHER(null),
+
+    SEQ(null),
+    /**
+     * Netcat source.
+     *
+     * @see NetcatSource
+     */
+    NETCAT("org.apache.flume.conf.source.NetcatSourceConfiguration"),
+
+    /**
+     * Exec source.
+     *
+     * @see ExecSource
+     */
+    EXEC("org.apache.flume.conf.source.ExecSourceConfiguration"),
+
+    /**
+     * Avro soruce.
+     *
+     * @see AvroSource
+     */
+    AVRO("org.apache.flume.conf.source.AvroSourceConfiguration"),
+
+    /**
+     * Syslog Tcp Source
+     *
+     * @see org.apache.flume.source.SyslogTcpSource
+     */
+    SYSLOGTCP("org.apache.flume.conf.source.SyslogTcpSourceConfiguration"),
+
+    /**
+     * Syslog Udp Source
+     *
+     * @see org.apache.flume.source.SyslogUDPSource
+     */
+
+    SYSLOGUDP("org.apache.flume.conf.source.SyslogUDPSourceConfiguration");
+
+    private String srcConfigurationName;
+
+    private SourceConfigurationType(String src) {
+      this.srcConfigurationName = src;
+    }
+
+    public String getSourceConfigurationType() {
+      return this.getSourceConfigurationType();
+    }
+
+    @SuppressWarnings("unchecked")
+    public SourceConfiguration getConfiguration(String name)
+        throws ConfigurationException {
+      if (this.equals(ChannelConfigurationType.OTHER)) {
+        return new SourceConfiguration(name);
+      }
+      Class<? extends SourceConfiguration> clazz = null;
+      SourceConfiguration instance = null;
+      try {
+        if (srcConfigurationName != null) {
+          clazz =
+              (Class<? extends SourceConfiguration>) Class
+                  .forName(srcConfigurationName);
+          instance = clazz.getConstructor(String.class).newInstance(name);
+        }
+        else {
+          return new SourceConfiguration(name);
+        }
+      } catch (ClassNotFoundException e) {
+        // Could not find the configuration stub, do basic validation
+        instance = new SourceConfiguration(name);
+        // Let the caller know that this was created because of this exception.
+        instance.setNotFoundConfigClass();
+      } catch (Exception e) {
+        throw new ConfigurationException("Error creating configuration", e);
+      }
+      return instance;
+    }
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java (added)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.conf.source;
+
+/**
+ * Enumeration of built in source types available in the system.
+ */
+public enum SourceType {
+
+  /**
+   * Place holder for custom sources not part of this enumeration.
+   */
+  OTHER(null),
+
+  /**
+   * Sequence generator file source.
+   *
+   * @see org.apache.flume.source.SequenceGeneratorSource
+   */
+  SEQ("org.apache.flume.source.SequenceGeneratorSource"),
+
+  /**
+   * Netcat source.
+   *
+   * @see org.apache.flume.source.NetcatSource
+   */
+  NETCAT("org.apache.flume.source.NetcatSource"),
+
+  /**
+   * Exec source.
+   *
+   * @see org.apache.flume.source.ExecSource
+   */
+  EXEC("org.apache.flume.source.ExecSource"),
+
+  /**
+   * Avro source.
+   *
+   * @see org.apache.flume.source.AvroSource
+   */
+  AVRO("org.apache.flume.source.AvroSource"),
+
+  /**
+   * SyslogTcpSource
+   *
+   * @see org.apache.flume.source.SyslogTcpSource
+   */
+
+  SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
+
+  /**
+   * SyslogUDPSource
+   *
+   * @see org.apache.flume.source.SyslogUDPSource
+   */
+
+  SYSLOGUDP("org.apache.flume.source.SyslogUDPSource");
+
+  private final String sourceClassName;
+
+  private SourceType(String sourceClassName) {
+    this.sourceClassName = sourceClassName;
+  }
+
+  public String getSourceClassName() {
+    return sourceClassName;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/pom.xml?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-core/pom.xml Mon Apr 23 15:18:32 2012
@@ -89,6 +89,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-ipc</artifactId>
     </dependency>

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java Mon Apr 23 15:18:32 2012
@@ -21,6 +21,7 @@ package org.apache.flume;
 import java.util.List;
 
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
 
 /**
  * <p>

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkProcessor.java Mon Apr 23 15:18:32 2012
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.flume.Sink.Status;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
 import org.apache.flume.lifecycle.LifecycleAware;
 
 /**

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java Mon Apr 23 15:18:32 2012
@@ -26,6 +26,8 @@ import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
+import org.apache.flume.conf.channel.ChannelSelectorType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +47,18 @@ public class ChannelSelectorFactory {
     context.putAll(config);
 
     Configurables.configure(selector, context);
+    return selector;
+  }
 
+  public static ChannelSelector create(List<Channel> channels,
+      ChannelSelectorConfiguration conf) {
+    String type = "replicating";
+    if (conf != null){
+      type = conf.getType();
+    }
+    ChannelSelector selector = getSelectorForType(type);
+    selector.setChannels(channels);
+    Configurables.configure(selector, conf);
     return selector;
   }
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java Mon Apr 23 15:18:32 2012
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelFactory;
 import org.apache.flume.FlumeException;
+import org.apache.flume.conf.channel.ChannelType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/ConfigurableComponent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/ConfigurableComponent.java?rev=1329280&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/ConfigurableComponent.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/ConfigurableComponent.java Mon Apr 23 15:18:32 2012
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.conf;
+
+public interface ConfigurableComponent {
+
+  public void configure(ComponentConfiguration conf);
+
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/ConfigurableComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java Mon Apr 23 15:18:32 2012
@@ -45,6 +45,14 @@ public class Configurables {
     return false;
   }
 
+  public static boolean configure(Object target, ComponentConfiguration conf) {
+    if (target instanceof ConfigurableComponent) {
+      ((ConfigurableComponent) target).configure(conf);
+      return true;
+    }
+    return false;
+  }
+
   public static void ensureRequiredNonNull(Context context, String... keys) {
     for (String key : keys) {
       if (!context.getParameters().containsKey(key)

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java Mon Apr 23 15:18:32 2012
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.SinkFactory;
+import org.apache.flume.conf.sink.SinkType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java Mon Apr 23 15:18:32 2012
@@ -24,6 +24,8 @@ import org.apache.flume.EventDeliveryExc
 import org.apache.flume.Sink;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.SinkProcessor;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ConfigurableComponent;
 import org.apache.flume.lifecycle.LifecycleState;
 
 import com.google.common.base.Preconditions;
@@ -33,7 +35,8 @@ import com.google.common.base.Preconditi
  * results without any additional handling. Suitable for all sinks that aren't
  * assigned to a group.
  */
-public class DefaultSinkProcessor implements SinkProcessor {
+public class DefaultSinkProcessor implements SinkProcessor,
+ConfigurableComponent {
   private Sink sink;
   private LifecycleState lifecycleState;
 
@@ -74,4 +77,9 @@ public class DefaultSinkProcessor implem
     sink = sinks.get(0);
   }
 
+  @Override
+  public void configure(ComponentConfiguration conf) {
+
+  }
+
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java Mon Apr 23 15:18:32 2012
@@ -21,18 +21,23 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.SinkProcessor;
+import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
 
 /**
  * <p>Configuration concept for handling multiple sinks working together.</p>
  * @see org.apache.flume.conf.properties.PropertiesFileConfigurationProvider
  */
-public class SinkGroup implements Configurable {
-  private static final String PROCESSOR_PREFIX = "processor.";
+public class SinkGroup implements Configurable, ConfigurableComponent {
   List<Sink> sinks;
   SinkProcessor processor;
+  SinkGroupConfiguration conf;
 
   public SinkGroup(List<Sink> groupSinks) {
     sinks = groupSinks;
@@ -40,13 +45,25 @@ public class SinkGroup implements Config
 
   @Override
   public void configure(Context context) {
-    Context processorContext = new Context();
-    Map<String, String> subparams = context.getSubProperties(PROCESSOR_PREFIX);
-    processorContext.putAll(subparams);
-    processor = SinkProcessorFactory.getProcessor(processorContext, sinks);
+    conf = new SinkGroupConfiguration("sinkgrp");
+    try {
+      conf.configure(context);
+    } catch (ConfigurationException e) {
+      throw new FlumeException("Invalid Configuration!", e);
+    }
+    configure(conf);
+
   }
 
   public SinkProcessor getProcessor() {
     return processor;
   }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    this.conf = (SinkGroupConfiguration) conf;
+    processor =
+        SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
+            sinks);
+  }
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java Mon Apr 23 15:18:32 2012
@@ -24,7 +24,9 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.SinkProcessor;
-import org.apache.flume.SinkProcessorType;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.sink.SinkProcessorType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +56,7 @@ public class SinkProcessorFactory {
  List<Sink> sinks) {
     Map<String, String> params = context.getParameters();
     SinkProcessor processor;
-    String typeStr = (String) params.get(TYPE);
+    String typeStr = params.get(TYPE);
     SinkProcessorType type = SinkProcessorType.DEFAULT;
     try {
       type = SinkProcessorType.valueOf(typeStr.toUpperCase());
@@ -78,7 +80,41 @@ public class SinkProcessorFactory {
     }
 
     processor.setSinks(sinks);
-    processor.configure(context);
+    Configurables.configure(processor, context);
     return processor;
   }
+
+  @SuppressWarnings("unchecked")
+  public static SinkProcessor getProcessor(ComponentConfiguration conf,
+      List<Sink> sinks) {
+    String typeStr = conf.getType();
+    SinkProcessor processor;
+    SinkProcessorType type = SinkProcessorType.DEFAULT;
+    try {
+      type = SinkProcessorType.valueOf(typeStr.toUpperCase());
+    } catch (Exception ex) {
+      logger.warn("Sink type {} does not exist, using default", typeStr);
+    }
+
+    Class<? extends SinkProcessor> processorClass = null;
+    try {
+      processorClass =
+          (Class<? extends SinkProcessor>) Class.forName(type
+              .getSinkProcessorClassName());
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to load sink processor type: " + typeStr
+          + ", class: " + type.getSinkProcessorClassName(), ex);
+    }
+    try {
+      processor = processorClass.newInstance();
+    } catch (Exception e) {
+      throw new FlumeException("Unable to create processor, type: " + typeStr
+          + ", class: " + type.getSinkProcessorClassName(), e);
+    }
+
+    processor.setSinks(sinks);
+    Configurables.configure(processor, conf);
+    return processor;
+  }
+
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java Mon Apr 23 15:18:32 2012
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
+import org.apache.flume.conf.source.SourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Modified: incubator/flume/trunk/flume-ng-dist/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-dist/pom.xml?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-dist/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-dist/pom.xml Mon Apr 23 15:18:32 2012
@@ -65,6 +65,10 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
     </dependency>
     <dependency>

Modified: incubator/flume/trunk/flume-ng-dist/src/main/assembly/dist.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-dist/src/main/assembly/dist.xml?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-dist/src/main/assembly/dist.xml (original)
+++ incubator/flume/trunk/flume-ng-dist/src/main/assembly/dist.xml Mon Apr 23 15:18:32 2012
@@ -36,6 +36,7 @@
       <useAllReactorProjects>true</useAllReactorProjects>
 
       <includes>
+        <include>org.apache.flume:flume-ng-configuration</include>
         <include>org.apache.flume:flume-ng-sdk</include>
         <include>org.apache.flume:flume-ng-core</include>
         <include>org.apache.flume:flume-ng-node</include>
@@ -87,6 +88,7 @@
       <directory>../</directory>
 
       <excludes>
+        <exclude>flume-ng-configuration/**</exclude>
         <exclude>flume-ng-sdk/**</exclude>
         <exclude>flume-ng-core/**</exclude>
         <exclude>flume-ng-node/**</exclude>

Modified: incubator/flume/trunk/flume-ng-dist/src/main/assembly/src.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-dist/src/main/assembly/src.xml?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-dist/src/main/assembly/src.xml (original)
+++ incubator/flume/trunk/flume-ng-dist/src/main/assembly/src.xml Mon Apr 23 15:18:32 2012
@@ -36,6 +36,7 @@
       <useAllReactorProjects>true</useAllReactorProjects>
 
       <includes>
+        <include>org.apache.flume:flume-ng-configuration</include>
         <include>org.apache.flume:flume-ng-sdk</include>
         <include>org.apache.flume:flume-ng-core</include>
         <include>org.apache.flume:flume-ng-node</include>
@@ -75,6 +76,7 @@
       <directory>../</directory>
 
       <excludes>
+        <exclude>flume-ng-configuration/**</exclude>
         <exclude>flume-ng-sdk/**</exclude>
         <exclude>flume-ng-core/**</exclude>
         <exclude>flume-ng-node/**</exclude>

Modified: incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Mon Apr 23 15:18:32 2012
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.StringTokenizer;
+import java.util.Set;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
@@ -40,10 +40,15 @@ import org.apache.flume.SourceRunner;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.ChannelSelectorFactory;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
 import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
 import org.apache.flume.conf.file.SimpleNodeConfiguration;
-import org.apache.flume.conf.properties.FlumeConfiguration.AgentConfiguration;
-import org.apache.flume.conf.properties.FlumeConfiguration.ComponentConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.conf.sink.SinkGroupConfiguration;
+import org.apache.flume.conf.source.SourceConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.node.NodeConfiguration;
 import org.apache.flume.sink.DefaultSinkProcessor;
 import org.apache.flume.sink.SinkGroup;
@@ -112,9 +117,9 @@ import org.slf4j.LoggerFactory;
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
  * 60</tt></li>
  * <li>A fourth optional list <tt>&lt;agent name&gt;.sinkgroups</tt>
- * may be added to each agent, consisting of unique space separated names 
+ * may be added to each agent, consisting of unique space separated names
  * for groups</li>
- * <li>Each sinkgroup must specify sinks, containing a list of all sinks 
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks
  * belonging to it. These cannot be shared by multiple groups.
  * Further, one can set a processor and behavioral parameters to determine
  * how sink selection is made via <tt>&lt;agent name&gt;.sinkgroups.&lt;
@@ -238,49 +243,70 @@ public class PropertiesFileConfiguration
 
   private void loadChannels(AgentConfiguration agentConf,
       NodeConfiguration conf) throws InstantiationException {
-
+    LOGGER.info("Creating channels");
     for (ComponentConfiguration comp : agentConf.getChannels()) {
-      Context context = new Context();
-
       Channel channel = getChannelFactory().create(comp.getComponentName(),
-          comp.getConfiguration().get("type"));
+          comp.getType());
 
-      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
-        context.put(entry.getKey(), entry.getValue());
+      Configurables.configure(channel, comp);
+
+      conf.getChannels().put(comp.getComponentName(), channel);
       }
 
+    for (String ch : agentConf.getChannelContext().keySet()) {
+      Context context = agentConf.getChannelContext().get(ch);
+      Channel channel =
+          getChannelFactory().create(ch, context.getString("type"));
       Configurables.configure(channel, context);
-
-      conf.getChannels().put(comp.getComponentName(), channel);
+      conf.getChannels().put(ch, channel);
+      LOGGER.info("created channel " + ch);
     }
+
   }
 
-  private void
-      loadSources(AgentConfiguration agentConf, NodeConfiguration conf)
+  private void loadSources(AgentConfiguration agentConf, NodeConfiguration conf)
       throws InstantiationException {
 
     for (ComponentConfiguration comp : agentConf.getSources()) {
-      Context context = new Context();
-
-      Map<String, String> componentConfig = comp.getConfiguration();
+      SourceConfiguration config = (SourceConfiguration) comp;
 
       Source source = getSourceFactory().create(comp.getComponentName(),
-          componentConfig.get("type"));
+          comp.getType());
 
-      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
-        context.put(entry.getKey(), entry.getValue());
+      Configurables.configure(source, config);
+      Set<String> channelNames = config.getChannels();
+      List<Channel> channels = new ArrayList<Channel>();
+      for (String chName : channelNames) {
+        channels.add(conf.getChannels().get(chName));
       }
 
-      Configurables.configure(source, context);
+      ChannelSelectorConfiguration selectorConfig =
+          config.getSelectorConfiguration();
+
+      ChannelSelector selector = ChannelSelectorFactory.create(
+          channels, selectorConfig);
 
-      String channelNames = comp.getConfiguration().get("channels");
+      ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+
+      source.setChannelProcessor(channelProcessor);
+      conf.getSourceRunners().put(comp.getComponentName(),
+          SourceRunner.forSource(source));
+    }
+    Map<String, Context> sourceContexts = agentConf.getSourceContext();
       List<Channel> channels = new ArrayList<Channel>();
 
-      for (String chName : channelNames.split(" ")) {
+    for (String src : sourceContexts.keySet()) {
+      Context context = sourceContexts.get(src);
+      Source source =
+          getSourceFactory().create(src,
+              context.getString("type"));
+      Configurables.configure(source, context);
+      String[] channelNames = context.getString("channels").split("\\s+");
+      for (String chName : channelNames) {
         channels.add(conf.getChannels().get(chName));
       }
 
-      Map<String, String> selectorConfig = comp.getSubconfiguration("selector");
+      Map<String, String> selectorConfig = context.getSubProperties("selector");
 
       ChannelSelector selector = ChannelSelectorFactory.create(
           channels, selectorConfig);
@@ -288,8 +314,9 @@ public class PropertiesFileConfiguration
       ChannelProcessor channelProcessor = new ChannelProcessor(selector);
 
       source.setChannelProcessor(channelProcessor);
-      conf.getSourceRunners().put(comp.getComponentName(),
+      conf.getSourceRunners().put(src,
           SourceRunner.forSource(source));
+
     }
   }
 
@@ -298,21 +325,25 @@ public class PropertiesFileConfiguration
 
     Map<String, Sink> sinks = new HashMap<String, Sink>();
     for (ComponentConfiguration comp : agentConf.getSinks()) {
-      Context context = new Context();
-      Map<String, String> componentConfig = comp.getConfiguration();
 
+      SinkConfiguration config = (SinkConfiguration) comp;
       Sink sink = getSinkFactory().create(comp.getComponentName(),
-          componentConfig.get("type"));
+          comp.getType());
 
-      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
-        context.put(entry.getKey(), entry.getValue());
+      Configurables.configure(sink, config);
+
+      sink.setChannel(conf.getChannels().get(config.getChannel()));
+      sinks.put(comp.getComponentName(), sink);
       }
 
+    Map<String, Context> sinkContexts = agentConf.getSinkContext();
+    for (String sinkName : sinkContexts.keySet()) {
+      Context context = sinkContexts.get(sinkName);
+      Sink sink = getSinkFactory().create(sinkName, context.getString("type"));
       Configurables.configure(sink, context);
 
-      sink.setChannel(conf.getChannels().get(
-          componentConfig.get("channel")));
-      sinks.put(comp.getComponentName(), sink);
+      sink.setChannel(conf.getChannels().get(context.getString("channel")));
+      sinks.put(sinkName, sink);
     }
 
     loadSinkGroups(agentConf, sinks, conf);
@@ -323,41 +354,35 @@ public class PropertiesFileConfiguration
           throws InstantiationException {
     Map<String, String> usedSinks = new HashMap<String, String>();
     for (ComponentConfiguration comp : agentConf.getSinkGroups()) {
-      Context context = new Context();
       String groupName = comp.getComponentName();
-      Map<String, String> groupConf = comp.getConfiguration();
-      for (Entry<String, String> ent : groupConf.entrySet()) {
-        context.put(ent.getKey(), ent.getValue());
-      }
-      String groupSinkList = groupConf.get("sinks");
-      StringTokenizer sinkTokenizer = new StringTokenizer(groupSinkList, " \t");
+      SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
+      List<String> groupSinkList = groupConf.getSinks();
       List<Sink> groupSinks = new ArrayList<Sink>();
-      while (sinkTokenizer.hasMoreTokens()) {
-        String sinkName = sinkTokenizer.nextToken();
-        Sink s = sinks.remove(sinkName);
+      for (String sink : groupSinkList) {
+        Sink s = sinks.remove(sink);
         if (s == null) {
-          String sinkUser = usedSinks.get(sinkName);
+          String sinkUser = usedSinks.get(sink);
           if (sinkUser != null) {
             throw new InstantiationException(String.format(
                 "Sink %s of group %s already " +
-                "in use by group %s", sinkName, groupName, sinkUser));
+                    "in use by group %s", sink, groupName, sinkUser));
           } else {
             throw new InstantiationException(String.format(
                 "Sink %s of group %s does "
-                    + "not exist or is not properly configured", sinkName,
+                    + "not exist or is not properly configured", sink,
                 groupName));
           }
         }
         groupSinks.add(s);
-        usedSinks.put(sinkName, groupName);
+        usedSinks.put(sink, groupName);
       }
       SinkGroup group = new SinkGroup(groupSinks);
-      Configurables.configure(group, context);
+      Configurables.configure(group, groupConf);
       conf.getSinkRunners().put(comp.getComponentName(),
           new SinkRunner(group.getProcessor()));
     }
     // add any unasigned sinks to solo collectors
-    for (Entry<String, Sink> entry : sinks.entrySet()) {
+    for(Entry<String, Sink> entry : sinks.entrySet()) {
       if (!usedSinks.containsValue(entry.getKey())) {
         SinkProcessor pr = new DefaultSinkProcessor();
         List<Sink> sinkMap = new ArrayList<Sink>();
@@ -369,5 +394,4 @@ public class PropertiesFileConfiguration
       }
     }
   }
-
 }

Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1329280&r1=1329279&r2=1329280&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Mon Apr 23 15:18:32 2012
@@ -48,6 +48,7 @@ limitations under the License.
 
   <modules>
     <module>flume-ng-core</module>
+    <module>flume-ng-configuration</module>
     <module>flume-ng-sinks</module>
     <module>flume-ng-node</module>
     <module>flume-ng-dist</module>
@@ -628,6 +629,11 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-configuration</artifactId>
+        <version>1.2.0-incubating-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
         <artifactId>flume-ng-core</artifactId>
         <version>1.2.0-incubating-SNAPSHOT</version>
       </dependency>



Mime
View raw message