Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7149A10F71 for ; Mon, 21 Oct 2013 19:16:24 +0000 (UTC) Received: (qmail 58712 invoked by uid 500); 21 Oct 2013 19:16:12 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 58689 invoked by uid 500); 21 Oct 2013 19:16:11 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 58672 invoked by uid 99); 21 Oct 2013 19:16:10 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Oct 2013 19:16:10 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B08CC4F3AC; Mon, 21 Oct 2013 19:16:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <6d0bff7f7c134ff9abe543b635a35340@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1666. Oops, forgot new test in previous commit Date: Mon, 21 Oct 2013 19:16:10 +0000 (UTC) Updated Branches: refs/heads/flume-1.5 443ea3b89 -> f6bbc5c5a FLUME-1666. Oops, forgot new test in previous commit Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f6bbc5c5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f6bbc5c5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f6bbc5c5 Branch: refs/heads/flume-1.5 Commit: f6bbc5c5af45cb12751a379c5e3060629365a6fd Parents: 443ea3b Author: Mike Percy Authored: Mon Oct 21 12:04:22 2013 -0700 Committer: Mike Percy Committed: Mon Oct 21 12:13:55 2013 -0700 ---------------------------------------------------------------------- .../flume/source/TestSyslogTcpSource.java | 136 +++++++++++++++++++ 1 file changed, 136 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f6bbc5c5/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java new file mode 100644 index 0000000..a6a1d5b --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import com.google.common.base.Charsets; +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.conf.Configurables; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +public class TestSyslogTcpSource { + private static final org.slf4j.Logger logger = + LoggerFactory.getLogger(TestSyslogTcpSource.class); + private SyslogTcpSource source; + private Channel channel; + private static final int TEST_SYSLOG_PORT = 0; + private final DateTime time = new DateTime(); + private final String stamp1 = time.toString(); + private final String host1 = "localhost.localdomain"; + private final String data1 = "test syslog data"; + private final String bodyWithTandH = stamp1 + " " + host1 + " " + data1; + // Helper function to generate a syslog message. + private byte[] getEvent() { + // timestamp with 'Z' appended, translates to UTC + final String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; + return msg1.getBytes(); + } + + private void init(boolean keepFields){ + source = new SyslogTcpSource(); + channel = new MemoryChannel(); + + Configurables.configure(channel, new Context()); + + List channels = new ArrayList(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + Context context = new Context(); + context.put("port", String.valueOf(TEST_SYSLOG_PORT)); + context.put("keepFields", String.valueOf(keepFields)); + + source.configure(context); + + } + /** Tests the keepFields configuration parameter (enabled or disabled) + using SyslogTcpSource.*/ + private void runKeepFieldsTest(boolean keepFields) throws IOException { + init(keepFields); + source.start(); + // Write some message to the syslog port + Socket syslogSocket; + for (int i = 0; i < 10 ; i++) { + syslogSocket = new Socket( + InetAddress.getLocalHost(), source.getSourcePort()); + syslogSocket.getOutputStream().write(getEvent()); + syslogSocket.close(); + } + + List channelEvents = new ArrayList(); + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 10; i++) { + Event e = channel.take(); + if (e == null) { + throw new NullPointerException("Event is null"); + } + channelEvents.add(e); + } + + try { + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + } finally { + txn.close(); + } + + source.stop(); + for (Event e : channelEvents) { + Assert.assertNotNull(e); + String str = new String(e.getBody(), Charsets.UTF_8); + logger.info(str); + if (keepFields) { + Assert.assertArrayEquals(bodyWithTandH.getBytes(), e.getBody()); + } else if (!keepFields) { + Assert.assertArrayEquals(data1.getBytes(), e.getBody()); + } + } + } + + @Test + public void testKeepFields () throws IOException { + runKeepFieldsTest(true); + } + + @Test + public void testRemoveFields() throws IOException{ + runKeepFieldsTest(false); + } + } +