flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/3] git commit: [FLINK-968] Add slot parameter to YARN client
Date Wed, 20 Aug 2014 09:15:58 GMT
[FLINK-968] Add slot parameter to YARN client


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e8f2e9d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e8f2e9d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e8f2e9d0

Branch: refs/heads/master
Commit: e8f2e9d0efa2fb27106658de38032d8adbb76dba
Parents: ae32c18
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Jul 17 10:55:30 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Aug 20 10:38:25 2014 +0200

----------------------------------------------------------------------
 docs/config.md                                  |   5 +
 docs/yarn_setup.md                              |  78 +++++++--
 .../main/java/org/apache/flink/yarn/Client.java | 167 ++++++++++++++-----
 .../apache/flink/yarn/ClientMasterControl.java  |  18 +-
 .../flink/yarn/appMaster/ApplicationMaster.java | 103 +++++++++---
 .../flink/yarn/rpc/ApplicationMasterStatus.java |   9 +
 .../yarn/rpc/YARNClientMasterProtocol.java      |  22 ++-
 .../org/apache/flink/client/CliFrontend.java    | 101 +++++++++--
 .../testconfigwithinvalidyarn/.yarn-jobmanager  |   1 -
 .../testconfigwithinvalidyarn/.yarn-properties  |   1 +
 .../testconfigwithyarn/.yarn-jobmanager         |   1 -
 .../testconfigwithyarn/.yarn-properties         |   3 +
 .../resources/web-docs-infoserver/analyze.html  |   4 +-
 .../web-docs-infoserver/blank-page.html         |   4 +-
 .../web-docs-infoserver/configuration.html      |   4 +-
 .../resources/web-docs-infoserver/history.html  |   6 +-
 .../resources/web-docs-infoserver/index.html    |  32 +++-
 .../js/jobmanagerFrontend.js                    |   1 +
 .../web-docs-infoserver/taskmanagers.html       |   6 +-
 .../org/apache/flink/runtime/ipc/Client.java    |  10 +-
 .../java/org/apache/flink/runtime/ipc/RPC.java  |   2 -
 .../org/apache/flink/runtime/ipc/Server.java    |   1 -
 .../flink/runtime/jobmanager/JobManager.java    |  10 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   2 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |   7 +
 25 files changed, 464 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index 4a02b0f..47a096f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -252,5 +252,10 @@ input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
 
 # YARN
 
+Please note that all ports used by Flink in a YARN session are offset by the YARN application ID
+to avoid duplicate port allocations when running multiple YARN sessions in parallel. 
+
+So if `yarn.am.rpc.port` is configured to `10245` and the session's application ID is `application_1406629969999_0002`, then the actual port being used is 10245 + 2 = 10247.
+
 - `yarn.am.rpc.port`: The port that is being opened by the Application Master (AM) to 
 let the YARN client connect for an RPC service. (DEFAULT: Port 10245)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/docs/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index dc76826..86cb6b0 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -4,7 +4,7 @@ title:  "YARN Setup"
 
 # In a Nutshell
 
-Start YARN session with 4 Taskmanagers (each with 4 GB of Heapspace):
+Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
 ```bash
 wget {{ site.FLINK_DOWNLOAD_URL_YARN_STABLE }}
@@ -13,6 +13,8 @@ cd flink-yarn-{{ site.FLINK_VERSION_STABLE }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
 ```
 
+Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
+
 # Introducing YARN
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows to run various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to setup or install anything if there is already a YARN setup.
@@ -20,7 +22,7 @@ Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management
 **Requirements**
 
 - Apache Hadoop 2.2
-- HDFS
+- HDFS (Hadoop Distributed File System)
 
 If you have troubles using the Flink YARN client, have a look in the [FAQ section]({{site.baseurl}}/docs/0.5/general/faq.html).
 
@@ -32,10 +34,10 @@ A session will start all required Flink services (JobManager and TaskManagers) s
 
 ### Download Flink for YARN
 
-Download the YARN tgz package on the [download page]({{site.baseurl}}/downloads/#nightly). It contains the required files.
+Download the YARN tgz package on the [download page]({{site.baseurl}}/downloads/). It contains the required files.
 
 
-If you want to build the YARN .tgz file from sources, follow the build instructions. Make sure to use the `-Dhadoop.profile=2` profile. You can find the file in `flink-dist/target/flink-dist-{{site.docs_05_stable}}-yarn.tar.gz` (*Note: The version might be different for you* ).
+If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html). Make sure to use the `-Dhadoop.profile=2` profile. You can find the file in `flink-dist/target/flink-dist-{{site.docs_05_stable}}-yarn.tar.gz` (*Note: The version might be different for you* ).
 
 Extract the package using:
 
@@ -57,11 +59,13 @@ This command will show you the following overview:
 ```bash
 Usage:
    Required
-     -n,--container <arg>   Number of Yarn container to allocate (=Number of TaskTrackers)
+     -n,--container <arg>   Number of Yarn container to allocate (=Number of Task Managers)
    Optional
+     -D <arg>                       Dynamic Properties
      -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
+     -s,--slots <arg>                Number of slots per TaskManager
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
      -tmc,--taskManagerCores <arg>   Virtual CPU cores per TaskManager
      -v,--verbose                    Verbose debug mode
@@ -69,17 +73,21 @@ Usage:
 
 Please note that the Client requires the `HADOOP_HOME` (or `YARN_CONF_DIR` or `HADOOP_CONF_DIR`) environment variable to be set to read the YARN and HDFS configuration.
 
-**Example:** Issue the following command to allocate 10 TaskTrackers, with 8 GB of memory each:
+**Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
 
 ```bash
