flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2189. Add support for IP filtering on AvroSource
Date Fri, 20 Sep 2013 04:55:13 GMT
Updated Branches:
  refs/heads/flume-1.5 9ef11916c -> 30d9d56c1


FLUME-2189. Add support for IP filtering on AvroSource

(Ted Malaska via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30d9d56c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30d9d56c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30d9d56c

Branch: refs/heads/flume-1.5
Commit: 30d9d56c13f7a14606c9baa31dee15de16ebe9ba
Parents: 9ef1191
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Sep 19 21:53:55 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Sep 19 21:54:36 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 122 +++++++++++++-
 .../org/apache/flume/source/TestAvroSource.java | 163 ++++++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  17 ++
 pom.xml                                         |   2 +-
 4 files changed, 292 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/30d9d56c/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index f23cd93..f6e4cfe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
-
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
@@ -60,6 +59,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
+import org.jboss.netty.handler.ipfilter.PatternRule;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,6 +132,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String BIND_KEY = "bind";
   private static final String COMPRESSION_TYPE = "compression-type";
   private static final String SSL_KEY = "ssl";
+  private static final String IP_FILTER_KEY = "ipFilter";
+  private static final String IP_FILTER_RULES_KEY = "ipFilterRules";
   private static final String KEYSTORE_KEY = "keystore";
   private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
   private static final String KEYSTORE_TYPE_KEY = "keystore-type";
@@ -141,6 +144,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private String keystorePassword;
   private String keystoreType;
   private boolean enableSsl = false;
+  private boolean enableIpFilter;
+  private String patternRuleConfigDefinition;
 
   private Server server;
   private SourceCounter sourceCounter;
@@ -182,6 +187,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
       }
     }
 
