flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1331498 - in /incubator/flume/trunk/flume-ng-core/src: main/java/org/apache/flume/source/ test/java/org/apache/flume/source/
Date Fri, 27 Apr 2012 16:51:52 GMT
Author: arvind
Date: Fri Apr 27 16:51:52 2012
New Revision: 1331498

URL: http://svn.apache.org/viewvc?rev=1331498&view=rev
Log:
FLUME-1126. Support RFC 3164 and 5424 syslog formats

(Prasad Mujumdar via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java?rev=1331498&r1=1331497&r2=1331498&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java	(original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java	Fri Apr 27 16:51:52 2012
@@ -19,6 +19,7 @@
 package org.apache.flume.source;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -53,6 +54,7 @@ implements EventDrivenSource, Configurab
   private String host = null;
   private Channel nettyChannel;
   private Integer eventSize;
+  private Map<String, String> formaterProp;
   private CounterGroup counterGroup = new CounterGroup();
 
   public class syslogTcpHandler extends SimpleChannelHandler {
@@ -63,6 +65,10 @@ implements EventDrivenSource, Configurab
       syslogUtils.setEventSize(eventSize);
     }
 
+    public void setFormater(Map<String, String> prop) {
+      syslogUtils.addFormats(prop);
+    }
+
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
       ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
@@ -95,6 +101,7 @@ implements EventDrivenSource, Configurab
       public ChannelPipeline getPipeline() {
         syslogTcpHandler handler = new syslogTcpHandler();
         handler.setEventSize(eventSize);
+        handler.setFormater(formaterProp);
         return Channels.pipeline(handler);
       }
     });
@@ -135,6 +142,7 @@ implements EventDrivenSource, Configurab
     port = context.getInteger("port");
     host = context.getString("host");
     eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
