Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 741ADD6E6 for ; Fri, 25 Jan 2013 00:27:06 +0000 (UTC) Received: (qmail 1034 invoked by uid 500); 25 Jan 2013 00:27:06 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 999 invoked by uid 500); 25 Jan 2013 00:27:06 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 989 invoked by uid 99); 25 Jan 2013 00:27:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jan 2013 00:27:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 10D8E82506E; Fri, 25 Jan 2013 00:27:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1852. Issues with EmbeddedAgentConfiguration. Message-Id: <20130125002706.10D8E82506E@tyr.zones.apache.org> Date: Fri, 25 Jan 2013 00:27:06 +0000 (UTC) Updated Branches: refs/heads/trunk d45af178e -> 3df65e12c FLUME-1852. Issues with EmbeddedAgentConfiguration. (Brock Noland via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3df65e12 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3df65e12 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3df65e12 Branch: refs/heads/trunk Commit: 3df65e12c8d480cd46f190a0bb4addfee4272062 Parents: d45af17 Author: Mike Percy Authored: Thu Jan 24 16:26:03 2013 -0800 Committer: Mike Percy Committed: Thu Jan 24 16:26:33 2013 -0800 ---------------------------------------------------------------------- .../agent/embedded/EmbeddedAgentConfiguration.java | 73 ++++++++------- .../apache/flume/agent/embedded/package-info.java | 2 +- .../embedded/TestEmbeddedAgentConfiguration.java | 42 ++++++++- .../embedded/TestEmbeddedAgentEmbeddedSource.java | 1 - 4 files changed, 79 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java index e52f912..6204bc5 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java @@ -33,6 +33,7 @@ import org.apache.flume.conf.sink.SinkProcessorType; import org.apache.flume.conf.sink.SinkType; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; /** @@ -65,7 +66,7 @@ public class EmbeddedAgentConfiguration { public static final String SINKS_PREFIX = join(SINKS, ""); /** - * Source type, choices are `embedded' or `avro' + * Source type, choices are `embedded' */ public static final String SOURCE_TYPE = join(SOURCE, TYPE); /** @@ -81,7 +82,7 @@ public class EmbeddedAgentConfiguration { */ public static final String CHANNEL_PREFIX = join(CHANNEL, ""); /** - * Sink processor type, choices are `default' (failover) or `load_balance' + * Sink processor type, choices are `default', `failover' or `load_balance' */ public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE); /** @@ -90,10 +91,11 @@ public class EmbeddedAgentConfiguration { public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, ""); /** * Embedded source which provides simple in-memory transfer to channel. - * Use this source via the put,pulAll methods on the EmbeddedAgent. This - * is the recommended source to use for Embedded Agents. + * Use this source via the put,putAll methods on the EmbeddedAgent. This + * is the only supported source to use for Embedded Agents. */ public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName(); + private static final String SOURCE_TYPE_EMBEDDED_ALIAS = "EMBEDDED"; /** * Memory channel which stores events in heap. See Flume User Guide for * configuration information. This is the recommended channel to use for @@ -101,8 +103,8 @@ public class EmbeddedAgentConfiguration { */ public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name(); /** - * File based channel which stores events in heap. See Flume User Guide for - * configuration information. + * File based channel which stores events in on local disk. See Flume User + * Guide for configuration information. */ public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name(); @@ -129,6 +131,7 @@ public class EmbeddedAgentConfiguration { private static final String[] ALLOWED_SOURCES = { + SOURCE_TYPE_EMBEDDED_ALIAS, SOURCE_TYPE_EMBEDDED, }; @@ -147,6 +150,9 @@ public class EmbeddedAgentConfiguration { SINK_PROCESSOR_TYPE_LOAD_BALANCE }; + private static final ImmutableList DISALLOWED_SINK_NAMES = + ImmutableList.of("source", "channel", "processor"); + private static void validate(String name, Map properties) throws FlumeException { @@ -158,6 +164,10 @@ public class EmbeddedAgentConfiguration { checkRequired(properties, SINKS); String sinkNames = properties.get(SINKS); for(String sink : sinkNames.split("\\s+")) { + if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase())) { + throw new FlumeException("Sink name " + sink + " is one of the" + + " disallowed sink names: " + DISALLOWED_SINK_NAMES); + } String key = join(sink, TYPE); checkRequired(properties, key); checkAllowed(ALLOWED_SINKS, properties.get(key)); @@ -182,7 +192,8 @@ public class EmbeddedAgentConfiguration { // we are going to modify the properties as we parse the config properties = new HashMap(properties); - if(!properties.containsKey(SOURCE_TYPE)) { + if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS. + equalsIgnoreCase(properties.get(SOURCE_TYPE))) { properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED); } String sinkNames = properties.remove(SINKS); @@ -199,7 +210,6 @@ public class EmbeddedAgentConfiguration { // user supplied config -> agent configuration Map result = Maps.newHashMap(); - Joiner joiner = Joiner.on(SEPERATOR); // properties will be modified during iteration so we need a // copy of the keys Set userProvidedKeys; @@ -209,42 +219,40 @@ public class EmbeddedAgentConfiguration { * source at the channel. */ // point agent at source - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_SOURCES), sourceName); + result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES), + sourceName); // point agent at channel - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_CHANNELS), channelName); - // point agent at source - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_SINKS), sinkNames); + result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS), + channelName); + // point agent at sinks + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS), + sinkNames); // points the agent at the sinkgroup - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS), + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS), sinkGroupName); // points the sinkgroup at the sinks - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS, + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS, sinkGroupName, SINKS), sinkNames); // points the source at the channel - result.put(joiner.join(name, + result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES, sourceName, BasicConfigurationConstants.CONFIG_CHANNELS), channelName); /* - * Second process the the sink configuration and point the sinks + * Second process the sink configuration and point the sinks * at the channel. */ userProvidedKeys = new HashSet(properties.keySet()); for(String sink : sinkNames.split("\\s+")) { for(String key : userProvidedKeys) { String value = properties.get(key); - if(key.startsWith(sink)) { + if(key.startsWith(sink + SEPERATOR)) { properties.remove(key); - result.put(joiner.join(name, + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS, key), value); } } // point the sink at the channel - result.put(joiner.join(name, + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS, sink, BasicConfigurationConstants.CONFIG_CHANNEL), channelName); } @@ -255,20 +263,19 @@ public class EmbeddedAgentConfiguration { userProvidedKeys = new HashSet(properties.keySet()); for(String key : userProvidedKeys) { String value = properties.get(key); - if(key.startsWith(SOURCE)) { + if(key.startsWith(SOURCE_PREFIX)) { // users use `source' but agent needs the actual source name - key = key.replace(SOURCE, sourceName); - result.put(joiner.join(name, + key = key.replaceFirst(SOURCE, sourceName); + result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES, key), value); - } else if(key.startsWith(CHANNEL)) { + } else if(key.startsWith(CHANNEL_PREFIX)) { // users use `channel' but agent needs the actual channel name - key = key.replace(CHANNEL, channelName); - result.put(joiner.join(name, + key = key.replaceFirst(CHANNEL, channelName); + result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS, key), value); - } else if(key.startsWith(SINK_PROCESSOR)) { + } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) { // agent.sinkgroups.sinkgroup.processor.* - result.put(joiner. - join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS, + result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS, sinkGroupName, key), value); } else { // XXX should we simply ignore this? http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java index 0a53c5f..919a630 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java @@ -19,6 +19,6 @@ /** * This package provides Flume users the ability to embed simple agents * in applications. For specific and up to date information, please see - * the Flume User Guide. + * the Flume Developer Guide. */ package org.apache.flume.agent.embedded; http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java index 3805ea8..f70d0b1 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java @@ -47,10 +47,30 @@ public class TestEmbeddedAgentConfiguration { properties.put("sink2.port", "2"); properties.put("processor.type", "load_balance"); } + + + @Test + public void testFullSourceType() throws Exception { + doTestExcepted(EmbeddedAgentConfiguration. + configure("test1", properties)); + } + @Test - public void testBasic() throws Exception { - Map actual = EmbeddedAgentConfiguration. - configure("test1", properties); + public void testMissingSourceType() throws Exception { + Assert.assertNotNull(properties.remove("source.type")); + doTestExcepted(EmbeddedAgentConfiguration. + configure("test1", properties)); + } + + @Test + public void testShortSourceType() throws Exception { + properties.put("source.type", "EMBEDDED"); + doTestExcepted(EmbeddedAgentConfiguration. + configure("test1", properties)); + } + + + public void doTestExcepted(Map actual) throws Exception { Map expected = Maps.newHashMap(); expected.put("test1.channels", "channel-test1"); expected.put("test1.channels.channel-test1.capacity", "200"); @@ -71,7 +91,6 @@ public class TestEmbeddedAgentConfiguration { expected.put("test1.sources.source-test1.channels", "channel-test1"); expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration. SOURCE_TYPE_EMBEDDED); - Assert.assertEquals(expected, actual); } @@ -116,4 +135,19 @@ public class TestEmbeddedAgentConfiguration { properties.put("bad.key.name", "bad"); EmbeddedAgentConfiguration.configure("test1", properties); } + @Test(expected = FlumeException.class) + public void testSinkNamedLikeSource() throws Exception { + properties.put("sinks", "source"); + EmbeddedAgentConfiguration.configure("test1", properties); + } + @Test(expected = FlumeException.class) + public void testSinkNamedLikeChannel() throws Exception { + properties.put("sinks", "channel"); + EmbeddedAgentConfiguration.configure("test1", properties); + } + @Test(expected = FlumeException.class) + public void testSinkNamedLikeProcessor() throws Exception { + properties.put("sinks", "processor"); + EmbeddedAgentConfiguration.configure("test1", properties); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java index 4e94d72..9d85e6e 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java @@ -28,7 +28,6 @@ import junit.framework.Assert; import org.apache.flume.Channel; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; import org.apache.flume.event.SimpleEvent;