flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/7] flink git commit: [FLINK-3667] delay connection to JobManager until job execution
Date Fri, 01 Jul 2016 13:22:09 GMT
[FLINK-3667] delay connection to JobManager until job execution

- lazily initialize ActorSystem
- make sure it is not created before job execution
- print connection information on the CLI

This closes #2189


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d589623
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d589623
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d589623

Branch: refs/heads/master
Commit: 8d589623d2c2d039b014bc8783bef25351ec36ce
Parents: b674fd5
Author: Maximilian Michels <mxm@apache.org>
Authored: Thu Jun 30 17:41:42 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Jul 1 15:22:31 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  18 ++-
 .../deployment/StandaloneClusterDescriptor.java |   2 +-
 .../flink/client/program/ClusterClient.java     |  94 ++++++++----
 .../client/program/StandaloneClusterClient.java |   5 +-
 .../apache/flink/client/CliFrontendRunTest.java |  11 +-
 .../TestingClusterClientWithoutActorSystem.java |  55 -------
 ...CliFrontendYarnAddressConfigurationTest.java |   4 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  10 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 149 +++++++++++--------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   7 +-
 10 files changed, 177 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 94f5cdb..1322f23 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -154,7 +154,6 @@ public class CliFrontend {
 		LOG.info("Trying to load configuration file");
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 		System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
-
 		this.config = GlobalConfiguration.getConfiguration();
 
 		try {
@@ -234,7 +233,7 @@ public class CliFrontend {
 		ClusterClient client = null;
 		try {
 
-			client = getClient(options, program.getMainClassName());
+			client = createClient(options, program.getMainClassName());
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			client.setDetached(options.getDetachedMode());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
@@ -810,7 +809,7 @@ public class CliFrontend {
 		CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
 		try {
 			ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
-			LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
+			logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect
to JobManager.");
 			return client;
 		} catch (Exception e) {
 			LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
@@ -827,6 +826,7 @@ public class CliFrontend {
 	 * @throws Exception
 	 */
 	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception
{
+		logAndSysout("Retrieving JobManager.");
 		return retrieveClient(options).getJobManagerGateway();
 	}
 
@@ -836,7 +836,7 @@ public class CliFrontend {
 	 * @param programName Program name
 	 * @throws Exception
 	 */
-	protected ClusterClient getClient(
+	protected ClusterClient createClient(
 			CommandLineOptions options,
 			String programName) throws Exception {
 
@@ -846,12 +846,12 @@ public class CliFrontend {
 		ClusterClient client;
 		try {
 			client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
-			logAndSysout("Cluster retrieved");
+			logAndSysout("Cluster retrieved: " + client.getClusterIdentifier());
 		} catch (UnsupportedOperationException e) {
 			try {
 				String applicationName = "Flink Application: " + programName;
 				client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
-				logAndSysout("Cluster started");
+				logAndSysout("Cluster started: " + client.getClusterIdentifier());
 			} catch (UnsupportedOperationException e2) {
 				throw new IllegalConfigurationException(
 					"The JobManager address is neither provided at the command-line, " +
@@ -859,7 +859,9 @@ public class CliFrontend {
 			}
 		}
 
-		logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
+		// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the
user's program.
+		final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig();
+		logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort()
+ " to connect to JobManager.");
 		logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
 		return client;
 	}
@@ -1054,7 +1056,7 @@ public class CliFrontend {
 	 * @param config The config to write to
 	 */
 	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress
address) {
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
 		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 57ccc47..7a3d4d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -50,7 +50,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public StandaloneClusterClient deploy() {
+	public StandaloneClusterClient deploy() throws UnsupportedOperationException {
 		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index d5057b8..6cb5abb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -81,8 +81,8 @@ public abstract class ClusterClient {
 	/** The optimizer used in the optimization of batch programs */
 	final Optimizer compiler;
 
-	/** The actor system used to communicate with the JobManager */
-	protected final ActorSystem actorSystem;
+	/** The actor system used to communicate with the JobManager. Lazily initialized upon first
use */
+	protected final LazyActorSystemLoader actorSystemLoader;
 
 	/** Configuration of the client */
 	protected final Configuration flinkConfig;
@@ -127,39 +127,74 @@ public abstract class ClusterClient {
 		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
 		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
-		this.actorSystem = createActorSystem();
+		this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);
 	}
 
 	// ------------------------------------------------------------------------
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Method to create the ActorSystem of the Client. May be overriden in subclasses.
-	 * @return ActorSystem
-	 * @throws IOException
-	 */
-	protected ActorSystem createActorSystem() throws IOException {
+	protected static class LazyActorSystemLoader {
+
+		private final Logger LOG;
+
+		private final Configuration flinkConfig;
+
+		private ActorSystem actorSystem;
 
-		if (actorSystem != null) {
-			throw new RuntimeException("This method may only be called once.");
+		private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) {
+			this.flinkConfig = flinkConfig;
+			this.LOG = LOG;
 		}
 
-		// start actor system
-		LOG.info("Starting client actor system.");
+		/**
+		 * Indicates whether the ActorSystem has already been instantiated.
+		 * @return boolean True if it exists, False otherwise
+		 */
+		public boolean isLoaded() {
+			return actorSystem != null;
+		}
 
-		String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-		if (hostName == null || port == -1) {
-			throw new IOException("The initial JobManager address has not been set correctly.");
+		public void shutdown() {
+			if (isLoaded()) {
+				actorSystem.shutdown();
+				actorSystem.awaitTermination();
+				actorSystem = null;
+			}
+		}
+
+		/**
+		 * Creates a new ActorSystem or returns an existing one.
+		 * @return ActorSystem
+		 */
+		public ActorSystem get() {
+
+			if (!isLoaded()) {
+				// start actor system
+				LOG.info("Starting client actor system.");
+
+				String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
null);
+				int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+				if (hostName == null || port == -1) {
+					throw new RuntimeException("The initial JobManager address has not been set correctly.");
+				}
+				InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
+
+				// find name of own public interface, able to connect to the JM
+				// try to find address for 2 seconds. log after 400 ms.
+				InetAddress ownHostname;
+				try {
+					ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000,
400);
+				} catch (IOException e) {
+					throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress,
e);
+				}
+				actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+					new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(),
0)));
+			}
+
+			return actorSystem;
 		}
-		InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
 
-		// find name of own public interface, able to connect to the JM
-		// try to find address for 2 seconds. log after 400 ms.
-		InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress,
2000, 400);
-		return AkkaUtils.createActorSystem(flinkConfig,
-			new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(),
0)));
 	}
 
 	/**
@@ -170,10 +205,7 @@ public abstract class ClusterClient {
 			try {
 				finalizeCluster();
 			} finally {
-				if (!this.actorSystem.isTerminated()) {
-					this.actorSystem.shutdown();
-					this.actorSystem.awaitTermination();
-				}
+				this.actorSystemLoader.shutdown();
 			}
 		}
 	}
@@ -201,7 +233,7 @@ public abstract class ClusterClient {
 
 	/**
 	 * Gets the current JobManager address from the Flink configuration (may change in case
of a HA setup).
-	 * @return The address (host and port) of the leading JobManager
+	 * @return The address (host and port) of the leading JobManager when it was last retrieved
(may be outdated)
 	 */
 	public InetSocketAddress getJobManagerAddressFromConfig() {
 		try {
@@ -375,7 +407,7 @@ public abstract class ClusterClient {
 		try {
 			logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job
completion.");
 			this.lastJobID = jobGraph.getJobID();
-			return JobClient.submitJobAndWait(actorSystem,
+			return JobClient.submitJobAndWait(actorSystemLoader.get(),
 				leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
e);
@@ -614,7 +646,7 @@ public abstract class ClusterClient {
 
 		return LeaderRetrievalUtils.retrieveLeaderGateway(
 			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
-			actorSystem,
+			actorSystemLoader.get(),
 			lookupTimeout);
 	}
 
@@ -652,7 +684,7 @@ public abstract class ClusterClient {
 	/**
 	 * Returns a string representation of the cluster.
 	 */
-	protected abstract String getClusterIdentifier();
+	public abstract String getClusterIdentifier();
 
 	/**
 	 * Request the cluster to shut down or disconnect.

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 82f350a..2c6e101 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
 	@Override
 	public String getWebInterfaceURL() {
-		String host = this.getJobManagerAddress().getHostName();
+		String host = this.getJobManagerAddressFromConfig().getHostString();
 		int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
 			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 		return "http://" +  host + ":" + port;
@@ -74,7 +74,8 @@ public class StandaloneClusterClient extends ClusterClient {
 
 	@Override
 	public String getClusterIdentifier() {
-		return "Standalone cluster with JobManager running at " + this.getJobManagerAddress();
+		// Avoid blocking here by getting the address from the config without resolving the address
+		return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index fa554c6..f710d8e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -118,13 +118,13 @@ public class CliFrontendRunTest {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static final class RunTestingCliFrontend extends CliFrontend {
-		
+
 		private final int expectedParallelism;
 		private final boolean sysoutLogging;
 		private final boolean isDetached;
-		
+
 		public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached)
throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
 			this.expectedParallelism = expectedParallelism;
@@ -139,10 +139,5 @@ public class CliFrontendRunTest {
 			assertEquals(expectedParallelism, parallelism);
 			return 0;
 		}
-
-		@Override
-		protected ClusterClient getClient(CommandLineOptions options, String programName) throws
Exception {
-			return TestingClusterClientWithoutActorSystem.create();
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
deleted file mode 100644
index ab608cb..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.client;
-
-import akka.actor.ActorSystem;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-
-/**
- * A client to use in tests which does not instantiate an ActorSystem.
- */
-public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient {
-
-	private TestingClusterClientWithoutActorSystem() throws IOException {
-		super(new Configuration());
-	}
-
-	/**
-	 * Do not instantiate the Actor System to save resources.
-	 * @return Mocked ActorSystem
-	 * @throws IOException
-	 */
-	@Override
-	protected ActorSystem createActorSystem() throws IOException {
-		return Mockito.mock(ActorSystem.class);
-	}
-
-	public static ClusterClient create() {
-		try {
-			return new TestingClusterClientWithoutActorSystem();
-		} catch (IOException e) {
-			throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 5c10de8..323c10b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -303,8 +303,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
 		@Override
 		// make method public
-		public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception
{
-			return super.getClient(options, programName);
+		public ClusterClient createClient(CommandLineOptions options, String programName) throws
Exception {
+			return super.createClient(options, programName);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 2d08bee..ba249c2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -134,6 +134,7 @@ public class FlinkYarnSessionCliTest {
 		Assert.assertEquals(6, client.getMaxSlots());
 	}
 
+
 	private static class TestCLI extends FlinkYarnSessionCli {
 
 		public TestCLI(String shortPrefix, String longPrefix) {
@@ -143,7 +144,7 @@ public class FlinkYarnSessionCliTest {
 		private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
 			@Override
 			public void setLocalJarPath(Path localJarPath) {
-//				setLocalJarPath("/tmp");
+				// add nothing
 			}
 		}
 
@@ -160,12 +161,7 @@ public class FlinkYarnSessionCliTest {
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
-				new Path("/tmp"), true);
-		}
-
-		@Override
-		protected ActorSystem createActorSystem() throws IOException {
-			return Mockito.mock(ActorSystem.class);
+				new Path("/tmp"), false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 9518f75..dfc71e0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -81,14 +81,14 @@ public class YarnClusterClient extends ClusterClient {
 	//---------- Class internal fields -------------------
 
 	private final AbstractYarnClusterDescriptor clusterDescriptor;
-	private final ActorRef applicationClient;
+	private final LazApplicationClientLoader applicationClient;
 	private final FiniteDuration akkaDuration;
 	private final Timeout akkaTimeout;
-	private final ApplicationReport applicationId;
+	private final ApplicationReport appReport;
 	private final ApplicationId appId;
 	private final String trackingURL;
 
-	private boolean isConnected = false;
+	private boolean isConnected = true;
 
 	private final boolean perJobCluster;
 
@@ -120,63 +120,18 @@ public class YarnClusterClient extends ClusterClient {
 		this.yarnClient = yarnClient;
 		this.hadoopConfig = yarnClient.getConfig();
 		this.sessionFilesDir = sessionFilesDir;
-		this.applicationId = appReport;
+		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
 		this.perJobCluster = perJobCluster;
 
-		/* The leader retrieval service for connecting to the cluster and finding the active leader.
*/
-		LeaderRetrievalService leaderRetrievalService;
-		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
-		} catch (Exception e) {
-			throw new IOException("Could not create the leader retrieval service.", e);
-		}
-
-		// start application client
-		LOG.info("Start application client.");
-
-		applicationClient = actorSystem.actorOf(
-			Props.create(
-				ApplicationClient.class,
-				flinkConfig,
-				leaderRetrievalService),
-			"applicationClient");
+		this.applicationClient = new LazApplicationClientLoader();
 
 		pollingRunner = new PollingThread(yarnClient, appId);
 		pollingRunner.setDaemon(true);
 		pollingRunner.start();
 
 		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-
-		isConnected = true;
-
-		if (perJobCluster) {
-
-			logAndSysout("Waiting until all TaskManagers have connected");
-
-			for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus)
{
-				currentStatus = getClusterStatus();
-				if (currentStatus != null && !currentStatus.equals(lastStatus)) {
-					logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
-						+ clusterDescriptor.getTaskManagerCount() + ")");
-					if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount())
{
-						logAndSysout("All TaskManagers are connected");
-						break;
-					}
-				} else if (lastStatus == null) {
-					logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
-				}
-
-				try {
-					Thread.sleep(250);
-				} catch (InterruptedException e) {
-					LOG.error("Interrupted while waiting for TaskManagers");
-					System.err.println("Thread is interrupted");
-					throw new IOException("Interrupted while waiting for TaskManagers", e);
-				}
-			}
-		}
 	}
 
 	/**
@@ -219,7 +174,10 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	private void stopAfterJob(JobID jobID) {
 		Preconditions.checkNotNull(jobID, "The job id must not be null");
-		Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID),
akkaTimeout);
+		Future<Object> messageReceived =
+			ask(
+				applicationClient.get(),
+				new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
 		try {
 			Await.result(messageReceived, akkaDuration);
 		} catch (Exception e) {
@@ -263,7 +221,7 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	public String getClusterIdentifier() {
-		return applicationId.getApplicationId().toString();
+		return "Yarn cluster with application id " + appReport.getApplicationId();
 	}
 
 	/**
@@ -278,7 +236,11 @@ public class YarnClusterClient extends ClusterClient {
 			return null;
 		}
 
-		Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(),
akkaTimeout);
+		Future<Object> clusterStatusOption =
+			ask(
+				applicationClient.get(),
+				YarnMessages.getLocalGetyarnClusterStatus(),
+				akkaTimeout);
 		Object clusterStatus;
 		try {
 			clusterStatus = Await.result(clusterStatusOption, akkaDuration);
@@ -338,9 +300,11 @@ public class YarnClusterClient extends ClusterClient {
 		while(true) {
 			Object result;
 			try {
-				Future<Object> response = Patterns.ask(applicationClient,
-						YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
-
+				Future<Object> response =
+					Patterns.ask(
+						applicationClient.get(),
+						YarnMessages.getLocalGetYarnMessage(),
+						new Timeout(akkaDuration));
 				result = Await.result(response, akkaDuration);
 			} catch(Exception ioe) {
 				LOG.warn("Error retrieving the YARN messages locally", ioe);
@@ -406,11 +370,12 @@ public class YarnClusterClient extends ClusterClient {
 			// we are already in the shutdown hook
 		}
 
-		if(actorSystem != null){
+		if(actorSystemLoader.isLoaded()){
 			LOG.info("Sending shutdown request to the Application Master");
-			if(applicationClient != ActorRef.noSender()) {
+			if(applicationClient.get() != ActorRef.noSender()) {
 				try {
-					Future<Object> response = Patterns.ask(applicationClient,
+					Future<Object> response =
+						Patterns.ask(applicationClient.get(),
 							new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
 									"Flink YARN Client requested shutdown"),
 							new Timeout(akkaDuration));
@@ -467,7 +432,7 @@ public class YarnClusterClient extends ClusterClient {
 				LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
 				LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
 					+ "the full application log using this command:\n"
-					+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+					+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
 					+ "(It sometimes takes a few seconds until the logs are aggregated)");
 			}
 		} catch (Exception e) {
@@ -553,4 +518,68 @@ public class YarnClusterClient extends ClusterClient {
 	public boolean isDetached() {
 		return super.isDetached() || clusterDescriptor.isDetachedMode();
 	}
+
+	public ApplicationId getApplicationId() {
+		return appId;
+	}
+
+	protected class LazApplicationClientLoader {
+
+		private ActorRef applicationClient;
+
+		/**
+		 * Creates the ApplicationClient actor on first use (may start the ActorSystem and, for a per-job cluster, blocks until all TaskManagers have connected) or returns the existing one.
+		 * @return ActorRef of the ApplicationClient actor
+		 */
+		public ActorRef get() {
+			if (applicationClient == null) {
+				/* The leader retrieval service for connecting to the cluster and finding the active leader. */
+				LeaderRetrievalService leaderRetrievalService;
+				try {
+					leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+				} catch (Exception e) {
+					throw new RuntimeException("Could not create the leader retrieval service.", e);
+				}
+
+				// start application client
+				LOG.info("Start application client.");
+
+				applicationClient = actorSystemLoader.get().actorOf(
+					Props.create(
+						ApplicationClient.class,
+						flinkConfig,
+						leaderRetrievalService),
+					"applicationClient");
+
+				if (perJobCluster) {
+
+					logAndSysout("Waiting until all TaskManagers have connected");
+
+					for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+						currentStatus = getClusterStatus();
+						if (currentStatus != null && !currentStatus.equals(lastStatus)) {
+							logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+								+ clusterDescriptor.getTaskManagerCount() + ")");
+							if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+								logAndSysout("All TaskManagers are connected");
+								break;
+							}
+						} else if (lastStatus == null) {
+							logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+						}
+
+						try {
+							Thread.sleep(250);
+						} catch (InterruptedException e) {
+							LOG.error("Interrupted while waiting for TaskManagers");
+							System.err.println("Thread is interrupted");
+							throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
+						}
+					}
+				}
+			}
+
+			return applicationClient;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index c0ad27e..a0225a7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -113,7 +112,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	private final boolean acceptInteractiveInput;
 	
 	//------------------------------------ Internal fields -------------------------
-	private YarnClusterClient yarnCluster = null;
+	private YarnClusterClient yarnCluster;
 	private boolean detachedMode = false;
 
 	public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
@@ -555,7 +554,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			if (detachedMode) {
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill "+yarnCluster.getClusterIdentifier());
+					"yarn application -kill " + yarnCluster.getApplicationId());
 				yarnCluster.disconnect();
 			} else {
 				runInteractiveCli(yarnCluster, true);
@@ -608,7 +607,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				// print info and quit:
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop "
+
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
+						"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator()
+
 						"Please also note that the temporary files of the YARN session in {} will not be removed.",
 						yarnDescriptor.getSessionFilesDir());
 				yarnCluster.disconnect();


Mime
View raw message