flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-1495][yarn] Make Akka timeout configurable in YARN client.
Date Mon, 09 Feb 2015 18:06:47 GMT
Repository: flink
Updated Branches:
  refs/heads/master 52d9806ba -> 46e052619


[FLINK-1495][yarn] Make Akka timeout configurable in YARN client.

This closes #377


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46e05261
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46e05261
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46e05261

Branch: refs/heads/master
Commit: 46e05261968d073dd7faf1c0089aa39e8277959d
Parents: 52d9806
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Feb 9 14:45:56 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Feb 9 19:06:20 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 40 +++++++++-----------
 2 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 23ef523..9536e22 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -586,7 +586,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 			Thread.sleep(1000);
 		}
 		// the Flink cluster is deployed in YARN. Represent cluster
-		return new FlinkYarnCluster(yarnClient, appId, conf, sessionFilesDir);
+		return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/46e05261/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 3f2e72e..1794d36 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -24,7 +24,6 @@ import static akka.pattern.Patterns.ask;
 
 import akka.actor.Props;
 import akka.util.Timeout;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.AkkaUtils$;
 import org.apache.flink.runtime.net.NetUtils;
@@ -46,9 +45,7 @@ import scala.None$;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
-import scala.concurrent.Awaitable;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -56,7 +53,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
@@ -69,21 +65,24 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	private Thread actorRunner;
 	private Thread clientShutdownHook = new ClientShutdownHook();
 	private PollingThread pollingRunner;
-	private Configuration hadoopConfig;
+	private final Configuration hadoopConfig;
 	// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
-	private Path sessionFilesDir;
-	private InetSocketAddress jobManagerAddress;
+	private final Path sessionFilesDir;
+	private final InetSocketAddress jobManagerAddress;
 
 	//---------- Class internal fields -------------------
 
 	private ActorSystem actorSystem;
 	private ActorRef applicationClient;
 	private ApplicationReport intialAppReport;
-	private static FiniteDuration akkaDuration = Duration.apply(5, TimeUnit.SECONDS);
-	private static Timeout akkaTimeout = Timeout.durationToTimeout(akkaDuration);
-
-	public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
-							Configuration hadoopConfig, Path sessionFilesDir) throws IOException, YarnException {
+	private final FiniteDuration akkaDuration;
+	private final Timeout akkaTimeout;
+
+	public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, Configuration hadoopConfig,
+							org.apache.flink.configuration.Configuration flinkConfig,
+							Path sessionFilesDir) throws IOException, YarnException {
+		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
+		this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
 		this.yarnClient = yarnClient;
 		this.hadoopConfig = hadoopConfig;
 		this.sessionFilesDir = sessionFilesDir;
@@ -97,7 +96,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		// start actor system
 		LOG.info("Start actor system.");
 		InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
-		actorSystem = AkkaUtils.createActorSystem(GlobalConfiguration.getConfiguration(),
+		actorSystem = AkkaUtils.createActorSystem(flinkConfig,
 				new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
 
 		// start application client
@@ -166,7 +165,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 			throw new RuntimeException("The FlinkYarnCluster has alread been stopped");
 		}
 		Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
-		Object clusterStatus = awaitUtil(clusterStatusOption, "Unable to get Cluster status from Application Client");
+		Object clusterStatus;
+		try {
+			clusterStatus = Await.result(clusterStatusOption, akkaDuration);
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to get Cluster status from Application Client", e);
+		}
 		if(clusterStatus instanceof None$) {
 			return null;
 		} else if(clusterStatus instanceof Some) {
@@ -234,14 +238,6 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		return ret;
 	}
 
-	private static <T> T awaitUtil(Awaitable<T> awaitable, String message) {
-		try {
-			return Await.result(awaitable, akkaDuration);
-		} catch (Exception e) {
-			throw new RuntimeException(message, e);
-		}
-	}
-
 	// -------------------------- Shutdown handling ------------------------
 
 	private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);


Mime
View raw message