flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/2] flink git commit: [FLINK-4152] Allow re-registration of TMs at resource manager
Date Tue, 26 Jul 2016 14:42:41 GMT
[FLINK-4152] Allow re-registration of TMs at resource manager

- Add YarnFlinkResourceManager test checking that task manager registrations from a re-elected job manager are re-accepted

- Remove unnecessary sync logic between JobManager and ResourceManager

- Avoid duplicate registration attempts in case of a refused registration

- Add test case to check that no excessive number of RegisterTaskManager messages is sent

- Remove containersLaunched from YarnFlinkResourceManager and instead stop clearing registeredWorkers when the JobManager loses leadership

- Let YarnFlinkResourceManagerTest extend TestLogger

- Harden YarnFlinkResourceManager.getContainersFromPreviousAttempts

- Add FatalErrorOccurred message handler to FlinkResourceManager;
  Increase timeout for YarnFlinkResourceManagerTest;
  Add additional constructor to TestingYarnFlinkResourceManager for tests

- Rename registeredWorkers field to startedWorkers
Additionally, the RegisterResource message is renamed to NotifyResourceStarted, which
tells the RM that a resource has been started. This reflects the current semantics of
the startedWorkers map in the resource manager.

- Fix concurrency issues in TestingLeaderRetrievalService

This closes #2257


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2648bc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2648bc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2648bc1a

Branch: refs/heads/master
Commit: 2648bc1a5a5faed8c2061bcab40a8949fd02751c
Parents: c6715b7
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Jul 15 10:51:59 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Jul 26 16:39:22 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  34 ++-
 .../clusterframework/FlinkResourceManager.java  | 119 ++++----
 .../messages/NotifyResourceStarted.java         |  47 +++
 .../messages/RegisterResource.java              |  55 ----
 .../messages/RegisterResourceFailed.java        |  68 -----
 .../messages/RegisterResourceSuccessful.java    |  58 ----
 .../standalone/StandaloneResourceManager.java   |   4 +-
 .../testutils/TestingResourceManager.java       |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  51 +---
 .../runtime/messages/RegistrationMessages.scala |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 202 +++++++++----
 .../taskmanager/TaskManagerConfiguration.scala  |  27 +-
 .../testingUtils/TestingJobManagerLike.scala    |   5 +-
 .../TestingLeaderRetrievalService.java          |   9 +-
 .../resourcemanager/ResourceManagerTest.java    |  66 +---
 .../TaskManagerRegistrationTest.java            | 107 ++++++-
 flink-yarn-tests/pom.xml                        |   8 +
 .../yarn/TestingYarnClusterDescriptor.java      |   5 +
 .../yarn/TestingYarnFlinkResourceManager.java   |  56 ----
 flink-yarn/pom.xml                              |  31 +-
 .../flink/yarn/RegisteredYarnWorkerNode.java    |   1 -
 .../flink/yarn/YarnFlinkResourceManager.java    | 113 +++++--
 .../YarnResourceManagerCallbackHandler.java     |  33 +-
 .../yarn/TestingYarnFlinkResourceManager.java   | 111 +++++++
 .../yarn/YarnFlinkResourceManagerTest.java      | 298 +++++++++++++++++++
 .../messages/NotifyWhenResourcesRegistered.java |  32 ++
 .../RequestNumberOfRegisteredResources.java     |  25 ++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-yarn/src/test/resources/logback-test.xml  |  34 +++
 29 files changed, 1127 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index e40bed3..028732a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -243,11 +243,28 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs";
 
 	/**
-	 *
+	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
+	 * exceeded without a successful registration, then the TaskManager terminates.
 	 */
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
 
 	/**
+	 * The initial registration pause between two consecutive registration attempts. The pause
+	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
+	 */
+	public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause";
+
+	/**
+	 * The maximum registration pause between two consecutive registration attempts.
+	 */
+	public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause";
+
+	/**
+	 * The pause after a registration has been refused by the job manager before retrying to connect.
+	 */
+	public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
+
+	/**
 	 * Time interval between two successive task cancellation attempts in milliseconds.
 	 */
 	@PublicEvolving