+    enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
+    if (enableIpFilter) {
+      patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
+      if (patternRuleConfigDefinition == null ||
+        patternRuleConfigDefinition.isEmpty()) {
+        throw new FlumeException(
+          "ipFilter is configured with true but ipFilterRules is not defined:" +
+            " ");
+      }
+    }
+
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -233,10 +249,11 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private ChannelPipelineFactory initChannelPipelineFactory() {
     ChannelPipelineFactory pipelineFactory;
     boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
-    if (enableCompression || enableSsl) {
-      pipelineFactory = new SSLCompressionChannelPipelineFactory(
-          enableCompression, enableSsl, keystore,
-          keystorePassword, keystoreType);
+    if (enableCompression || enableSsl || enableIpFilter) {
+      pipelineFactory = new AdvancedChannelPipelineFactory(
+        enableCompression, enableSsl, keystore,
+        keystorePassword, keystoreType, enableIpFilter,
+        patternRuleConfigDefinition);
     } else {
       pipelineFactory = new ChannelPipelineFactory() {
         @Override
@@ -356,7 +373,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
    * Factory of SSL-enabled server worker channel pipelines
    * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
    */
-  private static class SSLCompressionChannelPipelineFactory
+  private static class AdvancedChannelPipelineFactory
       implements ChannelPipelineFactory {
 
     private boolean enableCompression;
@@ -365,12 +382,20 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     private String keystorePassword;
     private String keystoreType;
 
-    public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl,
String keystore, String keystorePassword, String keystoreType) {
+    private boolean enableIpFilter;
+    private String patternRuleConfigDefinition;
+
+    public AdvancedChannelPipelineFactory(boolean enableCompression,
+      boolean enableSsl, String keystore, String keystorePassword,
+      String keystoreType, boolean enableIpFilter,
+      String patternRuleConfigDefinition) {
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
       this.keystore = keystore;
       this.keystorePassword = keystorePassword;
       this.keystoreType = keystoreType;
+      this.enableIpFilter = enableIpFilter;
+      this.patternRuleConfigDefinition = patternRuleConfigDefinition;
     }
 
     private SSLContext createServerSSLContext() {
@@ -407,6 +432,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         pipeline.addFirst("deflater", encoder);
         pipeline.addFirst("inflater", new ZlibDecoder());
       }
+
+
       if (enableSsl) {
         SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
         sslEngine.setUseClientMode(false);
@@ -415,7 +442,88 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         // adding compression handling above
         pipeline.addFirst("ssl", new SslHandler(sslEngine));
       }
+
+      if (enableIpFilter) {
+
+        logger.info("Setting up ipFilter with the following rule definition: " +
+          patternRuleConfigDefinition);
+        IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
+
+        if (patternRuleConfigDefinition != null &&
+          !patternRuleConfigDefinition.isEmpty()) {
+          String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
+            ",");
+          for (String patternRuleDefinition : patternRuleDefinitions) {
+
+            PatternRule patternRule
+              = PatternRuleBuilder.withConfigRuleDefinition(
+              patternRuleDefinition);
+
+            if (patternRule != null) {
+              ipFilterHandler.add(patternRule);
+            }
+          }
+        }
+
+        logger.info(
+          "Adding ipFilter with " + ipFilterHandler.size() + " rules");
+
+        pipeline.addFirst("ipFilter", ipFilterHandler);
+      }
+
       return pipeline;
     }
+
+    public static class PatternRuleBuilder {
+      public static PatternRule withConfigRuleDefinition(
+        String patternRuleDefinition) throws FlumeException {
+        patternRuleDefinition = patternRuleDefinition.trim();
+        //first validation the format
+
+        int firstColonIndex = patternRuleDefinition.indexOf(":");
+        if (firstColonIndex == -1) {
+          logger.error(
+            "Invalid ipFilter patternRule '" + patternRuleDefinition +
+              "' should look like <'allow'  or 'deny'>:<'ip' or " +
+              "'name'>:<pattern>");
+          return null;
+        } else {
+
+          String ruleAccessFlag = patternRuleDefinition.substring(0,
+            firstColonIndex);
+          int secondColonIndex = patternRuleDefinition.indexOf(":",
+            firstColonIndex + 1);
+          if ((!ruleAccessFlag.equals("allow") &&
+            !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {
+            logger.error(
+              "Invalid ipFilter patternRule '" + patternRuleDefinition +
+                "' should look like <'allow'  or 'deny'>:<'ip' or " +
+                "'name'>:<pattern>");
+            return null;
+          }
+
+          String patternTypeFlag = patternRuleDefinition.substring(
+            firstColonIndex + 1, secondColonIndex);
+          if ((!patternTypeFlag.equals("ip") &&
+            !patternTypeFlag.equals("name"))) {
+            logger.error(
+              "Invalid ipFilter patternRule '" + patternRuleDefinition +
+                "' should look like <'allow'  or 'deny'>:<'ip' or " +
+                "'name'>:<pattern>");
+            return null;
+          }
+
+          boolean isAllow = ruleAccessFlag.equals("allow");
+          String patternRuleString =
+            (patternTypeFlag.equals("ip") ? "i" : "n") + ":" +
+              patternRuleDefinition.substring(secondColonIndex + 1);
+          logger.info("Adding ipFilter PatternRule: "
+            + (isAllow ? "Allow" : "deny") +
+            " " + patternRuleString);
+          return new PatternRule(isAllow, patternRuleString);
+        }
+      }
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/30d9d56c/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 2667a6f..e208fff 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -20,6 +20,7 @@
 package org.apache.flume.source;
 
 import java.io.IOException;
+import java.net.Inet4Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
@@ -39,6 +40,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -199,14 +201,19 @@ public class TestAvroSource {
         source.getLifecycleState());
 
     AvroSourceProtocol client;
+    NettyTransceiver nettyTransceiver;
     if (clientEnableCompression) {
+
+      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(
+          selectedPort), new CompressionChannelFactory(compressionLevel));
+
       client = SpecificRequestor.getClient(
-          AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
-              selectedPort), new CompressionChannelFactory(6)));
+          AvroSourceProtocol.class, nettyTransceiver);
     } else {
+      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort));
+
       client = SpecificRequestor.getClient(
-          AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
-              selectedPort)));
+          AvroSourceProtocol.class, nettyTransceiver);
     }
 
     AvroFlumeEvent avroEvent = new AvroFlumeEvent();
