flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szabofe...@apache.org
Subject flume git commit: FLUME-3133 Add client IP / hostname headers to Syslog sources.
Date Wed, 14 Nov 2018 12:27:03 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 5cfa8be41 -> 671fc3f1b


FLUME-3133 Add client IP / hostname headers to Syslog sources.

In the newer version of the Syslog message format (RFC-5424) the hostname
is not a mandatory header anymore so the Syslog client might not send it.
On the Flume side it would be a useful information that could be used
in interceptors or for event routing.
To keep this information, two new properties have been added to the Syslog
sources: clientIPHeader and clientHostnameHeader.
Flume users can define custom event header names through these parameters
for storing the IP address / hostname of the Syslog client in the Flume
event as headers.
The IP address / hostname are retrieved from the underlying network sockets,
not from the Syslog message.

This change is based on the patch submitted by Jinjiang Ling which has been
rebased onto the current trunk and the review comments have been implemented.

This closes #234

Reviewers: Ferenc Szabo, Endre Major

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/671fc3f1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/671fc3f1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/671fc3f1

Branch: refs/heads/trunk
Commit: 671fc3f1b6d6b2239b171f582b189d0fbadedb6f
Parents: 5cfa8be
Author: Peter Turcsanyi <turcsanyi@cloudera.com>
Authored: Wed Nov 14 13:25:06 2018 +0100
Committer: Ferenc Szabo <szaboferee@apache.org>
Committed: Wed Nov 14 13:25:06 2018 +0100

----------------------------------------------------------------------
 .../flume/source/MultiportSyslogTCPSource.java  | 35 +++++++--
 .../SyslogSourceConfigurationConstants.java     |  4 +
 .../apache/flume/source/SyslogTcpSource.java    | 47 ++++++++++--
 .../apache/flume/source/SyslogUDPSource.java    | 29 +++++++
 .../org/apache/flume/source/SyslogUtils.java    | 34 +++++++++
 .../source/TestMultiportSyslogTCPSource.java    | 80 +++++++++++++++-----
 .../flume/source/TestSyslogTcpSource.java       | 49 ++++++++++++
 .../flume/source/TestSyslogUdpSource.java       | 47 +++++++++++-
 .../apache/flume/source/TestSyslogUtils.java    | 58 ++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 68 +++++++++++++----
 10 files changed, 407 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index d6abd37..9cd7de5 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -71,6 +71,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
   private int batchSize;
   private int readBufferSize;
   private String portHeader;
+  private String clientIPHeader;
+  private String clientHostnameHeader;
   private SourceCounter sourceCounter = null;
   private Charset defaultCharset;
   private ThreadSafeDecoder defaultDecoder;
