flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject svn commit: r1350062 - in /incubator/flume/trunk: ./ flume-ng-tests/ flume-ng-tests/src/ flume-ng-tests/src/test/ flume-ng-tests/src/test/java/ flume-ng-tests/src/test/java/org/ flume-ng-tests/src/test/java/org/apache/ flume-ng-tests/src/test/java/org/...
Date Thu, 14 Jun 2012 00:12:19 GMT
Author: mpercy
Date: Thu Jun 14 00:12:18 2012
New Revision: 1350062

URL: http://svn.apache.org/viewvc?rev=1350062&view=rev
Log:
FLUME-1253. Support for running integration tests.

(Arvind Prabhakar via Mike Percy)

Added:
    incubator/flume/trunk/flume-ng-tests/
    incubator/flume/trunk/flume-ng-tests/pom.xml
    incubator/flume/trunk/flume-ng-tests/src/
    incubator/flume/trunk/flume-ng-tests/src/test/
    incubator/flume/trunk/flume-ng-tests/src/test/java/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/
    incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
    incubator/flume/trunk/flume-ng-tests/src/test/resources/
    incubator/flume/trunk/flume-ng-tests/src/test/resources/log4j.properties
    incubator/flume/trunk/flume-ng-tests/src/test/resources/rpc-client-test.properties
Modified:
    incubator/flume/trunk/pom.xml

Added: incubator/flume/trunk/flume-ng-tests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/pom.xml?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/pom.xml (added)
+++ incubator/flume/trunk/flume-ng-tests/pom.xml Thu Jun 14 00:12:18 2012
@@ -0,0 +1,55 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flume-ng-tests</artifactId>
+  <packaging>jar</packaging>
+  <name>Flume NG Integration Tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

Added: incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClient.java Thu Jun 14 00:12:18 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.test.util.StagedInstall;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test that starts a staged Flume agent from
+ * {@code rpc-client-test.properties} and sends a handful of events to it
+ * through the default {@link RpcClient}.
+ */
+public class TestRpcClient {
+
+  public static final String CONFIG_FILE_PRCCLIENT_TEST =
+      "rpc-client-test.properties";
+
+  /** Starts the agent named "rpccagent" before each test. */
+  @Before
+  public void setUp() throws Exception {
+    StagedInstall.getInstance().startAgent(
+        "rpccagent", CONFIG_FILE_PRCCLIENT_TEST);
+  }
+
+  /** Stops the agent after each test, if one is still tracked as running. */
+  @After
+  public void tearDown() throws Exception {
+    // stopAgent() throws when no process is tracked; guard so that a failed
+    // setUp() is not masked by a second exception raised from here.
+    if (StagedInstall.getInstance().isRunning()) {
+      StagedInstall.getInstance().stopAgent();
+    }
+  }
+
+  /**
+   * Appends four small events to the agent on localhost:12121. The test
+   * passes when none of the appends throws.
+   */
+  @Test
+  public void testRpcClient() throws Exception {
+    RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 12121);
+    try {
+      String[] text = {"foo", "bar", "xyz", "abc"};
+      for (String str : text) {
+        client.append(EventBuilder.withBody(str.getBytes()));
+      }
+    } finally {
+      // Close the client even when an append fails
+      client.close();
+    }
+  }
+
+}

