flink-commits mailing list archives

From: rmetz...@apache.org
Subject: [06/15] flink git commit: [FLINK-3995] [build] Properly structure test scopes and dependencies
Date: Tue, 05 Jul 2016 14:38:43 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1762a7a..5394fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import com.google.common.collect.Maps;
-
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -47,15 +46,18 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
 
+import org.apache.flink.util.SerializedValue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -529,8 +531,7 @@ public class TaskTest {
 		setInputGate(task, inputGate);
 
 		// Expected task state for each partition state
-		final Map<ExecutionState, ExecutionState> expected = Maps
-				.newHashMapWithExpectedSize(ExecutionState.values().length);
+		final Map<ExecutionState, ExecutionState> expected = new HashMap<>(ExecutionState.values().length);
 
 		// Fail the task for unexpected states
 		for (ExecutionState state : ExecutionState.values()) {
@@ -564,7 +565,7 @@ public class TaskTest {
 			f.setAccessible(true);
 			f.set(task, new SingleInputGate[]{inputGate});
 
-			Map<IntermediateDataSetID, SingleInputGate> byId = Maps.newHashMapWithExpectedSize(1);
+			Map<IntermediateDataSetID, SingleInputGate> byId = new HashMap<>(1);
 			byId.put(inputGate.getConsumedResultId(), inputGate);
 
 			f = Task.class.getDeclaredField("inputGatesById");
@@ -628,9 +629,16 @@ public class TaskTest {
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
+		SerializedValue<ExecutionConfig> execConfig;
+		try {
+			execConfig = new SerializedValue<>(new ExecutionConfig());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		
 		return new TaskDeploymentDescriptor(
 				new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
-				ExecutionConfigTest.getSerializedConfig(),
+				execConfig,
 				"Test Task", 0, 1, 0,
 				new Configuration(), new Configuration(),
 				invokable.getName(),

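The hunks above make two self-contained swaps in TaskTest: Guava's Maps.newHashMapWithExpectedSize is replaced with a plain java.util.HashMap, and the removed ExecutionConfigTest helper is replaced by serializing a fresh ExecutionConfig inline. A minimal sketch of the resulting pattern (the helper class and method names below are illustrative, not part of the commit):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.util.SerializedValue;

class SerializedConfigSketch {

	// Mirrors the new createTaskDeploymentDescriptor() code above: serialize a
	// default ExecutionConfig, rethrowing the checked IOException unchecked.
	static SerializedValue<ExecutionConfig> defaultSerializedConfig() {
		try {
			return new SerializedValue<>(new ExecutionConfig());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	// Stands in for Maps.newHashMapWithExpectedSize(n). Note the plain
	// constructor sets the raw initial capacity, while Guava's helper also
	// accounts for the load factor; the test code accepts that minor difference.
	static <K, V> Map<K, V> mapWithExpectedSize(int n) {
		return new HashMap<>(n);
	}
}

Both changes remove test-only dependencies (Guava and the flink-core test classes) from flink-runtime's tests, in line with the commit's goal of properly structuring test scopes.
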
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
deleted file mode 100644
index 2422925..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * A testing resource manager which may alter the default standalone resource master's behavior.
- */
-public class TestingResourceManager extends StandaloneResourceManager {
-
-	/** Set of Actors which want to be informed of a connection to the job manager */
-	private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
-
-	/** Set of Actors which want to be informed of a shutdown */
-	private Set<ActorRef> waitForShutdown = new HashSet<>();
-
-	/** Flag to signal a connection to the JobManager */
-	private boolean isConnected = false;
-
-	public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
-		super(flinkConfig, leaderRetriever);
-	}
-
-	/**
-	 * Override message handling here if desired
-	 */
-	@Override
-	protected void handleMessage(Object message) {
-
-		if (message instanceof GetRegisteredResources) {
-			sender().tell(new GetRegisteredResourcesReply(getRegisteredTaskManagers()), self());
-		} else if (message instanceof FailResource) {
-			ResourceID resourceID = ((FailResource) message).resourceID;
-			notifyWorkerFailed(resourceID, "Failed for test case.");
-
-		} else if (message instanceof NotifyWhenResourceManagerConnected) {
-			if (isConnected) {
-				sender().tell(
-					Messages.getAcknowledge(),
-					self());
-			} else {
-				waitForResourceManagerConnected.add(sender());
-			}
-		} else if (message instanceof RegisterResourceManagerSuccessful) {
-			super.handleMessage(message);
-
-			isConnected = true;
-
-			for (ActorRef ref : waitForResourceManagerConnected) {
-				ref.tell(
-					Messages.getAcknowledge(),
-					self());
-			}
-			waitForResourceManagerConnected.clear();
-
-		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
-			waitForShutdown.add(sender());
-		} else if (message instanceof TestingMessages.Alive$) {
-			sender().tell(Messages.getAcknowledge(), self());
-		} else {
-			super.handleMessage(message);
-		}
-	}
-
-	/**
-	 * Testing messages
-	 */
-	public static class GetRegisteredResources {}
-
-	public static class GetRegisteredResourcesReply {
-
-		public Collection<ResourceID> resources;
-
-		public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
-			this.resources = resources;
-		}
-
-	}
-
-	/**
-	 * Fails all resources that the resource manager has registered
-	 */
-	public static class FailResource {
-
-		public ResourceID resourceID;
-
-		public FailResource(ResourceID resourceID) {
-			this.resourceID = resourceID;
-		}
-	}
-
-	/**
-	 * The sender of this message will be informed of a connection to the Job Manager
-	 */
-	public static class NotifyWhenResourceManagerConnected {}
-
-	/**
-	 * Inform registered listeners about a shutdown of the application.
-     */
-	@Override
-	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
-		for (ActorRef listener : waitForShutdown) {
-			listener.tell(new TestingMessages.ComponentShutdown(self()), self());
-		}
-		waitForShutdown.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 1927c39..c30d244 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest, JobID}
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.util.SerializedValue
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
@@ -57,7 +58,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
           new JobID(),
           "test job",
           new Configuration(),
-          ExecutionConfigTest.getSerializedConfig,
+          new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
           new NoRestartStrategy())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
deleted file mode 100644
index 04689c6..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.metrics.MetricRegistry
-import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
-
-/** JobManager implementation extended by testing messages
-  *
-  */
-class TestingJobManager(
-    flinkConfiguration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore : SavepointStore,
-    jobRecoveryTimeout : FiniteDuration,
-    metricRegistry : Option[MetricRegistry])
-  extends JobManager(
-    flinkConfiguration,
-      executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricRegistry)
-  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
deleted file mode 100644
index 8ea603f..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Cancellable, Terminated}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import language.postfixOps
-
-/** This mixin can be used to decorate a JobManager with messages for testing purposes. */
-trait TestingJobManagerLike extends FlinkActor {
-  that: JobManager =>
-
-  import scala.collection.JavaConverters._
-  import context._
-
-  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
-  val waitForAllVerticesToBeRunningOrFinished =
-    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
-  var periodicCheck: Option[Cancellable] = None
-
-  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
-    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
-  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
-  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
-  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
-    new Ordering[(Int, ActorRef)] {
-      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  var postStopEnabled = true
-
-  abstract override def postStop(): Unit = {
-    if (postStopEnabled) {
-      super.postStop()
-    } else {
-      // only stop leader election service to revoke the leadership of this JM so that a new JM
-      // can be elected leader
-      leaderElectionService.stop()
-    }
-  }
-
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case RequestExecutionGraph(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
-          ExecutionGraphFound(
-            jobID,
-            executionGraph)
-        )
-
-        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
-      }
-
-    case WaitForAllVerticesToBeRunning(jobID) =>
-      if(checkIfAllVerticesRunning(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
-      if(checkIfAllVerticesRunningOrFinished(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-
-    case NotifyListeners =>
-      for(jobID <- currentJobs.keySet){
-        notifyListeners(jobID)
-      }
-
-      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
-        periodicCheck foreach { _.cancel() }
-        periodicCheck = None
-      }
-
-
-    case NotifyWhenJobRemoved(jobID) =>
-      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
-      val responses = gateways.map{
-        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
-      }
-
-      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
-      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
-      import context.dispatcher
-      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
-
-    case CheckIfJobRemoved(jobID) =>
-      if(currentJobs.contains(jobID)) {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID))
-        )(context.dispatcher, sender())
-      } else {
-        sender() ! decorateMessage(true)
-      }
-
-    case NotifyWhenTaskManagerTerminated(taskManager) =>
-      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
-      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
-    case msg@Terminated(taskManager) =>
-      super.handleMessage(msg)
-
-      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-        _ foreach {
-          listener =>
-            listener ! decorateMessage(TaskManagerTerminated(taskManager))
-        }
-      }
-
-    // see shutdown method for reply
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case NotifyWhenAccumulatorChange(jobID) =>
-
-      val (updated, registered) = waitForAccumulatorUpdate.
-        getOrElse(jobID, (false, Set[ActorRef]()))
-      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
-      sender ! true
-
-    /**
-     * Notification from the task manager that changed accumulators are transferred on the next
-     * Heartbeat. We need to keep this state to notify the listeners on the next Heartbeat report.
-     */
-    case AccumulatorsChanged(jobID: JobID) =>
-      waitForAccumulatorUpdate.get(jobID) match {
-        case Some((updated, registered)) =>
-          waitForAccumulatorUpdate.put(jobID, (true, registered))
-        case None =>
-      }
-
-    /**
-     * Disables async processing of accumulator values and sends accumulators to the listeners if
-     * we previously received an [[AccumulatorsChanged]] message.
-     */
-    case msg : Heartbeat =>
-      super.handleMessage(msg)
-
-      waitForAccumulatorUpdate foreach {
-        case (jobID, (updated, actors)) if updated =>
-          currentJobs.get(jobID) match {
-            case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
-              val userAccumulators = graph.aggregateUserAccumulators
-              actors foreach {
-                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
-              }
-            case None =>
-          }
-          waitForAccumulatorUpdate.put(jobID, (false, actors))
-        case _ =>
-      }
-
-    case RequestWorkingTaskManager(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          if(eg.getAllExecutionVertices.asScala.isEmpty){
-            sender ! decorateMessage(WorkingTaskManager(None))
-          } else {
-            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
-            if(resource == null){
-              sender ! decorateMessage(WorkingTaskManager(None))
-            } else {
-              sender ! decorateMessage(
-                WorkingTaskManager(
-                  Some(resource.getInstance().getActorGateway)
-                )
-              )
-            }
-          }
-        case None => sender ! decorateMessage(WorkingTaskManager(None))
-      }
-
-    case NotifyWhenJobStatus(jobID, state) =>
-      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
-        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
-      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
-      jobStatusListener += state -> (listener + sender)
-
-    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
-      super.handleMessage(msg)
-
-      val cleanup = waitForJobStatus.get(jobID) match {
-        case Some(stateListener) =>
-          stateListener.remove(newJobStatus) match {
-            case Some(listeners) =>
-              listeners foreach {
-                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
-              }
-            case _ =>
-          }
-          stateListener.isEmpty
-
-        case _ => false
-      }
-
-      if (cleanup) {
-        waitForJobStatus.remove(jobID)
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case DisablePostStop =>
-      postStopEnabled = false
-
-    case RequestSavepoint(savepointPath) =>
-      try {
-        val savepoint = savepointStore.getState(savepointPath)
-        sender ! ResponseSavepoint(savepoint)
-      }
-      catch {
-        case e: Exception =>
-          sender ! ResponseSavepoint(null)
-      }
-
-    case msg: Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val taskManager = sender()
-
-        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-          _ foreach {
-            listener =>
-              listener ! decorateMessage(TaskManagerTerminated(taskManager))
-          }
-        }
-      }
-
-    case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
-        sender() ! true
-      } else {
-        waitForLeader += sender()
-      }
-
-    case msg: GrantLeadership =>
-      super.handleMessage(msg)
-
-      waitForLeader.foreach(_ ! true)
-
-      waitForLeader.clear()
-
-    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
-      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
-        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
-        sender() ! Acknowledge
-      } else {
-        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
-        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
-      }
-
-    // TaskManager may be registered on these two messages
-    case msg @ (_: RegisterTaskManager | _: RegisterResourceSuccessful) =>
-      super.handleMessage(msg)
-
-      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
-      // fewer registered TaskManagers
-      while (waitForNumRegisteredTaskManagers.nonEmpty &&
-        waitForNumRegisteredTaskManagers.head._1 <=
-          instanceManager.getNumberOfRegisteredTaskManagers) {
-        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
-        receiver ! Acknowledge
-      }
-  }
-
-  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
-      case None => false
-    }
-  }
-
-  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall {
-          case vertex =>
-            (vertex.getExecutionState == ExecutionState.RUNNING
-              || vertex.getExecutionState == ExecutionState.FINISHED)
-        }
-      case None => false
-    }
-  }
-
-  def notifyListeners(jobID: JobID): Unit = {
-    if(checkIfAllVerticesRunning(jobID)) {
-      waitForAllVerticesToBeRunning.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-
-    if(checkIfAllVerticesRunningOrFinished(jobID)) {
-      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
deleted file mode 100644
index ec513e3..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import java.util.Map
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.ActorGateway
-import org.apache.flink.runtime.jobgraph.JobStatus
-
-object TestingJobManagerMessages {
-
-  case class RequestExecutionGraph(jobID: JobID)
-
-  sealed trait ResponseExecutionGraph {
-    def jobID: JobID
-  }
-
-  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
-  ResponseExecutionGraph
-
-  case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
-
-  case class WaitForAllVerticesToBeRunning(jobID: JobID)
-  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
-  case class AllVerticesRunning(jobID: JobID)
-
-  case class NotifyWhenJobRemoved(jobID: JobID)
-
-  case class RequestWorkingTaskManager(jobID: JobID)
-  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
-
-  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
-  case class JobStatusIs(jobID: JobID, state: JobStatus)
-
-  case object NotifyListeners
-
-  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
-  case class TaskManagerTerminated(taskManager: ActorRef)
-
-  /**
-   * Registers a listener to receive a message when accumulators changed.
-   * The change must be explicitly triggered by the TestingTaskManager which can receive an
-   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
-   * message by a task that changed the accumulators. This message is then
-   * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
-   * message when the next Heartbeat occurs.
-   */
-  case class NotifyWhenAccumulatorChange(jobID: JobID)
-
-  /**
-   * Reports updated accumulators back to the listener.
-   */
-  case class UpdatedAccumulators(jobID: JobID,
-    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
-    userAccumulators: Map[String, Accumulator[_,_]])
-
-  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
-   *
-   */
-  case object NotifyWhenLeader
-
-  /**
-   * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
-   * message when at least numRegisteredTaskManager have registered at the JobManager.
-   *
-   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
-   */
-  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
-
-  /** Disables the post stop method of the [[TestingJobManager]].
-    *
-    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
-    */
-  case object DisablePostStop
-
-  /**
-    * Requests a savepoint from the job manager.
-    *
-    * @param savepointPath The path of the savepoint to request.
-    */
-  case class RequestSavepoint(savepointPath: String)
-
-  /**
-    * Response to a savepoint request.
-    *
-    * @param savepoint The requested savepoint or null if none available.
-    */
-  case class ResponseSavepoint(savepoint: CompletedCheckpoint)
-
-  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
-  def getDisablePostStop(): AnyRef = DisablePostStop
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
deleted file mode 100644
index 71a7e3e..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.jobmanager.MemoryArchivist
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
-
-/** Memory archivist extended by testing messages
-  *
-  * @param maxEntries maximum number of archived jobs
-  */
-class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
-
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case RequestExecutionGraph(jobID) =>
-      val executionGraph = graphs.get(jobID)
-      
-      executionGraph match {
-        case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
-        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
deleted file mode 100644
index 91d169a..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-
-object TestingMessages {
-
-  case class CheckIfJobRemoved(jobID: JobID)
-
-  case object DisableDisconnect
-
-  case object Alive
-
-  def getAlive: AnyRef = Alive
-
-  def getDisableDisconnect: AnyRef = DisableDisconnect
-
-  case object NotifyOfComponentShutdown
-  case class ComponentShutdown(ref: ActorRef)
-
-  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
deleted file mode 100644
index 2597753..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
-
-import scala.language.postfixOps
-
-/** Subclass of the [[TaskManager]] to support testing messages
- */
-class TestingTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
-  extends TaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    numberOfSlots,
-    leaderRetrievalService)
-  with TestingTaskManagerLike {
-
-  def this(
-      config: TaskManagerConfiguration,
-      connectionInfo: InstanceConnectionInfo,
-      memoryManager: MemoryManager,
-      ioManager: IOManager,
-      network: NetworkEnvironment,
-      numberOfSlots: Int,
-      leaderRetrievalService: LeaderRetrievalService) {
-    this(
-      config,
-      ResourceID.generate(),
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
-      numberOfSlots,
-      leaderRetrievalService)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
deleted file mode 100644
index f8df89c..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Terminated}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
-import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-import scala.concurrent.duration._
-import language.postfixOps
-
-/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
-trait TestingTaskManagerLike extends FlinkActor {
-  that: TaskManager =>
-
-  import scala.collection.JavaConverters._
-
-  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-  val waitForRegisteredAtResourceManager =
-    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
-  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
-  /** Map of registered task submit listeners */
-  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  /**
-   * Handler for testing related messages
-   */
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case NotifyWhenTaskIsRunning(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
-          sender ! decorateMessage(true)
-
-        case _ =>
-          val listeners = waitForRunning.getOrElse(executionID, Set())
-          waitForRunning += (executionID -> (listeners + sender))
-      }
-
-    case RequestRunningTasks =>
-      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-
-    case NotifyWhenTaskRemoved(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(_) =>
-          val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender))
-        case None =>
-          if(unregisteredTasks.contains(executionID)) {
-            sender ! decorateMessage(true)
-          } else {
-            val set = waitForRemoval.getOrElse(executionID, Set())
-            waitForRemoval += (executionID -> (set + sender))
-          }
-      }
-
-    case TaskInFinalState(executionID) =>
-      super.handleMessage(TaskInFinalState(executionID))
-      waitForRemoval.remove(executionID) match {
-        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
-        case None =>
-      }
-
-      unregisteredTasks += executionID
-
-    case RequestBroadcastVariablesWithReferences =>
-      sender ! decorateMessage(
-        ResponseBroadcastVariablesWithReferences(
-          bcVarManager.getNumberOfVariablesWithReferences)
-      )
-
-    case RequestNumActiveConnections =>
-      val numActive = if (network.isAssociated) {
-        network.getConnectionManager.getNumberOfActiveConnections
-      } else {
-        0
-      }
-      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
-    case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }else{
-        sender ! decorateMessage(true)
-      }
-
-    case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        sender ! decorateMessage(true)
-      } else {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }
-
-    case NotifyWhenJobManagerTerminated(jobManager) =>
-      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
-      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
-    case RegisterSubmitTaskListener(jobId) =>
-      registeredSubmitTaskListeners.put(jobId, sender())
-
-    case msg@SubmitTask(tdd) =>
-      registeredSubmitTaskListeners.get(tdd.getJobID) match {
-        case Some(listenerRef) =>
-          listenerRef ! ResponseSubmitTaskListener(tdd)
-        case None =>
-        // Nothing to do
-      }
-
-      super.handleMessage(msg)
-
-    /**
-     * Message from task manager that accumulator values changed and need to be reported immediately
-     * instead of lazily through the
-     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
-     * message to the job manager so that it knows it should report to the listeners.
-     */
-    case msg: AccumulatorsChanged =>
-      currentJobManager match {
-        case Some(jobManager) =>
-          jobManager.forward(msg)
-          sendHeartbeatToJobManager()
-          sender ! true
-        case None =>
-      }
-
-    case msg@Terminated(jobManager) =>
-      super.handleMessage(msg)
-
-      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-        _ foreach {
-          _ ! decorateMessage(JobManagerTerminated(jobManager))
-        }
-      }
-
-    case msg:Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val jobManager = sender()
-
-        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-          _ foreach {
-            _ ! decorateMessage(JobManagerTerminated(jobManager))
-          }
-        }
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
-      super.handleMessage(msg)
-
-      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
-        waitForRunning.get(taskExecutionState.getID) foreach {
-          _ foreach (_ ! decorateMessage(true))
-        }
-      }
-
-    case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
-    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
-      if(isConnected && jobManager == currentJobManager.get) {
-        sender() ! true
-      } else {
-        val list = waitForRegisteredAtResourceManager.getOrElse(
-          jobManager,
-          Set[ActorRef]())
-
-        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
-      }
-
-    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
-      super.handleMessage(msg)
-
-      val jm = sender()
-
-      waitForRegisteredAtResourceManager.remove(jm).foreach {
-        listeners => listeners.foreach{
-          listener =>
-            listener ! true
-        }
-      }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
deleted file mode 100644
index 974e4e8..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-/**
- * Additional messages that the [[TestingTaskManager]] understands.
- */
-object TestingTaskManagerMessages {
-  
-  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
-  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
-  
-  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
-    import collection.JavaConverters._
-    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
-  }
-  
-  case class ResponseBroadcastVariablesWithReferences(number: Int)
-
-  case object RequestNumActiveConnections
-  case class ResponseNumActiveConnections(number: Int)
-  
-  case object RequestRunningTasks
-  
-  case object RequestBroadcastVariablesWithReferences
-
-  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
-
-  case class JobManagerTerminated(jobManager: ActorRef)
-
-  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
-
-  /**
-   * Message to give a hint to the task manager that accumulator values were updated in the task.
-   * This message is forwarded to the job manager which knows that it needs to notify listeners
-   * of accumulator updates.
-   */
-  case class AccumulatorsChanged(jobID: JobID)
-
-  /**
-    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
-    * messages of the given job.
-    *
-    * If a task is submitted with the given job ID the task deployment
-    * descriptor is forwarded to the listener.
-    *
-    * @param jobId The job ID to listen for.
-    */
-  case class RegisterSubmitTaskListener(jobId: JobID)
-
-  /**
-    * A response to a listened job ID containing the submitted task deployment descriptor.
-    *
-    * @param tdd The submitted task deployment descriptor.
-    */
-  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
-
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-  
-  def getRequestRunningTasksMessage: AnyRef = {
-    RequestRunningTasks
-  }
-  
-  def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
-    RequestBroadcastVariablesWithReferences
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2f43d38..02a0fec 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -32,7 +32,6 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.testutils.TestingResourceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 2343485..bd935ee 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -34,28 +34,29 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
 		<!-- scala command line parsing -->
 		<dependency>
 			<groupId>com.github.scopt</groupId>
             <artifactId>scopt_${scala.binary.version}</artifactId>
 		</dependency>
 
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-clients_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-scala_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -70,27 +71,19 @@ under the License.
 			<artifactId>scala-library</artifactId>
 			<version>${scala.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-reflect</artifactId>
 			<version>${scala.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>1.1-SNAPSHOT</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
-			<version>1.1-SNAPSHOT</version>
-		</dependency>
-		<!-- tests -->
+
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 4a6a142..83e0f46 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -78,6 +78,14 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.scalatest</groupId>
+			<artifactId>scalatest_${scala.binary.version}</artifactId>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
index 7ddf7e3..dd528be 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
@@ -42,6 +42,8 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -49,42 +51,29 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests_2.10</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
+		<!-- test dependencies -->
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
-            <version>${project.version}</version>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
 			<type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
+		</dependency>
+	</dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
index 090917b..4089cf6 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
 	<modelVersion>4.0.0</modelVersion>
 
@@ -42,6 +42,8 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+ 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -49,40 +51,33 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
 
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+		</dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
+		<!-- test dependencies -->
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests_2.10</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
-            <version>${project.version}</version>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
 			<type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
+		</dependency>
+	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
index 89ded0b..c5e903f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -41,6 +41,8 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -60,37 +62,36 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-test-utils-junit</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
-			<type>test-jar</type>
 		</dependency>
-
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
-			<type>test-jar</type>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
 			<scope>test</scope>
+			<type>test-jar</type>
 		</dependency>
 
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
-			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index b6c5bdb..dd2fc26 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -1,20 +1,21 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.avro.Schema;
@@ -31,7 +32,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -39,6 +40,7 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyVa
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
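
The MultiShotLatch import now comes from flink-core's test utilities rather than flink-runtime. A minimal sketch of the trigger/await hand-off such tests rely on (the worker body is a placeholder, not this test's actual logic):

    import org.apache.flink.core.testutils.MultiShotLatch;

    public class LatchExample {
        public static void main(String[] args) throws InterruptedException {
            final MultiShotLatch latch = new MultiShotLatch();

            Thread worker = new Thread(() -> {
                // ... do one unit of work, e.g. emit an element ...
                latch.trigger();   // signal the waiting thread
            });
            worker.start();

            latch.await();         // block until the worker signals
            // unlike a one-shot latch, this latch can be triggered and
            // awaited repeatedly across multiple steps
        }
    }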

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 3557f1c..79fa7bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -41,25 +41,27 @@ under the License.
 	</properties>
 
 	<dependencies>
+
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_2.10</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
@@ -116,6 +118,8 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- test dependencies -->
+		
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
@@ -125,16 +129,16 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<artifactId>flink-connector-kafka-base_2.10</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
+			<type>test-jar</type>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index 8feadb5..8ad909a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -42,6 +42,15 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_2.10</artifactId>
@@ -56,6 +65,24 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_2.10</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
@@ -70,12 +97,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka-clients</artifactId>
-			<version>${kafka.version}</version>
-		</dependency>
-
-		<dependency>
 			<!-- include 0.9 server for tests  -->
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -93,31 +114,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 
 	</dependencies>
-	
 
 	<build>
 		<plugins>
@@ -130,30 +132,30 @@ under the License.
 							<goal>test-jar</goal>
 						</goals>
 						<configuration>
-			              <includes>
-			                <include>**/KafkaTestEnvironmentImpl*</include>
-			              </includes>
-			            </configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
 					</execution>
 				</executions>
 			</plugin>
 			<plugin>
-		        <groupId>org.apache.maven.plugins</groupId>
-		        <artifactId>maven-source-plugin</artifactId>
-		        <executions>
-		          <execution>
-		            <id>attach-test-sources</id>
-		            <goals>
-		              <goal>test-jar-no-fork</goal>
-		            </goals>
-		            <configuration>
-		              <includes>
-		                <include>**/KafkaTestEnvironmentImpl*</include>
-		              </includes>
-		            </configuration>
-		          </execution>
-		        </executions>
-		    </plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-test-sources</id>
+						<goals>
+							<goal>test-jar-no-fork</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 1cbb554..258a2cf 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -42,6 +42,8 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
@@ -103,14 +105,18 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- test dependencies -->
+		
 		<!-- force using the latest zkclient -->
 		<dependency>
 			<groupId>com.101tec</groupId>
 			<artifactId>zkclient</artifactId>
 			<version>0.7</version>
 			<type>jar</type>
+			<scope>test</scope>
 		</dependency>
 
+
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
@@ -120,7 +126,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -130,6 +136,21 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 5ee8001..88150b3 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -36,6 +36,8 @@ under the License.
 
 	<dependencies>
 
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -56,16 +58,8 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-math</artifactId>
-			<version>2.2</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
+			<artifactId>commons-math3</artifactId>
+			<version>3.5</version>
 		</dependency>
 
 		<dependency>
@@ -80,13 +74,24 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math3</artifactId>
-            <version>3.5</version>
-        </dependency>
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index f768ace..7bb726d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.streaming.api;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,9 +34,9 @@ public class RestartStrategyTest {
 	 * strategy has been specified
 	 */
 	@Test
-	public void testAutomaticRestartingWhenCheckpointing() {
+	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.enableCheckpointing();
+		env.enableCheckpointing(500);
 
 		env.fromElements(1).print();
 
@@ -44,7 +44,7 @@ public class RestartStrategyTest {
 		JobGraph jobGraph = graph.getJobGraph();
 
 		RestartStrategies.RestartStrategyConfiguration restartStrategy =
-			ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
+			jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
@@ -56,9 +56,9 @@ public class RestartStrategyTest {
 	 * of execution retries is set to 0, restarting is deactivated
 	 */
 	@Test
-	public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() {
+	public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.enableCheckpointing();
+		env.enableCheckpointing(500);
 		env.setNumberOfExecutionRetries(0);
 
 		env.fromElements(1).print();
@@ -67,7 +67,7 @@ public class RestartStrategyTest {
 		JobGraph jobGraph = graph.getJobGraph();
 
 		RestartStrategies.RestartStrategyConfiguration restartStrategy =
-			ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
+			jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration);
@@ -78,9 +78,9 @@ public class RestartStrategyTest {
 	 * of execution retries is set to 42 and the delay to 1337, fixed delay restarting is used.
 	 */
 	@Test
-	public void testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero() {
+	public void testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.enableCheckpointing();
+		env.enableCheckpointing(500);
 		env.setNumberOfExecutionRetries(42);
 		env.getConfig().setExecutionRetryDelay(1337);
 
@@ -90,7 +90,7 @@ public class RestartStrategyTest {
 		JobGraph jobGraph = graph.getJobGraph();
 
 		RestartStrategies.RestartStrategyConfiguration restartStrategy =
-			ExecutionConfigTest.deserializeConfig(jobGraph.getSerializedExecutionConfig()).getRestartStrategy();
+			jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
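
These tests previously leaned on the test-scoped helper ExecutionConfigTest; with that dependency gone, they deserialize the config directly. A small self-contained sketch of the SerializedValue round trip used above:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.util.SerializedValue;

    public class SerializedValueRoundTrip {
        public static void main(String[] args) throws Exception {
            // serialization happens eagerly; the constructor may throw IOException
            SerializedValue<ExecutionConfig> serialized =
                    new SerializedValue<>(new ExecutionConfig());

            // deserialization requires an explicit class loader
            ExecutionConfig restored = serialized.deserializeValue(
                    SerializedValueRoundTrip.class.getClassLoader());

            System.out.println(restored.getRestartStrategy());
        }
    }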

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 0de4325..7f94aa0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -29,12 +28,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
+@SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest {
 	
 	@Test
@@ -93,7 +93,7 @@ public class StreamingJobGraphGeneratorTest {
 
 		assertNotNull(serializedExecutionConfig);
 
-		ExecutionConfig executionConfig = ExecutionConfigTest.deserializeConfig(serializedExecutionConfig);
+		ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
 
 		assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
 		assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8484e90..df4efdb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,6 +55,7 @@ import java.util.Set;
 
 import static org.junit.Assert.*;
 
+@SuppressWarnings("serial")
 public class RescalePartitionerTest extends TestLogger {
 	
 	private RescalePartitioner<Tuple> distributePartitioner;
@@ -84,7 +86,7 @@ public class RescalePartitionerTest extends TestLogger {
 	}
 
 	@Test
-	public void testExecutionGraphGeneration() {
+	public void testExecutionGraphGeneration() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setParallelism(4);
@@ -135,7 +137,7 @@ public class RescalePartitionerTest extends TestLogger {
 			jobId,
 			jobName,
 			cfg,
-			ExecutionConfigTest.getSerializedConfig(),
+			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new ArrayList<BlobKey>(),

