flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1385. Add a multiport syslog source
Date Fri, 14 Sep 2012 21:14:56 GMT
Updated Branches:
  refs/heads/trunk f54148a3f -> 30293ea1e


FLUME-1385. Add a multiport syslog source

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30293ea1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30293ea1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30293ea1

Branch: refs/heads/trunk
Commit: 30293ea1e9421cc4684d8e997770fe6f171970eb
Parents: f54148a
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 14 14:12:51 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 14 14:12:51 2012 -0700

----------------------------------------------------------------------
 flume-ng-core/pom.xml                              |    5 +
 .../flume/source/MultiportSyslogTCPSource.java     |  539 +++++++++++++++
 .../java/org/apache/flume/source/SyslogParser.java |  323 +++++++++
 .../source/SyslogSourceConfigurationConstants.java |   31 +
 .../flume/source/TestMultiportSyslogTCPSource.java |  382 ++++++++++
 .../org/apache/flume/source/TestSyslogParser.java  |   85 +++
 pom.xml                                            |    5 +
 7 files changed, 1370 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 9600dfc..4592a9d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -183,6 +183,11 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+    <!-- Apache MINA provides the NIO TCP acceptor (NioSocketAcceptor) and
+         IoBuffer used by MultiportSyslogTCPSource. No version is given here;
+         presumably it is managed by the parent POM (confirm). -->
+    <dependency>
+      <groupId>org.apache.mina</groupId>
+      <artifactId>mina-core</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
new file mode 100644
index 0000000..884fd62
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event-driven source that receives newline-delimited syslog messages over
+ * TCP on one or more ports, all served by a single Apache MINA
+ * {@link NioSocketAcceptor}.
+ * <p/>
+ * Each line becomes one Flume event. A line is decoded with the charset
+ * configured for its port if there is one, otherwise with the default
+ * charset. Events are delivered to the channel in batches of at most
+ * {@code batchSize} events.
+ * <p/>
+ * Configuration keys are defined in {@link SyslogSourceConfigurationConstants}:
+ * {@code ports} (required, whitespace-separated), {@code host},
+ * {@code numProcessors}, {@code eventSize}, {@code batchSize},
+ * {@code charset.default}, {@code charset.port.<port>}, {@code portHeader}
+ * and {@code readBufferBytes}.
+ */
+public class MultiportSyslogTCPSource extends AbstractSource implements
+        EventDrivenSource, Configurable {
+
+  public static final Logger logger = LoggerFactory.getLogger(
+          MultiportSyslogTCPSource.class);
+
+  /** Per-port decoders; shared with the handler and rebuilt by configure(). */
+  private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
+
+  private final List<Integer> ports = Lists.newArrayList();
+  private String host;
+  private NioSocketAcceptor acceptor;
+  private Integer numProcessors;
+  private int maxEventSize;
+  private int batchSize;
+  private int readBufferSize;
+  private String portHeader;
+  private SourceCounter sourceCounter = null;
+  private Charset defaultCharset;
+  private ThreadSafeDecoder defaultDecoder;
+
+  public MultiportSyslogTCPSource() {
+    portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
+  }
+
+  /**
+   * Reads the source configuration. Safe to call more than once: the port
+   * list and per-port charsets from a previous call are replaced, not
+   * extended.
+   *
+   * @throws NullPointerException if {@code ports} is not set
+   * @throws IllegalArgumentException if a port number or charset name is
+   *     invalid
+   */
+  @Override
+  public void configure(Context context) {
+    String portsStr = context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_PORTS);
+
+    Preconditions.checkNotNull(portsStr, "Must define config "
+            + "parameter for MultiportSyslogTCPSource: ports");
+
+    // replace (rather than append to) ports from any previous configure()
+    ports.clear();
+    // trim first: leading whitespace would otherwise yield an empty token
+    for (String portStr : portsStr.trim().split("\\s+")) {
+      ports.add(parsePort(portStr));
+    }
+
+    host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
+
+    numProcessors = context.getInteger(
+            SyslogSourceConfigurationConstants.CONFIG_NUMPROCESSORS);
+
+    maxEventSize = context.getInteger(
+        SyslogSourceConfigurationConstants.CONFIG_EVENTSIZE,
+        SyslogUtils.DEFAULT_SIZE);
+
+    String defaultCharsetStr = context.getString(
+        SyslogSourceConfigurationConstants.CONFIG_CHARSET,
+        SyslogSourceConfigurationConstants.DEFAULT_CHARSET);
+    try {
+      defaultCharset = Charset.forName(defaultCharsetStr);
+    } catch (Exception ex) {
+      throw new IllegalArgumentException("Unable to parse charset "
+          + "string (" + defaultCharsetStr + ") from default charset "
+          + "configuration.", ex);
+    }
+
+    defaultDecoder = new ThreadSafeDecoder(defaultCharset);
+
+    // clear any previous charset configuration and reconfigure it
+    portCharsets.clear();
+    ImmutableMap<String, String> portCharsetCfg = context.getSubProperties(
+        SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX);
+    for (Map.Entry<String, String> entry : portCharsetCfg.entrySet()) {
+      int port = parsePort(entry.getKey());
+      String charsetStr = entry.getValue();
+      try {
+        Charset charset = Charset.forName(charsetStr);
+        portCharsets.put(port, new ThreadSafeDecoder(charset));
+      } catch (Exception ex) {
+        throw new IllegalArgumentException("Unable to parse charset " +
+            "string (" + charsetStr + ") from port configuration.", ex);
+      }
+    }
+
+    batchSize = context.getInteger(
+        SyslogSourceConfigurationConstants.CONFIG_BATCHSIZE,
+        SyslogSourceConfigurationConstants.DEFAULT_BATCHSIZE);
+
+    portHeader = context.getString(
+            SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER);
+
+    readBufferSize = context.getInteger(
+        SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
+        SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  /**
+   * Parses and range-checks a port number taken from configuration.
+   *
+   * @param portStr the configured text
+   * @return the port number
+   * @throws IllegalArgumentException naming the offending text if it is not
+   *     an integer in the range 0-65535
+   */
+  private static int parsePort(String portStr) {
+    int port;
+    try {
+      port = Integer.parseInt(portStr);
+    } catch (NumberFormatException ex) {
+      throw new IllegalArgumentException(
+          "Invalid port number in config: (" + portStr + ")", ex);
+    }
+    Preconditions.checkArgument(port >= 0 && port <= 65535,
+        "Port number out of range in config: (%s)", portStr);
+    return port;
+  }
+
+  /**
+   * Creates the MINA acceptor and binds every configured port. A port that
+   * fails to bind is logged and skipped, so the remaining ports still start.
+   */
+  @Override
+  public void start() {
+    logger.info("Starting {}...", this);
+
+    // allow user to specify number of processors to use for thread pool
+    if (numProcessors != null) {
+      acceptor = new NioSocketAcceptor(numProcessors);
+    } else {
+      acceptor = new NioSocketAcceptor();
+    }
+    acceptor.setReuseAddress(true);
+    acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
+    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
+
+    acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
+        getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
+        portCharsets));
+
+    for (int port : ports) {
+      InetSocketAddress addr;
+      if (host != null) {
+        addr = new InetSocketAddress(host, port);
+      } else {
+        addr = new InetSocketAddress(port);
+      }
+      try {
+        // Bind one address at a time (not the array overload) so that one
+        // bind error does not prevent the other ports from binding.
+        acceptor.bind(addr);
+      } catch (IOException ex) {
+        logger.error("Could not bind to address: " + String.valueOf(addr), ex);
+      }
+    }
+
+    sourceCounter.start();
+    super.start();
+
+    logger.info("{} started.", this);
+  }
+
+  /**
+   * Unbinds all ports and releases the acceptor. Tolerates being called when
+   * start() never ran, and being called twice.
+   */
+  @Override
+  public void stop() {
+    logger.info("Stopping {}...", this);
+
+    if (acceptor != null) {
+      acceptor.unbind();
+      acceptor.dispose();
+      acceptor = null;
+    }
+
+    sourceCounter.stop();
+    super.stop();
+
+    logger.info("{} stopped. Metrics: {}", this, sourceCounter);
+  }
+
+  @Override
+  public String toString() {
+    return "Multiport Syslog TCP source " + getName();
+  }
+
+  /**
+   * MINA handler installed once on the acceptor, so it serves every session.
+   * It splits each session's byte stream into lines, converts the lines into
+   * events and writes them to the channel in batches of at most
+   * {@code batchSize}.
+   */
+  static class MultiportSyslogHandler extends IoHandlerAdapter {
+
+    /** Session attribute holding bytes of a line still awaiting a newline. */
+    private static final String SAVED_BUF = "savedBuffer";
+    private final ChannelProcessor channelProcessor;
+    private final int maxEventSize;
+    private final int batchSize;
+    private final SourceCounter sourceCounter;
+    private final String portHeader;
+    private final SyslogParser syslogParser;
+    private final LineSplitter lineSplitter;
+    private final ThreadSafeDecoder defaultDecoder;
+    private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
+
+    public MultiportSyslogHandler(int maxEventSize, int batchSize,
+        ChannelProcessor cp, SourceCounter ctr, String portHeader,
+        ThreadSafeDecoder defaultDecoder,
+        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets) {
+      channelProcessor = cp;
+      sourceCounter = ctr;
+      this.maxEventSize = maxEventSize;
+      this.batchSize = batchSize;
+      this.portHeader = portHeader;
+      this.defaultDecoder = defaultDecoder;
+      this.portCharsets = portCharsets;
+      syslogParser = new SyslogParser();
+      lineSplitter = new LineSplitter(maxEventSize);
+    }
+
+    @Override
+    public void exceptionCaught(IoSession session, Throwable cause)
+        throws Exception {
+      logger.error("Error in syslog message handler", cause);
+      if (cause instanceof Error) {
+        Throwables.propagate(cause);
+      }
+    }
+
+    @Override
+    public void sessionCreated(IoSession session) {
+      logger.info("Session created: {}", session);
+
+      // Allocate saved buffer when session is created.
+      // This allows us to parse an incomplete message and use it on
+      // the next request.
+      session.setAttribute(SAVED_BUF, IoBuffer.allocate(maxEventSize, false));
+    }
+
+    @Override
+    public void sessionOpened(IoSession session) {
+      // debug level so it isn't too spammy together w/ sessionCreated()
+      logger.debug("Session opened: {}", session);
+    }
+
+    @Override
+    public void sessionClosed(IoSession session) {
+      logger.info("Session closed: {}", session);
+    }
+
+    /**
+     * Splits the received bytes into lines and writes the resulting events
+     * to the channel in batches. A trailing partial line is kept in the
+     * session's saved buffer and completed by the next message.
+     */
+    @Override
+    public void messageReceived(IoSession session, Object message) {
+
+      IoBuffer buf = (IoBuffer) message;
+      IoBuffer savedBuf = (IoBuffer) session.getAttribute(SAVED_BUF);
+
+      ParsedBuffer parsedLine = new ParsedBuffer();
+      List<Event> events = Lists.newArrayList();
+
+      // The character set can be specified per-port. Look the decoder up
+      // with a single get(): configure() may clear the map concurrently,
+      // so containsKey() followed by get() could observe null.
+      int port =
+          ((InetSocketAddress) session.getLocalAddress()).getPort();
+      ThreadSafeDecoder portDecoder = portCharsets.get(port);
+      CharsetDecoder decoder = (portDecoder != null)
+          ? portDecoder.get() : defaultDecoder.get();
+
+      // while the buffer is not empty
+      while (buf.hasRemaining()) {
+        events.clear();
+
+        // take number of events no greater than batchSize
+        for (int num = 0; num < batchSize && buf.hasRemaining(); num++) {
+
+          if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) {
+            Event event = parseEvent(parsedLine, decoder);
+            if (portHeader != null) {
+              event.getHeaders().put(portHeader, String.valueOf(port));
+            }
+            events.add(event);
+          } else {
+            // partial line stored in savedBuf until more data arrives
+            logger.trace("Parsed null event");
+          }
+
+        }
+
+        // don't try to write anything if we didn't get any events somehow
+        if (events.isEmpty()) {
+          logger.trace("Empty set!");
+          return;
+        }
+
+        int numEvents = events.size();
+        sourceCounter.addToEventReceivedCount(numEvents);
+
+        // write the events to the downstream channel
+        try {
+          channelProcessor.processEventBatch(events);
+          sourceCounter.addToEventAcceptedCount(numEvents);
+        } catch (Throwable t) {
+          logger.error("Error writing to channel, event dropped", t);
+          if (t instanceof Error) {
+            Throwables.propagate(t);
+          }
+        }
+      }
+
+    }
+
+    /**
+     * Converts one parsed line into a Flume Event.
+     * <p/>
+     * If the bytes cannot be decoded with {@code decoder}, the raw bytes of
+     * the line become the body. If the decoded text cannot be parsed as
+     * syslog, the text becomes the body. In both cases the event gets an
+     * {@code INVALID} status header. A successfully parsed line that the
+     * {@link LineSplitter} truncated gets an {@code INCOMPLETE} status header.
+     *
+     * @param parsedBuf the line to convert, without its newline
+     * @param decoder decoder for the charset of the port the line arrived on
+     * @return the resulting event; never {@code null}
+     */
+    Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) {
+      String msg = null;
+      try {
+        msg = parsedBuf.buffer.getString(decoder);
+      } catch (Throwable t) {
+        logger.info("Error decoding line with charset (" + decoder.charset() +
+            "). Exception follows.", t);
+
+        if (t instanceof Error) {
+          Throwables.propagate(t);
+        }
+
+        // Fall back to the raw bytes of the whole line. Rewind in case the
+        // failed decode left the position part-way through the slice.
+        parsedBuf.buffer.rewind();
+        byte[] bytes = new byte[parsedBuf.buffer.remaining()];
+        parsedBuf.buffer.get(bytes);
+
+        Event event = EventBuilder.withBody(bytes);
+        event.getHeaders().put(SyslogUtils.EVENT_STATUS,
+            SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
+
+        return event;
+      }
+
+      logger.trace("Seen raw event: {}", msg);
+
+      Event event;
+      try {
+        event = syslogParser.parseMessage(msg, decoder.charset());
+        if (parsedBuf.incomplete) {
+          event.getHeaders().put(SyslogUtils.EVENT_STATUS,
+              SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus());
+        }
+      } catch (RuntimeException ex) {
+        // IllegalArgumentException is the parser's documented failure mode,
+        // but any runtime failure on this untrusted input is classified the
+        // same way. Letting it escape would discard every event already
+        // collected for the current batch in messageReceived().
+        event = EventBuilder.withBody(msg, decoder.charset());
+        event.getHeaders().put(SyslogUtils.EVENT_STATUS,
+            SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
+        logger.debug("Error parsing syslog event", ex);
+      }
+
+      return event;
+    }
+  }
+
+  /**
+   * Parses lines up to a maximum length. A line that reaches the maximum
+   * length without a newline is cut at that mark, and the {@code incomplete}
+   * flag is set on the result. If input runs out before either a newline or
+   * the maximum length, the bytes read so far are appended to a caller-provided
+   * saved buffer, to be completed on the next round of parsing.
+   */
+  static class LineSplitter {
+
+    private final static byte NEWLINE = '\n';
+    private final int maxLineLength;
+
+    public LineSplitter(int maxLineLength) {
+      this.maxLineLength = maxLineLength;
+    }
+
+    /**
+     * Parse a line from the IoBuffer {@code buf} and store it into
+     * {@code parsedBuf} except for the trailing newline character. If a line
+     * is successfully parsed, returns {@code true}.
+     * <p/>If no newline is found, and
+     * the number of bytes traversed is less than {@code maxLineLength}, then
+     * the data read from {@code buf} is stored in {@code savedBuf} and this
+     * method returns {@code false}.
+     * <p/>If the number of bytes traversed (including any previously saved
+     * bytes) equals {@code maxLineLength}, but a newline was not found, then
+     * {@code parsedBuf} will be populated, its {@code incomplete} flag will
+     * be set, and this method will return {@code true}.
+     */
+    public boolean parseLine(IoBuffer buf, IoBuffer savedBuf,
+        ParsedBuffer parsedBuf) {
+
+      // clear out passed-in ParsedBuffer object
+      parsedBuf.buffer = null;
+      parsedBuf.incomplete = false;
+
+      byte curByte;
+
+      buf.mark();
+      int msgPos = savedBuf.position(); // carry on from previous buffer
+      boolean seenNewline = false;
+      while (!seenNewline && buf.hasRemaining() && msgPos < maxLineLength) {
+        curByte = buf.get();
+
+        // we are looking for newline delimiters between events
+        if (curByte == NEWLINE) {
+          seenNewline = true;
+        }
+
+        msgPos++;
+      }
+
+      // hit a newline?
+      if (seenNewline) {
+
+        int end = buf.position();
+        buf.reset();
+        int start = buf.position();
+
+        if (savedBuf.position() > 0) {
+          // complete the saved buffer
+          byte[] tmp = new byte[end - start];
+          buf.get(tmp);
+          savedBuf.put(tmp);
+          int len = savedBuf.position() - 1;
+          savedBuf.flip();
+
+          parsedBuf.buffer = savedBuf.getSlice(len);
+
+          savedBuf.clear();
+        } else {
+          parsedBuf.buffer = buf.getSlice(end - start - 1);
+
+          buf.get();  // throw away newline
+        }
+
+        return true;
+
+      // we either emptied our buffer or hit max msg size
+      } else {
+
+        // exceeded max message size
+        if (msgPos == maxLineLength) {
+
+          int end = buf.position();
+          buf.reset();
+          int start = buf.position();
+
+          if (savedBuf.position() > 0) {
+            // complete the saved buffer
+            byte[] tmp = new byte[end - start];
+            buf.get(tmp);
+            savedBuf.put(tmp);
+            savedBuf.flip();
+            parsedBuf.buffer = savedBuf.getSlice(msgPos);
+            savedBuf.clear();
+          } else {
+            // no newline found
+            parsedBuf.buffer = buf.getSlice(msgPos);
+          }
+
+          logger.warn("Event size larger than specified event size: {}. "
+              + "Consider increasing the max event size.", maxLineLength);
+
+          parsedBuf.incomplete = true;
+
+          return true;
+
+        // message fragmentation; save in buffer for later
+        } else if (!buf.hasRemaining()) {
+
+          int end = buf.position();
+          buf.reset();
+          int start = buf.position();
+          byte[] tmp = new byte[end - start];
+          buf.get(tmp);
+          savedBuf.put(tmp);
+
+          return false;
+
+        // this should never happen
+        } else {
+
+          throw new IllegalStateException("unexpected buffer state: " +
+              "msgPos=" + msgPos + ", buf.hasRemaining=" + buf.hasRemaining() +
+              ", savedBuf.hasRemaining=" + savedBuf.hasRemaining() +
+              ", seenNewline=" + seenNewline + ", maxLen=" + maxLineLength);
+
+        }
+
+      }
+    }
+
+  }
+
+  /**
+   * Private struct to represent a simple text line parsed from a message.
+   */
+  static class ParsedBuffer {
+
+    /**
+     * The parsed line of text, without the newline character.
+     */
+    public IoBuffer buffer = null;
+    /**
+     * The incomplete flag is set if the source line length exceeds the maximum
+     * allowed line length. In that case, the returned line will have length
+     * equal to the maximum line length.
+     */
+    public boolean incomplete = false;
+  }
+
+  /**
+   * Per-thread {@link CharsetDecoder} for a fixed charset. A CharsetDecoder
+   * is stateful, so each handler thread gets its own.
+   * Package private only for unit testing.
+   */
+  static class ThreadSafeDecoder extends ThreadLocal<CharsetDecoder> {
+    private final Charset charset;
+
+    public ThreadSafeDecoder(Charset charset) {
+      this.charset = charset;
+    }
+
+    @Override
+    protected CharsetDecoder initialValue() {
+      return charset.newDecoder();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
new file mode 100644
index 0000000..bf3305c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Syslog line parsing (RFC 3164 and RFC 5424 style timestamps), used by
+ * MultiportSyslogTCPSource.
+ */
+
+package org.apache.flume.source;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.Maps;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parses a single syslog line into a Flume {@link Event}.
+ * <p/>
+ * Expected layout: {@code <PRI>[1 ]TIMESTAMP HOSTNAME MESSAGE}, where
+ * TIMESTAMP is one of:
+ * <ul>
+ *   <li>an RFC 3164 timestamp ({@code MMM d HH:mm:ss});</li>
+ *   <li>an RFC 5424 timestamp
+ *       ({@code yyyy-MM-ddTHH:mm:ss[.frac](Z|+hh:mm|-hh:mm)});</li>
+ *   <li>{@code -}, meaning no timestamp (the current time is used).</li>
+ * </ul>
+ * Facility, severity, timestamp and host become event headers; the remaining
+ * text becomes the body.
+ */
+public class SyslogParser {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(SyslogParser.class);
+
+  private static final int TS_CACHE_MAX = 1000;  // timestamp cache size limit
+  private static final Pattern TWO_SPACES = Pattern.compile("  ");
+  private static final DateTimeFormatter rfc3164Format =
+      DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
+
+  private static final String timePat = "yyyy-MM-dd'T'HH:mm:ss";
+  private static final int RFC3164_LEN = 15;
+  private static final int RFC5424_PREFIX_LEN = 19;
+
+  private final DateTimeFormatter timeParser;
+
+  /** Second-resolution RFC 5424 prefix -> UTC millis, bounded in size. */
+  private final Cache<String, Long> timestampCache;
+
+  public SyslogParser() {
+    timeParser = DateTimeFormat.forPattern(timePat).withZoneUTC();
+    timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(
+        new CacheLoader<String, Long>() {
+
+          @Override
+          public Long load(String key) throws Exception {
+            return timeParser.parseMillis(key);
+          }
+        });
+  }
+
+  /**
+   * Parses a Flume Event out of a syslog message string.
+   * @param msg Syslog message, not including the newline character
+   * @param charset Charset used to encode the event body
+   * @return Parsed Flume Event carrying facility, severity, timestamp and
+   *     host headers
+   * @throws IllegalArgumentException if unable to successfully parse message
+   */
+  public Event parseMessage(String msg, Charset charset) {
+    Map<String, String> headers = Maps.newHashMap();
+
+    int msgLen = msg.length();
+
+    // An empty line (e.g. a blank line in the stream) must fail with the
+    // documented IllegalArgumentException, not StringIndexOutOfBounds.
+    Preconditions.checkArgument(msgLen > 0, "Bad format: empty message");
+
+    int curPos = 0;
+
+    Preconditions.checkArgument(msg.charAt(curPos) == '<',
+        "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg);
+
+    int endBracketPos = msg.indexOf('>');
+    Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6,
+        "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg);
+
+    String priority = msg.substring(1, endBracketPos);
+    int pri = Integer.parseInt(priority);
+    int facility = pri / 8;
+    int severity = pri % 8;
+
+    // put fac / sev into header
+    headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
+    headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
+
+    Preconditions.checkArgument(msgLen > endBracketPos + 1,
+        "Bad format: no data except priority (%s)", msg);
+
+    // update parsing position
+    curPos = endBracketPos + 1;
+
+    // ignore version string (only when something follows it)
+    if (msgLen > curPos + 2 && msg.startsWith("1 ", curPos)) {
+      curPos += 2;
+    }
+
+    // now parse timestamp (handle different varieties)
+
+    long ts;
+    char dateStartChar = msg.charAt(curPos);
+
+    try {
+
+      // no timestamp specified; use relay current time
+      if (dateStartChar == '-') {
+        ts = System.currentTimeMillis();
+        if (msgLen <= curPos + 2) {
+          throw new IllegalArgumentException(
+              "bad syslog format (missing hostname)");
+        }
+        curPos += 2; // assume we skip past a space to get to the hostname
+
+        // rfc3164 timestamp
+      } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
+        if (msgLen <= curPos + RFC3164_LEN) {
+          throw new IllegalArgumentException("bad timestamp format");
+        }
+        ts = parseRfc3164Time(
+            msg.substring(curPos, curPos + RFC3164_LEN));
+        curPos += RFC3164_LEN + 1;
+
+        // rfc 5424 timestamp
+      } else {
+        int nextSpace = msg.indexOf(' ', curPos);
+        if (nextSpace == -1) {
+          throw new IllegalArgumentException("bad timestamp format");
+        }
+        ts = parseRfc5424Date(msg.substring(curPos, nextSpace));
+        curPos = nextSpace + 1;
+      }
+
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException("Unable to parse message: " + msg, ex);
+    }
+
+    headers.put("timestamp", String.valueOf(ts));
+
+    // parse out hostname
+    int nextSpace = msg.indexOf(' ', curPos);
+    if (nextSpace == -1) {
+      throw new IllegalArgumentException(
+          "bad syslog format (missing hostname)");
+    }
+    // copy the host string to avoid holding the message string in memory
+    // if using a memory-based queue
+    String hostname = new String(msg.substring(curPos, nextSpace));
+    headers.put("host", hostname);
+
+    // EventBuilder will do a copy of its own, so no defensive copy of the body
+    String data = "";
+    if (msgLen > nextSpace + 1) {
+      curPos = nextSpace + 1;
+      data = msg.substring(curPos);
+    }
+
+    return EventBuilder.withBody(data, charset, headers);
+  }
+
+  /**
+   * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
+   * multiple messages that occur in the same second. Fractional seconds are
+   * truncated to millisecond precision.
+   * @param msg the timestamp text only (no surrounding whitespace)
+   * @return Typical (for Java) milliseconds since UNIX epoch
+   * @throws IllegalArgumentException if {@code msg} is not a valid RFC 5424
+   *     timestamp, including when the timezone is missing
+   */
+  protected long parseRfc5424Date(String msg) {
+
+    int msgLen = msg.length();
+    Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN,
+        "Bad format: Not a valid RFC5424 timestamp: %s", msg);
+    String timestampPrefix = msg.substring(0, RFC5424_PREFIX_LEN);
+
+    Long cached;
+    try {
+      cached = timestampCache.get(timestampPrefix);
+    } catch (ExecutionException ex) {
+      throw new IllegalArgumentException("bad timestamp format", ex);
+    } catch (RuntimeException ex) {
+      // The loader's IllegalArgumentException (from Joda) is rethrown by the
+      // cache wrapped in an unchecked exception, which callers do not catch.
+      throw new IllegalArgumentException("bad timestamp format", ex);
+    }
+
+    Preconditions.checkArgument(cached != null,
+        "Parsing error: timestamp is null");
+    long ts = cached;
+
+    int curPos = RFC5424_PREFIX_LEN;
+
+    // look for the optional fractional seconds
+    if (msg.charAt(curPos) == '.') {
+      int fracStart = curPos + 1;
+
+      if (msgLen <= fracStart) {
+        throw new IllegalArgumentException("bad timestamp format (no TZ)");
+      }
+
+      // find the end of the digit run without running off the string
+      int fracEnd = fracStart;
+      while (fracEnd < msgLen
+          && msg.charAt(fracEnd) >= '0' && msg.charAt(fracEnd) <= '9') {
+        fracEnd++;
+      }
+
+      if (fracEnd == fracStart) {
+        throw new IllegalArgumentException(
+            "Bad format: Invalid timestamp (fractional portion): " + msg);
+      }
+
+      // Scale the first (up to) three digits to milliseconds with integer
+      // arithmetic; extra digits are truncated and no float rounding occurs.
+      int millis = 0;
+      for (int i = 0; i < 3; i++) {
+        int pos = fracStart + i;
+        int digit = (pos < fracEnd) ? msg.charAt(pos) - '0' : 0;
+        millis = millis * 10 + digit;
+      }
+      ts += millis;
+
+      curPos = fracEnd;
+    }
+
+    // look for timezone (mandatory in RFC 5424)
+    Preconditions.checkArgument(curPos < msgLen,
+        "Bad format: Invalid timezone (%s)", msg);
+    char tzFirst = msg.charAt(curPos);
+
+    // UTC
+    if (tzFirst == 'Z') {
+      // no-op
+    } else if (tzFirst == '+' || tzFirst == '-') {
+
+      Preconditions.checkArgument(msgLen > curPos + 5,
+          "Bad format: Invalid timezone (%s)", msg);
+
+      int polarity;
+      if (tzFirst == '+') {
+        polarity = +1;
+      } else {
+        polarity = -1;
+      }
+
+      char[] h = new char[5];
+      for (int i = 0; i < 5; i++) {
+        h[i] = msg.charAt(curPos + 1 + i);
+      }
+
+      if (h[0] >= '0' && h[0] <= '9'
+          && h[1] >= '0' && h[1] <= '9'
+          && h[2] == ':'
+          && h[3] >= '0' && h[3] <= '9'
+          && h[4] >= '0' && h[4] <= '9') {
+        int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3));
+        int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6));
+        // convert local time to UTC by subtracting the signed offset
+        ts -= polarity * ((hourOffset * 60) + minOffset) * 60000;
+      } else {
+        throw new IllegalArgumentException(
+            "Bad format: Invalid timezone: " + msg);
+      }
+
+    }
+
+    return ts;
+  }
+
+  /**
+   * Parse the RFC3164 date format. This is trickier than it sounds because this
+   * format does not specify a year so we get weird edge cases at year
+   * boundaries. The current year is assumed. The result is then shifted by a
+   * year so that it falls roughly between 11 months ago and one month from now.
+   * @param ts RFC3164-compatible timestamp to be parsed
+   * @return Typical (for Java) milliseconds since the UNIX epoch, or 0 if
+   *     {@code ts} cannot be parsed
+   */
+  protected long parseRfc3164Time(String ts) {
+    DateTime now = DateTime.now();
+    int year = now.getYear();
+
+    // "Feb  5" (space-padded day) -> "Feb 5"
+    ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
+
+    DateTime date;
+    try {
+      date = rfc3164Format.parseDateTime(ts);
+    } catch (IllegalArgumentException e) {
+      logger.debug("rfc3164 date parse failed on ("+ts+"): invalid format", e);
+      return 0;
+    }
+
+    // Handle the new-year boundary. NB: old logs (more than ~11 months old)
+    // still cannot be dated correctly.
+    DateTime fixed = date.withYear(year);
+    if (fixed.minusMonths(1).isAfter(now)) {
+      // More than a month in the future: our clock already rolled into the
+      // new year (or delivery was delayed), so the message is from last year.
+      fixed = date.withYear(year - 1);
+    } else if (fixed.plusMonths(11).isBefore(now)) {
+      // About a year in the past: the sender's clock rolled into the new
+      // year before ours did. Only this case moves to next year; a message
+      // merely older than a month keeps the current year.
+      fixed = date.withYear(year + 1);
+    }
+
+    return fixed.getMillis();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index 8c87215..5a73c88 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -22,6 +22,11 @@ public final class SyslogSourceConfigurationConstants {
 
   public static final String CONFIG_PORT = "port";
 
+  /**
+   * Configuration key for the list of ports to listen on, given as a single
+   * space-separated string such as {@code "5140 5141"}.
+   * NOTE(review): confirm whether other delimiters are also accepted.
+   */
+  public static final String CONFIG_PORTS = "ports";
+
   public static final String CONFIG_HOST = "host";
 
   public static final String CONFIG_FORMAT_PREFIX = "format.";
@@ -34,6 +39,32 @@ public final class SyslogSourceConfigurationConstants {
 
   public static final String CONFIG_DATEFORMAT = "dateFormat";
 
+  /**
+   * Number of processors used to calculate number of threads to spawn.
+   */
+  public static final String CONFIG_NUMPROCESSORS = "numProcessors";
+
+  /**
+   * Maximum allowable size of events.
+   */
+  public static final String CONFIG_EVENTSIZE = "eventSize";
+
+  /**
+   * Configuration key {@code batchSize}. Presumably the number of events
+   * handed to the channel processor per batch, defaulting to
+   * {@link #DEFAULT_BATCHSIZE}; TODO confirm in the source's configure().
+   */
+  public static final String CONFIG_BATCHSIZE = "batchSize";
+
+  /**
+   * Configuration key for the charset used to decode messages when no
+   * per-port charset applies. NOTE(review): confirm the fallback rule.
+   */
+  public static final String CONFIG_CHARSET = "charset.default";
+
+  /** Charset name presumably used when {@link #CONFIG_CHARSET} is unset. */
+  public static final String DEFAULT_CHARSET = "UTF-8";
+
+  /**
+   * Prefix for per-port charset keys. Presumably the port number is appended,
+   * e.g. {@code charset.port.5140}; TODO confirm against configure().
+   */
+  public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port.";
+
+  /** Presumed default value for {@link #CONFIG_BATCHSIZE}. */
+  public static final int DEFAULT_BATCHSIZE = 100;
+
+  /**
+   * Configuration key naming the event header in which the receiving port
+   * number is recorded.
+   */
+  public static final String CONFIG_PORT_HEADER = "portHeader";
+
+  /**
+   * Header name {@code "port"} for the receiving port number.
+   * NOTE(review): confirm whether the source applies it by default when
+   * {@link #CONFIG_PORT_HEADER} is unset.
+   */
+  public static final String DEFAULT_PORT_HEADER = "port";
+
+  /**
+   * Configuration key for the read buffer size; the key name suggests the
+   * value is in bytes. NOTE(review): confirm where it is applied.
+   */
+  public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
+  /** Presumed default for {@link #CONFIG_READBUF_SIZE}. */
+  public static final int DEFAULT_READBUF_SIZE = 1024;
 
   private SyslogSourceConfigurationConstants() {
     // Disable explicit creation of objects.

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
new file mode 100644
index 0000000..680e592
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.MultiportSyslogTCPSource.LineSplitter;
+import org.apache.flume.source.MultiportSyslogTCPSource.MultiportSyslogHandler;
+import org.apache.flume.source.MultiportSyslogTCPSource.ParsedBuffer;
+import org.apache.flume.source.MultiportSyslogTCPSource.ThreadSafeDecoder;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
+import org.apache.mina.transport.socket.nio.NioSession;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.mockito.Mockito.*;
+
+public class TestMultiportSyslogTCPSource {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestMultiportSyslogTCPSource.class);
+
+  private static final int BASE_TEST_SYSLOG_PORT = 14455;
+  private final DateTime time = new DateTime();
+  private final String stamp1 = time.toString();
+  private final String host1 = "localhost.localdomain";
+  private final String data1 = "proc1 - some msg";
+
+  /**
+   * Helper function to generate a syslog message.
+   * @param counter
+   * @return 
+   */
+  private byte[] getEvent(int counter) {
+    // timestamp with 'Z' appended, translates to UTC
+    String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + " "
+            + String.valueOf(counter) + "\n";
+    return msg1.getBytes();
+  }
+
+  /**
+   * Basic test to exercise multiple-port parsing.
+   */
+  @Test
+  public void testMultiplePorts() throws IOException, ParseException {
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+
+    Context channelContext = new Context();
+    channelContext.put("capacity", String.valueOf(2000));
+    channelContext.put("transactionCapacity", String.valueOf(2000));
+    Configurables.configure(channel, channelContext);
+
+    List<Channel> channels = Lists.newArrayList();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    Context context = new Context();
+    StringBuilder ports = new StringBuilder();
+    for (int i = 0; i < 1000; i++) {
+      ports.append(String.valueOf(BASE_TEST_SYSLOG_PORT + i)).append(" ");
+    }
+    context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
+        ports.toString().trim());
+    source.configure(context);
+    source.start();
+
+    Socket syslogSocket;
+    for (int i = 0; i < 1000 ; i++) {
+      syslogSocket = new Socket(
+              InetAddress.getLocalHost(), BASE_TEST_SYSLOG_PORT + i);
+      syslogSocket.getOutputStream().write(getEvent(i));
+      syslogSocket.close();
+    }
+
+    List<Event> channelEvents = new ArrayList<Event>();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int i = 0; i < 1000; i++) {
+      Event e = channel.take();
+      if (e == null) {
+        throw new NullPointerException("Event is null");
+      }
+      channelEvents.add(e);
+    }
+    try {
+      txn.commit();
+    } catch (Throwable t) {
+      txn.rollback();
+    } finally {
+      txn.close();
+    }
+    //Since events can arrive out of order, search for each event in the array
+    for (int i = 0; i < 1000 ; i++) {
+      Iterator<Event> iter = channelEvents.iterator();
+      while (iter.hasNext()) {
+        Event e = iter.next();
+        Map<String, String> headers = e.getHeaders();
+        // rely on port to figure out which event it is
+        Integer port = null;
+        if (headers.containsKey(
+            SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)) {
+          port = Integer.parseInt(headers.get(
+                SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER));
+        }
+        iter.remove();
+
+        Assert.assertEquals("Timestamps must match",
+            String.valueOf(time.getMillis()), headers.get("timestamp"));
+
+        String host2 = headers.get("host");
+        Assert.assertEquals(host1, host2);
+
+        if (port != null) {
+          int num = port - BASE_TEST_SYSLOG_PORT;
+          Assert.assertEquals(data1 + " " + String.valueOf(num),
+              new String(e.getBody()));
+        }
+      }
+    }
+    source.stop();
+  }
+
+  /**
+   * Test the reassembly of a single line across multiple packets.
+   */
+  @Test
+  public void testFragmented() throws CharacterCodingException {
+    final int maxLen = 100;
+
+    IoBuffer savedBuf = IoBuffer.allocate(maxLen);
+
+    String origMsg = "<1>- - blah blam foo\n";
+    IoBuffer buf1 = IoBuffer.wrap(
+        origMsg.substring(0, 11).getBytes(Charsets.UTF_8));
+    IoBuffer buf2 = IoBuffer.wrap(
+        origMsg.substring(11, 16).getBytes(Charsets.UTF_8));
+    IoBuffer buf3 = IoBuffer.wrap(
+        origMsg.substring(16, 21).getBytes(Charsets.UTF_8));
+
+    LineSplitter lineSplitter = new LineSplitter(maxLen);
+    ParsedBuffer parsedLine = new ParsedBuffer();
+
+    Assert.assertFalse("Incomplete line should not be parsed",
+        lineSplitter.parseLine(buf1, savedBuf, parsedLine));
+    Assert.assertFalse("Incomplete line should not be parsed",
+        lineSplitter.parseLine(buf2, savedBuf, parsedLine));
+    Assert.assertTrue("Completed line should be parsed",
+        lineSplitter.parseLine(buf3, savedBuf, parsedLine));
+
+    // the fragmented message should now be reconstructed
+    Assert.assertEquals(origMsg.trim(),
+        parsedLine.buffer.getString(Charsets.UTF_8.newDecoder()));
+    parsedLine.buffer.rewind();
+
+    MultiportSyslogTCPSource.MultiportSyslogHandler handler =
+        new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null,
+        null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER,
+        new ThreadSafeDecoder(Charsets.UTF_8),
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>());
+
+    Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder());
+    String body = new String(event.getBody(), Charsets.UTF_8);
+    Assert.assertEquals("Event body incorrect",
+        origMsg.trim().substring(7), body);
+  }
+
+  /**
+   * Test parser handling of different character sets.
+   */
+  @Test
+  public void testCharsetParsing() throws FileNotFoundException, IOException {
+    String header = "<10>2012-08-11T01:01:01Z localhost ";
+    String enBody = "Yarf yarf yarf";
+    String enMsg = header + enBody;
+    String frBody = "Comment " + "\u00EA" + "tes-vous?";
+    String frMsg = header + frBody;
+    String esBody = "¿Cómo estás?";
+    String esMsg = header + esBody;
+
+    // defaults to UTF-8
+    MultiportSyslogHandler handler = new MultiportSyslogHandler(
+        1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
+        new SourceCounter("test"), "port",
+        new ThreadSafeDecoder(Charsets.UTF_8),
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>());
+
+    ParsedBuffer parsedBuf = new ParsedBuffer();
+    parsedBuf.incomplete = false;
+
+    // should be able to encode/decode any of these messages in UTF-8 or ISO
+    String[] bodies = { enBody, esBody, frBody };
+    String[] msgs = { enMsg, esMsg, frMsg };
+    Charset[] charsets = { Charsets.UTF_8, Charsets.ISO_8859_1 };
+    for (Charset charset : charsets) {
+      for (int i = 0; i < msgs.length; i++) {
+        String msg = msgs[i];
+        String body = bodies[i];
+        parsedBuf.buffer = IoBuffer.wrap(msg.getBytes(charset));
+        Event evt = handler.parseEvent(parsedBuf, charset.newDecoder());
+        String result = new String(evt.getBody(), charset);
+        // this doesn't work with non-UTF-8 chars... not sure why...
+        Assert.assertEquals(charset + " parse error: " + msg, body, result);
+        Assert.assertNull(
+            evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+      }
+    }
+
+    // Construct an invalid UTF-8 sequence.
+    // The parser should still generate an Event, but mark it as INVALID.
+    byte[] badUtf8Seq = enMsg.getBytes(Charsets.ISO_8859_1);
+    int badMsgLen = badUtf8Seq.length;
+    badUtf8Seq[badMsgLen - 2] = (byte)0xFE; // valid ISO-8859-1, invalid UTF-8
+    badUtf8Seq[badMsgLen - 1] = (byte)0xFF; // valid ISO-8859-1, invalid UTF-8
+    parsedBuf.buffer = IoBuffer.wrap(badUtf8Seq);
+    Event evt = handler.parseEvent(parsedBuf, Charsets.UTF_8.newDecoder());
+    Assert.assertEquals("event body: " +
+        new String(evt.getBody(), Charsets.ISO_8859_1) +
+        " and my default charset = " + Charset.defaultCharset() +
+        " with event = " + evt,
+        SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+    Assert.assertArrayEquals("Raw message data should be kept in body of event",
+        badUtf8Seq, evt.getBody());
+
+  }
+
+  // helper function
+  private static Event takeEvent(Channel channel) {
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event evt = channel.take();
+    txn.commit();
+    txn.close();
+    return evt;
+  }
+
+  /**
+   * Test that different charsets are parsed by different ports correctly.
+   */
+  @Test
+  public void testPortCharsetHandling() throws UnknownHostException, Exception {
+
+    ///////////////////////////////////////////////////////
+    // port setup
+
+    InetAddress localAddr = InetAddress.getLocalHost();
+    DefaultIoSessionDataStructureFactory dsFactory =
+        new DefaultIoSessionDataStructureFactory();
+
+
+    // one faker on port 10001
+    int port1 = 10001;
+    NioSession session1 = mock(NioSession.class);
+    session1.setAttributeMap(dsFactory.getAttributeMap(session1));
+    SocketAddress sockAddr1 = new InetSocketAddress(localAddr, port1);
+    when(session1.getLocalAddress()).thenReturn(sockAddr1);
+
+    // another faker on port 10002
+    int port2 = 10002;
+    NioSession session2 = mock(NioSession.class);
+    session2.setAttributeMap(dsFactory.getAttributeMap(session2));
+    SocketAddress sockAddr2 = new InetSocketAddress(localAddr, port2);
+    when(session2.getLocalAddress()).thenReturn(sockAddr2);
+
+    // set up expected charsets per port
+    ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets =
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
+    portCharsets.put(port1, new ThreadSafeDecoder(Charsets.ISO_8859_1));
+    portCharsets.put(port2, new ThreadSafeDecoder(Charsets.UTF_8));
+
+    ///////////////////////////////////////////////////////
+    // channel / source setup
+
+    // set up channel to receive events
+    MemoryChannel chan = new MemoryChannel();
+    chan.configure(new Context());
+    chan.start();
+    ReplicatingChannelSelector sel = new ReplicatingChannelSelector();
+    sel.setChannels(Lists.<Channel>newArrayList(chan));
+    ChannelProcessor chanProc = new ChannelProcessor(sel);
+
+    // defaults to UTF-8
+    MultiportSyslogHandler handler = new MultiportSyslogHandler(
+        1000, 10, chanProc, new SourceCounter("test"), "port",
+        new ThreadSafeDecoder(Charsets.UTF_8), portCharsets);
+
+    // initialize buffers
+    handler.sessionCreated(session1);
+    handler.sessionCreated(session2);
+
+    ///////////////////////////////////////////////////////
+    // event setup
+
+    // Create events of varying charsets.
+    String header = "<10>2012-08-17T02:14:00-07:00 192.168.1.110 ";
+
+    // These chars encode under ISO-8859-1 as illegal bytes under UTF-8.
+    String dangerousChars = "þÿÀÁ";
+
+    ///////////////////////////////////////////////////////
+    // encode and send them through the message handler
+    String msg;
+    IoBuffer buf;
+    Event evt;
+
+    // valid ISO-8859-1 on the right (ISO-8859-1) port
+    msg = header + dangerousChars + "\n";
+    buf = IoBuffer.wrap(msg.getBytes(Charsets.ISO_8859_1));
+    handler.messageReceived(session1, buf);
+    evt = takeEvent(chan);
+    Assert.assertNotNull("Event vanished!", evt);
+    Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+
+    // valid ISO-8859-1 on the wrong (UTF-8) port
+    msg = header + dangerousChars + "\n";
+    buf = IoBuffer.wrap(msg.getBytes(Charsets.ISO_8859_1));
+    handler.messageReceived(session2, buf);
+    evt = takeEvent(chan);
+    Assert.assertNotNull("Event vanished!", evt);
+    Assert.assertEquals("Expected invalid event due to character encoding",
+        SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
+        evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+
+    // valid UTF-8 on the right (UTF-8) port
+    msg = header + dangerousChars + "\n";
+    buf = IoBuffer.wrap(msg.getBytes(Charsets.UTF_8));
+    handler.messageReceived(session2, buf);
+    evt = takeEvent(chan);
+    Assert.assertNotNull("Event vanished!", evt);
+    Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
new file mode 100644
index 0000000..258c2f1
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.flume.Event;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SyslogParser} timestamp and message parsing.
+ */
+public class TestSyslogParser {
+
+  /**
+   * Checks that {@link SyslogParser#parseRfc5424Date} agrees with Joda's ISO
+   * parser on RFC 3339 / RFC 5424 style timestamps, including fractional
+   * seconds and non-UTC offsets.
+   */
+  @Test
+  public void testRfc5424DateParsing() {
+    final String[] examples = {
+      "1985-04-12T23:20:50.52Z", "1985-04-12T19:20:50.52-04:00",
+      "2003-10-11T22:14:15.003Z", "2003-08-24T05:14:15.000003-07:00",
+      "2012-04-13T11:11:11-08:00", "2012-04-13T08:08:08.0001+00:00"
+    };
+
+    SyslogParser parser = new SyslogParser();
+    DateTimeFormatter jodaParser = ISODateTimeFormat.dateTimeParser();
+
+    for (String ex : examples) {
+      Assert.assertEquals(
+          "Problem parsing date string: " + ex,
+          jodaParser.parseMillis(ex),
+          parser.parseRfc5424Date(ex));
+    }
+  }
+
+  /**
+   * Checks that known-good RFC 3164 and RFC 5424 messages, plus common
+   * non-standard variants, parse without an event-status header.
+   */
+  @Test
+  public void testMessageParsing() {
+    SyslogParser parser = new SyslogParser();
+    Charset charset = Charsets.UTF_8;
+    List<String> messages = Lists.newArrayList();
+
+    // supported examples from RFC 3164
+    messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
+        "lonvick on /dev/pts/8");
+    messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
+    messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
+        "It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # " +
+         "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
+         "Conveyer1=OK, Conveyer2=OK # %%");
+    messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
+         "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
+
+    // supported examples from RFC 5424
+    messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
+        "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+    messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
+        "8710 - - %% It's time to make the do-nuts.");
+
+    // non-standard (but common) messages (RFC3339 dates, no version digit)
+    messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+    messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+    for (String msg : messages) {
+      Event event = parser.parseMessage(msg, charset);
+      Assert.assertNotNull("Parser returned no event for: " + msg, event);
+      Assert.assertNull("Failure to parse known-good syslog message: " + msg,
+          event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/30293ea1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3ac6d8..f34c808 100644
--- a/pom.xml
+++ b/pom.xml
@@ -876,6 +876,11 @@ limitations under the License.
         <version>1.4.1</version>
       </dependency>
 
+      <!-- Apache MINA networking library, used by the multiport syslog TCP
+           source added in FLUME-1385. -->
+      <dependency>
+        <groupId>org.apache.mina</groupId>
+        <artifactId>mina-core</artifactId>
+        <version>2.0.4</version>
+      </dependency>
 
      <dependency>
       <groupId>org.hbase</groupId>


Mime
View raw message