flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/6] flink git commit: [FLINK-2332] [runtime] Adds leader session IDs and registration session IDs to JobManager and TaskManager messages.
Date Thu, 23 Jul 2015 15:41:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index f166c36..5e276bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -27,6 +25,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -72,25 +71,32 @@ public class RuntimeEnvironment implements Environment {
 	private final ResultPartitionWriter[] writers;
 	private final InputGate[] inputGates;
 	
-	private final ActorRef jobManagerActor;
+	private final ActorGateway jobManager;
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
 	// ------------------------------------------------------------------------
 
-	public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId,
-								String taskName, String taskNameWithSubtasks,
-								int subtaskIndex, int parallelism,
-								Configuration jobConfiguration, Configuration taskConfiguration,
-								ClassLoader userCodeClassLoader,
-								MemoryManager memManager, IOManager ioManager,
-								BroadcastVariableManager bcVarManager,
+	public RuntimeEnvironment(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			ExecutionAttemptID executionId,
+			String taskName,
+			String taskNameWithSubtasks,
+			int subtaskIndex,
+			int parallelism,
+			Configuration jobConfiguration,
+			Configuration taskConfiguration,
+			ClassLoader userCodeClassLoader,
+			MemoryManager memManager,
+			IOManager ioManager,
+			BroadcastVariableManager bcVarManager,
 								AccumulatorRegistry accumulatorRegistry,
-								InputSplitProvider splitProvider,
-								Map<String, Future<Path>> distCacheEntries,
-								ResultPartitionWriter[] writers,
-								InputGate[] inputGates,
-								ActorRef jobManagerActor) {
+			InputSplitProvider splitProvider,
+			Map<String, Future<Path>> distCacheEntries,
+			ResultPartitionWriter[] writers,
+			InputGate[] inputGates,
+			ActorGateway jobManager) {
 		
 		checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
 
@@ -112,7 +118,7 @@ public class RuntimeEnvironment implements Environment {
 		this.distCacheEntries = checkNotNull(distCacheEntries);
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
-		this.jobManagerActor = checkNotNull(jobManagerActor);
+		this.jobManager = checkNotNull(jobManager);
 	}
 
 
@@ -238,6 +244,6 @@ public class RuntimeEnvironment implements Environment {
 		}
 		
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(jobId, executionId, checkpointId, serializedState);
-		jobManagerActor.tell(message, ActorRef.noSender());
+		jobManager.tell(message);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 13a2ace..c4f62fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -37,6 +35,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -163,17 +162,17 @@ public class Task implements Runnable {
 
 	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
 
-	/** The TaskManager actor that spawned this task */
-	private final ActorRef taskManager;
+	/** Gateway to the TaskManager that spawned this task */
+	private final ActorGateway taskManager;
 
-	/** The JobManager actor */
-	private final ActorRef jobManager;
+	/** Gateway to the JobManager */
+	private final ActorGateway jobManager;
 
 	/** All actors that want to be notified about changes in the task's execution state */
-	private final List<ActorRef> executionListenerActors;
+	private final List<ActorGateway> executionListenerActors;
 
 	/** The timeout for all ask operations on actors */
-	private final Timeout actorAskTimeout;
+	private final FiniteDuration actorAskTimeout;
 
 	/** The library cache, from which the task can request its required JAR files */
 	private final LibraryCacheManager libraryCache;
@@ -224,8 +223,8 @@ public class Task implements Runnable {
 				IOManager ioManager,
 				NetworkEnvironment networkEnvironment,
 				BroadcastVariableManager bcVarManager,
-				ActorRef taskManagerActor,
-				ActorRef jobManagerActor,
+				ActorGateway taskManagerActor,
+				ActorGateway jobManagerActor,
 				FiniteDuration actorAskTimeout,
 				LibraryCacheManager libraryCache,
 				FileCache fileCache)
@@ -254,13 +253,13 @@ public class Task implements Runnable {
 
 		this.jobManager = checkNotNull(jobManagerActor);
 		this.taskManager = checkNotNull(taskManagerActor);
-		this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
+		this.actorAskTimeout = checkNotNull(actorAskTimeout);
 
 		this.libraryCache = checkNotNull(libraryCache);
 		this.fileCache = checkNotNull(fileCache);
 		this.network = checkNotNull(networkEnvironment);
 
-		this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
+		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
 
 		// create the reader and writer structures
 
@@ -568,7 +567,7 @@ public class Task implements Runnable {
 			// to know this!
 			notifyObservers(ExecutionState.RUNNING, null);
 			taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
-					new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)), ActorRef.noSender());
+					new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));
 
 			// make sure the user code classloader is accessible thread-locally
 			executingThread.setContextClassLoader(userCodeClassLoader);
@@ -750,11 +749,11 @@ public class Task implements Runnable {
 	}
 
 	private void notifyFinalState() {
-		taskManager.tell(new TaskInFinalState(executionId), ActorRef.noSender());
+		taskManager.tell(new TaskInFinalState(executionId));
 	}
 
 	private void notifyFatalError(String message, Throwable cause) {
-		taskManager.tell(new FatalError(message, cause), ActorRef.noSender());
+		taskManager.tell(new FatalError(message, cause));
 	}
 
 	// ----------------------------------------------------------------------------------------------------------------