Added: incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java (added)
+++ incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestRpcClientCommunicationFailure.java Thu Jun 14 00:12:18 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import junit.framework.Assert;
+
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.test.util.StagedInstall;
+import org.junit.Test;
+
+/**
+ * Integration test verifying that an {@link RpcClient} raises an
+ * {@link EventDeliveryException} once the agent it was sending to has been
+ * stopped.
+ */
+public class TestRpcClientCommunicationFailure {
+
+   public static final String CONFIG_FILE_PRCCLIENT_TEST =
+        "rpc-client-test.properties";
+
+   /**
+    * Starts the agent, sends a few events, stops the agent and then expects
+    * the next append to fail with an {@link EventDeliveryException}.
+    */
+   @Test
+   public void testFailure() throws Exception {
+     RpcClient client = null;
+     try {
+       StagedInstall.getInstance().startAgent(
+         "rpccagent", CONFIG_FILE_PRCCLIENT_TEST);
+
+       client = RpcClientFactory.getDefaultInstance("localhost", 12121);
+       String[] text = {"foo", "bar", "xyz", "abc"};
+       for (String str : text) {
+         client.append(EventBuilder.withBody(str.getBytes()));
+       }
+
+       // Stop the agent
+       StagedInstall.getInstance().stopAgent();
+
+       // Try sending the event which should fail
+       try {
+         client.append(EventBuilder.withBody("test".getBytes()));
+         Assert.fail("EventDeliveryException expected but not raised");
+       } catch (EventDeliveryException ex) {
+         // Expected: the agent was stopped above
+       }
+     } finally {
+       // Close the client on every path (not only after the expected
+       // exception), then make sure no agent process is left behind.
+       try {
+         if (client != null) {
+           System.out.println("Attempting to close client");
+           client.close();
+         }
+       } finally {
+         if (StagedInstall.getInstance().isRunning()) {
+           StagedInstall.getInstance().stopAgent();
+         }
+       }
+     }
+   }
+
+}

