From: brock@apache.org
To: commits@flume.apache.org
Reply-To: dev@flume.apache.org
Subject: git commit: FLUME-946: Allow multiplexing channel selector to specify optional channels
Message-Id: <20121029212941.B6B3850747@tyr.zones.apache.org>
Date: Mon, 29 Oct 2012 21:29:41 +0000 (UTC)

Updated Branches:
  refs/heads/flume-1.3.0 1bb0a80ab -> 73ebd2774

FLUME-946: Allow multiplexing channel selector to specify optional channels

(Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/73ebd277
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/73ebd277
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/73ebd277

Branch: refs/heads/flume-1.3.0
Commit: 73ebd2774d1e6c196fbaba50f12a0043b771e1d3
Parents: 1bb0a80
Author: Brock Noland
Authored: Mon Oct 29
16:28:38 2012 -0500 Committer: Brock Noland Committed: Mon Oct 29 16:28:58 2012 -0500 ---------------------------------------------------------------------- .../flume/channel/MultiplexingChannelSelector.java | 40 ++++++++++++++- .../channel/TestMultiplexingChannelSelector.java | 25 +++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 40 ++++++++++++++- 3 files changed, 100 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/73ebd277/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java index 81dc3e8..866d9dc 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java @@ -38,6 +38,8 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { "flume.selector.header"; public static final String CONFIG_PREFIX_MAPPING = "mapping."; public static final String CONFIG_DEFAULT_CHANNEL = "default"; + public static final String CONFIG_PREFIX_OPTIONAL = "optional"; + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory .getLogger(MultiplexingChannelSelector.class); @@ -48,7 +50,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { private String headerName; private Map> channelMapping; - + private Map> optionalChannels; private List defaultChannels; @Override public List getRequiredChannels(Event event) { @@ -70,7 +72,13 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { @Override public List getOptionalChannels(Event event) { - return EMPTY_LIST; + String hdr = 
event.getHeaders().get(headerName); + List channels = optionalChannels.get(hdr); + + if(channels == null) { + channels = EMPTY_LIST; + } + return channels; } @Override @@ -113,6 +121,34 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { } //If no mapping is configured, it is ok. //All events will go to the default channel(s). + Map optionalChannelsMapping = + context.getSubProperties(CONFIG_PREFIX_OPTIONAL + "."); + + optionalChannels = new HashMap>(); + for (String hdr : optionalChannelsMapping.keySet()) { + List confChannels = getChannelListFromNames( + optionalChannelsMapping.get(hdr), channelNameMap); + if (confChannels.isEmpty()) { + confChannels = EMPTY_LIST; + } + //Remove channels from optional channels, which are already + //configured to be required channels. + + List reqdChannels = channelMapping.get(hdr); + //Check if there are required channels, else defaults to default channels + if(reqdChannels == null || reqdChannels.isEmpty()) { + reqdChannels = defaultChannels; + } + for (Channel c : reqdChannels) { + if (confChannels.contains(c)) { + confChannels.remove(c); + } + } + + if (optionalChannels.put(hdr, confChannels) != null) { + throw new FlumeException("Selector channel configured twice"); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/73ebd277/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java index 2626b20..9dff5bb 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java @@ -51,6 +51,9 @@ public class TestMultiplexingChannelSelector { config.put("mapping.bar", "ch2 
ch3"); config.put("mapping.xyz", "ch1 ch2 ch3"); config.put("default", "ch1 ch3"); + config.put("optional.foo", "ch2 ch3"); + config.put("optional.xyz", "ch1 ch3"); + config.put("optional.zebra", "ch1 ch2"); selector = ChannelSelectorFactory.create(channels, config); } @@ -69,7 +72,11 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh1.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh1.get(1).getName().equals("ch2")); List optCh1 = selector.getOptionalChannels(event1); - Assert.assertTrue(optCh1.size() == 0); + Assert.assertTrue(optCh1.size() == 1); + //ch2 should not be there -- since it is a required channel + Assert.assertTrue(optCh1.get(0).getName().equals("ch3")); + + Event event2 = new MockEvent(); Map header2 = new HashMap(); @@ -94,7 +101,9 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh3.get(1).getName().equals("ch2")); Assert.assertTrue(reqCh3.get(2).getName().equals("ch3")); List optCh3 = selector.getOptionalChannels(event3); + //All of the optional channels should go away. Assert.assertTrue(optCh3.size() == 0); + } //If the header information cannot map the event to any of the channels @@ -136,6 +145,20 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh3.get(1).getName().equals("ch3")); Assert.assertTrue(optCh3.size() == 0); + Map header4 = new HashMap(); + header4.put("myheader", "zebra"); + Event zebraEvent = new MockEvent(); + zebraEvent.setHeaders(header4); + + List reqCh4 = selector.getRequiredChannels(zebraEvent); + List optCh4 = selector.getOptionalChannels(zebraEvent); + Assert.assertEquals(2, reqCh4.size()); + Assert.assertTrue(reqCh4.get(0).getName().equals("ch1")); + Assert.assertTrue(reqCh4.get(1).getName().equals("ch3")); + System.out.println(optCh4.size()); + //Since ch1 is also in default list, it is removed. 
+ Assert.assertTrue(optCh4.size() == 1); + Assert.assertTrue(optCh4.get(0).getName().equals("ch2")); List allChannels = selector.getAllChannels(); Assert.assertTrue(allChannels.size() == 3); http://git-wip-us.apache.org/repos/asf/flume/blob/73ebd277/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index c4316ad..d62e6b9 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -567,6 +567,42 @@ sent to mem-channel-1, if its "AZ" then it goes to jdbc-channel-2 or if its "NY" then both. If the "State" header is not set or doesn't match any of the three, then it goes to mem-channel-1 which is designated as 'default'. +The selector also supports optional channels. To specify optional channels for +a header, the config parameter 'optional' is used in the following way: + +.. code-block:: properties + + # channel selector configuration + agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing + agent_foo.sources.avro-AppSrv-source1.selector.header = State + agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1 + +The selector will attempt to write to the required channels first and will fail +the transaction if even one of these channels fails to consume the events. The +transaction is reattempted on **all** of the channels. Once all required +channels have consumed the events, then the selector will attempt to write to +the optional channels. 
A failure by any of the optional channels to consume the +event is simply ignored and not retried. + +If there is an overlap between the optional channels and required channels for a +specific header, the channel is considered to be required, and a failure in the +channel will cause the entire set of required channels to be retried. For +instance, in the above example, for the header "CA" mem-channel-1 is considered +to be a required channel even though it is marked both as required and optional, + and a failure to write to this channel will cause that +event to be retried on **all** channels configured for the selector. + +Note that if a header does not have any required channels, then the event will +be written to the default channels and will be attempted to be written to the +optional channels for that header. Specifying optional channels will still cause +the event to be written to the default channels, if no required channels are +specified. + Flume Sources ------------- @@ -1345,7 +1381,7 @@ ElasticSearchSink This sink writes data to ElasticSearch. A class implementing ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into XContentBuilder which detail the fields and mappings which will be indexed. These are then then written -to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with +to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with a single large index The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink Required properties are in **bold**. @@ -1360,7 +1396,7 @@ indexName flume indexType logs The type to index the document to, defaults to 'log' clusterName elasticsearch Name of the ElasticSearch cluster to connect to batchSize 100 Number of events to be written per txn. 
-ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, +ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted serializer org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer serializer.* -- Properties to be passed to the serializer.
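
The overlap rule described in the user guide text of this patch (a channel listed as both required and optional is treated as required) can be illustrated in isolation. The following is a minimal standalone sketch, not the Flume implementation: it uses plain strings in place of Flume `Channel` objects, and the names `OptionalChannelSketch`, `resolveOptional`, and `demo` are hypothetical. The configuration mirrors the unit test in the patch; the required mapping for "foo" (ch1 ch2) is an assumption inferred from the test assertions, since that line is not part of the hunk shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OptionalChannelSketch {

  // Hypothetical helper: for each header value, return the optional channels
  // that remain after removing channels already required for that value.
  // If a header value has no required mapping, the default channels count
  // as its required channels, as in the patch.
  static Map<String, List<String>> resolveOptional(
      Map<String, List<String>> required,
      Map<String, List<String>> optional,
      List<String> defaults) {
    Map<String, List<String>> result = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : optional.entrySet()) {
      List<String> reqd = required.get(entry.getKey());
      if (reqd == null || reqd.isEmpty()) {
        reqd = defaults;
      }
      // Copy before removing so the caller's configuration is not mutated.
      List<String> remaining = new ArrayList<>(entry.getValue());
      remaining.removeAll(reqd);
      result.put(entry.getKey(), remaining);
    }
    return result;
  }

  // Builds a configuration modeled on the unit test in the patch.
  static Map<String, List<String>> demo() {
    Map<String, List<String>> required = new HashMap<>();
    required.put("foo", Arrays.asList("ch1", "ch2")); // assumed, not shown in the hunk
    required.put("bar", Arrays.asList("ch2", "ch3"));
    required.put("xyz", Arrays.asList("ch1", "ch2", "ch3"));

    Map<String, List<String>> optional = new HashMap<>();
    optional.put("foo", Arrays.asList("ch2", "ch3"));
    optional.put("xyz", Arrays.asList("ch1", "ch3"));
    optional.put("zebra", Arrays.asList("ch1", "ch2"));

    List<String> defaults = Arrays.asList("ch1", "ch3");
    return resolveOptional(required, optional, defaults);
  }

  public static void main(String[] args) {
    Map<String, List<String>> resolved = demo();
    System.out.println("foo   -> " + resolved.get("foo"));   // [ch3]
    System.out.println("xyz   -> " + resolved.get("xyz"));   // []
    System.out.println("zebra -> " + resolved.get("zebra")); // [ch2]
  }
}
```

The three results correspond to the assertions added in TestMultiplexingChannelSelector: "foo" keeps only ch3, "xyz" loses all of its optional channels, and "zebra" falls back to the default channels as its required set, leaving ch2 as the only optional channel.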