flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2217. Add option to preserve all Syslog headers in syslog sources
Date Tue, 10 Dec 2013 23:44:38 GMT
Updated Branches:
  refs/heads/trunk 67454a71a -> 9790ca758


FLUME-2217. Add option to preserve all Syslog headers in syslog sources

(Jeff Lord via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9790ca75
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9790ca75
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9790ca75

Branch: refs/heads/trunk
Commit: 9790ca7587060285efa4ae64591cea17dd3f00cf
Parents: 67454a7
Author: Mike Percy <mpercy@cloudera.com>
Authored: Tue Dec 10 14:38:06 2013 -0800
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Tue Dec 10 14:56:07 2013 -0800

----------------------------------------------------------------------
 .../flume/source/MultiportSyslogTCPSource.java  |  15 ++-
 .../org/apache/flume/source/SyslogParser.java   |  11 +-
 .../apache/flume/source/SyslogTcpSource.java    |   4 +-
 .../apache/flume/source/SyslogUDPSource.java    |  19 ++++
 .../org/apache/flume/source/SyslogUtils.java    |  40 +++----
 .../source/TestMultiportSyslogTCPSource.java    |   9 +-
 .../apache/flume/source/TestSyslogParser.java   |  16 ++-
 .../flume/source/TestSyslogTcpSource.java       |  14 +--
 .../flume/source/TestSyslogUdpSource.java       | 113 ++++++++++++-------
 .../apache/flume/source/TestSyslogUtils.java    |  25 ++--
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   6 +-
 11 files changed, 176 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 884fd62..427e0e3 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -67,6 +67,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
   private SourceCounter sourceCounter = null;
   private Charset defaultCharset;
   private ThreadSafeDecoder defaultDecoder;
+  private boolean keepFields;
 
   public MultiportSyslogTCPSource() {
     portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
@@ -138,6 +139,10 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
         SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
         SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE);
 
+    keepFields = context.getBoolean(
+        SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+        SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS);
+
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -159,7 +164,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
 
     acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
         getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
-        portCharsets));
+        portCharsets, keepFields));
 
     for (int port : ports) {
       InetSocketAddress addr;
@@ -213,11 +218,12 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     private final LineSplitter lineSplitter;
     private final ThreadSafeDecoder defaultDecoder;
     private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
+    private final boolean keepFields;
 
     public MultiportSyslogHandler(int maxEventSize, int batchSize,
         ChannelProcessor cp, SourceCounter ctr, String portHeader,
         ThreadSafeDecoder defaultDecoder,
-        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets) {
+        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets, boolean keepFields)
{
       channelProcessor = cp;
       sourceCounter = ctr;
       this.maxEventSize = maxEventSize;
@@ -225,6 +231,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
       this.portHeader = portHeader;
       this.defaultDecoder = defaultDecoder;
       this.portCharsets = portCharsets;
+      this.keepFields = keepFields;
       syslogParser = new SyslogParser();
       lineSplitter = new LineSplitter(maxEventSize);
     }
@@ -321,7 +328,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     /**
      * Decodes a syslog-formatted ParsedLine into a Flume Event.
      * @param parsedBuf Buffer containing characters to be parsed
-     * @param port Incoming port
+     * @param decoder Character set is configurable on a per-port basis.
      * @return
      */
     Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) {
@@ -351,7 +358,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
 
       Event event;
       try {
-        event = syslogParser.parseMessage(msg, decoder.charset());
+        event = syslogParser.parseMessage(msg, decoder.charset(), keepFields);
         if (parsedBuf.incomplete) {
           event.getHeaders().put(SyslogUtils.EVENT_STATUS,
               SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus());

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index bf3305c..557d121 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.event.EventBuilder;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
@@ -40,6 +42,8 @@ import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class SyslogParser {
 
   private static final Logger logger =
@@ -53,7 +57,6 @@ public class SyslogParser {
   private static final String timePat = "yyyy-MM-dd'T'HH:mm:ss";
   private static final int RFC3164_LEN = 15;
   private static final int RFC5424_PREFIX_LEN = 19;
-
   private final DateTimeFormatter timeParser;
 
   private Cache<String, Long> timestampCache;
@@ -76,7 +79,7 @@ public class SyslogParser {
    * @return Parsed Flume Event
    * @throws IllegalArgumentException if unable to successfully parse message
    */
-  public Event parseMessage(String msg, Charset charset) {
+  public Event parseMessage(String msg, Charset charset, boolean keepFields) {
     Map<String, String> headers = Maps.newHashMap();
 
     int msgLen = msg.length();
@@ -164,9 +167,11 @@ public class SyslogParser {
 
     // EventBuilder will do a copy of its own, so no defensive copy of the body
     String data = "";
-    if (msgLen > nextSpace + 1) {
+    if (msgLen > nextSpace + 1 && !keepFields) {
       curPos = nextSpace + 1;
       data = msg.substring(curPos);
+    } else {
+      data = msg;
     }
 
     Event event = EventBuilder.withBody(data, charset, headers);

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index 7a12d27..e84e4b6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -68,8 +68,8 @@ implements EventDrivenSource, Configurable {
       syslogUtils.setEventSize(eventSize);
     }
 
-    public void setKeepFields(boolean removeFields){
-      syslogUtils.setKeepFields(removeFields);
+    public void setKeepFields(boolean keepFields){
+      syslogUtils.setKeepFields(keepFields);
     }
 
     public void setFormater(Map<String, String> prop) {

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 96a9e85..8fb251b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -19,10 +19,12 @@
 package org.apache.flume.source;
 
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -53,6 +55,7 @@ public class SyslogUDPSource extends AbstractSource
   private String host = null;
   private Channel nettyChannel;
   private Map<String, String> formaterProp;
+  private boolean keepFields;
 
   private static final Logger logger = LoggerFactory
       .getLogger(SyslogUDPSource.class);
@@ -65,6 +68,10 @@ public class SyslogUDPSource extends AbstractSource
       syslogUtils.addFormats(prop);
     }
 
+    public void setKeepFields(boolean keepFields) {
+      syslogUtils.setKeepFields(keepFields);
+    }
+
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
       try {
@@ -90,6 +97,7 @@ public class SyslogUDPSource extends AbstractSource
         (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
     final syslogHandler handler = new syslogHandler();
     handler.setFormater(formaterProp);
+    handler.setKeepFields(keepFields);
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
@@ -132,6 +140,17 @@ public class SyslogUDPSource extends AbstractSource
     host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
     formaterProp = context.getSubProperties(
         SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
+    keepFields = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+      SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS);
   }
 
+  @VisibleForTesting
+  public int getSourcePort() {
+    SocketAddress localAddress = nettyChannel.getLocalAddress();
+    if (localAddress instanceof InetSocketAddress) {
+      InetSocketAddress addr = (InetSocketAddress) localAddress;
+      return addr.getPort();
+    }
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index f2ea932..a77bfc9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -49,15 +48,18 @@ public class SyslogUtils {
   final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";
 
   final public static String SYSLOG_MSG_RFC5424_0 =
-      "(?:\\d\\s)?" +// version
-  // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp)
-      "(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)"
+ // stamp
+      "(?:\\<\\d{1,3}\\>\\d?\\s?)" + // priority
+      /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
+      "(?:" +
+        "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
+        "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
       "\\s" + // separator
       "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null)
       "\\s" + // separator
       "(.*)$"; // body
 
   final public static String SYSLOG_MSG_RFC3164_0 =
+      "(?:\\<\\d{1,3}\\>\\d?\\s?)" +
       // stamp MMM d HH:mm:ss, single digit date has two spaces
       "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
       "\\s" + // separator
@@ -225,8 +227,13 @@ public class SyslogUtils {
       headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
     }
 
-    if ((msgBody != null) && (msgBody.length() > 0) && !keepFields) {
-      body = msgBody.getBytes();
+    if (!keepFields) {
+      if ((msgBody != null) && (msgBody.length() > 0)) {
+        body = msgBody.getBytes();
+      } else {
+        // Parse failed.
+        body = baos.toByteArray();
+      }
     } else {
       body = baos.toByteArray();
     }
@@ -311,14 +318,15 @@ public class SyslogUtils {
         switch (m) {
         case START:
           if (b == '<') {
+            baos.write(b);
             m = Mode.PRIO;
           } else if(b == '\n'){
-          //If the character is \n, it was because the last event was exactly
-          //as long  as the maximum size allowed and
-          //the only remaining character was the delimiter - '\n', or
-          //multiple delimiters were sent in a row.
-          //Just ignore it, and move forward, don't change the mode.
-          //This is a no-op, just ignore it.
+            //If the character is \n, it was because the last event was exactly
+            //as long  as the maximum size allowed and
+            //the only remaining character was the delimiter - '\n', or
+            //multiple delimiters were sent in a row.
+            //Just ignore it, and move forward, don't change the mode.
+            //This is a no-op, just ignore it.
             logger.debug("Delimiter found while in START mode, ignoring..");
 
           } else {
@@ -329,6 +337,7 @@ public class SyslogUtils {
           }
           break;
         case PRIO:
+          baos.write(b);
           if (b == '>') {
             m = Mode.DATA;
           } else {
@@ -336,9 +345,6 @@ public class SyslogUtils {
             prio.append(ch);
             if (!Character.isDigit(ch)) {
               isBadEvent = true;
-              //Append the priority to baos:
-              String badPrio = "<"+ prio;
-              baos.write(badPrio.getBytes());
               //If we hit a bad priority, just write as if everything is data.
               m = Mode.DATA;
             }
@@ -367,10 +373,6 @@ public class SyslogUtils {
         doneReading = true;
         e = buildEvent();
       }
-    //} catch (IndexOutOfBoundsException eF) {
-    //    e = buildEvent(prio, baos);
-    } catch (IOException e1) {
-      //no op
     } finally {
       // no-op
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 680e592..9b97c8c 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -70,6 +70,7 @@ public class TestMultiportSyslogTCPSource {
   private final String stamp1 = time.toString();
   private final String host1 = "localhost.localdomain";
   private final String data1 = "proc1 - some msg";
+  private final static boolean KEEP_FIELDS = false;
 
   /**
    * Helper function to generate a syslog message.
@@ -205,7 +206,8 @@ public class TestMultiportSyslogTCPSource {
         new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null,
         null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER,
         new ThreadSafeDecoder(Charsets.UTF_8),
-        new ConcurrentHashMap<Integer, ThreadSafeDecoder>());
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
+        KEEP_FIELDS);
 
     Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder());
     String body = new String(event.getBody(), Charsets.UTF_8);
@@ -231,7 +233,8 @@ public class TestMultiportSyslogTCPSource {
         1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
         new SourceCounter("test"), "port",
         new ThreadSafeDecoder(Charsets.UTF_8),
-        new ConcurrentHashMap<Integer, ThreadSafeDecoder>());
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
+        KEEP_FIELDS);
 
     ParsedBuffer parsedBuf = new ParsedBuffer();
     parsedBuf.incomplete = false;
@@ -331,7 +334,7 @@ public class TestMultiportSyslogTCPSource {
     // defaults to UTF-8
     MultiportSyslogHandler handler = new MultiportSyslogHandler(
         1000, 10, chanProc, new SourceCounter("test"), "port",
-        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets);
+        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, KEEP_FIELDS);
 
     // initialize buffers
     handler.sessionCreated(session1);

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
index 258c2f1..2809163 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
@@ -29,7 +29,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class TestSyslogParser {
-
   @Test
   public void testRfc5424DateParsing() {
     final String[] examples = {
@@ -55,7 +54,7 @@ public class TestSyslogParser {
     Charset charset = Charsets.UTF_8;
     List<String> messages = Lists.newArrayList();
 
-    // supported examples from RFC 3161
+    // supported examples from RFC 3164
     messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
         "lonvick on /dev/pts/8");
     messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
@@ -76,8 +75,19 @@ public class TestSyslogParser {
     messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
     messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
 
+    // test with default keepFields = false
+    for (String msg : messages) {
+      boolean keepFields = false;
+      Event event = parser.parseMessage(msg, charset, keepFields);
+      Assert.assertNull("Failure to parse known-good syslog message",
+        event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+    }
+
+    // test that priority, timestamp and hostname are preserved in event body
     for (String msg : messages) {
-      Event event = parser.parseMessage(msg, charset);
+      boolean keepFields = true;
+      Event event = parser.parseMessage(msg, charset, keepFields);
+      Assert.assertArrayEquals(event.getBody(), msg.getBytes());
       Assert.assertNull("Failure to parse known-good syslog message",
           event.getHeaders().get(SyslogUtils.EVENT_STATUS));
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index a6a1d5b..22fa200 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -49,13 +49,8 @@ public class TestSyslogTcpSource {
   private final String stamp1 = time.toString();
   private final String host1 = "localhost.localdomain";
   private final String data1 = "test syslog data";
-  private final String bodyWithTandH = stamp1 + " " + host1 + " " + data1;
-  // Helper function to generate a syslog message.
-  private byte[] getEvent() {
-    // timestamp with 'Z' appended, translates to UTC
-    final String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
-    return msg1.getBytes();
-  }
+  private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " +
+      data1 + "\n";
 
   private void init(boolean keepFields){
     source = new SyslogTcpSource();
@@ -87,7 +82,7 @@ public class TestSyslogTcpSource {
     for (int i = 0; i < 10 ; i++) {
       syslogSocket = new Socket(
         InetAddress.getLocalHost(), source.getSourcePort());
-      syslogSocket.getOutputStream().write(getEvent());
+      syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
       syslogSocket.close();
     }
 
@@ -116,7 +111,8 @@ public class TestSyslogTcpSource {
       String str = new String(e.getBody(), Charsets.UTF_8);
       logger.info(str);
       if (keepFields) {
-        Assert.assertArrayEquals(bodyWithTandH.getBytes(), e.getBody());
+        Assert.assertArrayEquals(bodyWithTandH.trim().getBytes(),
+          e.getBody());
       } else if (!keepFields) {
         Assert.assertArrayEquals(data1.getBytes(), e.getBody());
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index eae26ed..36f6479 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -20,15 +20,11 @@ package org.apache.flume.source;
 
 import java.util.ArrayList;
 import java.util.List;
-
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.DatagramSocket;
 import com.google.common.base.Charsets;
-import org.apache.log4j.Logger;
-import org.apache.log4j.net.SyslogAppender;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
@@ -38,16 +34,27 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.SyslogUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
 
 public class TestSyslogUdpSource {
+  private static final org.slf4j.Logger logger =
+    LoggerFactory.getLogger(TestSyslogUdpSource.class);
   private SyslogUDPSource source;
   private Channel channel;
-  private static final int TEST_SYSLOG_PORT = 14455;
-
-  @Before
-  public void setUp() {
-    source = new SyslogUDPSource(); //SyslogTcpSource();
+  private static final int TEST_SYSLOG_PORT = 0;
+  private final DateTime time = new DateTime();
+  private final String stamp1 = time.toString();
+  private final String host1 = "localhost.localdomain";
+  private final String data1 = "test UDP syslog data";
+  private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " +
+      data1;
+
+  private void init(boolean keepFields) {
+    source = new SyslogUDPSource();
     channel = new MemoryChannel();
 
     Configurables.configure(channel, new Context());
@@ -61,49 +68,69 @@ public class TestSyslogUdpSource {
     source.setChannelProcessor(new ChannelProcessor(rcs));
     Context context = new Context();
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
+    context.put("keepFields", String.valueOf(keepFields));
+
     source.configure(context);
+
   }
 
-  @Test
-  public void testAppend() throws InterruptedException {
-    Logger logger = Logger.getLogger(getClass());
-    // use the Apache syslog appender to write to syslog source
-    SyslogAppender appender = new SyslogAppender(null,
-        "localhost:"+TEST_SYSLOG_PORT, SyslogAppender.LOG_FTP);
-    logger.addAppender(appender);
-    Event e = null;
-    Event e2 = null;
+  /** Tests the keepFields configuration parameter (enabled or disabled)
+   using SyslogUDPSource.*/
 
+  private void runKeepFieldsTest(boolean keepFields) throws IOException {
+    init(keepFields);
     source.start();
+    // Write some message to the syslog port
+    DatagramSocket syslogSocket;
+    DatagramPacket datagramPacket;
+    datagramPacket = new DatagramPacket(bodyWithTandH.getBytes(),
+      bodyWithTandH.getBytes().length,
+      InetAddress.getLocalHost(), source.getSourcePort());
+    for (int i = 0; i < 10 ; i++) {
+      syslogSocket = new DatagramSocket();
+      syslogSocket.send(datagramPacket);
+      syslogSocket.close();
+    }
 
-    // write to syslog
-    logger.info("test flume syslog");
-    logger.info("");
-
+    List<Event> channelEvents = new ArrayList<Event>();
     Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 10; i++) {
+      Event e = channel.take();
+      Assert.assertNotNull(e);
+      channelEvents.add(e);
+    }
+
     try {
-      txn.begin();
-      e = channel.take();
-      e2 = channel.take();
       txn.commit();
+    } catch (Throwable t) {
+      txn.rollback();
     } finally {
       txn.close();
     }
 
     source.stop();
-    logger.removeAppender(appender);
-
-    String str = new String(e.getBody(), Charsets.UTF_8);
-    logger.info(str);
-    Assert.assertNotNull(e);
-    Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
-        e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));
-    Assert.assertArrayEquals(e.getBody(), "test flume syslog".getBytes());
-
-    Assert.assertNotNull(e2);
-    Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
-        e2.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));
-    Assert.assertArrayEquals(e2.getBody(), "".getBytes());
+    for (Event e : channelEvents) {
+      Assert.assertNotNull(e);
+      String str = new String(e.getBody(), Charsets.UTF_8);
+      logger.info(str);
+      if (keepFields) {
+        Assert.assertArrayEquals(bodyWithTandH.getBytes(),
+          e.getBody());
+      } else if (!keepFields) {
+        Assert.assertArrayEquals(data1.getBytes(), e.getBody());
+      }
+    }
   }
 
+  @Test
+  public void testKeepFields() throws IOException {
+    runKeepFieldsTest(true);
+  }
+
+  @Test
+  public void testRemoveFields() throws IOException {
+    runKeepFieldsTest(false);
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 898096b..82b7dd0 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -251,7 +251,8 @@ public class TestSyslogUtils {
     Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY));
     Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim());
+    Assert.assertEquals(priority + goodData1.trim(),
+        new String(e.getBody()).trim());
 
   }
 
@@ -277,7 +278,8 @@ public class TestSyslogUtils {
     Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
         headers.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim());
+    Assert.assertEquals(badData1.trim(), new String(e.getBody())
+      .trim());
 
     Event e2 = util.extractEvent(buff);
     if(e2 == null){
@@ -288,7 +290,8 @@ public class TestSyslogUtils {
     Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(null,
         headers2.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(goodData1.trim(), new String(e2.getBody()).trim());
+    Assert.assertEquals(priority + goodData1.trim(),
+        new String(e2.getBody()).trim());
   }
 
   @Test
@@ -310,7 +313,8 @@ public class TestSyslogUtils {
     Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(null,
         headers2.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(goodData1.trim(), new String(e2.getBody()).trim());
+    Assert.assertEquals(priority + goodData1.trim(),
+        new String(e2.getBody()).trim());
 
     Event e = util.extractEvent(buff);
 
@@ -379,7 +383,8 @@ public class TestSyslogUtils {
     Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(null,
         headers.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim());
+    Assert.assertEquals(priority + goodData1.trim(),
+        new String(e.getBody()).trim());
 
 
     Event e2 = util.extractEvent(buff);
@@ -391,14 +396,16 @@ public class TestSyslogUtils {
     Assert.assertEquals("4", headers2.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(null,
         headers.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals(goodData2.trim(), new String(e2.getBody()).trim());
+    Assert.assertEquals(priority2 + goodData2.trim(),
+        new String(e2.getBody()).trim());
 
   }
 
   @Test
   public void testExtractBadEventLarge() {
     String badData1 = "<10> bad bad data bad bad\n";
-    SyslogUtils util = new SyslogUtils(5, true, false);
+    // The minimum size (which is 10) overrides the 5 specified here.
+    SyslogUtils util = new SyslogUtils(5, false, false);
     ChannelBuffer buff = ChannelBuffers.buffer(100);
     buff.writeBytes(badData1.getBytes());
     Event e = util.extractEvent(buff);
@@ -410,7 +417,7 @@ public class TestSyslogUtils {
     Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus(),
         headers.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals("bad bad d".trim(), new String(e.getBody()).trim());
+    Assert.assertEquals("<10> bad b".trim(), new String(e.getBody()).trim());
 
     Event e2 = util.extractEvent(buff);
 
@@ -422,7 +429,7 @@ public class TestSyslogUtils {
     Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY));
     Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
         headers2.get(SyslogUtils.EVENT_STATUS));
-    Assert.assertEquals("ata bad ba".trim(), new String(e2.getBody()).trim());
+    Assert.assertEquals("ad data ba".trim(), new String(e2.getBody()).trim());
 
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0737c44..ae66f89 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1171,7 +1171,7 @@ Property Name    Default      Description
 **host**         --           Host name or IP address to bind to
 **port**         --           Port # to bind to
 eventSize        2500         Maximum size of a single event line, in bytes
-keepFields       false        Setting this to true will preserve the
+keepFields       false        Setting this to true will preserve the Priority,
                               Timestamp and Hostname in the body of the event.
 selector.type                 replicating or multiplexing
 selector.*       replicating  Depends on the selector.type value
@@ -1209,6 +1209,8 @@ Property Name         Default           Description
 **host**              --                Host name or IP address to bind to.
 **ports**             --                Space-separated list (one or more) of ports to bind to.
 eventSize             2500              Maximum size of a single event line, in bytes.
+keepFields            false             Setting this to true will preserve the
+                                        Priority, Timestamp and Hostname in the body of the event.
 portHeader            --                If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
 charset.default       UTF-8             Default character set used while parsing syslog events into strings.
 charset.port.<port>   --                Character set is configurable on a per-port basis.
@@ -1243,6 +1245,8 @@ Property Name   Default      Description
 **type**        --           The component type name, needs to be ``syslogudp``
 **host**        --           Host name or IP address to bind to
 **port**        --           Port # to bind to
+keepFields      false        Setting this to true will preserve the Priority,
+                             Timestamp and Hostname in the body of the event.
 selector.type                replicating or multiplexing
 selector.*      replicating  Depends on the selector.type value
 interceptors    --           Space-separated list of interceptors


Mime
View raw message