@@ -839,14 +838,10 @@ public class Task implements Runnable {
 	//  State Listeners
 	// ------------------------------------------------------------------------
 
-	public void registerExecutionListener(ActorRef listener) {
+	public void registerExecutionListener(ActorGateway listener) {
 		executionListenerActors.add(listener);
 	}
 
-	public void unregisterExecutionListener(ActorRef listener) {
-		executionListenerActors.remove(listener);
-	}
-
 	private void notifyObservers(ExecutionState newState, Throwable error) {
 		if (error == null) {
 			LOG.info(taskNameWithSubtask + " switched to " + newState);
@@ -859,8 +854,8 @@ public class Task implements Runnable {
 		TaskMessages.UpdateTaskExecutionState actorMessage = new
 				TaskMessages.UpdateTaskExecutionState(stateUpdate);
 
-		for (ActorRef listener : executionListenerActors) {
-			listener.tell(actorMessage, ActorRef.noSender());
+		for (ActorGateway listener : executionListenerActors) {
+			listener.tell(actorMessage);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 5a69850..cb78c16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -32,10 +29,11 @@ import org.apache.flink.util.InstantiationUtil;
 
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
 
-	private final ActorRef jobManager;
+	private final ActorGateway jobManager;
 	
 	private final JobID jobId;
 	
@@ -45,11 +43,15 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 
 	private final ClassLoader usercodeClassLoader;
 	
-	private final Timeout timeout;
+	private final FiniteDuration timeout;
 	
-	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
-								ExecutionAttemptID executionID, ClassLoader userCodeClassLoader,
-								Timeout timeout)
+	public TaskInputSplitProvider(
+			ActorGateway jobManager,
+			JobID jobId,
+			JobVertexID vertexId,
+			ExecutionAttemptID executionID,
+			ClassLoader userCodeClassLoader,
+			FiniteDuration timeout)
 	{
 		this.jobManager = jobManager;
 		this.jobId = jobId;
@@ -62,11 +64,11 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	@Override
 	public InputSplit getNextInputSplit() {
 		try {
-			final Future<Object> response = Patterns.ask(jobManager,
+			final Future<Object> response = jobManager.ask(
 					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
 					timeout);
 
-			final Object result = Await.result(response, timeout.duration());
+			final Object result = Await.result(response, timeout);
 
 			if(!(result instanceof JobManagerMessages.NextInputSplit)){
 				throw new RuntimeException("RequestNextInputSplit requires a response of type " +

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
deleted file mode 100644
index c74c339..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime
-
-import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
-  }
-
-  def receiveWithLogMessages: Receive
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
deleted file mode 100644
index 4d3a988..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime
-
-import _root_.akka.actor.Actor
-import grizzled.slf4j.Logger
-
-/** Adds a logger to an [[akka.actor.Actor]] implementation
-  *
-  */
-trait ActorSynchronousLogging {
-  self: Actor =>
-
-  lazy val log = Logger(getClass)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala
new file mode 100644
index 0000000..74a9b07
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import _root_.akka.actor.Actor
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+  *
+  * The message handling logic is defined in the handleMessage method. This allows mixing in
+  * stackable traits (which override receive) that change the message receiving behaviour.
+  */
+trait FlinkActor extends Actor {
+  val log: Logger // supplied by the concrete actor; also used by the stackable mixins
+
+  override def receive: Receive = handleMessage // mixins may stack on top of this receive
+
+  /** Handles incoming messages; implemented by the concrete actor.
+    *
+    * @return The partial function which processes this actor's messages
+    */
+  def handleMessage: Receive
+
+  /** Factory method for messages. This method can be used by mixins to decorate messages
+    *
+    * @param message The message to decorate
+    * @return The decorated message; this base implementation returns it unchanged
+    */
+  def decorateMessage(message: Any): Any = {
+    message
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
new file mode 100644
index 0000000..c6793ed
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import java.util.UUID
+
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+
+/** [[MessageDecorator]] which wraps [[RequiresLeaderSessionID]] messages in a
+  * [[LeaderSessionMessage]] with the given leader session ID; other messages pass unchanged.
+  *
+  * @param leaderSessionID Leader session ID (possibly None) which is attached to every
+  *                        [[RequiresLeaderSessionID]] message
+  */
+class LeaderSessionMessageDecorator(val leaderSessionID: Option[UUID]) extends MessageDecorator {
+
+  /** Wraps [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]].
+    *
+    * @param message Message to decorate
+    * @return The [[LeaderSessionMessage]] wrapper, or the message itself if it is not wrapped
+    */
+  override def decorate(message: Any): Any = {
+    message match {
+      case msg: RequiresLeaderSessionID =>
+        LeaderSessionMessage(leaderSessionID, msg)
+      case msg => msg // needs no leader session ID: returned as-is
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
new file mode 100644
index 0000000..d54926d
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import java.util.UUID
+
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+
+/** Mixin to filter out [[LeaderSessionMessage]]s which contain an invalid leader session ID.
+  * Messages which contain a valid leader session ID are unwrapped and forwarded to the actor.
+  * Unwrapped [[RequiresLeaderSessionID]] messages are rejected with an exception.
+  */
+trait LeaderSessionMessages extends FlinkActor {
+  protected def leaderSessionID: Option[UUID]
+
+  abstract override def receive: Receive = {
+    case LeaderSessionMessage(id, msg) =>
+      // Filter out messages which do not carry the current leader session ID
+      (leaderSessionID, id) match {
+        case (Some(currentID), Some(msgID)) =>
+          if (currentID.equals(msgID)) {
+            // correct leader session ID: unwrap and forward to the stacked receive
+            super.receive(msg)
+          } else {
+            // discard message because of incorrect leader session ID
+            handleDiscardedMessage(msg)
+          }
+
+        case _ => handleDiscardedMessage(msg) // at least one side lacks a leader session ID
+      }
+    case msg: RequiresLeaderSessionID =>
+      throw new IllegalStateException(s"Received a message $msg without a leader session ID, " +
+        "even though it requires to have one.")
+    case msg =>
+      // forward all other messages to super.receive; this catch-all makes isDefinedAt always true
+      super.receive(msg)
+  }
+
+  private def handleDiscardedMessage(msg: Any): Unit = {
+    log.debug(s"Discard message $msg because the leader session ID was not correct.")
+  }
+
+  /** Wraps [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]] carrying the
+    * current leaderSessionID, after letting the parent decorate the inner message.
+    * @param message The message to decorate
+    * @return The decorated message
+    */
+  override def decorateMessage(message: Any): Any = {
+    message match {
+      case msg: RequiresLeaderSessionID =>
+        LeaderSessionMessage(leaderSessionID, super.decorateMessage(msg))
+
+      case msg => super.decorateMessage(msg)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala
new file mode 100644
index 0000000..e1e8961
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+/** Mixin to add message logging if the debug log level is activated.
+  * Logs every received message with its sender, and the time spent handling it in ms.
+  */
+trait LogMessages extends FlinkActor {
+  abstract override def receive: Receive = {
+    val _receive = super.receive // receive of the next trait in the mixin linearization
+
+    new Receive {
+      override def isDefinedAt(x: Any): Boolean = _receive.isDefinedAt(x) // same domain
+
+      override def apply(x: Any): Unit = {
+        if (!log.isDebugEnabled) {
+          _receive(x) // fast path: no timing or logging overhead
+        }
+        else {
+          log.debug(s"Received message $x at ${context.self.path} from ${context.sender()}.")
+
+          val start = System.nanoTime()
+
+          _receive(x)
+
+          val duration = (System.nanoTime() - start) / 1000000 // nanoseconds -> milliseconds
+          log.debug(s"Handled message $x in $duration ms from ${context.sender()}.")
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
new file mode 100644
index 0000000..5b1700f
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+/** Base trait for message decorators, which transform a message, e.g. by wrapping it
+  * (see [[LeaderSessionMessageDecorator]]).
+  */
+trait MessageDecorator {
+
+  /** Decorates a message
+    *
+    * @param message Message to decorate
+    * @return Decorated message; implementations may return the input unchanged
+    */
+  def decorate(message: Any): Any
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index d38e503..b8cce41 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -62,8 +62,10 @@ object AkkaUtils {
    *                         parameter is None, then a local actor system will be created.
    * @return created actor system
    */
-  def createActorSystem(configuration: Configuration,
-                        listeningAddress: Option[(String, Int)]): ActorSystem = {
+  def createActorSystem(
+      configuration: Configuration,
+      listeningAddress: Option[(String, Int)])
+    : ActorSystem = {
     val akkaConfig = getAkkaConfig(configuration, listeningAddress)
     createActorSystem(akkaConfig)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4325e41..7bf4447 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Constructor}
 import java.net.InetSocketAddress
-import java.util.Collections
+import java.util.{UUID, Collections}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -51,10 +51,11 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.ZooKeeperUtil
 import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
 import org.apache.flink.runtime.webmonitor.WebMonitor
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages}
+import org.apache.flink.runtime.{LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
+import _root_.akka.pattern.ask
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
@@ -107,11 +109,17 @@ class JobManager(
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
     protected val mode: StreamingMode)
-  extends Actor with ActorLogMessages with ActorSynchronousLogging {
+  extends FlinkActor
+  with LeaderSessionMessages // order of the mixin is important, we want filtering after logging
+  with LogMessages // order of the mixin is important, we want first logging
+  {
+
+  override val log = Logger(getClass)
 
   /** List of current jobs running jobs */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
+  override val leaderSessionID = Some(UUID.randomUUID())
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
@@ -125,10 +133,10 @@ class JobManager(
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
-      _.getInstanceGateway().tell(Disconnect("JobManager is shutting down"))
+      _.getActorGateway().tell(Disconnect("JobManager is shutting down"))
     }
 
-    archive ! PoisonPill
+    archive ! decorateMessage(PoisonPill)
 
     for((e,_) <- currentJobs.values) {
       e.fail(new Exception("The JobManager is shutting down."))
@@ -151,26 +159,48 @@ class JobManager(
    *
    * @return
    */
-  override def receiveWithLogMessages: Receive = {
+  override def handleMessage: Receive = {
 
-    case RegisterTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) =>
+    case RegisterTaskManager(
+      registrationSessionID,
+      taskManager,
+      connectionInfo,
+      hardwareInformation,
+      numberOfSlots) =>
 
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
         // IMPORTANT: Send the response to the "sender", which is not the
         //            TaskManager actor, but the ask future!
-        sender() ! AlreadyRegistered(self, instanceID, libraryCacheManager.getBlobServerPort)
+        sender() ! decorateMessage(
+          AlreadyRegistered(
+            registrationSessionID,
+            leaderSessionID.get,
+            self,
+            instanceID,
+            libraryCacheManager.getBlobServerPort)
+        )
       }
       else {
         try {
-          val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
-            hardwareInformation, numberOfSlots)
+          val instanceID = instanceManager.registerTaskManager(
+            taskManager,
+            connectionInfo,
+            hardwareInformation,
+            numberOfSlots,
+            leaderSessionID)
 
           // IMPORTANT: Send the response to the "sender", which is not the
           //            TaskManager actor, but the ask future!
-          sender() ! AcknowledgeRegistration(self, instanceID,
-                                             libraryCacheManager.getBlobServerPort)
+          sender() ! decorateMessage(
+            AcknowledgeRegistration(
+              registrationSessionID,
+              leaderSessionID.get,
+              self,
+              instanceID,
+              libraryCacheManager.getBlobServerPort)
+          )
 
           // to be notified when the taskManager is no longer reachable
           context.watch(taskManager)
@@ -183,15 +213,19 @@ class JobManager(
 
             // IMPORTANT: Send the response to the "sender", which is not the
             //            TaskManager actor, but the ask future!
-            sender() ! RefuseRegistration(ExceptionUtils.stringifyException(e))
+            sender() ! decorateMessage(
+              RefuseRegistration(
+                registrationSessionID,
+                ExceptionUtils.stringifyException(e))
+            )
         }
       }
 
     case RequestNumberRegisteredTaskManager =>
-      sender ! instanceManager.getNumberOfRegisteredTaskManagers
+      sender ! decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers)
 
     case RequestTotalNumberOfSlots =>
-      sender ! instanceManager.getTotalNumberOfSlots
+      sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
 
     case SubmitJob(jobGraph, listen) =>
       submitJob(jobGraph, listenToEvents = listen)
@@ -206,16 +240,19 @@ class JobManager(
             executionGraph.cancel()
           }(context.dispatcher)
 
-          sender ! CancellationSuccess(jobID)
+          sender ! decorateMessage(CancellationSuccess(jobID))
         case None =>
           log.info(s"No job found with ID $jobID.")
-          sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
-            s"ID $jobID."))
+          sender ! decorateMessage(
+            CancellationFailure(
+              jobID,
+              new IllegalArgumentException(s"No job found with ID $jobID."))
+          )
       }
 
     case UpdateTaskExecutionState(taskExecutionState) =>
       if (taskExecutionState == null) {
-        sender ! false
+        sender ! decorateMessage(false)
       } else {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
@@ -223,13 +260,13 @@ class JobManager(
 
             Future {
               val result = executionGraph.updateState(taskExecutionState)
-              originalSender ! result
+              originalSender ! decorateMessage(result)
             }(context.dispatcher)
 
           case None => log.error("Cannot find execution graph for ID " +
             s"${taskExecutionState.getJobID} to change state to " +
             s"${taskExecutionState.getExecutionState}.")
-            sender ! false
+            sender ! decorateMessage(false)
         }
       }
 
@@ -283,7 +320,7 @@ class JobManager(
           null
       }
 
-      sender ! NextInputSplit(serializedInputSplit)
+      sender ! decorateMessage(NextInputSplit(serializedInputSplit))
 
     case checkpointMessage : AbstractCheckpointMessage =>
       handleCheckpointMessage(checkpointMessage)
@@ -310,20 +347,31 @@ class JobManager(
                 }
                 val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
                                                               accumulatorResults)
-                jobInfo.client ! JobResultSuccess(result)
+                jobInfo.client ! decorateMessage(JobResultSuccess(result))
 
               case JobStatus.CANCELED =>
-                jobInfo.client ! Failure(new JobCancellationException(jobID,
-                  "Job was cancelled.", error))
+                jobInfo.client ! decorateMessage(
+                  Failure(
+                    new JobCancellationException(
+                      jobID,
+                    "Job was cancelled.", error)
+                  )
+                )
 
               case JobStatus.FAILED =>
-                jobInfo.client ! Failure(new JobExecutionException(jobID,
-                  "Job execution failed.", error))
+                jobInfo.client ! decorateMessage(
+                  Failure(
+                    new JobExecutionException(
+                      jobID,
+                      "Job execution failed.",
+                      error)
+                  )
+                )
 
               case x =>
                 val exception = new JobExecutionException(jobID, s"$x is not a " +
                   "terminal state.")
-                jobInfo.client ! Failure(exception)
+                jobInfo.client ! decorateMessage(Failure(exception))
                 throw exception
             }
 
@@ -337,13 +385,17 @@ class JobManager(
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          sender ! Acknowledge
+          sender ! decorateMessage(Acknowledge)
           executionGraph.scheduleOrUpdateConsumers(partitionId)
         case None =>
           log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " +
             s"consumers.")
-          sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
-            s"$jobId to schedule or update consumers."))
+          sender ! decorateMessage(
+            Failure(
+              new IllegalStateException("Cannot find execution graph for job ID " +
+                s"$jobId to schedule or update consumers.")
+            )
+          )
       }
 
     case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
@@ -360,15 +412,21 @@ class JobManager(
           null
       }
 
-      sender ! PartitionState(
-        taskExecutionId, taskResultId, partitionId.getPartitionId, state)
+      sender ! decorateMessage(
+        PartitionState(
+          taskExecutionId,
+          taskResultId,
+          partitionId.getPartitionId,
+          state)
+      )
 
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
-        case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState)
+        case Some((executionGraph,_)) =>
+          sender ! decorateMessage(CurrentJobStatus(jobID, executionGraph.getState))
         case None =>
           // check the archive
-          archive forward RequestJobStatus(jobID)
+          archive forward decorateMessage(RequestJobStatus(jobID))
       }
 
     case RequestRunningJobs =>
@@ -376,16 +434,21 @@ class JobManager(
         case (_, (eg, jobInfo)) => eg
       }
 
-      sender ! RunningJobs(executionGraphs)
+      sender ! decorateMessage(RunningJobs(executionGraphs))
 
     case RequestRunningJobsStatus =>
       try {
         val jobs = currentJobs map {
-          case (_, (eg, _)) => new JobStatusMessage(eg.getJobID, eg.getJobName,
-                                            eg.getState, eg.getStatusTimestamp(JobStatus.CREATED))
+          case (_, (eg, _)) =>
+            new JobStatusMessage(
+              eg.getJobID,
+              eg.getJobName,
+              eg.getState,
+              eg.getStatusTimestamp(JobStatus.CREATED)
+            )
         }
 
-        sender ! RunningJobsStatus(jobs)
+        sender ! decorateMessage(RunningJobsStatus(jobs))
       }
       catch {
         case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t)
@@ -393,18 +456,22 @@ class JobManager(
 
     case RequestJob(jobID) =>
       currentJobs.get(jobID) match {
-        case Some((eg, _)) => sender ! JobFound(jobID, eg)
+        case Some((eg, _)) => sender ! decorateMessage(JobFound(jobID, eg))
         case None =>
           // check the archive
-          archive forward RequestJob(jobID)
+          archive forward decorateMessage(RequestJob(jobID))
       }
 
     case RequestBlobManagerPort =>
-      sender ! libraryCacheManager.getBlobServerPort
+      sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
 
     case RequestRegisteredTaskManagers =>
       import scala.collection.JavaConverters._
-      sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
+      sender ! decorateMessage(
+        RegisteredTaskManagers(
+          instanceManager.getAllRegisteredInstances.asScala
+        )
+      )
 
     case Heartbeat(instanceID, metricsReport, accumulators) =>
       log.debug(s"Received hearbeat message from $instanceID.")
@@ -420,8 +487,8 @@ class JobManager(
     case message: InfoMessage => handleInfoRequestMessage(message, sender())
 
     case RequestStackTrace(instanceID) =>
-      val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
-      gateway.forward(SendStackTrace, sender())
+      val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway
+      gateway.forward(SendStackTrace, new AkkaActorGateway(sender(), leaderSessionID))
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
@@ -432,7 +499,7 @@ class JobManager(
       }
 
     case RequestJobManagerStatus =>
-      sender() ! JobManagerStatusAlive
+      sender() ! decorateMessage(JobManagerStatusAlive)
 
     case Disconnect(msg) =>
       val taskManager = sender()
@@ -443,6 +510,9 @@ class JobManager(
         instanceManager.unregisterTaskManager(taskManager)
         context.unwatch(taskManager)
       }
+
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID)
   }
 
   /**
@@ -456,7 +526,11 @@ class JobManager(
    */
   private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
     if (jobGraph == null) {
-      sender ! Failure(new JobSubmissionException(null, "JobGraph must not be null."))
+      sender ! decorateMessage(
+        Failure(
+          new JobSubmissionException(null, "JobGraph must not be null.")
+        )
+      )
     }
     else {
       val jobId = jobGraph.getJobID
@@ -499,7 +573,8 @@ class JobManager(
             timeout,
             jobGraph.getUserJarBlobKeys,
             userCodeLoader),
-            JobInfo(sender(), System.currentTimeMillis()))
+            JobInfo(sender(), System.currentTimeMillis())
+          )
         )._1
 
         // configure the execution graph
@@ -577,23 +652,29 @@ class JobManager(
           val confirmVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToConfirm.asScala.map(idToVertex).asJava
 
-          executionGraph.enableSnaphotCheckpointing(
-            snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout,
-            triggerVertices, ackVertices, confirmVertices,
-            context.system)
+          executionGraph.enableSnapshotCheckpointing(
+            snapshotSettings.getCheckpointInterval,
+            snapshotSettings.getCheckpointTimeout,
+            triggerVertices,
+            ackVertices,
+            confirmVertices,
+            context.system,
+            leaderSessionID)
         }
 
         // get notified about job status changes
-        executionGraph.registerJobStatusListener(self)
+        executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID))
 
         if (listenToEvents) {
           // the sender wants to be notified about state changes
-          executionGraph.registerExecutionListener(sender())
-          executionGraph.registerJobStatusListener(sender())
+          val gateway = new AkkaActorGateway(sender(), leaderSessionID)
+
+          executionGraph.registerExecutionListener(gateway)
+          executionGraph.registerJobStatusListener(gateway)
         }
 
         // done with submitting the job
-        sender ! Success(jobGraph.getJobID)
+        sender ! decorateMessage(Success(jobGraph.getJobID))
       }
       catch {
         case t: Throwable =>
@@ -612,7 +693,7 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
           }
 
-          sender ! Failure(rt)
+          sender ! decorateMessage(Failure(rt))
           return
       }
 
@@ -690,14 +771,14 @@ class JobManager(
             currentJobs.get(jobID) match {
               case Some((graph, jobInfo)) =>
                 val accumulatorValues = graph.getAccumulatorsSerialized()
-                sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
+          sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
               case None =>
                 archive.forward(message)
             }
           } catch {
           case e: Exception =>
             log.error("Cannot serialize accumulator result.", e)
-            sender() ! AccumulatorResultsErroneous(jobID, e)
+            sender() ! decorateMessage(AccumulatorResultsErroneous(jobID, e))
           }
 
         case RequestAccumulatorResultsStringified(jobId) =>
@@ -830,7 +911,7 @@ class JobManager(
         try {
           eg.prepareForArchiving()
 
-          archive ! ArchiveExecutionGraph(jobID, eg)
+          archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
         } catch {
           case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
             "archiving.", t)
@@ -967,11 +1048,13 @@ object JobManager {
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
    */
-  def runJobManager(configuration: Configuration,
-                    executionMode: JobManagerMode,
-                    streamingMode: StreamingMode,
-                    listeningAddress: String,
-                    listeningPort: Int) : Unit = {
+  def runJobManager(
+      configuration: Configuration,
+      executionMode: JobManagerMode,
+      streamingMode: StreamingMode,
+      listeningAddress: String,
+      listeningPort: Int)
+    : Unit = {
 
     LOG.info("Starting JobManager")
 
@@ -979,8 +1062,10 @@ object JobManager {
     LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
 
     val jobManagerSystem = try {
-      val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
-                                               Some((listeningAddress, listeningPort)))
+      val akkaConfig = AkkaUtils.getAkkaConfig(
+        configuration,
+        Some((listeningAddress, listeningPort))
+      )
       if (LOG.isDebugEnabled) {
         LOG.debug("Using akka configuration\n " + akkaConfig)
       }
@@ -1003,14 +1088,20 @@ object JobManager {
     try {
       // bring up the job manager actor
       LOG.info("Starting JobManager actor")
-      val (jobManager, archiver) = startJobManagerActors(configuration, 
-                                                         jobManagerSystem, streamingMode)
+      val (jobManager, archiver) = startJobManagerActors(
+        configuration,
+        jobManagerSystem,
+        streamingMode)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
       LOG.debug("Starting JobManager process reaper")
       jobManagerSystem.actorOf(
-        Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
+        Props(
+          classOf[ProcessReaper],
+          jobManager,
+          LOG.logger,
+          RUNTIME_FAILURE_RETURN_CODE),
         "JobManager_Process_Reaper")
 
       // bring up a local task manager, if needed
@@ -1018,16 +1109,22 @@ object JobManager {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
         val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
-                        configuration, jobManagerSystem,
-                        listeningAddress,
-                        Some(TaskManager.TASK_MANAGER_NAME),
-                        Some(jobManager.path.toString),
-                        true, streamingMode,
-                        classOf[TaskManager])
+          configuration,
+          jobManagerSystem,
+          listeningAddress,
+          Some(TaskManager.TASK_MANAGER_NAME),
+          Some(jobManager.path.toString),
+          true,
+          streamingMode,
+          classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
         jobManagerSystem.actorOf(
-          Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
+          Props(
+            classOf[ProcessReaper],
+            taskManagerActor,
+            LOG.logger,
+            RUNTIME_FAILURE_RETURN_CODE),
           "TaskManager_Process_Reaper")
       }
 
@@ -1257,13 +1354,18 @@ object JobManager {
    * @param actorSystem Teh actor system running the JobManager
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
-  def startJobManagerActors(configuration: Configuration,
-                            actorSystem: ActorSystem,
-                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
+  def startJobManagerActors(
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      streamingMode: StreamingMode)
+    : (ActorRef, ActorRef) = {
 
-    startJobManagerActors(configuration, actorSystem,
-                          Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME),
-                          streamingMode)
+    startJobManagerActors(
+      configuration,
+      actorSystem,
+      Some(JOB_MANAGER_NAME),
+      Some(ARCHIVE_NAME),
+      streamingMode)
   }
   /**
    * Starts the JobManager and job archiver based on the given configuration, in the
@@ -1279,11 +1381,13 @@ object JobManager {
    * 
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
-  def startJobManagerActors(configuration: Configuration,
-                            actorSystem: ActorSystem,
-                            jobMangerActorName: Option[String],
-                            archiverActorName: Option[String],
-                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
+  def startJobManagerActors(
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      jobMangerActorName: Option[String],
+      archiverActorName: Option[String],
+      streamingMode: StreamingMode)
+    : (ActorRef, ActorRef) = {
 
     val (executionContext,
       instanceManager,
@@ -1352,9 +1456,11 @@ object JobManager {
     "akka://flink/user/" + JOB_MANAGER_NAME
   }
 
-  def getJobManagerRemoteReferenceFuture(address: InetSocketAddress,
-                                   system: ActorSystem,
-                                   timeout: FiniteDuration): Future[ActorRef] = {
+  def getJobManagerRemoteReferenceFuture(
+      address: InetSocketAddress,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : Future[ActorRef] = {
 
     AkkaUtils.getReference(getRemoteJobManagerAkkaURL(address), system, timeout)
   }
@@ -1369,9 +1475,12 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(jobManagerUrl: String,
-                                   system: ActorSystem,
-                                   timeout: FiniteDuration): ActorRef = {
+  def getJobManagerRemoteReference(
+      jobManagerUrl: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : ActorRef = {
+
     try {
       val future = AkkaUtils.getReference(jobManagerUrl, system, timeout)
       Await.result(future, timeout)
@@ -1397,9 +1506,11 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(address: InetSocketAddress,
-                                   system: ActorSystem,
-                                   timeout: FiniteDuration): ActorRef = {
+  def getJobManagerRemoteReference(
+      address: InetSocketAddress,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : ActorRef = {
 
     val jmAddress = getRemoteJobManagerAkkaURL(address)
     getJobManagerRemoteReference(jmAddress, system, timeout)
@@ -1415,14 +1526,38 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(address: InetSocketAddress,
-                                   system: ActorSystem,
-                                   config: Configuration): ActorRef = {
+  def getJobManagerRemoteReference(
+      address: InetSocketAddress,
+      system: ActorSystem,
+      config: Configuration)
+    : ActorRef = {
 
     val timeout = AkkaUtils.getLookupTimeout(config)
     getJobManagerRemoteReference(address, system, timeout)
   }
 
+  /** Returns the [[ActorGateway]] for the provided JobManager. The function automatically
+    * retrieves the current leader session ID from the JobManager and instantiates the
+    * [[AkkaActorGateway]] with it. This call blocks the caller for at most `timeout`.
+    *
+    * @param jobManager ActorRef to the [[JobManager]]
+    * @param timeout Timeout for the blocking leader session ID retrieval
+    * @throws java.lang.Exception if no `ResponseLeaderSessionID` is received within `timeout`
+    * @return Gateway to the specified JobManager
+    */
+  @throws(classOf[Exception])
+  def getJobManagerGateway(
+      jobManager: ActorRef,
+      timeout: FiniteDuration)
+    : ActorGateway = {
+    val futureLeaderSessionID = (jobManager ? RequestLeaderSessionID)(timeout)
+      .mapTo[ResponseLeaderSessionID]
+
+    val leaderSessionID = Await.result(futureLeaderSessionID, timeout).leaderSessionID
+
+    new AkkaActorGateway(jobManager, leaderSessionID)
+  }
+
   // --------------------------------------------------------------------------
   //  Utilities
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 7572e72..9f228ed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -20,13 +20,12 @@ package org.apache.flink.runtime.jobmanager
 
 import java.util
 
-import akka.actor.Actor
-
+import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -56,9 +55,11 @@ import scala.collection.mutable
  * @param max_entries Maximum number of stored Flink jobs
  */
 class MemoryArchivist(private val max_entries: Int)
-  extends Actor
-  with ActorLogMessages
-  with ActorSynchronousLogging {
+  extends FlinkActor
+  with LogMessages {
+
+  override val log = Logger(getClass)
+
   /*
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
@@ -70,41 +71,45 @@ class MemoryArchivist(private val max_entries: Int)
   var canceledCnt: Int = 0
   var failedCnt: Int = 0
 
-  override def receiveWithLogMessages: Receive = {
+  override def handleMessage: Receive = {
     
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => 
       // wrap graph inside a soft reference
       graphs.update(jobID, graph)
+
       // update job counters
       graph.getState match {
         case JobStatus.FINISHED => finishedCnt += 1
         case JobStatus.CANCELED => canceledCnt += 1
         case JobStatus.FAILED => failedCnt += 1
+          // ignore transitional states, e.g. Cancelling, Running, Failing, etc.
+        case _ =>
       }
+
       trimHistory()
 
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
-      sender ! ArchivedJob(graph)
+      sender ! decorateMessage(ArchivedJob(graph))
 
     case RequestArchivedJobs =>
-      sender ! ArchivedJobs(graphs.values)
+      sender ! decorateMessage(ArchivedJobs(graphs.values))
 
     case RequestJob(jobID) =>
       graphs.get(jobID) match {
-        case Some(graph) => sender ! JobFound(jobID, graph)
-        case None => sender ! JobNotFound(jobID)
+        case Some(graph) => sender ! decorateMessage(JobFound(jobID, graph))
+        case None => sender ! decorateMessage(JobNotFound(jobID))
       }
 
     case RequestJobStatus(jobID) =>
       graphs.get(jobID) match {
-        case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
-        case None => sender ! JobNotFound(jobID)
+        case Some(graph) => sender ! decorateMessage(CurrentJobStatus(jobID, graph.getState))
+        case None => sender ! decorateMessage(JobNotFound(jobID))
       }
 
     case RequestJobCounts =>
-      sender ! (finishedCnt, canceledCnt, failedCnt)
+      sender ! decorateMessage((finishedCnt, canceledCnt, failedCnt))
 
     case _ : RequestJobsOverview =>
       try {

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 6bc59a2..0cb3b0d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -45,11 +45,18 @@ object ExecutionGraphMessages {
    * @param timestamp of the execution state change
    * @param optionalMessage
    */
-  case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID,
-                                   taskName: String, totalNumberOfSubTasks: Int, subtaskIndex: Int,
-                                   executionID: ExecutionAttemptID,
-                                   newExecutionState: ExecutionState, timestamp: Long,
-                                   optionalMessage: String){
+  case class ExecutionStateChanged(
+      jobID: JobID,
+      vertexID: JobVertexID,
+      taskName: String,
+      totalNumberOfSubTasks: Int,
+      subtaskIndex: Int,
+      executionID: ExecutionAttemptID,
+      newExecutionState: ExecutionState,
+      timestamp: Long,
+      optionalMessage: String)
+    extends RequiresLeaderSessionID {
+
     override def toString: String = {
       val oMsg = if (optionalMessage != null) {
         s"\n$optionalMessage"
@@ -69,8 +76,12 @@ object ExecutionGraphMessages {
    * @param timestamp
    * @param error
    */
-  case class JobStatusChanged(jobID: JobID, newJobStatus: JobStatus, timestamp: Long,
-                              error: Throwable){
+  case class JobStatusChanged(
+      jobID: JobID,
+      newJobStatus: JobStatus,
+      timestamp: Long,
+      error: Throwable)
+    extends RequiresLeaderSessionID {
     override def toString: String = {
       s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus."
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index c9a2878..e38986b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusMessage}
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
@@ -32,6 +34,8 @@ import scala.collection.JavaConverters._
  */
 object JobManagerMessages {
 
+  case class LeaderSessionMessage(leaderSessionID: Option[UUID], message: Any)
+
   /**
    * Submits a job to the job manager. If [[registerForEvents]] is true,
    * then the sender will be registered as listener for the state change messages.
@@ -41,6 +45,7 @@ object JobManagerMessages {
    * @param registerForEvents if true, then register for state change events
    */
   case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean)
+    extends RequiresLeaderSessionID
 
   /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
@@ -48,19 +53,22 @@ object JobManagerMessages {
    *
    * @param jobID
    */
-  case class CancelJob(jobID: JobID)
+  case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
    * Requesting next input split for the
    * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
    * of the job specified by [[jobID]]. The next input split is sent back to the sender as a
-   * [[org.apache.flink.runtime.messages.TaskManagerMessages.NextInputSplit]] message.
+   * [[NextInputSplit]] message.
    *
    * @param jobID
    * @param vertexID
    */
-  case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID, executionAttempt:
-  ExecutionAttemptID)
+  case class RequestNextInputSplit(
+      jobID: JobID,
+      vertexID: JobVertexID,
+      executionAttempt: ExecutionAttemptID)
+    extends RequiresLeaderSessionID
 
   /**
    * Contains the next input split for a task. This message is a response to
@@ -80,10 +88,12 @@ object JobManagerMessages {
    * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
    * @param taskResultId The input gate ID of the task requesting the partition state.
    */
-  case class RequestPartitionState(jobId: JobID,
-                                   partitionId: ResultPartitionID,
-                                   taskExecutionId: ExecutionAttemptID,
-                                   taskResultId: IntermediateDataSetID)
+  case class RequestPartitionState(
+      jobId: JobID,
+      partitionId: ResultPartitionID,
+      taskExecutionId: ExecutionAttemptID,
+      taskResultId: IntermediateDataSetID)
+    extends RequiresLeaderSessionID
 
   /**
    * Notifies the [[org.apache.flink.runtime.jobmanager.JobManager]] about available data for a
@@ -101,8 +111,7 @@ object JobManagerMessages {
    * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
    */
   case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId: ResultPartitionID)
-
-  case class ConsumerNotificationResult(success: Boolean, error: Option[Throwable] = None)
+    extends RequiresLeaderSessionID
 
   /**
    * Requests the current [[JobStatus]] of the job identified by [[jobID]]. This message triggers
@@ -142,6 +151,17 @@ object JobManagerMessages {
    */
   case object RequestBlobManagerPort
 
+  /** Requests the current leader session ID of the job manager. The result is sent back to the
+    * sender as a [[ResponseLeaderSessionID]] message.
+    */
+  case object RequestLeaderSessionID
+
+  /** Response to the [[RequestLeaderSessionID]] message.
+    *
+    * @param leaderSessionID The job manager's current leader session ID, or None if none is set
+    */
+  case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
+
   /**
    * Denotes a successful job execution.
    */
@@ -300,4 +320,8 @@ object JobManagerMessages {
   def getJobManagerStatusAlive : AnyRef = {
     JobManagerStatusAlive
   }
+
+  def getRequestLeaderSessionID: AnyRef = {
+    RequestLeaderSessionID
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
index e292a47..75036b3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -39,7 +39,7 @@ object Messages {
    *
    * @param reason The reason for disconnecting, to be displayed in log and error messages.
    */
-  case class Disconnect(reason: String)
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
 
   /**
    * Accessor for the case object instance, to simplify Java interoperability.

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 3051d00..b435ebc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
 import akka.actor.ActorRef
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
 
@@ -32,7 +34,9 @@ object RegistrationMessages {
   /**
    * Marker trait for registration messages.
    */
-  trait RegistrationMessage
+  trait RegistrationMessage {
+    def registrationSessionID: UUID
+  }
 
   /**
    * Triggers the TaskManager to attempt a registration at the JobManager.
@@ -42,10 +46,12 @@ object RegistrationMessages {
    * @param deadline Optional deadline until when the registration must be completed.
    * @param attempt The attempt number, for logging.
    */
-  case class TriggerTaskManagerRegistration(jobManagerAkkaURL: String,
-                                            timeout: FiniteDuration,
-                                            deadline: Option[Deadline],
-                                            attempt: Int)
+  case class TriggerTaskManagerRegistration(
+      registrationSessionID: UUID,
+      jobManagerAkkaURL: String,
+      timeout: FiniteDuration,
+      deadline: Option[Deadline],
+      attempt: Int)
     extends RegistrationMessage
 
   /**
@@ -57,10 +63,12 @@ object RegistrationMessages {
    * @param resources The TaskManagers resources.
    * @param numberOfSlots The number of processing slots offered by the TaskManager.
    */
-  case class RegisterTaskManager(taskManager: ActorRef,
-                                 connectionInfo: InstanceConnectionInfo,
-                                 resources: HardwareDescription,
-                                 numberOfSlots: Int)
+  case class RegisterTaskManager(
+      registrationSessionID: UUID,
+      taskManager: ActorRef,
+      connectionInfo: InstanceConnectionInfo,
+      resources: HardwareDescription,
+      numberOfSlots: Int)
     extends RegistrationMessage
 
   /**
@@ -71,7 +79,12 @@ object RegistrationMessages {
    *                   JobManager.
    * @param blobPort The server port where the JobManager's BLOB service runs.
    */
-  case class AcknowledgeRegistration(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+  case class AcknowledgeRegistration(
+      registrationSessionID: UUID,
+      leaderSessionID: UUID,
+      jobManager: ActorRef,
+      instanceID: InstanceID,
+      blobPort: Int)
     extends RegistrationMessage
 
   /**
@@ -80,7 +93,12 @@ object RegistrationMessages {
    * @param instanceID The instance ID under which the TaskManager is registered.
    * @param blobPort The server port where the JobManager's BLOB service runs.
    */
-  case class AlreadyRegistered(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+  case class AlreadyRegistered(
+      registrationSessionID: UUID,
+      leaderSessionID: UUID,
+      jobManager: ActorRef,
+      instanceID: InstanceID,
+      blobPort: Int)
     extends RegistrationMessage
 
   /**
@@ -89,6 +107,6 @@ object RegistrationMessages {
    *
    * @param reason Reason why the task manager registration was refused
    */
-  case class RefuseRegistration(reason: String)
+  case class RefuseRegistration(registrationSessionID: UUID, reason: String)
     extends RegistrationMessage
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
index 9373576..a80ca99 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -47,7 +47,7 @@ object TaskMessages {
    * @param tasks Descriptor which contains the information to start the task.
    */
   case class SubmitTask(tasks: TaskDeploymentDescriptor)
-    extends TaskMessage
+    extends TaskMessage with RequiresLeaderSessionID
 
   /**
    * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
@@ -56,7 +56,7 @@ object TaskMessages {
    * @param attemptID The task's execution attempt ID.
    */
   case class CancelTask(attemptID: ExecutionAttemptID)
-    extends TaskMessage
+    extends TaskMessage with RequiresLeaderSessionID
 
   /**
    * Triggers a fail of specified task from the outside (as opposed to the task throwing
@@ -86,15 +86,16 @@ object TaskMessages {
    * Answer to a [[RequestPartitionState]] with the state of the respective partition.
    */
   case class PartitionState(
-    taskExecutionId: ExecutionAttemptID,
-    taskResultId: IntermediateDataSetID,
-    partitionId: IntermediateResultPartitionID,
-    state: ExecutionState) extends TaskMessage
+      taskExecutionId: ExecutionAttemptID,
+      taskResultId: IntermediateDataSetID,
+      partitionId: IntermediateResultPartitionID,
+      state: ExecutionState)
+    extends TaskMessage with RequiresLeaderSessionID
 
   /**
    * Base class for messages that update the information about location of input partitions
    */
-  abstract sealed class UpdatePartitionInfo extends TaskMessage {
+  abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {
     def executionID: ExecutionAttemptID
   }
 
@@ -104,9 +105,10 @@ object TaskMessages {
    * @param resultId The input reader to update.
    * @param partitionInfo The partition info update.
    */
-  case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
-                                           resultId: IntermediateDataSetID,
-                                           partitionInfo: InputChannelDeploymentDescriptor)
+  case class UpdateTaskSinglePartitionInfo(
+      executionID: ExecutionAttemptID,
+      resultId: IntermediateDataSetID,
+      partitionInfo: InputChannelDeploymentDescriptor)
     extends UpdatePartitionInfo
 
   /**
@@ -115,8 +117,8 @@ object TaskMessages {
    * @param partitionInfos List of input gates with channel descriptors to update.
    */
   case class UpdateTaskMultiplePartitionInfos(
-                    executionID: ExecutionAttemptID,
-                    partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+      executionID: ExecutionAttemptID,
+      partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
     extends UpdatePartitionInfo
 
   /**
@@ -126,7 +128,7 @@ object TaskMessages {
    * @param executionID The task's execution attempt ID.
    */
   case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
-    extends TaskMessage
+    extends TaskMessage with RequiresLeaderSessionID
 
 
   // --------------------------------------------------------------------------
@@ -140,7 +142,7 @@ object TaskMessages {
    * @param taskExecutionState The changed task state
    */
   case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
-    extends TaskMessage
+    extends TaskMessage with RequiresLeaderSessionID
 
   /**
    * Response message to updates in the task state. Send for example as a response to
@@ -152,11 +154,11 @@ object TaskMessages {
    * @param success indicating whether the operation has been successful
    * @param description Optional description for unsuccessful results.
    */
-  case class TaskOperationResult(executionID: ExecutionAttemptID,
-                                 success: Boolean,
-                                 description: String)
-    extends TaskMessage
-  {
+  case class TaskOperationResult(
+      executionID: ExecutionAttemptID,
+      success: Boolean,
+      description: String)
+    extends TaskMessage {
     def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
   }
 
@@ -166,10 +168,10 @@ object TaskMessages {
   // --------------------------------------------------------------------------
 
   def createUpdateTaskMultiplePartitionInfos(
-                               executionID: ExecutionAttemptID,
-                               resultIDs: java.util.List[IntermediateDataSetID],
-                               partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
-  UpdateTaskMultiplePartitionInfos = {
+      executionID: ExecutionAttemptID,
+      resultIDs: java.util.List[IntermediateDataSetID],
+      partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
+    : UpdateTaskMultiplePartitionInfos = {
 
     require(resultIDs.size() == partitionInfos.size(),
       "ResultIDs must have the same length as partitionInfos.")

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 49c701e..6f810fc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,8 +28,11 @@ import org.apache.flink.api.common.JobSubmissionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
+import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
+SerializedJobExecutionResult}
+import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobGraph
+import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.slf4j.LoggerFactory
 
@@ -132,8 +135,9 @@ abstract class FlinkMiniCluster(
     AkkaUtils.createActorSystem(config)
   }
 
-  def getJobManager: ActorRef = {
-    jobManagerActor
+  def getJobManagerGateway(): ActorGateway = {
+    // create ActorGateway from the JobManager's ActorRef
+    JobManager.getJobManagerGateway(jobManagerActor, timeout)
   }
 
   def getTaskManagers = {
@@ -206,12 +210,17 @@ abstract class FlinkMiniCluster(
     val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
     else JobClient.startJobClientActorSystem(configuration)
 
-    JobClient.submitJobAndWait(clientActorSystem, jobManagerActor, jobGraph, timeout, printUpdates)
+    JobClient.submitJobAndWait(
+      clientActorSystem,
+      getJobManagerGateway(),
+      jobGraph,
+      timeout,
+      printUpdates)
   }
 
   @throws(classOf[JobExecutionException])
   def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
-    JobClient.submitJobDetached(jobManagerActor, jobGraph, timeout)
+    JobClient.submitJobDetached(getJobManagerGateway(), jobGraph, timeout)
     new JobSubmissionResult(jobGraph.getJobID)
   }
 }


Mime
View raw message