flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/6] flink git commit: [FLINK-8840] [yarn] Pull YarnClient and YarnConfiguration instantiation out of AbstractYarnClusterClient
Date Sat, 03 Mar 2018 07:34:13 GMT
[FLINK-8840] [yarn] Pull YarnClient and YarnConfiguration instantiation out of AbstractYarnClusterDescriptor

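For better testability, this commit moves the instantiation of the YarnClient and the
YarnConfiguration out of the AbstractYarnClusterDescriptor. Both are now passed in through
the constructor, together with a flag that states whether the YarnClient is shared; a shared
YarnClient is not stopped when the descriptor is closed.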


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/193386bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/193386bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/193386bc

Branch: refs/heads/master
Commit: 193386bcd5d0e2003a831138d6282af1880e1ab8
Parents: 344a477
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Mar 2 15:27:13 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 2 23:45:50 2018 +0100

----------------------------------------------------------------------
 .../yarn/TestingYarnClusterDescriptor.java      | 12 ++-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  7 +-
 .../java/org/apache/flink/yarn/YARNITCase.java  |  6 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  5 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 10 ++-
 .../yarn/AbstractYarnClusterDescriptor.java     | 16 ++--
 .../flink/yarn/Flip6YarnClusterDescriptor.java  | 12 ++-
 .../flink/yarn/YarnClusterDescriptor.java       | 12 ++-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 30 +++++++-
 .../flink/yarn/AbstractYarnClusterTest.java     | 20 ++++-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 79 +++++++++++++++++---
 11 files changed, 175 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index ec41d8e..4d2aaa0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -37,11 +38,18 @@ import java.util.List;
  */
 public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 
-	public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
+	public TestingYarnClusterDescriptor(
+			Configuration configuration,
+			YarnConfiguration yarnConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient,
+			boolean sharedYarnClient) {
 		super(
 			configuration,
+			yarnConfiguration,
 			configurationDirectory,
-			YarnClient.createYarnClient());
+			yarnClient,
+			sharedYarnClient);
 		List<File> filesToShip = new ArrayList<>();
 
 		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 05be03a..f9c03f9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -110,7 +110,12 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		final int numberKillingAttempts = numberApplicationAttempts - 1;
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(configuration, confDirPath);
+		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
+			configuration,
+			getYarnConfiguration(),
+			confDirPath,
+			getYarnClient(),
+			true);
 
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 037e086..ef6706a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -55,12 +55,14 @@ public class YARNITCase extends YarnTestBase {
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-		final YarnClient yarnClient = YarnClient.createYarnClient();
+		final YarnClient yarnClient = getYarnClient();
 
 		try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
 			configuration,
+			getYarnConfiguration(),
 			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
-			yarnClient)) {
+			yarnClient,
+			true)) {
 
 			flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 			flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index e545187..b3dcaca 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -231,12 +231,13 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		Configuration configuration = GlobalConfiguration.loadConfiguration();
-		final YarnClient yarnClient = YarnClient.createYarnClient();
 
 		try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
+			getYarnConfiguration(),
 			confDirPath,
-			yarnClient)) {
+			getYarnClient(),
+			true)) {
 			Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
 			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 			clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 7bca321..b74a155 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -128,7 +128,7 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static File flinkUberjar;
 
-	protected static final Configuration YARN_CONFIGURATION;
+	protected static final YarnConfiguration YARN_CONFIGURATION;
 
 	/**
 	 * lib/ folder of the flink distribution.
@@ -213,6 +213,14 @@ public abstract class YarnTestBase extends TestLogger {
 		flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
 	}
 
+	protected YarnClient getYarnClient() {
+		return yarnClient;
+	}
+
+	protected static YarnConfiguration getYarnConfiguration() {
+		return YARN_CONFIGURATION;
+	}
+
 	/**
 	 * Locate a file or directory.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 6b93016..bdb59b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -118,6 +118,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private final YarnClient yarnClient;
 
+	/** True if the descriptor must not shut down the YarnClient. */
+	private final boolean sharedYarnClient;
+
 	private String yarnQueue;
 
 	private String configurationDirectory;
@@ -145,10 +148,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	public AbstractYarnClusterDescriptor(
 			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration,
 			String configurationDirectory,
-			YarnClient yarnClient) {
+			YarnClient yarnClient,
+			boolean sharedYarnClient) {
 
-		yarnConfiguration = new YarnConfiguration();
+		this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
 
 		// for unit tests only
 		if (System.getenv("IN_TESTS") != null) {
@@ -160,8 +165,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		this.yarnClient = Preconditions.checkNotNull(yarnClient);
-		yarnClient.init(yarnConfiguration);
-		yarnClient.start();
+		this.sharedYarnClient = sharedYarnClient;
 
 		this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
 		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
@@ -328,7 +332,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	@Override
 	public void close() {
-		yarnClient.stop();
+		if (!sharedYarnClient) {
+			yarnClient.stop();
+		}
 	}
 
 	// -------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
index 461dd55..9860363 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
@@ -39,9 +40,16 @@ public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
 	public Flip6YarnClusterDescriptor(
 			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration,
 			String configurationDirectory,
-			YarnClient yarnCLient) {
-		super(flinkConfiguration, configurationDirectory, yarnCLient);
+			YarnClient yarnClient,
+			boolean sharedYarnClient) {
+		super(
+			flinkConfiguration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			sharedYarnClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index a5254a0..8625cee 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
@@ -34,9 +35,16 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
 	public YarnClusterDescriptor(
 			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration,
 			String configurationDirectory,
-			YarnClient yarnClient) {
-		super(flinkConfiguration, configurationDirectory, yarnClient);
+			YarnClient yarnClient,
+			boolean sharedYarnClient) {
+		super(
+			flinkConfiguration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			sharedYarnClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index e4e3dbd..7773600 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
@@ -159,6 +160,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final boolean flip6;
 
+	private final YarnConfiguration yarnConfiguration;
+
 	//------------------------------------ Internal fields -------------------------
 	private boolean detachedMode = false;
 
@@ -257,16 +260,20 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		} else {
 			yarnApplicationIdFromYarnProperties = null;
 		}
+
+		this.yarnConfiguration = new YarnConfiguration();
 	}
 
 	private AbstractYarnClusterDescriptor createDescriptor(
 		Configuration configuration,
+		YarnConfiguration yarnConfiguration,
 		String configurationDirectory,
 		String defaultApplicationName,
 		CommandLine cmd) {
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
 			configuration,
+			yarnConfiguration,
 			configurationDirectory);
 
 		// Jar Path
@@ -440,6 +447,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 		return createDescriptor(
 			effectiveConfiguration,
+			yarnConfiguration,
 			configurationDirectory,
 			null,
 			commandLine);
@@ -955,12 +963,28 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}
 
-	private AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) {
+	private AbstractYarnClusterDescriptor getClusterDescriptor(
+			Configuration configuration,
+			YarnConfiguration yarnConfiguration,
+			String configurationDirectory) {
 		final YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
 		if (flip6) {
-			return new Flip6YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
+			return new Flip6YarnClusterDescriptor(
+				configuration,
+				yarnConfiguration,
+				configurationDirectory,
+				yarnClient,
+				false);
 		} else {
-			return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
+			return new YarnClusterDescriptor(
+				configuration,
+				yarnConfiguration,
+				configurationDirectory,
+				yarnClient,
+				false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
index 8db3740..8ee08db 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.Rule;
 import org.junit.Test;
@@ -63,13 +64,22 @@ public class AbstractYarnClusterTest extends TestLogger {
 			FinalApplicationStatus.SUCCEEDED);
 
 		final YarnClient yarnClient = new TestingYarnClient(Collections.singletonMap(applicationId, applicationReport));
+		final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
 
 		final TestingAbstractYarnClusterDescriptor clusterDescriptor = new TestingAbstractYarnClusterDescriptor(
 			new Configuration(),
+			yarnConfiguration,
 			temporaryFolder.newFolder().getAbsolutePath(),
-			yarnClient);
+			yarnClient,
+			false);
 
-		clusterDescriptor.retrieve(applicationId);
+		try {
+			clusterDescriptor.retrieve(applicationId);
+		} finally {
+			clusterDescriptor.close();
+		}
 	}
 
 	private ApplicationReport createApplicationReport(
@@ -121,9 +131,11 @@ public class AbstractYarnClusterTest extends TestLogger {
 
 		private TestingAbstractYarnClusterDescriptor(
 				Configuration flinkConfiguration,
+				YarnConfiguration yarnConfiguration,
 				String configurationDirectory,
-				YarnClient yarnClient) {
-			super(flinkConfiguration, configurationDirectory, yarnClient);
+				YarnClient yarnClient,
+				boolean sharedYarnClient) {
+			super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, sharedYarnClient);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/193386bc/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index e0f69cb..dd8b625 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -30,10 +30,14 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -47,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -55,26 +60,43 @@ import static org.junit.Assert.fail;
  */
 public class YarnClusterDescriptorTest extends TestLogger {
 
+	private static YarnConfiguration yarnConfiguration;
+
+	private static YarnClient yarnClient;
+
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private File flinkJar;
 
+	@BeforeClass
+	public static void setupClass() {
+		yarnConfiguration = new YarnConfiguration();
+		yarnClient = YarnClient.createYarnClient();
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+	}
+
 	@Before
 	public void beforeTest() throws IOException {
 		temporaryFolder.create();
 		flinkJar = temporaryFolder.newFile("flink.jar");
 	}
 
+	@AfterClass
+	public static void tearDownClass() {
+		yarnClient.stop();
+	}
+
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
 
-		final YarnClient yarnClient = YarnClient.createYarnClient();
-
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			new Configuration(),
+			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
-			yarnClient);
+			yarnClient,
+			true);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -105,12 +127,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		// overwrite vcores in config
 		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 
-		final YarnClient yarnClient = YarnClient.createYarnClient();
-
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
+			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
-			yarnClient);
+			yarnClient,
+			true);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -139,11 +161,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	@Test
 	public void testSetupApplicationMasterContainer() {
 		Configuration cfg = new Configuration();
-		final YarnClient yarnClient = YarnClient.createYarnClient();
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			cfg,
+			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
-			yarnClient);
+			yarnClient,
+			true);
 
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xmx424m";
@@ -391,8 +414,10 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	public void testExplicitLibShipping() throws Exception {
 		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
 			new Configuration(),
+			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
-			YarnClient.createYarnClient());
+			yarnClient,
+			true);
 
 		try {
 			descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
@@ -432,8 +457,10 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	public void testEnvironmentLibShipping() throws Exception {
 		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
 			new Configuration(),
+			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
-			YarnClient.createYarnClient());
+			yarnClient,
+			true);
 
 		try {
 			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
@@ -462,4 +489,36 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			descriptor.close();
 		}
 	}
+
+	/**
+	 * Tests that the YarnClient is only shut down if it is not shared.
+	 */
+	@Test
+	public void testYarnClientShutDown() {
+		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			yarnConfiguration,
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient,
+			true);
+
+		yarnClusterDescriptor.close();
+
+		assertTrue(yarnClient.isInState(Service.STATE.STARTED));
+
+		final YarnClient closableYarnClient = YarnClient.createYarnClient();
+		closableYarnClient.init(yarnConfiguration);
+		closableYarnClient.start();
+
+		yarnClusterDescriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			yarnConfiguration,
+			temporaryFolder.getRoot().getAbsolutePath(),
+			closableYarnClient,
+			false);
+
+		yarnClusterDescriptor.close();
+
+		assertTrue(closableYarnClient.isInState(Service.STATE.STOPPED));
+	}
 }


Mime
View raw message