@@ -230,6 +237,8 @@ public class TestAvroSource {
 
     logger.debug("Round trip event:{}", event);
 
+
+    nettyTransceiver.close();
     source.stop();
     Assert.assertTrue("Reached stop or error",
         LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
@@ -372,4 +381,150 @@ public class TestAvroSource {
       return new X509Certificate[0];
     }
   }
+
+  @Test
+  public void testValidIpFilterAllows() throws InterruptedException, IOException {
+
+    doIpFilterTest("allow:name:localhost,deny:ip:*", true, false);
+    doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",deny:ip:*",
true, false);
+    doIpFilterTest("allow:ip:*", true, false);
+    doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,deny:ip:*", true, false);
+    doIpFilterTest("allow:ip:127.0.0.2,allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,deny:ip:*", true, false);
+
+    doIpFilterTest("allow:name:localhost,deny:ip:*", true, true);
+    doIpFilterTest("allow:ip:*", true, true);
+
+  }
+
+  @Test
+  public void testValidIpFilterDenys() throws InterruptedException, IOException {
+
+    doIpFilterTest("deny:ip:*", false, false);
+    doIpFilterTest("deny:name:localhost", false, false);
+    doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",allow:ip:*",
false, false);
+    doIpFilterTest("deny:ip:*", false, false);
+    doIpFilterTest("allow:ip:45.2.2.2,deny:ip:*", false, false);
+    doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,allow:ip:*", false, false);
+
+
+    doIpFilterTest("deny:ip:*", false, true);
+  }
+
+  @Test
+  public void testInvalidIpFilter() throws InterruptedException, IOException {
+
+    doIpFilterTest("deny:ip?*", true, false);
+    doIpFilterTest("deny?name:localhost", true, false);
+    doIpFilterTest("deny:ip:127.0.0.2,allow:ip?*,deny:ip:" + Inet4Address.getLocalHost().getHostAddress()
+ "", false, false);
+    doIpFilterTest("deny:*", true, false);
+    doIpFilterTest("deny:id:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,allow:ip:*", true, false);
+    try {
+      doIpFilterTest(null, true, false);
+      Assert.fail("The null ipFilterRules config should had thrown an exception.");
+    } catch (FlumeException e) {
+      //Do nothing
+    }
+
+    try{
+      doIpFilterTest("", true, false);
+      Assert.fail("The empty string ipFilterRules config should had thrown an exception.");
+    }  catch (FlumeException e) {
+      //Do nothing
+    }
+
+
+  }
+
+  public void doIpFilterTest(String ruleDefinition, boolean eventShouldBeAllowed, boolean
testWithSSL) throws InterruptedException, IOException {
+    boolean bound = false;
+
+    for (int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
+        context.put("bind", "0.0.0.0");
+        context.put("ipFilter", "true");
+        if (ruleDefinition != null) {
+          context.put("ipFilterRules", ruleDefinition);
+        }
+        if (testWithSSL) {
+          logger.info("Client testWithSSL" + testWithSSL);
+          context.put("ssl", "true");
+          context.put("keystore", "src/test/resources/server.p12");
+          context.put("keystore-password", "password");
+          context.put("keystore-type", "PKCS12");
+        }
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assume we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         */
+        Thread.sleep(100);
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    AvroSourceProtocol client;
+    NettyTransceiver nettyTransceiver;
+
+    if (testWithSSL) {
+      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort), new SSLChannelFactory());
+      client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, nettyTransceiver );
+    } else {
+      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort));
+      client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, nettyTransceiver);
+    }
+
+    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+    avroEvent.setBody(ByteBuffer.wrap("Hello avro ipFilter".getBytes()));
+
+    try {
+      logger.info("Client about to append");
+      Status status = client.append(avroEvent);
+      logger.info("Client appended");
+      Assert.assertEquals(Status.OK, status);
+    } catch(IOException e) {
+      Assert.assertTrue("Should have been Allowed:" + ruleDefinition, !eventShouldBeAllowed);
+      return;
+    }
+    Assert.assertTrue("Should have been denied:" + ruleDefinition, eventShouldBeAllowed);
+
+
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("Channel contained our event", "Hello avro ipFilter",
+        new String(event.getBody()));
+    transaction.commit();
+    transaction.close();
+
+    logger.debug("Round trip event:{}", event);
+
+    nettyTransceiver.close();
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/30d9d56c/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bbfb5d0..dac3ce7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -705,6 +705,8 @@ ssl                  false        Set this to true to enable SSL encryption.
You
 keystore             --           This is the path to a Java keystore file. Required for SSL.
 keystore-password    --           The password for the Java keystore. Required for SSL.
 keystore-type        JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
+ipFilter             false        Set this to true to enable ipFiltering for netty
+ipFilterRules        --           Define N netty ipFilter pattern rules with this config.
 ==================   ===========  ===================================================
 
 Example for agent named a1:
@@ -718,6 +720,21 @@ Example for agent named a1:
   a1.sources.r1.bind = 0.0.0.0
   a1.sources.r1.port = 4141
 
+Example of ipFilterRules
+
+ipFilterRules defines N netty ipFilters separated by commas. Each pattern rule must be in this format:
+
+<'allow' or 'deny'>:<'ip' or 'name' for computer name>:<pattern>
+or
+allow/deny:ip/name:pattern
+
+example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
+
+Note that the first rule to match will apply, as the examples below show for a client on the localhost:
+
+This will allow the client on localhost but deny clients from any other ip: "allow:name:localhost,deny:ip:*"
+This will deny the client on localhost but allow clients from any other ip: "deny:name:localhost,allow:ip:*"
+
 Thrift Source
 ~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/30d9d56c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 25ea4e7..8b36402 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1049,7 +1049,7 @@ limitations under the License.
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
-        <version>3.4.0.Final</version>
+        <version>3.5.12.Final</version>
       </dependency>
 
       <dependency>


Mime
View raw message