Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 758B0CC9A for ; Fri, 27 Apr 2012 16:52:15 +0000 (UTC) Received: (qmail 64355 invoked by uid 500); 27 Apr 2012 16:52:15 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 64307 invoked by uid 500); 27 Apr 2012 16:52:14 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 64296 invoked by uid 99); 27 Apr 2012 16:52:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Apr 2012 16:52:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Apr 2012 16:52:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A69242388860; Fri, 27 Apr 2012 16:51:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1331498 - in /incubator/flume/trunk/flume-ng-core/src: main/java/org/apache/flume/source/ test/java/org/apache/flume/source/ Date: Fri, 27 Apr 2012 16:51:52 -0000 To: flume-commits@incubator.apache.org From: arvind@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120427165152.A69242388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: arvind Date: Fri Apr 27 16:51:52 2012 New Revision: 1331498 URL: http://svn.apache.org/viewvc?rev=1331498&view=rev Log: FLUME-1126. Support RFC 3164 and 5424 syslog formats (Prasad Mujumdar via Arvind Prabhakar) Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java?rev=1331498&r1=1331497&r2=1331498&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java Fri Apr 27 16:51:52 2012 @@ -19,6 +19,7 @@ package org.apache.flume.source; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -53,6 +54,7 @@ implements EventDrivenSource, Configurab private String host = null; private Channel nettyChannel; private Integer eventSize; + private Map formaterProp; private CounterGroup counterGroup = new CounterGroup(); public class syslogTcpHandler extends SimpleChannelHandler { @@ -63,6 +65,10 @@ implements EventDrivenSource, Configurab syslogUtils.setEventSize(eventSize); } + public void setFormater(Map prop) { + syslogUtils.addFormats(prop); + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage(); @@ -95,6 +101,7 @@ implements EventDrivenSource, Configurab public ChannelPipeline getPipeline() { syslogTcpHandler handler = new syslogTcpHandler(); handler.setEventSize(eventSize); + handler.setFormater(formaterProp); return Channels.pipeline(handler); } }); @@ -135,6 +142,7 @@ implements EventDrivenSource, Configurab port = context.getInteger("port"); host = context.getString("host"); eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE); + formaterProp = context.getSubProperties("format"); } } Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java?rev=1331498&r1=1331497&r2=1331498&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java Fri Apr 27 16:51:52 2012 @@ -19,6 +19,7 @@ package org.apache.flume.source; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -51,6 +52,7 @@ public class SyslogUDPSource extends Abs private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426 private String host = null; private Channel nettyChannel; + private Map formaterProp; private static final Logger logger = LoggerFactory .getLogger(SyslogUDPSource.class); @@ -59,6 +61,10 @@ public class SyslogUDPSource extends Abs public class syslogHandler extends SimpleChannelHandler { private SyslogUtils syslogUtils = new SyslogUtils(true); + public void setFormater(Map prop) { + syslogUtils.addFormats(prop); + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { @@ -82,9 +88,11 @@ public class SyslogUDPSource extends Abs // setup Netty server ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap (new OioDatagramChannelFactory(Executors.newCachedThreadPool())); + final syslogHandler handler = new syslogHandler(); + handler.setFormater(formaterProp); serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { - return Channels.pipeline(new syslogHandler()); + return Channels.pipeline(handler); } }); @@ -120,6 +128,7 @@ public class SyslogUDPSource extends Abs Configurables.ensureRequiredNonNull(context, "port"); port = context.getInteger("port"); host = context.getString("host"); + formaterProp = context.getSubProperties("format"); } } Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java?rev=1331498&r1=1331497&r2=1331498&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java Fri Apr 27 16:51:52 2012 @@ -21,8 +21,14 @@ package org.apache.flume.source; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; import java.util.HashMap; import java.util.Map; +import java.util.Scanner; +import java.util.regex.MatchResult; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; @@ -31,6 +37,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SyslogUtils { + final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ"; + final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S"; + final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ"; + final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss"; + final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss"; + + final public static String SYSLOG_MSG_RFC5424_0 = + "[(?:\\d\\s)]?" +// version + // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) + "(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp + "\\s" + // separator + "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null) + "\\s" + // separator + "(.*)$"; // body + + final public static String SYSLOG_MSG_RFC3164_0 = + // stamp MMM d HH:mm:ss, single digit date has two spaces + "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + + "\\s" + // separator + "([\\w][\\w\\d\\.@-]*)" + // host + "\\s(.*)$"; // body + + final public static int SYSLOG_TIMESTAMP_POS = 1; + final public static int SYSLOG_HOSTNAME_POS = 2; + final public static int SYSLOG_BODY_POS = 3; private Mode m = Mode.START; private StringBuilder prio = new StringBuilder(); @@ -48,6 +79,19 @@ public class SyslogUtils { private boolean isIncompleteEvent; private Integer maxSize; + private class SyslogFormater { + public String regexPattern; + public ArrayList searchPattern = new ArrayList(); + public ArrayList replacePattern = new ArrayList(); + public ArrayList dateFormat = new ArrayList(); + public boolean addYear; + } + private ArrayList formats = new ArrayList(); + + private String timeStamp = null; + private String hostName = null; + private String msgBody = null; + public SyslogUtils() { this(false); } @@ -62,6 +106,56 @@ public class SyslogUtils { isIncompleteEvent = false; maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize; baos = new ByteArrayOutputStream(eventSize); + initHeaderFormats(); + } + + // extend the default header formatter + public void addFormats(Map formatProp) { + if (formatProp.isEmpty() || !formatProp.containsKey("regex")) { + return; + } + SyslogFormater fmt1 = new SyslogFormater(); + fmt1.regexPattern = formatProp.get("regex"); + if (formatProp.containsKey("search")) { + fmt1.searchPattern.add(formatProp.get("search")); + } + if (formatProp.containsKey("replace")) { + fmt1.replacePattern.add(formatProp.get("replace")); + } + if (formatProp.containsKey("dateFormat")) { + fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get("dateFormat"))); + } + formats.add(0, fmt1); + } + + // setup built-in formats + private void initHeaderFormats() { + // setup RFC5424 formater + SyslogFormater fmt1 = new SyslogFormater(); + fmt1.regexPattern = SYSLOG_MSG_RFC5424_0; + // 'Z' in timestamp indicates UTC zone, so replace it it with '+0000' for date formatting + fmt1.searchPattern.add("Z"); + fmt1.replacePattern.add("+0000"); + // timezone in RFC5424 is [+-]tt:tt, so remove the ':' for java date formatting + fmt1.searchPattern.add("([+-])(\\d{2})[:](\\d{2})"); + fmt1.replacePattern.add("$1$2$3"); + fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_1)); + fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_2)); + fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_3)); + fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_4)); + fmt1.addYear = false; + + // setup RFC3164 formater + SyslogFormater fmt2 = new SyslogFormater(); + fmt2.regexPattern = SYSLOG_MSG_RFC3164_0; + // the single digit date has two spaces, so trim it + fmt2.searchPattern.add(" "); + fmt2.replacePattern.add(" "); + fmt2.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC3164_1)); + fmt2.addYear = true; + + formats.add(fmt1); + formats.add(fmt2); } enum Mode { @@ -86,17 +180,27 @@ public class SyslogUtils { // create the event from syslog data Event buildEvent() { + byte[] body; int pri = 0; int sev = 0; int facility = 0; + if(!isBadEvent){ pri = Integer.parseInt(prio.toString()); sev = pri % 8; facility = pri - sev; + formatHeaders(); } + Map headers = new HashMap(); headers.put(SYSLOG_FACILITY, String.valueOf(facility)); headers.put(SYSLOG_SEVERITY, String.valueOf(sev)); + if ((timeStamp != null) && timeStamp.length() > 0) { + headers.put("timestamp", timeStamp); + } + if ((hostName != null) && (hostName.length() > 0)) { + headers.put("host", hostName); + } if(isBadEvent){ logger.warn("Event created from Invalid Syslog data."); headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); @@ -105,20 +209,73 @@ public class SyslogUtils { "consider increasing your event size.", maxSize); headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus()); } - // TODO: add hostname and timestamp if provided ... - byte[] body = baos.toByteArray(); + if ((msgBody != null) && (msgBody.length() > 0)) { + body = msgBody.getBytes(); + } else { + body = baos.toByteArray(); + } reset(); + // format the message return EventBuilder.withBody(body, headers); } + // Apply each known pattern to message + private void formatHeaders() { + Scanner scanner = new Scanner(baos.toString()); + MatchResult res = null; + + for(int p=0; p < formats.size(); p++) { + SyslogFormater fmt = formats.get(p); + try { + scanner.findInLine(fmt.regexPattern); + res = scanner.match(); + } catch (IllegalStateException e) { + // Ignore and move on .. + continue; + } + for (int grp=1; grp <= res.groupCount(); grp++) { + String value = res.group(grp); + if (grp == SYSLOG_TIMESTAMP_POS) { + // apply available format replacements to timestamp + if (value != null) { + for (int sp=0; sp < fmt.searchPattern.size(); sp++) { + value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp)); + } + // Add year to timestamp if needed + if (fmt.addYear) { + value = String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + value; + } + // try the available time formats to timestamp + for (int dt = 0; dt < fmt.dateFormat.size(); dt++) { + try { + timeStamp = String.valueOf(fmt.dateFormat.get(dt).parse(value).getTime()); + break; // done. formatted the time + } catch (ParseException e) { + // Error formatting the timeStamp, try next format + continue; + } + } + } + } else if (grp == SYSLOG_HOSTNAME_POS) { + hostName = value; + } else if (grp == SYSLOG_BODY_POS) { + msgBody = value; + } + } + break; // we successfully parsed the message using this pattern + } + } + private void reset(){ baos.reset(); m = Mode.START; prio.delete(0, prio.length()); isBadEvent = false; isIncompleteEvent = false; - + hostName = null; + timeStamp = null; + msgBody = null; } // extract relevant syslog data needed for building Flume event Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java?rev=1331498&r1=1331497&r2=1331498&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java (original) +++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java Fri Apr 27 16:51:52 2012 @@ -19,6 +19,9 @@ package org.apache.flume.source; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; import java.util.Map; import junit.framework.Assert; @@ -29,6 +32,150 @@ import org.jboss.netty.buffer.ChannelBuf import org.junit.Test; public class TestSyslogUtils { + @Test + public void TestHeader0() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // timestamp with hh:mm format timezone with no version + String msg1 = "<10>" + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); + } + + @Test + public void TestHeader1() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ss"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1, format1, host1, data1); + } + + @Test + public void TestHeader2() throws ParseException { + + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // timestamp with 'Z' appended, translates to UTC + String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1 + "+0000", format1, host1, data1); + } + + @Test + public void TestHeader3() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // timestamp with hh:mm format timezone + String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); + } + + @Test + public void TestHeader4() throws ParseException { + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // null format timestamp (-) + String msg1 = "<10>1 " + "-" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, null, null, host1, data1); + } + + @Test + public void TestHeader5() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ss"; + String host1 = "-"; + String data1 = "some msg"; + // null host + String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1, format1, null, data1); + } + + @Test + public void TestHeader6() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; + String host1 = "-"; + String data1 = "some msg"; + // null host + String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1 + "+0000", format1, null, data1); + } + + @Test + public void TestHeader7() throws ParseException { + String stamp1 = "2012-04-13T11:11:11"; + String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; + String host1 = "-"; + String data1 = "some msg"; + // null host + String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1 + "+0800", format1, null, data1); + } + + @Test + public void TestHeader8() throws ParseException { + String stamp1 = "2012-04-13T11:11:11.999"; + String format1 = "yyyy-MM-dd'T'HH:mm:ss.S"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, stamp1, format1, host1, data1); + } + + @Test + public void TestHeader9() throws ParseException { + String stamp1 = "Apr 11 13:14:04"; + String format1 = "yyyyMMM d HH:mm:ss"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // timestamp with 'Z' appended, translates to UTC + String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1, + format1, host1, data1); + } + + @Test + public void TestHeader10() throws ParseException { + String stamp1 = "Apr 1 13:14:04"; + String format1 = "yyyyMMM d HH:mm:ss"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + // timestamp with 'Z' appended, translates to UTC + String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1, + format1, host1, data1); + } + + public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException { + SyslogUtils util = new SyslogUtils(false); + ChannelBuffer buff = ChannelBuffers.buffer(100); + + buff.writeBytes(msg1.getBytes()); + Event e = util.extractEvent(buff); + if(e == null){ + throw new NullPointerException("Event is null"); + } + Map headers2 = e.getHeaders(); + if (stamp1 == null) { + Assert.assertFalse(headers2.containsKey("timestamp")); + } else { + SimpleDateFormat formater = new SimpleDateFormat(format1); + Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), headers2.get("timestamp")); + } + if (host1 == null) { + Assert.assertFalse(headers2.containsKey("host")); + } else { + String host2 = headers2.get("host"); + Assert.assertEquals(host2,host1); + } + Assert.assertEquals(data1, new String(e.getBody())); + } /** * Test bad event format 1: Priority is not numeric