flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/6] flink git commit: [FLINK-1946] reduce verbosity of Yarn cluster setup
Date Fri, 24 Jun 2016 16:56:05 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6420c1c26 -> d92aeb7aa


[FLINK-1946] reduce verbosity of Yarn cluster setup

This removes repeated printing of messages retrieved from the Yarn
cluster. Only new messages are printed.

- reduce waiting time between subsequent cluster queries

This closes #2147


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b2ad7f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b2ad7f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b2ad7f0

Branch: refs/heads/master
Commit: 5b2ad7f03ef67ce529551e7b464d7db94e2a1d90
Parents: 3b59363
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Jun 24 16:33:15 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Jun 24 17:00:34 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 13 ++++++++-----
 .../org/apache/flink/yarn/YarnClusterClient.java   | 17 ++++++++---------
 2 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b2ad7f0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index aebb14d..81690c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -727,8 +727,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		yarnClient.submitApplication(appContext);
 
 		LOG.info("Waiting for the cluster to be allocated");
-		int waittime = 0;
+		final long startTime = System.currentTimeMillis();
 		ApplicationReport report;
+		YarnApplicationState lastAppState = YarnApplicationState.NEW;
 		loop: while( true ) {
 			try {
 				report = yarnClient.getApplicationReport(appId);
@@ -750,14 +751,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					LOG.info("YARN application has been deployed successfully.");
 					break loop;
 				default:
-					LOG.info("Deploying cluster, current state " + appState);
-					if(waittime > 60000) {
+					if (appState != lastAppState) {
+						LOG.info("Deploying cluster, current state " + appState);
+					}
+					if(System.currentTimeMillis() - startTime > 60000) {
 						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources
are available in the YARN cluster");
 					}
 
 			}
-			waittime += 1000;
-			Thread.sleep(1000);
+			lastAppState = appState;
+			Thread.sleep(250);
 		}
 		// print the application id for user to cancel themselves.
 		if (isDetachedMode()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b2ad7f0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 9c77a8a..9518f75 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -155,22 +155,21 @@ public class YarnClusterClient extends ClusterClient {
 
 			logAndSysout("Waiting until all TaskManagers have connected");
 
-			while (true) {
-				GetClusterStatusResponse status = getClusterStatus();
-				if (status != null) {
-					if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount())
{
-						logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
-							+ clusterDescriptor.getTaskManagerCount() + ")");
-					} else {
+			for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus)
{
+				currentStatus = getClusterStatus();
+				if (currentStatus != null && !currentStatus.equals(lastStatus)) {
+					logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+						+ clusterDescriptor.getTaskManagerCount() + ")");
+					if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount())
{
 						logAndSysout("All TaskManagers are connected");
 						break;
 					}
-				} else {
+				} else if (lastStatus == null) {
 					logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
 				}
 
 				try {
-					Thread.sleep(500);
+					Thread.sleep(250);
 				} catch (InterruptedException e) {
 					LOG.error("Interrupted while waiting for TaskManagers");
 					System.err.println("Thread is interrupted");


Mime
View raw message