flink-commits mailing list archives

From trohrm...@apache.org
Subject [01/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManagers. The leader will then be retrieved from ZooKeeper by the TaskManagers.
Date Mon, 31 Aug 2015 10:31:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0858d9f12 -> b9de4ed37

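The core of this change: clients and TaskManagers no longer connect to a fixed JobManager address but look up the currently elected leader. A minimal Java sketch of that lookup, based on the calls used by the new ZooKeeperLeaderElectionITCase further down (the surrounding configuration, actor system, and timeout are assumed to exist in the caller; this is not part of the commit):

    import akka.actor.ActorSystem;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.instance.ActorGateway;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    import scala.concurrent.duration.FiniteDuration;

    // Sketch only: resolve a gateway to the current leading JobManager.
    static ActorGateway findLeader(
            Configuration config,
            ActorSystem actorSystem,
            FiniteDuration timeout) throws Exception {
        // builds a leader retrieval service from the Flink configuration
        LeaderRetrievalService leaderService =
                LeaderRetrievalUtils.createLeaderRetrievalService(config);
        // blocks until a leader has been elected and its gateway could be resolved
        return LeaderRetrievalUtils.retrieveLeaderGateway(leaderService, actorSystem, timeout);
    }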

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index f410827..1f5bfda 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -163,7 +163,6 @@ under the License.
 			<version>${guava.version}</version>
 			<scope>test</scope>
 		</dependency>
-		
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index ccaa486..922fc43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -43,6 +43,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -119,11 +120,12 @@ public class AccumulatorLiveITCase {
 
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		TestingCluster testingCluster = new TestingCluster(config, false, true);
+		testingCluster.start();
 
-		jobManagerGateway = testingCluster.getJobManagerGateway();
+		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 		taskManager = testingCluster.getTaskManagersAsJava().get(0);
 
 		// generate test data
@@ -192,7 +194,11 @@ public class AccumulatorLiveITCase {
 
 			// submit job
 
-			jobManagerGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, false), selfGateway);
+			jobManagerGateway.tell(
+					new JobManagerMessages.SubmitJob(
+							jobGraph,
+							ListeningBehaviour.EXECUTION_RESULT),
+					selfGateway);
 			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
 
 

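Distilled, the test migration that repeats throughout this commit (a sketch, using the classes imported in the file above): the mini cluster must now be started explicitly, the gateway is resolved from the current leader, and SubmitJob carries a ListeningBehaviour instead of a boolean.

    Configuration config = new Configuration();
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

    TestingCluster testingCluster = new TestingCluster(config, false, true);
    testingCluster.start();  // the constructor no longer starts the cluster

    ActorGateway jobManagerGateway =
            testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());

    // jobGraph and selfGateway are assumed to be set up by the surrounding test
    jobManagerGateway.tell(
            new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT),
            selfGateway);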
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 088ea4d..1a96a1b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -115,7 +116,8 @@ public abstract class CancellingTestBase extends TestLogger {
 				public void run() {
 					try {
 						Thread.sleep(msecsTillCanceling);
-						executor.getJobManagerGateway().tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+						executor.getLeaderGateway(TestingUtils.TESTING_DURATION())
+							.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
 					}
 					catch (Throwable t) {
 						error.set(t);
@@ -146,7 +148,7 @@ public abstract class CancellingTestBase extends TestLogger {
 	}
 
 	private JobGraph getJobGraph(final Plan plan) throws Exception {
-		final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
+		final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
 		final OptimizedPlan op = pc.compile(plan);
 		final JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 59f24b0..20e19a5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -47,12 +47,14 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	public static void startCluster() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 			
 			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -61,9 +63,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	}
 
 	@AfterClass
-	public static void shutdownCluster() {
+	public static void stopCluster() {
 		try {
-			cluster.shutdown();
+			cluster.stop();
 			cluster = null;
 		}
 		catch (Exception e) {
@@ -91,7 +93,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	public void runCheckpointedProgram() {
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
+					"localhost", cluster.getLeaderRPCPort());
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(500);
 			env.getConfig().disableSysoutLogging();

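Wherever a test builds a remote environment, the port lookup changes in the same way: the cluster has to be started first and the RPC port now comes from getLeaderRPCPort(). A condensed sketch, assuming the classes already imported above:

    ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
    cluster.start();

    // was: cluster.getJobManagerRPCPort()
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
            "localhost", cluster.getLeaderRPCPort());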
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 5c2f2dc..4480d95 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -50,7 +50,7 @@ public class ClassLoaderITCase {
 	public void testJobsWithCustomClassLoader() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
 
@@ -60,8 +60,10 @@ public class ClassLoaderITCase {
 
 			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
 
+			testCluster.start();
+
 			try {
-				int port = testCluster.getJobManagerRPCPort();
+				int port = testCluster.getLeaderRPCPort();
 
 				PackagedProgram inputSplitTestProg = new PackagedProgram(
 						new File(INPUT_SPLITS_PROG_JAR_FILE),

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 2bde833..28c2e58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -55,10 +55,12 @@ public class JobSubmissionFailsITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			
 			cluster = new ForkableFlinkMiniCluster(config);
+
+			cluster.start();
 			
 			final JobVertex jobVertex = new JobVertex("Working job vertex.");
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index d32986d..6e30fac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.avro.generic.GenericData;
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index d340f57..f8b0d96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -49,7 +49,7 @@ public class NotSoMiniClusterIterations {
 
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, PARALLELISM);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 8);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 1000);
@@ -60,7 +60,7 @@ public class NotSoMiniClusterIterations {
 
 			cluster = new LocalFlinkMiniCluster(config, false);
 
-			runConnectedComponents(cluster.getJobManagerRPCPort());
+			runConnectedComponents(cluster.getLeaderRPCPort());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 8d41292..b01a524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -46,7 +46,7 @@ public class StreamingScalabilityAndLatency {
 
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000);
@@ -56,7 +56,7 @@ public class StreamingScalabilityAndLatency {
 
 			cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.STREAMING);
 			
-			runPartitioningProgram(cluster.getJobManagerRPCPort(), PARALLELISM);
+			runPartitioningProgram(cluster.getLeaderRPCPort(), PARALLELISM);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index df190d7..7dccb7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -54,9 +54,11 @@ public class AutoParallelismITCase {
 	@BeforeClass
 	public static void setupCluster() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
 		cluster = new ForkableFlinkMiniCluster(config, false);
+
+		cluster.start();
 	}
 
 	@AfterClass
@@ -76,7 +78,7 @@ public class AutoParallelismITCase {
 	public void testProgramWithAutoParallelism() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
+					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
 			env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 4e7da83..f30f61f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -52,6 +52,7 @@ public class CustomSerializationITCase {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
 			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -75,7 +76,7 @@ public class CustomSerializationITCase {
 	public void testIncorrectSerializer1() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 			
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
@@ -108,7 +109,7 @@ public class CustomSerializationITCase {
 	public void testIncorrectSerializer2() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
@@ -141,7 +142,7 @@ public class CustomSerializationITCase {
 	public void testIncorrectSerializer3() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
@@ -174,7 +175,7 @@ public class CustomSerializationITCase {
 	public void testIncorrectSerializer4() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 01e6f62..42419fb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -58,10 +58,12 @@ public class MiscellaneousIssuesITCase {
 	public static void startCluster() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -85,7 +87,7 @@ public class MiscellaneousIssuesITCase {
 	public void testNullValues() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(1);
 			env.getConfig().disableSysoutLogging();
@@ -119,7 +121,7 @@ public class MiscellaneousIssuesITCase {
 	public void testDisjointDataflows() {
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(5);
 			env.getConfig().disableSysoutLogging();
@@ -142,7 +144,7 @@ public class MiscellaneousIssuesITCase {
 		
 		try {
 			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(6);
 			env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 6cb76e6..12b7a68 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -47,15 +47,17 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 		
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
 			
 			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			cluster.start();
 			
 			try {
-				runConnectedComponents(cluster.getJobManagerRPCPort());
+				runConnectedComponents(cluster.getLeaderRPCPort());
 			}
 			catch (Exception e) {
 				e.printStackTrace();
@@ -63,7 +65,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 			}
 	
 			try {
-				runKMeans(cluster.getJobManagerRPCPort());
+				runKMeans(cluster.getLeaderRPCPort());
 				fail("This program execution should have failed.");
 			}
 			catch (ProgramInvocationException e) {
@@ -71,7 +73,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 			}
 	
 			try {
-				runConnectedComponents(cluster.getJobManagerRPCPort());
+				runConnectedComponents(cluster.getLeaderRPCPort());
 			}
 			catch (Exception e) {
 				e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index e2f5a71..aa721e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -49,11 +49,13 @@ public class SimpleRecoveryITCase {
 	@BeforeClass
 	public static void setupCluster() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
 
 		cluster = new ForkableFlinkMiniCluster(config, false);
+
+		cluster.start();
 	}
 
 	@AfterClass
@@ -77,7 +79,7 @@ public class SimpleRecoveryITCase {
 			// attempt 1
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-						"localhost", cluster.getJobManagerRPCPort());
+						"localhost", cluster.getLeaderRPCPort());
 
 				env.setParallelism(4);
 				env.setNumberOfExecutionRetries(0);
@@ -107,7 +109,7 @@ public class SimpleRecoveryITCase {
 			// attempt 2
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-						"localhost", cluster.getJobManagerRPCPort());
+						"localhost", cluster.getLeaderRPCPort());
 
 				env.setParallelism(4);
 				env.setNumberOfExecutionRetries(0);
@@ -154,7 +156,7 @@ public class SimpleRecoveryITCase {
 			List<Long> resultCollection = new ArrayList<Long>();
 
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
+					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(4);
 			env.setNumberOfExecutionRetries(1);
@@ -199,7 +201,7 @@ public class SimpleRecoveryITCase {
 			List<Long> resultCollection = new ArrayList<Long>();
 
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
+					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(4);
 			env.setNumberOfExecutionRetries(5);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 877893f..520a0f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -69,7 +69,7 @@ public class TaskManagerFailureRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 			
@@ -79,11 +79,13 @@ public class TaskManagerFailureRecoveryITCase {
 
 			cluster = new ForkableFlinkMiniCluster(config, false);
 
+			cluster.start();
+
 			// for the result
 			List<Long> resultCollection = new ArrayList<Long>();
 
 			final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getJobManagerRPCPort());
+					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARALLELISM);
 			env.setNumberOfExecutionRetries(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
new file mode 100644
index 0000000..6035c45
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime.leaderelection;
+
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.PoisonPill;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.*;
+
+public class ZooKeeperLeaderElectionITCase extends TestLogger {
+
+	private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
+
+	/**
+	 * Tests that the TaskManagers successfully register at the new leader once the old leader
+	 * is terminated.
+	 */
+	@Test
+	public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
+		Configuration configuration = new Configuration();
+
+		int numJMs = 10;
+		int numTMs = 3;
+
+		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+
+		try {
+			cluster.start();
+
+			for(int i = 0; i < numJMs; i++) {
+				ActorGateway leadingJM = cluster.getLeaderGateway(timeout);
+
+				cluster.waitForTaskManagersToBeRegisteredAtJobManager(leadingJM.actor());
+
+				Future<Object> registeredTMs = leadingJM.ask(
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						timeout);
+
+				int numRegisteredTMs = (Integer) Await.result(registeredTMs, timeout);
+
+				assertEquals(numTMs, numRegisteredTMs);
+
+				cluster.clearLeader();
+				leadingJM.tell(PoisonPill.getInstance());
+			}
+		} finally {
+			cluster.stop();
+		}
+	}
+
+	/**
+	 * Tests that a job can be executed after a new leader has been elected. For all but the
+	 * last leader, the job is blocking. The JobManager is terminated while executing the
+	 * blocking job. Once only one JobManager is left, it is checked that a non-blocking job
+	 * can be successfully executed.
+	 */
+	@Test
+	public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
+		int numJMs = 10;
+		int numTMs = 3;
+		int numSlotsPerTM = 3;
+		int parallelism = numTMs * numSlotsPerTM;
+
+		Configuration configuration = new Configuration();
+
+		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+
+		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+		JobVertex sender = new JobVertex("sender");
+		JobVertex receiver = new JobVertex("receiver");
+
+		sender.setInvokableClass(Tasks.Sender.class);
+		receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
+
+		sender.setParallelism(parallelism);
+		receiver.setParallelism(parallelism);
+
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+		sender.setSlotSharingGroup(slotSharingGroup);
+		receiver.setSlotSharingGroup(slotSharingGroup);
+
+		final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
+
+		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+
+		ActorSystem clientActorSystem = null;
+
+		Thread thread = null;
+
+		JobSubmitterRunnable jobSubmission = null;
+
+		try {
+			cluster.start();
+
+			clientActorSystem = cluster.startJobClientActorSystem(graph.getJobID());
+
+			final ActorSystem clientAS = clientActorSystem;
+
+			jobSubmission = new JobSubmitterRunnable(clientAS, cluster, graph);
+
+			thread = new Thread(jobSubmission);
+
+			thread.start();
+
+			// Kill all JobManagers except for two
+			for(int i = 0; i < numJMs - 2; i++) {
+				ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+				cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
+
+				Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), timeout);
+
+				Await.ready(future, timeout);
+
+				cluster.clearLeader();
+
+				jm.tell(Kill.getInstance());
+			}
+
+			ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+			cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
+
+			Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), timeout);
+
+			Await.ready(future, timeout);
+
+			cluster.clearLeader();
+
+			// set the BlockingOnceReceiver for the execution on the last JM to non-blocking, so
+			// that it can succeed
+			Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+
+			jm.tell(PoisonPill.getInstance());
+
+			thread.join(timeout.toMillis());
+
+			if(thread.isAlive()) {
+				jobSubmission.finished = true;
+				fail("The job submission thread did not stop (meaning it did not succeed in " +
+						"executing the test job).");
+			}
+		} finally {
+			if (clientActorSystem != null) {
+				cluster.shutdownJobClientActorSystem(clientActorSystem);
+			}
+
+			if(thread != null && thread.isAlive() && jobSubmission != null) {
+				jobSubmission.finished = true;
+			}
+			cluster.stop();
+		}
+	}
+
+	public static class JobSubmitterRunnable implements Runnable {
+		boolean finished = false;
+
+		final ActorSystem clientActorSystem;
+		final ForkableFlinkMiniCluster cluster;
+		final JobGraph graph;
+
+		public JobSubmitterRunnable(
+				ActorSystem actorSystem,
+				ForkableFlinkMiniCluster cluster,
+				JobGraph graph) {
+			this.clientActorSystem = actorSystem;
+			this.cluster = cluster;
+			this.graph = graph;
+		}
+
+		@Override
+		public void run() {
+			while(!finished) {
+				try {
+					LeaderRetrievalService lrService =
+							LeaderRetrievalUtils.createLeaderRetrievalService(
+									cluster.configuration());
+
+					ActorGateway jobManagerGateway =
+							LeaderRetrievalUtils.retrieveLeaderGateway(
+									lrService,
+									clientActorSystem,
+									timeout);
+
+					JobClient.submitJobAndWait(
+							clientActorSystem,
+							jobManagerGateway,
+							graph,
+							timeout,
+							false,
+							getClass().getClassLoader());
+
+					finished = true;
+				} catch (JobExecutionException e) {
+					// This was expected, so just try again to submit the job
+				} catch (LeaderRetrievalException e) {
+					// This can also happen, so just try again to submit the job
+				} catch (Exception e) {
+					// This was not expected... fail the test case
+					e.printStackTrace();
+					fail("Caught unexpected exception in job submission test case.");
+				}
+			}
+		}
+	}
+}

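For reference, the configuration that turns on leader election in the test above, reduced to the keys the ITCase actually sets (anything beyond these, such as the ZooKeeper quorum of a real deployment, is outside this sketch):

    Configuration configuration = new Configuration();
    configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);  // several competing JMs
    configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);

    ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
    cluster.start();

    // after the current leader is killed, the next call resolves to the newly elected leader
    ActorGateway leader = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());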
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index c45cfe8..8e87143 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -31,9 +31,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.Option;
-
-import java.util.UUID;
 
 public class LocalFlinkMiniClusterITCase {
 
@@ -59,14 +56,16 @@ public class LocalFlinkMiniClusterITCase {
 
 		try{
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
 			miniCluster = new LocalFlinkMiniCluster(config, true);
 
-			final ActorGateway jmGateway = miniCluster.getJobManagerGateway();
+			miniCluster.start();
+
+			final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 			new JavaTestKit(system) {{
-				final ActorGateway selfGateway = new AkkaActorGateway(getRef(), Option.<UUID>empty());
+				final ActorGateway selfGateway = new AkkaActorGateway(getRef(), null);
 
 				new Within(TestingUtils.TESTING_DURATION()) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 31f8560..2ca35ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -20,19 +20,23 @@ package org.apache.flink.test.web;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 @RunWith(Parameterized.class)
 public class WebFrontendITCase extends MultipleProgramsTestBase {
@@ -41,7 +45,16 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	static {
 		startWebServer = true;
 	}
-	
+
+	private static int port = -1;
+
+	@BeforeClass
+	public static void initialize() {
+		WebMonitor webMonitor = cluster.webMonitor().get();
+		port = webMonitor.getServerPort();
+	}
+
+	static final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
 	public WebFrontendITCase(TestExecutionMode m) {
 		super(m);
@@ -57,7 +70,8 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	@Test
 	public void getNumberOfTaskManagers() {
 		try {
-			Assert.assertEquals("{\"taskmanagers\": "+cluster.getTaskManagers().size()+", \"slots\": 4}", TestBaseUtils.getFromHTTP("http://localhost:8081/jobsInfo?get=taskmanagers"));
+			Assert.assertEquals("{\"taskmanagers\": "+cluster.getTaskManagers().size()+", \"slots\": 4}",
+					TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobsInfo?get=taskmanagers"));
 		}catch(Throwable e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
@@ -67,7 +81,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	@Test
 	public void getTaskmanagers() {
 		try {
-			String json = getFromHTTP("http://localhost:8081/setupInfo?get=taskmanagers");
+			String json = getFromHTTP("http://localhost:" + port + "/setupInfo?get=taskmanagers");
 			JSONObject parsed = new JSONObject(json);
 			Object taskManagers = parsed.get("taskmanagers");
 			Assert.assertNotNull(taskManagers);
@@ -102,7 +116,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	@Test
 	public void getConfiguration() {
 		try {
-			String config = getFromHTTP("http://localhost:8081/setupInfo?get=globalC");
+			String config = getFromHTTP("http://localhost:" + port + "/setupInfo?get=globalC");
 			JSONObject parsed = new JSONObject(config);
 			Assert.assertEquals(logDir.toString(), parsed.getString("jobmanager.web.logpath"));
 			Assert.assertEquals(cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), parsed.getString("taskmanager.numberOfTaskSlots"));

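Since the web frontend no longer sits on a hard-coded port in these tests, the port is read from the running cluster, as the new @BeforeClass hook above does. A short sketch (webMonitor() appears to return a Scala Option here, hence the .get(); `cluster` is assumed to be brought up by the test base with the web server enabled):

    WebMonitor webMonitor = cluster.webMonitor().get();
    int port = webMonitor.getServerPort();

    String json = TestBaseUtils.getFromHTTP(
            "http://localhost:" + port + "/setupInfo?get=taskmanagers");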
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 85897b3..9d29841 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -27,4 +27,5 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 577b5c6..b0de0e8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -28,7 +28,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.jobgraph.{JobVertex, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -59,28 +59,31 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val cluster = startDeathwatchCluster(num_slots, 1)
 
       val tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getJobManagerGateway()
+      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
       // disable disconnect message to test death watch
       tm ! DisableDisconnect
 
-      try{
-        jmGateway.tell(RequestNumberRegisteredTaskManager, self)
-        expectMsg(1)
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jmGateway.tell(RequestNumberRegisteredTaskManager, self)
+          expectMsg(1)
 
-        tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
+          tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
 
-        jmGateway.tell(PoisonPill, self)
+          jmGateway.tell(PoisonPill, self)
 
-        expectMsgClass(classOf[JobManagerTerminated])
+          expectMsgClass(classOf[JobManagerTerminated])
 
-        cluster.restartJobManager()
+          cluster.restartLeadingJobManager()
 
-        cluster.waitForTaskManagersToBeRegistered()
+          cluster.waitForTaskManagersToBeRegistered()
 
-        cluster.getJobManagerGateway().tell(RequestNumberRegisteredTaskManager, self)
+          cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
+            .tell(RequestNumberRegisteredTaskManager, self)
 
-        expectMsg(1)
+          expectMsg(1)
+        }
       } finally {
         cluster.stop()
       }
@@ -101,12 +104,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
       val cluster = startDeathwatchCluster(num_slots / 2, 2)
 
-      var jmGateway = cluster.getJobManagerGateway()
+      var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
       val tm = cluster.getTaskManagers(0)
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, false), self)
+          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           tm.tell(NotifyWhenJobManagerTerminated(jmGateway.actor()), self)
@@ -115,13 +118,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
           expectMsgClass(classOf[JobManagerTerminated])
 
-          cluster.restartJobManager()
-
-          jmGateway = cluster.getJobManagerGateway()
+          cluster.restartLeadingJobManager()
 
           cluster.waitForTaskManagersToBeRegistered()
 
-          jmGateway.tell(SubmitJob(jobGraph2, false), self)
+          jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
+
+          jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
 
           expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
 
@@ -138,8 +141,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
   def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
+
+    val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+
+    cluster.start()
 
-    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    cluster
   }
 }

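After killing a JobManager, the tests now restart the leading JobManager and must re-resolve the gateway, because the old gateway still points at the terminated actor. The Scala spec above does exactly this; an equivalent Java sketch (jobGraph2 and selfGateway are assumed test scaffolding):

    cluster.restartLeadingJobManager();
    cluster.waitForTaskManagersToBeRegistered();

    // the previously obtained gateway is stale; fetch the new leader's gateway
    ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
    jmGateway.tell(
            new JobManagerMessages.SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT),
            selfGateway);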
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
index 010a086..f1e995f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
@@ -23,7 +23,7 @@ import java.util.UUID
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.messages.JobManagerMessages.{LeaderSessionMessage, CancelJob,
@@ -66,13 +66,13 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem)
     sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
     val jobGraph = new JobGraph("TestJob", sender)
 
-    val oldSessionID = Option(UUID.randomUUID())
+    val oldSessionID = UUID.randomUUID()
 
-    val jmGateway = cluster.getJobManagerGateway()
+    val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
     val jm = jmGateway.actor()
 
     within(TestingUtils.TESTING_DURATION) {
-      jmGateway.tell(SubmitJob(jobGraph, false), self)
+      jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
 
       expectMsg(Success(jobGraph.getJobID))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a37fae3..869af82 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -24,12 +24,12 @@ import akka.testkit.{ImplicitSender, TestKit}
 
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, BlockingNoOpInvokable, BlockingReceiver, Sender}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobManager, NotifyWhenRegisteredAtJobManager}
+import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobManager, NotifyWhenRegisteredAtAnyJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
@@ -60,10 +60,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
     "detect a failing task manager" in {
 
       val num_slots = 11
-      val cluster = startDeathwatchCluster(num_slots, 2)
+      val cluster = createDeathwatchCluster(num_slots, 2)
+
+      cluster.start()
 
       val taskManagers = cluster.getTaskManagers
-      val jmGateway = cluster.getJobManagerGateway()
+      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
       jmGateway.tell(DisableDisconnect)
 
@@ -103,11 +105,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
 
-      val jmGateway = cluster.getJobManagerGateway()
+      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, false), self)
+          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
@@ -156,11 +158,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
 
       val taskManagers = cluster.getTaskManagers
-      val jmGateway = cluster.getJobManagerGateway()
+      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, false), self)
+          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
 
           jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
@@ -196,14 +198,16 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       noOp.setInvokableClass(classOf[NoOpInvokable])
       val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
 
-      val cluster = startDeathwatchCluster(num_slots/2, 2)
+      val cluster = createDeathwatchCluster(num_slots/2, 2)
+
+      cluster.start()
 
       var tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getJobManagerGateway()
+      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
       try{
         within(TestingUtils.TESTING_DURATION){
-          jmGateway.tell(SubmitJob(jobGraph, false), self)
+          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           tm ! PoisonPill
@@ -221,11 +225,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
           tm = cluster.getTaskManagers(0)
 
-          tm ! NotifyWhenRegisteredAtJobManager
+          tm ! NotifyWhenRegisteredAtAnyJobManager
 
           expectMsg(RegisteredAtJobManager)
 
-          jmGateway.tell(SubmitJob(jobGraph2, false), self)
+          jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
 
           expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
 
@@ -238,10 +242,10 @@ class TaskManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 56be198..560a584 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -101,9 +101,13 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	 * @throws IOException
 	 * @throws YarnException
 	 */
-	public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, Configuration hadoopConfig,
-							org.apache.flink.configuration.Configuration flinkConfig,
-							Path sessionFilesDir, boolean detached) throws IOException, YarnException {
+	public FlinkYarnCluster(
+			final YarnClient yarnClient,
+			final ApplicationId appId,
+			Configuration hadoopConfig,
+			org.apache.flink.configuration.Configuration flinkConfig,
+			Path sessionFilesDir,
+			boolean detached) throws IOException, YarnException {
 		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
 		this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
 		this.yarnClient = yarnClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 6930be6..5ae814f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -23,7 +23,10 @@ import java.net.InetSocketAddress
 import akka.actor._
 import akka.pattern.ask
 import grizzled.slf4j.Logger
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.util.LeaderRetrievalUtils
+import org.apache.flink.runtime.util.LeaderRetrievalUtils.LeaderGatewayListener
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -74,12 +77,19 @@ class ApplicationClient(flinkConfig: Configuration)
   override def handleMessage: Receive = {
     // ----------------------------- Registration -> Status updates -> shutdown ----------------
     case LocalRegisterClient(address: InetSocketAddress) =>
-      val jmAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(address)
+      flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
+      flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
 
-      val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl, system, timeout)
+      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig)
 
-      jobManagerFuture.onComplete {
-        case Success(jm) => self ! decorateMessage(JobManagerActorRef(jm))
+      val listener = new LeaderGatewayListener(context.system, timeout);
+
+      leaderRetrievalService.start(listener)
+
+      val jobManagerGatewayFuture = listener.getActorGatewayFuture
+
+      jobManagerGatewayFuture.onComplete {
+        case Success(gateway) => self ! decorateMessage(JobManagerActorRef(gateway.actor()))
         case Failure(t) =>
           log.error("Registration at JobManager/ApplicationMaster failed. Shutting " +
             "ApplicationClient down.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 9e0c976..1d1db7e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.util.{StandaloneUtils, LeaderRetrievalUtils, EnvironmentInformation}
 import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
@@ -112,17 +112,11 @@ object ApplicationMaster {
               streamingMode)
 
           actorSystem = system
-          val extActor = system.asInstanceOf[ExtendedActorSystem]
-          val jobManagerPort = extActor.provider.getDefaultAddress.port.get
+          val address = AkkaUtils.getAddress(actorSystem)
+          val jobManagerPort = address.port.get
 
           if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
             // start the web info server
-            val lookupTimeout = AkkaUtils.getLookupTimeout(config)
-            val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
-            val archiverGateway = new AkkaActorGateway(
-              archiver,
-              jobManagerGateway.leaderSessionID())
-
             LOG.info("Starting Job Manger web frontend.")
             config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
             config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
@@ -130,13 +124,16 @@ object ApplicationMaster {
             config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ownHostname)
             config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort)
 
+            // TODO: Add support for HA: Make web server work independently from the JM
+            val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(config)
+
             webserver = if(
               config.getBoolean(
                 ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
                 false)) {
-              JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
+              JobManager.startWebRuntimeMonitor(config, leaderRetrievalService, actorSystem)
             } else {
-              new WebInfoServer(config, jobManagerGateway, archiverGateway)
+              new WebInfoServer(config, leaderRetrievalService, actorSystem)
             }
 
             webserver.start()
@@ -257,7 +254,8 @@ object ApplicationMaster {
       executionRetries,
       delayBetweenRetries,
       timeout,
-      _) = JobManager.createJobManagerComponents(configuration)
+      _,
+      leaderElectionService) = JobManager.createJobManagerComponents(configuration)
 
     // start the archiver
     val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
@@ -273,7 +271,8 @@ object ApplicationMaster {
         executionRetries,
         delayBetweenRetries,
         timeout,
-        streamingMode)
+        streamingMode,
+        leaderElectionService)
       with ApplicationMasterActor)
 
     LOG.debug("Starting JobManager actor")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
index 99d0345..63c0aa6 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
@@ -37,9 +37,10 @@ object Messages {
 
   case object JobManagerStopped
 
-  case class StartYarnSession(configuration: Configuration,
-                              actorSystemPort: Int,
-                              webServerport: Int)
+  case class StartYarnSession(
+      configuration: Configuration,
+      actorSystemPort: Int,
+      webServerport: Int)
 
   case class JobManagerActorRef(jobManager: ActorRef)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 5216030..b95eb86 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -21,6 +21,7 @@ package org.apache.flink.yarn
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
 import org.apache.flink.yarn.Messages.StopYarnSession
@@ -31,19 +32,19 @@ import org.apache.flink.yarn.Messages.StopYarnSession
 class YarnTaskManager(
     config: TaskManagerConfiguration,
     connectionInfo: InstanceConnectionInfo,
-    jobManagerAkkaURL: String,
     memoryManager: DefaultMemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
-    numberOfSlots: Int)
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
   extends TaskManager(
     config,
     connectionInfo,
-    jobManagerAkkaURL,
     memoryManager,
     ioManager,
     network,
-    numberOfSlots) {
+    numberOfSlots,
+    leaderRetrievalService) {
 
   override def handleMessage: Receive = {
     handleYarnMessages orElse super.handleMessage

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dbba9fc..0e7f995 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@ under the License.
 		<asm.version>5.0.4</asm.version>
 		<tez.version>0.6.1</tez.version>
 		<zookeeper.version>3.4.6</zookeeper.version>
-		<curatorrecipes.version>2.8.0</curatorrecipes.version>
+		<curator.version>2.8.0</curator.version>
 	</properties>
 
 	<dependencies>
@@ -332,15 +332,22 @@ under the License.
 			</dependency>
 
 			<dependency>
-				<groupId>org.apache.zookeeper</groupId>
-				<artifactId>zookeeper</artifactId>
-				<version>${zookeeper.version}</version>
+				<groupId>org.apache.curator</groupId>
+				<artifactId>curator-recipes</artifactId>
+				<version>${curator.version}</version>
 			</dependency>
 
 			<dependency>
 				<groupId>org.apache.curator</groupId>
-				<artifactId>curator-recipes</artifactId>
-				<version>${curatorrecipes.version}</version>
+				<artifactId>curator-test</artifactId>
+				<version>${curator.version}</version>
+				<scope>test</scope>
+			</dependency>
+
+			<dependency>
+				<groupId>org.apache.zookeeper</groupId>
+				<artifactId>zookeeper</artifactId>
+				<version>${zookeeper.version}</version>
 			</dependency>
 		</dependencies>
 	</dependencyManagement>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 69ddcde..34ae971 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -35,4 +35,5 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
+log4j.logger.org.apache.zookeeper=ERROR, file

