flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [10/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.
Date Mon, 31 Aug 2015 10:31:46 GMT
[FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.

Refactors FlinkMiniCluster to support multiple JobManager

Adds proper remote address resolution for actors

Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock.

Adds ZooKeeper start option to TestBaseUtils.startCluster

Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID
 directly in the grantLeadership method. Let the LeaderElectionService select the leader session I
D. Return leader session ID to LeaderRetrievalListeners.

Removes direct ActorRef interaction

Introduces LeaderRetrievalService for the Client and the CliFrontend.

Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution

Adds LeaderElection/Retrieval tests

Added test for exception forwarding from the CuratorFramework to a Contender

Adds test job submission with changing leaders

Adds new test cases for job cleanup after leader election change

Adds new LeaderChangeStateCleanup test case

Adds LeaderElectionRetrievalTestingCluster

Introduces ListeningBehaviour for job submissions

Relocation of org.apache.curator in flink-shaded-hadoop jar

Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files

Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis

Increased timeouts of ZooKeeperLeaderElectionTest for Travis

Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager

Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase

Adds configuration description for new ZooKeeper configuration values

Fixed port selection of JobManager at startup

Improves logging output

Extends masters file to also specify the webui ports

Adds proper network interface resolution by retrieving the current leader address

Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce.

Adds jobManagerURL to TriggerTaskManagerRegistration message

Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node.

Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM

Adjust travis log file to only log zookeeper errors

Updates high availability setup guide

Adds TestLogger to leader election tests

This closes #1016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9de4ed3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9de4ed3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9de4ed3

Branch: refs/heads/master
Commit: b9de4ed37ffa68ef50dc6d6b3819afcc00d1d029
Parents: 0858d9f
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Jun 26 12:07:39 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Aug 31 11:02:31 2015 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   25 +
 docs/setup/jobmanager_high_availability.md      |   28 +-
 flink-clients/pom.xml                           |    8 +
 .../org/apache/flink/client/CliFrontend.java    |  138 ++-
 .../org/apache/flink/client/LocalExecutor.java  |    5 +-
 .../org/apache/flink/client/RemoteExecutor.java |   14 +-
 .../org/apache/flink/client/program/Client.java |  295 +++--
 .../CliFrontendAddressConfigurationTest.java    |   94 +-
 .../flink/client/CliFrontendInfoTest.java       |   17 +-
 .../flink/client/CliFrontendListCancelTest.java |   15 +-
 .../apache/flink/client/CliFrontendRunTest.java |    8 +-
 .../RemoteExecutorHostnameResolutionTest.java   |   12 +-
 .../client/program/ClientConnectionTest.java    |    5 +-
 .../program/ClientHostnameResolutionTest.java   |   98 --
 .../apache/flink/client/program/ClientTest.java |    9 +-
 .../program/ExecutionPlanCreationTest.java      |    8 +-
 ...rRetrievalServiceHostnameResolutionTest.java |  103 ++
 .../stormcompatibility/api/FlinkClient.java     |   17 +-
 .../flink/contrib/streaming/CollectITCase.java  |    4 +-
 .../flink/configuration/ConfigConstants.java    |   53 +-
 .../flink/types/parser/ByteParserTest.java      |    4 -
 .../org/apache/flink/util/AbstractIDTest.java   |    2 -
 flink-dist/src/main/flink-bin/LICENSE           |    5 +
 flink-dist/src/main/flink-bin/NOTICE            |   22 +
 flink-dist/src/main/flink-bin/bin/config.sh     |   15 +-
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |    7 +-
 .../src/main/flink-bin/bin/start-cluster.sh     |   21 +-
 .../src/main/flink-bin/bin/stop-cluster.sh      |   11 +-
 .../webmonitor/ExecutionGraphHolder.java        |   61 +-
 .../webmonitor/JobManagerArchiveRetriever.java  |  111 ++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   72 +-
 .../handlers/RequestJobIdsHandler.java          |   25 +-
 .../handlers/RequestOverviewHandler.java        |   25 +-
 .../legacy/JobManagerInfoHandler.java           |   26 +-
 .../runtime/webmonitor/runner/TestRunner.java   |    7 +-
 .../src/test/resources/log4j-test.properties    |   38 +
 .../src/test/resources/logback-test.xml         |   42 +
 flink-runtime/pom.xml                           |    6 +
 .../flink/runtime/akka/FlinkUntypedActor.java   |   24 +-
 .../flink/runtime/akka/ListeningBehaviour.java  |   29 +
 .../apache/flink/runtime/blob/BlobServer.java   |    4 +
 .../checkpoint/CheckpointCoordinator.java       |    3 +-
 .../CheckpointCoordinatorDeActivator.java       |   14 +-
 .../apache/flink/runtime/client/JobClient.java  |    8 +-
 .../flink/runtime/client/JobClientActor.java    |   21 +-
 .../flink/runtime/executiongraph/Execution.java |    4 +-
 .../runtime/executiongraph/ExecutionGraph.java  |    3 +-
 .../flink/runtime/instance/ActorGateway.java    |   10 +-
 .../runtime/instance/AkkaActorGateway.java      |   16 +-
 .../flink/runtime/instance/InstanceManager.java |   46 +-
 .../runtime/io/network/NetworkEnvironment.java  |    2 +-
 .../jobmanager/JobManagerCliOptions.java        |   10 +
 .../flink/runtime/jobmanager/RecoveryMode.java  |   33 +
 .../runtime/jobmanager/web/WebInfoServer.java   |  163 ++-
 .../runtime/leaderelection/LeaderContender.java |   59 +
 .../leaderelection/LeaderElectionService.java   |   70 ++
 .../StandaloneLeaderElectionService.java        |   63 +
 .../ZooKeeperLeaderElectionService.java         |  265 +++++
 .../LeaderRetrievalException.java               |   40 +
 .../LeaderRetrievalListener.java                |   44 +
 .../leaderretrieval/LeaderRetrievalService.java |   48 +
 .../StandaloneLeaderRetrievalService.java       |   59 +
 .../ZooKeeperLeaderRetrievalService.java        |  126 ++
 .../messages/LeaderSessionMessageDecorator.java |   49 +
 .../runtime/messages/MessageDecorator.java      |   33 +
 .../org/apache/flink/runtime/net/NetUtils.java  |  157 +++
 .../apache/flink/runtime/taskmanager/Task.java  |    7 +-
 .../runtime/util/LeaderConnectionInfo.java      |   44 +
 .../flink/runtime/util/LeaderElectionUtils.java |   57 +
 .../runtime/util/LeaderRetrievalUtils.java      |  244 ++++
 .../flink/runtime/util/StandaloneUtils.java     |   84 ++
 .../flink/runtime/util/ZooKeeperUtil.java       |  110 --
 .../flink/runtime/util/ZooKeeperUtils.java      |  151 +++
 .../runtime/LeaderSessionMessageDecorator.scala |   46 -
 .../runtime/LeaderSessionMessageFilter.scala    |   69 ++
 .../flink/runtime/LeaderSessionMessages.scala   |   73 --
 .../apache/flink/runtime/MessageDecorator.scala |   32 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  127 +-
 .../runtime/akka/RemoteAddressExtension.scala   |   32 +
 .../flink/runtime/jobmanager/JobManager.scala   |  515 +++++----
 .../runtime/jobmanager/MemoryArchivist.scala    |    4 +
 .../messages/ExecutionGraphMessages.scala       |    2 +-
 .../runtime/messages/JobManagerMessages.scala   |   49 +-
 .../runtime/messages/RegistrationMessages.scala |   23 +-
 .../runtime/messages/TaskControlMessages.scala  |  184 +++
 .../runtime/messages/TaskManagerMessages.scala  |   21 +-
 .../flink/runtime/messages/TaskMessages.scala   |  184 ---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  445 +++++--
 .../minicluster/LocalFlinkMiniCluster.scala     |  125 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  500 ++++----
 .../runtime/akka/FlinkUntypedActorTest.java     |   26 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   26 +-
 .../ExecutionVertexCancelTest.java              |    7 +-
 .../instance/BaseTestingActorGateway.java       |    5 +-
 .../runtime/instance/DummyActorGateway.java     |    5 +-
 .../runtime/instance/InstanceManagerTest.java   |    4 +-
 .../netty/ServerTransportErrorHandlingTest.java |    1 -
 .../PartialConsumePipelinedResultTest.java      |    6 +-
 .../consumer/LocalInputChannelTest.java         |    3 -
 .../JobManagerProcessReapingTest.java           |    2 +-
 .../runtime/jobmanager/JobManagerTest.java      |   15 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   38 +-
 .../JobManagerLeaderElectionTest.java           |  173 +++
 .../LeaderChangeStateCleanupTest.java           |  273 +++++
 .../LeaderElectionRetrievalTestingCluster.java  |  121 ++
 .../StandaloneLeaderElectionTest.java           |   60 +
 .../leaderelection/TestingContender.java        |  146 +++
 .../TestingLeaderElectionService.java           |   60 +
 .../TestingLeaderRetrievalService.java          |   47 +
 .../runtime/leaderelection/TestingListener.java |  115 ++
 .../ZooKeeperLeaderElectionTest.java            |  550 +++++++++
 .../ZooKeeperLeaderRetrievalTest.java           |  212 ++++
 .../runtime/operators/testutils/TestData.java   |    1 -
 .../runtime/taskmanager/ForwardingActor.java    |   41 -
 .../runtime/taskmanager/TaskCancelTest.java     |   37 +-
 ...askManagerComponentsStartupShutdownTest.java |   22 +-
 .../TaskManagerConfigurationTest.java           |   20 +-
 .../TaskManagerProcessReapingTest.java          |    5 +-
 .../TaskManagerRegistrationTest.java            |  320 +++---
 .../taskmanager/TaskManagerStartupTest.java     |  168 +++
 .../runtime/taskmanager/TaskManagerTest.java    |  410 ++++---
 .../flink/runtime/taskmanager/TaskTest.java     |    2 +-
 .../taskmanager/TestManagerStartupTest.java     |  168 ---
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   17 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |   67 ++
 .../flink/runtime/akka/FlinkActorTest.scala     |   16 +-
 .../jobmanager/CoLocationConstraintITCase.scala |    6 +-
 .../jobmanager/JobManagerConnectionTest.scala   |    4 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |   57 +-
 .../jobmanager/JobManagerRegistrationTest.scala |   82 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   35 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   10 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |   10 +-
 .../testingUtils/ScalaTestingUtils.scala        |    4 +-
 .../runtime/testingUtils/TestingCluster.scala   |  109 +-
 .../testingUtils/TestingJobManager.scala        |   53 +-
 .../TestingJobManagerMessages.scala             |   12 +-
 .../runtime/testingUtils/TestingMessages.scala  |   10 +
 .../testingUtils/TestingTaskManager.scala       |   75 +-
 .../TestingTaskManagerMessages.scala            |    4 +-
 .../runtime/testingUtils/TestingUtils.scala     |  201 +++-
 flink-shaded-hadoop/pom.xml                     |    4 +
 .../api/avro/AvroExternalJarProgramITCase.java  |   12 +-
 .../apache/flink/api/io/avro/AvroPojoTest.java  |    1 -
 .../operations/DegreesWithExceptionITCase.java  |   11 +-
 .../ReduceOnEdgesWithExceptionITCase.java       |    5 +-
 .../ReduceOnNeighborsWithExceptionITCase.java   |    9 +-
 .../hbase/example/HBaseWriteStreamExample.java  |    1 -
 .../org.apache.flink/api/scala/FlinkShell.scala |    2 +-
 .../flink/api/scala/ScalaShellITSuite.scala     |   11 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 1085 ++++++++----------
 .../streaming/connectors/kafka/KafkaITCase.java |   22 +-
 .../connectors/kafka/KafkaTestBase.java         |   14 +-
 .../environment/RemoteStreamEnvironment.java    |   11 +-
 .../flink/streaming/util/ClusterUtil.java       |    2 +
 .../streaming/timestamp/TimestampITCase.java    |   16 +-
 .../util/StreamingMultipleProgramsTestBase.java |    8 +-
 .../streaming/util/TestStreamEnvironment.java   |    1 +
 flink-test-utils/pom.xml                        |    6 +
 .../flink/test/util/AbstractTestBase.java       |    8 +-
 .../test/util/MultipleProgramsTestBase.java     |    8 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   53 +-
 .../apache/flink/test/util/TestEnvironment.java |    2 +-
 .../apache/flink/test/util/FlinkTestBase.scala  |    9 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  217 +++-
 flink-tests/pom.xml                             |    1 -
 .../accumulators/AccumulatorLiveITCase.java     |   12 +-
 .../test/cancelling/CancellingTestBase.java     |    6 +-
 .../StreamFaultToleranceTestBase.java           |   10 +-
 .../test/classloading/ClassLoaderITCase.java    |    6 +-
 .../JobSubmissionFailsITCase.java               |    4 +-
 .../test/javaApiOperators/DistinctITCase.java   |    3 -
 .../test/manual/NotSoMiniClusterIterations.java |    4 +-
 .../manual/StreamingScalabilityAndLatency.java  |    4 +-
 .../flink/test/misc/AutoParallelismITCase.java  |    6 +-
 .../test/misc/CustomSerializationITCase.java    |    9 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   10 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |   10 +-
 .../test/recovery/SimpleRecoveryITCase.java     |   12 +-
 .../TaskManagerFailureRecoveryITCase.java       |    6 +-
 .../ZooKeeperLeaderElectionITCase.java          |  255 ++++
 .../LocalFlinkMiniClusterITCase.java            |   11 +-
 .../flink/test/web/WebFrontendITCase.java       |   22 +-
 .../src/test/resources/log4j-test.properties    |    3 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   47 +-
 .../JobManagerLeaderSessionIDITSuite.scala      |    8 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |   34 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   10 +-
 .../apache/flink/yarn/ApplicationClient.scala   |   20 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   25 +-
 .../scala/org/apache/flink/yarn/Messages.scala  |    7 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |    9 +-
 pom.xml                                         |   19 +-
 tools/log4j-travis.properties                   |    3 +-
 194 files changed, 8626 insertions(+), 3316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index e2ffda6..35edf7a 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -357,6 +357,31 @@ so that the Flink client is able to pick those details up. This configuration pa
 changing the default location of that file (for example for environments sharing a Flink 
 installation between users)
 
+## High Availability Mode
+
+- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently,
+Flink supports the 'standalone' mode where only a single JobManager runs and no JobManager state is checkpointed.
+The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing.
+Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution.
+In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state.
+In order to use the 'zookeeper' mode, it is mandatory to also define the `ha.zookeeper.quorum` configuration value.
+
+- `ha.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
+
+- `ha.zookeeper.dir`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes. 
+
+- `ha.zookeeper.dir.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
+
+- `ha.zookeeper.dir.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
+
+- `ha.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
+
+- `ha.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
+
+- `ha.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
+
+- `ha.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
+
 ## Background
 
 ### Configuring the Network Buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index 8958e17..50379ea 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -34,11 +34,19 @@ As an example, consider the following setup with three JobManager instances:
 
 ## Configuration
 
-To enable JobManager High Availability you have to configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts.
+To enable JobManager High Availability you have to set the **recovery mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web ui ports.
 
 Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distirbuted coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper.
 
-Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availability mode and all Flink components try to connect to a JobManager via coordination through ZooKeeper.
+Setting Flink's **recovery mode** to *zookeeper* in `conf/flink-conf.yaml` *enables* high availability mode.
+
+Additionally, you have to configure a **ZooKeeper quorum** in the same configuration file.
+
+In high availabliity mode, all Flink components try to connect to a JobManager via coordination through ZooKeeper.
+
+- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. 
+  
+  <pre>recovery.mode: zookeeper</pre>
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
   
@@ -55,12 +63,12 @@ Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availabi
 
 In order to start an HA-cluster configure the *masters* file in `conf/masters`:
 
-- **masters file**: The *masters file* contains all hosts, on which JobManagers are started.
+- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds.
 
   <pre>
-jobManagerAddress1
+jobManagerAddress1:webUIPort1
 [...]
-jobManagerAddressX
+jobManagerAddressX:webUIPortX
   </pre>
 
 After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start a HA-cluster. **Keep in mind that the ZooKeeper quorum has to be running when you call the scripts**.
@@ -81,15 +89,17 @@ The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each
 
 ## Example: Start and stop a local HA-cluster with 2 JobManagers
 
-1. **Configure ZooKeeper quorum** in `conf/flink.yaml`:
+1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink.yaml`:
    
-   <pre>ha.zookeeper.quorum: localhost</pre>
+   <pre>
+recovery.mode: zookeeper
+ha.zookeeper.quorum: localhost</pre>
 
 2. **Configure masters** in `conf/masters`:
 
    <pre>
-localhost
-localhost</pre>
+localhost:8081
+localhost:8082</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index e94487f..84264f9 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -44,6 +44,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index ea1a6e9..ac8009e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -36,7 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import org.apache.commons.cli.CommandLine;
@@ -64,14 +63,15 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
@@ -129,8 +129,6 @@ public class CliFrontend {
 
 	private final FiniteDuration lookupTimeout;
 
-	private InetSocketAddress jobManagerAddress;
-
 	private ActorSystem actorSystem;
 
 	private AbstractFlinkYarnCluster yarnCluster;
@@ -202,9 +200,12 @@ public class CliFrontend {
 
 			// get the JobManager address from the YARN properties
 			String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+			InetSocketAddress jobManagerAddress;
 			if (address != null) {
 				try {
-					jobManagerAddress = parseJobManagerAddress(address);
+					jobManagerAddress = parseHostPortAddress(address);
+					// store address in config from where it is retrieved by the retrieval service
+					writeJobManagerAddressToConfig(jobManagerAddress);
 				}
 				catch (Exception e) {
 					throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
@@ -227,6 +228,24 @@ public class CliFrontend {
 
 
 	// --------------------------------------------------------------------------------------------
+	//  Getter & Setter
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Getter which returns a copy of the associated configuration
+	 *
+	 * @return Copy of the associated configuration
+	 */
+	public Configuration getConfiguration() {
+		Configuration copiedConfiguration = new Configuration();
+
+		copiedConfiguration.addAll(config);
+
+		return copiedConfiguration;
+	}
+
+
+	// --------------------------------------------------------------------------------------------
 	//  Execute Actions
 	// --------------------------------------------------------------------------------------------
 
@@ -688,42 +707,26 @@ public class CliFrontend {
 				new PackagedProgram(jarFile, entryPointClass, programArgs);
 	}
 
-	protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) throws Exception {
-
-		// first, check if the address is specified as an option
-		if (options.getJobManagerAddress() != null) {
-			return parseJobManagerAddress(options.getJobManagerAddress());
-		}
-
-		// second, check whether the address was already parsed, or configured through the YARN properties
-		if (jobManagerAddress == null) {
-			// config file must have the address
-			String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-
-			// verify that there is a jobmanager address and port in the configuration
-			if (jobManagerHost == null) {
-				throw new Exception("Found no configuration in the config directory '" + configDirectory
-						+ "' that specifies the JobManager address.");
-			}
-
-			int jobManagerPort;
-			try {
-				jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-			}
-			catch (NumberFormatException e) {
-				throw new Exception("Invalid value for the JobManager port (" +
-						ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration.");
-			}
-
-			if (jobManagerPort == -1) {
-				throw new Exception("Found no configuration in the config directory '" + configDirectory
-						+ "' that specifies the JobManager port.");
-			}
+	/**
+	 * Writes the given job manager address to the associated configuration object
+	 *
+	 * @param address Address to write to the configuration
+	 */
+	protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
+	}
 
-			jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+	/**
+	 * Updates the associated configuration with the given command line options
+	 *
+	 * @param options Command line options
+	 */
+	protected void updateConfig(CommandLineOptions options) {
+		if(options.getJobManagerAddress() != null){
+			InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+			writeJobManagerAddressToConfig(jobManagerAddress);
 		}
-
-		return jobManagerAddress;
 	}
 
 	/**
@@ -735,16 +738,16 @@ public class CliFrontend {
 	 * @throws Exception
 	 */
 	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
-		//TODO: Get ActorRef from YarnCluster if we are in YARN mode.
-
-		InetSocketAddress address = getJobManagerAddress(options);
+		// overwrite config values with given command line options
+		updateConfig(options);
 
 		// start an actor system if needed
 		if (this.actorSystem == null) {
 			LOG.info("Starting actor system to communicate with JobManager");
 			try {
 				scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-				this.actorSystem = AkkaUtils.createActorSystem(config,
+				this.actorSystem = AkkaUtils.createActorSystem(
+						config,
 						new Some<scala.Tuple2<String, Object>>(systemEndpoint));
 			}
 			catch (Exception e) {
@@ -754,20 +757,33 @@ public class CliFrontend {
 			LOG.info("Actor system successfully started");
 		}
 
-		LOG.info("Trying to lookup JobManager");
-		ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
-		LOG.info("JobManager is at " + jmActor.path());
+		LOG.info("Trying to lookup the JobManager gateway");
+		// Retrieve the ActorGateway from the LeaderRetrievalService
+		LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
 
-		// Retrieve the ActorGateway from the JobManager's ActorRef
-		return JobManager.getJobManagerGateway(jmActor, lookupTimeout);
+		return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
 	}
 
 	/**
-	 * @param userParallelism The parallelism requested by the user in the CLI frontend.
+	 * Retrieves a {@link Client} object from the given command line options and other parameters.
+	 *
+	 * @param options Command line options which contain JobManager address
+	 * @param classLoader Class loader to use by the Client
+	 * @param programName Program name
+	 * @param userParallelism Given user parallelism
+	 * @return
+	 * @throws Exception
 	 */
-	protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
-		InetSocketAddress jobManagerAddress;
+	protected Client getClient(
+			CommandLineOptions options,
+			ClassLoader classLoader,
+			String programName,
+			int userParallelism)
+		throws Exception {
+		InetSocketAddress jobManagerAddress = null;
+
 		int maxSlots = -1;
+
 		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
 			logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
 
@@ -830,9 +846,16 @@ public class CliFrontend {
 			}
 		}
 		else {
-			jobManagerAddress = getJobManagerAddress(options);
+			if(options.getJobManagerAddress() != null) {
+				jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+			}
+		}
+
+		if(jobManagerAddress != null) {
+			writeJobManagerAddressToConfig(jobManagerAddress);
 		}
-		return new Client(jobManagerAddress, config, classLoader, maxSlots);
+
+		return new Client(config, classLoader, maxSlots);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -992,7 +1015,14 @@ public class CliFrontend {
 	//  Miscellaneous Utilities
 	// --------------------------------------------------------------------------------------------
 
-	private static InetSocketAddress parseJobManagerAddress(String hostAndPort) {
+	/**
+	 * Parses a given host port address of the format URL:PORT and returns an {@link InetSocketAddress}
+	 *
+	 * @param hostAndPort host port string to be parsed
+	 * @return InetSocketAddress object containing the parsed host port information
+	 */
+	private static InetSocketAddress parseHostPortAddress(String hostAndPort) {
+		// code taken from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
 		URI uri;
 		try {
 			uri = new URI("my://" + hostAndPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index b288996..83e2ee4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -109,6 +109,7 @@ public class LocalExecutor extends PlanExecutor {
 				}
 				// start it up
 				this.flink = new LocalFlinkMiniCluster(configuration, true);
+				this.flink.start();
 			} else {
 				throw new IllegalStateException("The local executor was already started.");
 			}
@@ -168,7 +169,7 @@ public class LocalExecutor extends PlanExecutor {
 			}
 
 			try {
-				Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
+				Optimizer pc = new Optimizer(new DataStatistics(), this.flink.configuration());
 				OptimizedPlan op = pc.compile(plan);
 				
 				JobGraphGenerator jgg = new JobGraphGenerator();
@@ -251,7 +252,7 @@ public class LocalExecutor extends PlanExecutor {
 		LocalExecutor exec = new LocalExecutor();
 		try {
 			exec.start();
-			Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
+			Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
 			OptimizedPlan op = pc.compile(plan);
 			PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 373d70c..d1be6d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
@@ -53,7 +54,7 @@ public class RemoteExecutor extends PlanExecutor {
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
 
 	private final List<String> jarFiles;
-	private final InetSocketAddress address;
+	private final Configuration configuration;
 	
 	public RemoteExecutor(String hostname, int port) {
 		this(hostname, port, Collections.<String>emptyList());
@@ -73,7 +74,10 @@ public class RemoteExecutor extends PlanExecutor {
 
 	public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
 		this.jarFiles = jarFiles;
-		this.address = inet;
+		configuration = new Configuration();
+
+		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
+		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
 	}
 
 	@Override
@@ -83,7 +87,7 @@ public class RemoteExecutor extends PlanExecutor {
 	}
 	
 	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
+		Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
 		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 		
 		JobSubmissionResult result = c.run(p, -1, true);
@@ -99,7 +103,7 @@ public class RemoteExecutor extends PlanExecutor {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		
-		Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
+		Client c = new Client(configuration, program.getUserCodeClassLoader(), -1);
 		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 		
 		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
@@ -114,7 +118,7 @@ public class RemoteExecutor extends PlanExecutor {
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
+		Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
 		
 		OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e90a39c..2e9ba18 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -22,10 +22,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -33,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -48,15 +43,16 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
@@ -67,7 +63,6 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import com.google.common.base.Preconditions;
@@ -79,18 +74,18 @@ public class Client {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
 
-	/** The configuration to use for the client (optimizer, timeouts, ...) */
+	/**
+	 * The configuration to use for the client (optimizer, timeouts, ...) and to connect to the
+	 * JobManager.
+	 */
 	private final Configuration configuration;
 
-	/** The address of the JobManager to send the program to */
-	private final InetSocketAddress jobManagerAddress;
-
 	/** The optimizer used in the optimization of batch programs */
 	private final Optimizer compiler;
 
 	/** The class loader to use for classes from the user program (e.g., functions and data types) */
 	private final ClassLoader userCodeClassLoader;
-	
+
 	/** Flag indicating whether to sysout print execution updates */
 	private boolean printStatusDuringExecution = true;
 
@@ -98,7 +93,7 @@ public class Client {
 	 * If != -1, this field specifies the total number of available slots on the cluster
 	 * connected to the client.
 	 */
-	private int maxSlots = -1;
+	private int maxSlots;
 
 	/** ID of the last job submitted with this client. */
 	private JobID lastJobId = null;
@@ -107,85 +102,35 @@ public class Client {
 	// ------------------------------------------------------------------------
 	//                            Construction
 	// ------------------------------------------------------------------------
-	
+
 	/**
-	 * Creates a new instance of the class that submits the jobs to a job-manager.
-	 * at the given address using the default port.
-	 * 
-	 * @param jobManagerAddress Address and port of the job-manager.
+	 * Creates a instance that submits the programs to the JobManager defined in the
+	 * configuration. It sets the maximum number of slots to unknown (= -1).
+	 *
+	 * @param config The config used to obtain the JobManager's address.
+	 * @param userCodeClassLoader The class loader to use for loading user code classes.
 	 */
-	public Client(InetSocketAddress jobManagerAddress, Configuration config, 
-							ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException
-	{
-		Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
-		Preconditions.checkNotNull(config, "Configuration is null");
-		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
-		
-		this.configuration = config;
-		
-		if (jobManagerAddress.isUnresolved()) {
-			// address is unresolved, resolve it
-			String host = jobManagerAddress.getHostName();
-			if (host == null) {
-				throw new IllegalArgumentException("Host in jobManagerAddress is null");
-			}
-			
-			try {
-				InetAddress address = InetAddress.getByName(host);
-				this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
-			}
-			catch (UnknownHostException e) {
-				throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
-			}
-		}
-		else {
-			// address is already resolved, use it as is
-			this.jobManagerAddress = jobManagerAddress;
-		}
-		
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.maxSlots = maxSlots;
+	public Client(Configuration config, ClassLoader userCodeClassLoader) {
+		this(config, userCodeClassLoader, -1);
 	}
 
 	/**
 	 * Creates a instance that submits the programs to the JobManager defined in the
-	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
-	 * if that is not possible.
+	 * configuration.
 	 * 
-	 * @param config The config used to obtain the job-manager's address.
-	 * @param userCodeClassLoader The class loader to use for loading user code classes.   
+	 * @param config The config used to obtain the JobManager's address.
+	 * @param userCodeClassLoader The class loader to use for loading user code classes.
+	 * @param maxSlots The number of maxSlots on the cluster if != -1
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
+	public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
 		Preconditions.checkNotNull(config, "Configuration is null");
 		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
 		
 		this.configuration = config;
 		this.userCodeClassLoader = userCodeClassLoader;
-		
-		// instantiate the address to the job manager
-		final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		if (address == null) {
-			throw new IllegalConfigurationException(
-					"Cannot find address to job manager's RPC service in the global configuration.");
-		}
-		
-		final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-														ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-		if (port < 0) {
-			throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
-		}
-		
-		try {
-			InetAddress inetAddress = InetAddress.getByName(address);
-			this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
-		}
-		catch (UnknownHostException e) {
-			throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address
-					+ "' specified in the configuration");
-		}
 
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		this.maxSlots = maxSlots;
 	}
 
 	/**
@@ -377,8 +322,6 @@ public class Client {
 	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
 		this.lastJobId = jobGraph.getJobID();
 		
-		LOG.info("JobManager actor system address is " + jobManagerAddress);
-		
 		LOG.info("Starting client actor system");
 		final ActorSystem actorSystem;
 		try {
@@ -388,58 +331,59 @@ public class Client {
 			throw new ProgramInvocationException("Could start client actor system.", e);
 		}
 
-		FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-
-		LOG.info("Looking up JobManager");
-		ActorGateway jobManagerGateway;
-		ActorRef jobManagerActorRef;
 		try {
-			jobManagerActorRef = JobManager.getJobManagerRemoteReference(
-					jobManagerAddress,
-					actorSystem,
-					configuration);
+			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+			FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
 
+			LOG.info("Looking up JobManager");
+			ActorGateway jobManagerGateway;
 
-		} catch (IOException e) {
-			throw new ProgramInvocationException("Failed to resolve JobManager", e);
-		}
+			LeaderRetrievalService leaderRetrievalService;
 
-		try{
-			jobManagerGateway = JobManager.getJobManagerGateway(jobManagerActorRef, timeout);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
-		}
+			try {
+				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+			}
 
-		LOG.info("JobManager runs at " + jobManagerGateway.path());
+			try {
+				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+						leaderRetrievalService,
+						actorSystem,
+						lookupTimeout);
+			} catch (LeaderRetrievalException e) {
+				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+			}
 
-		LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
+			LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path());
 
-		LOG.info("Checking and uploading JAR files");
-		try {
-			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-		}
-		catch (IOException e) {
-			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
-		}
+			LOG.info("JobManager runs at " + jobManagerGateway.path());
 
-		try{
-			if (wait) {
-				return JobClient.submitJobAndWait(actorSystem,
-						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
+			LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
+
+			LOG.info("Checking and uploading JAR files");
+			try {
+				JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+			} catch (IOException e) {
+				throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
 			}
-			else {
-				JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
-				// return a dummy execution result with the JobId
-				return new JobSubmissionResult(jobGraph.getJobID());
+
+			try {
+				if (wait) {
+					return JobClient.submitJobAndWait(actorSystem,
+						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
+				} else {
+					JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
+					// return a dummy execution result with the JobId
+					return new JobSubmissionResult(jobGraph.getJobID());
+				}
+			} catch (JobExecutionException e) {
+				throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Exception during program execution.", e);
 			}
-		}
-		catch (JobExecutionException e) {
-			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-		}
-		catch (Exception e) {
-			throw new ProgramInvocationException("Exception during program execution.", e);
-		}
-		finally {
+		} finally {
+			// shut down started actor system
 			actorSystem.shutdown();
 			
 			// wait at most for 30 seconds, to work around an occasional akka problem
@@ -454,6 +398,7 @@ public class Client {
 	 */
 	public void cancel(JobID jobId) throws Exception {
 		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
 
 		ActorSystem actorSystem;
 		try {
@@ -462,31 +407,48 @@ public class Client {
 			throw new ProgramInvocationException("Could start client actor system.", e);
 		}
 
-		ActorRef jobManager;
 		try {
-			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Error getting the remote actor reference for the job manager.", e);
-		}
+			ActorGateway jobManagerGateway;
 
-		Future<Object> response;
-		try {
-			ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout);
-			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-		}
+			LeaderRetrievalService leaderRetrievalService;
+
+			try {
+				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+			}
 
-		Object result = Await.result(response, timeout);
+			try {
+				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+						leaderRetrievalService,
+						actorSystem,
+						lookupTimeout);
+			} catch (LeaderRetrievalException e) {
+				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+			}
 
-		if (result instanceof JobManagerMessages.CancellationSuccess) {
-			LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
-		} else if (result instanceof JobManagerMessages.CancellationFailure) {
-			Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-			LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
-			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
-		} else {
-			throw new Exception("Unknown message received while cancelling.");
+			Future<Object> response;
+			try {
+				response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+			}
+			
+			Object result = Await.result(response, timeout);
+
+			if (result instanceof JobManagerMessages.CancellationSuccess) {
+				LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+			} else if (result instanceof JobManagerMessages.CancellationFailure) {
+				Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+				LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+				throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+			} else {
+				throw new Exception("Unknown message received while cancelling.");
+			}
+		} finally {
+			// shut down started actor system
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
 		}
 	}
 
@@ -512,6 +474,7 @@ public class Client {
 	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
 
 		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
 
 		ActorSystem actorSystem;
 		try {
@@ -520,22 +483,32 @@ public class Client {
 			throw new Exception("Could start client actor system.", e);
 		}
 
-		ActorRef jobManager;
 		try {
-			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
-		} catch (Exception e) {
-			throw new Exception("Error getting the remote actor reference for the job manager.", e);
-		}
+			ActorGateway jobManagerGateway;
 
-		Future<Object> response;
-		try {
-			ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout);
-			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
-		} catch (Exception e) {
-			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
-		}
+			LeaderRetrievalService leaderRetrievalService;
 
-		try {
+			try {
+				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+			}
+
+			try {
+				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+						leaderRetrievalService,
+						actorSystem,
+						lookupTimeout);
+			} catch (LeaderRetrievalException e) {
+				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+			}
+
+			Future<Object> response;
+			try {
+				response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
+			} catch (Exception e) {
+				throw new Exception("Failed to query the job manager gateway for accumulators.", e);
+			}
 
 			Object result = Await.result(response, timeout);
 
@@ -548,14 +521,12 @@ public class Client {
 			} else if (result instanceof AccumulatorResultsErroneous) {
 				throw ((AccumulatorResultsErroneous) result).cause();
 			} else {
-				LOG.warn("Failed to fetch accumulators for job {}.", jobID);
+				throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
 			}
-
-		} catch (Exception e) {
-			LOG.error("Error occurred while fetching accumulators for {}", jobID, e);
+		} finally {
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
 		}
-
-		return Collections.emptyMap();
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 2d41374..9d0b691 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -19,24 +19,25 @@
 package org.apache.flink.client;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import static org.mockito.Mockito.*;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-
 import org.apache.flink.client.cli.CommandLineOptions;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.net.InetSocketAddress;
+
 /**
  * Tests that verify that the CLI client picks up the correct address for the JobManager
  * from configuration and configs.
@@ -62,13 +63,11 @@ public class CliFrontendAddressConfigurationTest {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
 			CommandLineOptions options = mock(CommandLineOptions.class);
 
-			try {
-				frontend.getJobManagerAddress(options);
-				fail("we expect an exception here because the we have no config");
-			}
-			catch (Exception e) {
-				// expected
-			}
+			frontend.updateConfig(options);
+			Configuration config = frontend.getConfiguration();
+
+			checkJobManagerAddress(config, null, -1);
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -84,7 +83,12 @@ public class CliFrontendAddressConfigurationTest {
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
 
-			assertNotNull(frontend.getJobManagerAddress(options));
+			frontend.updateConfig(options);
+			Configuration config = frontend.getConfiguration();
+
+			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -98,11 +102,14 @@ public class CliFrontendAddressConfigurationTest {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
-			InetSocketAddress address = frontend.getJobManagerAddress(options);
-			
-			assertNotNull(address);
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_PORT, address.getPort());
+
+			frontend.updateConfig(options);
+			Configuration config = frontend.getConfiguration();
+
+			checkJobManagerAddress(
+					config,
+					CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
+					CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -135,11 +142,14 @@ public class CliFrontendAddressConfigurationTest {
 			CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath());
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
-			InetSocketAddress address = frontend.getJobManagerAddress(options);
-			
-			assertNotNull(address);
-			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
-			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT, address.getPort());
+
+			frontend.updateConfig(options);
+			Configuration config = frontend.getConfiguration();
+
+			checkJobManagerAddress(
+					config,
+					CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS,
+					CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -154,10 +164,14 @@ public class CliFrontendAddressConfigurationTest {
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
 
-			InetSocketAddress address = cli.getJobManagerAddress(options);
+			cli.updateConfig(options);
+
+			Configuration config = cli.getConfiguration();
 
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_PORT, address.getPort());
+			checkJobManagerAddress(
+				config,
+				CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
+				CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -173,11 +187,13 @@ public class CliFrontendAddressConfigurationTest {
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
 
-			InetSocketAddress address = frontend.getJobManagerAddress(options);
-			
-			assertNotNull(address);
-			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
-			assertEquals(7788, address.getPort());
+			frontend.updateConfig(options);
+
+			Configuration config = frontend.getConfiguration();
+
+			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -193,15 +209,25 @@ public class CliFrontendAddressConfigurationTest {
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
 
-			InetSocketAddress address = frontend.getJobManagerAddress(options);
-			
-			assertNotNull(address);
-			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
-			assertEquals(7788, address.getPort());
+			frontend.updateConfig(options);
+
+			Configuration config = frontend.getConfiguration();
+
+			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
+
+	public void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
+		String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+		assertEquals(expectedAddress, jobManagerAddress);
+		assertEquals(expectedPort, jobManagerPort);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index cb2585d..751783c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.configuration.Configuration;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 
 import static org.junit.Assert.*;
 
@@ -106,9 +106,13 @@ public class CliFrontendInfoTest {
 
 		@Override
 		protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
-				throws Exception
-		{
-			return new TestClient(expectedDop);
+				throws Exception {
+			Configuration config = new Configuration();
+
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getLocalHost().getHostName());
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6176);
+
+			return new TestClient(config, expectedDop);
 		}
 	}
 	
@@ -116,9 +120,8 @@ public class CliFrontendInfoTest {
 		
 		private final int expectedDop;
 		
-		private TestClient(int expectedDop) throws Exception {
-			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176),
-					new Configuration(), CliFrontendInfoTest.class.getClassLoader(), -1);
+		private TestClient(Configuration config, int expectedDop) throws Exception {
+			super(config, CliFrontendInfoTest.class.getClassLoader(), -1);
 			
 			this.expectedDop = expectedDop;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index fc64503..736d859 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.Option;
 
 import java.util.UUID;
 
@@ -83,7 +82,7 @@ public class CliFrontendListCancelTest {
 				JobID jid = new JobID();
 				String jidString = jid.toString();
 
-				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+				final UUID leaderSessionID = UUID.randomUUID();
 
 				final ActorRef jm = actorSystem.actorOf(Props.create(
 								CliJobManager.class,
@@ -106,7 +105,7 @@ public class CliFrontendListCancelTest {
 				JobID jid1 = new JobID();
 				JobID jid2 = new JobID();
 
-				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+				final UUID leaderSessionID = UUID.randomUUID();
 
 				final ActorRef jm = actorSystem.actorOf(
 						Props.create(
@@ -143,11 +142,11 @@ public class CliFrontendListCancelTest {
 			
 			// test list properly
 			{
-				final Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+				final UUID leaderSessionID = UUID.randomUUID();
 				final ActorRef jm = actorSystem.actorOf(
 						Props.create(
 								CliJobManager.class,
-								(Object)null,
+								null,
 								leaderSessionID
 						)
 				);
@@ -183,9 +182,9 @@ public class CliFrontendListCancelTest {
 
 	protected static final class CliJobManager extends FlinkUntypedActor {
 		private final JobID jobID;
-		private final Option<UUID> leaderSessionID;
+		private final UUID leaderSessionID;
 
-		public CliJobManager(final JobID jobID, final Option<UUID> leaderSessionID){
+		public CliJobManager(final JobID jobID, final UUID leaderSessionID){
 			this.jobID = jobID;
 			this.leaderSessionID = leaderSessionID;
 		}
@@ -217,7 +216,7 @@ public class CliFrontendListCancelTest {
 		}
 
 		@Override
-		protected Option<UUID> getLeaderSessionID() {
+		protected UUID getLeaderSessionID() {
 			return leaderSessionID;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 6798806..a7944ce 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -92,18 +92,18 @@ public class CliFrontendRunTest {
 	
 	public static final class RunTestingCliFrontend extends CliFrontend {
 		
-		private final int expectedParallelim;
+		private final int expectedParallelism;
 		private final boolean sysoutLogging;
 		
-		public RunTestingCliFrontend(int expectedParallelim, boolean logging) throws Exception {
+		public RunTestingCliFrontend(int expectedParallelism, boolean logging) throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
-			this.expectedParallelim = expectedParallelim;
+			this.expectedParallelism = expectedParallelism;
 			this.sysoutLogging = logging;
 		}
 
 		@Override
 		protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
-			assertEquals(this.expectedParallelim, parallelism);
+			assertEquals(this.expectedParallelism, parallelism);
 			assertEquals(client.getPrintStatusDuringExecution(), sysoutLogging);
 			return 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 9293148..47236af 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -28,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
@@ -45,10 +47,11 @@ public class RemoteExecutorHostnameResolutionTest {
 		try {
 			RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 			exec.executePlan(getProgram());
-			fail("This should fail with an UnknownHostException");
+			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (UnknownHostException e) {
+		catch (ProgramInvocationException e) {
 			// that is what we want!
+			assertTrue(e.getCause() instanceof UnknownHostException);
 		}
 		catch (Exception e) {
 			System.err.println("Wrong exception!");
@@ -66,10 +69,11 @@ public class RemoteExecutorHostnameResolutionTest {
 			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
 			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
 			exec.executePlan(getProgram());
-			fail("This should fail with an UnknownHostException");
+			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (UnknownHostException e) {
+		catch (ProgramInvocationException e) {
 			// that is what we want!
+			assertTrue(e.getCause() instanceof UnknownHostException);
 		}
 		catch (Exception e) {
 			System.err.println("Wrong exception!");

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 39b74a3..1b9fd73 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -85,13 +85,16 @@ public class ClientConnectionTest {
 		final Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s");
 		config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s");
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
+
 
 		try {
 			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(TestInvokable.class);
 
 			final JobGraph jg = new JobGraph("Test Job", vertex);
-			final Client client = new Client(unreachableEndpoint, config, getClass().getClassLoader(), -1);
+			final Client client = new Client(config, getClass().getClassLoader(), -1);
 
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
deleted file mode 100644
index 41294e6..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-import static org.junit.Assert.*;
-import static org.junit.Assume.*;
-
-/**
- * Tests that verify that the client correctly handles non-resolvable host names and does not
- * fail with another exception
- */
-public class ClientHostnameResolutionTest {
-	
-	private static final String nonExistingHostname = "foo.bar.com.invalid";
-	
-	@Test
-	public void testUnresolvableHostname1() {
-		
-		checkPreconditions();
-		
-		try {
-			InetSocketAddress addr = new InetSocketAddress(nonExistingHostname, 17234);
-			new Client(addr, new Configuration(), getClass().getClassLoader(), 1);
-			fail("This should fail with an UnknownHostException");
-		}
-		catch (UnknownHostException e) {
-			// that is what we want!
-		}
-		catch (Exception e) {
-			System.err.println("Wrong exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testUnresolvableHostname2() {
-
-		checkPreconditions();
-		
-		try {
-			Configuration config = new Configuration();
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
-			
-			new Client(config, getClass().getClassLoader());
-			fail("This should fail with an UnknownHostException");
-		}
-		catch (UnknownHostException e) {
-			// that is what we want!
-		}
-		catch (Exception e) {
-			System.err.println("Wrong exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static void checkPreconditions() {
-		// the test can only work if the invalid URL cannot be resolves
-		// some internet providers resolve unresolvable URLs to navigational aid servers,
-		// voiding this test.
-		boolean throwsException;
-		try {
-			//noinspection ResultOfMethodCallIgnored
-			InetAddress.getByName(nonExistingHostname);
-			throwsException = false;
-		}
-		catch (UnknownHostException e) {
-			throwsException = true;
-		}
-		assumeTrue(throwsException);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 46de93d..a4b8acb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -52,7 +52,6 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 
@@ -229,7 +228,7 @@ public class ClientTest {
 
 	public static class SuccessReturningActor extends FlinkUntypedActor {
 
-		private final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+		private UUID leaderSessionID = null;
 
 		@Override
 		public void handleMessage(Object message) {
@@ -252,14 +251,14 @@ public class ClientTest {
 		}
 
 		@Override
-		protected Option<UUID> getLeaderSessionID() {
+		protected UUID getLeaderSessionID() {
 			return leaderSessionID;
 		}
 	}
 
 	public static class FailureReturningActor extends FlinkUntypedActor {
 
-		private Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+		private UUID leaderSessionID = null;
 
 		@Override
 		public void handleMessage(Object message) {
@@ -270,7 +269,7 @@ public class ClientTest {
 		}
 
 		@Override
-		protected Option<UUID> getLeaderSessionID() {
+		protected UUID getLeaderSessionID() {
 			return leaderSessionID;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 67b406d..d1e971f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
@@ -43,8 +44,13 @@ public class ExecutionPlanCreationTest {
 			
 			InetAddress mockAddress = InetAddress.getLocalHost();
 			InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
+
+			Configuration config = new Configuration();
+
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
 			
-			Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader(), -1);
+			Client client = new Client(config, getClass().getClassLoader(), -1);
 			OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
 			assertNotNull(op);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
new file mode 100644
index 0000000..ee26145
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+/**
+ * Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
+ * and does not fail with another exception
+ */
+public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
+	
+	private static final String nonExistingHostname = "foo.bar.com.invalid";
+	
+	@Test
+	public void testUnresolvableHostname1() {
+		
+		checkPreconditions();
+		
+		try {
+			Configuration config = new Configuration();
+
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+
+			LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testUnresolvableHostname2() {
+
+		checkPreconditions();
+		
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+
+			LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static void checkPreconditions() {
+		// the test can only work if the invalid URL cannot be resolves
+		// some internet providers resolve unresolvable URLs to navigational aid servers,
+		// voiding this test.
+		boolean throwsException;
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			InetAddress.getByName(nonExistingHostname);
+			throwsException = false;
+		}
+		catch (UnknownHostException e) {
+			throwsException = true;
+		}
+		assumeTrue(throwsException);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 51a4fa1..99e4906 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -55,7 +55,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -184,12 +183,14 @@ public class FlinkClient {
 		final Configuration configuration = jobGraph.getJobConfiguration();
 
 		final Client client;
-		try {
-			client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
-					JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-		} catch (final UnknownHostException e) {
-			throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
-		}
+
+		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
+		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+
+		client = new Client(
+			configuration,
+			JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()),
+			-1);
 
 		try {
 			client.run(jobGraph, false);
@@ -302,7 +303,7 @@ public class FlinkClient {
 			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
 		}
 
-		return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+		return JobManager.getJobManagerActorRef(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
 				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index f21e58c..fab5c9a 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Test;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.DataStreamUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.junit.Assert;
 
@@ -38,9 +37,10 @@ public class CollectITCase {
 
 		Configuration config = new Configuration();
 		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster.start();
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 
 		long N = 10;
 		DataStream<Long> stream = env.generateSequence(1, N);


Mime
View raw message