flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [6/6] flink git commit: [FLINK-2332] [runtime] Adds leader session IDs and registration session IDs to JobManager and TaskManager messages.
Date Thu, 23 Jul 2015 15:41:14 GMT
[FLINK-2332] [runtime] Adds leader session IDs and registration session IDs to JobManager and TaskManager messages.

Refactors Flink's actor traits to support stackable traits

Refactored Flink's actors to use a factory method to generate messages

Replaced ActorRef with InstanceGateway in Task, RuntimeEnvironment and ExecutionGraph

Add comments

Add test cases for registration session ID and leader session ID

Adds IT case to check that a CancelMessage with the wrong leader session ID is discarded

Corrected order of visibility and abstract modifiers. Removed lazy log member from FlinkActor. Made RequiresLeaderSessionID a Java interface.

This closes #917.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22224542
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22224542
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22224542

Branch: refs/heads/master
Commit: 222245428518d0e1b843947a6184b4a803a78ad5
Parents: fa78be6
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Jul 8 15:37:50 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Jul 23 17:39:57 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  28 +-
 .../org/apache/flink/client/program/Client.java |  32 +-
 .../flink/client/CliFrontendListCancelTest.java |  84 ++-
 .../apache/flink/client/program/ClientTest.java |  43 +-
 .../flink/runtime/akka/FlinkUntypedActor.java   | 140 +++++
 .../checkpoint/CheckpointCoordinator.java       |  40 +-
 .../CheckpointCoordinatorDeActivator.java       |  25 +-
 .../apache/flink/runtime/client/JobClient.java  |  69 ++-
 .../flink/runtime/client/JobClientActor.java    |  62 +-
 .../flink/runtime/executiongraph/Execution.java |  10 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  55 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../flink/runtime/instance/ActorGateway.java    | 107 ++++
 .../runtime/instance/AkkaActorGateway.java      | 157 +++++
 .../runtime/instance/AkkaInstanceGateway.java   | 111 ----
 .../apache/flink/runtime/instance/Instance.java |  14 +-
 .../flink/runtime/instance/InstanceGateway.java |  82 ---
 .../flink/runtime/instance/InstanceManager.java |  24 +-
 .../runtime/io/network/NetworkEnvironment.java  |  47 +-
 .../jobmanager/web/SetupInfoServlet.java        |   2 +-
 .../messages/RequiresLeaderSessionID.java       |  25 +
 .../runtime/taskmanager/RuntimeEnvironment.java |  40 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  39 +-
 .../taskmanager/TaskInputSplitProvider.java     |  24 +-
 .../apache/flink/runtime/ActorLogMessages.scala |  52 --
 .../flink/runtime/ActorSynchronousLogging.scala |  31 -
 .../org/apache/flink/runtime/FlinkActor.scala   |  48 ++
 .../runtime/LeaderSessionMessageDecorator.scala |  46 ++
 .../flink/runtime/LeaderSessionMessages.scala   |  73 +++
 .../org/apache/flink/runtime/LogMessages.scala  |  48 ++
 .../apache/flink/runtime/MessageDecorator.scala |  32 ++
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 337 +++++++----
 .../runtime/jobmanager/MemoryArchivist.scala    |  33 +-
 .../messages/ExecutionGraphMessages.scala       |  25 +-
 .../runtime/messages/JobManagerMessages.scala   |  44 +-
 .../flink/runtime/messages/Messages.scala       |   2 +-
 .../runtime/messages/RegistrationMessages.scala |  42 +-
 .../flink/runtime/messages/TaskMessages.scala   |  48 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  19 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 566 ++++++++++++-------
 .../runtime/akka/FlinkUntypedActorTest.java     | 146 +++++
 .../checkpoint/CoordinatorShutdownTest.java     |  22 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  17 +-
 .../ExecutionStateProgressTest.java             |   2 +-
 .../ExecutionVertexCancelTest.java              |  40 +-
 .../ExecutionVertexDeploymentTest.java          |  14 +-
 .../ExecutionVertexSchedulingTest.java          |   8 +-
 .../executiongraph/LocalInputSplitsTest.java    |   2 +-
 .../TerminalStateDeadlockTest.java              |   4 +-
 .../VertexLocationConstraintTest.java           |   6 +-
 .../instance/BaseTestingActorGateway.java       | 120 ++++
 .../instance/BaseTestingInstanceGateway.java    |  94 ---
 .../runtime/instance/DummyActorGateway.java     |  68 +++
 .../runtime/instance/DummyInstanceGateway.java  |  57 --
 .../runtime/instance/InstanceManagerTest.java   |  23 +-
 .../flink/runtime/instance/InstanceTest.java    |   6 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   2 +-
 .../io/network/NetworkEnvironmentTest.java      |  16 +-
 .../PartialConsumePipelinedResultTest.java      |   2 +-
 .../runtime/jobmanager/JobManagerTest.java      |  46 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  33 +-
 .../scheduler/SchedulerTestUtils.java           |   4 +-
 .../taskmanager/ForwardingActorGateway.java     |  47 ++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  35 +-
 .../runtime/taskmanager/TaskCancelTest.java     |  11 +-
 .../taskmanager/TaskInputSplitProviderTest.java |  44 +-
 .../TaskManagerRegistrationTest.java            |  95 +++-
 .../runtime/taskmanager/TaskManagerTest.java    | 170 ++++--
 .../flink/runtime/taskmanager/TaskTest.java     |  93 ++-
 .../flink/runtime/akka/FlinkActorTest.scala     | 107 ++++
 .../ExecutionGraphRestartTest.scala             |   6 +-
 .../TaskManagerLossFailsTasksTest.scala         |   6 +-
 .../jobmanager/CoLocationConstraintITCase.scala |  16 +-
 .../runtime/jobmanager/JobManagerITCase.scala   | 108 ++--
 .../jobmanager/JobManagerRegistrationTest.scala |  43 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |  31 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |  20 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  23 +-
 .../testingUtils/ScalaTestingUtils.scala        |  37 ++
 .../testingUtils/TestingJobManager.scala        |  93 +--
 .../TestingJobManagerMessages.scala             |   4 +-
 .../testingUtils/TestingMemoryArchivist.scala   |  14 +-
 .../testingUtils/TestingTaskManager.scala       | 105 ++--
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../accumulators/AccumulatorLiveITCase.java     |  13 +-
 .../test/cancelling/CancellingTestBase.java     |   3 +-
 .../LocalFlinkMiniClusterITCase.java            |  19 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  33 +-
 .../JobManagerLeaderSessionIDITSuite.scala      | 112 ++++
 .../taskmanager/TaskManagerFailsITCase.scala    |  41 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  46 +-
 .../flink/yarn/ApplicationMasterActor.scala     | 138 +++--
 .../org/apache/flink/yarn/YarnTaskManager.scala |  39 +-
 96 files changed, 3367 insertions(+), 1641 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e28f2fd..71b78bd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -38,8 +38,6 @@ import java.util.Properties;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -65,6 +63,7 @@ import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -447,11 +446,12 @@ public class CliFrontend {
 		}
 
 		try {
-			ActorRef jobManager = getJobManager(options);
+			ActorGateway jobManagerGateway = getJobManagerGateway(options);
 
 			LOG.info("Connecting to JobManager to retrieve list of jobs");
-			Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
+			Future<Object> response = jobManagerGateway.ask(
+					JobManagerMessages.getRequestRunningJobsStatus(),
+					askTimeout);
 
 			Object result;
 			try {
@@ -580,8 +580,8 @@ public class CliFrontend {
 		}
 
 		try {
-			ActorRef jobManager = getJobManager(options);
-			Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+			ActorGateway jobManager = getJobManagerGateway(options);
+			Future<Object> response = jobManager.ask(new CancelJob(jobId), askTimeout);
 
 			try {
 				Await.result(response, askTimeout);
@@ -722,7 +722,15 @@ public class CliFrontend {
 		return jobManagerAddress;
 	}
 
-	protected ActorRef getJobManager(CommandLineOptions options) throws Exception {
+	/**
+	 * Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved
+	 * from the provided {@link CommandLineOptions}.
+	 *
+	 * @param options CommandLineOptions specifying the JobManager URL
+	 * @return Gateway to the JobManager
+	 * @throws Exception
+	 */
+	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
 		//TODO: Get ActorRef from YarnCluster if we are in YARN mode.
 
 		InetSocketAddress address = getJobManagerAddress(options);
@@ -745,7 +753,9 @@ public class CliFrontend {
 		LOG.info("Trying to lookup JobManager");
 		ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
 		LOG.info("JobManager is at " + jmActor.path());
-		return jmActor;
+
+		// Retrieve the ActorGateway from the JobManager's ActorRef
+		return JobManager.getJobManagerGateway(jmActor, lookupTimeout);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 2908f92..50590df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -52,6 +52,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -375,22 +376,35 @@ public class Client {
 			throw new ProgramInvocationException("Could start client actor system.", e);
 		}
 
+		FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
 		LOG.info("Looking up JobManager");
-		ActorRef jobManager;
+		ActorGateway jobManagerGateway;
+		ActorRef jobManagerActorRef;
 		try {
-			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
-		}
-		catch (IOException e) {
+			jobManagerActorRef = JobManager.getJobManagerRemoteReference(
+					jobManagerAddress,
+					actorSystem,
+					configuration);
+
+
+		} catch (IOException e) {
 			throw new ProgramInvocationException("Failed to resolve JobManager", e);
 		}
-		LOG.info("JobManager runs at " + jobManager.path());
 
-		FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+		try{
+			jobManagerGateway = JobManager.getJobManagerGateway(jobManagerActorRef, timeout);
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
+		}
+
+		LOG.info("JobManager runs at " + jobManagerGateway.path());
+
 		LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
 
 		LOG.info("Checking and uploading JAR files");
 		try {
-			JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
+			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
 		}
 		catch (IOException e) {
 			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
@@ -399,7 +413,7 @@ public class Client {
 		try{
 			if (wait) {
 				SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem, 
-						jobManager, jobGraph, timeout, printStatusDuringExecution);
+						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution);
 				try {
 					return result.toJobExecutionResult(this.userCodeClassLoader);
 				}
@@ -409,7 +423,7 @@ public class Client {
 				}
 			}
 			else {
-				JobClient.submitJobDetached(jobManager, jobGraph, timeout);
+				JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout);
 				// return a dummy execution result with the JobId
 				return new JobSubmissionResult(jobGraph.getJobID());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 3224e0f..fc64503 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -23,10 +23,16 @@ import akka.testkit.JavaTestKit;
 
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import scala.Option;
+
+import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
 import static org.junit.Assert.*;
@@ -77,10 +83,19 @@ public class CliFrontendListCancelTest {
 				JobID jid = new JobID();
 				String jidString = jid.toString();
 
-				final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid));
+				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+
+				final ActorRef jm = actorSystem.actorOf(Props.create(
+								CliJobManager.class,
+								jid,
+								leaderSessionID
+						)
+				);
+
+				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
 				
 				String[] parameters = { jidString };
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(jm);
+				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
 
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode == 0);
@@ -91,10 +106,20 @@ public class CliFrontendListCancelTest {
 				JobID jid1 = new JobID();
 				JobID jid2 = new JobID();
 
-				final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1));
+				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+
+				final ActorRef jm = actorSystem.actorOf(
+						Props.create(
+								CliJobManager.class,
+								jid1,
+								leaderSessionID
+						)
+				);
+
+				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
 
 				String[] parameters = { jid2.toString() };
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(jm);
+				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
 
 				assertTrue(testFrontend.cancel(parameters) != 0);
 			}
@@ -118,9 +143,17 @@ public class CliFrontendListCancelTest {
 			
 			// test list properly
 			{
-				final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, (Object)null));
+				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+				final ActorRef jm = actorSystem.actorOf(
+						Props.create(
+								CliJobManager.class,
+								(Object)null,
+								leaderSessionID
+						)
+				);
+				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
 				String[] parameters = {"-r", "-s"};
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(jm);
+				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
 				int retCode = testFrontend.list(parameters);
 				assertTrue(retCode == 0);
 			}
@@ -135,46 +168,57 @@ public class CliFrontendListCancelTest {
 
 	protected static final class InfoListTestCliFrontend extends CliFrontend {
 
-		private ActorRef jobmanager;
+		private ActorGateway jobManagerGateway;
 
-
-
-		public InfoListTestCliFrontend(ActorRef jobmanager) throws Exception {
+		public InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
-			this.jobmanager = jobmanager;
+			this.jobManagerGateway = jobManagerGateway;
 		}
 
 		@Override
-		public ActorRef getJobManager(CommandLineOptions options) {
-			return jobmanager;
+		public ActorGateway getJobManagerGateway(CommandLineOptions options) {
+			return jobManagerGateway;
 		}
 	}
 
-	protected static final class CliJobManager extends UntypedActor{
+	protected static final class CliJobManager extends FlinkUntypedActor {
 		private final JobID jobID;
+		private final Option<UUID> leaderSessionID;
 
-		public CliJobManager(final JobID jobID){
+		public CliJobManager(final JobID jobID, final Option<UUID> leaderSessionID){
 			this.jobID = jobID;
+			this.leaderSessionID = leaderSessionID;
 		}
 
 		@Override
-		public void onReceive(Object message) throws Exception {
+		public void handleMessage(Object message) {
 			if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
-				getSender().tell(1, getSelf());
+				getSender().tell(decorateMessage(1), getSelf());
 			}
 			else if (message instanceof JobManagerMessages.CancelJob) {
 				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
 
 				if (jobID != null && jobID.equals(cancelJob.jobID())) {
-					getSender().tell(new Status.Success(new Object()), getSelf());
+					getSender().tell(
+							decorateMessage(new Status.Success(new Object())),
+							getSelf());
 				}
 				else {
-					getSender().tell(new Status.Failure(new Exception("Wrong or no JobID")), getSelf());
+					getSender().tell(
+							decorateMessage(new Status.Failure(new Exception("Wrong or no JobID"))),
+							getSelf());
 				}
 			}
 			else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
-				getSender().tell(new JobManagerMessages.RunningJobsStatus(), getSelf());
+				getSender().tell(
+						decorateMessage(new JobManagerMessages.RunningJobsStatus()),
+						getSelf());
 			}
 		}
+
+		@Override
+		protected Option<UUID> getLeaderSessionID() {
+			return leaderSessionID;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 9a37dde..594525e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -34,6 +33,7 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -48,9 +48,12 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 
+import java.util.UUID;
+
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
@@ -220,25 +223,49 @@ public class ClientTest {
 
 	// --------------------------------------------------------------------------------------------
 
-	public static class SuccessReturningActor extends UntypedActor {
+	public static class SuccessReturningActor extends FlinkUntypedActor {
+
+		private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
 
 		@Override
-		public void onReceive(Object message) throws Exception {
+		public void handleMessage(Object message) {
 			if (message instanceof JobManagerMessages.SubmitJob) {
 				JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
-				getSender().tell(new Status.Success(jid), getSelf());
+				getSender().tell(
+						decorateMessage(new Status.Success(jid)),
+						getSelf());
+			} else if(message instanceof JobManagerMessages.RequestLeaderSessionID$) {
+				getSender().tell(
+						decorateMessage(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionID)),
+						getSelf());
 			}
 			else {
-				getSender().tell(new Status.Failure(new Exception("Unknown message " + message)), getSelf());
+				getSender().tell(
+						decorateMessage(new Status.Failure(new Exception("Unknown message " + message))),
+						getSelf());
 			}
 		}
+
+		@Override
+		protected Option<UUID> getLeaderSessionID() {
+			return leaderSessionID;
+		}
 	}
 
-	public static class FailureReturningActor extends UntypedActor {
+	public static class FailureReturningActor extends FlinkUntypedActor {
+
+		private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+
+		@Override
+		public void handleMessage(Object message) {
+			getSender().tell(
+					decorateMessage(new Status.Failure(new Exception("test"))),
+					getSelf());
+		}
 
 		@Override
-		public void onReceive(Object message) throws Exception {
-			getSender().tell(new Status.Failure(new Exception("test")), getSelf());
+		protected Option<UUID> getLeaderSessionID() {
+			return leaderSessionID;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
new file mode 100644
index 0000000..bba2aeb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.UUID;
+
+/**
+ * Base class for Flink's actors implemented with Java. Actors inheriting from this class
+ * automatically log received messages when the debug log level is activated. Furthermore,
+ * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message
+ * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
+ * detected, then an Exception is thrown.
+ *
+ * In order to implement the actor behavior, an implementing subclass has to override the method
+ * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
+ * a leader session ID option which is returned by getLeaderSessionID.
+ */
+public abstract class FlinkUntypedActor extends UntypedActor {
+	protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+
+	/**
+	 * This method is called by Akka if a new message has arrived for the actor. It logs the
+	 * processing time of the incoming message if the logging level is set to debug. After logging
+	 * the handleLeaderSessionID method is called.
+	 *
+	 * Important: This method cannot be overriden. The actor specific message handling logic is
+	 * implemented by the method handleMessage.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	@Override
+	public final void onReceive(Object message) throws Exception {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+
+			long start = System.nanoTime();
+
+			handleLeaderSessionID(message);
+
+			long duration = (System.nanoTime() - start)/ 1000000;
+
+			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+		} else {
+			handleLeaderSessionID(message);
+		}
+	}
+
+	/**
+	 * This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal
+	 * to the actors leader session ID. If a message of type {@link RequiresLeaderSessionID}
+	 * arrives, then an Exception is thrown, because these messages have to be wrapped in a
+	 * {@link LeaderSessionMessage}.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	private void handleLeaderSessionID(Object message) throws Exception {
+		if(message instanceof LeaderSessionMessage) {
+			LeaderSessionMessage msg = (LeaderSessionMessage) message;
+
+			if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
+				if(getLeaderSessionID().equals(msg.leaderSessionID())) {
+					// finally call method to handle message
+					handleMessage(msg.message());
+				} else {
+					handleDiscardedMessage(msg);
+				}
+			} else {
+				handleDiscardedMessage(msg);
+			}
+		} else if (message instanceof RequiresLeaderSessionID) {
+			throw new Exception("Received a message " + message + " without a leader session " +
+					"ID, even though it requires to have one.");
+		} else {
+			// call method to handle message
+			handleMessage(message);
+		}
+	}
+
+	private void handleDiscardedMessage(Object msg) {
+		LOG.debug("Discard message {} because the leader session ID was not correct.", msg);
+	}
+
+	/**
+	 * This method contains the actor logic which defines how to react to incoming messages.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	protected abstract void handleMessage(Object message) throws Exception;
+
+	/**
+	 * Returns the current leader session ID associcated with this actor.
+	 * @return
+	 */
+	protected abstract Option<UUID> getLeaderSessionID();
+
+	/**
+	 * This method should be called for every outgoing message. It wraps messages which require
+	 * a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a
+	 * {@link LeaderSessionMessage} with the actor's leader session ID.
+	 *
+	 * This method can be overriden to implement a different decoration behavior.
+	 *
+	 * @param message Message to be decorated
+	 * @return The deocrated message
+	 */
+	protected Object decorateMessage(Object message) {
+		if(message instanceof  RequiresLeaderSessionID) {
+			return new LeaderSessionMessage(getLeaderSessionID(), message);
+		} else {
+			return message;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2b2bf6b..9694132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -28,12 +27,15 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -94,7 +97,7 @@ public class CheckpointCoordinator {
 	
 	private TimerTask periodicScheduler;
 	
-	private ActorRef jobStatusListener;
+	private ActorGateway jobStatusListener;
 	
 	private ClassLoader userClassLoader;
 	
@@ -102,11 +105,14 @@ public class CheckpointCoordinator {
 	
 	// --------------------------------------------------------------------------------------------
 
-	public CheckpointCoordinator(JobID job, int numSuccessfulCheckpointsToRetain, long checkpointTimeout,
-								ExecutionVertex[] tasksToTrigger,
-								ExecutionVertex[] tasksToWaitFor,
-								ExecutionVertex[] tasksToCommitTo,
-								ClassLoader userClassLoader) {
+	public CheckpointCoordinator(
+			JobID job,
+			int numSuccessfulCheckpointsToRetain,
+			long checkpointTimeout,
+			ExecutionVertex[] tasksToTrigger,
+			ExecutionVertex[] tasksToWaitFor,
+			ExecutionVertex[] tasksToCommitTo,
+			ClassLoader userClassLoader) {
 		
 		// some sanity checks
 		if (job == null || tasksToTrigger == null ||
@@ -157,7 +163,7 @@ public class CheckpointCoordinator {
 			
 			// make sure that the actor does not linger
 			if (jobStatusListener != null) {
-				jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				jobStatusListener.tell(PoisonPill.getInstance());
 				jobStatusListener = null;
 			}
 			
@@ -521,16 +527,28 @@ public class CheckpointCoordinator {
 		}
 	}
 	
-	public ActorRef createJobStatusListener(ActorSystem actorSystem, long checkpointInterval) {
+	public ActorGateway createJobStatusListener(
+			ActorSystem actorSystem,
+			long checkpointInterval,
+			Option<UUID> leaderSessionID) {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 			}
 
 			if (jobStatusListener == null) {
-				Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, checkpointInterval);
-				jobStatusListener = actorSystem.actorOf(props);
+				Props props = Props.create(
+						CheckpointCoordinatorDeActivator.class,
+						this,
+						checkpointInterval,
+						leaderSessionID);
+
+				// wrap the ActorRef in a AkkaActorGateway to support message decoration
+				jobStatusListener = new AkkaActorGateway(
+						actorSystem.actorOf(props),
+						leaderSessionID);
 			}
+
 			return jobStatusListener;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index a6c4d76..f65be15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -18,26 +18,38 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.UntypedActor;
+import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import scala.Option;
+
+import java.util.UUID;
 
 /**
  * This actor listens to changes in the JobStatus and activates or deactivates the periodic
  * checkpoint scheduler.
  */
-public class CheckpointCoordinatorDeActivator extends UntypedActor {
+public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 
 	private final CheckpointCoordinator coordinator;
 	private final long interval;
+	private final Option<UUID> leaderSessionID;
 	
-	public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator, long interval) {
+	public CheckpointCoordinatorDeActivator(
+			CheckpointCoordinator coordinator,
+			long interval,
+			Option<UUID> leaderSessionID) {
+		Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
+		Preconditions.checkNotNull(leaderSessionID, "The leaderSesssionID must not be null.");
+
 		this.coordinator = coordinator;
 		this.interval = interval;
+		this.leaderSessionID = leaderSessionID;
 	}
 
 	@Override
-	public void onReceive(Object message) {
+	public void handleMessage(Object message) {
 		if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
 			JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
 			
@@ -53,4 +65,9 @@ public class CheckpointCoordinatorDeActivator extends UntypedActor {
 		
 		// we ignore all other messages
 	}
+
+	@Override
+	public Option<UUID> getLeaderSessionID() {
+		return leaderSessionID;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 72f39d6..aeefa61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -26,10 +26,12 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -114,25 +116,36 @@ public class JobClient {
 	 * case a [[JobExecutionException]] is thrown.
 	 *
 	 * @param actorSystem The actor system that performs the communication.
-	 * @param jobManager  The JobManager that should execute the job.
+	 * @param jobManagerGateway  Gateway to the JobManager that should execute the job.
 	 * @param jobGraph    JobGraph describing the Flink job
 	 * @param timeout     Timeout for futures
 	 * @return The job execution result
 	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
 	 *                                                               execution fails.
 	 */
-	public static SerializedJobExecutionResult submitJobAndWait(ActorSystem actorSystem, ActorRef jobManager,
-																JobGraph jobGraph, FiniteDuration timeout,
-																boolean sysoutLogUpdates) throws JobExecutionException
-	{
-		if (actorSystem == null || jobManager == null || jobGraph == null || timeout == null) {
-			throw new NullPointerException();
-		}
+	public static SerializedJobExecutionResult submitJobAndWait(
+			ActorSystem actorSystem,
+			ActorGateway jobManagerGateway,
+			JobGraph jobGraph,
+			FiniteDuration timeout,
+			boolean sysoutLogUpdates) throws JobExecutionException {
+
+		Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
+		Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+		Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
+		Preconditions.checkNotNull(timeout, "The timeout must not be null.");
+
 		// for this job, we create a proxy JobClientActor that deals with all communication with
 		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
 		// update messages, watches for disconnect between client and JobManager, ...
 
-		Props jobClientActorProps = Props.create(JobClientActor.class, jobManager, LOG, sysoutLogUpdates);
+		Props jobClientActorProps = Props.create(
+				JobClientActor.class,
+				jobManagerGateway.actor(),
+				LOG,
+				sysoutLogUpdates,
+				jobManagerGateway.leaderSessionID());
+
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
 
 		try {
@@ -178,25 +191,30 @@ public class JobClient {
 	 * Submits a job in detached mode. The method sends the JobGraph to the
 	 * JobManager and waits for the answer whether teh job could be started or not.
 	 *
+	 * @param jobManagerGateway Gateway to the JobManager which will execute the jobs
 	 * @param jobGraph The job
 	 * @param timeout  Timeout in which the JobManager must have responded.
 	 */
-	public static void submitJobDetached(ActorRef jobManager, JobGraph jobGraph, FiniteDuration timeout)
-			throws JobExecutionException {
-		if (jobManager == null || jobGraph == null || timeout == null) {
-			throw new NullPointerException();
-		}
+	public static void submitJobDetached(
+			ActorGateway jobManagerGateway,
+			JobGraph jobGraph,
+			FiniteDuration timeout) throws JobExecutionException {
+
+		Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+		Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
+		Preconditions.checkNotNull(timeout, "The timeout must not be null.");
 
-		Future<Object> future = Patterns.ask(jobManager,
+		Future<Object> future = jobManagerGateway.ask(
 				new JobManagerMessages.SubmitJob(jobGraph, false),
-				new Timeout(timeout));
+				timeout);
+
 		try {
 			Object result = Await.result(future, timeout);
 			if (result instanceof JobID) {
 				JobID respondedID = (JobID) result;
 				if (!respondedID.equals(jobGraph.getJobID())) {
-					throw new Exception("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() +
-							", response: " + respondedID);
+					throw new Exception("JobManager responded for wrong Job. This Job: " +
+							jobGraph.getJobID() + ", response: " + respondedID);
 				}
 			}
 			else {
@@ -222,17 +240,20 @@ public class JobClient {
 	 * blocking call.
 	 *
 	 * @param jobGraph   Flink job containing the information about the required jars
-	 * @param jobManager ActorRef of the JobManager.
+	 * @param jobManagerGateway Gateway to the JobManager.
 	 * @param timeout    Timeout for futures
 	 * @throws IOException Thrown, if the file upload to the JobManager failed.
 	 */
-	public static void uploadJarFiles(JobGraph jobGraph, ActorRef jobManager, FiniteDuration timeout)
+	public static void uploadJarFiles(
+			JobGraph jobGraph,
+			ActorGateway jobManagerGateway,
+			FiniteDuration timeout)
 			throws IOException {
 		if (jobGraph.hasUsercodeJarFiles()) {
-			Timeout tOut = new Timeout(timeout);
-			Future<Object> futureBlobPort = Patterns.ask(jobManager,
+
+			Future<Object> futureBlobPort = jobManagerGateway.ask(
 					JobManagerMessages.getRequestBlobManagerPort(),
-					tOut);
+					timeout);
 
 			int port;
 			try {
@@ -247,7 +268,7 @@ public class JobClient {
 				throw new IOException("Could not retrieve the JobManager's blob port.", e);
 			}
 
-			Option<String> jmHost = jobManager.path().address().host();
+			Option<String> jmHost = jobManagerGateway.actor().path().address().host();
 			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
 			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index ee31e8d..b4bfc8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -22,37 +22,47 @@ import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
 import akka.actor.Terminated;
-import akka.actor.UntypedActor;
+import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.UUID;
 
 /**
  * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
  * is used to submit jobs to the JobManager and to request the port of the BlobManager.
  */
-public class JobClientActor extends UntypedActor {
+public class JobClientActor extends FlinkUntypedActor {
 	
 	private final ActorRef jobManager;
 	private final Logger logger;
 	private final boolean sysoutUpdates;
-	
+
+	// leader session ID of the JobManager when this actor was created
+	private final Option<UUID> leaderSessionID;
+
+	// Actor which submits a job to the JobManager via this actor
 	private ActorRef submitter;
-	
-	
-	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates) {
-		if (jobManager == null || logger == null) {
-			throw new NullPointerException();
-		}
-		this.jobManager = jobManager;
-		this.logger = logger;
+
+	public JobClientActor(
+			ActorRef jobManager,
+			Logger logger,
+			boolean sysoutUpdates,
+			Option<UUID> leaderSessionID) {
+		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
+		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
+		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
+
 		this.sysoutUpdates = sysoutUpdates;
 	}
 	
 	@Override
-	public void onReceive(Object message) {
+	protected void handleMessage(Object message) {
 		
 		// =========== State Change Messages ===============
 
@@ -73,14 +83,17 @@ public class JobClientActor extends UntypedActor {
 				JobGraph jobGraph = ((JobClientMessages.SubmitJobAndWait) message).jobGraph();
 				if (jobGraph == null) {
 					logger.error("Received null JobGraph");
-					sender().tell(new Status.Failure(new Exception("JobGraph is null")), getSelf());
+					sender().tell(
+							decorateMessage(new Status.Failure(new Exception("JobGraph is null"))),
+							getSelf());
 				}
 				else {
 					logger.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
 							jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
 
 					this.submitter = getSender();
-					jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, true), getSelf());
+					jobManager.tell(
+							decorateMessage(new JobManagerMessages.SubmitJob(jobGraph, true)), getSelf());
 					
 					// make sure we notify the sender when the connection got lost
 					getContext().watch(jobManager);
@@ -90,10 +103,12 @@ public class JobClientActor extends UntypedActor {
 				// repeated submission - tell failure to sender and kill self
 				String msg = "Received repeated 'SubmitJobAndWait'";
 				logger.error(msg);
-				getSender().tell(new Status.Failure(new Exception(msg)), ActorRef.noSender());
+				getSender().tell(
+						decorateMessage(new Status.Failure(new Exception(msg))),
+						ActorRef.noSender());
 
 				getContext().unwatch(jobManager);
-				getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+				getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
 			}
 		}
 		// acknowledgement to submit job is only logged, our original
@@ -102,12 +117,12 @@ public class JobClientActor extends UntypedActor {
 			// forward the success to the original job submitter
 			logger.debug("Received JobResultSuccess message from JobManager");
 			if (this.submitter != null) {
-				this.submitter.tell(message, getSelf());
+				this.submitter.tell(decorateMessage(message), getSelf());
 			}
 			
 			// we are done, stop ourselves
 			getContext().unwatch(jobManager);
-			getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
 		}
 		else if (message instanceof Status.Success) {
 			// job was successfully submitted :-)
@@ -117,7 +132,7 @@ public class JobClientActor extends UntypedActor {
 			// job execution failed, inform the actor that submitted the job
 			logger.debug("Received failure from JobManager", ((Status.Failure) message).cause());
 			if (submitter != null) {
-				submitter.tell(message, sender());
+				submitter.tell(decorateMessage(message), sender());
 			}
 		}
 
@@ -128,7 +143,7 @@ public class JobClientActor extends UntypedActor {
 			if (jobManager.equals(target)) {
 				String msg = "Lost connection to JobManager " + jobManager.path();
 				logger.info(msg);
-				submitter.tell(new Status.Failure(new Exception(msg)), getSelf());
+				submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), getSelf());
 			} else {
 				logger.error("Received 'Terminated' for unknown actor " + target);
 			}
@@ -140,7 +155,12 @@ public class JobClientActor extends UntypedActor {
 			logger.error("JobClient received unknown message: " + message);
 		}
 	}
-	
+
+	@Override
+	protected Option<UUID> getLeaderSessionID() {
+		return leaderSessionID;
+	}
+
 	private void logAndPrintMessage(Object message) {
 		logger.info(message.toString());
 		if (sysoutUpdates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3b836ad..aa0f981 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -361,7 +361,7 @@ public class Execution implements Serializable {
 			vertex.getExecutionGraph().registerExecution(this);
 
 			final Instance instance = slot.getInstance();
-			final InstanceGateway gateway = instance.getInstanceGateway();
+			final ActorGateway gateway = instance.getActorGateway();
 
 			final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
 
@@ -848,7 +848,7 @@ public class Execution implements Serializable {
 
 		if (slot != null) {
 
-			final InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+			final ActorGateway gateway = slot.getInstance().getActorGateway();
 
 			Future<Object> cancelResult = gateway.retry(
 				new CancelTask(attemptId),
@@ -881,7 +881,7 @@ public class Execution implements Serializable {
 			final Instance instance = slot.getInstance();
 
 			if (instance.isAlive()) {
-				final InstanceGateway gateway = instance.getInstanceGateway();
+				final ActorGateway gateway = instance.getActorGateway();
 
 				// TODO For some tests this could be a problem when querying too early if all resources were released
 				gateway.tell(new FailIntermediateResultPartitions(attemptId));
@@ -901,7 +901,7 @@ public class Execution implements Serializable {
 
 		if (consumerSlot != null) {
 			final Instance instance = consumerSlot.getInstance();
-			final InstanceGateway gateway = instance.getInstanceGateway();
+			final ActorGateway gateway = instance.getActorGateway();
 
 			Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9c977fa..833518c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
-
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -32,6 +30,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -50,6 +49,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -63,6 +63,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -166,10 +167,10 @@ public class ExecutionGraph implements Serializable {
 
 	/** Listeners that receive messages when the entire job switches it status (such as from
 	 * RUNNING to FINISHED) */
-	private final List<ActorRef> jobStatusListenerActors;
+	private final List<ActorGateway> jobStatusListenerActors;
 
 	/** Listeners that receive messages whenever a single task execution changes its status */
-	private final List<ActorRef> executionListenerActors;
+	private final List<ActorGateway> executionListenerActors;
 
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
 	 * the execution graph transitioned into a certain state. The index into this array is the
@@ -287,8 +288,8 @@ public class ExecutionGraph implements Serializable {
 		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
 		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
 
-		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorRef>();
-		this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
+		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorGateway>();
+		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
 
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -340,12 +341,14 @@ public class ExecutionGraph implements Serializable {
 		return scheduleMode;
 	}
 
-	public void enableSnaphotCheckpointing(long interval, long checkpointTimeout,
-											List<ExecutionJobVertex> verticesToTrigger,
-											List<ExecutionJobVertex> verticesToWaitFor,
-											List<ExecutionJobVertex> verticesToCommitTo,
-											ActorSystem actorSystem)
-	{
+	public void enableSnapshotCheckpointing(
+			long interval,
+			long checkpointTimeout,
+			List<ExecutionJobVertex> verticesToTrigger,
+			List<ExecutionJobVertex> verticesToWaitFor,
+			List<ExecutionJobVertex> verticesToCommitTo,
+			ActorSystem actorSystem,
+			Option<UUID> leaderSessionID) {
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
 			throw new IllegalArgumentException();
@@ -363,12 +366,22 @@ public class ExecutionGraph implements Serializable {
 		
 		// create the coordinator that triggers and commits checkpoints and holds the state 
 		snapshotCheckpointsEnabled = true;
-		checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
-				checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader);
+		checkpointCoordinator = new CheckpointCoordinator(
+				jobID,
+				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
+				checkpointTimeout,
+				tasksToTrigger,
+				tasksToWaitFor,
+				tasksToCommitTo,
+				userClassLoader);
 		
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(checkpointCoordinator.createJobStatusListener(actorSystem, interval));
+		registerJobStatusListener(
+				checkpointCoordinator.createJobStatusListener(
+						actorSystem,
+						interval,
+						leaderSessionID));
 	}
 	
 	public void disableSnaphotCheckpointing() {
@@ -998,13 +1011,13 @@ public class ExecutionGraph implements Serializable {
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
 
-	public void registerJobStatusListener(ActorRef listener) {
+	public void registerJobStatusListener(ActorGateway listener) {
 		if (listener != null) {
 			this.jobStatusListenerActors.add(listener);
 		}
 	}
 
-	public void registerExecutionListener(ActorRef listener) {
+	public void registerExecutionListener(ActorGateway listener) {
 		if (listener != null) {
 			this.executionListenerActors.add(listener);
 		}
@@ -1016,8 +1029,8 @@ public class ExecutionGraph implements Serializable {
 			ExecutionGraphMessages.JobStatusChanged message =
 					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), error);
 
-			for (ActorRef listener: jobStatusListenerActors) {
-				listener.tell(message, ActorRef.noSender());
+			for (ActorGateway listener: jobStatusListenerActors) {
+				listener.tell(message);
 			}
 		}
 	}
@@ -1035,8 +1048,8 @@ public class ExecutionGraph implements Serializable {
 																	executionID, newExecutionState,
 																	System.currentTimeMillis(), message);
 
-			for (ActorRef listener : executionListenerActors) {
-				listener.tell(actorMessage, ActorRef.noSender());
+			for (ActorGateway listener : executionListenerActors) {
+				listener.tell(actorMessage);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index f9001cf..dcf64c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -469,7 +469,7 @@ public class ExecutionVertex implements Serializable {
 			
 			// send only if we actually have a target
 			if (slot != null) {
-				InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+				ActorGateway gateway = slot.getInstance().getActorGateway();
 				if (gateway != null) {
 					gateway.tell(message);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
new file mode 100644
index 0000000..fe4a1cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Interface to abstract the communication with an actor.
+ *
+ * It allows to avoid direct interaction with an ActorRef.
+ */
+public interface ActorGateway {
+
+	/**
+	 * Sends a message asynchronously and returns its response. The response to the message is
+	 * returned as a future.
+	 *
+	 * @param message Message to be sent
+	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
+	 * @return Future which contains the response to the sent message
+	 */
+	Future<Object> ask(Object message, FiniteDuration timeout);
+
+	/**
+	 * Sends a message asynchronously without a result.
+	 *
+	 * @param message Message to be sent
+	 */
+	void tell(Object message);
+
+	/**
+	 * Sends a message asynchronously without a result with sender being the sender.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the message
+	 */
+	void tell(Object message, ActorGateway sender);
+
+	/**
+	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
+	 * message.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the forwarded message
+	 */
+	void forward(Object message, ActorGateway sender);
+
+	/**
+	 * Retries to send asynchronously a message up to numberRetries times. The response to this
+	 * message is returned as a future. The message is re-sent if the number of retries is not yet
+	 * exceeded and if an exception occurred while sending it.
+	 *
+	 * @param message Message to be sent
+	 * @param numberRetries Number of times to retry sending the message
+	 * @param timeout Timeout for each sending attempt
+	 * @param executionContext ExecutionContext which is used to send the message multiple times
+	 * @return Future of the response to the sent message
+	 */
+	Future<Object> retry(
+			Object message,
+			int numberRetries,
+			FiniteDuration timeout,
+			ExecutionContext executionContext);
+
+	/**
+	 * Returns the path of the remote instance.
+	 *
+	 * @return Path of the remote instance.
+	 */
+	String path();
+
+	/**
+	 * Returns the underlying actor with which is communicated
+	 *
+	 * @return ActorRef of the target actor
+	 */
+	ActorRef actor();
+
+	/**
+	 * Returns the leaderSessionID associated with the remote actor or None.
+	 *
+	 * @return Leader session ID if its associated with this gateway, otherwise None
+	 */
+	Option<UUID> leaderSessionID();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
new file mode 100644
index 0000000..ea55458
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.LeaderSessionMessageDecorator;
+import org.apache.flink.runtime.MessageDecorator;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Concrete {@link ActorGateway} implementation which uses Akka to communicate with remote actors.
+ */
+public class AkkaActorGateway implements ActorGateway {
+
+	// ActorRef of the remote instance
+	private final ActorRef actor;
+
+	// Associated leader session ID, which is used for RequiresLeaderSessionID messages
+	private final Option<UUID> leaderSessionID;
+
+	// Decorator for messages
+	private final MessageDecorator decorator;
+
+	public AkkaActorGateway(ActorRef actor, Option<UUID> leaderSessionID) {
+		this.actor = actor;
+		this.leaderSessionID = leaderSessionID;
+		// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
+		this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);
+	}
+
+	/**
+	 * Sends a message asynchronously and returns its response. The response to the message is
+	 * returned as a future.
+	 *
+	 * @param message Message to be sent
+	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
+	 * @return Future which contains the response to the sent message
+	 */
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) {
+		Object newMessage = decorator.decorate(message);
+		return Patterns.ask(actor, newMessage, new Timeout(timeout));
+	}
+
+	/**
+	 * Sends a message asynchronously without a result.
+	 *
+	 * @param message Message to be sent
+	 */
+	@Override
+	public void tell(Object message) {
+		Object newMessage = decorator.decorate(message);
+		actor.tell(newMessage, ActorRef.noSender());
+	}
+
+	/**
+	 * Sends a message asynchronously without a result with sender being the sender.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the message
+	 */
+	@Override
+	public void tell(Object message, ActorGateway sender) {
+		Object newMessage = decorator.decorate(message);
+		actor.tell(newMessage, sender.actor());
+	}
+
+	/**
+	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
+	 * message.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the forwarded message
+	 */
+	@Override
+	public void forward(Object message, ActorGateway sender) {
+		Object newMessage = decorator.decorate(message);
+		actor.tell(newMessage, sender.actor());
+	}
+
+	/**
+	 * Retries to send asynchronously a message up to numberRetries times. The response to this
+	 * message is returned as a future. The message is re-sent if the number of retries is not yet
+	 * exceeded and if an exception occurred while sending it.
+	 *
+	 * @param message Message to be sent
+	 * @param numberRetries Number of times to retry sending the message
+	 * @param timeout Timeout for each sending attempt
+	 * @param executionContext ExecutionContext which is used to send the message multiple times
+	 * @return Future of the response to the sent message
+	 */
+	@Override
+	public Future<Object> retry(
+			Object message,
+			int numberRetries,
+			FiniteDuration timeout,
+			ExecutionContext executionContext) {
+
+		Object newMessage = decorator.decorate(message);
+
+		return AkkaUtils.retry(
+			actor,
+			newMessage,
+			numberRetries,
+			executionContext,
+			timeout);
+	}
+
+	/**
+	 * Returns the ActorPath of the remote instance.
+	 *
+	 * @return ActorPath of the remote instance.
+	 */
+	@Override
+	public String path() {
+		return actor.path().toString();
+	}
+
+	/**
+	 * Returns {@link ActorRef} of the target actor
+	 *
+	 * @return ActorRef of the target actor
+	 */
+	@Override
+	public ActorRef actor() {
+		return actor;
+	}
+
+	@Override
+	public Option<UUID> leaderSessionID() {
+		return leaderSessionID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
deleted file mode 100644
index b7d60c5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * InstanceGateway implementation which uses Akka to communicate with remote instances.
- */
-public class AkkaInstanceGateway implements InstanceGateway {
-
-	/** ActorRef of the remote instance */
-	private final ActorRef taskManager;
-
-	public AkkaInstanceGateway(ActorRef taskManager) {
-		this.taskManager = taskManager;
-	}
-
-	/**
-	 * Sends a message asynchronously and returns its response. The response to the message is
-	 * returned as a future.
-	 *
-	 * @param message Message to be sent
-	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
-	 * @return Future which contains the response to the sent message
-	 */
-	@Override
-	public Future<Object> ask(Object message, FiniteDuration timeout) {
-		return Patterns.ask(taskManager, message, new Timeout(timeout));
-	}
-
-	/**
-	 * Sends a message asynchronously without a result.
-	 *
-	 * @param message Message to be sent
-	 */
-	@Override
-	public void tell(Object message) {
-		taskManager.tell(message, ActorRef.noSender());
-	}
-
-	/**
-	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
-	 * message.
-	 *
-	 * @param message Message to be sent
-	 * @param sender Sender of the forwarded message
-	 */
-	@Override
-	public void forward(Object message, ActorRef sender) {
-		taskManager.tell(message, sender);
-	}
-
-	/**
-	 * Retries to send asynchronously a message up to numberRetries times. The response to this
-	 * message is returned as a future. The message is re-sent if the number of retries is not yet
-	 * exceeded and if an exception occurred while sending it.
-	 *
-	 * @param message Message to be sent
-	 * @param numberRetries Number of times to retry sending the message
-	 * @param timeout Timeout for each sending attempt
-	 * @param executionContext ExecutionContext which is used to send the message multiple times
-	 * @return Future of the response to the sent message
-	 */
-	@Override
-	public Future<Object> retry(
-			Object message,
-			int numberRetries,
-			FiniteDuration timeout,
-			ExecutionContext executionContext) {
-
-		return AkkaUtils.retry(
-			taskManager,
-			message,
-			numberRetries,
-			executionContext,
-			timeout);
-	}
-
-	/**
-	 * Returns the ActorPath of the remote instance.
-	 *
-	 * @return ActorPath of the remote instance.
-	 */
-	@Override
-	public String path() {
-		return taskManager.path().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 1c44b5d..743dc8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -42,7 +42,7 @@ public class Instance {
 	private final Object instanceLock = new Object();
 
 	/** The instacne gateway to communicate with the instance */
-	private final InstanceGateway instanceGateway;
+	private final ActorGateway actorGateway;
 
 	/** The instance connection information for the data transfer. */
 	private final InstanceConnectionInfo connectionInfo;
@@ -79,19 +79,19 @@ public class Instance {
 	/**
 	 * Constructs an instance reflecting a registered TaskManager.
 	 *
-	 * @param instanceGateway The instance gateway to communicate with the remote instance
+	 * @param actorGateway The actor gateway to communicate with the remote instance
 	 * @param connectionInfo The remote connection where the task manager receives requests.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
 	 */
 	public Instance(
-			InstanceGateway instanceGateway,
+			ActorGateway actorGateway,
 			InstanceConnectionInfo connectionInfo,
 			InstanceID id,
 			HardwareDescription resources,
 			int numberOfSlots) {
-		this.instanceGateway = instanceGateway;
+		this.actorGateway = actorGateway;
 		this.connectionInfo = connectionInfo;
 		this.instanceId = id;
 		this.resources = resources;
@@ -335,8 +335,8 @@ public class Instance {
 	 *
 	 * @return InstanceGateway associated with this instance
 	 */
-	public InstanceGateway getInstanceGateway() {
-		return instanceGateway;
+	public ActorGateway getActorGateway() {
+		return actorGateway;
 	}
 
 	public InstanceConnectionInfo getInstanceConnectionInfo() {
@@ -390,6 +390,6 @@ public class Instance {
 	@Override
 	public String toString() {
 		return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
-				numberOfSlots, (instanceGateway != null ? instanceGateway.path() : "No instance gateway"));
+				numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
deleted file mode 100644
index a30b2f6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorRef;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Interface to abstract the communication with an Instance.
- *
- * It allows to avoid direct interaction with an ActorRef.
- */
-public interface InstanceGateway {
-
-	/**
-	 * Sends a message asynchronously and returns its response. The response to the message is
-	 * returned as a future.
-	 *
-	 * @param message Message to be sent
-	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
-	 * @return Future which contains the response to the sent message
-	 */
-	Future<Object> ask(Object message, FiniteDuration timeout);
-
-	/**
-	 * Sends a message asynchronously without a result.
-	 *
-	 * @param message Message to be sent
-	 */
-	void tell(Object message);
-
-	/**
-	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
-	 * message.
-	 *
-	 * @param message Message to be sent
-	 * @param sender Sender of the forwarded message
-	 */
-	void forward(Object message, ActorRef sender);
-
-	/**
-	 * Retries to send asynchronously a message up to numberRetries times. The response to this
-	 * message is returned as a future. The message is re-sent if the number of retries is not yet
-	 * exceeded and if an exception occurred while sending it.
-	 *
-	 * @param message Message to be sent
-	 * @param numberRetries Number of times to retry sending the message
-	 * @param timeout Timeout for each sending attempt
-	 * @param executionContext ExecutionContext which is used to send the message multiple times
-	 * @return Future of the response to the sent message
-	 */
-	Future<Object> retry(
-			Object message,
-			int numberRetries,
-			FiniteDuration timeout,
-			ExecutionContext executionContext);
-
-	/**
-	 * Returns the path of the remote instance.
-	 *
-	 * @return Path of the remote instance.
-	 */
-	String path();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index ef49804..0a6b4d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -25,10 +25,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import akka.actor.ActorRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 /**
  * Simple manager that keeps track of which TaskManager are available and alive.
@@ -125,7 +127,23 @@ public class InstanceManager {
 		}
 	}
 
-	public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo, HardwareDescription resources, int numberOfSlots){
+	/**
+	 * Registers a task manager. Registration of a task manager makes it available to be used
+	 * for the job execution.
+	 *
+	 * @param taskManager ActorRef to the TaskManager which wants to be registered
+	 * @param connectionInfo ConnectionInfo of the TaskManager
+	 * @param resources Hardware description of the TaskManager
+	 * @param numberOfSlots Number of available slots on the TaskManager
+	 * @param leaderSessionID The current leader session ID of the JobManager
+	 * @return
+	 */
+	public InstanceID registerTaskManager(
+			ActorRef taskManager,
+			InstanceConnectionInfo connectionInfo,
+			HardwareDescription resources,
+			int numberOfSlots,
+			Option<UUID> leaderSessionID){
 		synchronized(this.lock){
 			if (this.isShutdown) {
 				throw new IllegalStateException("InstanceManager is shut down.");
@@ -149,9 +167,9 @@ public class InstanceManager {
 				id = new InstanceID();
 			} while (registeredHostsById.containsKey(id));
 
-			InstanceGateway instanceGateway = new AkkaInstanceGateway(taskManager);
+			ActorGateway actorGateway = new AkkaActorGateway(taskManager, leaderSessionID);
 
-			Instance host = new Instance(instanceGateway, connectionInfo, id, resources, numberOfSlots);
+			Instance host = new Instance(actorGateway, connectionInfo, id, resources, numberOfSlots);
 
 			registeredHostsById.put(id, host);
 			registeredHostsByConnection.put(taskManager, host);

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 0ffc889..539dbc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.actor.ActorRef;
 import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -164,16 +162,17 @@ public class NetworkEnvironment {
 	 * This associates the network environment with a TaskManager and JobManager.
 	 * This will actually start the network components.
 	 *
-	 * @param jobManagerRef The JobManager actor reference.
-	 * @param taskManagerRef The TaskManager actor reference.
+	 * @param jobManagerGateway Gateway to the JobManager.
+	 * @param taskManagerGateway Gateway to the TaskManager.
 	 *
 	 * @throws IOException Thrown if the network subsystem (Netty) cannot be properly started.
 	 */
-	public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorRef taskManagerRef)
-			throws IOException
+	public void associateWithTaskManagerAndJobManager(
+			ActorGateway jobManagerGateway,
+			ActorGateway taskManagerGateway) throws IOException
 	{
-		checkNotNull(jobManagerRef);
-		checkNotNull(taskManagerRef);
+		checkNotNull(jobManagerGateway);
+		checkNotNull(taskManagerGateway);
 
 		synchronized (lock) {
 			if (isShutdown) {
@@ -192,12 +191,12 @@ public class NetworkEnvironment {
 				this.taskEventDispatcher = new TaskEventDispatcher();
 				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
 					executionContext,
-					jobManagerRef,
-					taskManagerRef,
-					new Timeout(jobManagerTimeout));
+					jobManagerGateway,
+					taskManagerGateway,
+					jobManagerTimeout);
 
 				this.partitionStateChecker = new JobManagerPartitionStateChecker(
-						jobManagerRef, taskManagerRef);
+						jobManagerGateway, taskManagerGateway);
 
 				// -----  Network connections  -----
 				final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
@@ -432,17 +431,17 @@ public class NetworkEnvironment {
 		 */
 		private final ExecutionContext executionContext;
 
-		private final ActorRef jobManager;
+		private final ActorGateway jobManager;
 
-		private final ActorRef taskManager;
+		private final ActorGateway taskManager;
 
-		private final Timeout jobManagerMessageTimeout;
+		private final FiniteDuration jobManagerMessageTimeout;
 
 		public JobManagerResultPartitionConsumableNotifier(
 			ExecutionContext executionContext,
-			ActorRef jobManager,
-			ActorRef taskManager,
-			Timeout jobManagerMessageTimeout) {
+			ActorGateway jobManager,
+			ActorGateway taskManager,
+			FiniteDuration jobManagerMessageTimeout) {
 
 			this.executionContext = executionContext;
 			this.jobManager = jobManager;
@@ -455,7 +454,7 @@ public class NetworkEnvironment {
 
 			final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
 
-			Future<Object> futureResponse = Patterns.ask(jobManager, msg, jobManagerMessageTimeout);
+			Future<Object> futureResponse = jobManager.ask(msg, jobManagerMessageTimeout);
 
 			futureResponse.onFailure(new OnFailure() {
 				@Override
@@ -468,7 +467,7 @@ public class NetworkEnvironment {
 							new RuntimeException("Could not notify JobManager to schedule or update consumers",
 									failure));
 
-					taskManager.tell(failMsg, ActorRef.noSender());
+					taskManager.tell(failMsg);
 				}
 			}, executionContext);
 		}
@@ -476,11 +475,11 @@ public class NetworkEnvironment {
 
 	private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
 
-		private final ActorRef jobManager;
+		private final ActorGateway jobManager;
 
-		private final ActorRef taskManager;
+		private final ActorGateway taskManager;
 
-		public JobManagerPartitionStateChecker(ActorRef jobManager, ActorRef taskManager) {
+		public JobManagerPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
 			this.jobManager = jobManager;
 			this.taskManager = taskManager;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 567d15a..c3df253 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -146,7 +146,7 @@ public class SetupInfoServlet extends HttpServlet {
 				long time = new Date().getTime() - instance.getLastHeartBeat();
 
 				try {
-					objInner.put("path", instance.getInstanceGateway().path());
+					objInner.put("path", instance.getActorGateway().path());
 					objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
 					objInner.put("timeSinceLastHeartbeat", time / 1000);
 					objInner.put("slotsNumber", instance.getTotalNumberOfSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
new file mode 100644
index 0000000..7e6a473
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+/**
+ * Marks messages to be sent wrapped in a
+ * {@link org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}
+ */
+public interface RequiresLeaderSessionID {}


Mime
View raw message