flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2198. Avro Source should disable itself if ipFilterRules contains invalid rules
Date Sat, 28 Sep 2013 04:49:45 GMT
Updated Branches:
  refs/heads/trunk ffa706429 -> 49933493f


FLUME-2198. Avro Source should disable itself if ipFilterRules contains invalid rules

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/49933493
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/49933493
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/49933493

Branch: refs/heads/trunk
Commit: 49933493f53633fec5846edf88aca4dd0dfdd52a
Parents: ffa7064
Author: Mike Percy <mpercy@cloudera.com>
Authored: Fri Sep 27 21:48:05 2013 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Fri Sep 27 21:48:05 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 125 +++++++--------
 .../org/apache/flume/source/TestAvroSource.java | 151 +++++++++++--------
 2 files changed, 141 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/49933493/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index f6e4cfe..c1ee3a9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -59,6 +59,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ipfilter.IpFilterRule;
 import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
 import org.jboss.netty.handler.ipfilter.PatternRule;
 import org.jboss.netty.handler.ssl.SslHandler;
@@ -153,6 +154,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private int maxThreads;
   private ScheduledExecutorService connectionCountUpdater;
 
+  private List<IpFilterRule> rules;
+
   @Override
   public void configure(Context context) {
     Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
@@ -191,11 +194,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     if (enableIpFilter) {
       patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
       if (patternRuleConfigDefinition == null ||
-        patternRuleConfigDefinition.isEmpty()) {
+        patternRuleConfigDefinition.trim().isEmpty()) {
         throw new FlumeException(
           "ipFilter is configured with true but ipFilterRules is not defined:" +
             " ");
       }
+      String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
+        ",");
+      rules = new ArrayList<IpFilterRule>(patternRuleDefinitions.length);
+      for (String patternRuleDefinition : patternRuleDefinitions) {
+        rules.add(generateRule(patternRuleDefinition));
+      }
     }
 
     if (sourceCounter == null) {
@@ -369,11 +378,53 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     return Status.OK;
   }
 
