From: hshreedharan@apache.org
To: commits@flume.apache.org
Reply-To: dev@flume.apache.org
Subject: git commit: FLUME-2189. Add support for IP filtering on AvroSource
Date: Fri, 20 Sep 2013 04:54:51 +0000 (UTC)

Updated Branches:
  refs/heads/trunk 629b7e6b8 -> bb7fb11b6


FLUME-2189. Add support for IP filtering on AvroSource

(Ted Malaska via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bb7fb11b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bb7fb11b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bb7fb11b

Branch: refs/heads/trunk
Commit: bb7fb11b6ad3945704a6511140538b998e5143d1
Parents: 629b7e6
Author: Hari Shreedharan
Authored: Thu Sep 19 21:53:55 2013 -0700
Committer: Hari Shreedharan
Committed: Thu Sep 19 21:53:55 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 122 +++++++++++++-
 .../org/apache/flume/source/TestAvroSource.java | 163 ++++++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  17 ++
 pom.xml                                         |   2 +-
 4 files changed, 292 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/bb7fb11b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index f23cd93..f6e4cfe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
-
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
@@ -60,6 +59,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
+import org.jboss.netty.handler.ipfilter.PatternRule;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,6 +132,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String BIND_KEY = "bind";
   private static final String COMPRESSION_TYPE = "compression-type";
   private static final String SSL_KEY = "ssl";
+  private static final String IP_FILTER_KEY = "ipFilter";
+  private static final String IP_FILTER_RULES_KEY = "ipFilterRules";
   private static final String KEYSTORE_KEY = "keystore";
   private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
   private static final String KEYSTORE_TYPE_KEY = "keystore-type";
@@ -141,6 +144,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private String keystorePassword;
   private String keystoreType;
   private boolean enableSsl = false;
+  private boolean enableIpFilter;
+  private String patternRuleConfigDefinition;

   private Server server;
   private SourceCounter sourceCounter;
@@ -182,6 +187,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
       }
     }

+    enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
+    if (enableIpFilter) {
+      patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
+      if (patternRuleConfigDefinition == null ||
+          patternRuleConfigDefinition.isEmpty()) {
+        throw new FlumeException(
+            "ipFilter is configured with true but ipFilterRules is not defined:" +
+            " ");
+      }
+    }
+
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -233,10 +249,11 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private ChannelPipelineFactory initChannelPipelineFactory() {
     ChannelPipelineFactory pipelineFactory;
     boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
-    if (enableCompression || enableSsl) {
-      pipelineFactory = new SSLCompressionChannelPipelineFactory(
-          enableCompression, enableSsl, keystore,
-          keystorePassword, keystoreType);
+    if (enableCompression || enableSsl || enableIpFilter) {
+      pipelineFactory = new AdvancedChannelPipelineFactory(
+          enableCompression, enableSsl, keystore,
+          keystorePassword, keystoreType, enableIpFilter,
+          patternRuleConfigDefinition);
     } else {
       pipelineFactory = new ChannelPipelineFactory() {
         @Override
@@ -356,7 +373,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
    * Factory of SSL-enabled server worker channel pipelines
    * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
    */
-  private static class SSLCompressionChannelPipelineFactory
+  private static class AdvancedChannelPipelineFactory
       implements ChannelPipelineFactory {

     private boolean enableCompression;
@@ -365,12 +382,20 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     private String keystorePassword;
     private String keystoreType;

-    public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType) {
+    private boolean enableIpFilter;
+    private String patternRuleConfigDefinition;
+
+    public AdvancedChannelPipelineFactory(boolean enableCompression,
+        boolean enableSsl, String keystore, String keystorePassword,
+        String keystoreType, boolean enableIpFilter,
+        String patternRuleConfigDefinition) {
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
       this.keystore = keystore;
       this.keystorePassword = keystorePassword;
       this.keystoreType = keystoreType;
+      this.enableIpFilter = enableIpFilter;
+      this.patternRuleConfigDefinition = patternRuleConfigDefinition;
     }

     private SSLContext createServerSSLContext() {
@@ -407,6 +432,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         pipeline.addFirst("deflater", encoder);
         pipeline.addFirst("inflater", new ZlibDecoder());
       }
+
+
       if (enableSsl) {
         SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
         sslEngine.setUseClientMode(false);
@@ -415,7 +442,88 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         // adding compression handling above
         pipeline.addFirst("ssl", new SslHandler(sslEngine));
       }
+
+      if (enableIpFilter) {
+
+        logger.info("Setting up ipFilter with the following rule definition: " +
+            patternRuleConfigDefinition);
+        IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
+
+        if (patternRuleConfigDefinition != null &&
+            !patternRuleConfigDefinition.isEmpty()) {
+          String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
+              ",");
+          for (String patternRuleDefinition : patternRuleDefinitions) {
+
+            PatternRule patternRule
+                = PatternRuleBuilder.withConfigRuleDefinition(
+                    patternRuleDefinition);
+
+            if (patternRule != null) {
+              ipFilterHandler.add(patternRule);
+            }
+          }
+        }
+
+        logger.info(
+            "Adding ipFilter with " + ipFilterHandler.size() + " rules");
+
+        pipeline.addFirst("ipFilter", ipFilterHandler);
+      }
+
       return pipeline;
     }
+
+    public static class PatternRuleBuilder {
+      public static PatternRule withConfigRuleDefinition(
+          String patternRuleDefinition) throws FlumeException {
+        patternRuleDefinition = patternRuleDefinition.trim();
+        // first validate the format
+
+        int firstColonIndex = patternRuleDefinition.indexOf(":");
+        if (firstColonIndex == -1) {
+          logger.error(
+              "Invalid ipFilter patternRule '" + patternRuleDefinition +
+              "' should look like <'allow' or 'deny'>:<'ip' or " +
+              "'name'>:<pattern>");
+          return null;
+        } else {
+
+          String ruleAccessFlag = patternRuleDefinition.substring(0,
+              firstColonIndex);
+          int secondColonIndex = patternRuleDefinition.indexOf(":",
+              firstColonIndex + 1);
+          if ((!ruleAccessFlag.equals("allow") &&
+              !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {
+            logger.error(
+                "Invalid ipFilter patternRule '" + patternRuleDefinition +
+                "' should look like <'allow' or 'deny'>:<'ip' or " +
+                "'name'>:<pattern>");
+            return null;
+          }
+
+          String patternTypeFlag = patternRuleDefinition.substring(
+              firstColonIndex + 1, secondColonIndex);
+          if ((!patternTypeFlag.equals("ip") &&
+              !patternTypeFlag.equals("name"))) {
+            logger.error(
+                "Invalid ipFilter patternRule '" + patternRuleDefinition +
+                "' should look like <'allow' or 'deny'>:<'ip' or " +
+                "'name'>:<pattern>");
+            return null;
+          }
+
+          boolean isAllow = ruleAccessFlag.equals("allow");
+          String patternRuleString =
+              (patternTypeFlag.equals("ip") ? "i" : "n") + ":" +
+              patternRuleDefinition.substring(secondColonIndex + 1);
+          logger.info("Adding ipFilter PatternRule: " +
+              (isAllow ? "Allow" : "deny") +
+              " " + patternRuleString);
+          return new PatternRule(isAllow, patternRuleString);
+        }
+      }
+    }
+
   }
 }
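
For readers skimming the diff: the rule strings accepted above map directly onto Netty's ipfilter primitives. Below is a minimal standalone sketch of what the factory assembles for ipFilterRules=allow:ip:127.*,deny:ip:* - it only uses the Netty 3.x classes AvroSource already imports, but the class and method names in the sketch are illustrative, not part of the patch.

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
import org.jboss.netty.handler.ipfilter.PatternRule;

public class IpFilterPipelineSketch {
  public static ChannelPipeline buildPipeline() {
    ChannelPipeline pipeline = Channels.pipeline();
    IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
    // "allow:ip:127.*" -> allow rule on an IP pattern (the "i:" prefix)
    ipFilterHandler.add(new PatternRule(true, "i:127.*"));
    // "deny:ip:*" -> deny rule on an IP pattern; the first matching rule wins
    ipFilterHandler.add(new PatternRule(false, "i:*"));
    // hostname rules such as "allow:name:localhost" would use the "n:" prefix instead
    pipeline.addFirst("ipFilter", ipFilterHandler);
    return pipeline;
  }
}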
"Allow" : "deny") + + " " + patternRuleString); + return new PatternRule(isAllow, patternRuleString); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/bb7fb11b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index 2667a6f..e208fff 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -20,6 +20,7 @@ package org.apache.flume.source; import java.io.IOException; +import java.net.Inet4Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; @@ -39,6 +40,7 @@ import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -199,14 +201,19 @@ public class TestAvroSource { source.getLifecycleState()); AvroSourceProtocol client; + NettyTransceiver nettyTransceiver; if (clientEnableCompression) { + + nettyTransceiver = new NettyTransceiver(new InetSocketAddress( + selectedPort), new CompressionChannelFactory(compressionLevel)); + client = SpecificRequestor.getClient( - AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress( - selectedPort), new CompressionChannelFactory(6))); + AvroSourceProtocol.class, nettyTransceiver); } else { + nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort)); + client = SpecificRequestor.getClient( - AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress( - selectedPort))); + AvroSourceProtocol.class, nettyTransceiver); } AvroFlumeEvent avroEvent = new AvroFlumeEvent(); @@ -230,6 +237,8 @@ public class TestAvroSource { logger.debug("Round trip event:{}", event); + + nettyTransceiver.close(); source.stop(); Assert.assertTrue("Reached stop or error", LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); @@ -372,4 +381,150 @@ public class TestAvroSource { return new X509Certificate[0]; } } + + @Test + public void testValidIpFilterAllows() throws InterruptedException, IOException { + + doIpFilterTest("allow:name:localhost,deny:ip:*", true, false); + doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",deny:ip:*", true, false); + doIpFilterTest("allow:ip:*", true, false); + doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,deny:ip:*", true, false); + doIpFilterTest("allow:ip:127.0.0.2,allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,deny:ip:*", true, false); + + doIpFilterTest("allow:name:localhost,deny:ip:*", true, true); + doIpFilterTest("allow:ip:*", true, true); + + } + + @Test + public void testValidIpFilterDenys() throws InterruptedException, IOException { + + doIpFilterTest("deny:ip:*", false, false); + doIpFilterTest("deny:name:localhost", false, false); + doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",allow:ip:*", false, false); + doIpFilterTest("deny:ip:*", false, false); + doIpFilterTest("allow:ip:45.2.2.2,deny:ip:*", false, false); + doIpFilterTest("deny:ip:" + 

http://git-wip-us.apache.org/repos/asf/flume/blob/bb7fb11b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bbfb5d0..dac3ce7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -705,6 +705,8 @@ ssl                false       Set this to true to enable SSL encryption. You
 keystore           --          This is the path to a Java keystore file. Required for SSL.
 keystore-password  --          The password for the Java keystore. Required for SSL.
 keystore-type      JKS         The type of the Java keystore. This can be "JKS" or "PKCS12".
+ipFilter           false       Set this to true to enable IP filtering for Netty.
+ipFilterRules      --          Defines N Netty ipFilter pattern rules, comma separated.
 ==================  ==========  ===================================================

 Example for agent named a1:
@@ -718,6 +720,21 @@ Example for agent named a1:

   a1.sources.r1.bind = 0.0.0.0
   a1.sources.r1.port = 4141

+Example of ipFilterRules
+
+ipFilterRules defines N Netty ipFilter pattern rules, separated by commas. Each rule must be in this format:
+
+<'allow' or deny>:<'ip' or 'name' for computer name>:<pattern>
+or
+allow/deny:ip/name:pattern
+
+example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
+
+Note that the first rule to match applies, as the examples below show for a client on the localhost:
+
+"allow:name:localhost,deny:ip:*" allows the client on localhost and denies clients from any other IP.
+"deny:name:localhost,allow:ip:*" denies the client on localhost and allows clients from any other IP.
+
 Thrift Source
 ~~~~~~~~~~~~~
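
Putting the new properties together with the guide's existing Avro source example, a full source definition could look like the sketch below. The agent a1, source r1 and channel c1 are the guide's illustrative names, and the rule string is the first-match example described above.

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ipFilter = true
# first matching rule wins: accept localhost, refuse everything else
a1.sources.r1.ipFilterRules = allow:name:localhost,deny:ip:*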

http://git-wip-us.apache.org/repos/asf/flume/blob/bb7fb11b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 25ea4e7..8b36402 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1049,7 +1049,7 @@ limitations under the License.
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
-        <version>3.4.0.Final</version>
+        <version>3.5.12.Final</version>
       </dependency>