Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 587311721C for ; Tue, 22 Sep 2015 07:21:12 +0000 (UTC) Received: (qmail 20692 invoked by uid 500); 22 Sep 2015 07:21:12 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 20654 invoked by uid 500); 22 Sep 2015 07:21:12 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 20644 invoked by uid 99); 22 Sep 2015 07:21:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2015 07:21:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9A76E060C; Tue, 22 Sep 2015 07:21:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2392][yarn tests] Use log event which is guaranteed to show up to test whether the job was successful Date: Tue, 22 Sep 2015 07:21:11 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 5e6175d6e -> 6d1656a21 [FLINK-2392][yarn tests] Use log event which is guaranteed to show up to test whether the job was successful Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d1656a2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d1656a2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d1656a2 Branch: refs/heads/master Commit: 6d1656a2148f63435a9b3dd38f4cbd196e88e3a4 Parents: 5e6175d Author: Robert Metzger Authored: Mon Sep 21 13:46:51 2015 +0200 Committer: Robert Metzger Committed: Tue Sep 22 09:20:31 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/yarn/UtilsTest.java | 17 ++++++---- .../flink/yarn/YARNSessionFIFOITCase.java | 13 +++++--- .../org/apache/flink/yarn/YarnTestBase.java | 35 ++++++++++++++++---- 3 files changed, 48 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java index 9ee60a5..086384d 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java @@ -63,8 +63,17 @@ public class UtilsTest { } public static void checkForLogString(String expected) { + LoggingEvent found = getEventContainingString(expected); + if(found != null) { + LOG.info("Found expected string '"+expected+"' in log message "+found); + return; + } + Assert.fail("Unable to find expected string '" + expected + "' in log messages"); + } + + public static LoggingEvent getEventContainingString(String expected) { if(testAppender == null) { - throw new NullPointerException("Initialize it first"); + throw new NullPointerException("Initialize test appender first"); } LoggingEvent found = null; for(LoggingEvent event: testAppender.events) { @@ -73,11 +82,7 @@ public class UtilsTest { break; } } - if(found != null) { - LOG.info("Found expected string '"+expected+"' in log message "+found); - return; - } - Assert.fail("Unable to find expected string '" + expected + "' in log messages"); + return found; } public static class TestAppender extends AppenderSkeleton { http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index aac9fe1..1d08ebf 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; @@ -408,6 +409,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { @Test public void perJobYarnCluster() { LOG.info("Starting perJobYarnCluster()"); + addTestAppender(JobClient.class, Level.INFO); File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here. Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); runWithArgs(new String[]{"run", "-m", "yarn-cluster", @@ -417,10 +419,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ - "Job execution switched to status FINISHED.", + "Job execution complete", /* prohibited strings: (we want to see (2/2)) */ new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0); + RunTypes.CLI_FRONTEND, 0, true); LOG.info("Finished perJobYarnCluster()"); } @@ -430,6 +432,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase { @Test public void perJobYarnClusterWithParallelism() { LOG.info("Starting perJobYarnClusterWithParallelism()"); + // write log messages to stdout as well, so that the runWithArgs() method + // is catching the log output + addTestAppender(JobClient.class, Level.INFO); File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here. Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); runWithArgs(new String[]{"run", @@ -440,10 +445,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ - "Job execution switched to status FINISHED.", + "Job execution complete", /* prohibited strings: (we want to see (2/2)) */ new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0); + RunTypes.CLI_FRONTEND, 0, true); LOG.info("Finished perJobYarnClusterWithParallelism()"); } http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java index 84a7a76..fb8e5a1 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -240,8 +241,8 @@ public abstract class YarnTestBase extends TestLogger { */ public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) { File cwd = new File("target/"+yarnConfiguration.get(TEST_CLUSTER_NAME_KEY)); - Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists()); - Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory()); + Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists()); + Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory()); File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() { @Override @@ -390,7 +391,7 @@ public abstract class YarnTestBase extends TestLogger { final int START_TIMEOUT_SECONDS = 60; Runner runner = new Runner(args, type); - runner.setName("Frontend (CLI/YARN Client) runner thread (runWithArgs())."); + runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs())."); runner.start(); for(int second = 0; second < START_TIMEOUT_SECONDS; second++) { @@ -414,10 +415,19 @@ public abstract class YarnTestBase extends TestLogger { return null; } + protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) { + runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false); + } /** * The test has been passed once the "terminateAfterString" has been seen. + * @param args Command line arguments for the runner + * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed + * @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed + * @param type Set the type of the runner + * @param returnCode Expected return code from the runner. + * @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string */ - protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) { + protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) { LOG.info("Running with args {}", Arrays.toString(args)); outContent = new ByteArrayOutputStream(); @@ -434,6 +444,7 @@ public abstract class YarnTestBase extends TestLogger { runner.start(); boolean expectedStringSeen = false; + boolean testPassedFromLog4j = false; do { sleep(1000); String outContentString = outContent.toString(); @@ -450,8 +461,17 @@ public abstract class YarnTestBase extends TestLogger { } } } - // check output for correct TaskManager startup. - if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) ) { + // check output for the expected terminateAfterString. + if(checkLogForTerminateString) { + LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString); + if(matchedEvent != null) { + testPassedFromLog4j = true; + LOG.info("Found expected output in logging event {}", matchedEvent); + } + + } + + if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) { expectedStringSeen = true; LOG.info("Found expected output in redirected streams"); // send "stop" command to command line interface @@ -469,13 +489,14 @@ public abstract class YarnTestBase extends TestLogger { else { // check if thread died if (!runner.isAlive()) { - sendOutput(); if (runner.getReturnValue() != 0) { Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue()); } else { LOG.info("Runner stopped earlier than expected with return value = 0"); } + // leave loop: the runner died, so we can not expect new strings to show up. + break; } } }