+  private PatternRule generateRule(
+    String patternRuleDefinition) throws FlumeException {
+    patternRuleDefinition = patternRuleDefinition.trim();
+    //first validate the format
+    int firstColonIndex = patternRuleDefinition.indexOf(":");
+    if (firstColonIndex == -1) {
+      throw new FlumeException(
+        "Invalid ipFilter patternRule '" + patternRuleDefinition +
+          "' should look like <'allow'  or 'deny'>:<'ip' or " +
+          "'name'>:<pattern>");
+    } else {
+      String ruleAccessFlag = patternRuleDefinition.substring(0,
+        firstColonIndex);
+      int secondColonIndex = patternRuleDefinition.indexOf(":",
+        firstColonIndex + 1);
+      if ((!ruleAccessFlag.equals("allow") &&
+        !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {
+        throw new FlumeException(
+          "Invalid ipFilter patternRule '" + patternRuleDefinition +
+            "' should look like <'allow'  or 'deny'>:<'ip' or " +
+            "'name'>:<pattern>");
+      }
+
+      String patternTypeFlag = patternRuleDefinition.substring(
+        firstColonIndex + 1, secondColonIndex);
+      if ((!patternTypeFlag.equals("ip") &&
+        !patternTypeFlag.equals("name"))) {
+        throw new FlumeException(
+          "Invalid ipFilter patternRule '" + patternRuleDefinition +
+            "' should look like <'allow'  or 'deny'>:<'ip' or " +
+            "'name'>:<pattern>");
+      }
+
+      boolean isAllow = ruleAccessFlag.equals("allow");
+      String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n")
+        + ":" + patternRuleDefinition.substring(secondColonIndex + 1);
+      logger.info("Adding ipFilter PatternRule: "
+        + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
+      return new PatternRule(isAllow, patternRuleString);
+    }
+  }
+
   /**
    * Factory of SSL-enabled server worker channel pipelines
    * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
    */
-  private static class AdvancedChannelPipelineFactory
+  private class AdvancedChannelPipelineFactory
       implements ChannelPipelineFactory {
 
     private boolean enableCompression;
@@ -448,23 +499,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         logger.info("Setting up ipFilter with the following rule definition: " +
           patternRuleConfigDefinition);
         IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
-
-        if (patternRuleConfigDefinition != null &&
-          !patternRuleConfigDefinition.isEmpty()) {
-          String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
-            ",");
-          for (String patternRuleDefinition : patternRuleDefinitions) {
-
-            PatternRule patternRule
-              = PatternRuleBuilder.withConfigRuleDefinition(
-              patternRuleDefinition);
-
-            if (patternRule != null) {
-              ipFilterHandler.add(patternRule);
-            }
-          }
-        }
-
+        ipFilterHandler.addAll(rules);
         logger.info(
           "Adding ipFilter with " + ipFilterHandler.size() + " rules");
 
@@ -473,57 +508,5 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
       return pipeline;
     }
-
-    public static class PatternRuleBuilder {
-      public static PatternRule withConfigRuleDefinition(
-        String patternRuleDefinition) throws FlumeException {
-        patternRuleDefinition = patternRuleDefinition.trim();
-        //first validation the format
-
-        int firstColonIndex = patternRuleDefinition.indexOf(":");
-        if (firstColonIndex == -1) {
-          logger.error(
-            "Invalid ipFilter patternRule '" + patternRuleDefinition +
-              "' should look like <'allow'  or 'deny'>:<'ip' or " +
-              "'name'>:<pattern>");
-          return null;
-        } else {
-
-          String ruleAccessFlag = patternRuleDefinition.substring(0,
-            firstColonIndex);
-          int secondColonIndex = patternRuleDefinition.indexOf(":",
-            firstColonIndex + 1);
-          if ((!ruleAccessFlag.equals("allow") &&
-            !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {
-            logger.error(
-              "Invalid ipFilter patternRule '" + patternRuleDefinition +
-                "' should look like <'allow'  or 'deny'>:<'ip' or " +
-                "'name'>:<pattern>");
-            return null;
-          }
-
-          String patternTypeFlag = patternRuleDefinition.substring(
-            firstColonIndex + 1, secondColonIndex);
-          if ((!patternTypeFlag.equals("ip") &&
-            !patternTypeFlag.equals("name"))) {
-            logger.error(
-              "Invalid ipFilter patternRule '" + patternRuleDefinition +
-                "' should look like <'allow'  or 'deny'>:<'ip' or " +
-                "'name'>:<pattern>");
-            return null;
-          }
-
-          boolean isAllow = ruleAccessFlag.equals("allow");
-          String patternRuleString =
-            (patternTypeFlag.equals("ip") ? "i" : "n") + ":" +
-              patternRuleDefinition.substring(secondColonIndex + 1);
-          logger.info("Adding ipFilter PatternRule: "
-            + (isAllow ? "Allow" : "deny") +
-            " " + patternRuleString);
-          return new PatternRule(isAllow, patternRuleString);
-        }
-      }
-    }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/49933493/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index e208fff..c75d098 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -21,13 +21,14 @@ package org.apache.flume.source;
 
 import java.io.IOException;
 import java.net.Inet4Address;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -72,9 +73,11 @@ public class TestAvroSource {
   private int selectedPort;
   private AvroSource source;
   private Channel channel;
+  private InetAddress localhost;
 
   @Before
-  public void setUp() {
+  public void setUp() throws UnknownHostException {
+    localhost = InetAddress.getByName("127.0.0.1");
     source = new AvroSource();
     channel = new MemoryChannel();
 
@@ -383,65 +386,84 @@ public class TestAvroSource {
   }
 
   @Test
-  public void testValidIpFilterAllows() throws InterruptedException, IOException {
-
-    doIpFilterTest("allow:name:localhost,deny:ip:*", true, false);
-    doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",deny:ip:*",
true, false);
-    doIpFilterTest("allow:ip:*", true, false);
-    doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,deny:ip:*", true, false);
-    doIpFilterTest("allow:ip:127.0.0.2,allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,deny:ip:*", true, false);
-
-    doIpFilterTest("allow:name:localhost,deny:ip:*", true, true);
-    doIpFilterTest("allow:ip:*", true, true);
-
+  public void testValidIpFilterAllows()
+      throws InterruptedException, IOException {
+    doIpFilterTest(localhost, "allow:name:localhost,deny:ip:*", true, false);
+    doIpFilterTest(localhost, "allow:ip:" + localhost.getHostAddress() +
+        ",deny:ip:*", true, false);
+    doIpFilterTest(localhost, "allow:ip:*", true, false);
+    doIpFilterTest(localhost, "allow:ip:" +
+        localhost.getHostAddress().substring(0, 3) +
+        "*,deny:ip:*", true, false);
+    doIpFilterTest(localhost, "allow:ip:127.0.0.2,allow:ip:" +
+        localhost.getHostAddress().substring(0, 3) +
+        "*,deny:ip:*", true, false);
+    doIpFilterTest(localhost, "allow:name:localhost,deny:ip:*", true, true);
+    doIpFilterTest(localhost, "allow:ip:*", true, true);
   }
 
   @Test
-  public void testValidIpFilterDenys() throws InterruptedException, IOException {
-
-    doIpFilterTest("deny:ip:*", false, false);
-    doIpFilterTest("deny:name:localhost", false, false);
-    doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",allow:ip:*",
false, false);
-    doIpFilterTest("deny:ip:*", false, false);
-    doIpFilterTest("allow:ip:45.2.2.2,deny:ip:*", false, false);
-    doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,allow:ip:*", false, false);
-
-
-    doIpFilterTest("deny:ip:*", false, true);
+  public void testValidIpFilterDenys()
+      throws InterruptedException, IOException {
+    doIpFilterTest(localhost, "deny:ip:*", false, false);
+    doIpFilterTest(localhost, "deny:name:localhost", false, false);
+    doIpFilterTest(localhost, "deny:ip:" + localhost.getHostAddress() +
+        ",allow:ip:*", false, false);
+    doIpFilterTest(localhost, "deny:ip:*", false, false);
+    doIpFilterTest(localhost, "allow:ip:45.2.2.2,deny:ip:*", false, false);
+    doIpFilterTest(localhost, "deny:ip:" +
+        localhost.getHostAddress().substring(0, 3) +
+        "*,allow:ip:*", false, false);
+    doIpFilterTest(localhost, "deny:ip:*", false, true);
   }
 
   @Test
   public void testInvalidIpFilter() throws InterruptedException, IOException {
-
-    doIpFilterTest("deny:ip?*", true, false);
-    doIpFilterTest("deny?name:localhost", true, false);
-    doIpFilterTest("deny:ip:127.0.0.2,allow:ip?*,deny:ip:" + Inet4Address.getLocalHost().getHostAddress()
+ "", false, false);
-    doIpFilterTest("deny:*", true, false);
-    doIpFilterTest("deny:id:" + Inet4Address.getLocalHost().getHostAddress().substring(0,
3) + "*,allow:ip:*", true, false);
+    doIpFilterTest(localhost, "deny:ip:*", false, false);
+    doIpFilterTest(localhost, "allow:name:localhost", true, false);
+    doIpFilterTest(localhost, "deny:ip:127.0.0.2,allow:ip:*,deny:ip:" +
+        localhost.getHostAddress(), true, false);
+    doIpFilterTest(localhost, "deny:ip:" +
+        localhost.getHostAddress().substring(0, 3) + "*,allow:ip:*",
+        false, false);
     try {
-      doIpFilterTest(null, true, false);
-      Assert.fail("The null ipFilterRules config should had thrown an exception.");
+      doIpFilterTest(localhost, null, false, false);
+      Assert.fail(
+        "The null ipFilterRules config should have thrown an exception.");
     } catch (FlumeException e) {
       //Do nothing
     }
 
-    try{
-      doIpFilterTest("", true, false);
-      Assert.fail("The empty string ipFilterRules config should had thrown an exception.");
-    }  catch (FlumeException e) {
+    try {
+      doIpFilterTest(localhost, "", true, false);
+      Assert.fail("The empty string ipFilterRules config should have thrown "
+          + "an exception");
+    } catch (FlumeException e) {
       //Do nothing
     }
 
-
+    try {
+      doIpFilterTest(localhost, "homer:ip:45.4.23.1", true, false);
+      Assert.fail("Bad ipFilterRules config should have thrown an exception.");
+    } catch (FlumeException e) {
+      //Do nothing
+    }
+    try {
+      doIpFilterTest(localhost, "allow:sleeps:45.4.23.1", true, false);
+      Assert.fail("Bad ipFilterRules config should have thrown an exception.");
+    } catch (FlumeException e) {
+      //Do nothing
+    }
   }
 
-  public void doIpFilterTest(String ruleDefinition, boolean eventShouldBeAllowed, boolean
testWithSSL) throws InterruptedException, IOException {
+  public void doIpFilterTest(InetAddress dest, String ruleDefinition,
+      boolean eventShouldBeAllowed, boolean testWithSSL)
+      throws InterruptedException, IOException {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
       try {
         Context context = new Context();
-
         context.put("port", String.valueOf(selectedPort = 41414 + i));
         context.put("bind", "0.0.0.0");
         context.put("ipFilter", "true");
@@ -476,34 +498,41 @@ public class TestAvroSource {
         source.getLifecycleState());
 
     AvroSourceProtocol client;
-    NettyTransceiver nettyTransceiver;
-
-    if (testWithSSL) {
-      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort), new SSLChannelFactory());
-      client = SpecificRequestor.getClient(
-          AvroSourceProtocol.class, nettyTransceiver );
-    } else {
-      nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort));
-      client = SpecificRequestor.getClient(
+    NettyTransceiver nettyTransceiver = null;
+    try {
+      if (testWithSSL) {
+        nettyTransceiver = new NettyTransceiver(
+          new InetSocketAddress (dest, selectedPort),
+          new SSLChannelFactory());
+        client = SpecificRequestor.getClient(
           AvroSourceProtocol.class, nettyTransceiver);
-    }
+      } else {
+        nettyTransceiver = new NettyTransceiver(
+          new InetSocketAddress (dest, selectedPort));
+        client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, nettyTransceiver);
+      }
 
-    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
-    avroEvent.setBody(ByteBuffer.wrap("Hello avro ipFilter".getBytes()));
+      AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+      avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+      avroEvent.setBody(ByteBuffer.wrap("Hello avro ipFilter".getBytes()));
 
-    try {
       logger.info("Client about to append");
       Status status = client.append(avroEvent);
       logger.info("Client appended");
       Assert.assertEquals(Status.OK, status);
-    } catch(IOException e) {
-      Assert.assertTrue("Should have been Allowed:" + ruleDefinition, !eventShouldBeAllowed);
+    } catch (IOException e) {
+      Assert.assertTrue("Should have been allowed: " + ruleDefinition,
+        !eventShouldBeAllowed);
       return;
+    } finally {
+      if (nettyTransceiver != null) {
+        nettyTransceiver.close();
+      }
+      source.stop();
     }
-    Assert.assertTrue("Should have been denied:" + ruleDefinition, eventShouldBeAllowed);
-
-
+    Assert.assertTrue("Should have been denied: " + ruleDefinition,
+        eventShouldBeAllowed);
 
     Transaction transaction = channel.getTransaction();
     transaction.begin();
@@ -514,17 +543,11 @@ public class TestAvroSource {
         new String(event.getBody()));
     transaction.commit();
     transaction.close();
-
     logger.debug("Round trip event:{}", event);
 
-    nettyTransceiver.close();
-    source.stop();
     Assert.assertTrue("Reached stop or error",
         LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
     Assert.assertEquals("Server is stopped", LifecycleState.STOP,
         source.getLifecycleState());
-
-
   }
-
 }


Mime
View raw message