flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [4/4] flink git commit: [FLINK-2790] [yarn] [ha] Add high availability support for Yarn
Date Thu, 08 Oct 2015 12:24:00 GMT
[FLINK-2790] [yarn] [ha] Add high availability support for Yarn

This squashes the following commits:
- Refactor JobManager's start actors method to be reusable
- Yarn refactoring to introduce yarn testing functionality
- Add support for testing yarn cluster. Extracted JobManager's and TaskManager's testing messages into stackable traits.
- Implement YarnHighAvailabilityITCase using Akka messages for synchronization.
- Logging statements
- Fix registration at JobManager when the leader address is null
- Fix curator dependency conflict
- Shades Flink's curator dependency in flink-runtime so that it cannot be overridden by external dependencies in the class path. This solves the problem with Hadoop 2.6.0 which adds Curator 2.6.0 to the class path. The curator version of this Hadoop version is not compatible with Flink's Curator version 2.8.0. Furthermore, Flink's Guava version is forced to be included in flink-shaded-curator jar to avoid too many different Guava versions in the resulting dist jar.
- Unify two shade executions of flink-runtime into one
- Exclude log4j and slf4j-log4j12 dependency from flink-shaded-curator
- Set default number of application attempts to 1 in standalone case

This closes #1213


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa2bb8f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa2bb8f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa2bb8f1

Branch: refs/heads/master
Commit: fa2bb8f119f747a24bb013a4387ded5c51348ee7
Parents: f332fa5
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Sep 1 16:35:48 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Oct 8 14:23:41 2015 +0200

