flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/4] flink git commit: [FLINK-4488][yarn] only automatically shutdown clusters for detached jobs
Date Mon, 29 Aug 2016 16:16:56 GMT
[FLINK-4488][yarn] only automatically shutdown clusters for detached jobs

- add check to yarn tests to verify cluster hasn't been shutdown prematurely

This closes #2419


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87114cd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87114cd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87114cd2

Branch: refs/heads/master
Commit: 87114cd2aaf78de2114d1ea4ab7bd2b57494d716
Parents: 842e3e7
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Aug 17 15:41:12 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Aug 29 17:48:56 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 92 ++++++++++++++++----
 .../apache/flink/yarn/YarnClusterClient.java    | 10 +--
 3 files changed, 79 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87114cd2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 510a048..d03d9eb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -150,7 +150,7 @@ public class FlinkYarnSessionCliTest {
 
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
-		System.out.println(descriptor.getZookeeperNamespace());
+
 		Assert.assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87114cd2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 76b5d31..0243012 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.yarn;
 
+import akka.actor.Identify;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,10 +47,14 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -61,6 +69,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 
@@ -562,28 +571,49 @@ public abstract class YarnTestBase extends TestLogger {
 
 		@Override
 		public void run() {
-			switch(type) {
-				case YARN_SESSION:
-					yCli = new FlinkYarnSessionCli("", "", false);
-					returnValue = yCli.run(args);
-					break;
-				case CLI_FRONTEND:
-					try {
-						CliFrontend cli = new CliFrontend();
-						returnValue = cli.parseParameters(args);
-					} catch (Exception e) {
-						throw new RuntimeException(e);
-					}
-					break;
-				default:
-					throw new RuntimeException("Unknown type " + type);
-			}
+			try {
+				switch (type) {
+					case YARN_SESSION:
+						yCli = new FlinkYarnSessionCli("", "", false);
+						returnValue = yCli.run(args);
+						break;
+					case CLI_FRONTEND:
+						TestingCLI cli;
+						try {
+							cli = new TestingCLI();
+							returnValue = cli.parseParameters(args);
+						} catch (Exception e) {
+							throw new RuntimeException("Failed to execute the following args with CliFrontend: "
+								+ Arrays.toString(args), e);
+						}
 
-			if(returnValue != 0) {
-				Assert.fail("The YARN session returned with non-null value="+returnValue);
+						final ClusterClient client = cli.getClusterClient();
+						try {
+							// check if the JobManager is still alive after running the job
+							final FiniteDuration finiteDuration = new FiniteDuration(10, TimeUnit.SECONDS);
+							ActorGateway jobManagerGateway = client.getJobManagerGateway();
+							Await.ready(jobManagerGateway.ask(new Identify(true), finiteDuration), finiteDuration);
+						} catch (Exception e) {
+							throw new RuntimeException("It seems like the JobManager died although it should still be alive");
+						}
+						// verify we would have shut down anyways and then shutdown
+						Mockito.verify(cli.getSpiedClusterClient()).shutdown();
+						client.shutdown();
+
+						break;
+					default:
+						throw new RuntimeException("Unknown type " + type);
+				}
+
+				if (returnValue != 0) {
+					Assert.fail("The YARN session returned with non-null value=" + returnValue);
+				}
+			} catch (Throwable t) {
+				Assert.fail(t.getMessage());
 			}
 		}
 
+		/** Stops the Yarn session */
 		public void sendStop() {
 			if(yCli != null) {
 				yCli.stop();
@@ -623,4 +653,30 @@ public abstract class YarnTestBase extends TestLogger {
 		return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
 	}
 
+	private static class TestingCLI extends CliFrontend {
+
+		private ClusterClient originalClusterClient;
+		private ClusterClient spiedClusterClient;
+
+		public TestingCLI() throws Exception {}
+
+		@Override
+		protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
+			// mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on
+			originalClusterClient = super.createClient(options, programName);
+			spiedClusterClient = Mockito.spy(originalClusterClient);
+			Mockito.doNothing().when(spiedClusterClient).shutdown();
+			return spiedClusterClient;
+		}
+
+		public ClusterClient getClusterClient() {
+			return originalClusterClient;
+		}
+
+		public ClusterClient getSpiedClusterClient() {
+			return spiedClusterClient;
+		}
+
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87114cd2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index dfc71e0..e76b7e8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -198,11 +198,10 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		if (perJobCluster) {
-			stopAfterJob(jobGraph.getJobID());
-		}
-
 		if (isDetached()) {
+			if (perJobCluster) {
+				stopAfterJob(jobGraph.getJobID());
+			}
 			return super.runDetached(jobGraph, classLoader);
 		} else {
 			return super.run(jobGraph, classLoader);
@@ -248,7 +247,7 @@ public class YarnClusterClient extends ClusterClient {
 			throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
 		}
 		if(clusterStatus instanceof None$) {
-			return null;
+			throw new RuntimeException("Unable to get ClusterClient status from Application Client");
 		} else if(clusterStatus instanceof Some) {
 			return (GetClusterStatusResponse) (((Some) clusterStatus).get());
 		} else {
@@ -572,7 +571,6 @@ public class YarnClusterClient extends ClusterClient {
 							Thread.sleep(250);
 						} catch (InterruptedException e) {
 							LOG.error("Interrupted while waiting for TaskManagers");
-							System.err.println("Thread is interrupted");
 							throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
 						}
 					}


Mime
View raw message