flink-commits mailing list archives

From rmetz...@apache.org
Subject [2/6] flink git commit: [FLINK-1295] Address pull request review comments
Date Fri, 23 Jan 2015 17:43:55 GMT
[FLINK-1295] Address pull request review comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c1c91e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c1c91e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c1c91e2

Branch: refs/heads/master
Commit: 2c1c91e2f31f49b27a5574c5338653f1a074b5dc
Parents: 2af6586
Author: Robert Metzger <rmetzger@apache.org>
Authored: Wed Jan 21 13:56:41 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Jan 23 18:39:10 2015 +0100

----------------------------------------------------------------------
 docs/cli.md                                     |   4 +-
 docs/yarn_setup.md                              |   6 +-
 .../org/apache/flink/client/CliFrontend.java    |   4 -
 .../flink/client/FlinkYarnSessionCli.java       |  10 +-
 .../src/main/flink-bin/conf/log4j.properties    |  12 +-
 .../org/apache/flink/runtime/net/NetUtils.java  |   4 +-
 .../runtime/yarn/AbstractFlinkYarnClient.java   |   3 +
 .../yarn/YARNSessionCapacitySchedulerIT.java    |  72 ------
 .../YARNSessionCapacitySchedulerITCase.java     |  70 ++++++
 .../apache/flink/yarn/YARNSessionFIFOIT.java    | 225 -------------------
 .../flink/yarn/YARNSessionFIFOITCase.java       | 225 +++++++++++++++++++
 .../org/apache/flink/yarn/YarnTestBase.java     |  10 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  18 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |  17 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   1 -
 16 files changed, 354 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/docs/cli.md
----------------------------------------------------------------------
diff --git a/docs/cli.md b/docs/cli.md
index 8d6048a..f411c3e 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -63,13 +63,13 @@ The command line can be used to
 
         ./bin/flink run -m myJMHost:6123 \
                               ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar \
-                               -file:///home/user/hamlet.txt file:///home/user/wordcount_out
+                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program using a [per-job YARN cluster](yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
 
         ./bin/flink run -m yarn-cluster -yn 2 \
                               ./examples/flink-java-examples-{{ site.FLINK_VERSION_STABLE }}-WordCount.jar \
-                               -file:///home/user/hamlet.txt file:///home/user/wordcount_out
+                               hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
 
 -   Display the expected arguments for the WordCount example program:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/docs/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index af036af..d636189 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -154,9 +154,9 @@ Use the *run* action to submit a job to YARN. The client is able to determine th
 
 ~~~bash
 wget -O apache-license-v2.txt http://www.apache.org/licenses/LICENSE-2.0.txt
-
+hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
 ./bin/flink run -j ./examples/flink-java-examples-{{site.FLINK_VERSION_SHORT }}-WordCount.jar \
-                       -a 1 file://`pwd`/apache-license-v2.txt file://`pwd`/wordcount-result.txt
+                       -a 1 hdfs:///..../apache-license-v2.txt hdfs:///.../wordcount-result.txt
 ~~~
 
 If there is the following error, make sure that all TaskManagers started:
@@ -177,7 +177,7 @@ The documentation above describes how to start a Flink cluster within a Hadoop Y
 It is also possible to launch Flink within YARN only for executing a single job.
 
 To deploy a job to a per-job YARN cluster, set the master name to `yarn-cluster`.
-Please note that the client then expects the `-n` value to be set (number of TaskManagers).
+Please note that the client then expects the `-yn` value to be set (number of TaskManagers).
 
 ***Example:***
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 358783a..36679d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -67,9 +67,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.PatternLayout;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -915,7 +912,6 @@ public class CliFrontend {
 		InetSocketAddress jobManagerAddress = null;
 		if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
 			System.out.println("YARN cluster mode detected. Switching Log4j output to console");
-			LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
 
 			this.runInYarnCluster = true;
 			// user wants to run Flink in YARN cluster.

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 6546ef0..e120166 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
@@ -131,6 +132,8 @@ public class FlinkYarnSessionCli {
 		// Conf Path
 		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
 		GlobalConfiguration.loadConfiguration(confDirPath);
+		Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
+		flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 		File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
 		if(!confFile.exists()) {
@@ -163,18 +166,17 @@ public class FlinkYarnSessionCli {
 			File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
 			if(logback.exists()) {
 				shipFiles.add(logback);
-				flinkYarnClient.setConfigurationFilePath(new Path(logback.toURI()));
+				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
 			}
 			File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
 			if(log4j.exists()) {
 				shipFiles.add(log4j);
 				if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
 					// this means there is already a logback configuration file --> fail
-					LOG.error("The configuration directory ('"+confDirPath+"') contains both LOG4J and Logback configuration files." +
+					LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and Logback configuration files." +
 							"Please delete or rename one of them.");
-					return null;
 				} // else
-				flinkYarnClient.setConfigurationFilePath(new Path(log4j.toURI()));
+				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 172f566..d421333 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -23,4 +23,14 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+
+# Log output from org.apache.flink.yarn to the console. This is used by the
+# CliFrontend class when using a per-job YARN cluster.
+log4j.logger.org.apache.flink.yarn=INFO, console
+log4j.logger.org.apache.hadoop=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
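
The console loggers added here replace the ConsoleAppender that CliFrontend previously attached programmatically (removed earlier in this commit). A minimal, hypothetical sketch of what that means for code under org.apache.flink.yarn:

~~~java
package org.apache.flink.yarn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example class: any SLF4J logger under org.apache.flink.yarn is now
// routed to the console by the 'console' appender configured above, so the
// CliFrontend no longer needs to attach a ConsoleAppender at runtime.
public class ConsoleLoggingSketch {
	private static final Logger LOG = LoggerFactory.getLogger(ConsoleLoggingSketch.class);

	public static void main(String[] args) {
		LOG.info("Visible on the console when running with the log4j.properties above");
	}
}
~~~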

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 5a5f515..73504e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -76,7 +76,7 @@ public class NetUtils {
 						case ADDRESS:
 							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
 								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
-									LOG.info("Determined " + i + " as the machine's own IP address");
+									LOG.info("Determined {} as the machine's own IP address", i);
 									return i;
 								}
 							}
@@ -86,7 +86,7 @@ public class NetUtils {
 						case SLOW_CONNECT:
 							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
 							if (correct) {
-								LOG.info("Determined " + i + " as the machine's own IP address");
+								LOG.info("Determined {} as the machine's own IP address", i);
 								return i;
 							}
 							break;
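
The switch from string concatenation to SLF4J's parameterized form means the message is only formatted when INFO is actually enabled. An illustrative stand-alone sketch (not Flink code):

~~~java
import java.net.InetAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// With a '{}' placeholder, SLF4J only builds the final message string if the INFO
// level is enabled, whereas '+' concatenation always pays the formatting cost,
// even when the statement ends up being filtered out.
public class ParameterizedLoggingSketch {
	private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

	public static void main(String[] args) {
		InetAddress address = InetAddress.getLoopbackAddress();
		LOG.info("Determined " + address + " as the machine's own IP address"); // eager concatenation
		LOG.info("Determined {} as the machine's own IP address", address);     // lazy formatting
	}
}
~~~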

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 7f2b14e..77f4301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -26,6 +26,9 @@ public abstract class AbstractFlinkYarnClient {
 	// ---- Setter for YARN Cluster properties ----- //
 	public abstract void setJobManagerMemory(int memoryMB);
 	public abstract void setTaskManagerMemory(int memoryMB);
+
+	public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf);
+
 	public abstract void setTaskManagerSlots(int slots);
 	public abstract int getTaskManagerSlots();
 	public abstract void setQueue(String queue);
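
The new setter is wired up in FlinkYarnSessionCli above and implemented by FlinkYarnClient further down. As a rough sketch of programmatic use, modeled on the testJavaAPI test in this commit (the jar path is a placeholder and FLINK_CONF_DIR is assumed to be set):

~~~java
import java.io.File;

import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.hadoop.fs.Path;

public class YarnClientWiringSketch {
	public static void main(String[] args) throws Exception {
		String confDirPath = System.getenv("FLINK_CONF_DIR"); // assumption: conf dir is exported
		GlobalConfiguration.loadConfiguration(confDirPath);
		Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();

		AbstractFlinkYarnClient client = FlinkYarnSessionCli.getFlinkYarnClient();
		client.setFlinkConfigurationObject(flinkConfiguration); // the new abstract setter
		client.setConfigurationDirectory(confDirPath);
		client.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
		client.setLocalJarPath(new Path("/path/to/flink-uberjar.jar")); // illustrative path
		client.setTaskManagerCount(1);
		client.setJobManagerMemory(512);
		client.setTaskManagerMemory(512);

		AbstractFlinkYarnCluster cluster = client.deploy(null); // deploy without a custom name
		System.out.println("JobManager at " + cluster.getJobManagerAddress());
		cluster.shutdown();
	}
}
~~~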

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
deleted file mode 100644
index 25e1aa2..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This test starts a MiniYARNCluster with a CapacityScheduler.
- * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
- */
-public class YARNSessionCapacitySchedulerIT extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerIT.class);
-
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
-		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test
-	public void testClientStartup() {
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "512",
-						"-tm", "1024", "-qu", "qa-team"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-	}
-
-
-	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "512",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "Error while deploying YARN cluster: The specified queue 'doesntExist' does not exist. Available queues: default, qa-team, ", RunTypes.YARN_SESSION);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
new file mode 100644
index 0000000..d37c716
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This test starts a MiniYARNCluster with a CapacityScheduler.
+ * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
+ */
+public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	/**
+	 * Test regular operation, including command line parameter parsing.
+	 */
+	@Test
+	public void testClientStartup() {
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "512",
+						"-tm", "1024", "-qu", "qa-team"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+	}
+
+
+	/**
+	 * Test deployment to non-existing queue. (user-reported error)
+	 * Deployment to the queue is possible because there are no queues, so we don't check.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "512",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "Error while deploying YARN cluster: The specified queue 'doesntExist' does not exist. Available queues: default, qa-team, ", RunTypes.YARN_SESSION);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
deleted file mode 100644
index 5f8ae87..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-
-/**
- * This test starts a MiniYARNCluster with a FIFO scheudler.
- * There are no queues for that scheduler.
- */
-public class YARNSessionFIFOIT extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOIT.class);
-
-	/*
-	Override init with FIFO scheduler.
-	 */
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
-		startYARNWithConfig(yarnConfiguration);
-	}
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test
-	public void testClientStartup() {
-		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "512",
-						"-tm", "1024"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-		LOG.info("Finished testClientStartup()");
-	}
-
-	/**
-	 * Test querying the YARN cluster.
-	 *
-	 * This test validates through 666*2 cores in the "cluster".
-	 */
-	@Test
-	public void testQueryCluster() {
-		LOG.info("Starting testQueryCluster()");
-		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
-		LOG.info("Finished testQueryCluster()");
-	}
-
-	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "512",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-		LOG.info("Finished testNonexistingQueue()");
-	}
-
-	/**
-	 * Test requesting more resources than available.
-	 */
-	@Test
-	public void testMoreNodesThanAvailable() {
-		LOG.info("Starting testMoreNodesThanAvailable()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "10",
-				"-jm", "512",
-				"-tm", "1024"}, "Error while deploying YARN cluster: This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
-		LOG.info("Finished testMoreNodesThanAvailable()");
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 5 TaskManagers with 1585 MB
-	 *
-	 * user sees a total request of: 8181 MB (fits)
-	 * system sees a total request of: 8437 (doesn't fit due to min alloc mb)
-	 */
-	@Test
-	public void testResourceComputation() {
-		LOG.info("Starting testResourceComputation()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "5",
-				"-jm", "256",
-				"-tm", "1585"}, "Error while deploying YARN cluster: This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
-		LOG.info("Finished testResourceComputation()");
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 2 TaskManagers with 3840 MB
-	 *
-	 * the user sees a total request of: 7936 MB (fits)
-	 * the system sees a request of: 8192 MB (fits)
-	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
-	 *
-	 * --> check if the system properly rejects allocating this session.
-	 */
-	@Test
-	public void testfullAlloc() {
-		LOG.info("Starting testfullAlloc()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "2",
-				"-jm", "256",
-				"-tm", "3840"}, "Error while deploying YARN cluster: There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
-				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]", RunTypes.YARN_SESSION);
-		LOG.info("Finished testfullAlloc()");
-	}
-
-	/**
-	 * Test per-job yarn cluster
-	 *
-	 * This also tests the prefixed CliFrontend options for the YARN case
-	 */
-	@Test
-	public void perJobYarnCluster() {
-		LOG.info("Starting perJobYarnCluster()");
-		File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here.
-		runWithArgs(new String[] {"run", "-m", "yarn-cluster",
-				"-yj", flinkUberjar.getAbsolutePath(),
-				"-yn", "1",
-				"-yjm", "512",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
-		LOG.info("Finished perJobYarnCluster()");
-	}
-
-	/**
-	 * Test the YARN Java API
-	 */
-	@Test
-	public void testJavaAPI() {
-		final int WAIT_TIME = 15;
-		LOG.info("Starting testJavaAPI()");
-
-		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(512);
-		flinkYarnClient.setTaskManagerMemory(512);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		String confDirPath = System.getenv("FLINK_CONF_DIR");
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
-
-		// deploy
-		AbstractFlinkYarnCluster yarnCluster = null;
-		try {
-			yarnCluster = flinkYarnClient.deploy(null);
-		} catch (Exception e) {
-			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
-			e.printStackTrace(System.err);
-			Assert.fail();
-		}
-		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
-		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				LOG.warn("Interrupted", e);
-				Thread.interrupted();
-			}
-			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
-			if(status != null && status.equals(expectedStatus)) {
-				LOG.info("Cluster reached status " + status);
-				break; // all good, cluster started
-			}
-			if(second > WAIT_TIME) {
-				// we waited for 15 seconds. cluster didn't come up correctly
-				Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
-			}
-		}
-
-		// use the cluster
-		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-		LOG.info("Shutting down cluster. All tests passed");
-		// shutdown cluster
-		yarnCluster.shutdown();
-		LOG.info("Finished testJavaAPI()");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
new file mode 100644
index 0000000..55523ca
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheudler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOITCase extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
+
+	/*
+	Override init with FIFO scheduler.
+	 */
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+		startYARNWithConfig(yarnConfiguration);
+	}
+	/**
+	 * Test regular operation, including command line parameter parsing.
+	 */
+	@Test
+	public void testClientStartup() {
+		LOG.info("Starting testClientStartup()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "512",
+						"-tm", "1024"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testClientStartup()");
+	}
+
+	/**
+	 * Test querying the YARN cluster.
+	 *
+	 * This test validates through 666*2 cores in the "cluster".
+	 */
+	@Test
+	public void testQueryCluster() {
+		LOG.info("Starting testQueryCluster()");
+		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
+		LOG.info("Finished testQueryCluster()");
+	}
+
+	/**
+	 * Test deployment to non-existing queue. (user-reported error)
+	 * Deployment to the queue is possible because there are no queues, so we don't check.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		LOG.info("Starting testNonexistingQueue()");
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "512",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testNonexistingQueue()");
+	}
+
+	/**
+	 * Test requesting more resources than available.
+	 */
+	@Test
+	public void testMoreNodesThanAvailable() {
+		LOG.info("Starting testMoreNodesThanAvailable()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "10",
+				"-jm", "512",
+				"-tm", "1024"}, "Error while deploying YARN cluster: This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
+		LOG.info("Finished testMoreNodesThanAvailable()");
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 5 TaskManagers with 1585 MB
+	 *
+	 * user sees a total request of: 8181 MB (fits)
+	 * system sees a total request of: 8437 (doesn't fit due to min alloc mb)
+	 */
+	@Test
+	public void testResourceComputation() {
+		LOG.info("Starting testResourceComputation()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "5",
+				"-jm", "256",
+				"-tm", "1585"}, "Error while deploying YARN cluster: This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.", RunTypes.YARN_SESSION);
+		LOG.info("Finished testResourceComputation()");
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 2 TaskManagers with 3840 MB
+	 *
+	 * the user sees a total request of: 7936 MB (fits)
+	 * the system sees a request of: 8192 MB (fits)
+	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
+	 *
+	 * --> check if the system properly rejects allocating this session.
+	 */
+	@Test
+	public void testfullAlloc() {
+		LOG.info("Starting testfullAlloc()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "2",
+				"-jm", "256",
+				"-tm", "3840"}, "Error while deploying YARN cluster: There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
+				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]", RunTypes.YARN_SESSION);
+		LOG.info("Finished testfullAlloc()");
+	}
+
+	/**
+	 * Test per-job yarn cluster
+	 *
+	 * This also tests the prefixed CliFrontend options for the YARN case
+	 */
+	@Test
+	public void perJobYarnCluster() {
+		LOG.info("Starting perJobYarnCluster()");
+		File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here.
+		runWithArgs(new String[] {"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "512",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
+		LOG.info("Finished perJobYarnCluster()");
+	}
+
+	/**
+	 * Test the YARN Java API
+	 */
+	@Test
+	public void testJavaAPI() {
+		final int WAIT_TIME = 15;
+		LOG.info("Starting testJavaAPI()");
+
+		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(512);
+		flinkYarnClient.setTaskManagerMemory(512);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		// deploy
+		AbstractFlinkYarnCluster yarnCluster = null;
+		try {
+			yarnCluster = flinkYarnClient.deploy(null);
+		} catch (Exception e) {
+			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+			e.printStackTrace(System.err);
+			Assert.fail();
+		}
+		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
+		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				LOG.warn("Interrupted", e);
+				Thread.interrupted();
+			}
+			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+			if(status != null && status.equals(expectedStatus)) {
+				LOG.info("Cluster reached status " + status);
+				break; // all good, cluster started
+			}
+			if(second > WAIT_TIME) {
+				// we waited for 15 seconds. cluster didn't come up correctly
+				Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
+			}
+		}
+
+		// use the cluster
+		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+		LOG.info("Shutting down cluster. All tests passed");
+		// shutdown cluster
+		yarnCluster.shutdown();
+		LOG.info("Finished testJavaAPI()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index b12952a..89fc239 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -59,7 +59,7 @@ import java.util.Map;
  * we can use the YARN uberjar of flink to start a Flink YARN session.
  */
 public abstract class YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOIT.class);
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
 
 	private final static PrintStream originalStdout = System.out;
 	private final static PrintStream originalStderr = System.err;
@@ -227,7 +227,7 @@ public abstract class YarnTestBase {
 		try {
 			LOG.info("Starting up MiniYARN cluster");
 			if (yarnCluster == null) {
-				yarnCluster = new MiniYARNCluster(YARNSessionFIFOIT.class.getName(), 2, 1, 1);
+				yarnCluster = new MiniYARNCluster(YarnTestBase.class.getName(), 2, 1, 1);
 
 				yarnCluster.init(conf);
 				yarnCluster.start();
@@ -266,7 +266,7 @@ public abstract class YarnTestBase {
 	}
 
 	protected void runWithArgs(String[] args, String expect, RunTypes type) {
-		LOG.info("Running with args "+ Arrays.toString(args));
+		LOG.info("Running with args {}", Arrays.toString(args));
 
 		outContent = new ByteArrayOutputStream();
 		errContent = new ByteArrayOutputStream();
@@ -319,8 +319,8 @@ public abstract class YarnTestBase {
 		System.setOut(originalStdout);
 		System.setErr(originalStderr);
 
-		LOG.info("Sending stdout content through logger: \n\n"+outContent.toString()+"\n\n");
-		LOG.info("Sending stderr content through logger: \n\n"+errContent.toString()+"\n\n");
+		LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
+		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
 	}
 
 	public static class Runner extends Thread {

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index 26d6a12..237acb5 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, file
+log4j.rootLogger=WARN, file
 
 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index c922963..cc22c5d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -134,6 +133,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	private String dynamicPropertiesEncoded;
 
 	private List<File> shipFiles = new ArrayList<File>();
+	private org.apache.flink.configuration.Configuration flinkConfiguration;
 
 
 	public FlinkYarnClient() {
@@ -170,6 +170,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	}
 
 	@Override
+	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
+		this.flinkConfiguration = conf;
+	}
+
+	@Override
 	public void setTaskManagerSlots(int slots) {
 		if(slots <= 0) {
 			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
@@ -255,6 +260,9 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		if(this.flinkConfigurationPath == null) {
 			throw new YarnDeploymentException("Configuration path not set");
 		}
+		if(this.flinkConfiguration == null) {
+			throw new YarnDeploymentException("Flink configuration object has not been set");
+		}
 
 	}
 
@@ -277,9 +285,9 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		isReadyForDepoyment();
 
 		LOG.info("Using values:");
-		LOG.info("\tTaskManager count = " + taskManagerCount);
-		LOG.info("\tJobManager memory = " + jobManagerMemoryMb);
-		LOG.info("\tTaskManager memory = " + taskManagerMemoryMb);
+		LOG.info("\tTaskManager count = {}", taskManagerCount);
+		LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
+		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
 
 		// Create application via yarnClient
 		yarnApplication = yarnClient.createApplication();
@@ -386,7 +394,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
 
 		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
 		boolean hasLogback = new File(logbackFile).exists();
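
Since the deployment code above now reads FLINK_JVM_OPTIONS from the injected configuration object instead of the GlobalConfiguration singleton, JVM options can be controlled per client instance. A hedged sketch of that usage (the option value is purely illustrative):

~~~java
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;

public class JvmOptionsSketch {
	public static void main(String[] args) {
		Configuration flinkConf = new Configuration();
		// Illustrative value; FLINK_JVM_OPTIONS is the key read by the deployment code above.
		flinkConf.setString(ConfigConstants.FLINK_JVM_OPTIONS, "-XX:+PrintGCDetails");

		AbstractFlinkYarnClient client = FlinkYarnSessionCli.getFlinkYarnClient();
		client.setFlinkConfigurationObject(flinkConf);
		// Subsequent deploy() calls pick the JVM options up from flinkConf rather than
		// from the process-global GlobalConfiguration.
	}
}
~~~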

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 98abd5e..273439e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask;
 import akka.actor.Props;
 import akka.util.Timeout;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils$;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
@@ -210,9 +211,14 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		}
 		List<String> ret = new ArrayList<String>();
 		// get messages from ApplicationClient (locally)
+
 		while(true) {
-			Future<Object> messageOptionFuture = ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaTimeout);
-			Object messageOption = awaitUtil(messageOptionFuture, "Error getting new messages from Appliation Client");
+			Object messageOption = null;
+			try {
+				messageOption = AkkaUtils$.MODULE$.ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaDuration);
+			} catch(IOException ioe) {
+				LOG.warn("Error getting the yarn messages locally", ioe);
+			}
 			if(messageOption instanceof None$) {
 				break;
 			} else if(messageOption instanceof org.apache.flink.yarn.Messages.YarnMessage) {
@@ -252,8 +258,11 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		if(actorSystem != null){
 			LOG.info("Sending shutdown request to the Application Master");
 			if(applicationClient != ActorRef.noSender()) {
-				Future<Object> future = ask(applicationClient, new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaTimeout);
-				awaitUtil(future, "Error while stopping YARN Application Client");
+				try {
+					AkkaUtils$.MODULE$.ask(applicationClient, new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaDuration);
+				} catch(IOException e) {
+					throw new RuntimeException("Error while stopping YARN Application Client", e);
+				}
 			}
 
 			actorSystem.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/2c1c91e2/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index fd67b01..01900ee 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ object ApplicationMaster {
     for(property <- dynamicProperties.asScala){
       configuration.setString(property.f0, property.f1)
     }
-    GlobalConfiguration.getConfiguration.addAll(configuration) // make part of globalConf.
 
     // set port to 0 to let Akka automatically determine the port.
     implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port = 0, configuration)