----------------------------------------------------------------------
 .../flink/client/FlinkYarnSessionCli.java       |   3 +-
 .../flink/configuration/ConfigConstants.java    |   6 +-
 flink-runtime/pom.xml                           |  28 +-
 .../flink/runtime/jobmanager/RecoveryMode.java  |  28 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  51 +-
 .../runtime/LeaderSessionMessageFilter.scala    |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 125 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |   6 +-
 .../taskmanager/TaskManagerConfiguration.scala  |  14 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../LeaderElectionRetrievalTestingCluster.java  |   5 +-
 .../taskmanager/ForwardingActorGateway.java     |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   5 +-
 .../TaskManagerProcessReapingTest.java          |   5 +-
 .../TaskManagerRegistrationTest.java            |   5 +-
 .../jobmanager/JobManagerRegistrationTest.scala |   4 +-
 .../runtime/testingUtils/TestingCluster.scala   |  13 +-
 .../testingUtils/TestingJobManager.scala        | 296 +------
 .../testingUtils/TestingJobManagerLike.scala    | 364 ++++++++
 .../TestingJobManagerMessages.scala             |  11 +-
 .../testingUtils/TestingMemoryArchivist.scala   |   1 -
 .../testingUtils/TestingTaskManager.scala       | 181 +---
 .../testingUtils/TestingTaskManagerLike.scala   | 220 +++++
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 flink-shaded-curator/pom.xml                    |  37 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  38 +-
 .../AbstractProcessFailureRecoveryTest.java     |   8 +-
 .../recovery/ProcessFailureCancelingITCase.java |   8 +-
 flink-yarn-tests/pom.xml                        | 139 +++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  78 ++
 .../flink/yarn/TestingApplicationMaster.java    |  45 +
 .../flink/yarn/TestingFlinkYarnClient.java      |  71 ++
 .../yarn/TestingYarnTaskManagerRunner.java      |  30 +
 .../java/org/apache/flink/yarn/UtilsTest.java   |  52 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  | 164 ++++
 .../flink/yarn/YARNSessionFIFOITCase.java       |   1 -
 .../org/apache/flink/yarn/YarnTestBase.java     |  15 +-
 .../src/main/resources/log4j-test.properties    |   8 +-
 .../flink/yarn/TestingYarnJobManager.scala      |  79 ++
 .../flink/yarn/TestingYarnTaskManager.scala     |  59 ++
 .../org/apache/flink/yarn/FlinkYarnClient.java  | 742 +---------------
 .../apache/flink/yarn/FlinkYarnClientBase.java  | 873 +++++++++++++++++++
 .../org/apache/flink/yarn/FlinkYarnCluster.java |  43 +-
 .../flink/yarn/YarnTaskManagerRunner.java       | 113 +++
 .../yarn/appMaster/YarnTaskManagerRunner.java   | 110 ---
 .../apache/flink/yarn/ApplicationClient.scala   | 216 +++--
 .../apache/flink/yarn/ApplicationMaster.scala   | 294 +------
 .../flink/yarn/ApplicationMasterActor.scala     | 652 --------------
 .../flink/yarn/ApplicationMasterBase.scala      | 257 ++++++
 .../scala/org/apache/flink/yarn/Messages.scala  |  68 --
 .../org/apache/flink/yarn/YarnJobManager.scala  | 791 +++++++++++++++++
 .../org/apache/flink/yarn/YarnMessages.scala    |  94 ++
 .../org/apache/flink/yarn/YarnTaskManager.scala |   2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  78 --
 .../java/org/apache/flink/yarn/UtilsTests.java  |  73 --
 55 files changed, 3925 insertions(+), 2701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 66a4834..5daaf0e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -367,7 +367,6 @@ public class FlinkYarnSessionCli {
 	}
 
 	public int run(String[] args) {
-
 		//
 		//	Command Line Options
 		//
@@ -418,7 +417,7 @@ public class FlinkYarnSessionCli {
 				return 1;
 			}
 			//------------------ Cluster deployed, handle connection details
-			String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" + yarnCluster.getJobManagerAddress().getPort();
+			String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
 			System.out.println("Flink JobManager is now running on " + jobManagerAddress);
 			System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
 			// file that we write into the conf/ dir containing the jobManager address and the dop.

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index cd7fd76..2b4749c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -235,10 +235,10 @@ public final class ConfigConstants {
 	public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";
 
 	/**
-	 * Set the number of retries for failed YARN ApplicationMasters/JobManagers.
-	 * This value is usually limited by YARN.
+	 * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
+	 * availability mode. This value is usually limited by YARN.
 	 *
-	 * By default, its 1.
+	 * By default, it's 1 in the standalone case and 2 in the high availability case.
 	 */
 	public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 11ad1f2..1802709 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -210,7 +210,6 @@ under the License.
 			<version>${curator.version}</version>
 			<scope>test</scope>
 		</dependency>
-
 	</dependencies>
 
 	<build>
@@ -422,6 +421,33 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<include>org.apache.flink:flink-shaded-curator</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 90a1147..2e75b19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
 /**
  * Recovery mode for Flink's cluster execution. Currently supported modes are:
  *
@@ -29,5 +32,28 @@ package org.apache.flink.runtime.jobmanager;
  */
 public enum RecoveryMode {
 	STANDALONE,
-	ZOOKEEPER
+	ZOOKEEPER;
+
+	/**
+	 * Returns true if the defined recovery mode supports high availability.
+	 *
+	 * @param configuration Configuration which contains the recovery mode
+	 * @return true if high availability is supported by the recovery mode, otherwise false
+	 */
+	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
+		String recoveryMode = configuration.getString(
+			ConfigConstants.RECOVERY_MODE,
+			ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
+
+		RecoveryMode mode = RecoveryMode.valueOf(recoveryMode);
+
+		switch(mode) {
+			case STANDALONE:
+				return false;
+			case ZOOKEEPER:
+				return true;
+			default:
+				return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 5cf5bff..29de512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.util;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -95,9 +97,7 @@ public class LeaderRetrievalUtils {
 
 			Future<ActorGateway> actorGatewayFuture = listener.getActorGatewayFuture();
 
-			ActorGateway gateway = Await.result(actorGatewayFuture, timeout);
-
-			return gateway;
+			return Await.result(actorGatewayFuture, timeout);
 		} catch (Exception e) {
 			throw new LeaderRetrievalException("Could not retrieve the leader gateway", e);
 		} finally {
@@ -131,9 +131,7 @@ public class LeaderRetrievalUtils {
 
 			Future<LeaderConnectionInfo> connectionInfoFuture = listener.getLeaderConnectionInfoFuture();
 
-			LeaderConnectionInfo result = Await.result(connectionInfoFuture, timeout);
-
-			return result;
+			return Await.result(connectionInfoFuture, timeout);
 		} catch (Exception e) {
 			throw new LeaderRetrievalException("Could not retrieve the leader address and leader " +
 					"session ID.", e);
@@ -160,9 +158,7 @@ public class LeaderRetrievalUtils {
 			LOG.info("TaskManager will try to connect for " + timeout +
 					" before falling back to heuristics");
 
-			InetAddress result =  listener.findConnectingAddress(timeout);
-
-			return result;
+			return listener.findConnectingAddress(timeout);
 		} catch (Exception e) {
 			throw new LeaderRetrievalException("Could not find the connecting address by " +
 					"connecting to the current leader.", e);
@@ -183,6 +179,7 @@ public class LeaderRetrievalUtils {
 
 		private final ActorSystem actorSystem;
 		private final FiniteDuration timeout;
+		private final Object lock = new Object();
 
 		private final Promise<ActorGateway> futureActorGateway = new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>();
 
@@ -191,23 +188,37 @@ public class LeaderRetrievalUtils {
 			this.timeout = timeout;
 		}
 
-		@Override
-		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-			if(leaderAddress != null && !leaderAddress.equals("") && !futureActorGateway.isCompleted()) {
-				try {
-					ActorRef actorRef = AkkaUtils.getActorRef(leaderAddress, actorSystem, timeout);
-
-					ActorGateway gateway = new AkkaActorGateway(actorRef, leaderSessionID);
-
+		private void completePromise(ActorGateway gateway) {
+			synchronized (lock) {
+				if (!futureActorGateway.isCompleted()) {
 					futureActorGateway.success(gateway);
-
-				} catch(Exception e){
-					futureActorGateway.failure(e);
 				}
 			}
 		}
 
 		@Override
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+			if(leaderAddress != null && !leaderAddress.equals("") && !futureActorGateway.isCompleted()) {
+				AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, timeout)
+					.map(new Mapper<ActorRef, ActorGateway>() {
+						public ActorGateway apply(ActorRef ref) {
+							return new AkkaActorGateway(ref, leaderSessionID);
+						}
+					}, actorSystem.dispatcher())
+					.onComplete(new OnComplete<ActorGateway>() {
+						@Override
+						public void onComplete(Throwable failure, ActorGateway success) throws Throwable {
+							if (failure == null) {
+								completePromise(success);
+							} else {
+								LOG.debug("Could not retrieve the leader for address " + leaderAddress + ".", failure);
+							}
+						}
+					}, actorSystem.dispatcher());
+			}
+		}
+
+		@Override
 		public void handleError(Exception exception) {
 			if (!futureActorGateway.isCompleted()) {
 				futureActorGateway.failure(exception);

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
index 72db258..b8aea78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
@@ -49,8 +49,8 @@ trait LeaderSessionMessageFilter extends FlinkActor {
       msg: LeaderSessionMessage)
     : Unit = {
     log.warn(s"Discard message $msg because the expected leader session ID " +
-      s"$expectedLeaderSessionID did not equal the received leader session ID" +
-      s"${msg.leaderSessionID}.")
+      s"$expectedLeaderSessionID did not equal the received leader session ID " +
+      s"${Option(msg.leaderSessionID)}.")
   }
 
   /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9417b38..bbb382e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1184,15 +1184,23 @@ object JobManager {
         LOG.info("Security is enabled. Starting secure JobManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            runJobManager(configuration, executionMode, streamingMode,
-                          listeningHost, listeningPort)
+            runJobManager(
+              configuration,
+              executionMode,
+              streamingMode,
+              listeningHost,
+              listeningPort)
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
-        runJobManager(configuration, executionMode, streamingMode,
-                      listeningHost, listeningPort)
+        runJobManager(
+          configuration,
+          executionMode,
+          streamingMode,
+          listeningHost,
+          listeningPort)
       }
     }
     catch {
@@ -1213,7 +1221,7 @@ object JobManager {
    *
    * @param configuration The configuration object for the JobManager.
    * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
-   *                      an additional TaskManager in the same process.
+   *                      additional TaskManager in the same process.
    * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
@@ -1225,6 +1233,43 @@ object JobManager {
       listeningAddress: String,
       listeningPort: Int)
     : Unit = {
+    
+    val (jobManagerSystem, _, _, _) = startActorSystemAndJobManagerActors(
+      configuration,
+      executionMode,
+      streamingMode,
+      listeningAddress,
+      listeningPort,
+      classOf[JobManager],
+      classOf[MemoryArchivist]
+    )
+
+    // block until everything is shut down
+    jobManagerSystem.awaitTermination()
+  }
+
+  /** Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
+    *
+    * @param configuration The configuration object for the JobManager
+    * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
+    *                      additional TaskManager in the same process.
+    * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
+    * @param listeningAddress The hostname where the JobManager should listen for messages.
+    * @param listeningPort The port where the JobManager should listen for messages
+    * @param jobManagerClass The class of the JobManager to be started
+    * @param archiveClass The class of the Archivist to be started
+    * @return A tuple containing the started ActorSystem, ActorRefs to the JobManager and the
+    *         Archivist and an Option containing a possibly started WebMonitor
+    */
+  def startActorSystemAndJobManagerActors(
+      configuration: Configuration,
+      executionMode: JobManagerMode,
+      streamingMode: StreamingMode,
+      listeningAddress: String,
+      listeningPort: Int,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist])
+    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
 
     LOG.info("Starting JobManager")
 
@@ -1262,7 +1307,9 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        streamingMode)
+        streamingMode,
+        jobManagerClass,
+        archiveClass)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -1299,7 +1346,10 @@ object JobManager {
           "TaskManager_Process_Reaper")
       }
 
-      if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+      val webMonitor = if (
+        configuration.getInteger(
+          ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+          0) >= 0) {
 
         // TODO: Add support for HA. Webserver has to work in dedicated mode. All transferred
         // information has to be made serializable
@@ -1329,7 +1379,13 @@ object JobManager {
         if(webServer != null) {
           webServer.start()
         }
+
+        Option(webServer)
+      } else {
+        None
       }
+
+      (jobManagerSystem, jobManager, archive, webMonitor)
     }
     catch {
       case t: Throwable => {
@@ -1342,9 +1398,6 @@ object JobManager {
         throw t
       }
     }
-
-    // block until everything is shut down
-    jobManagerSystem.awaitTermination()
   }
 
   /**
@@ -1458,14 +1511,17 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
+   * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
+   *                                    is defined
    * @return The members for a default JobManager.
    */
-  def createJobManagerComponents(configuration: Configuration) :
+  def createJobManagerComponents(
+      configuration: Configuration,
+      leaderElectionServiceOption: Option[LeaderElectionService]) :
     (ExecutionContext,
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
-    Props,
     Int, // execution retries
     Long, // delay between retries
     FiniteDuration, // timeout
@@ -1474,9 +1530,6 @@ object JobManager {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
-    val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
-      ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
-
     val cleanupInterval = configuration.getLong(
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
@@ -1485,6 +1538,9 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
+    val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
+      ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
+
     // configure the delay between execution retries.
     // unless explicitly specifies, this is dependent on the heartbeat timeout
     val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
@@ -1501,8 +1557,6 @@ object JobManager {
             s"${pauseString}. Value must be a valid duration (such as 100 milli or 1 min)");
       }
 
-    val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
-
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
 
     var blobServer: BlobServer = null
@@ -1536,13 +1590,15 @@ object JobManager {
       }
     }
 
-    val leaderElectionService = LeaderElectionUtils.createLeaderElectionService(configuration)
+    val leaderElectionService = leaderElectionServiceOption match {
+      case Some(les) => les
+      case None => LeaderElectionUtils.createLeaderElectionService(configuration)
+    }
 
     (executionContext,
       instanceManager,
       scheduler,
       libraryCacheManager,
-      archiveProps,
       executionRetries,
       delayBetweenRetries,
       timeout, 
@@ -1555,13 +1611,19 @@ object JobManager {
    * given actor system.
    *
    * @param configuration The configuration for the JobManager
-   * @param actorSystem Teh actor system running the JobManager
+   * @param actorSystem The actor system running the JobManager
+   * @param streamingMode The execution mode
+   * @param jobManagerClass The class of the JobManager to be started
+   * @param archiveClass The class of the MemoryArchivist to be started
+   *
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      streamingMode: StreamingMode)
+      streamingMode: StreamingMode,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
     startJobManagerActors(
@@ -1569,7 +1631,9 @@ object JobManager {
       actorSystem,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
-      streamingMode)
+      streamingMode,
+      jobManagerClass,
+      archiveClass)
   }
   /**
    * Starts the JobManager and job archiver based on the given configuration, in the
@@ -1582,6 +1646,8 @@ object JobManager {
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param streamingMode The mode to run the system in (streaming vs. batch-only)
+   * @param jobManagerClass The class of the JobManager to be started
+   * @param archiveClass The class of the MemoryArchivist to be started
    * 
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
@@ -1590,19 +1656,24 @@ object JobManager {
       actorSystem: ActorSystem,
       jobMangerActorName: Option[String],
       archiveActorName: Option[String],
-      streamingMode: StreamingMode)
+      streamingMode: StreamingMode,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
     val (executionContext,
       instanceManager,
       scheduler,
       libraryCacheManager,
-      archiveProps,
       executionRetries,
       delayBetweenRetries,
       timeout,
-      _,
-      leaderElectionService) = createJobManagerComponents(configuration)
+      archiveCount,
+      leaderElectionService) = createJobManagerComponents(
+      configuration,
+      None)
+
+    val archiveProps = Props(archiveClass, archiveCount)
 
     // start the archiver with the given name, or without (avoid name conflicts)
     val archive: ActorRef = archiveActorName match {
@@ -1611,7 +1682,7 @@ object JobManager {
     }
 
     val jobManagerProps = Props(
-      classOf[JobManager],
+      jobManagerClass,
       configuration,
       executionContext,
       instanceManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3f8783a..7b1f9e8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 
@@ -84,7 +84,9 @@ class LocalFlinkMiniCluster(
       system,
       Some(jobManagerName),
       Some(archiveName),
-      streamingMode)
+      streamingMode,
+      classOf[JobManager],
+      classOf[MemoryArchivist])
 
     jobManager
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 05ede99..03e8e63 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -22,10 +22,10 @@ import org.apache.flink.configuration.Configuration
 
 import scala.concurrent.duration.FiniteDuration
 
-
-case class TaskManagerConfiguration(tmpDirPaths: Array[String],
-                                    cleanupInterval: Long,
-                                    timeout: FiniteDuration,
-                                    maxRegistrationDuration: Option[FiniteDuration],
-                                    numberOfSlots: Int,
-                                    configuration: Configuration)
+case class TaskManagerConfiguration(
+    tmpDirPaths: Array[String],
+    cleanupInterval: Long,
+    timeout: FiniteDuration,
+    maxRegistrationDuration: Option[FiniteDuration],
+    numberOfSlots: Int,
+    configuration: Configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index edc1f92..61f536c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -77,7 +77,9 @@ public class JobSubmitTest {
 		ActorRef jobManagerActorRef = JobManager.startJobManagerActors(
 				config,
 				jobManagerSystem,
-				StreamingMode.BATCH_ONLY)._1();
+				StreamingMode.BATCH_ONLY,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 
 		try {
 			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index ff73615..5b63107 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import scala.Option;
 
 import java.util.UUID;
 
@@ -77,12 +78,12 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 	}
 
 	@Override
-	public LeaderElectionService createLeaderElectionService(LeaderElectionService originalService) {
+	public Option<LeaderElectionService> createLeaderElectionService() {
 		leaderElectionServices[leaderElectionServiceCounter] = new TestingLeaderElectionService();
 
 		LeaderElectionService result = leaderElectionServices[leaderElectionServiceCounter++];
 
-		return result;
+		return Option.apply(result);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
index e7e105f..18a3c3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
@@ -28,8 +28,9 @@ import java.util.concurrent.BlockingQueue;
  * in a {@link BlockingQueue}.
  */
 public class ForwardingActorGateway extends BaseTestingActorGateway {
+	private static final long serialVersionUID = 7001973884263802603L;
 
-	private final BlockingQueue<Object> queue;
+	private final transient BlockingQueue<Object> queue;
 
 	public ForwardingActorGateway(BlockingQueue<Object> queue) {
 		super(TestingUtils.directExecutionContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 0a7f08d..d30db9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -75,7 +76,9 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
-				StreamingMode.BATCH_ONLY)._1();
+				StreamingMode.BATCH_ONLY,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 
 			// create the components for the TaskManager manually
 			final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index ed03ae7..1c15abf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -90,7 +91,9 @@ public class TaskManagerProcessReapingTest {
 			JobManager.startJobManagerActors(
 				new Configuration(),
 				jmActorSystem,
-				StreamingMode.BATCH_ONLY);
+				StreamingMode.BATCH_ONLY,
+				JobManager.class,
+				MemoryArchivist.class);
 
 			final int taskManagerPort = NetUtils.getAvailablePort();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 2b5b709..130dcc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -596,6 +597,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			actorSystem,
 			NONE_STRING,
 			NONE_STRING,
-			StreamingMode.BATCH_ONLY)._1();
+			StreamingMode.BATCH_ONLY,
+			JobManager.class,
+			MemoryArchivist.class)._1();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index ea691f1..fa3fc8b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -162,7 +162,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       _system,
       None,
       None,
-      StreamingMode.BATCH_ONLY)
+      StreamingMode.BATCH_ONLY,
+      classOf[JobManager],
+      classOf[MemoryArchivist])
     new AkkaActorGateway(jm, null)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 795ad4e..c9ae1e4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -103,18 +103,17 @@ class TestingCluster(
       instanceManager,
       scheduler,
       libraryCacheManager,
-      _,
       executionRetries,
       delayBetweenRetries,
       timeout,
       archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(config)
+      leaderElectionService) = JobManager.createJobManagerComponents(
+        config,
+        createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
     val archive = actorSystem.actorOf(testArchiveProps, archiveName)
 
-    val resolvedLeaderElectionService = createLeaderElectionService(leaderElectionService)
-
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
@@ -127,7 +126,7 @@ class TestingCluster(
         delayBetweenRetries,
         timeout,
         streamingMode,
-        resolvedLeaderElectionService))
+        leaderElectionService))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)
@@ -155,8 +154,8 @@ class TestingCluster(
   }
 
 
-  def createLeaderElectionService(electionService: LeaderElectionService): LeaderElectionService = {
-    electionService
+  def createLeaderElectionService(): Option[LeaderElectionService] = {
+    None
   }
 
   @throws(classOf[TimeoutException])

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index c91a421..312a1e5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -82,297 +82,5 @@ class TestingJobManager(
     delayBetweenRetries,
     timeout,
     mode,
-    leaderElectionService) {
-
-  import scala.collection.JavaConverters._
-  import context._
-
-  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
-  val waitForAllVerticesToBeRunningOrFinished =
-    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
-  var periodicCheck: Option[Cancellable] = None
-
-  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
-    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
-  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
-  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case RequestExecutionGraph(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((executionGraph, jobInfo)) => sender ! decorateMessage(
-          ExecutionGraphFound(
-            jobID,
-            executionGraph)
-        )
-
-        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender)
-      }
-
-    case WaitForAllVerticesToBeRunning(jobID) =>
-      if(checkIfAllVerticesRunning(jobID)){
-        sender ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunning += jobID -> (waiting + sender)
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
-      if(checkIfAllVerticesRunningOrFinished(jobID)){
-        sender ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender)
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-
-    case NotifyListeners =>
-      for(jobID <- currentJobs.keySet){
-        notifyListeners(jobID)
-      }
-
-      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
-        periodicCheck foreach { _.cancel() }
-        periodicCheck = None
-      }
-
-
-    case NotifyWhenJobRemoved(jobID) =>
-      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
-      val responses = gateways.map{
-        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
-      }
-
-      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
-      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
-      import context.dispatcher
-      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender
-
-    case CheckIfJobRemoved(jobID) =>
-      if(currentJobs.contains(jobID)) {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID))
-        )(context.dispatcher, sender())
-      } else {
-        sender() ! decorateMessage(true)
-      }
-
-    case NotifyWhenTaskManagerTerminated(taskManager) =>
-      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
-      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
-    case msg@Terminated(taskManager) =>
-      super.handleMessage(msg)
-
-      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-        _ foreach {
-          listener =>
-            listener ! decorateMessage(TaskManagerTerminated(taskManager))
-        }
-      }
-
-    case NotifyWhenAccumulatorChange(jobID) =>
-
-      val (updated, registered) = waitForAccumulatorUpdate.
-        getOrElse(jobID, (false, Set[ActorRef]()))
-      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
-      sender ! true
-
-    /**
-     * Notification from the task manager that changed accumulator are transferred on next
-     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
-     */
-    case AccumulatorsChanged(jobID: JobID) =>
-      waitForAccumulatorUpdate.get(jobID) match {
-        case Some((updated, registered)) =>
-          waitForAccumulatorUpdate.put(jobID, (true, registered))
-        case None =>
-      }
-
-    /**
-     * Disabled async processing of accumulator values and send accumulators to the listeners if
-     * we previously received an [[AccumulatorsChanged]] message.
-     */
-    case msg : Heartbeat =>
-      super.handleMessage(msg)
-
-      waitForAccumulatorUpdate foreach {
-        case (jobID, (updated, actors)) if updated =>
-          currentJobs.get(jobID) match {
-            case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
-              val userAccumulators = graph.aggregateUserAccumulators
-              actors foreach {
-                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
-              }
-            case None =>
-          }
-          waitForAccumulatorUpdate.put(jobID, (false, actors))
-        case _ =>
-      }
-
-    case RequestWorkingTaskManager(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          if(eg.getAllExecutionVertices.asScala.isEmpty){
-            sender ! decorateMessage(WorkingTaskManager(None))
-          } else {
-            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
-            if(resource == null){
-              sender ! decorateMessage(WorkingTaskManager(None))
-            } else {
-              sender ! decorateMessage(
-                WorkingTaskManager(
-                  Some(resource.getInstance().getActorGateway)
-                )
-              )
-            }
-          }
-        case None => sender ! decorateMessage(WorkingTaskManager(None))
-      }
-
-
-    case NotifyWhenJobStatus(jobID, state) =>
-      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
-        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
-      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
-      jobStatusListener += state -> (listener + sender)
-
-    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
-      super.handleMessage(msg)
-
-      val cleanup = waitForJobStatus.get(jobID) match {
-        case Some(stateListener) =>
-          stateListener.remove(newJobStatus) match {
-            case Some(listeners) =>
-              listeners foreach {
-                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
-              }
-            case _ =>
-          }
-          stateListener.isEmpty
-
-        case _ => false
-      }
-
-      if (cleanup) {
-        waitForJobStatus.remove(jobID)
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case msg: Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val taskManager = sender
-
-        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-          _ foreach {
-            listener =>
-              listener ! decorateMessage(TaskManagerTerminated(taskManager))
-          }
-        }
-      }
-
-    case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
-        sender() ! true
-      } else {
-        waitForLeader += sender()
-      }
-
-    case msg: GrantLeadership =>
-      super.handleMessage(msg)
-
-      waitForLeader.foreach(_ ! true)
-
-      waitForLeader.clear()
-  }
-
-  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
-      case None => false
-    }
-  }
-
-  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall {
-          case vertex =>
-            (vertex.getExecutionState == ExecutionState.RUNNING
-              || vertex.getExecutionState == ExecutionState.FINISHED)
-        }
-      case None => false
-    }
-  }
-
-  def notifyListeners(jobID: JobID): Unit = {
-    if(checkIfAllVerticesRunning((jobID))) {
-      waitForAllVerticesToBeRunning.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-
-    if(checkIfAllVerticesRunningOrFinished(jobID)) {
-      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-  }
-}
+    leaderElectionService)
+  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..e91f068
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purposes. */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import scala.collection.JavaConverters._
+  import context._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+    new Ordering[(Int, ActorRef)] {
+      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+    })
+
+  var disconnectDisabled = false
+
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+          ExecutionGraphFound(
+            jobID,
+            executionGraph)
+        )
+
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if(checkIfAllVerticesRunning(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+      if(checkIfAllVerticesRunningOrFinished(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+
+    case NotifyListeners =>
+      for(jobID <- currentJobs.keySet){
+        notifyListeners(jobID)
+      }
+
+      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+        periodicCheck foreach { _.cancel() }
+        periodicCheck = None
+      }
+
+
+    case NotifyWhenJobRemoved(jobID) =>
+      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+
+      val responses = gateways.map{
+        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+      }
+
+      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+
+      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+      import context.dispatcher
+      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+
+    case CheckIfJobRemoved(jobID) =>
+      if(currentJobs.contains(jobID)) {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID))
+        )(context.dispatcher, sender())
+      } else {
+        sender() ! decorateMessage(true)
+      }
+
+    case NotifyWhenTaskManagerTerminated(taskManager) =>
+      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
+
+    case msg@Terminated(taskManager) =>
+      super.handleMessage(msg)
+
+      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+        _ foreach {
+          listener =>
+            listener ! decorateMessage(TaskManagerTerminated(taskManager))
+        }
+      }
+
+    case NotifyWhenAccumulatorChange(jobID) =>
+
+      val (updated, registered) = waitForAccumulatorUpdate.
+        getOrElse(jobID, (false, Set[ActorRef]()))
+      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
+      sender ! true
+
+    /**
+     * Notification from the task manager that changed accumulators are transferred on next
+     * Heartbeat. We need to keep this state to notify the listeners on next Heartbeat report.
+     */
+    case AccumulatorsChanged(jobID: JobID) =>
+      waitForAccumulatorUpdate.get(jobID) match {
+        case Some((updated, registered)) =>
+          waitForAccumulatorUpdate.put(jobID, (true, registered))
+        case None =>
+      }
+
+    /**
+     * Disabled async processing of accumulator values and send accumulators to the listeners if
+     * we previously received an [[AccumulatorsChanged]] message.
+     */
+    case msg : Heartbeat =>
+      super.handleMessage(msg)
+
+      waitForAccumulatorUpdate foreach {
+        case (jobID, (updated, actors)) if updated =>
+          currentJobs.get(jobID) match {
+            case Some((graph, jobInfo)) =>
+              val flinkAccumulators = graph.getFlinkAccumulators
+              val userAccumulators = graph.aggregateUserAccumulators
+              actors foreach {
+                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+              }
+            case None =>
+          }
+          waitForAccumulatorUpdate.put(jobID, (false, actors))
+        case _ =>
+      }
+
+    case RequestWorkingTaskManager(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((eg, _)) =>
+          if(eg.getAllExecutionVertices.asScala.isEmpty){
+            sender ! decorateMessage(WorkingTaskManager(None))
+          } else {
+            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
+
+            if(resource == null){
+              sender ! decorateMessage(WorkingTaskManager(None))
+            } else {
+              sender ! decorateMessage(
+                WorkingTaskManager(
+                  Some(resource.getInstance().getActorGateway)
+                )
+              )
+            }
+          }
+        case None => sender ! decorateMessage(WorkingTaskManager(None))
+      }
+
+    case NotifyWhenJobStatus(jobID, state) =>
+      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+      jobStatusListener += state -> (listener + sender)
+
+    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+      super.handleMessage(msg)
+
+      val cleanup = waitForJobStatus.get(jobID) match {
+        case Some(stateListener) =>
+          stateListener.remove(newJobStatus) match {
+            case Some(listeners) =>
+              listeners foreach {
+                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
+              }
+            case _ =>
+          }
+          stateListener.isEmpty
+
+        case _ => false
+      }
+
+      if (cleanup) {
+        waitForJobStatus.remove(jobID)
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case msg: Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        val taskManager = sender()
+
+        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+          _ foreach {
+            listener =>
+              listener ! decorateMessage(TaskManagerTerminated(taskManager))
+          }
+        }
+      }
+
+    case NotifyWhenLeader =>
+      if (leaderElectionService.hasLeadership) {
+        sender() ! true
+      } else {
+        waitForLeader += sender()
+      }
+
+    case msg: GrantLeadership =>
+      super.handleMessage(msg)
+
+      waitForLeader.foreach(_ ! true)
+
+      waitForLeader.clear()
+
+    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+        sender() ! Acknowledge
+      } else {
+        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+      }
+
+    case msg:RegisterTaskManager =>
+      super.handleMessage(msg)
+
+      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+      // fewer registered TaskManagers
+      while (waitForNumRegisteredTaskManagers.nonEmpty &&
+        waitForNumRegisteredTaskManagers.head._1 <=
+          instanceManager.getNumberOfRegisteredTaskManagers) {
+        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+        receiver ! Acknowledge
+      }
+  }
+
+  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
+      case None => false
+    }
+  }
+
+  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall {
+          case vertex =>
+            (vertex.getExecutionState == ExecutionState.RUNNING
+              || vertex.getExecutionState == ExecutionState.FINISHED)
+        }
+      case None => false
+    }
+  }
+
+  def notifyListeners(jobID: JobID): Unit = {
+    if(checkIfAllVerticesRunning(jobID)) {
+      waitForAllVerticesToBeRunning.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+
+    if(checkIfAllVerticesRunningOrFinished(jobID)) {
+      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index acade53..4f5cf14 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -60,7 +60,8 @@ object TestingJobManagerMessages {
   /**
    * Registers a listener to receive a message when accumulators changed.
    * The change must be explicitly triggered by the TestingTaskManager which can receive an
-   * [[AccumulatorChanged]] message by a task that changed the accumulators. This message is then
+   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+   * message by a task that changed the accumulators. This message is then
    * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
    * message when the next Heartbeat occurs.
    */
@@ -78,5 +79,13 @@ object TestingJobManagerMessages {
     */
   case object NotifyWhenLeader
 
+  /**
+   * Registers the sender to be notified with an
+   * [[org.apache.flink.runtime.messages.Messages.Acknowledge]] message once at least
+   * `numRegisteredTaskManager` TaskManagers have registered at the JobManager.
+   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+   */
+  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
   def getNotifyWhenLeader: AnyRef = NotifyWhenLeader
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 167afbf..71a7e3e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 8746fbb..0b38c9c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -65,182 +65,5 @@ class TestingTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService) {
-
-  import scala.collection.JavaConverters._
-
-  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-  val waitForRegisteredAtJobManager = scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
-  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
-  var disconnectDisabled = false
-
-  /**
-   * Handler for testing related messages
-   */
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case NotifyWhenTaskIsRunning(executionID) => {
-      Option(runningTasks.get(executionID)) match {
-        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
-          sender ! decorateMessage(true)
-
-        case _ =>
-          val listeners = waitForRunning.getOrElse(executionID, Set())
-          waitForRunning += (executionID -> (listeners + sender))
-      }
-    }
-
-    case RequestRunningTasks =>
-      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-      
-    case NotifyWhenTaskRemoved(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(_) =>
-          val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender))
-        case None =>
-          if(unregisteredTasks.contains(executionID)) {
-            sender ! decorateMessage(true)
-          } else {
-              val set = waitForRemoval.getOrElse(executionID, Set())
-              waitForRemoval += (executionID -> (set + sender))
-          }
-      }
-
-    case TaskInFinalState(executionID) =>
-      super.handleMessage(TaskInFinalState(executionID))
-      waitForRemoval.remove(executionID) match {
-        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
-        case None =>
-      }
-
-      unregisteredTasks += executionID
-      
-    case RequestBroadcastVariablesWithReferences =>
-      sender ! decorateMessage(
-        ResponseBroadcastVariablesWithReferences(
-          bcVarManager.getNumberOfVariablesWithReferences)
-      )
-
-    case RequestNumActiveConnections =>
-      val numActive = if (network.isAssociated) {
-                        network.getConnectionManager.getNumberOfActiveConnections
-                      } else {
-                        0
-                      }
-      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
-    case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-          context.dispatcher,
-          sender
-          )
-      }else{
-        sender ! decorateMessage(true)
-      }
-
-    case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        sender ! decorateMessage(true)
-      } else {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-          context.dispatcher,
-          sender
-          )
-      }
-
-    case NotifyWhenJobManagerTerminated(jobManager) =>
-      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
-      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
-    /**
-     * Message from task manager that accumulator values changed and need to be reported immediately
-     * instead of lazily through the
-     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
-     * message to the job manager that it knows it should report to the listeners.
-     */
-    case msg: AccumulatorsChanged =>
-      currentJobManager match {
-        case Some(jobManager) =>
-          jobManager.forward(msg)
-          sendHeartbeatToJobManager()
-          sender ! true
-        case None =>
-      }
-
-    case msg@Terminated(jobManager) =>
-      super.handleMessage(msg)
-
-      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-        _ foreach {
-          _ ! decorateMessage(JobManagerTerminated(jobManager))
-        }
-      }
-
-    case msg:Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val jobManager = sender()
-
-        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-          _ foreach {
-            _ ! decorateMessage(JobManagerTerminated(jobManager))
-          }
-        }
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
-      super.handleMessage(msg)
-
-      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
-        waitForRunning.get(taskExecutionState.getID) foreach {
-          _ foreach (_ ! decorateMessage(true))
-        }
-      }
-
-    case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
-    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
-      if(isConnected && jobManager == currentJobManager.get) {
-        sender() ! true
-      } else {
-        val list = waitForRegisteredAtJobManager.getOrElse(
-          jobManager,
-          Set[ActorRef]())
-
-        waitForRegisteredAtJobManager += jobManager -> (list + sender())
-      }
-
-    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
-      super.handleMessage(msg)
-
-      val jm = sender()
-
-      waitForRegisteredAtJobManager.remove(jm).foreach {
-        listeners => listeners.foreach{
-          listener =>
-            listener ! true
-        }
-      }
-  }
-}
+    leaderRetrievalService)
+  with TestingTaskManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..0350675
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+RequestLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+
+import language.postfixOps
+
+/** This stackable mixin decorates a TaskManager with message handling for testing purposes. */
+trait TestingTaskManagerLike extends FlinkActor {
+  that: TaskManager =>
+
+  import scala.collection.JavaConverters._
+
+  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  val waitForRegisteredAtJobManager = scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+
+  var disconnectDisabled = false
+
+  /**
+   * Testing messages are tried first (handleTestingMessage); all others go to super.handleMessage.
+   */
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case NotifyWhenTaskIsRunning(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+          sender ! decorateMessage(true)
+
+        case _ => // not running (yet): park the sender until a RUNNING update arrives
+          val listeners = waitForRunning.getOrElse(executionID, Set())
+          waitForRunning += (executionID -> (listeners + sender))
+      }
+
+    case RequestRunningTasks =>
+      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+    case NotifyWhenTaskRemoved(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(_) =>
+          val set = waitForRemoval.getOrElse(executionID, Set())
+          waitForRemoval += (executionID -> (set + sender))
+        case None =>
+          if(unregisteredTasks.contains(executionID)) { // task already reached a final state
+            sender ! decorateMessage(true)
+          } else {
+            val set = waitForRemoval.getOrElse(executionID, Set())
+            waitForRemoval += (executionID -> (set + sender))
+          }
+      }
+
+    case TaskInFinalState(executionID) =>
+      super.handleMessage(TaskInFinalState(executionID))
+      waitForRemoval.remove(executionID) match {
+        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+        case None =>
+      }
+
+      unregisteredTasks += executionID
+
+    case RequestBroadcastVariablesWithReferences =>
+      sender ! decorateMessage(
+        ResponseBroadcastVariablesWithReferences(
+          bcVarManager.getNumberOfVariablesWithReferences)
+      )
+
+    case RequestNumActiveConnections =>
+      val numActive = if (network.isAssociated) {
+        network.getConnectionManager.getNumberOfActiveConnections
+      } else {
+        0
+      }
+      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
+
+    case NotifyWhenJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }else{
+        sender ! decorateMessage(true)
+      }
+
+    case CheckIfJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+        sender ! decorateMessage(true)
+      } else {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }
+
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
+
+    /**
+     * Message from task manager that accumulator values changed and need to be reported immediately
+     * instead of lazily through the
+     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
+     * message to the job manager so that it knows it should report to the listeners.
+     */
+    case msg: AccumulatorsChanged =>
+      currentJobManager match {
+        case Some(jobManager) =>
+          jobManager.forward(msg)
+          sendHeartbeatToJobManager()
+          sender ! true
+        case None =>
+      }
+
+    case msg@Terminated(jobManager) =>
+      super.handleMessage(msg)
+
+      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+        _ foreach {
+          _ ! decorateMessage(JobManagerTerminated(jobManager))
+        }
+      }
+
+    case msg:Disconnect =>
+      if (!disconnectDisabled) { // set by DisableDisconnect; Disconnect is then ignored
+        super.handleMessage(msg)
+
+        val jobManager = sender()
+
+        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+          _ foreach {
+            _ ! decorateMessage(JobManagerTerminated(jobManager))
+          }
+        }
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.handleMessage(msg)
+      // remove (not get), like the other registries, so notified listeners do not linger
+      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        waitForRunning.remove(taskExecutionState.getID) foreach {
+          _ foreach (_ ! decorateMessage(true))
+        }
+      }
+
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      if(isConnected && jobManager == currentJobManager.get) {
+        sender() ! true
+      } else {
+        val list = waitForRegisteredAtJobManager.getOrElse(
+          jobManager,
+          Set[ActorRef]())
+
+        waitForRegisteredAtJobManager += jobManager -> (list + sender())
+      }
+
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+
+      val jm = sender()
+
+      waitForRegisteredAtJobManager.remove(jm).foreach {
+        listeners => listeners.foreach{
+          listener =>
+            listener ! true
+        }
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 21939d6..553b686 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
@@ -274,7 +274,9 @@ object TestingUtils {
         actorSystem,
         Some(JobManager.JOB_MANAGER_NAME),
         Some(JobManager.ARCHIVE_NAME),
-        StreamingMode.BATCH_ONLY)
+        StreamingMode.BATCH_ONLY,
+        classOf[JobManager],
+        classOf[MemoryArchivist])
 
     new AkkaActorGateway(actor, null)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-shaded-curator/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
index 3e3894a..ac62cc8 100644
--- a/flink-shaded-curator/pom.xml
+++ b/flink-shaded-curator/pom.xml
@@ -41,45 +41,34 @@ under the License.
 			<artifactId>curator-recipes</artifactId>
 			<version>${curator.version}</version>
 		</dependency>
+
+		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>
 		<plugins>
-			<!-- Relocate the Curator's Guava dependency into a different namespace and
-			put create our own apache curator dependency.
-			We can easily integrate curator's netty into the jar file.
-			-->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>shade-flink</id> <!-- override inherited execution id -->
+						<id>shade-flink</id>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-							<shadedArtifactAttached>false</shadedArtifactAttached>
-							<createDependencyReducedPom>true</createDependencyReducedPom>
-							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-							<transformers>
-								<!-- The service transformer is needed to merge META-INF/services files -->
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-							</transformers>
-							<artifactSet>
-								<includes>
-									<include>org.apache.curator:*</include>
-									<include>com.google.guava:*</include>
-								</includes>
+							<artifactSet combine.self="override">
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
 							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
-								</relocation>
-							</relocations>
 						</configuration>
 					</execution>
 				</executions>


Mime
View raw message