flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1172753 - in /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file: JsonFileConfigurationProvider.java JsonFlumeConfiguration.java
Date Mon, 19 Sep 2011 19:12:58 GMT
Author: esammer
Date: Mon Sep 19 19:12:57 2011
New Revision: 1172753

URL: http://svn.apache.org/viewvc?rev=1172753&view=rev
Log:
- Keep track of source / sink / channel names in JsonFlumeConfiguration so we can wire up
{source,sink} -> channel dependencies.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java?rev=1172753&r1=1172752&r2=1172753&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java Mon Sep 19 19:12:57 2011
@@ -103,17 +103,26 @@ public class JsonFileConfigurationProvid
 
     logger.debug("Loading sources");
 
-    for (Map<String, Object> source : defs) {
-      logger.debug("source:{}", source);
+    for (Map<String, Object> sourceDef : defs) {
+      logger.debug("source:{}", sourceDef);
 
-      if (source.containsKey("type")) {
-        Source s = sourceFactory.create((String) source.get("type"));
+      if (sourceDef.containsKey("type")) {
+        Source source = sourceFactory.create((String) sourceDef.get("type"));
+        Channel channel = conf.getChannels().get(sourceDef.get("channel"));
+
+        if (channel != null) {
+          source.setChannel(channel);
+        } else {
+          logger.warn(
+              "No channel named {} - source:{} is likely non-functional.",
+              source, sourceDef.get("channel"));
+        }
 
-        // s.setChannel(knownChannels.get(source.get("channel")));
-        conf.getSourceRunners().add(SourceRunner.forSource(s));
+        conf.getSourceRunners().put((String) sourceDef.get("name"),
+            SourceRunner.forSource(source));
       } else {
         throw new IllegalArgumentException("Illegal source definition:"
-            + source + " - Missing type.");
+            + sourceDef + " - Missing type.");
       }
     }
   }
@@ -123,13 +132,23 @@ public class JsonFileConfigurationProvid
 
     logger.debug("Loading sinks");
 
-    for (Map<String, Object> sink : defs) {
-      logger.debug("sink:{}", sink);
+    for (Map<String, Object> sinkDef : defs) {
+      logger.debug("sink:{}", sinkDef);
 
-      if (sink.containsKey("type")) {
-        Sink s = sinkFactory.create((String) sink.get("type"));
-        // s.setChannel(knownChannels.get(source.get("channel")));
-        conf.getSinkRunners().add(SinkRunner.forSink(s));
+      if (sinkDef.containsKey("type")) {
+        Sink sink = sinkFactory.create((String) sinkDef.get("type"));
+        Channel channel = conf.getChannels().get(sinkDef.get("channel"));
+
+        if (channel != null) {
+          sink.setChannel(channel);
+        } else {
+          logger.warn(
+              "No channel named {} - sink:{} is likely non-functional.", sink,
+              sinkDef.get("channel"));
+        }
+
+        conf.getSinkRunners().put((String) sinkDef.get("name"),
+            SinkRunner.forSink(sink));
       }
     }
   }
@@ -146,7 +165,7 @@ public class JsonFileConfigurationProvid
         Channel channel = channelFactory
             .create((String) channelDef.get("type"));
 
-        conf.getChannels().add(channel);
+        conf.getChannels().put((String) channelDef.get("name"), channel);
       }
     }
 
@@ -166,9 +185,13 @@ public class JsonFileConfigurationProvid
 
         logger.debug("host:{}", hostDef);
 
+        /*
+         * NB: Because load{Sources,Sinks} wire up dependencies (i.e. channels),
+         * loadChannels must always be executed first.
+         */
+        loadChannels(flumeConf, hostDef.getValue().get("channels"));
         loadSources(flumeConf, hostDef.getValue().get("sources"));
         loadSinks(flumeConf, hostDef.getValue().get("sinks"));
-        loadChannels(flumeConf, hostDef.getValue().get("channels"));
       }
 
       logger.debug("Loaded conf:{}", flumeConf);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java?rev=1172753&r1=1172752&r2=1172753&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFlumeConfiguration.java Mon Sep 19 19:12:57 2011
@@ -1,7 +1,7 @@
 package org.apache.flume.conf.file;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
@@ -9,14 +9,14 @@ import org.apache.flume.SourceRunner;
 
 public class JsonFlumeConfiguration {
 
-  private Set<Channel> channels;
-  private Set<SourceRunner> sourceRunners;
-  private Set<SinkRunner> sinkRunners;
+  private Map<String, Channel> channels;
+  private Map<String, SourceRunner> sourceRunners;
+  private Map<String, SinkRunner> sinkRunners;
 
   public JsonFlumeConfiguration() {
-    channels = new HashSet<Channel>();
-    sourceRunners = new HashSet<SourceRunner>();
-    sinkRunners = new HashSet<SinkRunner>();
+    channels = new HashMap<String, Channel>();
+    sourceRunners = new HashMap<String, SourceRunner>();
+    sinkRunners = new HashMap<String, SinkRunner>();
   }
 
   @Override
@@ -25,27 +25,27 @@ public class JsonFlumeConfiguration {
         + " channels:" + channels + " }";
   }
 
-  public Set<Channel> getChannels() {
+  public Map<String, Channel> getChannels() {
     return channels;
   }
 
-  public void setChannels(Set<Channel> channels) {
+  public void setChannels(Map<String, Channel> channels) {
     this.channels = channels;
   }
 
-  public Set<SourceRunner> getSourceRunners() {
+  public Map<String, SourceRunner> getSourceRunners() {
     return sourceRunners;
   }
 
-  public void setSourceRunners(Set<SourceRunner> sourceRunners) {
+  public void setSourceRunners(Map<String, SourceRunner> sourceRunners) {
     this.sourceRunners = sourceRunners;
   }
 
-  public Set<SinkRunner> getSinkRunners() {
+  public Map<String, SinkRunner> getSinkRunners() {
     return sinkRunners;
   }
 
-  public void setSinkRunners(Set<SinkRunner> sinkRunners) {
+  public void setSinkRunners(Map<String, SinkRunner> sinkRunners) {
     this.sinkRunners = sinkRunners;
   }
 



Mime
View raw message