flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2130. Handle larger payloads via SyslogUDPSource
Date Fri, 07 Feb 2014 20:33:38 GMT
Updated Branches:
  refs/heads/trunk e30cbd544 -> e07a0a688


FLUME-2130. Handle larger payloads via SyslogUDPSource

(Ashish Paliwal via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e07a0a68
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e07a0a68
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e07a0a68

Branch: refs/heads/trunk
Commit: e07a0a6883c84836e618d187c1381d47a26bfc71
Parents: e30cbd5
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Feb 7 12:32:56 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Feb 7 12:32:56 2014 -0800

----------------------------------------------------------------------
 .../apache/flume/source/SyslogUDPSource.java    | 19 ++++---
 .../flume/source/TestSyslogUdpSource.java       | 52 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e07a0a68/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 8fb251b..01b8905 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -35,13 +35,7 @@ import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.SyslogUtils;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
 
 import org.slf4j.Logger;
@@ -61,8 +55,14 @@ public class SyslogUDPSource extends AbstractSource
       .getLogger(SyslogUDPSource.class);
 
   private CounterGroup counterGroup = new CounterGroup();
+
+  // Default Min size
+  public static final int DEFAULT_MIN_SIZE = 2048;
+  public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE;
+
   public class syslogHandler extends SimpleChannelHandler {
-    private SyslogUtils syslogUtils = new SyslogUtils(true);
+    private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE,
+      SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, true);
 
     public void setFormater(Map<String, String> prop) {
       syslogUtils.addFormats(prop);
@@ -98,6 +98,9 @@ public class SyslogUDPSource extends AbstractSource
     final syslogHandler handler = new syslogHandler();
     handler.setFormater(formaterProp);
     handler.setKeepFields(keepFields);
+    serverBootstrap.setOption("receiveBufferSizePredictorFactory",
+      new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
+        DEFAULT_INITIAL_SIZE, maxsize));
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {

http://git-wip-us.apache.org/repos/asf/flume/blob/e07a0a68/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index 36f6479..95ee48c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -25,6 +25,7 @@ import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.DatagramSocket;
 import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
@@ -124,6 +125,49 @@ public class TestSyslogUdpSource {
   }
 
   @Test
+  public void testLargePayload() throws Exception {
+    init(true);
+    source.start();
+    // Write some message to the syslog port
+
+    byte[] largePayload = getPayload(1000).getBytes();
+
+    DatagramSocket syslogSocket;
+    DatagramPacket datagramPacket;
+    datagramPacket = new DatagramPacket(largePayload,
+            1000,
+            InetAddress.getLocalHost(), source.getSourcePort());
+    for (int i = 0; i < 10 ; i++) {
+      syslogSocket = new DatagramSocket();
+      syslogSocket.send(datagramPacket);
+      syslogSocket.close();
+    }
+
+    List<Event> channelEvents = new ArrayList<Event>();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 10; i++) {
+      Event e = channel.take();
+      Assert.assertNotNull(e);
+      channelEvents.add(e);
+    }
+
+    try {
+      txn.commit();
+    } catch (Throwable t) {
+      txn.rollback();
+    } finally {
+      txn.close();
+    }
+
+    source.stop();
+    for (Event e : channelEvents) {
+      Assert.assertNotNull(e);
+      Assert.assertArrayEquals(largePayload, e.getBody());
+    }
+  }
+
+  @Test
   public void testKeepFields() throws IOException {
     runKeepFieldsTest(true);
   }
@@ -132,5 +176,13 @@ public class TestSyslogUdpSource {
   public void testRemoveFields() throws IOException {
     runKeepFieldsTest(false);
   }
+
+  private String getPayload(int length) {
+    StringBuilder payload = new StringBuilder(length);
+    for (int n = 0; n < length; ++n) {
+      payload.append("x");
+    }
+    return payload.toString();
+  }
 }
 


Mime
View raw message