flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-946: Allow multiplexing channel selector to specify optional channels
Date Mon, 29 Oct 2012 21:29:28 GMT
Updated Branches:
  refs/heads/flume-1.4 5c65867cf -> 0e8bd19d5


FLUME-946: Allow multiplexing channel selector to specify optional channels

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0e8bd19d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0e8bd19d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0e8bd19d

Branch: refs/heads/flume-1.4
Commit: 0e8bd19d5e648dca614fbaafaa511b747582a5d2
Parents: 5c65867
Author: Brock Noland <brock@apache.org>
Authored: Mon Oct 29 16:28:38 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Mon Oct 29 16:28:54 2012 -0500

----------------------------------------------------------------------
 .../flume/channel/MultiplexingChannelSelector.java |   40 ++++++++++++++-
 .../channel/TestMultiplexingChannelSelector.java   |   25 +++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   40 ++++++++++++++-
 3 files changed, 100 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0e8bd19d/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
index 81dc3e8..866d9dc 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
@@ -38,6 +38,8 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector
{
       "flume.selector.header";
   public static final String CONFIG_PREFIX_MAPPING = "mapping.";
   public static final String CONFIG_DEFAULT_CHANNEL = "default";
+  public static final String CONFIG_PREFIX_OPTIONAL = "optional";
+
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory
     .getLogger(MultiplexingChannelSelector.class);
@@ -48,7 +50,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector
{
   private String headerName;
 
   private Map<String, List<Channel>> channelMapping;
-
+  private Map<String, List<Channel>> optionalChannels;
   private List<Channel> defaultChannels;
   @Override
   public List<Channel> getRequiredChannels(Event event) {
@@ -70,7 +72,13 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector
{
 
   @Override
   public List<Channel> getOptionalChannels(Event event) {
-    return EMPTY_LIST;
+    String hdr = event.getHeaders().get(headerName);
+    List<Channel> channels = optionalChannels.get(hdr);
+
+    if(channels == null) {
+      channels = EMPTY_LIST;
+    }
+    return channels;
   }
 
   @Override
@@ -113,6 +121,34 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector
{
     }
     //If no mapping is configured, it is ok.
     //All events will go to the default channel(s).
+    Map<String, String> optionalChannelsMapping =
+        context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");
+
+    optionalChannels = new HashMap<String, List<Channel>>();
+    for (String hdr : optionalChannelsMapping.keySet()) {
+      List<Channel> confChannels = getChannelListFromNames(
+              optionalChannelsMapping.get(hdr), channelNameMap);
+      if (confChannels.isEmpty()) {
+        confChannels = EMPTY_LIST;
+      }
+      //Remove channels from optional channels, which are already
+      //configured to be required channels.
+
+      List<Channel> reqdChannels = channelMapping.get(hdr);
+      //Check if there are required channels, else defaults to default channels
+      if(reqdChannels == null || reqdChannels.isEmpty()) {
+        reqdChannels = defaultChannels;
+      }
+      for (Channel c : reqdChannels) {
+        if (confChannels.contains(c)) {
+          confChannels.remove(c);
+        }
+      }
+
+      if (optionalChannels.put(hdr, confChannels) != null) {
+        throw new FlumeException("Selector channel configured twice");
+      }
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0e8bd19d/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
index 2626b20..9dff5bb 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
@@ -51,6 +51,9 @@ public class TestMultiplexingChannelSelector {
     config.put("mapping.bar", "ch2 ch3");
     config.put("mapping.xyz", "ch1 ch2 ch3");
     config.put("default", "ch1 ch3");
+    config.put("optional.foo", "ch2 ch3");
+    config.put("optional.xyz", "ch1 ch3");
+    config.put("optional.zebra", "ch1 ch2");
 
     selector = ChannelSelectorFactory.create(channels, config);
   }
@@ -69,7 +72,11 @@ public class TestMultiplexingChannelSelector {
     Assert.assertTrue(reqCh1.get(0).getName().equals("ch1"));
     Assert.assertTrue(reqCh1.get(1).getName().equals("ch2"));
     List<Channel> optCh1 = selector.getOptionalChannels(event1);
-    Assert.assertTrue(optCh1.size() == 0);
+    Assert.assertTrue(optCh1.size() == 1);
+    //ch2 should not be there -- since it is a required channel
+    Assert.assertTrue(optCh1.get(0).getName().equals("ch3"));
+
+
 
     Event event2 = new MockEvent();
     Map<String, String> header2 = new HashMap<String, String>();
@@ -94,7 +101,9 @@ public class TestMultiplexingChannelSelector {
     Assert.assertTrue(reqCh3.get(1).getName().equals("ch2"));
     Assert.assertTrue(reqCh3.get(2).getName().equals("ch3"));
     List<Channel> optCh3 = selector.getOptionalChannels(event3);
+    //All of the optional channels should go away.
     Assert.assertTrue(optCh3.size() == 0);
+
   }
 
   //If the header information cannot map the event to any of the channels
@@ -136,6 +145,20 @@ public class TestMultiplexingChannelSelector {
     Assert.assertTrue(reqCh3.get(1).getName().equals("ch3"));
     Assert.assertTrue(optCh3.size() == 0);
 
+    Map<String, String> header4 = new HashMap<String, String>();
+    header4.put("myheader", "zebra");
+    Event zebraEvent = new MockEvent();
+    zebraEvent.setHeaders(header4);
+
+    List<Channel> reqCh4 = selector.getRequiredChannels(zebraEvent);
+    List<Channel> optCh4 = selector.getOptionalChannels(zebraEvent);
+    Assert.assertEquals(2, reqCh4.size());
+    Assert.assertTrue(reqCh4.get(0).getName().equals("ch1"));
+    Assert.assertTrue(reqCh4.get(1).getName().equals("ch3"));
+    System.out.println(optCh4.size());
+    //Since ch1 is also in default list, it is removed.
+    Assert.assertTrue(optCh4.size() == 1);
+    Assert.assertTrue(optCh4.get(0).getName().equals("ch2"));
 
     List<Channel> allChannels = selector.getAllChannels();
     Assert.assertTrue(allChannels.size() == 3);

http://git-wip-us.apache.org/repos/asf/flume/blob/0e8bd19d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0c0951b..c1303e0 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -567,6 +567,42 @@ sent to mem-channel-1, if its "AZ" then it goes to jdbc-channel-2 or
if its
 "NY" then both. If the "State" header is not set or doesn't match any of the
 three, then it goes to mem-channel-1 which is designated as 'default'.
 
+The selector also supports optional channels. To specify optional channels for
+a header, the config parameter 'optional' is used in the following way:
+
+.. code-block:: properties
+
+  # channel selector configuration
+  agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
+  agent_foo.sources.avro-AppSrv-source1.selector.header = State
+  agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
+  agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
+  agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
+  agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 jdbc-channel-2
+  agent_foo.sources.avro-AppSrv-source1.selector.optional.AZ = mem-channel-1
+  agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
+
+The selector will attempt to write to the required channels first and will fail
+the transaction if even one of these channels fails to consume the events. The
+transaction is reattempted on **all** of the channels. Once all required
+channels have consumed the events, then the selector will attempt to write to
+the optional channels. A failure by any of the optional channels to consume the
+event is simply ignored and not retried.
+
+If there is an overlap between the optional channels and required channels for a
+specific header, the channel is considered to be required, and a failure in the
+channel will cause the entire set of required channels to be retried. For
+instance, in the above example, for the header "CA" mem-channel-1 is considered
+to be a required channel even though it is marked both as required and optional,
+and a failure to write to this channel will cause that
+event to be retried on **all** channels configured for the selector.
+
+Note that if a header does not have any required channels, then the event will
+be written to the default channels, and the selector will then attempt to write
+it to the optional channels for that header. In other words, specifying optional
+channels for a header does not stop the event from being written to the default
+channels when no required channels are specified for that header.
+
 
 Flume Sources
 -------------
@@ -1345,7 +1381,7 @@ ElasticSearchSink
 This sink writes data to ElasticSearch. A class implementing
 ElasticSearchEventSerializer which is specified by the configuration is used to convert the
events into
 XContentBuilder which detail the fields and mappings which will be indexed. These are then
then written
-to ElasticSearch. The sink will generate an index per day allowing easier management instead
of dealing with 
+to ElasticSearch. The sink will generate an index per day allowing easier management instead
of dealing with
 a single large index
 The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
 Required properties are in **bold**.
@@ -1360,7 +1396,7 @@ indexName         flume
 indexType	  logs                                                                The type
to index the document to, defaults to 'log'
 clusterName       elasticsearch							      Name of the ElasticSearch cluster to connect
to
 batchSize         100                                                                 Number
of events to be written per txn.
-ttl               --                                                                  TTL
in days, when set will cause the expired documents to be deleted automatically, 
+ttl               --                                                                  TTL
in days, when set will cause the expired documents to be deleted automatically,
                                                                                       if
not set documents will never be automatically deleted
 serializer        org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
 serializer.*      --                                                                  Properties
to be passed to the serializer.


Mime
View raw message