flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/5] flink git commit: [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient
Date Thu, 11 Jan 2018 16:15:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master 63343fb8e -> d7ee60330


[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication
with Yarn and, thus, gives a better separation of concerns.

This closes #5215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ce5b98d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ce5b98d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ce5b98d

Branch: refs/heads/master
Commit: 2ce5b98da04cb3850ff91757cc4b74a98b8ce082
Parents: 63343fb
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Dec 7 13:57:24 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Jan 11 16:59:19 2018 +0100

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |   2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   2 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 110 +------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 283 ++++++++++++-------
 .../yarn/cli/YarnApplicationStatusMonitor.java  | 101 +++++++
 6 files changed, 287 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 1fed554..1b457a5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -379,7 +379,7 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 				}
 
 				@Override
-				protected YarnClient getYarnClient() {
+				public YarnClient getYarnClient() {
 					return new TestYarnClient();
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index ae39d0a..e0299aa 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -656,7 +656,7 @@ public abstract class YarnTestBase extends TestLogger {
 			throw new RuntimeException("Runner failed", runner.getRunnerError());
 		}
 		Assert.assertTrue("During the timeout period of " + startTimeoutSeconds + " seconds the " +
-				"expected string did not show up", expectedStringSeen);
+				"expected string \"" + terminateAfterString + "\" did not show up.", expectedStringSeen);
 
 		LOG.info("Test was successful");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a910148..86ddd9b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -308,7 +308,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Gets a Hadoop Yarn client.
 	 * @return Returns a YarnClient which has to be shutdown manually
 	 */
-	protected YarnClient getYarnClient() {
+	public YarnClient getYarnClient() {
 		YarnClient yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index ceca29d..80d0943 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -40,7 +40,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,12 +67,9 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
 
-	private static final int POLLING_THREAD_INTERVAL_MS = 1000;
-
 	private YarnClient yarnClient;
 
 	private Thread clientShutdownHook = new ClientShutdownHook();
-	private PollingThread pollingRunner;
 
 	//---------- Class internal fields -------------------
 
@@ -130,10 +126,6 @@ public class YarnClusterClient extends ClusterClient {
 			actorSystemLoader,
 			highAvailabilityServices);
 
-		this.pollingRunner = new PollingThread(yarnClient, appId);
-		this.pollingRunner.setDaemon(true);
-		this.pollingRunner.start();
-
 		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 	}
 
@@ -158,14 +150,6 @@ public class YarnClusterClient extends ClusterClient {
 			// we are already in the shutdown hook
 		}
 
-		try {
-			pollingRunner.stopRunner();
-			pollingRunner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the polling runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
-
 		isConnected = false;
 	}
 
@@ -254,34 +238,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
-	public ApplicationStatus getApplicationStatus() {
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
-		}
-		ApplicationReport lastReport = null;
-		if (pollingRunner == null) {
-			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster." +
-					"The system might be in an erroneous state");
-		} else {
-			lastReport = pollingRunner.getLastReport();
-		}
-		if (lastReport == null) {
-			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far." +
-					"The system might be in an erroneous state");
-			return ApplicationStatus.UNKNOWN;
-		} else {
-			YarnApplicationState appState = lastReport.getYarnApplicationState();
-			ApplicationStatus status =
-				(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ?
-					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
-			if (status != ApplicationStatus.SUCCEEDED) {
-				LOG.warn("YARN reported application state {}", appState);
-				LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
-			}
-			return status;
-		}
-	}
-
 	@Override
 	public List<String> getNewMessages() {
 
@@ -371,7 +327,7 @@ public class YarnClusterClient extends ClusterClient {
 		try {
 			Future<Object> response =
 				Patterns.ask(applicationClient.get(),
-					new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+					new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
 							"Flink YARN Client requested shutdown"),
 					new Timeout(akkaDuration));
 			Await.ready(response, akkaDuration);
@@ -393,14 +349,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 
 		try {
-			pollingRunner.stopRunner();
-			pollingRunner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the polling runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
-
-		try {
 			ApplicationReport appReport = yarnClient.getApplicationReport(appId);
 
 			LOG.info("Application " + appId + " finished with state " + appReport
@@ -443,62 +391,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
-	// -------------------------- Polling ------------------------
-
-	private static class PollingThread extends Thread {
-
-		AtomicBoolean running = new AtomicBoolean(true);
-		private YarnClient yarnClient;
-		private ApplicationId appId;
-
-		// ------- status information stored in the polling thread
-		private final Object lock = new Object();
-		private ApplicationReport lastReport;
-
-		public PollingThread(YarnClient yarnClient, ApplicationId appId) {
-			this.yarnClient = yarnClient;
-			this.appId = appId;
-		}
-
-		public void stopRunner() {
-			if (!running.get()) {
-				LOG.warn("Polling thread was already stopped");
-			}
-			running.set(false);
-		}
-
-		public ApplicationReport getLastReport() {
-			synchronized (lock) {
-				return lastReport;
-			}
-		}
-
-		@Override
-		public void run() {
-			while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) {
-				try {
-					ApplicationReport report = yarnClient.getApplicationReport(appId);
-					synchronized (lock) {
-						lastReport = report;
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while getting application report", e);
-				}
-				try {
-					Thread.sleep(YarnClusterClient.POLLING_THREAD_INTERVAL_MS);
-				} catch (InterruptedException e) {
-					LOG.error("Polling thread got interrupted", e);
-					Thread.currentThread().interrupt(); // pass interrupt.
-					stopRunner();
-				}
-			}
-			if (running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
-				// == if the polling thread is still running but the yarn client is stopped.
-				LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState());
-			}
-		}
-	}
-
 	@Override
 	public boolean isDetached() {
 		return super.isDetached() || clusterDescriptor.isDetachedMode();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index df4ef1f..5483758 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -31,8 +31,10 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -71,6 +73,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
@@ -86,7 +90,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
 	public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
 
-	private static final int CLIENT_POLLING_INTERVALL = 3;
+	private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
 
 	/** The id for the CommandLine interface. */
 	private static final String ID = "yarn-cluster";
@@ -99,6 +103,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
 
+	private static final String YARN_SESSION_HELP = "Available commands:\n" +
+		"help - show these commands\n" +
+		"stop - stop the YARN session";
+
 	//------------------------------------ Command Line argument options -------------------------
 	// the prefix transformation is used by the CliFrontend static constructor.
 	private final Option query;
@@ -419,104 +427,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		formatter.printHelp(" ", options);
 	}
 
-	private static void writeYarnProperties(Properties properties, File propertiesFile) {
-		try (final OutputStream out = new FileOutputStream(propertiesFile)) {
-			properties.store(out, "Generated YARN properties file");
-		} catch (IOException e) {
-			throw new RuntimeException("Error writing the properties file", e);
-		}
-		propertiesFile.setReadable(true, false); // readable for all.
-	}
-
-	public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean readConsoleInput) {
-		final String help = "Available commands:\n" +
-				"help - show these commands\n" +
-				"stop - stop the YARN session";
-		int numTaskmanagers = 0;
-		try {
-			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-			label:
-			while (true) {
-				// ------------------ check if there are updates by the cluster -----------
-
-				try {
-					GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-					LOG.debug("Received status message: {}", status);
-
-					if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
-						System.err.println("Number of connected TaskManagers changed to " +
-							status.numRegisteredTaskManagers() + ". " +
-							"Slots available: " + status.totalNumberOfSlots());
-						numTaskmanagers = status.numRegisteredTaskManagers();
-					}
-				} catch (Exception e) {
-					LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
-				}
-
-				List<String> messages = yarnCluster.getNewMessages();
-				if (messages != null && messages.size() > 0) {
-					System.err.println("New messages from the YARN cluster: ");
-					for (String msg : messages) {
-						System.err.println(msg);
-					}
-				}
-
-				if (yarnCluster.getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
-					System.err.println("The YARN cluster has failed");
-					yarnCluster.shutdown();
-				}
-
-				// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
-				long startTime = System.currentTimeMillis();
-				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-						&& (!readConsoleInput || !in.ready())) {
-					Thread.sleep(200);
-				}
-				//------------- handle interactive command by user. ----------------------
-
-				if (readConsoleInput && in.ready()) {
-					String command = in.readLine();
-					switch (command) {
-						case "quit":
-						case "stop":
-							yarnCluster.shutdownCluster();
-							break label;
-
-						case "help":
-							System.err.println(help);
-							break;
-						default:
-							System.err.println("Unknown command '" + command + "'. Showing help: \n" + help);
-							break;
-					}
-				}
-
-				if (yarnCluster.hasBeenShutdown()) {
-					LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
-					break;
-				}
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while running the interactive command line interface", e);
-		}
-	}
-
-	public static void main(final String[] args) throws Exception {
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
-
-		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
-
-		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
-		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
-			@Override
-			public Integer call() {
-				return cli.run(args, flinkConfiguration, configurationDirectory);
-			}
-		});
-		System.exit(retCode);
-	}
-
 	@Override
 	public boolean isActive(CommandLine commandLine, Configuration configuration) {
 		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
@@ -660,7 +570,23 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					"yarn application -kill " + applicationId.getOpt());
 				yarnCluster.disconnect();
 			} else {
-				runInteractiveCli(yarnCluster, true);
+				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnDescriptor.getYarnClient(),
+						yarnCluster.getApplicationId(),
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+					runInteractiveCli(
+						yarnCluster,
+						yarnApplicationStatusMonitor,
+						true);
+				} finally {
+					// shut down the scheduled executor service
+					ExecutorUtils.gracefulShutdown(
+						1000L,
+						TimeUnit.MILLISECONDS,
+						scheduledExecutorService);
+				}
 			}
 		} else {
 
@@ -717,7 +643,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				yarnCluster.waitForClusterToBeReady();
 				yarnCluster.disconnect();
 			} else {
-				runInteractiveCli(yarnCluster, acceptInteractiveInput);
+
+				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnDescriptor.getYarnClient(),
+						yarnCluster.getApplicationId(),
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+					runInteractiveCli(
+						yarnCluster,
+						yarnApplicationStatusMonitor,
+						acceptInteractiveInput);
+				} finally {
+					// shut down the scheduled executor service
+					ExecutorUtils.gracefulShutdown(
+						1000L,
+						TimeUnit.MILLISECONDS,
+						scheduledExecutorService);
+				}
 			}
 		}
 		return 0;