@@ -788,6 +805,21 @@ public final class ConfigConstants {
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 
 	/**
+	 * The default task manager's initial registration pause.
+	 */
+	public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
+
+	/**
+	 * The default task manager's maximum registration pause.
+	 */
+	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s";
+
+	/**
+	 * The default task manager's refused registration pause.
+	 */
+	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
+
+	/**
 	 * The default setting for TaskManager memory eager allocation of managed memory
 	 */
 	public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index d28d4aa..95be084 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -34,10 +34,8 @@ import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContai
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
 import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
@@ -51,10 +49,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import scala.concurrent.Future;
@@ -89,7 +86,7 @@ import static java.util.Objects.requireNonNull;
  *     <li>At some point, the TaskManager processes will have started and send a registration
  *         message to the JobManager. The JobManager will perform
  *         a lookup with the ResourceManager to check if it really started this TaskManager.
- *         The method {@link #workerRegistered(ResourceID)} will be called
+ *         The method {@link #workerStarted(ResourceID)} will be called
  *         to inform about a registered worker.</li>
  * </ol>
  *
@@ -113,8 +110,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	/** The service to find the right leader JobManager (to support high availability) */
 	private final LeaderRetrievalService leaderRetriever;
 
-	/** The currently registered resources */
-	private final Map<ResourceID, WorkerType> registeredWorkers;
+	/** Map which contains the workers from which we know that they have been successfully started
+	 * in a container. This notification is sent by the JM when a TM tries to register at it. */
+	private final Map<ResourceID, WorkerType> startedWorkers;
 
 	/** List of listeners for info messages */
 	private final Set<ActorRef> infoMessageListeners;
@@ -141,7 +139,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			LeaderRetrievalService leaderRetriever) {
 		this.config = requireNonNull(flinkConfig);
 		this.leaderRetriever = requireNonNull(leaderRetriever);
-		this.registeredWorkers = new HashMap<>();
+		this.startedWorkers = new HashMap<>();
 
 		FiniteDuration lt;
 		try {
@@ -230,9 +228,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 
 			// --- lookup of registered resources
 
-			else if (message instanceof RegisterResource) {
-				RegisterResource msg = (RegisterResource) message;
-				handleRegisterResource(sender(), msg.getTaskManager(), msg.getRegisterMessage());
+			else if (message instanceof NotifyResourceStarted) {
+				NotifyResourceStarted msg = (NotifyResourceStarted) message;
+				handleResourceStarted(sender(), msg.getResourceID());
 			}
 
 			// --- messages about JobManager leader status and registration
@@ -273,6 +271,11 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 				infoMessageListeners.remove(sender());
 			}
 
+			else if (message instanceof FatalErrorOccurred) {
+				FatalErrorOccurred fatalErrorOccurred = (FatalErrorOccurred) message;
+				fatalError(fatalErrorOccurred.message(), fatalErrorOccurred.error());
+			}
+
 			// --- unknown messages
 
 			else {
@@ -307,73 +310,68 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	}
 
 	/**
-	 * Gets the number of currently registered TaskManagers.
+	 * Gets the number of currently started TaskManagers.
 	 *
-	 * @return The number of currently registered TaskManagers.
+	 * @return The number of currently started TaskManagers.
 	 */
-	public int getNumberOfRegisteredTaskManagers() {
-		return registeredWorkers.size();
+	public int getNumberOfStartedTaskManagers() {
+		return startedWorkers.size();
 	}
 
 	/**
 	 * Gets the currently registered resources.
 	 * @return
 	 */
-	public Collection<WorkerType> getRegisteredTaskManagers() {
-		return registeredWorkers.values();
+	public Collection<WorkerType> getStartedTaskManagers() {
+		return startedWorkers.values();
 	}
 
 	/**
-	 * Gets the registered worker for a given resource ID, if one is available.
+	 * Gets the started worker for a given resource ID, if one is available.
 	 *
 	 * @param resourceId The resource ID for the worker.
 	 * @return True if already registered, otherwise false
 	 */
-	public boolean isRegistered(ResourceID resourceId) {
-		return registeredWorkers.containsKey(resourceId);
+	public boolean isStarted(ResourceID resourceId) {
+		return startedWorkers.containsKey(resourceId);
 	}
 
 	/**
-	 * Gets an iterable for all currently registered TaskManagers.
+	 * Gets an iterable for all currently started TaskManagers.
 	 *
-	 * @return All currently registered TaskManagers.
+	 * @return All currently started TaskManagers.
 	 */
-	public Collection<WorkerType> allRegisteredWorkers() {
-		return registeredWorkers.values();
+	public Collection<WorkerType> allStartedWorkers() {
+		return startedWorkers.values();
 	}
 
 	/**
-	 * Register a resource on which a TaskManager has been started
+	 * Tells the ResourceManager that a TaskManager had been started in a container with the given
+	 * resource id.
+	 *
 	 * @param jobManager The sender (JobManager) of the message
-	 * @param taskManager The task manager who wants to register
-	 * @param msg The task manager's registration message
+	 * @param resourceID The resource id of the started TaskManager
 	 */
-	private void handleRegisterResource(ActorRef jobManager, ActorRef taskManager,
-				RegistrationMessages.RegisterTaskManager msg) {
-
-		ResourceID resourceID = msg.resourceId();
-		try {
-			Preconditions.checkNotNull(resourceID);
+	private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
+		if (resourceID != null) {
 			// check if resourceID is already registered (TaskManager may send duplicate register messages)
-			WorkerType oldWorker = registeredWorkers.get(resourceID);
+			WorkerType oldWorker = startedWorkers.get(resourceID);
 			if (oldWorker != null) {
-				LOG.debug("TaskManager {} had been registered before.", resourceID);
+				LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
 			} else {
-				WorkerType newWorker = workerRegistered(resourceID);
-				registeredWorkers.put(resourceID, newWorker);
-				LOG.info("TaskManager {} has registered.", resourceID);
-			}
-			jobManager.tell(decorateMessage(
-				new RegisterResourceSuccessful(taskManager, msg)),
-				self());
-		} catch (Exception e) {
-			LOG.warn("TaskManager resource registration failed for {}", resourceID, e);
+				WorkerType newWorker = workerStarted(resourceID);
 
-			// tell the JobManager about the failure
-			String eStr = ExceptionUtils.stringifyException(e);
-			sender().tell(decorateMessage(
-				new RegisterResourceFailed(taskManager, resourceID, eStr)), self());
+				if (newWorker != null) {
+					startedWorkers.put(resourceID, newWorker);
+					LOG.info("TaskManager {} has started.", resourceID);
+				} else {
+					LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
+				}
+			}
 		}
+
+		// Acknowledge the resource registration
+		jobManager.tell(decorateMessage(Acknowledge.get()), self());
 	}
 
 	/**
@@ -384,9 +382,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	 */
 	private void removeRegisteredResource(ResourceID resourceId) {
 
-		WorkerType worker = registeredWorkers.remove(resourceId);
+		WorkerType worker = startedWorkers.remove(resourceId);
 		if (worker != null) {
-			releaseRegisteredWorker(worker);
+			releaseStartedWorker(worker);
 		} else {
 			LOG.warn("Resource {} could not be released", resourceId);
 		}
@@ -463,8 +461,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	}
 
 	/**
-	 * This method disassociates from the current leader JobManager. All currently registered
-	 * TaskManagers are put under "awaiting registration".
+	 * This method disassociates from the current leader JobManager.
 	 */
 	private void jobManagerLostLeadership() {
 		if (jobManager != null) {
@@ -474,8 +471,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			leaderSessionID = null;
 
 			infoMessageListeners.clear();
-
-			registeredWorkers.clear();
 		}
 	}
 
@@ -510,7 +505,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 					// put the consolidated TaskManagers into our bookkeeping
 					for (WorkerType worker : consolidated) {
 						ResourceID resourceID = worker.getResourceID();
-						registeredWorkers.put(resourceID, worker);
+						startedWorkers.put(resourceID, worker);
 						toHandle.remove(resourceID);
 					}
 				}
@@ -568,7 +563,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			"Number of pending workers pending registration should never be below 0.");
 
 		// see how many workers we want, and whether we have enough
-		int allAvailableAndPending = registeredWorkers.size() +
+		int allAvailableAndPending = startedWorkers.size() +
 			numWorkersPending + numWorkersPendingRegistration;
 
 		int missing = designatedPoolSize - allAvailableAndPending;
@@ -619,7 +614,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	 * @param message An informational message that explains why the worker failed.
 	 */
 	public void notifyWorkerFailed(ResourceID resourceID, String message) {
-		WorkerType worker = registeredWorkers.remove(resourceID);
+		WorkerType worker = startedWorkers.remove(resourceID);
 		if (worker != null) {
 			jobManager.tell(
 				decorateMessage(
@@ -676,16 +671,16 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 	protected abstract void releasePendingWorker(ResourceID resourceID);
 
 	/**
-	 * Trigger a release of a registered worker.
+	 * Trigger a release of a started worker.
 	 * @param resourceID The worker resource id
 	 */
-	protected abstract void releaseRegisteredWorker(WorkerType resourceID);
+	protected abstract void releaseStartedWorker(WorkerType resourceID);
 
 	/**
-	 * Callback when a worker was registered.
+	 * Callback when a worker was started.
 	 * @param resourceID The worker resource id
 	 */
-	protected abstract WorkerType workerRegistered(ResourceID resourceID) throws Exception;
+	protected abstract WorkerType workerStarted(ResourceID resourceID);
 
 	/**
 	 * This method is called when the resource manager starts after a failure and reconnects to

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
new file mode 100644
index 0000000..1427ba8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * Notifies the ResourceManager that a TaskManager has been started in a container with the given
+ * resource id.
+ */
+public class NotifyResourceStarted implements RequiresLeaderSessionID, java.io.Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Resource id of the container in which the TaskManager has been started. */
+	private final ResourceID resourceID;
+
+	public NotifyResourceStarted(ResourceID resourceID) {
+		this.resourceID = resourceID;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	@Override
+	public String toString() {
+		return "NotifyResourceStarted{" +
+			"resourceID=" + resourceID +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
deleted file mode 100644
index bad51f0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Triggers a lookup at the ResourceManager to check if the resource for a TaskManager is registered.
- */
-public class RegisterResource implements RequiresLeaderSessionID, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final ActorRef taskManager;
-	private final RegistrationMessages.RegisterTaskManager registerMessage;
-
-
-	public RegisterResource(ActorRef taskManager, RegistrationMessages.RegisterTaskManager registerMessage) {
-		this.taskManager = taskManager;
-		this.registerMessage = registerMessage;
-	}
-
-	public ActorRef getTaskManager() {
-		return taskManager;
-	}
-
-	public RegistrationMessages.RegisterTaskManager getRegisterMessage() {
-		return registerMessage;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisterResource{" +
-			"taskManager=" + taskManager +
-			", registerMessage=" + registerMessage +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
deleted file mode 100644
index a19c0ab..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Answer to RegisterResource to indicate that the requested resource is unknown.
- * Sent by the ResourceManager to the JobManager.
- */
-public class RegisterResourceFailed implements RequiresLeaderSessionID, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/** Task Manager which tried to register */
-	private final ActorRef taskManager;
-
-	/** The id of the task manager resource */
-	private final ResourceID resourceID;
-
-	/** Error message */
-	private final String message;
-
-	public RegisterResourceFailed(ActorRef taskManager, ResourceID resourceId, String message) {
-		this.taskManager = taskManager;
-		this.resourceID = resourceId;
-		this.message = message;
-	}
-
-
-	public String getMessage() {
-		return message;
-	}
-
-	public ActorRef getTaskManager() {
-		return taskManager;
-	}
-
-	public ResourceID getResourceID() {
-		return resourceID;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisterResourceFailed{" +
-			"taskManager=" + taskManager +
-			", resourceID=" + resourceID +
-			", message='" + message + '\'' +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
deleted file mode 100644
index c29d28d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-
-/**
- * Answer to RegisterResource to indicate that the requested resource is known.
- * Sent by the ResourceManager to the JobManager.
- */
-public class RegisterResourceSuccessful implements RequiresLeaderSessionID, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final ActorRef taskManager;
-	private final RegistrationMessages.RegisterTaskManager registrationMessage;
-
-	public RegisterResourceSuccessful(ActorRef taskManager,
-			RegistrationMessages.RegisterTaskManager registrationMessage) {
-		this.taskManager = taskManager;
-		this.registrationMessage = registrationMessage;
-	}
-
-
-	public ActorRef getTaskManager() {
-		return taskManager;
-	}
-
-	public RegistrationMessages.RegisterTaskManager getRegistrationMessage() {
-		return registrationMessage;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisterResourceSuccessful{" +
-			"taskManager=" + taskManager +
-			", registrationMessage=" + registrationMessage +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
index 4626461..89a602e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
@@ -83,13 +83,13 @@ public class StandaloneResourceManager extends FlinkResourceManager<ResourceID>
 	}
 
 	@Override
-	protected ResourceID workerRegistered(ResourceID resourceID) {
+	protected ResourceID workerStarted(ResourceID resourceID) {
 		// we accept everything
 		return resourceID;
 	}
 
 	@Override
-	protected void releaseRegisteredWorker(ResourceID resourceID) {
+	protected void releaseStartedWorker(ResourceID resourceID) {
 		// cannot release any workers, they simply stay
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
index 2422925..495cacd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -58,7 +58,7 @@ public class TestingResourceManager extends StandaloneResourceManager {
 	protected void handleMessage(Object message) {
 
 		if (message instanceof GetRegisteredResources) {
-			sender().tell(new GetRegisteredResourcesReply(getRegisteredTaskManagers()), self());
+			sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
 		} else if (message instanceof FailResource) {
 			ResourceID resourceID = ((FailResource) message).resourceID;
 			notifyWorkerFailed(resourceID, "Failed for test case.");

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0026bef..f14a37f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -400,36 +400,23 @@ class JobManager(
 
       currentResourceManager match {
         case Some(rm) =>
-          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
-          future.onComplete {
-            case scala.util.Success(response) =>
-              // the resource manager is available and answered
-              self ! response
-            case scala.util.Failure(t) =>
+          val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout)
+          future.onFailure {
+            case t: Throwable =>
               t match {
                 case _: TimeoutException =>
                   log.info("Attempt to register resource at ResourceManager timed out. Retrying")
                 case _ =>
-                  log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
+                  log.warn("Failed to notify ResourceManager about started resource. Retrying", t)
               }
-              // slow or unreachable resource manager, register anyway and let the rm reconnect
-              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
               self ! decorateMessage(new ReconnectResourceManager(rm))
           }(context.dispatcher)
 
         case None =>
           log.info("Task Manager Registration but not connected to ResourceManager")
-          // ResourceManager not yet available
-          // sending task manager information later upon ResourceManager registration
-          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
       }
 
-    case msg: RegisterResourceSuccessful =>
-
-      val originalMsg = msg.getRegistrationMessage
-      val taskManager = msg.getTaskManager
-
-      // ResourceManager knows about the resource, now let's try to register TaskManager
+      // ResourceManager is told about the resource, now let's try to register TaskManager
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
@@ -441,10 +428,10 @@ class JobManager(
         try {
           val instanceID = instanceManager.registerTaskManager(
             taskManager,
-            originalMsg.resourceId,
-            originalMsg.connectionInfo,
-            originalMsg.resources,
-            originalMsg.numberOfSlots,
+            resourceId,
+            connectionInfo,
+            hardwareInformation,
+            numberOfSlots,
             leaderSessionID.orNull)
 
           taskManager ! decorateMessage(
@@ -463,24 +450,18 @@ class JobManager(
         }
       }
 
-    case msg: RegisterResourceFailed =>
-
-      val taskManager = msg.getTaskManager
-      val resourceId = msg.getResourceID
-      log.warn(s"TaskManager's resource id $resourceId failed to register at ResourceManager. " +
-        s"Refusing registration because of\n${msg.getMessage}.")
-
-      taskManager ! decorateMessage(
-        RefuseRegistration(new IllegalStateException(
-            s"Resource $resourceId not registered with resource manager.")))
-
     case msg: ResourceRemoved =>
       // we're being informed by the resource manager that a resource has become unavailable
       val resourceID = msg.resourceId()
       log.debug(s"Resource has been removed: $resourceID")
-      val instance = instanceManager.getRegisteredInstance(resourceID)
-      // trigger removal of task manager
-      handleTaskManagerTerminated(instance.getActorGateway.actor())
+
+      Option(instanceManager.getRegisteredInstance(resourceID)) match {
+        case Some(instance) =>
+          // trigger removal of task manager
+          handleTaskManagerTerminated(instance.getActorGateway.actor())
+        case None =>
+          log.debug(s"Resource $resourceID has not been registered at job manager.")
+      }
 
     case RequestNumberRegisteredTaskManager =>
       sender ! decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers)

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index b48bcf9..d362164 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
 import akka.actor.ActorRef
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -42,12 +44,14 @@ object RegistrationMessages {
    * @param timeout The timeout for the message. The next retry will double this timeout.
    * @param deadline Optional deadline until when the registration must be completed.
    * @param attempt The attempt number, for logging.
+   * @param registrationRun UUID of the run this trigger belongs to; outdated runs are discarded
    */
   case class TriggerTaskManagerRegistration(
       jobManagerURL: String,
       timeout: FiniteDuration,
       deadline: Option[Deadline],
-      attempt: Int)
+      attempt: Int,
+      registrationRun: UUID)
     extends RegistrationMessage
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index dcf1e38..a7dd789 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -189,6 +189,12 @@ class TaskManager(
        connectionInfo.getHostname(),
        new UnmodifiableConfiguration(config.configuration),
        config.tmpDirPaths)
+
+  // pending TriggerTaskManagerRegistration; cancelled whenever a new registration run starts
+  private var scheduledTaskManagerRegistration: Option[Cancellable] = None
+  // ID of the active registration run; triggers carrying a different ID are discarded
+  private var currentRegistrationRun: UUID = UUID.randomUUID()
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -563,54 +569,60 @@ class TaskManager(
         jobManagerURL,
         timeout,
         deadline,
-        attempt) =>
+        attempt,
+        registrationRun) =>
+
+        if (registrationRun.equals(this.currentRegistrationRun)) {
+          if (isConnected) {
+            // this may be the case, if we queue another attempt and
+            // in the meantime, the registration is acknowledged
+            log.debug(
+              "TaskManager was triggered to register at JobManager, but is already registered")
+          } else if (deadline.exists(_.isOverdue())) {
+            // we failed to register in time. that means we should quit
+            log.error("Failed to register at the JobManager within the defined maximum " +
+                        "connect time. Shutting down ...")
+
+            // terminate ourselves (hasta la vista)
+            self ! decorateMessage(PoisonPill)
+          } else {
+            if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
+              throw new Exception("Invalid internal state: Trying to register at JobManager " +
+                                    s"$jobManagerURL even though the current JobManagerAkkaURL " +
+                                    s"is set to ${jobManagerAkkaURL.getOrElse("")}")
+            }
 
-        if (isConnected) {
-          // this may be the case, if we queue another attempt and
-          // in the meantime, the registration is acknowledged
-          log.debug(
-            "TaskManager was triggered to register at JobManager, but is already registered")
-        } else if (deadline.exists(_.isOverdue())) {
-          // we failed to register in time. that means we should quit
-          log.error("Failed to register at the JobManager withing the defined maximum " +
-            "connect time. Shutting down ...")
-
-          // terminate ourselves (hasta la vista)
-          self ! decorateMessage(PoisonPill)
-        } else {
-          if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
-            throw new Exception("Invalid internal state: Trying to register at JobManager " +
-              s"$jobManagerURL even though the current JobManagerAkkaURL is set to " +
-              s"${jobManagerAkkaURL.getOrElse("")}")
-          }
+            log.info(s"Trying to register at JobManager $jobManagerURL " +
+                       s"(attempt $attempt, timeout: $timeout)")
+
+            val jobManager = context.actorSelection(jobManagerURL)
 
-          log.info(s"Trying to register at JobManager $jobManagerURL " +
-            s"(attempt $attempt, timeout: $timeout)")
-
-          val jobManager = context.actorSelection(jobManagerURL)
-
-          jobManager ! decorateMessage(
-            RegisterTaskManager(
-              resourceID,
-              connectionInfo,
-              resources,
-              numberOfSlots)
-          )
-
-          // the next timeout computes via exponential backoff with cap
-          val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
-
-          // schedule (with our timeout s delay) a check triggers a new registration
-          // attempt, if we are not registered by then
-          context.system.scheduler.scheduleOnce(
-            timeout,
-            self,
-            decorateMessage(TriggerTaskManagerRegistration(
-              jobManagerURL,
-              nextTimeout,
-              deadline,
-              attempt + 1)
-            ))(context.dispatcher)
+            jobManager ! decorateMessage(
+              RegisterTaskManager(
+                resourceID,
+                connectionInfo,
+                resources,
+                numberOfSlots)
+            )
+
+            // the next timeout computes via exponential backoff with cap
+            val nextTimeout = (timeout * 2).min(config.maxRegistrationPause)
+
+            // schedule (with our timeout as delay) a check that triggers a new registration
+            // attempt, if we are not registered by then
+            scheduledTaskManagerRegistration = Option(context.system.scheduler.scheduleOnce(
+              timeout,
+              self,
+              decorateMessage(TriggerTaskManagerRegistration(
+                jobManagerURL,
+                nextTimeout,
+                deadline,
+                attempt + 1,
+                registrationRun)
+              ))(context.dispatcher))
+          }
+        } else {
+          log.info(s"Discarding registration run with ID $registrationRun")
         }
 
       // successful registration. associate with the JobManager
@@ -668,20 +680,27 @@ class TaskManager(
 
           if(jobManagerAkkaURL.isDefined) {
             // try the registration again after some time
-            val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+            val delay: FiniteDuration = config.refusedRegistrationPause
             val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
               timeout => timeout + delay fromNow
             }
 
-            context.system.scheduler.scheduleOnce(delay) {
-              self ! decorateMessage(
-                TriggerTaskManagerRegistration(
-                  jobManagerAkkaURL.get,
-                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-                  deadline,
-                  1)
-              )
-            }(context.dispatcher)
+            // start a new registration run
+            currentRegistrationRun = UUID.randomUUID()
+            scheduledTaskManagerRegistration.foreach(_.cancel())
+
+            // create the trigger now (not in a scheduled closure) so it captures this run's ID
+            scheduledTaskManagerRegistration = Option(
+              context.system.scheduler.scheduleOnce(
+                delay,
+                self,
+                decorateMessage(TriggerTaskManagerRegistration(
+                  jobManagerAkkaURL.get,
+                  config.initialRegistrationPause,
+                  deadline,
+                  1,
+                  currentRegistrationRun)
+                ))(context.dispatcher))
           }
         } else {
           // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
@@ -1359,12 +1378,18 @@ class TaskManager(
       // begin attempts to reconnect
       val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
 
+      // start a new registration run
+      currentRegistrationRun = UUID.randomUUID()
+
+      scheduledTaskManagerRegistration.foreach(_.cancel())
+
       self ! decorateMessage(
         TriggerTaskManagerRegistration(
           jobManagerAkkaURL.get,
-          TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+          config.initialRegistrationPause,
           deadline,
-          1)
+          1,
+          currentRegistrationRun)
       )
     }
   }
@@ -1412,13 +1437,6 @@ object TaskManager {
     * connection attempts */
   val STARTUP_CONNECT_LOG_SUPPRESS = 10000L
 
-  /** The initial time for registration of the TaskManager with the JobManager */
-  val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds
-  /** The maximum time for registration of the TaskManager with the JobManager */
-  val MAX_REGISTRATION_TIMEOUT: FiniteDuration = 30 seconds
-
-  val DELAY_AFTER_REFUSED_REGISTRATION: FiniteDuration = 10 seconds
-
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
 
@@ -2116,13 +2134,61 @@ object TaskManager {
         e)
     }
 
+    /**
+     * Parses a registration pause (e.g. "500 ms") from the configuration.
+     *
+     * @param key The configuration key to read
+     * @param defaultValue The value used if the key is not set
+     * @param description Name of the pause used in the error message
+     * @return The parsed finite pause
+     * @throws IllegalArgumentException if the value cannot be parsed or is not finite
+     */
+    def parseRegistrationPause(
+        key: String,
+        defaultValue: String,
+        description: String): FiniteDuration = {
+      val pause = try {
+        Duration(configuration.getString(key, defaultValue))
+      } catch {
+        case e: NumberFormatException => throw new IllegalArgumentException(
+          "Invalid format for parameter " + key,
+          e)
+      }
+
+      if (pause.isFinite()) {
+        pause.asInstanceOf[FiniteDuration]
+      } else {
+        throw new IllegalArgumentException(
+          s"The $description registration pause must be finite: $pause")
+      }
+    }
+
+    val initialRegistrationPause = parseRegistrationPause(
+      ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+      ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+      "initial")
+
+    // NOTE: "REGISTARTION" (sic) is how the key constant is spelled in ConfigConstants
+    val maxRegistrationPause = parseRegistrationPause(
+      ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE,
+      "maximum")
+
+    val refusedRegistrationPause = parseRegistrationPause(
+      ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+      ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+      "refused")
+
     val taskManagerConfig = TaskManagerConfiguration(
       tmpDirs,
       cleanupInterval,
       timeout,
       finiteRegistrationDuration,
       slots,
-      configuration)
+      configuration,
+      initialRegistrationPause,
+      maxRegistrationPause,
+      refusedRegistrationPause)
 
     (taskManagerConfig, networkConfig, connectionInfo, memType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 03e8e63..aab3c5f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.configuration.Configuration
 
 import scala.concurrent.duration.FiniteDuration
@@ -28,4 +30,27 @@ case class TaskManagerConfiguration(
     timeout: FiniteDuration,
     maxRegistrationDuration: Option[FiniteDuration],
     numberOfSlots: Int,
-    configuration: Configuration)
+    configuration: Configuration,
+    initialRegistrationPause: FiniteDuration,
+    maxRegistrationPause: FiniteDuration,
+    refusedRegistrationPause: FiniteDuration) {
+  // Uses fixed registration pauses; NOTE(review): check against ConfigConstants defaults
+  def this(
+      tmpDirPaths: Array[String],
+      cleanupInterval: Long,
+      timeout: FiniteDuration,
+      maxRegistrationDuration: Option[FiniteDuration],
+      numberOfSlots: Int,
+      configuration: Configuration) {
+    this (
+      tmpDirPaths,
+      cleanupInterval,
+      timeout,
+      maxRegistrationDuration,
+      numberOfSlots,
+      configuration,
+      FiniteDuration(500, TimeUnit.MILLISECONDS), // initialRegistrationPause
+      FiniteDuration(30, TimeUnit.SECONDS), // maxRegistrationPause
+      FiniteDuration(10, TimeUnit.SECONDS)) // refusedRegistrationPause
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 2fe830f..9640fcd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,7 +22,6 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -339,10 +338,10 @@ trait TestingJobManagerLike extends FlinkActor {
       }
 
     // TaskManager may be registered on these two messages
-    case msg @ (_: RegisterTaskManager | _: RegisterResourceSuccessful) =>
+    case msg: RegisterTaskManager =>
       super.handleMessage(msg)
 
-      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
       // fewer registered TaskManagers
       while (waitForNumRegisteredTaskManagers.nonEmpty &&
         waitForNumRegisteredTaskManagers.head._1 <=

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index c5eb155..d6bcaaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -30,10 +30,10 @@ import java.util.UUID;
  */
 public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
-	private final String leaderAddress;
-	private final UUID leaderSessionID;
+	private volatile String leaderAddress;
+	private volatile UUID leaderSessionID;
 
-	private LeaderRetrievalListener listener;
+	private volatile LeaderRetrievalListener listener;
 
 	public TestingLeaderRetrievalService() {
 		this(null, null);
@@ -59,6 +59,9 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 	}
 
 	public void notifyListener(String address, UUID leaderSessionID) {
+		this.leaderAddress = address;
+		this.leaderSessionID = leaderSessionID;
+		// inform the current listener (if any) about the new leader
 		if (listener != null) {
 			listener.notifyLeaderAddress(address, leaderSessionID);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index d75341f..043c81c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -18,32 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 import scala.Option;
 
 import java.util.ArrayList;
@@ -198,15 +192,10 @@ public class ResourceManagerTest {
 			ResourceID resourceID = ResourceID.generate();
 
 			// Send task manager registration
-			resourceManager.tell(new RegisterResource(
-				ActorRef.noSender(),
-				new RegistrationMessages.RegisterTaskManager(resourceID,
-					Mockito.mock(InstanceConnectionInfo.class),
-					null,
-					1)),
+			resourceManager.tell(new NotifyResourceStarted(resourceID),
 				fakeJobManager);
 
-			expectMsgClass(RegisterResourceSuccessful.class);
+			expectMsgClass(Acknowledge.class);
 
 			// check for number registration of registered resources
 			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -216,15 +205,10 @@ public class ResourceManagerTest {
 			assertEquals(1, reply.resources.size());
 
 			// Send task manager registration again
-			resourceManager.tell(new RegisterResource(
-					ActorRef.noSender(),
-					new RegistrationMessages.RegisterTaskManager(resourceID,
-						Mockito.mock(InstanceConnectionInfo.class),
-						null,
-						1)),
+			resourceManager.tell(new NotifyResourceStarted(resourceID),
 				fakeJobManager);
 
-			expectMsgClass(RegisterResourceSuccessful.class);
+			expectMsgClass(Acknowledge.class);
 
 			// check for number registration of registered resources
 			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -232,17 +216,11 @@ public class ResourceManagerTest {
 
 			assertEquals(1, reply.resources.size());
 
-
-			// Send invalid null resource id to throw an exception during resource registration
-			resourceManager.tell(new RegisterResource(
-					ActorRef.noSender(),
-					new RegistrationMessages.RegisterTaskManager(null,
-						Mockito.mock(InstanceConnectionInfo.class),
-						null,
-						1)),
+			// Send an invalid null resource id; the notification is still acknowledged
+			resourceManager.tell(new NotifyResourceStarted(null),
 				fakeJobManager);
 
-			expectMsgClass(RegisterResourceFailed.class);
+			expectMsgClass(Acknowledge.class);
 
 			// check for number registration of registered resources
 			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -275,15 +253,10 @@ public class ResourceManagerTest {
 			resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
 
 			// Send task manager registration
-			resourceManager.tell(new RegisterResource(
-					ActorRef.noSender(),
-					new RegistrationMessages.RegisterTaskManager(resourceID,
-						Mockito.mock(InstanceConnectionInfo.class),
-						null,
-						1)),
+			resourceManager.tell(new NotifyResourceStarted(resourceID),
 				fakeJobManager);
 
-			expectMsgClass(RegisterResourceSuccessful.class);
+			expectMsgClass(Acknowledge.class);
 
 			// check for number registration of registered resources
 			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -329,25 +302,16 @@ public class ResourceManagerTest {
 			ResourceID resourceID2 = ResourceID.generate();
 
 			// Send task manager registration
-			resourceManager.tell(new RegisterResource(
-					ActorRef.noSender(),
-					new RegistrationMessages.RegisterTaskManager(resourceID1,
-						Mockito.mock(InstanceConnectionInfo.class),
-						null,
-						1)),
+			resourceManager.tell(new NotifyResourceStarted(resourceID1),
 				fakeJobManager);
 
+			expectMsgClass(Acknowledge.class);
+
 			// Send task manager registration
-			resourceManager.tell(new RegisterResource(
-					ActorRef.noSender(),
-					new RegistrationMessages.RegisterTaskManager(resourceID2,
-						Mockito.mock(InstanceConnectionInfo.class),
-						null,
-						1)),
+			resourceManager.tell(new NotifyResourceStarted(resourceID2),
 				fakeJobManager);
 
-			expectMsgClass(RegisterResourceSuccessful.class);
-			expectMsgClass(RegisterResourceSuccessful.class);
+			expectMsgClass(Acknowledge.class);
 
 			// check for number registration of registered resources
 			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 685fa9a..e23aba7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -50,6 +50,7 @@ import scala.Option;
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -286,14 +287,18 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
 				final ActorGateway jmGateway = jm;
 
+				FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
+				Configuration tmConfig = new Configuration(config);
+				tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages
 				taskManager = createTaskManager(
-						actorSystem,
-						jmGateway,
-						config,
-						true,
-						false);
+					actorSystem,
+					jmGateway,
+					tmConfig,
+					true,
+					false);
 
 				final ActorGateway taskManagerGateway = taskManager;
 
@@ -312,8 +317,10 @@ public class TaskManagerRegistrationTest extends TestLogger {
 					}
 				};
 
+
+
 				// the TaskManager should wait a bit an retry...
-				FiniteDuration maxDelay = (FiniteDuration) TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
+				FiniteDuration maxDelay = (FiniteDuration) refusedRegistrationPause.$times(3.0);
 				new Within(maxDelay) {
 
 					@Override
@@ -333,6 +340,97 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that the TaskManager does not send an excessive amount of registration messages to
+	 * the job manager if its registration was rejected.
+	 */
+	@Test
+	public void testTaskManagerNoExcessiveRegistrationMessages() throws Exception {
+		new JavaTestKit(actorSystem) {{
+			ActorGateway jm = null;
+			ActorGateway taskManager = null;
+			try {
+				FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+
+				jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
+				final ActorGateway jmGateway = jm;
+
+				long refusedRegistrationPause = 500;
+				long initialRegistrationPause = 100;
+				long maxDelay = 30000;
+
+				Configuration tmConfig = new Configuration(config);
+				tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms");
+				tmConfig.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms");
+				// pin the maximum pause, since the expected message count below is computed from maxDelay
+				tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, maxDelay + " ms");
+
+				// we make the test actor (the test kit) the JobManager to intercept
+				// the messages
+				taskManager = createTaskManager(
+					actorSystem,
+					jmGateway,
+					tmConfig,
+					true,
+					false);
+
+				final ActorGateway taskManagerGateway = taskManager;
+
+				final Deadline deadline = timeout.fromNow();
+
+				try {
+					while (deadline.hasTimeLeft()) {
+						// the TaskManager should try to register
+						expectMsgClass(deadline.timeLeft(), RegisterTaskManager.class);
+
+						// we decline the registration
+						taskManagerGateway.tell(
+							new RefuseRegistration(new Exception("test reason")),
+							jmGateway);
+					}
+				} catch (AssertionError error) {
+					// ignore since it simply means that we have used up all our time
+				}
+
+				RegisterTaskManager[] registerTaskManagerMessages = new ReceiveWhile<RegisterTaskManager>(RegisterTaskManager.class, timeout) {
+					@Override
+					protected RegisterTaskManager match(Object msg) throws Exception {
+						if (msg instanceof RegisterTaskManager) {
+							return (RegisterTaskManager) msg;
+						} else {
+							throw noMatch();
+						}
+					}
+				}.get();
+
+				// estimate the attempts of an exponential backoff starting at initialRegistrationPause, capped at maxDelay
+				int maxExponent = (int) Math.floor(Math.log(((double) maxDelay / initialRegistrationPause + 1))/Math.log(2));
+				int exponent = (int) Math.ceil(Math.log(((double) timeout.toMillis() / initialRegistrationPause + 1))/Math.log(2));
+
+				int exp = Math.min(maxExponent, exponent);
+
+				long difference = timeout.toMillis() - (initialRegistrationPause * (1 << exp));
+
+				int numberRegisterTaskManagerMessages = exp;
+
+				if (difference > 0) {
+					numberRegisterTaskManagerMessages += Math.ceil((double) difference / maxDelay);
+				}
+
+				int maxExpectedNumberOfRegisterTaskManagerMessages = numberRegisterTaskManagerMessages * 2;
+
+				assertTrue("The number of RegisterTaskManager messages #"
+					+ registerTaskManagerMessages.length
+					+ " should be at most #"
+					+ maxExpectedNumberOfRegisterTaskManagerMessages,
+					registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages);
+			} finally {
+				stopActor(taskManager);
+				stopActor(jm);
+			}
+		}};
+	}
+
+	/**
 	 * Validate that the TaskManager attempts to re-connect after it lost the connection
 	 * to the JobManager.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index dcb5334..94f9348 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -65,6 +65,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-yarn_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 3ed0dc1..a3337bb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -43,8 +43,13 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor
 		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
 			"jar. Make sure to package the flink-runtime module.");
 
+		File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn"));
+		Preconditions.checkNotNull(testingYarnJar, "Could not find the flink-yarn tests " +
+			"jar. Make sure to package the flink-yarn module.");
+
 		filesToShip.add(testingJar);
 		filesToShip.add(testingRuntimeJar);
+		filesToShip.add(testingYarnJar);
 
 		addShipFiles(filesToShip);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
deleted file mode 100644
index 5a61b8f..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Flink's testing resource manager for Yarn.
- */
-public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
-
-	public TestingYarnFlinkResourceManager(
-		Configuration flinkConfig,
-		YarnConfiguration yarnConfig,
-		LeaderRetrievalService leaderRetrievalService,
-		String applicationMasterHostName,
-		String webInterfaceURL,
-		ContaineredTaskManagerParameters taskManagerParameters,
-		ContainerLaunchContext taskManagerLaunchContext,
-		int yarnHeartbeatIntervalMillis,
-		int maxFailedContainers,
-		int numInitialTaskManagers) {
-
-		super(
-			flinkConfig,
-			yarnConfig,
-			leaderRetrievalService,
-			applicationMasterHostName,
-			webInterfaceURL,
-			taskManagerParameters,
-			taskManagerLaunchContext,
-			yarnHeartbeatIntervalMillis,
-			maxFailedContainers,
-			numInitialTaskManagers);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 12db578..b770c63 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -43,7 +43,7 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients_2.10</artifactId>
@@ -64,7 +64,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-
 		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-actor_${scala.binary.version}</artifactId>
@@ -85,8 +84,20 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-		
-		
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -197,6 +208,18 @@ under the License.
 					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index 974c4df..cb2f40a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -30,7 +30,6 @@ import static java.util.Objects.requireNonNull;
  */
 public class RegisteredYarnWorkerNode implements ResourceIDRetrievable {
 
-
 	/** The container on which the worker runs */
 	private final Container yarnContainer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 883a860..3c85795 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
@@ -122,18 +123,75 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	private RegisterApplicationMasterResponseReflector applicationMasterResponseReflector =
 		new RegisterApplicationMasterResponseReflector(LOG);
 
+	/**
+	 * Creates a YARN resource manager that uses a freshly created
+	 * {@link YarnResourceManagerCallbackHandler}. The handler's actor reference is bound later,
+	 * when {@link #initialize()} calls {@code resourceManagerCallbackHandler.initialize(self())}.
+	 */
+	public YarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers) {
+
+		this(
+			flinkConfig, yarnConfig, leaderRetrievalService,
+			applicationMasterHostName, webInterfaceURL,
+			taskManagerParameters, taskManagerLaunchContext,
+			yarnHeartbeatIntervalMillis, maxFailedContainers, numInitialTaskManagers,
+			// no handler supplied by the caller: create one that has no actor bound yet
+			new YarnResourceManagerCallbackHandler());
+	}
 
 	public YarnFlinkResourceManager(
-			Configuration flinkConfig,
-			YarnConfiguration yarnConfig,
-			LeaderRetrievalService leaderRetrievalService,
-			String applicationMasterHostName,
-			String webInterfaceURL,
-			ContaineredTaskManagerParameters taskManagerParameters,
-			ContainerLaunchContext taskManagerLaunchContext,
-			int yarnHeartbeatIntervalMillis,
-			int maxFailedContainers,
-			int numInitialTaskManagers) {
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers,
+		YarnResourceManagerCallbackHandler callbackHandler) {
+
+		this(
+			flinkConfig,
+			yarnConfig,
+			leaderRetrievalService,
+			applicationMasterHostName,
+			webInterfaceURL,
+			taskManagerParameters,
+			taskManagerLaunchContext,
+			yarnHeartbeatIntervalMillis,
+			maxFailedContainers,
+			numInitialTaskManagers,
+			callbackHandler,
+			AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, callbackHandler),
+			NMClient.createNMClient());
+	}
+
+	public YarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers,
+		YarnResourceManagerCallbackHandler callbackHandler,
+		AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient,
+		NMClient nodeManagerClient) {
 
 		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
 
@@ -145,6 +203,10 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis;
 		this.maxFailedContainers = maxFailedContainers;
 
+		this.resourceManagerCallbackHandler = Preconditions.checkNotNull(callbackHandler);
+		this.resourceManagerClient = Preconditions.checkNotNull(resourceManagerClient);
+		this.nodeManagerClient = Preconditions.checkNotNull(nodeManagerClient);
+
 		this.containersInLaunch = new HashMap<>();
 		this.containersBeingReturned = new HashMap<>();
 	}
@@ -178,16 +240,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	protected void initialize() throws Exception {
 		LOG.info("Initializing YARN resource master");
 
-		// create the client to communicate with the ResourceManager
-		resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(self());
+		resourceManagerCallbackHandler.initialize(self());
 
-		resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
-			yarnHeartbeatIntervalMillis, resourceManagerCallbackHandler);
 		resourceManagerClient.init(yarnConfig);
 		resourceManagerClient.start();
 
 		// create the client to communicate with the node managers
-		nodeManagerClient = NMClient.createNMClient();
 		nodeManagerClient.init(yarnConfig);
 		nodeManagerClient.start();
 		nodeManagerClient.cleanupRunningContainersOnStop(true);
@@ -277,7 +335,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			Priority priority = Priority.newInstance(0);
 
 			// Resource requirements for worker containers
-			int taskManagerSlots = Integer.valueOf(System.getenv(YarnConfigKeys.ENV_SLOTS));
+			int taskManagerSlots = taskManagerParameters.numSlots();
 			int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
 			Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
 
@@ -300,7 +358,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	}
 
 	@Override
-	protected void releaseRegisteredWorker(RegisteredYarnWorkerNode worker) {
+	protected void releaseStartedWorker(RegisteredYarnWorkerNode worker) {
 		releaseYarnContainer(worker.yarnContainer());
 	}
 
@@ -323,10 +381,13 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	}
 
 	@Override
-	protected RegisteredYarnWorkerNode workerRegistered(ResourceID resourceID) throws Exception {
+	protected RegisteredYarnWorkerNode workerStarted(ResourceID resourceID) {
 		YarnContainerInLaunch inLaunch = containersInLaunch.remove(resourceID);
 		if (inLaunch == null) {
-			throw new Exception("Cannot register Worker - unknown resource id " + resourceID);
+			// Container was not in state "being launched", this can indicate that the TaskManager
+			// in this container was already registered or that the container was not started
+			// by this resource manager. Simply ignore this resourceID.
+			return null;
 		} else {
 			return new RegisteredYarnWorkerNode(inLaunch.container());
 		}
@@ -345,8 +406,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 				accepted.add(new RegisteredYarnWorkerNode(yci.container()));
 			}
 			else {
-				LOG.info("YARN container consolidation does not recognize TaskManager {}",
-					resourceID);
+				if (isStarted(resourceID)) {
+					LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
+				} else {
+					LOG.info("YARN container consolidation does not recognize TaskManager {}",
+						resourceID);
+				}
 			}
 		}
 		return accepted;
@@ -368,7 +433,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 	private void containersAllocated(List<Container> containers) {
 		final int numRequired = getDesignatedWorkerPoolSize();
-		final int numRegistered = getNumberOfRegisteredTaskManagers();
+		final int numRegistered = getNumberOfStartedTaskManagers();
 
 		for (Container container : containers) {
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
@@ -519,7 +584,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 	private void updateProgress() {
 		final int required = getDesignatedWorkerPoolSize();
-		final int available = getNumberOfRegisteredTaskManagers() + containersInLaunch.size();
+		final int available = getNumberOfStartedTaskManagers() + containersInLaunch.size();
 		final float progress = (required <= 0) ? 1.0f : available / (float) required;
 
 		if (resourceManagerCallbackHandler != null) {
@@ -583,7 +648,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		 * @return A list with containers from previous application attempt.
 		 */
 		private List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
-			if (method != null) {
+			if (method != null && response != null) {
 				try {
 					@SuppressWarnings("unchecked")
 					List<Container> list = (List<Container>) method.invoke(response);

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
index 1e287c2..2372cbc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
@@ -42,12 +42,19 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb
 
 	/** The progress we report */
 	private float currentProgress;
-	
+
+	public YarnResourceManagerCallbackHandler() { // no actor yet; it is set later via initialize(ActorRef)
+		this(null);
+	}
 	
 	public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) {
 		this.yarnFrameworkMaster = yarnFrameworkMaster;
 	}
 
+	public void initialize(ActorRef yarnFrameworkMaster) { // sets the actor the null-guarded callbacks tell() messages to
+		this.yarnFrameworkMaster = yarnFrameworkMaster;
+	}
+
 	/**
 	 * Sets the current progress.
 	 * @param progress The current progress fraction.
@@ -65,16 +72,20 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb
 
 	@Override
 	public void onContainersCompleted(List<ContainerStatus> list) {
-		yarnFrameworkMaster.tell(
-			new ContainersComplete(list),
-			ActorRef.noSender());
+		if (yarnFrameworkMaster != null) {
+			yarnFrameworkMaster.tell(
+				new ContainersComplete(list),
+				ActorRef.noSender());
+		}
 	}
 
 	@Override
 	public void onContainersAllocated(List<Container> containers) {
-		yarnFrameworkMaster.tell(
-			new ContainersAllocated(containers),
-			ActorRef.noSender());
+		if (yarnFrameworkMaster != null) {
+			yarnFrameworkMaster.tell(
+				new ContainersAllocated(containers),
+				ActorRef.noSender());
+		}
 	}
 
 	@Override
@@ -89,8 +100,10 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb
 
 	@Override
 	public void onError(Throwable error) {
-		yarnFrameworkMaster.tell(
-			new FatalErrorOccurred("Connection to YARN Resource Manager failed", error),
-			ActorRef.noSender());
+		if (yarnFrameworkMaster != null) {
+			yarnFrameworkMaster.tell(
+				new FatalErrorOccurred("Connection to YARN Resource Manager failed", error),
+				ActorRef.noSender());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
new file mode 100644
index 0000000..f03c604
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+/** Testing YARN resource manager which can report and wait for the number of started resources. */
+public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
+
+	/** Pending (number of resources, receiver) requests, ordered by ascending number of resources. */
+	private final PriorityQueue<Tuple2<Integer, ActorRef>> waitingQueue = new PriorityQueue<>(32, new Comparator<Tuple2<Integer, ActorRef>>() {
+		@Override
+		public int compare(Tuple2<Integer, ActorRef> o1, Tuple2<Integer, ActorRef> o2) {
+			// Integer.compare instead of subtraction, which can overflow for large differences
+			return Integer.compare(o1.f0, o2.f0);
+		}
+	});
+
+	public TestingYarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers) {
+
+		super(flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webInterfaceURL,
+			taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers,
+			numInitialTaskManagers);
+	}
+
+	/** Creates a testing resource manager which uses the given callback handler and YARN clients. */
+	public TestingYarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers,
+		YarnResourceManagerCallbackHandler callbackHandler,
+		AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient,
+		NMClient nodeManagerClient) {
+		super(flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webInterfaceURL, taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers, numInitialTaskManagers, callbackHandler, resourceManagerClient, nodeManagerClient);
+	}
+
+	/**
+	 * Answers {@link RequestNumberOfRegisteredResources} and {@link NotifyWhenResourcesRegistered};
+	 * after each {@link NotifyResourceStarted}, replies {@code true} to every waiter whose threshold is met.
+	 */
+	@Override
+	protected void handleMessage(Object message) {
+		if (message instanceof RequestNumberOfRegisteredResources) {
+			getSender().tell(getNumberOfStartedTaskManagers(), getSelf());
+		} else if (message instanceof NotifyWhenResourcesRegistered) {
+			NotifyWhenResourcesRegistered notifyMessage = (NotifyWhenResourcesRegistered) message;
+
+			if (getNumberOfStartedTaskManagers() >= notifyMessage.getNumberResources()) {
+				getSender().tell(true, getSelf());
+			} else {
+				waitingQueue.offer(Tuple2.of(notifyMessage.getNumberResources(), getSender()));
+			}
+		} else if (message instanceof NotifyResourceStarted) {
+			super.handleMessage(message);
+			// the queue head has the smallest threshold, so draining stops at the first unmet request
+			while (!waitingQueue.isEmpty() && waitingQueue.peek().f0 <= getNumberOfStartedTaskManagers()) {
+				ActorRef receiver = waitingQueue.poll().f1;
+				receiver.tell(true, getSelf());
+			}
+		} else {
+			super.handleMessage(message);
+		}
+	}
+}


Mime
View raw message