flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1291612 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-node/src/main/java/org/apache/flume/conf/properties/ flume-ng-node/src/test/java/org/apache/flume/node/ flume-ng-node/src/test/resources/
Date Tue, 21 Feb 2012 06:30:41 GMT
Author: arvind
Date: Tue Feb 21 06:30:41 2012
New Revision: 1291612

URL: http://svn.apache.org/viewvc?rev=1291612&view=rev
Log:
FLUME-865. Implement failover sink.

(Juhani Connolly via Arvind Prabhakar)

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.properties

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1291612&r1=1291611&r2=1291612&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
Tue Feb 21 06:30:41 2012
@@ -38,35 +38,35 @@ public class SinkRunner implements Lifec
   private Thread runnerThread;
   private LifecycleState lifecycleState;
 
-  private Sink sink;
+  private SinkProcessor policy;
 
   public SinkRunner() {
     counterGroup = new CounterGroup();
     lifecycleState = LifecycleState.IDLE;
   }
-  
-  public SinkRunner(Sink sink) {
+
+  public SinkRunner(SinkProcessor policy) {
     this();
-    setSink(sink);
+    setSink(policy);
   }
 
-  public Sink getSink() {
-    return sink;
+  public SinkProcessor getPolicy() {
+    return policy;
   }
 
-  public void setSink(Sink sink) {
-    this.sink = sink;
+  public void setSink(SinkProcessor policy) {
+    this.policy = policy;
   }
 
   @Override
   public void start() {
-    Sink sink = getSink();
+    SinkProcessor policy = getPolicy();
 
-    sink.start();
+    policy.start();
 
     runner = new PollingRunner();
 
-    runner.sink = sink;
+    runner.policy = policy;
     runner.counterGroup = counterGroup;
     runner.shouldStop = new AtomicBoolean();
 
@@ -79,7 +79,7 @@ public class SinkRunner implements Lifec
   @Override
   public void stop() {
 
-    getSink().stop();
+    getPolicy().stop();
 
     if (runnerThread != null) {
       runner.shouldStop.set(true);
@@ -91,9 +91,9 @@ public class SinkRunner implements Lifec
           runnerThread.join(500);
         } catch (InterruptedException e) {
           logger
-              .debug(
-                  "Interrupted while waiting for runner thread to exit. Exception follows.",
-                  e);
+          .debug(
+              "Interrupted while waiting for runner thread to exit. Exception follows.",
+              e);
         }
       }
     }
@@ -103,7 +103,7 @@ public class SinkRunner implements Lifec
 
   @Override
   public String toString() {
-    return "SinkRunner: { sink:" + getSink() + " counterGroup:"
+    return "SinkRunner: { policy:" + getPolicy() + " counterGroup:"
         + counterGroup + " }";
   }
 
@@ -114,7 +114,7 @@ public class SinkRunner implements Lifec
 
   public static class PollingRunner implements Runnable {
 
-    private Sink sink;
+    private SinkProcessor policy;
     private AtomicBoolean shouldStop;
     private CounterGroup counterGroup;
 
@@ -124,12 +124,12 @@ public class SinkRunner implements Lifec
 
       while (!shouldStop.get()) {
         try {
-          if (sink.process().equals(Sink.Status.BACKOFF)) {
+          if (policy.process().equals(Sink.Status.BACKOFF)) {
             counterGroup.incrementAndGet("runner.backoffs");
 
             Thread.sleep(Math.min(
                 counterGroup.incrementAndGet("runner.backoffs.consecutive")
-                    * backoffSleepIncrement, maxBackoffSleep));
+                * backoffSleepIncrement, maxBackoffSleep));
           } else {
             counterGroup.set("runner.backoffs.consecutive", 0L);
           }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java?rev=1291612&r1=1291611&r2=1291612&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
Tue Feb 21 06:30:41 2012
@@ -30,6 +30,8 @@ import java.util.StringTokenizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * <p>
  * FlumeConfiguration is an in memory representation of the hierarchical
@@ -56,16 +58,20 @@ public class FlumeConfiguration {
   private static final String SOURCES_PREFIX = SOURCES + ".";
   private static final String SINKS = "sinks";
   private static final String SINKS_PREFIX = SINKS + ".";
+  private static final String SINKGROUPS = "sinkgroups";
+  private static final String SINKGROUPS_PREFIX = SINKGROUPS + ".";
   private static final String CHANNELS = "channels";
   private static final String CHANNELS_PREFIX = CHANNELS + ".";
   private static final String RUNNER = "runner";
   private static final String RUNNER_PREFIX = RUNNER + ".";
   private static final String ATTR_TYPE = "type";
+  private static final String ATTR_SINKS = "sinks";
   private static final String ATTR_CHANNEL = "channel";
   private static final String ATTR_CHANNELS = "channels";
   private static final String CLASS_CHANNEL = "channel";
   private static final String CLASS_SOURCE = "source";
   private static final String CLASS_SINK = "sink";
+  private static final String CLASS_SINKGROUP = "sinkgroup";
 
   private final Map<String, AgentConfiguration> agentConfigMap;
 
@@ -168,10 +174,12 @@ public class FlumeConfiguration {
     private String sources;
     private String sinks;
     private String channels;
+    private String sinkgroups;
 
     private final Map<String, ComponentConfiguration> sourceConfigMap;
     private final Map<String, ComponentConfiguration> sinkConfigMap;
     private final Map<String, ComponentConfiguration> channelConfigMap;
+    private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
 
     private AgentConfiguration(String agentName) {
       this.agentName = agentName;
@@ -179,6 +187,7 @@ public class FlumeConfiguration {
       sourceConfigMap = new HashMap<String, ComponentConfiguration>();
       sinkConfigMap = new HashMap<String, ComponentConfiguration>();
       channelConfigMap = new HashMap<String, ComponentConfiguration>();
+      sinkgroupConfigMap = new HashMap<String, ComponentConfiguration>();
     }
 
     public Collection<ComponentConfiguration> getChannels() {
@@ -193,6 +202,10 @@ public class FlumeConfiguration {
       return sinkConfigMap.values();
     }
 
+    public Collection<ComponentConfiguration> getSinkGroups() {
+      return sinkgroupConfigMap.values();
+    }
+
     /**
      * <p>
      * Checks the validity of the agent configuration. This method assumes that
@@ -219,13 +232,7 @@ public class FlumeConfiguration {
         return false;
       }
 
-      Set<String> channelSet = new HashSet<String>();
-      StringTokenizer channelTok = new StringTokenizer(channels, " \t");
-
-      while (channelTok.hasMoreTokens()) {
-        channelSet.add(channelTok.nextToken());
-      }
-
+      Set<String> channelSet = stringToSet(channels, " \t");
       validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL, ATTR_TYPE);
 
       if (channelSet.size() == 0) {
@@ -235,17 +242,32 @@ public class FlumeConfiguration {
         return false;
       }
 
-      // At least one channel is configured correctly
-      // Validate sources
-      Set<String> sourceSet = new HashSet<String>();
+      Set<String> sourceSet = validateSources(channelSet);
+      Set<String> sinkSet = validateSinks(channelSet);
+      Set<String> sinkgroupSet = validateGroups(sinkSet);
 
-      if (sources != null && sources.trim().length() > 0) {
-        StringTokenizer sourceTok = new StringTokenizer(sources, " \t");
+      // If no sources or sinks are present, then this is invalid
+      if (sourceSet.size() == 0 && sinkSet.size() == 0) {
+        logger.warn("Agent configuration for '" + agentName
+            + "' has no sources or sinks. Will be marked invalid.");
+        return false;
+      }
 
-        while (sourceTok.hasMoreTokens()) {
-          sourceSet.add(sourceTok.nextToken());
-        }
+      // Now rewrite the sources/sinks/channels
 
+      this.sources = getSpaceDelimitedList(sourceSet);
+      this.channels = getSpaceDelimitedList(channelSet);
+      this.sinks = getSpaceDelimitedList(sinkSet);
+      this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);
+
+      return true;
+    }
+
+    private Set<String> validateSources(Set<String> channelSet) {
+      Preconditions.checkArgument(channelSet != null && channelSet.size() > 0);
+      Set<String> sourceSet = stringToSet(sources, " \t");
+
+      if (sourceSet != null && sourceSet.size() > 0) {
         // Filter out any sources that have invalid channels
         Iterator<String> srcIt = sourceConfigMap.keySet().iterator();
 
@@ -295,16 +317,14 @@ public class FlumeConfiguration {
 
       validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE,
           ATTR_CHANNELS);
+      return sourceSet;
+    }
 
-      Set<String> sinkSet = new HashSet<String>();
-
-      if (sinks != null && sinks.trim().length() > 0) {
-        StringTokenizer sinkTok = new StringTokenizer(sinks, " \t");
-
-        while (sinkTok.hasMoreTokens()) {
-          sinkSet.add(sinkTok.nextToken());
-        }
+    private Set<String> validateSinks(Set<String> channelSet) {
+      Preconditions.checkArgument(channelSet != null && channelSet.size() > 0);
+      Set<String> sinkSet = stringToSet(sinks, " \t");
 
+      if (sinkSet != null && sinkSet.size() > 0) {
         // Filter out any sinks that have invalid channel
         Iterator<String> sinkIt = sinkConfigMap.keySet().iterator();
 
@@ -335,21 +355,89 @@ public class FlumeConfiguration {
 
       validateComponent(sinkSet, sinkConfigMap, CLASS_SINK, ATTR_TYPE,
           ATTR_CHANNEL);
+      return sinkSet;
+    }
 
-      // If no sources or sinks are present, then this is invalid
-      if (sourceSet.size() == 0 && sinkSet.size() == 0) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' has no sources or sinks. Will be marked invalid.");
-        return false;
-      }
-
-      // Now rewrite the sources/sinks/channels
+    /**
+     * Validates that each group has at least one sink, blocking other groups
+     * from acquiring it
+     * 
+     * @param sinkSet
+     *          Set of valid sinks
+     * @return Set of valid sinkgroups
+     */
+    private Set<String> validateGroups(Set<String> sinkSet) {
+      Set<String> sinkgroupSet = stringToSet(sinkgroups, " \t");
+      if(sinkgroupSet != null && sinkgroupSet.size() > 0) {
+        Iterator<String> groupIt = sinkgroupConfigMap.keySet().iterator();
+        Map<String, String> usedSinks = new HashMap<String, String>();
+        while (groupIt.hasNext()) {
+          String nextGroup = groupIt.next();
+          ComponentConfiguration groupConf = sinkgroupConfigMap.get(nextGroup);
+
+          if ( ! groupConf.hasAttribute(ATTR_SINKS)) {
+            logger.warn("Agent configuration for '" + agentName
+                + "' sinkGroup '" + groupConf.getComponentName()
+                + "' has no configured sinks. Removing.");
+            groupIt.remove();
+            continue;
+          }
+          Set<String> groupSinks = validGroupSinks(sinkSet, usedSinks,
+              groupConf);
+          if (groupSinks == null || groupSinks.isEmpty()) {
+            logger.warn("Agent configuration for '" + agentName
+                + "' sinkGroup '" + groupConf.getComponentName()
+                + "' has no valid sinks. Removing.");
+            groupIt.remove();
+          } else {
+            groupConf.setAttribute(ATTR_SINKS,
+                this.getSpaceDelimitedList(groupSinks));
+          }
 
-      this.sources = getSpaceDelimitedList(sourceSet);
-      this.channels = getSpaceDelimitedList(channelSet);
-      this.sinks = getSpaceDelimitedList(sinkSet);
+        }
+      }
+      validateComponent(sinkgroupSet, sinkgroupConfigMap, CLASS_SINKGROUP,
+          ATTR_SINKS);
+      return sinkgroupSet;
+    }
 
-      return true;
+    /**
+     * Check availability of sinks for group
+     * 
+     * @param sinkSet
+     *          [in]Existing valid sinks
+     * @param usedSinks
+     *          [in/out]Sinks already in use by other groups
+     * @param groupConf
+     *          [in]sinkgroup configuration
+     * @return Set of sinks available and reserved for group
+     */
+    private Set<String> validGroupSinks(Set<String> sinkSet,
+        Map<String, String> usedSinks, ComponentConfiguration groupConf) {
+      Set<String> groupSinks = stringToSet(groupConf.getAttribute(ATTR_SINKS),
+          " \t");
+      if(groupSinks == null) return null;
+      Iterator<String> sinkIt = groupSinks.iterator();
+      while (sinkIt.hasNext()) {
+        String curSink = sinkIt.next();
+        if(usedSinks.containsKey(curSink)) {
+          logger.warn("Agent configuration for '" + agentName + "' sinkgroup '"
+              + groupConf.getComponentName() + "' sink '" + curSink
+              + "' in use by " + "another group: '" + usedSinks.get(curSink)
+              + "', sink not added");
+          sinkIt.remove();
+          continue;
+        } else if (!sinkSet.contains(curSink)) {
+          logger.warn("Agent configuration for '" + agentName + "' sinkgroup '"
+              + groupConf.getComponentName() + "' sink not found: '" + curSink
+              + "',  sink not added");
+          sinkIt.remove();
+          continue;
+        } else {
+          usedSinks.put(curSink, groupConf.getComponentName());
+        }
+      }
+      return groupSinks;
     }
 
     private String getSpaceDelimitedList(Set<String> entries) {
@@ -366,6 +454,18 @@ public class FlumeConfiguration {
       return sb.toString().trim();
     }
 
+    private Set<String> stringToSet(String target, String delim) {
+      Set<String> out = new HashSet<String>();
+      if(target == null || target.trim().length() == 0) {
+        return out;
+      }
+      StringTokenizer t = new StringTokenizer(target, delim);
+      while(t.hasMoreTokens()) {
+        out.add(t.nextToken());
+      }
+      return out;
+    }
+
     /**
      * <p>
      * Utility method to iterate over the component configuration to validate
@@ -427,7 +527,7 @@ public class FlumeConfiguration {
         }
       }
 
-      // Remove the active channels that are not configured
+      // Remove the active components that are not configured
       Iterator<String> activeIt = activeSet.iterator();
 
       while (activeIt.hasNext()) {
@@ -462,7 +562,7 @@ public class FlumeConfiguration {
           return true;
         } else {
           logger
-              .warn("Duplicate source list specified for agent: " + agentName);
+          .warn("Duplicate source list specified for agent: " + agentName);
           return false;
         }
       }
@@ -492,6 +592,19 @@ public class FlumeConfiguration {
         }
       }
 
+      // Check for sinkgroups
+      if (key.equals(SINKGROUPS))  {
+        if (sinkgroups == null) {
+          sinkgroups = value;
+
+          return true;
+        } else {
+          logger.warn("Duplicate channel list specfied for agent: "
+              + agentName);
+          return false;
+        }
+      }
+
       ComponentNameAndConfigKey cnck = parseConfigKey(key, SOURCES_PREFIX);
 
       if (cnck != null) {
@@ -537,6 +650,19 @@ public class FlumeConfiguration {
         return sinkConf.addProperty(cnck.getConfigKey(), value);
       }
 
+      cnck = parseConfigKey(key, SINKGROUPS_PREFIX);
+
+      if (cnck != null) {
+        String name = cnck.getComponentName();
+        ComponentConfiguration groupConf = sinkgroupConfigMap.get(name);
+        if(groupConf == null) {
+          groupConf = new ComponentConfiguration(name, true);
+          sinkgroupConfigMap.put(name, groupConf);
+        }
+
+        return groupConf.addProperty(cnck.getConfigKey(), value);
+      }
+
       logger.warn("Invalid property specified: " + key);
       return false;
     }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1291612&r1=1291611&r2=1291612&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
Tue Feb 21 06:30:41 2012
@@ -22,15 +22,18 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.StringTokenizer;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.Source;
 import org.apache.flume.SourceRunner;
@@ -42,6 +45,8 @@ import org.apache.flume.conf.file.Simple
 import org.apache.flume.conf.properties.FlumeConfiguration.AgentConfiguration;
 import org.apache.flume.conf.properties.FlumeConfiguration.ComponentConfiguration;
 import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.SinkGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,6 +111,17 @@ import org.slf4j.LoggerFactory;
  * runner as needed. For example:
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
  * 60</tt></li>
+ * <li>A fourth optional list <tt>&lt;agent name&gt;.sinkgroups</tt>
+ * may be added to each agent, consisting of unique space separated names 
+ * for groups</li>
+ * <li>Each sinkgroup must specify sinks, containing a list of all sinks 
+ * belonging to it. These cannot be shared by multiple groups.
+ * Further, one can set a processor and behavioral parameters to determine
+ * how sink selection is made via <tt>&lt;agent name&gt;.sinkgroups.&lt;
+ * group name&gt;.processor</tt>. For further detail refer to individual processor
+ * documentation</li>
+ * <li>Sinks not assigned to a group will be assigned to default single sink
+ * groups.</li>
  * </ul>
  *
  * Apart from the above required configuration values, each source, sink or
@@ -173,7 +189,7 @@ import org.slf4j.LoggerFactory;
  * @see java.util.Properties#load(java.io.Reader)
  */
 public class PropertiesFileConfigurationProvider extends
-    AbstractFileConfigurationProvider {
+AbstractFileConfigurationProvider {
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(PropertiesFileConfigurationProvider.class);
@@ -278,6 +294,7 @@ public class PropertiesFileConfiguration
   private void loadSinks(AgentConfiguration agentConf, NodeConfiguration conf)
       throws InstantiationException {
 
+    Map<String, Sink> sinks = new HashMap<String, Sink>();
     for (ComponentConfiguration comp : agentConf.getSinks()) {
       Context context = new Context();
       Map<String, String> componentConfig = comp.getConfiguration();
@@ -294,8 +311,61 @@ public class PropertiesFileConfiguration
 
       sink.setChannel(conf.getChannels().get(
           componentConfig.get("channel")));
+      sinks.put(comp.getComponentName(), sink);
+    }
+
+    loadSinkGroups(agentConf, sinks, conf);
+  }
+
+  private void loadSinkGroups(AgentConfiguration agentConf,
+      Map<String, Sink> sinks, NodeConfiguration conf)
+          throws InstantiationException {
+    Map<String, String> usedSinks = new HashMap<String, String>();
+    for (ComponentConfiguration comp : agentConf.getSinkGroups()) {
+      Context context = new Context();
+      String groupName = comp.getComponentName();
+      Map<String, String> groupConf = comp.getConfiguration();
+      for (Entry<String, String> ent : groupConf.entrySet()) {
+        context.put(ent.getKey(), ent.getValue());
+      }
+      String groupSinkList = groupConf.get("sinks");
+      StringTokenizer sinkTokenizer = new StringTokenizer(groupSinkList, " \t");
+      List<Sink> groupSinks = new ArrayList<Sink>();
+      while(sinkTokenizer.hasMoreTokens()) {
+        String sinkName = sinkTokenizer.nextToken();
+        Sink s = sinks.remove(sinkName);
+        if(s == null) {
+          String sinkUser = usedSinks.get(sinkName);
+          if(sinkUser != null) {
+            throw new InstantiationException(String.format(
+                "Sink %s of group %s already " +
+                "in use by group %s", sinkName, groupName, sinkUser));
+          } else {
+            throw new InstantiationException(String.format(
+                "Sink %s of group %s does "
+                    + "not exist or is not properly configured", sinkName,
+                groupName));
+          }
+        }
+        groupSinks.add(s);
+        usedSinks.put(sinkName, groupName);
+      }
+      SinkGroup group = new SinkGroup(groupSinks);
+      Configurables.configure(group, context);
       conf.getSinkRunners().put(comp.getComponentName(),
-          new SinkRunner(sink));
+          new SinkRunner(group.getProcessor()));
+    }
+    // add any unassigned sinks to solo collectors
+    for(Entry<String, Sink> entry : sinks.entrySet()) {
+      if (!usedSinks.containsValue(entry.getKey())) {
+        SinkProcessor pr = new DefaultSinkProcessor();
+        List<Sink> sinkMap = new ArrayList<Sink>();
+        sinkMap.add(entry.getValue());
+        pr.setSinks(sinkMap);
+        Configurables.configure(pr, new Context());
+        conf.getSinkRunners().put(entry.getKey(),
+            new SinkRunner(pr));
+      }
     }
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1291612&r1=1291611&r2=1291612&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
Tue Feb 21 06:30:41 2012
@@ -26,6 +26,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.Source;
 import org.apache.flume.SourceRunner;
@@ -38,6 +39,7 @@ import org.apache.flume.lifecycle.Lifecy
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
+import org.apache.flume.sink.DefaultSinkProcessor;
 import org.apache.flume.sink.NullSink;
 import org.apache.flume.source.SequenceGeneratorSource;
 import org.junit.Assert;
@@ -126,7 +128,7 @@ public class TestAbstractLogicalNodeMana
 
   @Test
   public void testEmptyLifecycle() throws LifecycleException,
-      InterruptedException {
+  InterruptedException {
 
     nodeManager.start();
     boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -162,7 +164,11 @@ public class TestAbstractLogicalNodeMana
     nullSink.setChannel(channel);
 
     nodeManager.add(SourceRunner.forSource(generatorSource));
-    nodeManager.add(new SinkRunner(nullSink));
+    SinkProcessor processor = new DefaultSinkProcessor();
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(nullSink);
+    processor.setSinks(sinks);
+    nodeManager.add(new SinkRunner(processor));
 
     nodeManager.start();
     boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -181,7 +187,7 @@ public class TestAbstractLogicalNodeMana
 
   @Test
   public void testRapidLifecycleFlapping() throws LifecycleException,
-      InterruptedException {
+  InterruptedException {
 
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
@@ -198,7 +204,11 @@ public class TestAbstractLogicalNodeMana
     sink.setChannel(channel);
 
     nodeManager.add(SourceRunner.forSource(source));
-    nodeManager.add(new SinkRunner(sink));
+    SinkProcessor processor = new DefaultSinkProcessor();
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(sink);
+    processor.setSinks(sinks);
+    nodeManager.add(new SinkRunner(processor));
 
     for (int i = 0; i < 10; i++) {
       nodeManager.start();
@@ -207,7 +217,7 @@ public class TestAbstractLogicalNodeMana
 
       Assert.assertTrue(reached);
       Assert
-          .assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
+      .assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
 
       nodeManager.stop();
       reached = LifecycleController.waitForOneOf(nodeManager,

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.properties?rev=1291612&r1=1291611&r2=1291612&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.properties
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.properties
Tue Feb 21 06:30:41 2012
@@ -99,3 +99,29 @@ host4.channels.ch2.foo = bar
 host4.channels.ch2.type = abc
 host4.channels.ch3.type = foo
 host4.channels.ch3.xxx = yyy
+
+#
+# Agent configuration for host5 - valid using a sinkgroup with a failover processor
+# One of the sinks isn't properly configured but the group should let it fail and drop down
+# to two sinks
+#
+
+host5.sources = src1
+host5.channels = ch1
+host5.sinks = sink1 sink2 sink3
+host5.sinkgroups = sg1
+
+host5.channels.ch1.type = abc
+
+host5.sources.src1.type = def
+host5.sources.src1.channels = ch1
+
+host5.sinks.sink1.type = foo
+host5.sinks.sink1.channel = ch1
+host5.sinks.sink2.type = bar
+host5.sinks.sink2.channel = ch1
+
+host5.sinkgroups.sg1.sinks = sink1 sink2 sink3
+host5.sinkgroups.sg1.policy.type = failover
+host5.sinkgroups.sg1.policy.priority.sink1 = 1
+host5.sinkgroups.sg1.policy.priority.sink2 = 2



Mime
View raw message