flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2392][yarn tests] Use log event which is guaranteed to show up to test whether the job was successful
Date Tue, 22 Sep 2015 07:21:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5e6175d6e -> 6d1656a21


[FLINK-2392][yarn tests] Use log event which is guaranteed to show up
to test whether the job was successful


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d1656a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d1656a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d1656a2

Branch: refs/heads/master
Commit: 6d1656a2148f63435a9b3dd38f4cbd196e88e3a4
Parents: 5e6175d
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Sep 21 13:46:51 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Sep 22 09:20:31 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 17 ++++++----
 .../flink/yarn/YARNSessionFIFOITCase.java       | 13 +++++---
 .../org/apache/flink/yarn/YarnTestBase.java     | 35 ++++++++++++++++----
 3 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 9ee60a5..086384d 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -63,8 +63,17 @@ public class UtilsTest {
 	}
 
 	public static void checkForLogString(String expected) {
+		LoggingEvent found = getEventContainingString(expected);
+		if(found != null) {
+			LOG.info("Found expected string '"+expected+"' in log message "+found);
+			return;
+		}
+		Assert.fail("Unable to find expected string '" + expected + "' in log messages");
+	}
+
+	public static LoggingEvent getEventContainingString(String expected) {
 		if(testAppender == null) {
-			throw new NullPointerException("Initialize it first");
+			throw new NullPointerException("Initialize test appender first");
 		}
 		LoggingEvent found = null;
 		for(LoggingEvent event: testAppender.events) {
@@ -73,11 +82,7 @@ public class UtilsTest {
 				break;
 			}
 		}
-		if(found != null) {
-			LOG.info("Found expected string '"+expected+"' in log message "+found);
-			return;
-		}
-		Assert.fail("Unable to find expected string '" + expected + "' in log messages");
+		return found;
 	}
 
 	public static class TestAppender extends AppenderSkeleton {

http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index aac9fe1..1d08ebf 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
@@ -408,6 +409,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Test
 	public void perJobYarnCluster() {
 		LOG.info("Starting perJobYarnCluster()");
+		addTestAppender(JobClient.class, Level.INFO);
 		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}
, "streaming")); // exclude streaming wordcount here.
 		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
 		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
@@ -417,10 +419,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-yjm", "768",
 						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */
-				"Job execution switched to status FINISHED.",
+				"Job execution complete",
 				/* prohibited strings: (we want to see (2/2)) */
 				new String[]{"System.out)(1/1) switched to FINISHED "},
-				RunTypes.CLI_FRONTEND, 0);
+				RunTypes.CLI_FRONTEND, 0, true);
 		LOG.info("Finished perJobYarnCluster()");
 	}
 
@@ -430,6 +432,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Test
 	public void perJobYarnClusterWithParallelism() {
 		LOG.info("Starting perJobYarnClusterWithParallelism()");
+		// write log messages to stdout as well, so that the runWithArgs() method
+		// is catching the log output
+		addTestAppender(JobClient.class, Level.INFO);
 		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"},
"streaming")); // exclude streaming wordcount here.
 		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
 		runWithArgs(new String[]{"run",
@@ -440,10 +445,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-yjm", "768",
 						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */
-				"Job execution switched to status FINISHED.",
+				"Job execution complete",
 				/* prohibited strings: (we want to see (2/2)) */
 				new String[]{"System.out)(1/1) switched to FINISHED "},
-				RunTypes.CLI_FRONTEND, 0);
+				RunTypes.CLI_FRONTEND, 0, true);
 		LOG.info("Finished perJobYarnClusterWithParallelism()");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d1656a2/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 84a7a76..fb8e5a1 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -240,8 +241,8 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[]
whitelisted) {
 		File cwd = new File("target/"+yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
-		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
-		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
+		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory",
cwd.isDirectory());
 		
 		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
 			@Override
@@ -390,7 +391,7 @@ public abstract class YarnTestBase extends TestLogger {
 		final int START_TIMEOUT_SECONDS = 60;
 
 		Runner runner = new Runner(args, type);
-		runner.setName("Frontend (CLI/YARN Client) runner thread (runWithArgs()).");
+		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
 		runner.start();
 
 		for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
@@ -414,10 +415,19 @@ public abstract class YarnTestBase extends TestLogger {
 		return null;
 	}
 
+	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings,
RunTypes type, int returnCode) {
+		runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false);
+	}
 	/**
 	 * The test has been passed once the "terminateAfterString" has been seen.
+	 * @param args Command line arguments for the runner
+	 * @param terminateAfterString the runner is searching the stdout and stderr for this string.
as soon as it appears, the test has passed
+	 * @param failOnStrings The runner is searching stdout and stderr for the strings specified
here. If one appears, the test has failed
+	 * @param type Set the type of the runner
+	 * @param returnCode Expected return code from the runner.
+	 * @param checkLogForTerminateString  If true, the runner checks also the log4j logger for
the terminate string
 	 */
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings,
RunTypes type, int returnCode) {
+	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings,
RunTypes type, int returnCode, boolean checkLogForTerminateString) {
 		LOG.info("Running with args {}", Arrays.toString(args));
 
 		outContent = new ByteArrayOutputStream();
@@ -434,6 +444,7 @@ public abstract class YarnTestBase extends TestLogger {
 		runner.start();
 
 		boolean expectedStringSeen = false;
+		boolean testPassedFromLog4j = false;
 		do {
 			sleep(1000);
 			String outContentString = outContent.toString();
@@ -450,8 +461,17 @@ public abstract class YarnTestBase extends TestLogger {
 					}
 				}
 			}
-			// check output for correct TaskManager startup.
-			if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString)
) {
+			// check output for the expected terminateAfterString.
+			if(checkLogForTerminateString) {
+				LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
+				if(matchedEvent != null) {
+					testPassedFromLog4j = true;
+					LOG.info("Found expected output in logging event {}", matchedEvent);
+				}
+
+			}
+
+			if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString)
|| testPassedFromLog4j ) {
 				expectedStringSeen = true;
 				LOG.info("Found expected output in redirected streams");
 				// send "stop" command to command line interface
@@ -469,13 +489,14 @@ public abstract class YarnTestBase extends TestLogger {
 			else {
 				// check if thread died
 				if (!runner.isAlive()) {
-					sendOutput();
 					if (runner.getReturnValue() != 0) {
 						Assert.fail("Runner thread died before the test was finished. Return value = "
 								+ runner.getReturnValue());
 					} else {
 						LOG.info("Runner stopped earlier than expected with return value = 0");
 					}
+					// leave loop: the runner died, so we can not expect new strings to show up.
+					break;
 				}
 			}
 		}


Mime
View raw message