flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [4/9] git commit: FLUME-1492. Create integration test for file channel.
Date Fri, 24 Aug 2012 17:23:23 GMT
FLUME-1492. Create integration test for file channel.

(Will McQueen via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d80a7d68
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d80a7d68
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d80a7d68

Branch: refs/heads/flume-1.3.0
Commit: d80a7d6830c3143b2b400da4ace15da739de7221
Parents: c236771
Author: Mike Percy <mpercy@cloudera.com>
Authored: Thu Aug 23 01:20:35 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Aug 24 10:21:32 2012 -0700

----------------------------------------------------------------------
 flume-ng-tests/pom.xml                             |    9 +
 .../apache/flume/test/agent/TestFileChannel.java   |  197 +++++++++++++++
 .../org/apache/flume/test/util/StagedInstall.java  |   37 ++-
 3 files changed, 231 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml
index 7d5ebed..939e66a 100644
--- a/flume-ng-tests/pom.xml
+++ b/flume-ng-tests/pom.xml
@@ -51,5 +51,14 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
new file mode 100644
index 0000000..2feb506
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.flume.test.util.StagedInstall;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Splitter;
+import com.google.common.io.Files;
+
+public class TestFileChannel {
+
+  private static final Logger LOGGER = Logger.getLogger(TestFileChannel.class);
+
+  private static final Collection<File> tempResources = new ArrayList<File>();
+
+  private Properties agentProps;
+  private File sinkOutputDir;
+
+  @Before
+  public void setUp() throws Exception {
+      /* Create 3 temp dirs, each used as value within agentProps */
+
+      final File sinkOutputDir = Files.createTempDir();
+      tempResources.add(sinkOutputDir);
+      final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath();
+      LOGGER.info("Created rolling file sink's output dir: "
+              + sinkOutputDirPath);
+
+      final File channelCheckpointDir = Files.createTempDir();
+      tempResources.add(channelCheckpointDir);
+      final String channelCheckpointDirPath = channelCheckpointDir
+              .getCanonicalPath();
+      LOGGER.info("Created file channel's checkpoint dir: "
+              + channelCheckpointDirPath);
+
+      final File channelDataDir = Files.createTempDir();
+      tempResources.add(channelDataDir);
+      final String channelDataDirPath = channelDataDir.getCanonicalPath();
+      LOGGER.info("Created file channel's data dir: "
+              + channelDataDirPath);
+
+      /* Build props to pass to flume agent */
+
+      Properties agentProps = new Properties();
+
+      // Active sets
+      agentProps.put("a1.channels", "c1");
+      agentProps.put("a1.sources", "r1");
+      agentProps.put("a1.sinks", "k1");
+
+      // c1
+      agentProps.put("a1.channels.c1.type", "FILE");
+      agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath);
+      agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath);
+
+      // r1
+      agentProps.put("a1.sources.r1.channels", "c1");
+      agentProps.put("a1.sources.r1.type", "EXEC");
+      agentProps.put("a1.sources.r1.command", "seq 1 100");
+
+      // k1
+      agentProps.put("a1.sinks.k1.channel", "c1");
+      agentProps.put("a1.sinks.k1.type", "FILE_ROLL");
+      agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath);
+      agentProps.put("a1.sinks.k1.sink.rollInterval", "0");
+
+      this.agentProps = agentProps;
+      this.sinkOutputDir = sinkOutputDir;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    StagedInstall.getInstance().stopAgent();
+    for (File tempResource : tempResources) {
+        tempResource.delete();
+    }
+    agentProps = null;
+  }
+
+  /**
+   * File channel in/out test. Verifies that all events inserted into the
+   * file channel are received by the sink in order.
+   *
+   * The EXEC source creates 100 events where the event bodies have
+   * sequential numbers. The source puts those events into the file channel,
+   * and the FILE_ROLL The sink is expected to take all 100 events in FIFO
+   * order.
+   *
+   * @throws Exception
+   */
+  @Test
+   public void testInOut() throws Exception {
+      LOGGER.debug("testInOut() started.");
+
+      /* Find hadoop jar and append it to the flume agent's classpath */
+
+      String hadoopJarPath = findHadoopJar();
+      Assert.assertNotNull("Hadoop jar not found in classpath.",
+              hadoopJarPath);
+      StagedInstall.getInstance().setAgentClasspath(hadoopJarPath);
+      StagedInstall.getInstance().startAgent("a1", agentProps);
+      TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish
+                                  // TODO make this more deterministic
+
+      LOGGER.info("Started flume agent with hadoop in classpath");
+
+      /* Create expected output */
+
+      StringBuffer sb = new StringBuffer();
+      for (int i = 1; i <= 100; i++) {
+          sb.append(i).append("\n");
+      }
+      String expectedOutput = sb.toString();
+      LOGGER.info("Created expected output: " + expectedOutput);
+
+      /* Create actual output file */
+
+      File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
+      // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
+      Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," +
+              " but found " + sinkOutputDirChildren.length + " children.",
+              1, sinkOutputDirChildren.length);
+      File actualOutput = sinkOutputDirChildren[0];
+
+      if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) {
+          LOGGER.error("Actual output doesn't match expected output.\n");
+          throw new AssertionError("FILE_ROLL sink's actual output doesn't " +
+                  "match expected output.");
+      }
+
+      LOGGER.debug("testInOut() ended.");
+  }
+
+  /**
+   * Search for and return the first path element found that includes hadoop.
+   * We search the class path of the current JVM process to grab the same
+   * hadoop jar that's depended on by the file channel.
+   *
+   * TODO Add all deps of hadoop jar to classpath
+   *
+   * @return path to the first hadoop jar found, null if not found
+   */
+  private String findHadoopJar() {
+      //Grab classpath
+      String classpath = System.getProperty("java.class.path");
+      String trimmedClasspath = classpath.trim();
+
+      //parse classpath into path elements
+      Iterable<String> pathElements = Splitter.on(Pattern.compile("[;:]"))
+                .omitEmptyStrings()
+                .trimResults()
+                .split(trimmedClasspath);
+
+      //find the first path element that includes the hadoop jar
+      for (String pathElement : pathElements) {
+          if (Pattern.compile("(?i)hadoop").matcher(pathElement).find()) {
+              return pathElement;
+          }
+      }
+
+      LOGGER.error("Hadoop not found in classpath: |" + classpath
+              + "|");
+
+      return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d80a7d68/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index 3e7940d..e332b63 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.zip.GZIPInputStream;
@@ -34,6 +35,9 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
 
 /**
  * Attempts to setup a staged install using explicitly specified tar-ball
@@ -62,6 +66,7 @@ public class StagedInstall {
   private Process process;
   private ProcessShutdownHook shutdownHook;
   private ProcessInputStreamConsumer consumer;
+  private String agentClasspath;
 
   private static StagedInstall INSTANCE;
 
@@ -87,6 +92,7 @@ public class StagedInstall {
     consumer.interrupt();
     consumer = null;
     configFilePath = null;
+    agentClasspath = null;
     Runtime.getRuntime().removeShutdownHook(shutdownHook);
     shutdownHook = null;
 
@@ -122,20 +128,23 @@ public class StagedInstall {
 
     LOGGER.info("Created configuration file: " + configFilePath);
 
-    String[] cmdArgs = {
-        launchScriptPath, "agent", "-n", name, "-f", configFilePath,
-        "-c", confDirPath,
-        "-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath,
-        "-D" + ENV_FLUME_ROOT_LOGGER + "=" + ENV_FLUME_ROOT_LOGGER_VALUE,
-        "-D" + ENV_FLUME_LOG_FILE + "=" + logFileName
-    };
-
-    StringBuilder sb = new StringBuilder("");
-    for (String cmdArg : cmdArgs) {
-      sb.append(cmdArg).append(" ");
+    ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>();
+    builder.add(launchScriptPath);
+    builder.add("agent");
+    builder.add("--conf", confDirPath);
+    if (agentClasspath != null) {
+        builder.add("--classpath", agentClasspath);
     }
+    builder.add("--conf-file", configFilePath);
+    builder.add("--name", name);
+    builder.add("-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath);
+    builder.add("-D" + ENV_FLUME_ROOT_LOGGER + "="
+            + ENV_FLUME_ROOT_LOGGER_VALUE);
+    builder.add("-D" + ENV_FLUME_LOG_FILE + "=" + logFileName);
 
-    LOGGER.info("Using command: " + sb.toString());
+    List<String> cmdArgs = builder.build();
+
+    LOGGER.info("Using command: " + Joiner.on(" ").join(cmdArgs));
 
     ProcessBuilder pb = new ProcessBuilder(cmdArgs);
 
@@ -155,6 +164,10 @@ public class StagedInstall {
     Thread.sleep(3000); // sleep for 3s to let system initialize
   }
 
+  public synchronized void setAgentClasspath(String agentClasspath) {
+      this.agentClasspath = agentClasspath;
+  }
+
   private File createConfigurationFile(String agentName, Properties properties)
       throws Exception {
     File file = File.createTempFile("agent", "config.properties", stageDir);


Mime
View raw message