Added: incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java (added)
+++ incubator/flume/trunk/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java Thu Jun 14 00:12:18 2012
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.util;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Attempts to setup a staged install using explicitly specified tar-ball
+ * distribution or by using relative path into the flume-ng-dist module.
+ */
+public class StagedInstall {
+
+  private static final Logger LOGGER = Logger.getLogger(StagedInstall.class);
+
+  public static final String PROP_PATH_TO_DIST_TARBALL =
+      "flume.dist.tarball";
+
+  public static final String ENV_FLUME_LOG_DIR = "flume.log.dir";
+  public static final String ENV_FLUME_ROOT_LOGGER = "flume.root.logger";
+  public static final String ENV_FLUME_ROOT_LOGGER_VALUE = "DEBUG,LOGFILE";
+  public static final String ENV_FLUME_LOG_FILE = "flume.log.file";
+
+  private final File stageDir;
+  private final File baseDir;
+  private final String launchScriptPath;
+  private final String confDirPath;
+  private final String logDirPath;
+
+  // State per invocation - config file, process, shutdown hook
+  private String configFilePath;
+  private Process process;
+  private ProcessShutdownHook shutdownHook;
+  private ProcessInputStreamConsumer consumer;
+
+  private static StagedInstall INSTANCE;
+
+  public synchronized static StagedInstall getInstance() throws Exception {
+    if (INSTANCE == null) {
+      INSTANCE = new StagedInstall();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Reports whether an agent process is currently tracked by this install.
+   * Only the tracked handle is checked; the method does not probe whether
+   * the operating system process is still alive.
+   *
+   * @return {@code true} if a process handle is held
+   */
+  public synchronized boolean isRunning() {
+    final boolean processTracked = (process != null);
+    return processTracked;
+  }
+
+  /**
+   * Destroys the tracked agent process, clears all per-invocation state,
+   * unregisters the shutdown hook and then pauses for three seconds.
+   *
+   * @throws Exception if no agent process is currently tracked
+   */
+  public synchronized void stopAgent() throws Exception {
+    final Process agent = process;
+    if (agent == null) {
+      throw new Exception("Process not found");
+    }
+
+    LOGGER.info("Shutting down agent process");
+    agent.destroy();
+    process = null;
+
+    // Drop the output reader and the config reference of this invocation
+    consumer.interrupt();
+    consumer = null;
+    configFilePath = null;
+
+    // The hook is only needed while a process is alive
+    final Thread registeredHook = shutdownHook;
+    Runtime.getRuntime().removeShutdownHook(registeredHook);
+    shutdownHook = null;
+
+    Thread.sleep(3000); // sleep for 3s to let system shutdown
+  }
+
+  /**
+   * Starts an agent whose configuration is read from a properties resource
+   * on the system class path.
+   *
+   * @param name the agent name passed to the launch script
+   * @param configResource class path resource holding the agent properties
+   * @throws Exception if an agent process is already tracked, the resource
+   *         cannot be found or read, or the agent cannot be started
+   */
+  public synchronized void startAgent(String name, String configResource)
+      throws Exception {
+    if (process != null) {
+      throw new Exception("A process is already running");
+    }
+
+    InputStream configStream =
+        ClassLoader.getSystemResourceAsStream(configResource);
+    if (configStream == null) {
+      // Properties.load(null) would otherwise fail with a bare
+      // NullPointerException that does not name the missing resource.
+      throw new Exception("Configuration resource not found on classpath: "
+          + configResource);
+    }
+
+    Properties props = new Properties();
+    try {
+      props.load(configStream);
+    } finally {
+      configStream.close();
+    }
+
+    startAgent(name, props);
+  }
+
+  /**
+   * Starts an agent as an external process using the staged launch script.
+   * The properties are written to a new configuration file in the staging
+   * directory; the process output is logged by a daemon thread; a JVM
+   * shutdown hook is registered to destroy the process; finally the method
+   * pauses for three seconds.
+   *
+   * @param name the agent name passed to the launch script
+   * @param properties the agent configuration
+   * @throws Exception if an agent process is already tracked, or the
+   *         configuration file or the process cannot be created
+   */
+  public synchronized void startAgent(String name, Properties properties)
+      throws Exception {
+    if (process != null) {
+      throw new Exception("A process is already running");
+    }
+    LOGGER.info("Starting process for agent: " + name + " using config: "
+       + properties);
+
+    File agentConfig = createConfigurationFile(name, properties);
+    configFilePath = agentConfig.getCanonicalPath();
+
+    // Log file: flume-<agent>-<config file name up to its first dot>.log
+    String configBaseName = agentConfig.getName();
+    configBaseName = configBaseName.substring(0, configBaseName.indexOf('.'));
+    String logFileName = "flume-" + name + "-" + configBaseName + ".log";
+
+    LOGGER.info("Created configuration file: " + configFilePath);
+
+    String[] command = new String[] {
+        launchScriptPath, "agent", "-n", name, "-f", configFilePath,
+        "-c", confDirPath,
+        "-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath,
+        "-D" + ENV_FLUME_ROOT_LOGGER + "=" + ENV_FLUME_ROOT_LOGGER_VALUE,
+        "-D" + ENV_FLUME_LOG_FILE + "=" + logFileName
+    };
+
+    // Space separated rendering of the command, for the log only
+    StringBuilder commandLine = new StringBuilder();
+    for (int i = 0; i < command.length; i++) {
+      commandLine.append(command[i]).append(" ");
+    }
+    LOGGER.info("Using command: " + commandLine);
+
+    ProcessBuilder builder = new ProcessBuilder(command);
+    LOGGER.debug("process environment: " + builder.environment());
+    builder.directory(baseDir);
+    // Merge stderr into stdout so a single reader sees all output
+    builder.redirectErrorStream(true);
+
+    process = builder.start();
+    consumer = new ProcessInputStreamConsumer(process.getInputStream());
+    consumer.start();
+
+    shutdownHook = new ProcessShutdownHook();
+    Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+    Thread.sleep(3000); // sleep for 3s to let system initialize
+  }
+
+  private File createConfigurationFile(String agentName, Properties properties)
+      throws Exception {
+    File file = File.createTempFile("agent", "config.properties", stageDir);
+
+    OutputStream os = null;
+    try {
+      os = new FileOutputStream(file);
+      properties.store(os, "Config file for agent: " + agentName);
+    } catch (Exception ex) {
+      LOGGER.error("Failed to create config file: " + file, ex);
+      throw ex;
+    } finally {
+      if (os != null) {
+        try {
+          os.close();
+        } catch (Exception ex) {
+          LOGGER.warn("Unable to close config file stream", ex);
+        }
+      }
+    }
+
+    return file;
+  }
+
+  /**
+   * Locates the distribution tarball, unpacks it into a fresh staging
+   * directory and records the paths needed to launch an agent from it
+   * (launch script, conf directory, logs directory).
+   *
+   * The tarball is taken from the system property
+   * {@link #PROP_PATH_TO_DIST_TARBALL} when set; otherwise it is searched
+   * for via {@link #getRelativeTarballPath()}.
+   *
+   * @throws Exception if no readable tarball can be found or staging fails
+   */
+  private StagedInstall() throws Exception {
+
+    String tarballPath = System.getProperty(PROP_PATH_TO_DIST_TARBALL);
+    if (tarballPath == null || tarballPath.trim().length() == 0) {
+      LOGGER.info("No value specified for system property: "
+              + PROP_PATH_TO_DIST_TARBALL
+              + ". Will attempt to use relative path to locate dist tarball.");
+
+      tarballPath = getRelativeTarballPath();
+    }
+
+    // Neither the property nor the relative search produced a path
+    if (tarballPath == null || tarballPath.trim().length() == 0) {
+      throw new Exception("Failed to locate tar-ball distribution. "
+          + "Please specify explicitly via system property: "
+          + PROP_PATH_TO_DIST_TARBALL);
+    }
+
+    // Validate that the path names a readable regular file
+    File tarballFile = new File(tarballPath);
+    if (!tarballFile.isFile() || !tarballFile.canRead()) {
+      throw new Exception("The tarball distribution file is invalid: "
+          + tarballPath + ". You can override this by explicitly setting the "
+          + "system property: " + PROP_PATH_TO_DIST_TARBALL);
+    }
+
+    LOGGER.info("Dist tarball to use: " + tarballPath);
+
+    // Now set up a staging directory for this distribution
+    stageDir = getStagingDirectory();
+
+    // Decompress (gunzip) the archive into a plain tar file
+    File tarFile = gunzipDistTarball(tarballFile, stageDir);
+
+    // Untar the decompressed file
+    untarTarFile(tarFile, stageDir);
+
+    // Delete the intermediate tar file
+    tarFile.delete();
+
+    LOGGER.info("Dist tarball staged to: " + stageDir);
+
+    // If the archive expanded into exactly one directory, use that directory
+    // as the install root; otherwise the staging directory itself is the root
+    File rootDir = stageDir;
+    File[] listBaseDirs = stageDir.listFiles();
+    if (listBaseDirs != null && listBaseDirs.length == 1
+        && listBaseDirs[0].isDirectory()) {
+      rootDir =listBaseDirs[0];
+    }
+    baseDir = rootDir;
+
+    // Give execute permissions to the bin/flume-ng script
+    File launchScript = new File(baseDir, "bin/flume-ng");
+    giveExecutePermissions(launchScript);
+
+    launchScriptPath = launchScript.getCanonicalPath();
+
+    File confDir = new File(baseDir, "conf");
+    confDirPath = confDir.getCanonicalPath();
+
+    // The logs directory is created here; its path is passed to the agent
+    // as a -D option when it is started
+    File logDir = new File(baseDir, "logs");
+    logDir.mkdirs();
+
+    logDirPath = logDir.getCanonicalPath();
+
+    LOGGER.info("Staged install root directory: " + rootDir.getCanonicalPath());
+  }
+
+  /**
+   * Makes the given file executable by running {@code chmod +x} on it and
+   * waiting for the command to complete.
+   *
+   * @param file the file to make executable
+   * @throws Exception if chmod cannot be run or exits with a non-zero status
+   */
+  private void giveExecutePermissions(File file) throws Exception {
+    String[] args = {
+        "chmod", "+x", file.getCanonicalPath()
+    };
+    // Wait for chmod to finish: returning while it is still running could
+    // let the script be launched before it is executable, and a failed
+    // chmod would otherwise be reported as a success.
+    int exitCode = Runtime.getRuntime().exec(args).waitFor();
+    if (exitCode != 0) {
+      throw new Exception("Unable to set execute permissions on " + file
+          + ": chmod exited with status " + exitCode);
+    }
+    LOGGER.info("Set execute permissions on " + file);
+  }
+
+  /**
+   * Expands every entry of an uncompressed tar file below the destination
+   * directory. Directory entries are created as directories; every other
+   * entry is written as a file, creating parent directories as needed.
+   * Entries whose name would resolve outside the destination directory
+   * (for example via {@code ../}) are rejected.
+   *
+   * @param tarFile the tar file to expand
+   * @param destDir the directory to expand into
+   * @throws Exception if the archive cannot be read, an entry cannot be
+   *         written, or an entry resolves outside {@code destDir}
+   */
+  private void untarTarFile(File tarFile, File destDir) throws Exception {
+    TarArchiveInputStream tarInputStream = null;
+    try {
+      // Canonical location that every extracted path must stay under
+      String destRoot = destDir.getCanonicalPath();
+      String destRootPrefix = destRoot.endsWith(File.separator)
+          ? destRoot : destRoot + File.separator;
+
+      tarInputStream = new TarArchiveInputStream(new FileInputStream(tarFile));
+      TarArchiveEntry entry = null;
+      while ((entry = tarInputStream.getNextTarEntry()) != null) {
+        String name = entry.getName();
+        LOGGER.debug("Next file: " + name);
+        File destFile = new File(destDir, name);
+
+        // Refuse entries that would be written outside of destDir
+        String destPath = destFile.getCanonicalPath();
+        if (!destPath.equals(destRoot)
+            && !destPath.startsWith(destRootPrefix)) {
+          throw new Exception("Tar entry resolves outside of " + destRoot
+              + ": " + name);
+        }
+
+        if (entry.isDirectory()) {
+          destFile.mkdirs();
+          continue;
+        }
+        File destParent = destFile.getParentFile();
+        destParent.mkdirs();
+        OutputStream entryOutputStream = null;
+        try {
+          entryOutputStream = new FileOutputStream(destFile);
+          byte[] buffer = new byte[2048];
+          int length = 0;
+          // The tar stream reports -1 at the end of the current entry
+          while ((length = tarInputStream.read(buffer, 0, 2048)) != -1) {
+            entryOutputStream.write(buffer, 0, length);
+          }
+        } catch (Exception ex) {
+          LOGGER.error("Exception while expanding tar file", ex);
+          throw ex;
+        } finally {
+          if (entryOutputStream != null) {
+            try {
+              entryOutputStream.close();
+            } catch (Exception ex) {
+              LOGGER.warn("Failed to close entry output stream", ex);
+            }
+          }
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception caught while untarring tar file: "
+            + tarFile.getAbsolutePath(), ex);
+      throw ex;
+    } finally {
+      if (tarInputStream != null) {
+        try {
+          tarInputStream.close();
+        } catch (Exception ex) {
+          LOGGER.warn("Unable to close tar input stream: "
+                + tarFile.getCanonicalPath(), ex);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Decompresses a gzip compressed tarball into a new, uniquely named
+   * {@code .tar} file inside the destination directory.
+   *
+   * @param tarballFile the gzip compressed tarball
+   * @param destDir the directory in which the tar file is created
+   * @return the decompressed tar file
+   * @throws Exception if the tarball cannot be read or the tar file cannot
+   *         be written
+   */
+  private File gunzipDistTarball(File tarballFile, File destDir)
+      throws Exception {
+    File unpackedTar = null;
+
+    InputStream gzipIn = null;
+    OutputStream tarOut = null;
+    try {
+      gzipIn = new GZIPInputStream(new FileInputStream(tarballFile));
+
+      // Obtain a unique name from a temp file, then reuse that name with a
+      // ".tar" suffix for the real output
+      File placeholder = File.createTempFile("flume", "-dist", destDir);
+      String uniquePath = placeholder.getCanonicalPath();
+      placeholder.delete();
+
+      unpackedTar = new File(uniquePath + ".tar");
+
+      LOGGER.info("Tarball being unzipped to: "
+          + unpackedTar.getCanonicalPath());
+
+      tarOut = new FileOutputStream(unpackedTar);
+      byte[] chunk = new byte[10240];
+      for (int count = gzipIn.read(chunk, 0, chunk.length); count != -1;
+          count = gzipIn.read(chunk, 0, chunk.length)) {
+        tarOut.write(chunk, 0, count);
+      }
+
+    } catch (Exception unpackFailure) {
+      LOGGER.error("Exception caught while unpacking the tarball",
+          unpackFailure);
+      throw unpackFailure;
+    } finally {
+      // Close both streams; close failures are logged, not rethrown
+      if (gzipIn != null) {
+        try {
+          gzipIn.close();
+        } catch (Exception closeFailure) {
+          LOGGER.warn("Unable to close input stream to tarball", closeFailure);
+        }
+      }
+      if (tarOut != null) {
+        try {
+          tarOut.close();
+        } catch (Exception closeFailure) {
+          LOGGER.warn("Unable to close tarfile output stream", closeFailure);
+        }
+      }
+    }
+    return unpackedTar;
+  }
+
+  /**
+   * Creates a new, uniquely named staging directory. It is placed under
+   * {@code target/test} when a {@code target} directory exists in the
+   * working directory, and under {@code test} in the JVM temp directory
+   * otherwise.
+   *
+   * @return the newly created staging directory
+   * @throws Exception if the chosen directory already exists or a unique
+   *         name cannot be obtained
+   */
+  private File getStagingDirectory() throws Exception {
+    File parentDir = new File("target");
+    boolean haveTargetDir = parentDir.exists() && parentDir.isDirectory();
+    if (!haveTargetDir) {
+      // Probably operating from command line. Use temp dir as target
+      parentDir = new File(System.getProperty("java.io.tmpdir"));
+    }
+    File testDir = new File(parentDir, "test");
+    testDir.mkdirs();
+
+    // Obtain a unique name from a temp file, then reuse that name with a
+    // "_dir" suffix for the directory
+    File placeholder = File.createTempFile("flume", "_stage", testDir);
+    String uniquePath = placeholder.getCanonicalPath();
+    placeholder.delete();
+
+    File stagingDir = new File(uniquePath + "_dir");
+    if (stagingDir.exists()) {
+      throw new Exception("Stage directory exists: " +
+          stagingDir.getCanonicalPath());
+    }
+    stagingDir.mkdirs();
+
+    LOGGER.info("Staging Directory: " + stagingDir.getCanonicalPath());
+
+    return stagingDir;
+  }
+
+  /**
+   * Searches for the distribution tarball relative to the working
+   * directory. Starting at the parent of the working directory and moving
+   * up one ancestor at a time, looks for a {@code flume-ng-dist/target}
+   * directory containing exactly one readable file named
+   * {@code flume-ng-dist-*-dist.tar.gz}.
+   *
+   * @return the canonical path of the tarball, or {@code null} if none of
+   *         the searched directories yields exactly one usable candidate
+   * @throws Exception if a path cannot be canonicalized
+   */
+  private String getRelativeTarballPath() throws Exception {
+    String tarballPath = null;
+    // Canonicalize first: getParentFile() works on the path text only and
+    // returns null for a bare "..", which would end the search after a
+    // single directory instead of walking up the ancestors.
+    File dir = new File("..").getCanonicalFile();
+    while (dir != null && dir.isDirectory()) {
+      File testFile = new File(dir, "flume-ng-dist/target");
+
+      if (testFile.exists() && testFile.isDirectory()) {
+        LOGGER.info("Found candidate dir: " + testFile.getCanonicalPath());
+        File[] candidateFiles = testFile.listFiles(new FileFilter() {
+
+          @Override
+          public boolean accept(File pathname) {
+            String name = pathname.getName();
+            return name != null && name.startsWith("flume-ng-dist-")
+                && name.endsWith("-dist.tar.gz");
+          }});
+
+        // There should be at most one
+        if (candidateFiles != null && candidateFiles.length > 0) {
+          if (candidateFiles.length == 1) {
+            // Found it
+            File file = candidateFiles[0];
+            if (file.isFile() && file.canRead()) {
+              tarballPath = file.getCanonicalPath();
+              LOGGER.info("Found file: " + tarballPath);
+              break;
+            } else {
+              LOGGER.warn("Invalid file: " + file.getCanonicalPath());
+            }
+          } else {
+            // Ambiguous: report every candidate and keep searching upwards
+            StringBuilder sb = new StringBuilder("Multiple candidate tarballs");
+            sb.append(" found in directory ");
+            sb.append(testFile.getCanonicalPath()).append(": ");
+            boolean first = true;
+            for (File file : candidateFiles) {
+              if (first) {
+                first = false;
+                sb.append(" ");
+              } else {
+                sb.append(", ");
+              }
+              sb.append(file.getCanonicalPath());
+            }
+            sb.append(". All these files will be ignored.");
+            LOGGER.warn(sb.toString());
+          }
+        }
+      }
+
+      dir = dir.getParentFile();
+    }
+    return tarballPath;
+  }
+
+  private class ProcessShutdownHook extends Thread {
+    public void run() {
+      synchronized (StagedInstall.this) {
+        if (StagedInstall.this.process != null) {
+          process.destroy();
+        }
+      }
+    }
+  }
+
+  /**
+   * Daemon thread that reads the agent process output until end of stream
+   * and writes each chunk read to the test log at INFO level.
+   */
+  private static class ProcessInputStreamConsumer extends Thread {
+    private final InputStream is;
+
+    private ProcessInputStreamConsumer(InputStream is) {
+      this.is = is;
+      // Daemon, so a still-blocked read never keeps the JVM alive
+      this.setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      byte[] chunk = new byte[1024];
+      try {
+        while (true) {
+          int count = is.read(chunk, 0, chunk.length);
+          if (count == -1) {
+            break;
+          }
+          LOGGER.info("[process-out] " + new String(chunk, 0, count));
+        }
+      } catch (Exception ex) {
+        LOGGER.warn("Error while reading process stream", ex);
+      }
+    }
+  }
+}

Added: incubator/flume/trunk/flume-ng-tests/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/src/test/resources/log4j.properties?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/src/test/resources/log4j.properties (added)
+++ incubator/flume/trunk/flume-ng-tests/src/test/resources/log4j.properties Thu Jun 14 00:12:18 2012
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG

Added: incubator/flume/trunk/flume-ng-tests/src/test/resources/rpc-client-test.properties
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-tests/src/test/resources/rpc-client-test.properties?rev=1350062&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-tests/src/test/resources/rpc-client-test.properties (added)
+++ incubator/flume/trunk/flume-ng-tests/src/test/resources/rpc-client-test.properties Thu Jun 14 00:12:18 2012
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+rpccagent.sources = src1
+rpccagent.channels = ch1
+
+rpccagent.sources.src1.type = avro
+rpccagent.sources.src1.bind = 127.0.0.1
+rpccagent.sources.src1.port = 12121
+rpccagent.sources.src1.channels = ch1
+
+rpccagent.channels.ch1.type = memory
+rpccagent.channels.ch1.capacity = 100

Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1350062&r1=1350061&r2=1350062&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Thu Jun 14 00:12:18 2012
@@ -53,6 +53,7 @@ limitations under the License.
     <module>flume-ng-legacy-sources</module>
     <module>flume-ng-clients</module>
     <module>flume-ng-sdk</module>
+    <module>flume-ng-tests</module>
   </modules>
 
   <profiles>
@@ -780,6 +781,13 @@ limitations under the License.
         <version>1.8</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>1.4.1</version>
+      </dependency>
+
+
     </dependencies>
   </dependencyManagement>
 



Mime
View raw message