@@ -743,6 +686,144 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		System.out.println(message);
 	}
 
+	public static void main(final String[] args) throws Exception {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
+
+		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+
+		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
+		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+			@Override
+			public Integer call() {
+				return cli.run(args, flinkConfiguration, configurationDirectory);
+			}
+		});
+		System.exit(retCode);
+	}
+
+	private static void runInteractiveCli(
+		YarnClusterClient clusterClient,
+		YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+		boolean readConsoleInput) {
+		try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
+			boolean continueRepl = true;
+			int numTaskmanagers = 0;
+			boolean isLastStatusUnknown = true;
+			long unknownStatusSince = System.nanoTime();
+
+			while (continueRepl) {
+
+				final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();
+
+				switch (applicationStatus) {
+					case FAILED:
+					case CANCELED:
+						System.err.println("The Flink Yarn cluster has failed.");
+						continueRepl = false;
+						break;
+					case UNKNOWN:
+						if (!isLastStatusUnknown) {
+							unknownStatusSince = System.nanoTime();
+							isLastStatusUnknown = true;
+						}
+
+						if ((System.nanoTime() - unknownStatusSince) > 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L) {
+							System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
+							continueRepl = false;
+						} else {
+							continueRepl = repStep(in, readConsoleInput);
+						}
+						break;
+					case SUCCEEDED:
+						if (isLastStatusUnknown) {
+							isLastStatusUnknown = false;
+						}
+
+						// ------------------ check if there are updates by the cluster -----------
+						try {
+							final GetClusterStatusResponse status = clusterClient.getClusterStatus();
+
+							if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
+								System.err.println("Number of connected TaskManagers changed to " +
+									status.numRegisteredTaskManagers() + ". " +
+									"Slots available: " + status.totalNumberOfSlots());
+								numTaskmanagers = status.numRegisteredTaskManagers();
+							}
+						} catch (Exception e) {
+							LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
+						}
+
+						printClusterMessages(clusterClient);
+
+						continueRepl = repStep(in, readConsoleInput);
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while running the interactive command line interface.", e);
+		}
+	}
+
+	private static void printClusterMessages(YarnClusterClient clusterClient) {
+		final List<String> messages = clusterClient.getNewMessages();
+		if (!messages.isEmpty()) {
+			System.err.println("New messages from the YARN cluster: ");
+			for (String msg : messages) {
+				System.err.println(msg);
+			}
+		}
+	}
+
+	/**
+	 * Read-Evaluate-Print step for the REPL.
+	 *
+	 * @param in to read from
+	 * @param readConsoleInput true if console input has to be read
+	 * @return true if the REPL shall be continued, otherwise false
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	private static boolean repStep(
+		BufferedReader in,
+		boolean readConsoleInput) throws IOException, InterruptedException {
+
+		// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
+		long startTime = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
+			&& (!readConsoleInput || !in.ready())) {
+			Thread.sleep(200L);
+		}
+		//------------- handle interactive command by user. ----------------------
+
+		if (readConsoleInput && in.ready()) {
+			String command = in.readLine();
+			switch (command) {
+				case "quit":
+				case "stop":
+					return false;
+
+				case "help":
+					System.err.println(YARN_SESSION_HELP);
+					break;
+				default:
+					System.err.println("Unknown command '" + command + "'. Showing help:");
+					System.err.println(YARN_SESSION_HELP);
+					break;
+			}
+		}
+
+		return true;
+	}
+
+	private static void writeYarnProperties(Properties properties, File propertiesFile) {
+		try (final OutputStream out = new FileOutputStream(propertiesFile)) {
+			properties.store(out, "Generated YARN properties file");
+		} catch (IOException e) {
+			throw new RuntimeException("Error writing the properties file", e);
+		}
+		propertiesFile.setReadable(true, false); // readable for all.
+	}
+
 	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
 		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
 			Map<String, String> properties = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
new file mode 100644
index 0000000..88d7747
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class which monitors the specified yarn application status periodically.
+ */
+public class YarnApplicationStatusMonitor implements AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationStatusMonitor.class);
+
+	private static final long UPDATE_INTERVAL = 1000L;
+
+	private final YarnClient yarnClient;
+
+	private final ApplicationId yarnApplicationId;
+
+	private final ScheduledFuture<?> applicationStatusUpdateFuture;
+
+	private volatile ApplicationStatus applicationStatus;
+
+	public YarnApplicationStatusMonitor(
+			YarnClient yarnClient,
+			ApplicationId yarnApplicationId,
+			ScheduledExecutor scheduledExecutor) {
+		this.yarnClient = Preconditions.checkNotNull(yarnClient);
+		this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId);
+
+		applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
+			this::updateApplicationStatus,
+			UPDATE_INTERVAL,
+			UPDATE_INTERVAL,
+			TimeUnit.MILLISECONDS);
+
+		applicationStatus = ApplicationStatus.UNKNOWN;
+	}
+
+	public ApplicationStatus getApplicationStatusNow() {
+		return applicationStatus;
+	}
+
+	@Override
+	public void close() {
+		applicationStatusUpdateFuture.cancel(false);
+	}
+
+	private void updateApplicationStatus() {
+		if (yarnClient.isInState(Service.STATE.STARTED)) {
+			final ApplicationReport applicationReport;
+
+			try {
+				applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
+			} catch (Exception e) {
+				LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
+				return;
+			}
+
+			YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
+
+			if (yarnApplicationState == YarnApplicationState.FAILED || yarnApplicationState == YarnApplicationState.KILLED) {
+				applicationStatus = ApplicationStatus.FAILED;
+			} else {
+				applicationStatus = ApplicationStatus.SUCCEEDED;
+			}
+		} else {
+			LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
+			applicationStatusUpdateFuture.cancel(false);
+		}
+	}
+}


Mime
View raw message