+    formaterProp = context.getSubProperties("format");
   }
 
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java?rev=1331498&r1=1331497&r2=1331498&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java	(original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java	Fri Apr 27 16:51:52 2012
@@ -19,6 +19,7 @@
 package org.apache.flume.source;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -51,6 +52,7 @@ public class SyslogUDPSource extends Abs
   private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426
   private String host = null;
   private Channel nettyChannel;
+  private Map<String, String> formaterProp;
 
   private static final Logger logger = LoggerFactory
       .getLogger(SyslogUDPSource.class);
@@ -59,6 +61,10 @@ public class SyslogUDPSource extends Abs
   public class syslogHandler extends SimpleChannelHandler {
     private SyslogUtils syslogUtils = new SyslogUtils(true);
 
+    public void setFormater(Map<String, String> prop) {
+      syslogUtils.addFormats(prop);
+    }
+
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
       try {
@@ -82,9 +88,11 @@ public class SyslogUDPSource extends Abs
     // setup Netty server
     ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
         (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
+    final syslogHandler handler = new syslogHandler();
+    handler.setFormater(formaterProp);
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       public ChannelPipeline getPipeline() {
-       return Channels.pipeline(new syslogHandler());
+       return Channels.pipeline(handler);
       }
      });
 
@@ -120,6 +128,7 @@ public class SyslogUDPSource extends Abs
     Configurables.ensureRequiredNonNull(context, "port");
     port = context.getInteger("port");
     host = context.getString("host");
+    formaterProp = context.getSubProperties("format");
   }
 
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java?rev=1331498&r1=1331497&r2=1331498&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java	(original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java	Fri Apr 27 16:51:52 2012
@@ -21,8 +21,14 @@ package org.apache.flume.source;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Scanner;
+import java.util.regex.MatchResult;
 
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -31,6 +37,31 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyslogUtils {
+  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
+  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
+  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ";
+  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss";
+  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";
+
+  final public static String SYSLOG_MSG_RFC5424_0 =
+      "[(?:\\d\\s)]?" +// version
+  // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp)
+      "(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
+      "\\s" + // separator
+      "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null)
+      "\\s" + // separator
+      "(.*)$"; // body
+
+  final public static String SYSLOG_MSG_RFC3164_0 =
+      // stamp MMM d HH:mm:ss, single digit date has two spaces
+      "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
+      "\\s" + // separator
+      "([\\w][\\w\\d\\.@-]*)" + // host
+      "\\s(.*)$";  // body
+
+  final public static int SYSLOG_TIMESTAMP_POS = 1;
+  final public static int SYSLOG_HOSTNAME_POS = 2;
+  final public static int SYSLOG_BODY_POS = 3;
 
   private Mode m = Mode.START;
   private StringBuilder prio = new StringBuilder();
@@ -48,6 +79,19 @@ public class SyslogUtils {
   private boolean isIncompleteEvent;
   private Integer maxSize;
 
+  private class SyslogFormater {
+    public String regexPattern;
+    public ArrayList<String> searchPattern = new ArrayList<String>();
+    public ArrayList<String> replacePattern = new ArrayList<String>();
+    public ArrayList<SimpleDateFormat> dateFormat = new ArrayList<SimpleDateFormat>();
+    public boolean addYear;
+  }
+  private ArrayList<SyslogFormater> formats = new ArrayList<SyslogFormater>();
+
+  private String timeStamp = null;
+  private String hostName = null;
+  private String msgBody = null;
+
   public SyslogUtils() {
     this(false);
   }
@@ -62,6 +106,56 @@ public class SyslogUtils {
     isIncompleteEvent = false;
     maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
     baos = new ByteArrayOutputStream(eventSize);
+    initHeaderFormats();
+  }
+
+  // extend the default header formatter
+  public void addFormats(Map<String, String> formatProp) {
+    if (formatProp.isEmpty() || !formatProp.containsKey("regex")) {
+      return;
+    }
+    SyslogFormater fmt1 = new SyslogFormater();
+    fmt1.regexPattern = formatProp.get("regex");
+    if (formatProp.containsKey("search")) {
+      fmt1.searchPattern.add(formatProp.get("search"));
+    }
+    if (formatProp.containsKey("replace")) {
+      fmt1.replacePattern.add(formatProp.get("replace"));
+    }
+    if (formatProp.containsKey("dateFormat")) {
+        fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get("dateFormat")));
+    }
+    formats.add(0, fmt1);
+  }
+
+  // setup built-in formats
+  private void initHeaderFormats() {
+    // setup RFC5424 formater
+    SyslogFormater fmt1 = new SyslogFormater();
+    fmt1.regexPattern = SYSLOG_MSG_RFC5424_0;
+    // 'Z' in timestamp indicates UTC zone, so replace it it with '+0000' for date formatting
+    fmt1.searchPattern.add("Z");
+    fmt1.replacePattern.add("+0000");
+    // timezone in RFC5424 is [+-]tt:tt, so remove the ':' for java date formatting
+    fmt1.searchPattern.add("([+-])(\\d{2})[:](\\d{2})");
+    fmt1.replacePattern.add("$1$2$3");
+    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_1));
+    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_2));
+    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_3));
+    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_4));
+    fmt1.addYear = false;
+
+    // setup RFC3164 formater
+    SyslogFormater fmt2 = new SyslogFormater();
+    fmt2.regexPattern = SYSLOG_MSG_RFC3164_0;
+    // the single digit date has two spaces, so trim it
+    fmt2.searchPattern.add("  ");
+    fmt2.replacePattern.add(" ");
+    fmt2.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC3164_1));
+    fmt2.addYear = true;
+
+    formats.add(fmt1);
+    formats.add(fmt2);
   }
 
   enum Mode {
@@ -86,17 +180,27 @@ public class SyslogUtils {
 
   // create the event from syslog data
   Event buildEvent() {
+    byte[] body;
     int pri = 0;
     int sev = 0;
     int facility = 0;
+
     if(!isBadEvent){
       pri = Integer.parseInt(prio.toString());
       sev = pri % 8;
       facility = pri - sev;
+      formatHeaders();
     }
+
     Map <String, String> headers = new HashMap<String, String>();
     headers.put(SYSLOG_FACILITY, String.valueOf(facility));
     headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
+    if ((timeStamp != null) && timeStamp.length() > 0) {
+      headers.put("timestamp", timeStamp);
+    }
+    if ((hostName != null) && (hostName.length() > 0)) {
+      headers.put("host", hostName);
+    }
     if(isBadEvent){
       logger.warn("Event created from Invalid Syslog data.");
       headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
@@ -105,20 +209,73 @@ public class SyslogUtils {
           "consider increasing your event size.", maxSize);
       headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
     }
-    // TODO: add hostname and timestamp if provided ...
 
-    byte[] body = baos.toByteArray();
+    if ((msgBody != null) && (msgBody.length() > 0)) {
+      body = msgBody.getBytes();
+    } else {
+      body = baos.toByteArray();
+    }
     reset();
+    // format the message
     return EventBuilder.withBody(body, headers);
   }
 
+  // Apply each known pattern to message
+  private void formatHeaders() {
+    Scanner scanner = new Scanner(baos.toString());
+    MatchResult res = null;
+
+    for(int p=0; p < formats.size(); p++) {
+      SyslogFormater fmt = formats.get(p);
+      try {
+        scanner.findInLine(fmt.regexPattern);
+        res = scanner.match();
+      } catch (IllegalStateException e) {
+        // Ignore and move on ..
+        continue;
+      }
+      for (int grp=1; grp <= res.groupCount(); grp++) {
+        String value = res.group(grp);
+        if (grp == SYSLOG_TIMESTAMP_POS) {
+          // apply available format replacements to timestamp
+          if (value != null) {
+            for (int sp=0; sp < fmt.searchPattern.size(); sp++) {
+              value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp));
+            }
+            // Add year to timestamp if needed
+            if (fmt.addYear) {
+              value = String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + value;
+            }
+            // try the available time formats to timestamp
+            for (int dt = 0; dt < fmt.dateFormat.size(); dt++) {
+              try {
+                timeStamp = String.valueOf(fmt.dateFormat.get(dt).parse(value).getTime());
+                break; // done. formatted the time
+              } catch (ParseException e) {
+                // Error formatting the timeStamp, try next format
+                continue;
+              }
+            }
+          }
+        } else if (grp == SYSLOG_HOSTNAME_POS) {
+          hostName = value;
+        } else if (grp == SYSLOG_BODY_POS) {
+          msgBody = value;
+        }
+      }
+      break; // we successfully parsed the message using this pattern
+    }
+  }
+
   private void reset(){
     baos.reset();
     m = Mode.START;
     prio.delete(0, prio.length());
     isBadEvent = false;
     isIncompleteEvent = false;
-
+    hostName = null;
+    timeStamp = null;
+    msgBody = null;
   }
 
   // extract relevant syslog data needed for building Flume event

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java?rev=1331498&r1=1331497&r2=1331498&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java	(original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java	Fri Apr 27 16:51:52 2012
@@ -19,6 +19,9 @@
 package org.apache.flume.source;
 
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Map;
 
 import junit.framework.Assert;
@@ -29,6 +32,150 @@ import org.jboss.netty.buffer.ChannelBuf
 import org.junit.Test;
 
 public class TestSyslogUtils {
+  @Test
+  public void TestHeader0() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with hh:mm format timezone with no version
+    String msg1 = "<10>" + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1 + "+0800", format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader1() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ss";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1, format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader2() throws ParseException {
+
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with 'Z' appended, translates to UTC
+    String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1 + "+0000", format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader3() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with hh:mm format timezone
+    String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1 + "+0800", format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader4() throws ParseException {
+     String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // null format timestamp (-)
+    String msg1 = "<10>1 " + "-" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, null, null, host1, data1);
+  }
+
+  @Test
+  public void TestHeader5() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ss";
+    String host1 = "-";
+    String data1 = "some msg";
+    // null host
+    String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1, format1, null, data1);
+  }
+
+  @Test
+  public void TestHeader6() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "-";
+    String data1 = "some msg";
+    // null host
+    String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1 + "+0000", format1, null, data1);
+  }
+
+  @Test
+  public void TestHeader7() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ssZ";
+    String host1 = "-";
+    String data1 = "some msg";
+    // null host
+    String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1 + "+0800", format1, null, data1);
+  }
+
+  @Test
+  public void TestHeader8() throws ParseException {
+    String stamp1 = "2012-04-13T11:11:11.999";
+    String format1 = "yyyy-MM-dd'T'HH:mm:ss.S";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, stamp1, format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader9() throws ParseException {
+    String stamp1 = "Apr 11 13:14:04";
+    String format1 = "yyyyMMM d HH:mm:ss";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with 'Z' appended, translates to UTC
+    String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1,
+        format1, host1, data1);
+  }
+
+  @Test
+  public void TestHeader10() throws ParseException {
+    String stamp1 = "Apr  1 13:14:04";
+    String format1 = "yyyyMMM d HH:mm:ss";
+    String host1 = "ubuntu-11.cloudera.com";
+    String data1 = "some msg";
+    // timestamp with 'Z' appended, translates to UTC
+    String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
+    checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1,
+        format1, host1, data1);
+  }
+
+  public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException {
+    SyslogUtils util = new SyslogUtils(false);
+    ChannelBuffer buff = ChannelBuffers.buffer(100);
+
+    buff.writeBytes(msg1.getBytes());
+    Event e = util.extractEvent(buff);
+    if(e == null){
+      throw new NullPointerException("Event is null");
+    }
+    Map<String, String> headers2 = e.getHeaders();
+    if (stamp1 == null) {
+      Assert.assertFalse(headers2.containsKey("timestamp"));
+    } else {
+      SimpleDateFormat formater = new SimpleDateFormat(format1);
+      Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), headers2.get("timestamp"));
+    }
+    if (host1 == null) {
+      Assert.assertFalse(headers2.containsKey("host"));
+    } else {
+      String host2 = headers2.get("host");
+      Assert.assertEquals(host2,host1);
+    }
+    Assert.assertEquals(data1, new String(e.getBody()));
+  }
 
   /**
    * Test bad event format 1: Priority is not numeric



Mime
View raw message