flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2438. Make Syslog source message body configurable
Date Thu, 14 Aug 2014 20:47:38 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 adec93ee5 -> 5734a4ab5


FLUME-2438. Make Syslog source message body configurable

Allow specifying which syslog fields (priority, version, timestamp, hostname)
are kept in the body of the event. Fields that are dropped from the body are
represented only in Flume event headers.

(Abraham Elmahrek via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5734a4ab
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5734a4ab
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5734a4ab

Branch: refs/heads/flume-1.6
Commit: 5734a4ab5e1445c5c77c0277c1a22ca7addfdc9d
Parents: adec93e
Author: Mike Percy <mpercy@cloudera.com>
Authored: Thu Aug 14 16:42:12 2014 -0400
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Thu Aug 14 16:43:49 2014 -0400

----------------------------------------------------------------------
 .../flume/source/MultiportSyslogTCPSource.java  |  15 +-
 .../org/apache/flume/source/SyslogParser.java   |  27 ++-
 .../SyslogSourceConfigurationConstants.java     |   7 +-
 .../apache/flume/source/SyslogTcpSource.java    |  11 +-
 .../apache/flume/source/SyslogUDPSource.java    |  15 +-
 .../org/apache/flume/source/SyslogUtils.java    | 126 ++++++++--
 .../source/TestMultiportSyslogTCPSource.java    |   8 +-
 .../apache/flume/source/TestSyslogParser.java   |  19 +-
 .../flume/source/TestSyslogTcpSource.java       |  42 +++-
 .../flume/source/TestSyslogUdpSource.java       |  46 +++-
 .../apache/flume/source/TestSyslogUtils.java    |  50 +++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  14 +-
 .../flume/test/agent/TestSyslogSource.java      |  97 ++++++++
 .../apache/flume/test/util/StagedInstall.java   |   2 +
 .../org/apache/flume/test/util/SyslogAgent.java | 238 +++++++++++++++++++
 15 files changed, 644 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 427e0e3..87f0db1 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -28,6 +28,7 @@ import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.flume.Context;
@@ -67,7 +68,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
   private SourceCounter sourceCounter = null;
   private Charset defaultCharset;
   private ThreadSafeDecoder defaultDecoder;
-  private boolean keepFields;
+  private Set<String> keepFields;
 
   public MultiportSyslogTCPSource() {
     portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
@@ -139,9 +140,10 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
         SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
         SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE);
 
-    keepFields = context.getBoolean(
-        SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
-        SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS);
+    keepFields = SyslogUtils.chooseFieldsToKeep(
+        context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -218,12 +220,13 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     private final LineSplitter lineSplitter;
     private final ThreadSafeDecoder defaultDecoder;
     private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
-    private final boolean keepFields;
+    private Set<String> keepFields;
 
     public MultiportSyslogHandler(int maxEventSize, int batchSize,
         ChannelProcessor cp, SourceCounter ctr, String portHeader,
         ThreadSafeDecoder defaultDecoder,
-        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets, boolean keepFields) {
+        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets,
+        Set<String> keepFields) {
       channelProcessor = cp;
       sourceCounter = ctr;
       this.maxEventSize = maxEventSize;

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index 557d121..0171309 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -30,6 +30,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Maps;
 import java.nio.charset.Charset;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import org.apache.flume.Event;
@@ -79,7 +80,7 @@ public class SyslogParser {
    * @return Parsed Flume Event
    * @throws IllegalArgumentException if unable to successfully parse message
    */
-  public Event parseMessage(String msg, Charset charset, boolean keepFields) {
+  public Event parseMessage(String msg, Charset charset, Set<String> keepFields) {
     Map<String, String> headers = Maps.newHashMap();
 
     int msgLen = msg.length();
@@ -98,6 +99,9 @@ public class SyslogParser {
     int facility = pri / 8;
     int severity = pri % 8;
 
+    // Remember priority
+    headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
+
     // put fac / sev into header
     headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
     headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
@@ -108,20 +112,25 @@ public class SyslogParser {
     // update parsing position
     curPos = endBracketPos + 1;
 
-    // ignore version string
+    // remember version string
+    String version = null;
     if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) {
+      version = msg.substring(curPos, curPos+1);
+      headers.put(SyslogUtils.SYSLOG_VERSION, version);
       curPos += 2;
     }
 
     // now parse timestamp (handle different varieties)
 
     long ts;
+    String tsString;
     char dateStartChar = msg.charAt(curPos);
 
     try {
 
       // no timestamp specified; use relay current time
       if (dateStartChar == '-') {
+        tsString = Character.toString(dateStartChar);
         ts = System.currentTimeMillis();
         if (msgLen <= curPos + 2) {
           throw new IllegalArgumentException(
@@ -129,22 +138,23 @@ public class SyslogParser {
         }
         curPos += 2; // assume we skip past a space to get to the hostname
 
-        // rfc3164 imestamp
+      // rfc3164 timestamp
       } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
         if (msgLen <= curPos + RFC3164_LEN) {
           throw new IllegalArgumentException("bad timestamp format");
         }
-        ts = parseRfc3164Time(
-            msg.substring(curPos, curPos + RFC3164_LEN));
+        tsString = msg.substring(curPos, curPos + RFC3164_LEN);
+        ts = parseRfc3164Time(tsString);
         curPos += RFC3164_LEN + 1;
 
-        // rfc 5424 timestamp
+      // rfc 5424 timestamp
       } else {
         int nextSpace = msg.indexOf(' ', curPos);
         if (nextSpace == -1) {
           throw new IllegalArgumentException("bad timestamp format");
         }
-        ts = parseRfc5424Date(msg.substring(curPos, nextSpace));
+        tsString = msg.substring(curPos, nextSpace);
+        ts = parseRfc5424Date(tsString);
         curPos = nextSpace + 1;
       }
 
@@ -167,9 +177,10 @@ public class SyslogParser {
 
     // EventBuilder will do a copy of its own, so no defensive copy of the body
     String data = "";
-    if (msgLen > nextSpace + 1 && !keepFields) {
+    if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
       curPos = nextSpace + 1;
       data = msg.substring(curPos);
+      data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, tsString, hostname);
     } else {
       data = msg;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index 985949c..fb8df81 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -67,7 +67,12 @@ public final class SyslogSourceConfigurationConstants {
   public static final int DEFAULT_READBUF_SIZE = 1024;
 
   public static final String CONFIG_KEEP_FIELDS = "keepFields";
-  public static final boolean DEFAULT_KEEP_FIELDS = false;
+  public static final String DEFAULT_KEEP_FIELDS = "none";
+
+  public static final String CONFIG_KEEP_FIELDS_PRIORITY = "priority";
+  public static final String CONFIG_KEEP_FIELDS_VERSION = "version";
+  public static final String CONFIG_KEEP_FIELDS_TIMESTAMP = "timestamp";
+  public static final String CONFIG_KEEP_FIELDS_HOSTNAME = "hostname";
 
   private SyslogSourceConfigurationConstants() {
     // Disable explicit creation of objects.

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index e84e4b6..c117813 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -58,7 +59,7 @@ implements EventDrivenSource, Configurable {
   private Integer eventSize;
   private Map<String, String> formaterProp;
   private CounterGroup counterGroup = new CounterGroup();
-  private Boolean keepFields;
+  private Set<String> keepFields;
 
   public class syslogTcpHandler extends SimpleChannelHandler {
 
@@ -68,7 +69,7 @@ implements EventDrivenSource, Configurable {
       syslogUtils.setEventSize(eventSize);
     }
 
-    public void setKeepFields(boolean keepFields){
+    public void setKeepFields(Set<String> keepFields) {
       syslogUtils.setKeepFields(keepFields);
     }
 
@@ -154,8 +155,10 @@ implements EventDrivenSource, Configurable {
     eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
     formaterProp = context.getSubProperties(
         SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
-    keepFields = context.getBoolean
-      (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false);
+    keepFields = SyslogUtils.chooseFieldsToKeep(
+        context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 01b8905..378d484 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -32,7 +33,6 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.SyslogUtils;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
@@ -49,7 +49,7 @@ public class SyslogUDPSource extends AbstractSource
   private String host = null;
   private Channel nettyChannel;
   private Map<String, String> formaterProp;
-  private boolean keepFields;
+  private Set<String> keepFields;
 
   private static final Logger logger = LoggerFactory
       .getLogger(SyslogUDPSource.class);
@@ -61,14 +61,13 @@ public class SyslogUDPSource extends AbstractSource
   public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE;
 
   public class syslogHandler extends SimpleChannelHandler {
-    private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE,
-      SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, true);
+    private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true);
 
     public void setFormater(Map<String, String> prop) {
       syslogUtils.addFormats(prop);
     }
 
-    public void setKeepFields(boolean keepFields) {
+    public void setKeepFields(Set<String> keepFields) {
       syslogUtils.setKeepFields(keepFields);
     }
 
@@ -143,8 +142,10 @@ public class SyslogUDPSource extends AbstractSource
     host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
     formaterProp = context.getSubProperties(
         SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
-    keepFields = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
-      SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS);
+    keepFields = SyslogUtils.chooseFieldsToKeep(
+        context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index a77bfc9..208fefe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -31,9 +31,13 @@ import java.io.ByteArrayOutputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.MatchResult;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -48,27 +52,31 @@ public class SyslogUtils {
   final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";
 
   final public static String SYSLOG_MSG_RFC5424_0 =
-      "(?:\\<\\d{1,3}\\>\\d?\\s?)" + // priority
+      "(?:\\<(\\d{1,3})\\>)" + // priority
+      "(?:(\\d?)\\s?)" + // version
       /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
       "(?:" +
         "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
         "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
       "\\s" + // separator
-      "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null)
+      "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
       "\\s" + // separator
       "(.*)$"; // body
 
   final public static String SYSLOG_MSG_RFC3164_0 =
-      "(?:\\<\\d{1,3}\\>\\d?\\s?)" +
+      "(?:\\<(\\d{1,3})\\>)" +
+      "(?:(\\d)?\\s?)" + // version
       // stamp MMM d HH:mm:ss, single digit date has two spaces
       "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
       "\\s" + // separator
       "([\\w][\\w\\d\\.@-]*)" + // host
       "\\s(.*)$";  // body
 
-  final public static int SYSLOG_TIMESTAMP_POS = 1;
-  final public static int SYSLOG_HOSTNAME_POS = 2;
-  final public static int SYSLOG_BODY_POS = 3;
+  final public static int SYSLOG_PRIORITY_POS = 1;
+  final public static int SYSLOG_VERSION_POS = 2;
+  final public static int SYSLOG_TIMESTAMP_POS = 3;
+  final public static int SYSLOG_HOSTNAME_POS = 4;
+  final public static int SYSLOG_BODY_POS = 5;
 
   private Mode m = Mode.START;
   private StringBuilder prio = new StringBuilder();
@@ -78,6 +86,8 @@ public class SyslogUtils {
 
   final public static String SYSLOG_FACILITY = "Facility";
   final public static String SYSLOG_SEVERITY = "Severity";
+  final public static String SYSLOG_PRIORITY = "Priority";
+  final public static String SYSLOG_VERSION = "Version";
   final public static String EVENT_STATUS = "flume.syslog.status";
   final public static Integer MIN_SIZE = 10;
   final public static Integer DEFAULT_SIZE = 2500;
@@ -85,7 +95,7 @@ public class SyslogUtils {
   private boolean isBadEvent;
   private boolean isIncompleteEvent;
   private Integer maxSize;
-  private boolean keepFields;
+  private Set<String> keepFields;
 
   private class SyslogFormatter {
     public Pattern regexPattern;
@@ -96,19 +106,93 @@ public class SyslogUtils {
   }
   private ArrayList<SyslogFormatter> formats = new ArrayList<SyslogFormatter>();
 
+  private String priority = null;
+  private String version = null;
   private String timeStamp = null;
   private String hostName = null;
   private String msgBody = null;
 
+  private static final String[] DEFAULT_FIELDS_TO_KEEP = {
+    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY,
+    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION,
+    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP,
+    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME
+  };
+  public static final String KEEP_FIELDS_ALL = "--all--";
+
+  public static boolean keepAllFields(Set<String> keepFields) {
+    if (keepFields == null) {
+      return false;
+    }
+    return keepFields.contains(KEEP_FIELDS_ALL);
+  }
+
+  public static Set<String> chooseFieldsToKeep(String keepFields) {
+    if (keepFields == null) {
+      return null;
+    }
+
+    keepFields = keepFields.trim().toLowerCase();
+
+    if (keepFields.equals("false") || keepFields.equals("none")) {
+      return null;
+    }
+
+    if (keepFields.equals("true") || keepFields.equals("all")) {
+      Set<String> fieldsToKeep = new HashSet<String>(1);
+      fieldsToKeep.add(KEEP_FIELDS_ALL);
+      return fieldsToKeep;
+    }
+
+    Set<String> fieldsToKeep = new HashSet<String>(DEFAULT_FIELDS_TO_KEEP.length);
+
+    for (String field : DEFAULT_FIELDS_TO_KEEP) {
+      if (keepFields.indexOf(field) != -1) {
+        fieldsToKeep.add(field);
+      }
+    }
+
+    return fieldsToKeep;
+  }
+
+  public static String addFieldsToBody(Set<String> keepFields,
+                                       String body,
+                                       String priority,
+                                       String version,
+                                       String timestamp,
+                                       String hostname) {
+    // Prepend fields to be kept in message body.
+    if (keepFields != null) {
+      if (keepFields.contains(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME)) {
+        body = hostname + " " + body;
+      }
+      if (keepFields.contains(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP)) {
+        body = timestamp + " " + body;
+      }
+      if (keepFields.contains(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION)) {
+        if (version != null && !version.isEmpty()) {
+          body = version + " " + body;
+        }
+      }
+      if (keepFields.contains(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY)) {
+        body = "<" + priority + ">" + body;
+      }
+    }
+
+    return body;
+  }
+
   public SyslogUtils() {
     this(false);
   }
 
   public SyslogUtils(boolean isUdp) {
-    this(DEFAULT_SIZE, SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS, isUdp);
+    this(DEFAULT_SIZE,
+        new HashSet<String>(Arrays.asList(SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)),
+        isUdp);
   }
 
-  public SyslogUtils(Integer eventSize, boolean keepFields, boolean isUdp) {
+  public SyslogUtils(Integer eventSize, Set<String> keepFields, boolean isUdp) {
     this.isUdp = isUdp;
     isBadEvent = false;
     isIncompleteEvent = false;
@@ -212,6 +296,12 @@ public class SyslogUtils {
     Map <String, String> headers = new HashMap<String, String>();
     headers.put(SYSLOG_FACILITY, String.valueOf(facility));
     headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
+    if ((priority != null) && (priority.length() > 0)) {
+      headers.put("priority", priority);
+    }
+    if ((version != null) && (version.length() > 0)) {
+      headers.put("version", version);
+    }
     if ((timeStamp != null) && timeStamp.length() > 0) {
       headers.put("timestamp", timeStamp);
     }
@@ -227,7 +317,7 @@ public class SyslogUtils {
       headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
     }
 
-    if (!keepFields) {
+    if (!keepAllFields(keepFields)) {
       if ((msgBody != null) && (msgBody.length() > 0)) {
         body = msgBody.getBytes();
       } else {
@@ -245,6 +335,8 @@ public class SyslogUtils {
   // Apply each known pattern to message
   private void formatHeaders() {
     String eventStr = baos.toString();
+    String timeStampString = null;
+
     for(int p=0; p < formats.size(); p++) {
       SyslogFormatter fmt = formats.get(p);
       Pattern pattern = fmt.regexPattern;
@@ -256,6 +348,8 @@ public class SyslogUtils {
       for (int grp=1; grp <= res.groupCount(); grp++) {
         String value = res.group(grp);
         if (grp == SYSLOG_TIMESTAMP_POS) {
+          timeStampString = value;
+
           // apply available format replacements to timestamp
           if (value != null) {
             for (int sp=0; sp < fmt.searchPattern.size(); sp++) {
@@ -278,8 +372,12 @@ public class SyslogUtils {
           }
         } else if (grp == SYSLOG_HOSTNAME_POS) {
           hostName = value;
+        } else if (grp == SYSLOG_PRIORITY_POS) {
+          priority = value;
+        } else if (grp == SYSLOG_VERSION_POS) {
+          version = value;
         } else if (grp == SYSLOG_BODY_POS) {
-          msgBody = value;
+          msgBody = addFieldsToBody(keepFields, value, priority, version, timeStampString, hostName);
         }
       }
       break; // we successfully parsed the message using this pattern
@@ -388,9 +486,9 @@ public class SyslogUtils {
     this.maxSize = eventSize;
   }
 
-  public void setKeepFields(Boolean keepFields) {
-    this.keepFields= keepFields;
-  }
+  public void setKeepFields(Set<String> keepFields) {
+    this.keepFields = keepFields;
   }
+}
 
 

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 9b97c8c..c3dc241 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -70,7 +70,6 @@ public class TestMultiportSyslogTCPSource {
   private final String stamp1 = time.toString();
   private final String host1 = "localhost.localdomain";
   private final String data1 = "proc1 - some msg";
-  private final static boolean KEEP_FIELDS = false;
 
   /**
    * Helper function to generate a syslog message.
@@ -207,7 +206,7 @@ public class TestMultiportSyslogTCPSource {
         null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER,
         new ThreadSafeDecoder(Charsets.UTF_8),
         new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
-        KEEP_FIELDS);
+        null);
 
     Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder());
     String body = new String(event.getBody(), Charsets.UTF_8);
@@ -234,7 +233,7 @@ public class TestMultiportSyslogTCPSource {
         new SourceCounter("test"), "port",
         new ThreadSafeDecoder(Charsets.UTF_8),
         new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
-        KEEP_FIELDS);
+        null);
 
     ParsedBuffer parsedBuf = new ParsedBuffer();
     parsedBuf.incomplete = false;
@@ -334,7 +333,8 @@ public class TestMultiportSyslogTCPSource {
     // defaults to UTF-8
     MultiportSyslogHandler handler = new MultiportSyslogHandler(
         1000, 10, chanProc, new SourceCounter("test"), "port",
-        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, KEEP_FIELDS);
+        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets,
+        null);
 
     // initialize buffers
     handler.sessionCreated(session1);

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
index 2809163..6e0fd66 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
@@ -21,7 +21,10 @@ package org.apache.flume.source;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import java.nio.charset.Charset;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.flume.Event;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -77,7 +80,7 @@ public class TestSyslogParser {
 
     // test with default keepFields = false
     for (String msg : messages) {
-      boolean keepFields = false;
+      Set<String> keepFields = new HashSet<String>();
       Event event = parser.parseMessage(msg, charset, keepFields);
       Assert.assertNull("Failure to parse known-good syslog message",
         event.getHeaders().get(SyslogUtils.EVENT_STATUS));
@@ -85,11 +88,23 @@ public class TestSyslogParser {
 
     // test that priority, timestamp and hostname are preserved in event body
     for (String msg : messages) {
-      boolean keepFields = true;
+      Set<String> keepFields = new HashSet<String>();
+      keepFields.add(SyslogUtils.KEEP_FIELDS_ALL);
       Event event = parser.parseMessage(msg, charset, keepFields);
       Assert.assertArrayEquals(event.getBody(), msg.getBytes());
       Assert.assertNull("Failure to parse known-good syslog message",
           event.getHeaders().get(SyslogUtils.EVENT_STATUS));
     }
+
+    // test that hostname is preserved in event body
+    for (String msg : messages) {
+      Set<String> keepFields = new HashSet<String>();
+      keepFields.add(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
+      Event event = parser.parseMessage(msg, charset, keepFields);
+      Assert.assertTrue("Failure to persist hostname",
+          new String(event.getBody()).contains(event.getHeaders().get("host")));
+      Assert.assertNull("Failure to parse known-good syslog message",
+          event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index 22fa200..239ba51 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -49,10 +49,14 @@ public class TestSyslogTcpSource {
   private final String stamp1 = time.toString();
   private final String host1 = "localhost.localdomain";
   private final String data1 = "test syslog data";
+  private final String bodyWithHostname = host1 + " " +
+      data1;
+  private final String bodyWithTimestamp = stamp1 + " " +
+      data1;
   private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " +
       data1 + "\n";
 
-  private void init(boolean keepFields){
+  private void init(String keepFields){
     source = new SyslogTcpSource();
     channel = new MemoryChannel();
 
@@ -67,14 +71,14 @@ public class TestSyslogTcpSource {
     source.setChannelProcessor(new ChannelProcessor(rcs));
     Context context = new Context();
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
-    context.put("keepFields", String.valueOf(keepFields));
+    context.put("keepFields", keepFields);
 
     source.configure(context);
 
   }
   /** Tests the keepFields configuration parameter (enabled or disabled)
    using SyslogTcpSource.*/
-  private void runKeepFieldsTest(boolean keepFields) throws IOException {
+  private void runKeepFieldsTest(String keepFields) throws IOException {
     init(keepFields);
     source.start();
     // Write some message to the syslog port
@@ -110,23 +114,43 @@ public class TestSyslogTcpSource {
       Assert.assertNotNull(e);
       String str = new String(e.getBody(), Charsets.UTF_8);
       logger.info(str);
-      if (keepFields) {
+      if (keepFields.equals("true") || keepFields.equals("all")) {
         Assert.assertArrayEquals(bodyWithTandH.trim().getBytes(),
           e.getBody());
-      } else if (!keepFields) {
+      } else if (keepFields.equals("false") || keepFields.equals("none")) {
         Assert.assertArrayEquals(data1.getBytes(), e.getBody());
+      } else if (keepFields.equals("hostname")) {
+        Assert.assertArrayEquals(bodyWithHostname.getBytes(), e.getBody());
+      } else if (keepFields.equals("timestamp")) {
+        Assert.assertArrayEquals(bodyWithTimestamp.getBytes(), e.getBody());
       }
     }
   }
 
   @Test
-  public void testKeepFields () throws IOException {
-    runKeepFieldsTest(true);
+  public void testKeepFields() throws IOException {
+    runKeepFieldsTest("all");
+
+    // Backwards compatibility
+    runKeepFieldsTest("true");
   }
 
   @Test
   public void testRemoveFields() throws IOException{
-      runKeepFieldsTest(false);
-    }
+    runKeepFieldsTest("none");
+
+    // Backwards compatibility
+    runKeepFieldsTest("false");
+  }
+
+  @Test
+  public void testKeepHostname() throws IOException{
+    runKeepFieldsTest("hostname");
+  }
+
+  @Test
+  public void testKeepTimestamp() throws IOException{
+    runKeepFieldsTest("timestamp");
   }
+}
 

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index 95ee48c..8fc80be 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -50,11 +50,15 @@ public class TestSyslogUdpSource {
   private final DateTime time = new DateTime();
   private final String stamp1 = time.toString();
   private final String host1 = "localhost.localdomain";
-  private final String data1 = "test UDP syslog data";
+  private final String data1 = "test syslog data";
+  private final String bodyWithHostname = host1 + " " +
+      data1;
+  private final String bodyWithTimestamp = stamp1 + " " +
+      data1;
   private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " +
       data1;
 
-  private void init(boolean keepFields) {
+  private void init(String keepFields) {
     source = new SyslogUDPSource();
     channel = new MemoryChannel();
 
@@ -69,7 +73,7 @@ public class TestSyslogUdpSource {
     source.setChannelProcessor(new ChannelProcessor(rcs));
     Context context = new Context();
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
-    context.put("keepFields", String.valueOf(keepFields));
+    context.put("keepFields", keepFields);
 
     source.configure(context);
 
@@ -78,7 +82,7 @@ public class TestSyslogUdpSource {
   /** Tests the keepFields configuration parameter (enabled or disabled)
    using SyslogUDPSource.*/
 
-  private void runKeepFieldsTest(boolean keepFields) throws IOException {
+  private void runKeepFieldsTest(String keepFields) throws IOException {
     init(keepFields);
     source.start();
     // Write some message to the syslog port
@@ -115,18 +119,22 @@ public class TestSyslogUdpSource {
       Assert.assertNotNull(e);
       String str = new String(e.getBody(), Charsets.UTF_8);
       logger.info(str);
-      if (keepFields) {
-        Assert.assertArrayEquals(bodyWithTandH.getBytes(),
-          e.getBody());
-      } else if (!keepFields) {
+      if (keepFields.equals("true") || keepFields.equals("all")) {
+        Assert.assertArrayEquals(bodyWithTandH.trim().getBytes(),
+            e.getBody());
+      } else if (keepFields.equals("false") || keepFields.equals("none")) {
         Assert.assertArrayEquals(data1.getBytes(), e.getBody());
+      } else if (keepFields.equals("hostname")) {
+        Assert.assertArrayEquals(bodyWithHostname.getBytes(), e.getBody());
+      } else if (keepFields.equals("timestamp")) {
+        Assert.assertArrayEquals(bodyWithTimestamp.getBytes(), e.getBody());
       }
     }
   }
 
   @Test
   public void testLargePayload() throws Exception {
-    init(true);
+    init("true");
     source.start();
     // Write some message to the syslog port
 
@@ -169,12 +177,28 @@ public class TestSyslogUdpSource {
 
   @Test
   public void testKeepFields() throws IOException {
-    runKeepFieldsTest(true);
+    runKeepFieldsTest("all");
+
+    // Backwards compatibility
+    runKeepFieldsTest("true");
   }
 
   @Test
   public void testRemoveFields() throws IOException {
-    runKeepFieldsTest(false);
+    runKeepFieldsTest("none");
+
+    // Backwards compatibility
+    runKeepFieldsTest("false");
+  }
+
+  @Test
+  public void testKeepHostname() throws IOException{
+    runKeepFieldsTest("hostname");
+  }
+
+  @Test
+  public void testKeepTimestamp() throws IOException{
+    runKeepFieldsTest("timestamp");
   }
 
   private String getPayload(int length) {

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 82b7dd0..76ee5b1 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.HashSet;
 import java.util.Map;
 
 public class TestSyslogUtils {
@@ -54,7 +55,6 @@ public class TestSyslogUtils {
 
   @Test
   public void TestHeader2() throws ParseException {
-
     String stamp1 = "2012-04-13T11:11:11";
     String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
     String host1 = "ubuntu-11.cloudera.com";
@@ -162,9 +162,17 @@ public class TestSyslogUtils {
             format1, host1, data1);
   }
 
-  public void checkHeader(String msg1, String stamp1, String format1,
-      String host1, String data1) throws ParseException {
-    SyslogUtils util = new SyslogUtils(false);
+  public static void checkHeader(String keepFields, String msg1, String stamp1, String format1,
+                                 String host1, String data1) throws ParseException {
+    SyslogUtils util;
+    if (keepFields == null || keepFields.isEmpty()) {
+      util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, new HashSet<String>(), false);
+    } else {
+      util = new SyslogUtils(
+          SyslogUtils.DEFAULT_SIZE,
+          SyslogUtils.chooseFieldsToKeep(keepFields),
+          false);
+    }
     ChannelBuffer buff = ChannelBuffers.buffer(200);
 
     buff.writeBytes(msg1.getBytes());
@@ -188,6 +196,12 @@ public class TestSyslogUtils {
     Assert.assertEquals(data1, new String(e.getBody()));
   }
 
+  // Check headers for when keepFields is "none".
+  public static void checkHeader(String msg1, String stamp1, String format1,
+                                 String host1, String data1) throws ParseException {
+    checkHeader("none", msg1, stamp1, format1, host1, data1);
+  }
+
   /**
    * Test bad event format 1: Priority is not numeric
    */
@@ -405,7 +419,7 @@ public class TestSyslogUtils {
   public void testExtractBadEventLarge() {
     String badData1 = "<10> bad bad data bad bad\n";
     // The minimum size (which is 10) overrides the 5 specified here.
-    SyslogUtils util = new SyslogUtils(5, false, false);
+    SyslogUtils util = new SyslogUtils(5, null, false);
     ChannelBuffer buff = ChannelBuffers.buffer(100);
     buff.writeBytes(badData1.getBytes());
     Event e = util.extractEvent(buff);
@@ -433,4 +447,30 @@ public class TestSyslogUtils {
 
   }
 
+  @Test
+  public void testKeepFields() throws Exception {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with hh:mm format timezone
+    String msg1 = "<10>1 " + stamp1 + "+08:00" + " " + host1 + " " + data1 + "\n";
+    checkHeader("none", msg1, stamp1 + "+0800", format1, host1, data1);
+    checkHeader("false", msg1, stamp1 + "+0800", format1, host1, data1);
+
+    String data2 = "ubuntu-11.cloudera.com some msg";
+    checkHeader("hostname", msg1, stamp1 + "+0800", format1, host1, data2);
+
+    String data3 = "2012-04-13T11:11:11+08:00 ubuntu-11.cloudera.com some msg";
+    checkHeader("timestamp hostname", msg1, stamp1 + "+0800", format1, host1, data3);
+
+    String data4 = "<10>2012-04-13T11:11:11+08:00 ubuntu-11.cloudera.com some msg";
+    checkHeader("priority timestamp hostname", msg1, stamp1 + "+0800", format1, host1, data4);
+
+    String data5 = "<10>1 2012-04-13T11:11:11+08:00 ubuntu-11.cloudera.com some msg";
+    checkHeader("priority version timestamp hostname", msg1, stamp1 + "+0800", format1, host1, data5);
+    checkHeader("all", msg1, stamp1 + "+0800", format1, host1, data5);
+    checkHeader("true", msg1, stamp1 + "+0800", format1, host1, data5);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index ed90022..a718fbf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1182,8 +1182,13 @@ Property Name    Default      Description
 **host**         --           Host name or IP address to bind to
 **port**         --           Port # to bind to
 eventSize        2500         Maximum size of a single event line, in bytes
-keepFields       false        Setting this to true will preserve the Priority,
+keepFields       none         Setting this to 'all' will preserve the Priority,
                               Timestamp and Hostname in the body of the event.
+                              A space-separated list of fields to include
+                              is allowed as well. Currently, the following
+                              fields can be included: priority, version,
+                              timestamp, hostname. The values 'true' and 'false'
+                              have been deprecated in favor of 'all' and 'none'.
 selector.type                 replicating or multiplexing
 selector.*       replicating  Depends on the selector.type value
 interceptors     --           Space-separated list of interceptors
@@ -1220,8 +1225,13 @@ Property Name         Default           Description
 **host**              --                Host name or IP address to bind to.
 **ports**             --                Space-separated list (one or more) of ports to bind to.
 eventSize             2500              Maximum size of a single event line, in bytes.
-keepFields            false             Setting this to true will preserve the
+keepFields            none              Setting this to 'all' will preserve the
                                         Priority, Timestamp and Hostname in the body of the event.
+                                        A space-separated list of fields to include
+                                        is allowed as well. Currently, the following
+                                        fields can be included: priority, version,
+                                        timestamp, hostname. The values 'true' and 'false'
+                                        have been deprecated in favor of 'all' and 'none'.
 portHeader            --                If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
 charset.default       UTF-8             Default character set used while parsing syslog events into strings.
 charset.port.<port>   --                Character set is configurable on a per-port basis.

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSyslogSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSyslogSource.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSyslogSource.java
new file mode 100644
index 0000000..9b1e325
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSyslogSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.test.agent;
+
+import org.apache.flume.test.util.SyslogAgent;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestSyslogSource {
+  private static final Logger LOGGER = Logger.getLogger(TestSyslogSource.class);
+
+  private SyslogAgent agent;
+  private SyslogAgent.SyslogSourceType sourceType;
+
+  public TestSyslogSource(SyslogAgent.SyslogSourceType sourceType) {
+    this.sourceType = sourceType;
+  }
+
+  @Parameterized.Parameters
+  public static Collection syslogSourceTypes() {
+    List<Object[]> sourceTypes = new ArrayList<Object[]>();
+    for (SyslogAgent.SyslogSourceType sourceType : SyslogAgent.SyslogSourceType.values()) {
+      sourceTypes.add(new Object[]{sourceType});
+    }
+    return sourceTypes;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    agent = new SyslogAgent();
+    agent.configure(sourceType);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (agent != null) {
+      agent.stop();
+      agent = null;
+    }
+  }
+
+  @Test
+  public void testKeepFields() throws Exception {
+    LOGGER.debug("testKeepFields() started.");
+
+    agent.start("all");
+    agent.runKeepFieldsTest();
+
+    LOGGER.debug("testKeepFields() ended.");
+  }
+
+  @Test
+  public void testRemoveFields() throws Exception {
+    LOGGER.debug("testRemoveFields() started.");
+
+    agent.start("none");
+    agent.runKeepFieldsTest();
+
+    LOGGER.debug("testRemoveFields() ended.");
+  }
+
+  @Test
+  public void testKeepTimestampAndHostname() throws Exception {
+    LOGGER.debug("testKeepTimestampAndHostname() started.");
+
+    agent.start("timestamp hostname");
+    agent.runKeepFieldsTest();
+
+    LOGGER.debug("testKeepTimestampAndHostname() ended.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index a6bd5e9..973ff4a 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -91,6 +91,7 @@ public class StagedInstall {
 
     LOGGER.info("Shutting down agent process");
     process.destroy();
+    process.waitFor();
     process = null;
     consumer.interrupt();
     consumer = null;
@@ -155,6 +156,7 @@ public class StagedInstall {
     Map<String, String> env = pb.environment();
 
     LOGGER.debug("process environment: " + env);
+
     pb.directory(baseDir);
     pb.redirectErrorStream(true);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/5734a4ab/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
new file mode 100644
index 0000000..7159549
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.test.util;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Syslog Flume Agent.
+ * A Syslog source of some kind is configured and a client is available to write
+ * messages to the agent. The Flume agent's port is randomly chosen from ports not in use.
+ *
+ */
+public class SyslogAgent {
+  private static final Logger LOGGER = Logger.getLogger(SyslogAgent.class);
+  private static final Collection<File> tempResources = new ArrayList<File>();
+  private static final int DEFAULT_ATTEMPTS = 20;
+  private static final long DEFAULT_TIMEOUT = 500L;
+
+  public enum SyslogSourceType {
+    TCP("syslogtcp"),
+    MULTIPORTTCP("multiport_syslogtcp");
+
+    private final String syslogSourceType;
+
+    private SyslogSourceType(String syslogSourceType) {
+      this.syslogSourceType = syslogSourceType;
+    }
+
+    public String toString() {
+      return syslogSourceType;
+    }
+  };
+
+  private Properties agentProps;
+  private File sinkOutputDir;
+  private String keepFields;
+
+  private int port;
+  private String hostname;
+
+  BufferedOutputStream client;
+
+  public SyslogAgent() throws IOException {
+    hostname = "localhost";
+
+    setRandomPort();
+  }
+
+  public void setRandomPort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    port = s.getLocalPort();
+    s.close();
+  }
+
+  public void configure(SyslogSourceType sourceType) throws IOException {
+    /* Create a temp dir for the sink output; its path is used as a value within agentProps */
+    sinkOutputDir = Files.createTempDir();
+    tempResources.add(sinkOutputDir);
+    final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath();
+    LOGGER.info("Created rolling file sink's output dir: "
+        + sinkOutputDirPath);
+
+    /* Build props to pass to flume agent */
+    agentProps = new Properties();
+
+    // Active sets
+    agentProps.put("a1.channels", "c1");
+    agentProps.put("a1.sources", "r1");
+    agentProps.put("a1.sinks", "k1");
+
+    // c1
+    agentProps.put("a1.channels.c1.type", "memory");
+    agentProps.put("a1.channels.c1.capacity", "1000");
+    agentProps.put("a1.channels.c1.transactionCapacity", "100");
+
+    // r1
+    agentProps.put("a1.sources.r1.channels", "c1");
+    agentProps.put("a1.sources.r1.type", sourceType.toString());
+    agentProps.put("a1.sources.r1.host", hostname);
+    if (sourceType.equals(SyslogSourceType.MULTIPORTTCP)) {
+      agentProps.put("a1.sources.r1.ports", Integer.toString(port));
+    } else {
+      agentProps.put("a1.sources.r1.port", Integer.toString(port));
+    }
+
+    // k1
+    agentProps.put("a1.sinks.k1.channel", "c1");
+    agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath);
+    agentProps.put("a1.sinks.k1.type", "FILE_ROLL");
+    agentProps.put("a1.sinks.k1.sink.rollInterval", "0");
+  }
+
+  // Blocks until flume agent boots up.
+  public void start(String keepFields) throws Exception {
+    this.keepFields = keepFields;
+
+    // Set properties that should be different per agent start and stop.
+    agentProps.put("a1.sources.r1.keepFields", keepFields);
+
+    // Recreate temporary directory.
+    sinkOutputDir.mkdir();
+
+    /* Start flume agent */
+    StagedInstall.getInstance().startAgent("a1", agentProps);
+
+    LOGGER.info("Started flume agent with syslog source on port " + port);
+
+    // Wait for source, channel, sink to start and create client.
+    int numberOfAttempts = 0;
+    while (client == null) {
+      try {
+        client = new BufferedOutputStream(new Socket(hostname, port).getOutputStream());
+      } catch(IOException e) {
+        if (++numberOfAttempts >= DEFAULT_ATTEMPTS) {
+          throw new AssertionError("Could not connect to source after "
+              + DEFAULT_ATTEMPTS + " attempts with " + DEFAULT_TIMEOUT + " ms timeout.");
+        }
+
+        TimeUnit.MILLISECONDS.sleep(DEFAULT_TIMEOUT);
+      }
+    }
+  }
+
+  public boolean isRunning() throws Exception {
+    return StagedInstall.getInstance().isRunning();
+  }
+
+  public void stop() throws Exception {
+    client.close();
+    client = null;
+
+    StagedInstall.getInstance().stopAgent();
+    for (File tempResource : tempResources) {
+      // Should always be a directory.
+      FileUtils.deleteDirectory(tempResource);
+    }
+  }
+
+  public void runKeepFieldsTest() throws Exception {
+    /* Create expected output and log message */
+    String logMessage = "<34>1 Oct 11 22:14:15 mymachine su: Test\n";
+    String expectedOutput = "su: Test\n";
+    if (keepFields.equals("true") || keepFields.equals("all")) {
+      expectedOutput = logMessage;
+    } else if (!keepFields.equals("false") && !keepFields.equals("none")) {
+      if (keepFields.indexOf("hostname") != -1) {
+        expectedOutput = "mymachine " + expectedOutput;
+      }
+      if (keepFields.indexOf("timestamp") != -1) {
+        expectedOutput = "Oct 11 22:14:15 " + expectedOutput;
+      }
+      if (keepFields.indexOf("version") != -1) {
+        expectedOutput = "1 " + expectedOutput;
+      }
+      if (keepFields.indexOf("priority") != -1) {
+        expectedOutput = "<34>" + expectedOutput;
+      }
+    }
+    LOGGER.info("Created expected output: " + expectedOutput);
+
+    /* Send test message to agent */
+    sendMessage(logMessage);
+
+    /* Wait for output file */
+    int numberOfListDirAttempts = 0;
+    while (sinkOutputDir.listFiles().length == 0) {
+      if (++numberOfListDirAttempts >= DEFAULT_ATTEMPTS) {
+        throw new AssertionError("FILE_ROLL sink hasn't written any files after "
+            + DEFAULT_ATTEMPTS + " attempts with " + DEFAULT_TIMEOUT + " ms timeout.");
+      }
+
+      TimeUnit.MILLISECONDS.sleep(DEFAULT_TIMEOUT);
+    }
+
+    // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
+    File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
+    Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," +
+        " but found " + sinkOutputDirChildren.length + " children.",
+    1, sinkOutputDirChildren.length);
+
+    /* Wait for output file stats to be as expected. */
+    File outputDirChild = sinkOutputDirChildren[0];
+    int numberOfStatsAttempts = 0;
+    while (outputDirChild.length() != expectedOutput.length()) {
+      if (++numberOfStatsAttempts >= DEFAULT_ATTEMPTS) {
+        throw new AssertionError("Expected output and FILE_ROLL sink's"
+            + " lengths did not match after " + DEFAULT_ATTEMPTS
+            + " attempts with " + DEFAULT_TIMEOUT + " ms timeout.");
+      }
+
+      TimeUnit.MILLISECONDS.sleep(DEFAULT_TIMEOUT);
+    }
+
+    File actualOutput = sinkOutputDirChildren[0];
+    if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) {
+      LOGGER.error("Actual output doesn't match expected output.\n");
+      LOGGER.debug("Output: " + Files.toString(actualOutput, Charsets.UTF_8));
+      throw new AssertionError("FILE_ROLL sink's actual output doesn't " +
+          "match expected output.");
+    }
+  }
+
+  private void sendMessage(String message) throws IOException {
+    client.write(message.getBytes());
+    client.flush();
+  }
+}


Mime
View raw message