@@ -141,7 +143,13 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
         SyslogSourceConfigurationConstants.DEFAULT_BATCHSIZE);
 
     portHeader = context.getString(
-            SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER);
+        SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER);
+
+    clientIPHeader = context.getString(
+        SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
+
+    clientHostnameHeader = context.getString(
+        SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
 
     readBufferSize = context.getInteger(
         SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
@@ -181,8 +189,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
     acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
 
     acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
-        getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
-        portCharsets, keepFields));
+        getChannelProcessor(), sourceCounter, portHeader, clientIPHeader,
+        clientHostnameHeader, defaultDecoder, portCharsets, keepFields));
 
     for (int port : ports) {
       InetSocketAddress addr;
@@ -237,6 +245,8 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
     private final int batchSize;
     private final SourceCounter sourceCounter;
     private final String portHeader;
+    private final String clientIPHeader;
+    private final String clientHostnameHeader;
     private final SyslogParser syslogParser;
     private final LineSplitter lineSplitter;
     private final ThreadSafeDecoder defaultDecoder;
@@ -245,14 +255,16 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
 
     public MultiportSyslogHandler(int maxEventSize, int batchSize,
         ChannelProcessor cp, SourceCounter ctr, String portHeader,
-        ThreadSafeDecoder defaultDecoder,
-        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets,
-        Set<String> keepFields) {
+        String clientIPHeader, String clientHostnameHeader,
+        ThreadSafeDecoder defaultDecoder, ConcurrentMap<Integer,
+        ThreadSafeDecoder> portCharsets, Set<String> keepFields) {
       channelProcessor = cp;
       sourceCounter = ctr;
       this.maxEventSize = maxEventSize;
       this.batchSize = batchSize;
       this.portHeader = portHeader;
+      this.clientIPHeader = clientIPHeader;
+      this.clientHostnameHeader = clientHostnameHeader;
       this.defaultDecoder = defaultDecoder;
       this.portCharsets = portCharsets;
       this.keepFields = keepFields;
@@ -320,6 +332,17 @@ public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource
impl
             if (portHeader != null) {
               event.getHeaders().put(portHeader, String.valueOf(port));
             }
+
+            if (clientIPHeader != null) {
+              event.getHeaders().put(clientIPHeader,
+                  SyslogUtils.getIP(session.getRemoteAddress()));
+            }
+
+            if (clientHostnameHeader != null) {
+              event.getHeaders().put(clientHostnameHeader,
+                  SyslogUtils.getHostname(session.getRemoteAddress()));
+            }
+
             events.add(event);
           } else {
             logger.trace("Parsed null event");

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index fb8df81..c88c957 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -61,6 +61,7 @@ public final class SyslogSourceConfigurationConstants {
 
   public static final String CONFIG_PORT_HEADER = "portHeader";
 
+  @Deprecated
   public static final String DEFAULT_PORT_HEADER = "port";
 
   public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
@@ -74,6 +75,9 @@ public final class SyslogSourceConfigurationConstants {
   public static final String CONFIG_KEEP_FIELDS_TIMESTAMP = "timestamp";
   public static final String CONFIG_KEEP_FIELDS_HOSTNAME = "hostname";
 
+  public static final String CONFIG_CLIENT_IP_HEADER = "clientIPHeader";
+  public static final String CONFIG_CLIENT_HOSTNAME_HEADER = "clientHostnameHeader";
+
   private SyslogSourceConfigurationConstants() {
     // Disable explicit creation of objects.
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index 067c21b..39aa5cb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -67,10 +67,14 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
   private Map<String, String> formaterProp;
   private SourceCounter sourceCounter;
   private Set<String> keepFields;
+  private String clientIPHeader;
+  private String clientHostnameHeader;
 
   public class syslogTcpHandler extends SimpleChannelHandler {
 
     private SyslogUtils syslogUtils = new SyslogUtils();
+    private String clientIPHeader;
+    private String clientHostnameHeader;
 
     public void setEventSize(int eventSize) {
       syslogUtils.setEventSize(eventSize);
@@ -84,6 +88,14 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
       syslogUtils.addFormats(prop);
     }
 
+    public void setClientIPHeader(String clientIPHeader) {
+      this.clientIPHeader = clientIPHeader;
+    }
+
+    public void setClientHostnameHeader(String clientHostnameHeader) {
+      this.clientHostnameHeader = clientHostnameHeader;
+    }
+
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
       ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
@@ -94,6 +106,17 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
               "rest of the event is received.");
           continue;
         }
+
+        if (clientIPHeader != null) {
+          e.getHeaders().put(clientIPHeader,
+              SyslogUtils.getIP(ctx.getChannel().getRemoteAddress()));
+        }
+
+        if (clientHostnameHeader != null) {
+          e.getHeaders().put(clientHostnameHeader,
+              SyslogUtils.getHostname(ctx.getChannel().getRemoteAddress()));
+        }
+
         sourceCounter.incrementEventReceivedCount();
 
         try {
@@ -120,7 +143,8 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
     ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
 
     serverBootstrap.setPipelineFactory(new PipelineFactory(
-        eventSize, formaterProp, keepFields, getSslEngineSupplier(false)
+        eventSize, formaterProp, keepFields, clientIPHeader, clientHostnameHeader,
+        getSslEngineSupplier(false)
     ));
     logger.info("Syslog TCP Source starting...");
 
@@ -163,11 +187,15 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
     host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
     eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
     formaterProp = context.getSubProperties(
-      SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
+        SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
     keepFields = SyslogUtils.chooseFieldsToKeep(
-      context.getString(
-        SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
-        SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+        context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+    clientIPHeader =
+        context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
+    clientHostnameHeader =
+        context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -193,13 +221,18 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
     private final Integer eventSize;
     private final Map<String, String> formaterProp;
     private final Set<String> keepFields;
+    private String clientIPHeader;
+    private String clientHostnameHeader;
     private Supplier<Optional<SSLEngine>> sslEngineSupplier;
 
     public PipelineFactory(Integer eventSize, Map<String, String> formaterProp,
-        Set<String> keepFields, Supplier<Optional<SSLEngine>> sslEngineSupplier)
{
+        Set<String> keepFields, String clientIPHeader, String clientHostnameHeader,
+        Supplier<Optional<SSLEngine>> sslEngineSupplier) {
       this.eventSize = eventSize;
       this.formaterProp = formaterProp;
       this.keepFields = keepFields;
+      this.clientIPHeader = clientIPHeader;
+      this.clientHostnameHeader = clientHostnameHeader;
       this.sslEngineSupplier = sslEngineSupplier;
     }
 
@@ -209,6 +242,8 @@ public class SyslogTcpSource extends SslContextAwareAbstractSource
       handler.setEventSize(eventSize);
       handler.setFormater(formaterProp);
       handler.setKeepFields(keepFields);
+      handler.setClientIPHeader(clientIPHeader);
+      handler.setClientHostnameHeader(clientHostnameHeader);
 
       ChannelPipeline pipeline = Channels.pipeline(handler);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 1e47f34..fac067b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -57,6 +57,8 @@ public class SyslogUDPSource extends AbstractSource
   private Channel nettyChannel;
   private Map<String, String> formaterProp;
   private Set<String> keepFields;
+  private String clientIPHeader;
+  private String clientHostnameHeader;
 
   private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class);
 
@@ -68,6 +70,8 @@ public class SyslogUDPSource extends AbstractSource
 
   public class syslogHandler extends SimpleChannelHandler {
     private SyslogUtils syslogUtils = new SyslogUtils(DEFAULT_INITIAL_SIZE, null, true);
+    private String clientIPHeader;
+    private String clientHostnameHeader;
 
     public void setFormater(Map<String, String> prop) {
       syslogUtils.addFormats(prop);
@@ -77,6 +81,14 @@ public class SyslogUDPSource extends AbstractSource
       syslogUtils.setKeepFields(keepFields);
     }
 
+    public void setClientIPHeader(String clientIPHeader) {
+      this.clientIPHeader = clientIPHeader;
+    }
+
+    public void setClientHostnameHeader(String clientHostnameHeader) {
+      this.clientHostnameHeader = clientHostnameHeader;
+    }
+
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
       try {
@@ -85,6 +97,17 @@ public class SyslogUDPSource extends AbstractSource
         if (e == null) {
           return;
         }
+
+        if (clientIPHeader != null) {
+          e.getHeaders().put(clientIPHeader,
+              SyslogUtils.getIP(mEvent.getRemoteAddress()));
+        }
+
+        if (clientHostnameHeader != null) {
+          e.getHeaders().put(clientHostnameHeader,
+              SyslogUtils.getHostname(mEvent.getRemoteAddress()));
+        }
+
         sourceCounter.incrementEventReceivedCount();
 
         getChannelProcessor().processEvent(e);
@@ -109,6 +132,8 @@ public class SyslogUDPSource extends AbstractSource
     final syslogHandler handler = new syslogHandler();
     handler.setFormater(formaterProp);
     handler.setKeepFields(keepFields);
+    handler.setClientIPHeader(clientIPHeader);
+    handler.setClientHostnameHeader(clientHostnameHeader);
     serverBootstrap.setOption("receiveBufferSizePredictorFactory",
         new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
             DEFAULT_INITIAL_SIZE, maxsize));
@@ -160,6 +185,10 @@ public class SyslogUDPSource extends AbstractSource
         context.getString(
             SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
             SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+    clientIPHeader =
+        context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_IP_HEADER);
+    clientHostnameHeader =
+        context.getString(SyslogSourceConfigurationConstants.CONFIG_CLIENT_HOSTNAME_HEADER);
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index 2df5ae0..032366d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Clock;
@@ -186,6 +188,38 @@ public class SyslogUtils {
     return body;
   }
 
+  public static String getIP(SocketAddress socketAddress) {
+    try {
+      InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+      String ip = inetSocketAddress.getAddress().getHostAddress();
+      if (ip != null) {
+        return ip;
+      } else {
+        throw new NullPointerException("The returned IP is null");
+      }
+    } catch (Exception e) {
+      logger.warn("Unable to retrieve client IP address", e);
+    }
+    // return a safe value instead of null
+    return "";
+  }
+
+  public static String getHostname(SocketAddress socketAddress) {
+    try {
+      InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+      String hostname = inetSocketAddress.getHostName();
+      if (hostname != null) {
+        return hostname;
+      } else {
+        throw new NullPointerException("The returned hostname is null");
+      }
+    } catch (Exception e) {
+      logger.warn("Unable to retrieve client hostname", e);
+    }
+    // return a safe value instead of null
+    return "";
+  }
+
   public SyslogUtils() {
     this(false);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index f132152..726c0b6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -64,6 +64,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 import javax.net.SocketFactory;
@@ -132,6 +134,7 @@ public class TestMultiportSyslogTCPSource {
     Context context = new Context();
     context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
         ports.toString().trim());
+    context.put("portHeader", "port");
     context.putAll(additionalContext.getParameters());
     source.configure(context);
     source.start();
@@ -257,10 +260,8 @@ public class TestMultiportSyslogTCPSource {
         Map<String, String> headers = e.getHeaders();
         // rely on port to figure out which event it is
         Integer port = null;
-        if (headers.containsKey(
-            SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)) {
-          port = Integer.parseInt(headers.get(
-              SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER));
+        if (headers.containsKey("port")) {
+          port = Integer.parseInt(headers.get("port"));
         }
         iter.remove();
 
@@ -311,12 +312,10 @@ public class TestMultiportSyslogTCPSource {
         parsedLine.buffer.getString(Charsets.UTF_8.newDecoder()));
     parsedLine.buffer.rewind();
 
-    MultiportSyslogTCPSource.MultiportSyslogHandler handler =
-        new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null,
-        null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER,
+    MultiportSyslogHandler handler = new MultiportSyslogHandler(
+        maxLen, 100, null, null, null, null, null,
         new ThreadSafeDecoder(Charsets.UTF_8),
-        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
-        null);
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),null);
 
     Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder());
     String body = new String(event.getBody(), Charsets.UTF_8);
@@ -340,10 +339,9 @@ public class TestMultiportSyslogTCPSource {
     // defaults to UTF-8
     MultiportSyslogHandler handler = new MultiportSyslogHandler(
         1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
-        new SourceCounter("test"), "port",
+        new SourceCounter("test"), null, null, null,
         new ThreadSafeDecoder(Charsets.UTF_8),
-        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
-        null);
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),null);
 
     ParsedBuffer parsedBuf = new ParsedBuffer();
     parsedBuf.incomplete = false;
@@ -393,10 +391,9 @@ public class TestMultiportSyslogTCPSource {
     // defaults to UTF-8
     MultiportSyslogHandler handler = new MultiportSyslogHandler(
         1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
-        new SourceCounter("test"), "port",
+        new SourceCounter("test"), null, null, null,
         new ThreadSafeDecoder(Charsets.UTF_8),
-        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
-        null);
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), null);
 
     handler.exceptionCaught(null, new RuntimeException("dummy"));
     SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
@@ -460,9 +457,8 @@ public class TestMultiportSyslogTCPSource {
 
     // defaults to UTF-8
     MultiportSyslogHandler handler = new MultiportSyslogHandler(
-        1000, 10, chanProc, new SourceCounter("test"), "port",
-        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets,
-        null);
+        1000, 10, chanProc, new SourceCounter("test"), null, null, null,
+        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, null);
 
     // initialize buffers
     handler.sessionCreated(session1);
@@ -532,4 +528,52 @@ public class TestMultiportSyslogTCPSource {
     source.stop();
   }
 
+  @Test
+  public void testClientHeaders() throws IOException {
+    String testClientIPHeader = "testClientIPHeader";
+    String testClientHostnameHeader = "testClientHostnameHeader";
+
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = Lists.newArrayList();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    int port = getFreePort();
+    Context context = new Context();
+    context.put("host", InetAddress.getLoopbackAddress().getHostAddress());
+    context.put("ports", String.valueOf(port));
+    context.put("clientIPHeader", testClientIPHeader);
+    context.put("clientHostnameHeader", testClientHostnameHeader);
+
+    source.configure(context);
+    source.start();
+
+    //create a socket to send a test event
+    Socket syslogSocket = new Socket(InetAddress.getLoopbackAddress().getHostAddress(), port);
+    syslogSocket.getOutputStream().write(getEvent(0));
+
+    Event e = takeEvent(channel);
+
+    source.stop();
+
+    Map<String, String> headers = e.getHeaders();
+
+    checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress());
+    checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName());
+  }
+
+  private static void checkHeader(Map<String, String> headers, String headerName,
+      String expectedValue) {
+    assertTrue("Missing event header: " + headerName, headers.containsKey(headerName));
+    assertEquals("Event header value does not match: " + headerName,
+        expectedValue, headers.get(headerName));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index 9398707..057aef5 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -37,13 +37,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
@@ -86,6 +89,8 @@ public class TestSyslogTcpSource {
     rcs.setChannels(channels);
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    context.put("host", InetAddress.getLoopbackAddress().getHostAddress());
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
     context.put("keepFields", keepFields);
 
@@ -264,5 +269,49 @@ public class TestSyslogTcpSource {
 
   }
 
+
+  @Test
+  public void testClientHeaders() throws IOException {
+    String testClientIPHeader = "testClientIPHeader";
+    String testClientHostnameHeader = "testClientHostnameHeader";
+
+    Context context = new Context();
+    context.put("clientIPHeader", testClientIPHeader);
+    context.put("clientHostnameHeader", testClientHostnameHeader);
+
+    init("none", context);
+
+    source.start();
+    // Write some message to the syslog port
+    InetSocketAddress addr = source.getBoundAddress();
+    Socket syslogSocket = new Socket(addr.getAddress(), addr.getPort());
+    syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+
+    try {
+      txn.commit();
+    } catch (Throwable t) {
+      txn.rollback();
+    } finally {
+      txn.close();
+    }
+
+    source.stop();
+
+    Map<String, String> headers = e.getHeaders();
+
+    checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress());
+    checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName());
+  }
+
+  private static void checkHeader(Map<String, String> headers, String headerName,
+      String expectedValue) {
+    assertTrue("Missing event header: " + headerName, headers.containsKey(headerName));
+    assertEquals("Event header value does not match: " + headerName,
+        expectedValue, headers.get(headerName));
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index 76c5759..a96140a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -42,7 +42,10 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
@@ -64,6 +67,10 @@ public class TestSyslogUdpSource {
       data1;
 
   private void init(String keepFields) {
+    init(keepFields, new Context());
+  }
+
+  private void init(String keepFields, Context context) {
     source = new SyslogUDPSource();
     channel = new MemoryChannel();
 
@@ -76,7 +83,7 @@ public class TestSyslogUdpSource {
     rcs.setChannels(channels);
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
-    Context context = new Context();
+
     context.put("host", InetAddress.getLoopbackAddress().getHostAddress());
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
     context.put("keepFields", keepFields);
@@ -266,5 +273,43 @@ public class TestSyslogUdpSource {
     }
     return payload.toString();
   }
+
+  @Test
+  public void testClientHeaders() throws IOException {
+    String testClientIPHeader = "testClientIPHeader";
+    String testClientHostnameHeader = "testClientHostnameHeader";
+
+
+    Context context = new Context();
+    context.put("clientIPHeader", testClientIPHeader);
+    context.put("clientHostnameHeader", testClientHostnameHeader);
+
+    init("none", context);
+
+    source.start();
+
+    DatagramPacket datagramPacket = createDatagramPacket(bodyWithTandH.getBytes());
+    sendDatagramPacket(datagramPacket);
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+
+    commitAndCloseTransaction(txn);
+
+    source.stop();
+
+    Map<String, String> headers = e.getHeaders();
+
+    checkHeader(headers, testClientIPHeader, InetAddress.getLoopbackAddress().getHostAddress());
+    checkHeader(headers, testClientHostnameHeader, InetAddress.getLoopbackAddress().getHostName());
+  }
+
+  private static void checkHeader(Map<String, String> headers, String headerName,
+      String expectedValue) {
+    assertTrue("Missing event header: " + headerName, headers.containsKey(headerName));
+    assertEquals("Event header value does not match: " + headerName,
+        expectedValue, headers.get(headerName));
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 2479413..851290d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -18,12 +18,16 @@
  */
 package org.apache.flume.source;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.flume.Event;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Clock;
@@ -596,4 +600,58 @@ public class TestSyslogUtils {
     checkHeader("true", msg1, stamp1 + "+0800", format1, host1, data5);
   }
 
+  @Test
+  public void testGetIPWhenSuccessful() {
+    SocketAddress socketAddress = new InetSocketAddress("localhost", 2000);
+
+    String ip = SyslogUtils.getIP(socketAddress);
+
+    assertEquals("127.0.0.1", ip);
+  }
+
+  @Test
+  public void testGetIPWhenInputIsNull() {
+    SocketAddress socketAddress = null;
+
+    String ip = SyslogUtils.getIP(socketAddress);
+
+    assertEquals("", ip);
+  }
+
+  @Test
+  public void testGetIPWhenInputIsNotInetSocketAddress() {
+    SocketAddress socketAddress = new SocketAddress() {};
+
+    String ip = SyslogUtils.getIP(socketAddress);
+
+    assertEquals("", ip);
+  }
+
+  @Test
+  public void testGetHostnameWhenSuccessful() {
+    SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 2000);
+
+    String hostname = SyslogUtils.getHostname(socketAddress);
+
+    assertEquals("localhost", hostname);
+  }
+
+  @Test
+  public void testGetHostnameWhenInputIsNull() {
+    SocketAddress socketAddress = null;
+
+    String hostname = SyslogUtils.getHostname(socketAddress);
+
+    assertEquals("", hostname);
+  }
+
+  @Test
+  public void testGetHostnameWhenInputIsNotInetSocketAddress() {
+    SocketAddress socketAddress = new SocketAddress() {};
+
+    String hostname = SyslogUtils.getHostname(socketAddress);
+
+    assertEquals("", hostname);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/671fc3f1/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 1f244c5..6939b59 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1809,6 +1809,20 @@ keepFields            none         Setting this to 'all' will preserve
the Prior
                                    fields can be included: priority, version,
                                    timestamp, hostname. The values 'true' and 'false'
                                    have been deprecated in favor of 'all' and 'none'.
+clientIPHeader        --           If specified, the IP address of the client will be stored
in
+                                   the header of each event using the header name specified
here.
+                                   This allows for interceptors and channel selectors to
customize
+                                   routing logic based on the IP address of the client.
+                                   Do not use the standard Syslog header names here (like
_host_)
+                                   because the event header will be overridden in that case.
+clientHostnameHeader  --           If specified, the host name of the client will be stored
in
+                                   the header of each event using the header name specified
here.
+                                   This allows for interceptors and channel selectors to
customize
+                                   routing logic based on the host name of the client.
+                                   Retrieving the host name may involve a name service reverse
+                                   lookup which may affect the performance.
+                                   Do not use the standard Syslog header names here (like
_host_)
+                                   because the event header will be overridden in that case.
 selector.type                      replicating or multiplexing
 selector.*            replicating  Depends on the selector.type value
 interceptors          --           Space-separated list of interceptors
@@ -1875,6 +1889,20 @@ keepFields            none              Setting this to 'all' will
preserve the
                                         timestamp, hostname. The values 'true' and 'false'
                                         have been deprecated in favor of 'all' and 'none'.
 portHeader            --                If specified, the port number will be stored in the
header of each event using the header name specified here. This allows for interceptors and
channel selectors to customize routing logic based on the incoming port.
+clientIPHeader        --                If specified, the IP address of the client will be
stored in
+                                        the header of each event using the header name specified
here.
+                                        This allows for interceptors and channel selectors
to customize
+                                        routing logic based on the IP address of the client.
+                                        Do not use the standard Syslog header names here
(like _host_)
+                                        because the event header will be overridden in that
case.
+clientHostnameHeader  --                If specified, the host name of the client will be
stored in
+                                        the header of each event using the header name specified
here.
+                                        This allows for interceptors and channel selectors
to customize
+                                        routing logic based on the host name of the client.
+                                        Retrieving the host name may involve a name service
reverse
+                                        lookup which may affect the performance.
+                                        Do not use the standard Syslog header names here
(like _host_)
+                                        because the event header will be overridden in that
case.
 charset.default       UTF-8             Default character set used while parsing syslog events
into strings.
 charset.port.<port>   --                Character set is configurable on a per-port
basis.
 batchSize             100               Maximum number of events to attempt to process per
request loop. Using the default is usually fine.
@@ -1923,20 +1951,34 @@ For example, a multiport syslog TCP source for agent named a1:
 Syslog UDP Source
 '''''''''''''''''
 
-==============  ===========  ==============================================
-Property Name   Default      Description
-==============  ===========  ==============================================
-**channels**    --
-**type**        --           The component type name, needs to be ``syslogudp``
-**host**        --           Host name or IP address to bind to
-**port**        --           Port # to bind to
-keepFields      false        Setting this to true will preserve the Priority,
-                             Timestamp and Hostname in the body of the event.
-selector.type                replicating or multiplexing
-selector.*      replicating  Depends on the selector.type value
-interceptors    --           Space-separated list of interceptors
+====================  ===========  ================================================================
+Property Name         Default      Description
+====================  ===========  ================================================================
+**channels**          --
+**type**              --           The component type name, needs to be ``syslogudp``
+**host**              --           Host name or IP address to bind to
+**port**              --           Port # to bind to
+keepFields            false        Setting this to true will preserve the Priority,
+                                   Timestamp and Hostname in the body of the event.
+clientIPHeader        --           If specified, the IP address of the client will be stored
in
+                                   the header of each event using the header name specified
here.
+                                   This allows for interceptors and channel selectors to
customize
+                                   routing logic based on the IP address of the client.
+                                   Do not use the standard Syslog header names here (like
_host_)
+                                   because the event header will be overridden in that case.
+clientHostnameHeader  --           If specified, the host name of the client will be stored
in
+                                   the header of each event using the header name specified
here.
+                                   This allows for interceptors and channel selectors to
customize
+                                   routing logic based on the host name of the client.
+                                   Retrieving the host name may involve a name service reverse
+                                   lookup which may affect the performance.
+                                   Do not use the standard Syslog header names here (like
_host_)
+                                   because the event header will be overridden in that case.
+selector.type                      replicating or multiplexing
+selector.*            replicating  Depends on the selector.type value
+interceptors          --           Space-separated list of interceptors
 interceptors.*
-==============  ===========  ==============================================
+====================  ===========  =================================================================
 
 
 For example, a syslog UDP source for agent named a1:


Mime
View raw message