Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9F351D5C4 for ; Fri, 24 Aug 2012 17:23:23 +0000 (UTC) Received: (qmail 73165 invoked by uid 500); 24 Aug 2012 17:23:23 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 73090 invoked by uid 500); 24 Aug 2012 17:23:23 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 73034 invoked by uid 99); 24 Aug 2012 17:23:23 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Aug 2012 17:23:23 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3ACBC1F9A6; Fri, 24 Aug 2012 17:23:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [4/9] git commit: FLUME-1492. Create integration test for file channel. Message-Id: <20120824172323.3ACBC1F9A6@tyr.zones.apache.org> Date: Fri, 24 Aug 2012 17:23:23 +0000 (UTC) FLUME-1492. Create integration test for file channel. 
(Will McQueen via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d80a7d68 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d80a7d68 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d80a7d68 Branch: refs/heads/flume-1.3.0 Commit: d80a7d6830c3143b2b400da4ace15da739de7221 Parents: c236771 Author: Mike Percy Authored: Thu Aug 23 01:20:35 2012 -0700 Committer: Hari Shreedharan Committed: Fri Aug 24 10:21:32 2012 -0700 ---------------------------------------------------------------------- flume-ng-tests/pom.xml | 9 + .../apache/flume/test/agent/TestFileChannel.java | 197 +++++++++++++++ .../org/apache/flume/test/util/StagedInstall.java | 37 ++- 3 files changed, 231 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml index 7d5ebed..939e66a 100644 --- a/flume-ng-tests/pom.xml +++ b/flume-ng-tests/pom.xml @@ -51,5 +51,14 @@ junit test + + org.apache.hadoop + ${hadoop.common.artifact.id} + true + + + com.google.guava + guava + http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java new file mode 100644 index 0000000..2feb506 --- /dev/null +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.test.agent; + +import java.io.File; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.flume.test.util.StagedInstall; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import com.google.common.io.Files; + +public class TestFileChannel { + + private static final Logger LOGGER = Logger.getLogger(TestFileChannel.class); + + private static final Collection tempResources = new ArrayList(); + + private Properties agentProps; + private File sinkOutputDir; + + @Before + public void setUp() throws Exception { + /* Create 3 temp dirs, each used as value within agentProps */ + + final File sinkOutputDir = Files.createTempDir(); + tempResources.add(sinkOutputDir); + final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath(); + LOGGER.info("Created rolling file sink's output dir: " + + sinkOutputDirPath); + + final File channelCheckpointDir = Files.createTempDir(); + tempResources.add(channelCheckpointDir); + final String 
channelCheckpointDirPath = channelCheckpointDir + .getCanonicalPath(); + LOGGER.info("Created file channel's checkpoint dir: " + + channelCheckpointDirPath); + + final File channelDataDir = Files.createTempDir(); + tempResources.add(channelDataDir); + final String channelDataDirPath = channelDataDir.getCanonicalPath(); + LOGGER.info("Created file channel's data dir: " + + channelDataDirPath); + + /* Build props to pass to flume agent */ + + Properties agentProps = new Properties(); + + // Active sets + agentProps.put("a1.channels", "c1"); + agentProps.put("a1.sources", "r1"); + agentProps.put("a1.sinks", "k1"); + + // c1 + agentProps.put("a1.channels.c1.type", "FILE"); + agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath); + agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath); + + // r1 + agentProps.put("a1.sources.r1.channels", "c1"); + agentProps.put("a1.sources.r1.type", "EXEC"); + agentProps.put("a1.sources.r1.command", "seq 1 100"); + + // k1 + agentProps.put("a1.sinks.k1.channel", "c1"); + agentProps.put("a1.sinks.k1.type", "FILE_ROLL"); + agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath); + agentProps.put("a1.sinks.k1.sink.rollInterval", "0"); + + this.agentProps = agentProps; + this.sinkOutputDir = sinkOutputDir; + } + + @After + public void tearDown() throws Exception { + StagedInstall.getInstance().stopAgent(); + for (File tempResource : tempResources) { + tempResource.delete(); + } + agentProps = null; + } + + /** + * File channel in/out test. Verifies that all events inserted into the + * file channel are received by the sink in order. + * + * The EXEC source creates 100 events where the event bodies have + * sequential numbers. The source puts those events into the file channel, + * and the FILE_ROLL sink is expected to take all 100 events in FIFO + * order. 
+ * + * @throws Exception + */ + @Test + public void testInOut() throws Exception { + LOGGER.debug("testInOut() started."); + + /* Find hadoop jar and append it to the flume agent's classpath */ + + String hadoopJarPath = findHadoopJar(); + Assert.assertNotNull("Hadoop jar not found in classpath.", + hadoopJarPath); + StagedInstall.getInstance().setAgentClasspath(hadoopJarPath); + StagedInstall.getInstance().startAgent("a1", agentProps); + TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish + // TODO make this more deterministic + + LOGGER.info("Started flume agent with hadoop in classpath"); + + /* Create expected output */ + + StringBuffer sb = new StringBuffer(); + for (int i = 1; i <= 100; i++) { + sb.append(i).append("\n"); + } + String expectedOutput = sb.toString(); + LOGGER.info("Created expected output: " + expectedOutput); + + /* Create actual output file */ + + File[] sinkOutputDirChildren = sinkOutputDir.listFiles(); + // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled) + Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," + + " but found " + sinkOutputDirChildren.length + " children.", + 1, sinkOutputDirChildren.length); + File actualOutput = sinkOutputDirChildren[0]; + + if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) { + LOGGER.error("Actual output doesn't match expected output.\n"); + throw new AssertionError("FILE_ROLL sink's actual output doesn't " + + "match expected output."); + } + + LOGGER.debug("testInOut() ended."); + } + + /** + * Search for and return the first path element found that includes hadoop. + * We search the class path of the current JVM process to grab the same + * hadoop jar that's depended on by the file channel. 
+ * + * TODO Add all deps of hadoop jar to classpath + * + * @return path to the first hadoop jar found, null if not found + */ + private String findHadoopJar() { + //Grab classpath + String classpath = System.getProperty("java.class.path"); + String trimmedClasspath = classpath.trim(); + + //parse classpath into path elements + Iterable pathElements = Splitter.on(Pattern.compile("[;:]")) + .omitEmptyStrings() + .trimResults() + .split(trimmedClasspath); + + //find the first path element that includes the hadoop jar + for (String pathElement : pathElements) { + if (Pattern.compile("(?i)hadoop").matcher(pathElement).find()) { + return pathElement; + } + } + + LOGGER.error("Hadoop not found in classpath: |" + classpath + + "|"); + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java index 3e7940d..e332b63 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.zip.GZIPInputStream; @@ -34,6 +35,9 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.log4j.Logger; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; + /** * Attempts to setup a staged install using explicitly specified tar-ball @@ -62,6 +66,7 @@ public class StagedInstall { private Process process; private 
ProcessShutdownHook shutdownHook; private ProcessInputStreamConsumer consumer; + private String agentClasspath; private static StagedInstall INSTANCE; @@ -87,6 +92,7 @@ public class StagedInstall { consumer.interrupt(); consumer = null; configFilePath = null; + agentClasspath = null; Runtime.getRuntime().removeShutdownHook(shutdownHook); shutdownHook = null; @@ -122,20 +128,23 @@ public class StagedInstall { LOGGER.info("Created configuration file: " + configFilePath); - String[] cmdArgs = { - launchScriptPath, "agent", "-n", name, "-f", configFilePath, - "-c", confDirPath, - "-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath, - "-D" + ENV_FLUME_ROOT_LOGGER + "=" + ENV_FLUME_ROOT_LOGGER_VALUE, - "-D" + ENV_FLUME_LOG_FILE + "=" + logFileName - }; - - StringBuilder sb = new StringBuilder(""); - for (String cmdArg : cmdArgs) { - sb.append(cmdArg).append(" "); + ImmutableList.Builder builder = new ImmutableList.Builder(); + builder.add(launchScriptPath); + builder.add("agent"); + builder.add("--conf", confDirPath); + if (agentClasspath != null) { + builder.add("--classpath", agentClasspath); } + builder.add("--conf-file", configFilePath); + builder.add("--name", name); + builder.add("-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath); + builder.add("-D" + ENV_FLUME_ROOT_LOGGER + "=" + + ENV_FLUME_ROOT_LOGGER_VALUE); + builder.add("-D" + ENV_FLUME_LOG_FILE + "=" + logFileName); - LOGGER.info("Using command: " + sb.toString()); + List cmdArgs = builder.build(); + + LOGGER.info("Using command: " + Joiner.on(" ").join(cmdArgs)); ProcessBuilder pb = new ProcessBuilder(cmdArgs); @@ -155,6 +164,10 @@ public class StagedInstall { Thread.sleep(3000); // sleep for 3s to let system initialize } + public synchronized void setAgentClasspath(String agentClasspath) { + this.agentClasspath = agentClasspath; + } + private File createConfigurationFile(String agentName, Properties properties) throws Exception { File file = File.createTempFile("agent", "config.properties", stageDir);