-./bin/yarn-session.sh -n 10 -tm 8192
+./bin/yarn-session.sh -n 10 -tm 8192 -s 32
 ```
 
-The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something. Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines) and `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN).
+The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something. 
+
+Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelization.degree.default` if the number of slots has been specified.
+
+If you don't want to change the configuration file to pass configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
 
-The example invocation starts 11 containers, since there is one additional container for the ApplicationMaster and JobTracker.
+The example invocation starts 11 containers, since there is one additional container for the ApplicationMaster and Job Manager.
 
-Once Flink is deployed in your YARN cluster, it will show you the connection details of the JobTracker.
+Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.
 
 The client has to remain open to keep the deployment running. We suggest to use `screen`, which will start a detachable shell:
 
@@ -88,6 +96,7 @@ The client has to remain open to keep the deployment running. We suggest to use
 3. Use `CTRL+a`, then press `d` to detach the screen session,
 4. Use `screen -r` to resume again.
 
+
 # Submit Job to Flink
 
 Use the following command to submit a Flink program to the YARN cluster:
@@ -102,14 +111,22 @@ The command will show you a help menu like this:
 
 ```bash
 [...]
-Action "run" compiles and submits a Flink program.
+Action "run" compiles and runs a program.
+
+  Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action arguments:
-     -a,--arguments <programArgs>   Program arguments
-     -c,--class <classname>         Program class
-     -j,--jarfile <jarfile>         Flink program JAR file
-     -m,--jobmanager <host:port>    Jobmanager to which the program is submitted
-     -w,--wait                      Wait for program to finish
-[...]
+     -c,--class <classname>           Class with the program entry point ("main"
+                                      method or "getPlan()" method. Only needed
+                                      if the JAR file does not specify the class
+                                      in its manifest.
+     -m,--jobmanager <host:port>      Address of the JobManager (master) to
+                                      which to connect. Use this flag to connect
+                                      to a different JobManager than the one
+                                      specified in the configuration.
+     -p,--parallelism <parallelism>   The parallelism with which to run the
+                                      program. Optional flag to override the
+                                      default value specified in the
+                                      configuration
 ```
 
 Use the *run* action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the `-m` argument. The JobManager address is visible in the YARN console.
@@ -135,6 +152,31 @@ You can check the number of TaskManagers in the JobManager web interface. The ad
 If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
 
 
+# Debugging a failed YARN session
+
+There are many reasons why a Flink YARN session deployment can fail: a misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop), or other errors.
+
+## Log Files
+
+In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). 
+To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file.
+Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
+
+```
+yarn logs -applicationId <application ID>
+```
+
+Note that it takes a few seconds after the session has finished until the logs show up.
+
+## YARN Client console & Web Interfaces
+
+The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).
+
+In addition to that, there is the YARN Resource Manager web interface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value.
+
+It provides access to log files for running YARN applications and shows diagnostics for failed apps.
+
+
 # Build YARN client for a specific Hadoop version
 
 Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](building.html) for more details.
@@ -155,6 +197,6 @@ When starting a new Flink YARN session, the client first checks if the requested
 
 The next step of the client is to request (step 2) a YARN container to start the *ApplicationMaster* (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the *ApplicationMaster* (AM) is started.
 
-The *JobManager* and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the *AM* container is also serving Flink's web interface.
+The *JobManager* and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the *AM* container is also serving Flink's web interface. The ports Flink is using for its services are the standard ports configured by the user + the application id as an offset. This allows users to execute multiple Flink YARN sessions in parallel.
 
 After that, the AM starts allocating the containers for Flink's TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index a2090f1..292d59a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
-
 package org.apache.flink.yarn;
 
 import java.io.BufferedReader;
@@ -28,7 +25,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.PrintWriter;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.jar.JarFile;
 
 import org.apache.commons.cli.CommandLine;
@@ -45,11 +43,13 @@ import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.appMaster.ApplicationMaster;
 import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -104,47 +104,69 @@ public class Client {
 	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
 	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
 	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
-	private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
+	private static final Option SHIP_PATH = new Option("t","ship",true, "Ship files in the specified directory (t for transfer)");
 	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
 	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
 	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
 	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
 	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
 	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
-			+ " TaskTrackers)");
+			+ " Task Managers)");
+	private static final Option SLOTS = new Option("s","slots",true, "Number of slots per TaskManager");
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+	 */
+	private static final Option DYNAMIC_PROPERTIES = new Option("D", true, "Dynamic properties");
 
 	/**
-	 * Constants
+	 * Constants,
+	 * all starting with ENV_ are used as environment variables to pass values from the Client
+	 * to the Application Master.
 	 */
-	// environment variable names
 	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
 	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
 	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
 	public final static String ENV_APP_ID = "_APP_ID";
+	public final static String ENV_APP_NUMBER = "_APP_NUMBER";
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
 	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
 	public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
+	public static final String ENV_SLOTS = "_SLOTS";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
 
 	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
+	
 	/**
 	 * Seconds to wait between each status query to the AM.
 	 */
 	private static final int CLIENT_POLLING_INTERVALL = 3;
+	/**
+	 * Minimum memory requirements, checked by the Client.
+	 */
+	private static final int MIN_JM_MEMORY = 128;
+	private static final int MIN_TM_MEMORY = 128;
 
 	private Configuration conf;
 	private YarnClient yarnClient;
 
 	private ClientMasterControl cmc;
 
-	private ApplicationId appId;
-
-	private File addrFile;
+	private File yarnPropertiesFile;
 
+	/**
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+	 * Contains configuration files and jar files.
+	 */
 	private Path sessionFilesDir;
 
+	/**
+	 * If the user has specified a different number of slots, we store them here
+	 */
+	private int slots = -1;
+	
 	public void run(String[] args) throws Exception {
 
 		if(UserGroupInformation.isSecurityEnabled()) {
@@ -167,6 +189,8 @@ public class Client {
 		options.addOption(QUEUE);
 		options.addOption(QUERY);
 		options.addOption(SHIP_PATH);
+		options.addOption(SLOTS);
+		options.addOption(DYNAMIC_PROPERTIES);
 
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd = null;
@@ -287,12 +311,31 @@ public class Client {
 		if(cmd.hasOption(JM_MEMORY.getOpt())) {
 			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
 		}
-
+		if(jmMemory < MIN_JM_MEMORY) {
+			System.out.println("The JobManager memory is below the minimum required memory amount "
+					+ "of "+MIN_JM_MEMORY+" MB");
+			System.exit(1);
+		}
 		// Task Managers memory
 		int tmMemory = 1024;
 		if(cmd.hasOption(TM_MEMORY.getOpt())) {
 			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
 		}
+		if(tmMemory < MIN_TM_MEMORY) {
+			System.out.println("The TaskManager memory is below the minimum required memory amount "
+					+ "of "+MIN_TM_MEMORY+" MB");
+			System.exit(1);
+		}
+		
+		if(cmd.hasOption(SLOTS.getOpt())) {
+			slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
+		}
+		
+		String[] dynamicProperties = null;
+		if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		}
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		// Task Managers vcores
 		int tmCores = 1;
@@ -305,6 +348,7 @@ public class Client {
 			LOG.warn("Unable to find job manager port in configuration!");
 			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
 		}
+		
 		conf = Utils.initializeYarnConfiguration();
 
 		// intialize HDFS
@@ -320,7 +364,7 @@ public class Client {
 		}
 
 		// Create yarnClient
-		final YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();
 
@@ -395,7 +439,7 @@ public class Client {
 		if(hasLog4j) {
 			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 		}
-		amCommand 	+= " org.apache.flink.yarn.appMaster.ApplicationMaster" + " "
+		amCommand 	+= " "+ApplicationMaster.class.getName()+" "
 					+ " 1>"
 					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
 					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
@@ -406,7 +450,15 @@ public class Client {
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
 		final ApplicationId appId = appContext.getApplicationId();
-
+		/**
+		 * All network ports are offsetted by the application number 
+		 * to avoid version port clashes when running multiple Flink sessions
+		 * in parallel
+		 */
+		int appNumber = appId.getId();
+
+		jmPort += appNumber;
+				
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
@@ -447,6 +499,7 @@ public class Client {
 		fs.close();
 
 		int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
+		amRPCPort += appNumber;
 		// Setup CLASSPATH for ApplicationMaster
 		Map<String, String> appMasterEnv = new HashMap<String, String>();
 		Utils.setupEnv(conf, appMasterEnv);
@@ -460,6 +513,11 @@ public class Client {
 		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
 		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
 		appMasterEnv.put(Client.ENV_AM_PRC_PORT, String.valueOf(amRPCPort));
+		appMasterEnv.put(Client.ENV_SLOTS, String.valueOf(slots));
+		appMasterEnv.put(Client.ENV_APP_NUMBER, String.valueOf(appNumber));
+		if(dynamicPropertiesEncoded != null) {
+			appMasterEnv.put(Client.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
 
 		amContainer.setEnvironment(appMasterEnv);
 
@@ -473,8 +531,8 @@ public class Client {
 		appContext.setResource(capability);
 		appContext.setQueue(queue);
 
-		// file that we write into the conf/ dir containing the jobManager address.
-		addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+		// file that we write into the conf/ dir containing the jobManager address and the dop.
+		yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
 
 
 		LOG.info("Submitting application master " + appId);
@@ -485,6 +543,7 @@ public class Client {
 		char[] el = { '/', '|', '\\', '-'};
 		int i = 0;
 		int numTaskmanagers = 0;
+		int numMessages = 0;
 
 		BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
 
@@ -495,10 +554,19 @@ public class Client {
 				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
 				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
 				// write jobmanager connect information
-				PrintWriter out = new PrintWriter(addrFile);
-				out.println(appReport.getHost()+":"+jmPort);
+				Properties yarnProps = new Properties();
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, appReport.getHost()+":"+jmPort);
+				if(slots != -1) {
+					yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(slots * taskManagerCount) );
+				}
+				// add dynamic properties
+				if(dynamicProperties != null) {
+					yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicPropertiesEncoded);
+				}
+				OutputStream out = new FileOutputStream(yarnPropertiesFile);
+				yarnProps.store(out, "Generated YARN properties file");
 				out.close();
-				addrFile.setReadable(true, false); // readable for all.
+				yarnPropertiesFile.setReadable(true, false); // readable for all.
 
 				// connect RPC service
 				cmc = new ClientMasterControl(new InetSocketAddress(appReport.getHost(), amRPCPort));
@@ -515,26 +583,34 @@ public class Client {
 			} else {
 				int newTmCount = cmc.getNumberOfTaskManagers();
 				if(numTaskmanagers != newTmCount) {
-					System.err.println("Number of connected TaskManagers changed to "+newTmCount+" slots available: "+cmc.getNumberOfAvailableSlots());
+					System.err.println("Number of connected TaskManagers changed to "+newTmCount+". "
+							+ "Slots available: "+cmc.getNumberOfAvailableSlots());
 					numTaskmanagers = newTmCount;
 				}
+				// we also need to show new messages.
 				if(cmc.getFailedStatus()) {
 					System.err.println("The Application Master failed!\nMessages:\n");
 					for(Message m: cmc.getMessages() ) {
-						System.err.println("Message: "+m.text);
+						System.err.println("Message: "+m.getMessage());
 					}
 					System.err.println("Requesting Application Master shutdown");
 					cmc.shutdownAM();
+					cmc.close();
 					System.err.println("Application Master closed.");
 				}
-				for(Message m: cmc.getMessages() ) {
-					System.err.println("Message: "+m.text);
+				if(cmc.getMessages().size() != numMessages) {
+					System.err.println("Received new message(s) from the Application Master");
+					List<Message> msg = cmc.getMessages();
+					while(msg.size() > numMessages) {
+						System.err.println("Message: "+msg.get(numMessages).getMessage());
+						numMessages++;
+					}
 				}
 
 				// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
 				long startTime = System.currentTimeMillis();
 				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-				        && !in.ready()) {
+						&& !in.ready()) {
 					Thread.sleep(200);
 				}
 				if (in.ready()) {
@@ -549,10 +625,15 @@ public class Client {
 		}
 
 		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + "and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+				+ " state " + appState + " and "
+				+ "final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
 
 		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
 			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
+			LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retreive "
+					+ "the full application log using this command:\n"
+					+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
+					+ "(It sometimes takes a few seconds until the logs are aggregated)");
 		}
 
 	}
@@ -560,8 +641,6 @@ public class Client {
 	private void printHelp() {
 		System.err.println("Available commands:\n"
 				+ "\t stop : Stop the YARN session\n"
-			//	+ "\t add n : Add n TaskManagers to the YARN session\n"
-			//	+ "\t remove n : Remove n TaskManagers to the YARN session\n"
 				+ "\t allmsg : Show all messages\n");
 	}
 	private void evalCommand(String command) {
@@ -573,34 +652,38 @@ public class Client {
 		} else if(command.equals("allmsg")) {
 			System.err.println("All messages from the ApplicationMaster:");
 			for(Message m: cmc.getMessages() ) {
-				System.err.println("Message: "+m.text);
+				System.err.println("Message: "+m.getMessage());
 			}
 		} else if(command.startsWith("add")) {
-			String nStr = command.replace("add", "").trim();
-			int n = Integer.valueOf(nStr);
-			System.err.println("Adding "+n+" TaskManagers to the session");
-			cmc.addTaskManagers(n);
+			System.err.println("This feature is not implemented yet!");
+//			String nStr = command.replace("add", "").trim();
+//			int n = Integer.valueOf(nStr);
+//			System.err.println("Adding "+n+" TaskManagers to the session");
+//			cmc.addTaskManagers(n);
 		} else {
 			System.err.println("Unknown command '"+command+"'");
 			printHelp();
 		}
 	}
 
+	private void cleanUp() throws IOException {
+		LOG.info("Deleting files in "+sessionFilesDir );
+		FileSystem shutFS = FileSystem.get(conf);
+		shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+		shutFS.close();
+	}
+	
 	private void stopSession() {
 		try {
 			LOG.info("Sending shutdown request to the Application Master");
 			cmc.shutdownAM();
-			yarnClient.killApplication(appId);
-			LOG.info("Deleting files in "+sessionFilesDir );
-			FileSystem shutFS = FileSystem.get(conf);
-			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-			shutFS.close();
+			cleanUp();
 			cmc.close();
 		} catch (Exception e) {
 			LOG.warn("Exception while killing the YARN application", e);
 		}
 		try {
-			addrFile.delete();
+			yarnPropertiesFile.delete();
 		} catch (Exception e) {
 			LOG.warn("Exception while deleting the JobManager address file", e);
 		}
@@ -653,6 +736,8 @@ public class Client {
 		opt.addOption(TM_CORES);
 		opt.addOption(QUERY);
 		opt.addOption(QUEUE);
+		opt.addOption(SLOTS);
+		opt.addOption(DYNAMIC_PROPERTIES);
 		formatter.printHelp(" ", opt);
 	}
 
@@ -722,6 +807,4 @@ public class Client {
 		Client c = new Client();
 		c.run(args);
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
index 44633ea..1c87289 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -40,7 +40,7 @@ public class ClientMasterControl extends Thread {
 	private ApplicationMasterStatus appMasterStatus;
 	private YARNClientMasterProtocol cmp;
 	private Object lock = new Object();
-	private List<Message> messages;
+	private List<Message> messages = new ArrayList<Message>();
 	private boolean running = true;
 
 	public ClientMasterControl(InetSocketAddress applicationMasterAddress) {
@@ -55,9 +55,13 @@ public class ClientMasterControl extends Thread {
 
 			while(running) {
 				synchronized (lock) {
-					appMasterStatus = cmp.getAppplicationMasterStatus();
-					if(messages != null && appMasterStatus != null &&
-							messages.size() != appMasterStatus.getMessageCount()) {
+					try {
+						appMasterStatus = cmp.getAppplicationMasterStatus();
+					} catch(Throwable e) {
+						// TODO: try to clean up as much as possible! (set to failed state? // kill app? // clean up files)
+						LOG.warn("Failed to get Application Master status", e);
+					}
+					if(appMasterStatus != null && messages.size() != appMasterStatus.getMessageCount()) {
 						messages = cmp.getMessages();
 					}
 				}
@@ -104,7 +108,8 @@ public class ClientMasterControl extends Thread {
 
 	public boolean shutdownAM() {
 		try {
-			return cmp.shutdownAM().getValue();
+			boolean result = cmp.shutdownAM().getValue();
+			return result;
 		} catch(Throwable e) {
 			LOG.warn("Error shutting down the application master", e);
 			return false;
@@ -112,9 +117,6 @@ public class ClientMasterControl extends Thread {
 	}
 
 	public List<Message> getMessages() {
-		if(this.messages == null) {
-			return new ArrayList<Message>();
-		}
 		return this.messages;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
index 2186fca..30d2f30 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -36,6 +36,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.ipc.RPC;
@@ -81,7 +83,8 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	private final String currDir;
 	private final String logDirs;
 	private final String ownHostname;
-	private final String appId;
+	private final String appId; // YARN style application id, for example: application_1406629969999_0002
+	private final int appNumber; // app number, for example 2 (see above)
 	private final String clientHomeDir;
 	private final String applicationMasterHost;
 	private final String remoteFlinkJarPath;
@@ -91,8 +94,10 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	private final int taskManagerCount;
 	private final int memoryPerTaskManager;
 	private final int coresPerTaskManager;
+	private final int slots;
 	private final String localWebInterfaceDir;
-	private final Configuration conf;
+	private final Configuration conf; // Hadoop!! configuration.
+	
 
 	/**
 	 * File system for interacting with Flink's files such as the jar
@@ -153,6 +158,21 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	 * that the client can still retrieve the messages and then shut it down)
 	 */
 	private Boolean isFailed = false;
+	private boolean isClosed = false;
+
+	private String dynamicPropertiesEncodedString;
+	
+	/**
+	 * AM status that is sent to the Client periodically.
+	 */
+	private ApplicationMasterStatus amStatus;
+
+	/**
+	 * The JobManager's port, offset by the appNumber.
+	 */
+	private final int jobManagerPort;
+	private final int jobManagerWebPort;
+	
 
 	public ApplicationMaster(Configuration conf) throws IOException {
 		fs = FileSystem.get(conf);
@@ -161,15 +181,19 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		logDirs =  envs.get(Environment.LOG_DIRS.key());
 		ownHostname = envs.get(Environment.NM_HOST.key());
 		appId = envs.get(Client.ENV_APP_ID);
+		appNumber = Integer.valueOf(envs.get(Client.ENV_APP_NUMBER));
 		clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
 		applicationMasterHost = envs.get(Environment.NM_HOST.key());
 		remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
 		shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
 		yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		rpcPort = envs.get(Client.ENV_AM_PRC_PORT);
+		rpcPort = envs.get(Client.ENV_AM_PRC_PORT); // already offset
 		taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
 		memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
 		coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+		slots = Integer.valueOf(envs.get(Client.ENV_SLOTS));
+		dynamicPropertiesEncodedString = envs.get(Client.ENV_DYNAMIC_PROPERTIES); // might return null!
+		
 		localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
 		this.conf = conf;
 
@@ -179,7 +203,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		if(ownHostname == null) {
 			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
 		}
-		LOG.info("Working directory "+currDir);
+		LOG.debug("Working directory "+currDir);
 
 		// load Flink configuration.
 		Utils.getFlinkConfiguration(currDir);
@@ -187,6 +211,16 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		// start AM RPC service
 		amRpcServer = RPC.getServer(this, ownHostname, Integer.valueOf(rpcPort), 2);
 		amRpcServer.start();
+		
+		// determine JobManager port
+		int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		if(port != -1) {
+			port += appNumber;
+		} else {
+			LOG.warn("JobManager port is unknown");
+		}
+		this.jobManagerPort = port;
+		this.jobManagerWebPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)+appNumber;
 	}
 	
 	private void setFailed(boolean failed) {
@@ -210,8 +244,26 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		}
 		// just to make sure.
 		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY+": "+jobManagerPort+"\n"); // already offset here.
 		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
 		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+		
+		output.append(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY+": "+ jobManagerWebPort +"\n");
+		
+		
+		if(slots != -1) {
+			// configure slots and default dop
+			output.append(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS+": "+slots+"\n");
+			output.append(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY+": "+slots*taskManagerCount+"\n");
+		}
+		// add dynamic properties
+		List<Tuple2<String, String>> dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString);
+		for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
+			String propLine = dynamicProperty.f0+": "+dynamicProperty.f1;
+			output.append(propLine+"\n");
+			LOG.debug("Adding user-supplied configuration value to generated configuration file: "+propLine);
+		}
+		
 		output.close();
 		br.close();
 		File newConf = new File(currDir+"/flink-conf-modified.yaml");
@@ -245,10 +297,11 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		nmClient.init(conf);
 		nmClient.start();
 		nmClient.cleanupRunningContainersOnStop(true);
-
+		
 		// Register with ResourceManager
-		LOG.info("Registering ApplicationMaster");
-		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+		String url = "http://"+applicationMasterHost+":"+jobManagerWebPort;
+		LOG.info("Registering ApplicationMaster with tracking url "+url);
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, url);
 
 		// Priority for worker containers - priorities are intra-application
 		Priority priority = Records.newRecord(Priority.class);
@@ -329,15 +382,17 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 			}
 			Thread.sleep(5000);
 		}
-		LOG.info("Shutting down JobManager");
-		jobManager.shutdown();
-
+		if(isClosed) {
+			return;
+		}
 		// Un-register with ResourceManager
 		final String diagnosticsMessage = "Application Master shut down after all "
 				+ "containers finished\n"+containerDiag.toString();
 		LOG.info("Diagnostics message: "+diagnosticsMessage);
 		rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnosticsMessage, "");
 		this.close();
+		amRpcServer.stop(); // we need to manually stop the RPC service. Usually, the Client stops the RPC,
+		// but at this point, the AM has been shut down (for some reason).
 		LOG.info("Application Master shutdown completed.");
 	}
 
@@ -364,7 +419,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 				if(hasLog4j) {
 					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 				}
-				tmCommand	+= " org.apache.flink.appMaster.YarnTaskManagerRunner -configDir . "
+				tmCommand	+= " "+YarnTaskManagerRunner.class.getName()+" -configDir . "
 						+ " 1>"
 						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
 						+ "/taskmanager-stdout.log" 
@@ -398,7 +453,6 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 
 				LOG.info("Launching container " + allocatedContainers);
 				nmClient.startContainer(container, ctx);
-				messages.add(new Message("Launching new container"));
 			}
 			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
 				++completedContainers;
@@ -422,12 +476,16 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	
 	@Override
 	public ApplicationMasterStatus getAppplicationMasterStatus() {
-		ApplicationMasterStatus amStatus;
+		if(amStatus == null) {
+			amStatus = new ApplicationMasterStatus();
+		}
 		if(jobManager == null) {
 			// JM not yet started
-			amStatus = new ApplicationMasterStatus(0, 0 );
+			amStatus.setNumTaskManagers(0);
+			amStatus.setNumSlots(0);
 		} else {
-			amStatus = new ApplicationMasterStatus(jobManager.getNumberOfTaskManagers(), jobManager.getAvailableSlots() );
+			amStatus.setNumTaskManagers(jobManager.getNumberOfTaskManagers());
+			amStatus.setNumSlots(jobManager.getAvailableSlots());
 		}
 		amStatus.setMessageCount(messages.size());
 		amStatus.setFailed(isFailed);
@@ -452,12 +510,17 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	}
 	
 	private void close() throws Exception {
-		nmClient.close();
-		rmClient.close();
-		if(!isFailed) {
-			LOG.warn("Can not close AM RPC connection since this the AM is in failed state");
-			amRpcServer.stop();
+		if(!isClosed) {
+			jobManager.shutdown();
+			nmClient.close();
+			rmClient.close();
+			if(!isFailed) {
+			//	amRpcServer.stop();
+			} else {
+				LOG.warn("Can not close AM RPC connection since the AM is in failed state");
+			}
 		}
+		this.isClosed = true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
index b2bdf6b..39ef31a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
@@ -71,6 +71,15 @@ public class ApplicationMasterStatus implements IOReadableWritable {
 		this.failed = isFailed;
 	}
 
+	public void setNumTaskManagers(int num) {
+		this.numTaskManagers = num;
+	}
+	
+	public void setNumSlots(int slots) {
+		this.numSlots = slots;
+	}
+	
+	
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(numTaskManagers);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
index 560147c..670a931 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn.rpc;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -34,21 +35,37 @@ import org.apache.flink.types.BooleanValue;
  */
 public interface YARNClientMasterProtocol extends VersionedProtocol {
 
+	/**
+	 * Message sent from the ApplicationMaster (AM) to the Client.
+	 *
+	 */
 	public static class Message implements IOReadableWritable {
-		public String text;
-
+		private String text;
+		private Date date;
+		
+		public Message() {	
+			// for deserializability
+		}
+		
 		public Message(String msg) {
 			this.text = msg;
+			this.date = new Date();
+		}
+		
+		public String getMessage() {
+			return "["+date+"] "+text;
 		}
 
 		@Override
 		public void write(DataOutputView out) throws IOException {
 			out.writeUTF(text);
+			out.writeLong(date.getTime());
 		}
 
 		@Override
 		public void read(DataInputView in) throws IOException {
 			text = in.readUTF();
+			date = new Date(in.readLong());
 		}
 	}
 
@@ -59,5 +76,4 @@ public interface YARNClientMasterProtocol extends VersionedProtocol {
 	List<Message> getMessages();
 
 	void addTaskManagers(int n);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e516255..d15b0d0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -20,7 +20,9 @@
 package org.apache.flink.client;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -40,9 +43,9 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.cli.UnrecognizedOptionException;
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -109,7 +112,15 @@ public class CliFrontend {
 	private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 	
-	public static final String JOBMANAGER_ADDRESS_FILE = ".yarn-jobmanager";
+	/**
+	 * YARN-session related constants
+	 */
+	public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
+	public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
+	public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
+	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
+	// this has to be a regex for String.split()
+	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
 	
 
 	private CommandLineParser parser;
@@ -118,6 +129,10 @@ public class CliFrontend {
 	private boolean printHelp;
 	
 	private boolean globalConfigurationLoaded;
+	
+	private boolean yarnPropertiesLoaded = false;
+	
+	private Properties yarnProperties;
 
 	/**
 	 * Initializes the class
@@ -723,24 +738,19 @@ public class CliFrontend {
 			}
 		}
 		else {
-			// second, search for a .yarn-jobmanager file
-			String loc = getConfigurationDirectory();
-			File jmAddressFile = new File(loc + '/' + JOBMANAGER_ADDRESS_FILE);
-			
-			if (jmAddressFile.exists()) {
+			Properties yarnProps = getYarnProperties();
+			if(yarnProps != null) {
 				try {
-					String address = FileUtils.readFileToString(jmAddressFile).trim();
-					System.out.println("Found a " + JOBMANAGER_ADDRESS_FILE + " file, using \""+address+"\" to connect to the JobManager");
-					
+					String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+					System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, "
+							+ "using \""+address+"\" to connect to the JobManager");
 					return RemoteExecutor.getInetFromHostport(address);
-				}
-				catch (Exception e) {
-					System.out.println("Found a " + JOBMANAGER_ADDRESS_FILE + " file, but could not read the JobManager address from the file. " 
+				} catch (Exception e) {
+					System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " 
 								+ e.getMessage());
 					return null;
 				}
-			}
-			else {
+			} else {
 				// regular config file gives the address
 				String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 				
@@ -809,11 +819,72 @@ public class CliFrontend {
 		if (!globalConfigurationLoaded) {
 			String location = getConfigurationDirectory();
 			GlobalConfiguration.loadConfiguration(location);
+			// set default parallelization degree
+			Properties yarnProps;
+			try {
+				yarnProps = getYarnProperties();
+				if(yarnProps != null) {
+					String propDegree = yarnProps.getProperty(YARN_PROPERTIES_DOP);
+					int paraDegree = -1;
+					if(propDegree != null) { // maybe the property is not set
+						paraDegree = Integer.valueOf(propDegree);
+					}
+					Configuration c = GlobalConfiguration.getConfiguration();
+					if(paraDegree != -1) {
+						c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
+					}
+					// handle the YARN client's dynamic properties
+					String dynamicPropertiesEncoded = yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+					List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+					for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
+						c.setString(dynamicProperty.f0, dynamicProperty.f1);
+					}
+					GlobalConfiguration.includeConfiguration(c); // update config
+				}
+			} catch (IOException e) {
+				System.err.println("Error while loading YARN properties: "+e.getMessage());
+				e.printStackTrace();
+			}
+			
 			globalConfigurationLoaded = true;
 		}
 		return GlobalConfiguration.getConfiguration();
 	}
 	
+	public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
+		List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
+		if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
+			String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+			for(String propLine : propertyLines) {
+				if(propLine == null) {
+					continue;
+				}
+				String[] kv = propLine.split("=");
+				if(kv != null && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
+					ret.add(new Tuple2<String, String>(kv[0], kv[1]));
+				}
+			}
+		}
+		return ret;
+	}
+	
+	protected Properties getYarnProperties() throws IOException {
+		if(!yarnPropertiesLoaded) {
+			String loc = getConfigurationDirectory();
+			File propertiesFile = new File(loc + '/' + YARN_PROPERTIES_FILE);
+			if (propertiesFile.exists()) {
+				Properties props = new Properties();
+				InputStream is = new FileInputStream( propertiesFile );
+				props.load(is);
+				yarnProperties = props;
+				is.close();
+			} else {
+				yarnProperties = null;
+			}
+		}
+		return yarnProperties;
+	}
+	
 	protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
 		return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
deleted file mode 100644
index 6dd40ef..0000000
--- a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
+++ /dev/null
@@ -1 +0,0 @@
-some-invalid-string

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
new file mode 100644
index 0000000..6dd40ef
--- /dev/null
+++ b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
@@ -0,0 +1 @@
+some-invalid-string

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
deleted file mode 100644
index d2ce743..0000000
--- a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
+++ /dev/null
@@ -1 +0,0 @@
-22.33.44.55:6655

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
new file mode 100644
index 0000000..e2442b7
--- /dev/null
+++ b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
@@ -0,0 +1,3 @@
+#Generated YARN properties file
+#Tue Jul 29 11:40:48 CEST 2014
+jobManager=22.33.44.55\:6655

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/analyze.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/analyze.html b/flink-runtime/resources/web-docs-infoserver/analyze.html
index cf1b09e..c7d5fe4 100755
--- a/flink-runtime/resources/web-docs-infoserver/analyze.html
+++ b/flink-runtime/resources/web-docs-infoserver/analyze.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -61,7 +61,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/blank-page.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/blank-page.html b/flink-runtime/resources/web-docs-infoserver/blank-page.html
index ef03e42..4cddd14 100755
--- a/flink-runtime/resources/web-docs-infoserver/blank-page.html
+++ b/flink-runtime/resources/web-docs-infoserver/blank-page.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -53,7 +53,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/configuration.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/configuration.html b/flink-runtime/resources/web-docs-infoserver/configuration.html
index 06ea81a..84a4043 100755
--- a/flink-runtime/resources/web-docs-infoserver/configuration.html
+++ b/flink-runtime/resources/web-docs-infoserver/configuration.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/history.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/history.html b/flink-runtime/resources/web-docs-infoserver/history.html
index 3041837..8e94317 100755
--- a/flink-runtime/resources/web-docs-infoserver/history.html
+++ b/flink-runtime/resources/web-docs-infoserver/history.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -78,7 +78,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>History <small>Overview about recent jobs</small></h1>
+            <h1>History <small>Overview over recent jobs</small></h1>
             <ol class="breadcrumb">
               <li><a href="index.html"><i class="icon-dashboard"></i> Dashboard</a></li>
               <li class="active"><i class="fa fa-bar-chart-o"></i> History</li>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/index.html b/flink-runtime/resources/web-docs-infoserver/index.html
index be89945..f9d9e56 100755
--- a/flink-runtime/resources/web-docs-infoserver/index.html
+++ b/flink-runtime/resources/web-docs-infoserver/index.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -67,7 +67,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -91,7 +91,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>Dashboard Flink<small id="version"></small></h1>
+            <h1>Apache Flink Dashboard<small id="version"></small></h1>
             <ol class="breadcrumb">
               <li class="active"><i class="fa fa-dashboard"></i> Dashboard</li>
             </ol>
@@ -99,7 +99,7 @@
         </div><!-- /.row -->
 
         <div class="row">
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-info">
               <div class="panel-heading">
                 <div class="row">
@@ -114,7 +114,22 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+           <div class="col-lg-2">
+            <div class="panel panel-primary">
+              <div class="panel-heading">
+                <div class="row">
+                  <div class="col-xs-6">
+                    <i class="fa fa-list-ol fa-5x"></i>
+                  </div>
+                  <div class="col-xs-6 text-right">
+                    <p class="announcement-heading"><span id="stat-slots" class="stats"></span></p>
+                    <p class="announcement-text">Processing Slots</p>
+                  </div>
+                </div>
+              </div>
+            </div>
+          </div>
+          <div class="col-lg-2">
             <div class="panel panel-success">
               <div class="panel-heading">
                 <div class="row">
@@ -129,7 +144,7 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-warning">
               <div class="panel-heading">
                 <div class="row">
@@ -144,7 +159,7 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-danger">
               <div class="panel-heading">
                 <div class="row">
@@ -159,6 +174,9 @@
               </div>
             </div>
           </div>
+          <div class="col-lg-2">
+            <!-- empty -->
+          </div>
         </div><!-- /.row -->
 
         <div class="row">

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js b/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
index 68d178b..5daabe8 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
@@ -86,6 +86,7 @@ function poll(jobId) {
 	$.ajax({ url : "jobsInfo?get=taskmanagers", cache: false, type : "GET",
 	    success : function(json) {
 		$("#stat-taskmanagers").html(json.taskmanagers);
+		$("#stat-slots").html(json.slots);
 	    }, dataType : "json",
 	});
 })();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/taskmanagers.html b/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
index ce7f7f0..0fd9f50 100755
--- a/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
+++ b/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -78,7 +78,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>Task Managers <small>Overview about connected Task Managers</small></h1>
+            <h1>Task Managers <small>Overview over connected Task Managers</small></h1>
             <ol class="breadcrumb">
               <li><a href="index.html"><i class="icon-dashboard"></i> Dashboard</a></li>
               <li class="active"><i class="icon-file-alt"></i> Task Managers</li>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
index 8f537de..346b79a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
@@ -497,7 +497,6 @@ public class Client {
 				return;
 			}
 			touch();
-
 			try {
 				int id = in.readInt(); // try to read an id
 
@@ -521,8 +520,12 @@ public class Client {
 							LOG.error(e);
 						} catch (IllegalAccessException e) {
 							LOG.error(e);
+						} 
+						try {
+							value.read(new InputViewDataInputStreamWrapper(in)); // read value
+						} catch(Throwable e) {
+							LOG.error("Exception while receiving an RPC call", e);
 						}
-						value.read(new InputViewDataInputStreamWrapper(in)); // read value
 					}
 					call.setValue(value);
 				} else if (state == Status.ERROR.state) {
@@ -532,6 +535,9 @@ public class Client {
 					markClosed(new RemoteException(StringRecord.readString(in), StringRecord.readString(in)));
 				}
 			} catch (IOException e) {
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("Closing RPC connection due to exception", e);
+				}
 				markClosed(e);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
index ea2fa1d..efeeadc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
@@ -420,9 +420,7 @@ public class RPC {
 				method.setAccessible(true);
 
 				final Object value = method.invoke((Object) instance, (Object[]) call.getParameters());
-
 				return (IOReadableWritable) value;
-
 			} catch (InvocationTargetException e) {
 				
 				final Throwable target = e.getTargetException();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
index 115722f..a8c970c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
@@ -956,7 +956,6 @@ public abstract class Server {
 					CurCall.set(call);
 
 					value = call(call.connection.protocol, call.param, call.timestamp);
-
 					CurCall.set(null);
 
 					setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 82f663c..71957ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -250,7 +250,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	}
 
 	public void shutdown() {
-
+		LOG.debug("JobManager shutdown requested");
 		if (!this.isShutdownInProgress.compareAndSet(false, true)) {
 			return;
 		}
@@ -289,6 +289,14 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		if (this.scheduler != null) {
 			this.scheduler.shutdown();
 		}
+		
+		if(server != null) {
+			try {
+				server.stop();
+			} catch (Exception e) {
+				LOG.error("Error while shutting down the JobManager's webserver", e);
+			}
+		}
 
 		this.isShutDown = true;
 		LOG.debug("Shutdown of job manager completed");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 3ffe5af..2f738a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getAvailableSlots()+"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 7856652..933e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -191,4 +191,11 @@ public class WebInfoServer {
 		server.start();
 	}
 
+	/**
+	 * Stop the webserver
+	 */
+	public void stop() throws Exception {
+		server.stop();
+	}
+
 }


Mime
View raw message