flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [08/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.
Date Mon, 31 Aug 2015 10:31:44 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
new file mode 100644
index 0000000..b6223ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+/**
+ * Interface for a service which allows electing a leader among a group of contenders.
+ *
+ * Prior to using this service, it has to be started by calling the start method. The start method
+ * takes the contender as a parameter. If there are multiple contenders, then each contender has
+ * to instantiate its own leader election service.
+ *
+ * Once a contender has been granted leadership, it has to confirm the received leader session ID
+ * by calling the method confirmLeaderSessionID. This will notify the leader election service that
+ * the contender has received the new leader session ID and that it can now be published for
+ * leader retrieval services.
+ */
+public interface LeaderElectionService {
+
+	/**
+	 * Starts the leader election service. This method can only be called once.
+	 *
+	 * @param contender LeaderContender which applies for the leadership
+	 * @throws Exception
+	 */
+	void start(LeaderContender contender) throws Exception;
+
+	/**
+	 * Stops the leader election service.
+	 * @throws Exception
+	 */
+	void stop() throws Exception;
+
+	/**
+	 * Confirms that the new leader session ID has been successfully received by the new leader.
+	 * This method is usually called by the newly appointed {@link LeaderContender}.
+	 *
+	 * The rationale behind this method is to establish an order between setting the new leader
+	 * session ID in the {@link LeaderContender} and publishing the new leader session ID to the
+	 * leader retrieval services.
+	 *
+	 * @param leaderSessionID The new leader session ID
+	 */
+	void confirmLeaderSessionID(UUID leaderSessionID);
+
+	/**
+	 * Returns true if the {@link LeaderContender} with which the service has been started currently
+	 * owns the leadership.
+	 *
+	 * @return true if the associated {@link LeaderContender} is the leader, otherwise false
+	 */
+	boolean hasLeadership();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
new file mode 100644
index 0000000..1a297db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import com.google.common.base.Preconditions;
+
+import java.util.UUID;
+
+/**
+ * Standalone implementation of the {@link LeaderElectionService} interface. The standalone
+ * implementation assumes that there is only a single {@link LeaderContender} and thus directly
+ * grants him the leadership upon start up. Furthermore, there is no communication needed between
+ * multiple standalone leader election services.
+ */
+public class StandaloneLeaderElectionService implements LeaderElectionService {
+
+	private LeaderContender contender = null;
+
+	@Override
+	public void start(LeaderContender newContender) throws Exception {
+		if (contender != null) {
+			// Service was already started
+			throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
+		}
+
+		contender = Preconditions.checkNotNull(newContender);
+
+		// directly grant leadership to the given contender
+		contender.grantLeadership(null);
+	}
+
+	@Override
+	public void stop() {
+		if (contender != null) {
+			contender.revokeLeadership();
+			contender = null;
+		}
+	}
+
+	@Override
+	public void confirmLeaderSessionID(UUID leaderSessionID) {}
+
+	@Override
+	public boolean hasLeadership() {
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
new file mode 100644
index 0000000..d1fd548
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+/**
+ * Leader election service for multiple JobManagers. The leading JobManager is elected using
+ * ZooKeeper. The current leader's address as well as its leader session ID is published via
+ * ZooKeeper as well.
+ */
+public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
+
+	/** Client to the ZooKeeper quorum */
+	private final CuratorFramework client;
+
+	/** Curator recipe for leader election */
+	private final LeaderLatch leaderLatch;
+
+	/** Curator recipe to watch a given ZooKeeper node for changes */
+	private final NodeCache cache;
+
+	/** ZooKeeper path of the node which stores the current leader information */
+	private final String leaderPath;
+
+	private UUID issuedLeaderSessionID;
+
+	private UUID confirmedLeaderSessionID;
+
+	/** The leader contender which applies for leadership */
+	private volatile LeaderContender leaderContender;
+
+	private final Object lock = new Object();
+
+	/**
+	 * Creates a ZooKeeperLeaderElectionService object.
+	 *
+	 * @param client Client which is connected to the ZooKeeper quorum
+	 * @param latchPath ZooKeeper node path for the leader election latch
+	 * @param leaderPath ZooKeeper node path for the node which stores the current leader information
+	 */
+	public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
+		this.client = client;
+		this.leaderPath = leaderPath;
+
+		leaderLatch = new LeaderLatch(client, latchPath);
+		cache = new NodeCache(client, leaderPath);
+	}
+
+	/**
+	 * Returns the confirmed leader session ID, or null if no session ID is currently confirmed.
+	 *
+	 * @return The confirmed leader session ID, or null if the contender is not the leader or has not yet confirmed its session ID
+	 */
+	public UUID getLeaderSessionID() {
+		return confirmedLeaderSessionID;
+	}
+
+	@Override
+	public void start(LeaderContender contender) throws Exception {
+		Preconditions.checkNotNull(contender, "Contender must not be null.");
+		Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+		LOG.info("Starting ZooKeeperLeaderElectionService.");
+
+		leaderContender = contender;
+
+		leaderLatch.addListener(this);
+		leaderLatch.start();
+
+		cache.getListenable().addListener(this);
+		cache.start();
+	}
+
+	@Override
+	public void stop() throws Exception{
+		LOG.info("Stopping ZooKeeperLeaderElectionService.");
+
+		cache.close();
+		leaderLatch.close();
+		client.close();
+
+		confirmedLeaderSessionID = null;
+		issuedLeaderSessionID = null;
+	}
+
+	@Override
+	public void confirmLeaderSessionID(UUID leaderSessionID) {
+		Preconditions.checkNotNull(leaderSessionID);
+		// leader information is only published while this service holds the leader latch
+		if (leaderLatch.hasLeadership()) {
+			// ignore stale confirmations: only the most recently issued session ID is accepted
+			synchronized (lock) {
+				if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
+					confirmedLeaderSessionID = leaderSessionID;
+					writeLeaderInformation(confirmedLeaderSessionID);
+				}
+			}
+		} else {
+			LOG.warn("The leader session ID " + leaderSessionID + " was confirmed even though the " +
+					"corresponding JobManager was not elected as the leader.");
+		}
+	}
+
+	@Override
+	public boolean hasLeadership() {
+		if(leaderLatch.getState().equals(LeaderLatch.State.STARTED)) {
+			return leaderLatch.hasLeadership();
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public void isLeader() {
+		synchronized (lock) {
+			issuedLeaderSessionID = UUID.randomUUID();
+			confirmedLeaderSessionID = null;
+			leaderContender.grantLeadership(issuedLeaderSessionID);
+		}
+	}
+
+	@Override
+	public void notLeader() {
+		synchronized (lock) {
+			issuedLeaderSessionID = null;
+			confirmedLeaderSessionID = null;
+			leaderContender.revokeLeadership();
+		}
+	}
+
+	@Override
+	public void nodeChanged() throws Exception {
+		try {
+			// confirmedLeaderSessionID is null if the leader contender has not yet confirmed the session ID
+			if (leaderLatch.hasLeadership()) {
+				synchronized (lock) {
+					if (confirmedLeaderSessionID != null) {
+						ChildData childData = cache.getCurrentData();
+
+						if (childData == null) {
+							writeLeaderInformation(confirmedLeaderSessionID);
+						} else {
+							byte[] data = childData.getData();
+
+							if (data == null || data.length == 0) {
+								// the data field seems to be empty, rewrite information
+								writeLeaderInformation(confirmedLeaderSessionID);
+							} else {
+								ByteArrayInputStream bais = new ByteArrayInputStream(data);
+								ObjectInputStream ois = new ObjectInputStream(bais);
+
+								String leaderAddress = ois.readUTF();
+								UUID leaderSessionID = (UUID) ois.readObject();
+
+								if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
+										(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
+									// the data field does not correspond to the expected leader information
+									writeLeaderInformation(confirmedLeaderSessionID);
+								}
+							}
+						}
+					}
+				}
+			}
+		} catch (Exception e) {
+			leaderContender.handleError(new Exception("Could not handle node changed event.", e));
+			throw e;
+		}
+	}
+
+	/**
+	 * Writes the current leader's address as well as the given leader session ID to ZooKeeper.
+	 *
+	 * @param leaderSessionID Leader session ID which is written to ZooKeeper
+	 */
+	protected void writeLeaderInformation(UUID leaderSessionID) {
+		// this method does not have to be synchronized because the curator framework client
+		// is thread-safe
+		try {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+			oos.writeUTF(leaderContender.getAddress());
+			oos.writeObject(leaderSessionID);
+
+			oos.close();
+
+			boolean dataWritten = false;
+
+			while(!dataWritten && leaderLatch.hasLeadership()) {
+				Stat stat = client.checkExists().forPath(leaderPath);
+
+				if (stat != null) {
+					long owner = stat.getEphemeralOwner();
+					long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
+
+					if(owner == sessionID) {
+						try {
+							client.setData().forPath(leaderPath, baos.toByteArray());
+
+							dataWritten = true;
+						} catch (KeeperException.NoNodeException noNode) {
+							// node was deleted in the meantime
+						}
+					} else {
+						try {
+							client.delete().forPath(leaderPath);
+						} catch (KeeperException.NoNodeException noNode) {
+							// node was deleted in the meantime --> try again
+						}
+					}
+				} else {
+					try {
+						client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
+								leaderPath,
+								baos.toByteArray());
+
+						dataWritten = true;
+					} catch (KeeperException.NodeExistsException nodeExists) {
+						// node has been created in the meantime --> try again
+					}
+				}
+			}
+		} catch (Exception e) {
+			leaderContender.handleError(
+					new Exception("Could not write leader address and leader session ID to " +
+							"ZooKeeper.", e));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
new file mode 100644
index 0000000..8d2a9b5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+/**
+ * This exception is thrown by the {@link org.apache.flink.runtime.util.LeaderRetrievalUtils} when
+ * the method retrieveLeaderGateway fails to retrieve the current leader's gateway.
+ */
+public class LeaderRetrievalException extends Exception {
+
+	private static final long serialVersionUID = 42;
+
+	public LeaderRetrievalException(String message) {
+		super(message);
+	}
+
+	public LeaderRetrievalException(Throwable cause) {
+		super(cause);
+	}
+
+	public LeaderRetrievalException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
new file mode 100644
index 0000000..b5ba4e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import java.util.UUID;
+
+/**
+ * Classes which want to be notified about a changing leader by the {@link LeaderRetrievalService}
+ * have to implement this interface.
+ */
+public interface LeaderRetrievalListener {
+
+	/**
+	 * This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
+	 *
+	 * @param leaderAddress The address of the new leader
+	 * @param leaderSessionID The new leader session ID
+	 */
+	void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
+
+	/**
+	 * This method is called by the {@link LeaderRetrievalService} in case of an exception. This
+	 * assures that the {@link LeaderRetrievalListener} is aware of any problems occurring in the
+	 * {@link LeaderRetrievalService} thread.
+	 * @param exception The exception which occurred in the {@link LeaderRetrievalService}
+	 */
+	void handleError(Exception exception);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalService.java
new file mode 100644
index 0000000..97f8843
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalService.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+/**
+ * This interface has to be implemented by a service which retrieves the current leader and notifies
+ * a listener about it.
+ *
+ * Prior to using this service it has to be started by calling the start method. The start method
+ * also takes the {@link LeaderRetrievalListener} as an argument. The service can only be started
+ * once.
+ *
+ * The service should be stopped by calling the stop method.
+ */
+public interface LeaderRetrievalService {
+
+	/**
+	 * Starts the leader retrieval service with the given listener to listen for new leaders. This
+	 * method can only be called once.
+	 *
+	 * @param listener The leader retrieval listener which will be notified about new leaders.
+	 * @throws Exception
+	 */
+	void start(LeaderRetrievalListener listener) throws Exception;
+
+	/**
+	 * Stops the leader retrieval service.
+	 *
+	 * @throws Exception
+	 */
+	void stop() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
new file mode 100644
index 0000000..dbab41c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Standalone implementation of the {@link LeaderRetrievalService}. The standalone implementation
+ * assumes that there is only a single {@link org.apache.flink.runtime.jobmanager.JobManager} whose
+ * address is given to the service when creating it. This address is directly given to the
+ * {@link LeaderRetrievalListener} when the service is started.
+ */
+public class StandaloneLeaderRetrievalService implements LeaderRetrievalService {
+
+	/** Address of the only JobManager */
+	private final String jobManagerAddress;
+
+	/** Listener which wants to be notified about the new leader */
+	private LeaderRetrievalListener leaderListener;
+
+	/**
+	 * Creates a StandaloneLeaderRetrievalService with the given JobManager address.
+	 *
+	 * @param jobManagerAddress The JobManager's address which is returned to the
+	 * 							{@link LeaderRetrievalListener}
+	 */
+	public StandaloneLeaderRetrievalService(String jobManagerAddress) {
+		this.jobManagerAddress = jobManagerAddress;
+	}
+
+	public void start(LeaderRetrievalListener listener) {
+		Preconditions.checkNotNull(listener, "Listener must not be null.");
+		Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " +
+				"only be started once.");
+
+		leaderListener = listener;
+
+		// directly notify the listener, because we already know the leading JobManager's address
+		leaderListener.notifyLeaderAddress(jobManagerAddress, null);
+	}
+
+	public void stop() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
new file mode 100644
index 0000000..20ed4d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService}.
+ * This implementation of the {@link LeaderRetrievalService} retrieves the current leader which has
+ * been elected by the {@link org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService}.
+ * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
+ */
+public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener {
+	private static final Logger LOG = LoggerFactory.getLogger(
+		ZooKeeperLeaderRetrievalService.class);
+
+	/** Connection to the used ZooKeeper quorum */
+	private final CuratorFramework client;
+
+	/** Curator recipe to watch changes of a specific ZooKeeper node */
+	private final NodeCache cache;
+
+	/** Listener which will be notified about leader changes */
+	private volatile LeaderRetrievalListener leaderListener;
+
+	private String lastLeaderAddress;
+	private UUID lastLeaderSessionID;
+
+	/**
+	 * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
+	 *
+	 * @param client Client which constitutes the connection to the ZooKeeper quorum
+	 * @param retrievalPath Path of the ZooKeeper node which contains the leader information
+	 */
+	public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
+		this.client = client;
+		this.cache = new NodeCache(client, retrievalPath);
+	}
+
+	@Override
+	public void start(LeaderRetrievalListener listener) throws Exception {
+		Preconditions.checkNotNull(listener, "Listener must not be null.");
+		Preconditions.checkState(leaderListener == null, "ZooKeeperLeaderRetrievalService can " +
+				"only be started once.");
+
+		LOG.info("Starting ZooKeeperLeaderRetrievalService.");
+
+		leaderListener = listener;
+
+		cache.getListenable().addListener(this);
+		cache.start();
+	}
+
+	@Override
+	public void stop() throws Exception {
+		LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
+
+		cache.close();
+		client.close();
+	}
+
+	@Override
+	public void nodeChanged() throws Exception {
+		try {
+			ChildData childData = cache.getCurrentData();
+
+			String leaderAddress;
+			UUID leaderSessionID;
+
+			if (childData == null) {
+				leaderAddress = null;
+				leaderSessionID = null;
+			} else {
+				byte[] data = childData.getData();
+
+				if (data == null || data.length == 0) {
+					leaderAddress = null;
+					leaderSessionID = null;
+				} else {
+					ByteArrayInputStream bais = new ByteArrayInputStream(data);
+					ObjectInputStream ois = new ObjectInputStream(bais);
+
+					leaderAddress = ois.readUTF();
+					leaderSessionID = (UUID) ois.readObject();
+				}
+			}
+
+			if(!(Objects.equals(leaderAddress, lastLeaderAddress) &&
+					Objects.equals(leaderSessionID, lastLeaderSessionID))) {
+				lastLeaderAddress = leaderAddress;
+				lastLeaderSessionID = leaderSessionID;
+				leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
+			}
+		} catch (Exception e) {
+			leaderListener.handleError(new Exception("Could not handle node changed event.", e));
+			throw e;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
new file mode 100644
index 0000000..082fdd4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import java.util.UUID;
+
+/**
+ * {@link MessageDecorator} which wraps every message implementing {@link RequiresLeaderSessionID}
+ * into a {@link org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}. All
+ * other messages are handed back unchanged.
+ */
+public class LeaderSessionMessageDecorator implements MessageDecorator {
+
+	/** Leader session ID which is attached to the RequiresLeaderSessionID messages */
+	private final UUID leaderSessionID;
+
+	/**
+	 * Creates a decorator which tags messages with the given leader session ID.
+	 *
+	 * @param leaderSessionID Leader session ID to be used for decoration
+	 */
+	public LeaderSessionMessageDecorator(UUID leaderSessionID) {
+		this.leaderSessionID = leaderSessionID;
+	}
+
+	/**
+	 * Wraps the message into a LeaderSessionMessage if it implements RequiresLeaderSessionID.
+	 *
+	 * @param message Message to decorate
+	 * @return The wrapped message or the given message itself if no decoration is required
+	 */
+	@Override
+	public Object decorate(Object message) {
+		final boolean needsSessionID = message instanceof RequiresLeaderSessionID;
+
+		return needsSessionID
+				? new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message)
+				: message;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
new file mode 100644
index 0000000..25ee47b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+/**
+ * Interface for message decorators. A decorator receives a message and returns the object
+ * which should be used in its place.
+ */
+public interface MessageDecorator {
+
+	/**
+	 * Decorates a message.
+	 *
+	 * @param message Message to decorate
+	 * @return Decorated message
+	 */
+	Object decorate(Object message);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 94073db..46c07fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -27,9 +27,15 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Utilities to determine the network interface and address that should be used to bind the
@@ -396,4 +402,155 @@ public class NetUtils {
 
 		throw new RuntimeException("Could not find a free permitted port on the machine.");
 	}
+
+	/**
+	 * {@link LeaderRetrievalListener} which waits for the akka URL of the current leader and
+	 * uses it as the target for the address detection strategies of {@link NetUtils}. The
+	 * detection is retried, also with newly announced leaders, until the timeout is exceeded.
+	 */
+	public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
+
+		/** Time after which the connection attempts are logged by default */
+		private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
+
+		private enum LeaderRetrievalState {
+			NOT_RETRIEVED,
+			RETRIEVED,
+			NEWLY_RETRIEVED
+		}
+
+		/** Lock under which the fields below are updated; also used to wait for notifications */
+		private final Object retrievalLock = new Object();
+
+		private String akkaURL;
+		private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
+		private Exception exception;
+
+		/**
+		 * Tries to find the connecting address using the default logging delay.
+		 *
+		 * @param timeout Maximum time to try before falling back to the heuristics
+		 * @return The address found by one of the strategies or the local host address
+		 * @throws LeaderRetrievalException if the retrieval failed or was interrupted
+		 */
+		public InetAddress findConnectingAddress(
+				FiniteDuration timeout) throws LeaderRetrievalException {
+			return findConnectingAddress(timeout, defaultLoggingDelay);
+		}
+
+		/**
+		 * Waits for a leader address and tries the ADDRESS, FAST_CONNECT and SLOW_CONNECT
+		 * strategies against it, pausing between the rounds, until an address is found or the
+		 * timeout is exceeded. After the timeout the HEURISTIC strategy and finally the local
+		 * host address are used.
+		 *
+		 * @param timeout Maximum time to try before falling back to the heuristics
+		 * @param startLoggingAfter Elapsed time after which the connection attempts are logged
+		 * @return The address found by one of the strategies or the local host address
+		 * @throws LeaderRetrievalException if the listener was notified about an error, the
+		 * thread was interrupted or any other exception occurred
+		 */
+		public InetAddress findConnectingAddress(
+				FiniteDuration timeout,
+				FiniteDuration startLoggingAfter)
+			throws LeaderRetrievalException {
+			long startTime = System.currentTimeMillis();
+			long currentSleepTime = MIN_SLEEP_TIME;
+			long elapsedTime = 0;
+			InetSocketAddress targetAddress = null;
+
+			try {
+				while (elapsedTime < timeout.toMillis()) {
+
+					long maxTimeout = timeout.toMillis() - elapsedTime;
+
+					synchronized (retrievalLock) {
+						if (exception != null) {
+							throw exception;
+						}
+
+						if (retrievalState == LeaderRetrievalState.NOT_RETRIEVED) {
+							try {
+								retrievalLock.wait(maxTimeout);
+							} catch (InterruptedException e) {
+								// restore the interrupt flag for the callers of this method
+								Thread.currentThread().interrupt();
+								throw new Exception("Finding connecting address was interrupted " +
+										"while waiting for the leader retrieval.", e);
+							}
+						} else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
+							targetAddress = AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
+
+							LOG.info("Retrieved new target address {}.", targetAddress);
+
+							retrievalState = LeaderRetrievalState.RETRIEVED;
+
+							// new leader => start again with the shortest pause
+							currentSleepTime = MIN_SLEEP_TIME;
+						} else {
+							// same leader as in the last round => back off
+							currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+						}
+					}
+
+					if (targetAddress != null) {
+						AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+
+						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
+						if (logging) {
+							LOG.info("Trying to connect to address {}.", targetAddress);
+						}
+
+						do {
+							InetAddress address = NetUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
+							if (address != null) {
+								return address;
+							}
+
+							// pick the next strategy
+							switch (strategy) {
+								case ADDRESS:
+									strategy = AddressDetectionState.FAST_CONNECT;
+									break;
+								case FAST_CONNECT:
+									strategy = AddressDetectionState.SLOW_CONNECT;
+									break;
+								case SLOW_CONNECT:
+									strategy = null;
+									break;
+								default:
+									throw new RuntimeException("Unsupported strategy: " + strategy);
+							}
+						}
+						while (strategy != null);
+					}
+
+					elapsedTime = System.currentTimeMillis() - startTime;
+
+					long timeToWait = Math.min(
+							Math.max(timeout.toMillis() - elapsedTime, 0),
+							currentSleepTime);
+
+					if (timeToWait > 0) {
+						// pause on the lock so that a leader notification ends the pause early
+						synchronized (retrievalLock) {
+							try {
+								retrievalLock.wait(timeToWait);
+							} catch (InterruptedException e) {
+								// restore the interrupt flag for the callers of this method
+								Thread.currentThread().interrupt();
+								throw new Exception("Finding connecting address was interrupted while pausing.", e);
+							}
+						}
+
+						elapsedTime = System.currentTimeMillis() - startTime;
+					}
+				}
+
+				InetAddress heuristic = null;
+
+				if (targetAddress != null) {
+					LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+					heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+				}
+
+				if (heuristic != null) {
+					return heuristic;
+				} else {
+					LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+					return InetAddress.getLocalHost();
+				}
+			} catch (Exception e) {
+				throw new LeaderRetrievalException("Could not retrieve the connecting address to the " +
+						"current leader with the akka URL " + akkaURL + ".", e);
+			}
+		}
+
+		/**
+		 * Stores a non-empty leader address and wakes up the threads waiting for it. Null and
+		 * empty addresses are ignored.
+		 *
+		 * @param leaderAddress Akka URL of the new leader
+		 * @param leaderSessionID Leader session ID of the new leader (not used here)
+		 */
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			if (leaderAddress != null && !leaderAddress.equals("")) {
+				synchronized (retrievalLock) {
+					akkaURL = leaderAddress;
+					retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;
+
+					retrievalLock.notifyAll();
+				}
+			}
+		}
+
+		/**
+		 * Stores the exception so that findConnectingAddress fails with it and wakes up the
+		 * waiting threads.
+		 *
+		 * @param exception Exception reported by the leader retrieval service
+		 */
+		@Override
+		public void handleError(Exception exception) {
+			synchronized (retrievalLock) {
+				this.exception = exception;
+				retrievalLock.notifyAll();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c7abce0..a19a57a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -51,8 +51,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
-import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
+import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
@@ -571,7 +571,7 @@ public class Task implements Runnable {
 			// notify everyone that we switched to running. especially the TaskManager needs
 			// to know this!
 			notifyObservers(ExecutionState.RUNNING, null);
-			taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
+			taskManager.tell(new UpdateTaskExecutionState(
 					new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));
 
 			// make sure the user code classloader is accessible thread-locally
@@ -856,8 +856,7 @@ public class Task implements Runnable {
 		}
 
 		TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
-		TaskMessages.UpdateTaskExecutionState actorMessage = new
-				TaskMessages.UpdateTaskExecutionState(stateUpdate);
+		UpdateTaskExecutionState actorMessage = new UpdateTaskExecutionState(stateUpdate);
 
 		for (ActorGateway listener : executionListenerActors) {
 			listener.tell(actorMessage);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
new file mode 100644
index 0000000..aee023a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.UUID;
+
+/**
+ * Wrapper class for a pair of connection address and leader session ID.
+ */
+public class LeaderConnectionInfo {
+
+	/** Connection address of the leader */
+	private final String address;
+
+	/** Leader session ID belonging to the address */
+	private final UUID leaderSessionID;
+
+	/**
+	 * Creates a new pair of connection address and leader session ID.
+	 *
+	 * @param address Connection address of the leader
+	 * @param leaderSessionID Leader session ID of the leader
+	 */
+	public LeaderConnectionInfo(String address, UUID leaderSessionID) {
+		this.address = address;
+		this.leaderSessionID = leaderSessionID;
+	}
+
+	/**
+	 * @return The connection address given at construction time
+	 */
+	public String getAddress() {
+		return address;
+	}
+
+	/**
+	 * @return The leader session ID given at construction time
+	 */
+	public UUID getLeaderSessionID() {
+		return leaderSessionID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
new file mode 100644
index 0000000..76657d3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+
+/**
+ * Utility class for creating {@link LeaderElectionService} instances.
+ */
+public class LeaderElectionUtils {
+
+	/**
+	 * Instantiates the {@link LeaderElectionService} which belongs to the recovery mode set in
+	 * the given {@link Configuration}.
+	 *
+	 * @param configuration Configuration object
+	 * @return {@link LeaderElectionService} which was created based on the provided Configuration
+	 * @throws Exception If the recovery mode is unknown or the service creation fails
+	 */
+	public static LeaderElectionService createLeaderElectionService(Configuration configuration) throws Exception {
+		final String configuredMode = configuration.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE);
+
+		final RecoveryMode mode = RecoveryMode.valueOf(configuredMode.toUpperCase());
+
+		switch (mode) {
+			case STANDALONE:
+				return new StandaloneLeaderElectionService();
+			case ZOOKEEPER:
+				return ZooKeeperUtils.createLeaderElectionService(configuration);
+			default:
+				throw new Exception("Unknown RecoveryMode " + mode);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
new file mode 100644
index 0000000..201db03
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+public class LeaderRetrievalUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LeaderRetrievalUtils.class);
+
+	/**
+	 * Instantiates the {@link LeaderRetrievalService} which belongs to the recovery mode set in
+	 * the given {@link Configuration}.
+	 *
+	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
+	 * @return The {@link LeaderRetrievalService} specified in the configuration object
+	 * @throws Exception If the recovery mode is not supported or the service creation fails
+	 */
+	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
+		throws Exception {
+
+		final String modeName = configuration.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE);
+
+		final RecoveryMode mode = RecoveryMode.valueOf(modeName.toUpperCase());
+
+		if (mode == RecoveryMode.STANDALONE) {
+			return StandaloneUtils.createLeaderRetrievalService(configuration);
+		} else if (mode == RecoveryMode.ZOOKEEPER) {
+			return ZooKeeperUtils.createLeaderRetrievalService(configuration);
+		} else {
+			throw new Exception("Recovery mode " + mode + " is not supported.");
+		}
+	}
+
+	/**
+	 * Starts the given {@link LeaderRetrievalService} and blocks until the gateway of the current
+	 * leader is available. The service is stopped again before the method returns. If the leader
+	 * could not be retrieved within the given timeout, a {@link LeaderRetrievalException} is
+	 * thrown.
+	 *
+	 * @param leaderRetrievalService {@link LeaderRetrievalService} which is used for the leader retrieval
+	 * @param actorSystem ActorSystem which is used for the {@link LeaderRetrievalListener} implementation
+	 * @param timeout Timeout value for the retrieval call
+	 * @return The current leader gateway
+	 * @throws LeaderRetrievalException If the actor gateway could not be retrieved or the timeout has been exceeded
+	 */
+	public static ActorGateway retrieveLeaderGateway(
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem,
+			FiniteDuration timeout)
+		throws LeaderRetrievalException {
+		final LeaderGatewayListener gatewayListener = new LeaderGatewayListener(actorSystem, timeout);
+
+		try {
+			leaderRetrievalService.start(gatewayListener);
+
+			return Await.result(gatewayListener.getActorGatewayFuture(), timeout);
+		} catch (Exception e) {
+			throw new LeaderRetrievalException("Could not retrieve the leader gateway", e);
+		} finally {
+			// a failure while stopping must not hide the actual result
+			try {
+				leaderRetrievalService.stop();
+			} catch (Exception stopFailure) {
+				LOG.warn("Could not stop the leader retrieval service.", stopFailure);
+			}
+		}
+	}
+
+	/**
+	 * Starts the given {@link LeaderRetrievalService} and blocks until the leader's akka URL and
+	 * the current leader session ID are available. Both values are returned as a
+	 * {@link LeaderConnectionInfo} instance. The service is stopped again before the method
+	 * returns.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to retrieve the leader connection
+	 *                               information
+	 * @param timeout Timeout when to give up looking for the leader
+	 * @return LeaderConnectionInfo containing the leader's akka URL and the current leader session
+	 * ID
+	 * @throws LeaderRetrievalException If the information could not be retrieved in time
+	 */
+	public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout
+	) throws LeaderRetrievalException {
+		final LeaderConnectionInfoListener infoListener = new LeaderConnectionInfoListener();
+
+		try {
+			leaderRetrievalService.start(infoListener);
+
+			return Await.result(infoListener.getLeaderConnectionInfoFuture(), timeout);
+		} catch (Exception e) {
+			throw new LeaderRetrievalException("Could not retrieve the leader address and leader " +
+					"session ID.", e);
+		} finally {
+			// a failure while stopping must not hide the actual result
+			try {
+				leaderRetrievalService.stop();
+			} catch (Exception stopFailure) {
+				LOG.warn("Could not stop the leader retrieval service.", stopFailure);
+			}
+		}
+	}
+
+	/**
+	 * Starts the given {@link LeaderRetrievalService} with a
+	 * {@link NetUtils.LeaderConnectingAddressListener} and returns the address which the
+	 * listener finds within the given timeout. The service is stopped again before the method
+	 * returns.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service which announces the leader
+	 * @param timeout Timeout which is handed to the listener's address search
+	 * @return The address found by the listener
+	 * @throws LeaderRetrievalException If the service could not be started or the search failed
+	 */
+	public static InetAddress findConnectingAddress(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout) throws LeaderRetrievalException {
+		final NetUtils.LeaderConnectingAddressListener addressListener =
+				new NetUtils.LeaderConnectingAddressListener();
+
+		try {
+			leaderRetrievalService.start(addressListener);
+
+			LOG.info("Trying to select the network interface and address to use " +
+					"by connecting to the leading JobManager.");
+
+			LOG.info("TaskManager will try to connect for " + timeout +
+					" before falling back to heuristics");
+
+			return addressListener.findConnectingAddress(timeout);
+		} catch (Exception e) {
+			throw new LeaderRetrievalException("Could not find the connecting address by " +
+					"connecting to the current leader.", e);
+		} finally {
+			// a failure while stopping must not hide the actual result
+			try {
+				leaderRetrievalService.stop();
+			} catch (Exception stopFailure) {
+				LOG.warn("Could not stop the leader retrieval service.", stopFailure);
+			}
+		}
+	}
+
+	/**
+	 * {@link LeaderRetrievalListener} used by the retrieveLeaderGateway method. It completes a
+	 * future with the {@link ActorGateway} of the first non-empty leader address it is notified
+	 * about.
+	 */
+	public static class LeaderGatewayListener implements LeaderRetrievalListener {
+
+		private final ActorSystem actorSystem;
+		private final FiniteDuration timeout;
+
+		private final Promise<ActorGateway> futureActorGateway = new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>();
+
+		/**
+		 * @param actorSystem ActorSystem used to look up the leader's ActorRef
+		 * @param timeout Timeout for the ActorRef lookup
+		 */
+		public LeaderGatewayListener(ActorSystem actorSystem, FiniteDuration timeout) {
+			this.actorSystem = actorSystem;
+			this.timeout = timeout;
+		}
+
+		/**
+		 * Resolves the leader's ActorRef and completes the future with the resulting gateway.
+		 * Null or empty addresses and notifications after completion are ignored.
+		 */
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			if (leaderAddress == null || leaderAddress.equals("") || futureActorGateway.isCompleted()) {
+				return;
+			}
+
+			try {
+				final ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, actorSystem, timeout);
+
+				futureActorGateway.success(new AkkaActorGateway(leaderRef, leaderSessionID));
+			} catch(Exception e){
+				futureActorGateway.failure(e);
+			}
+		}
+
+		/**
+		 * Fails the future with the given exception unless it is already completed.
+		 */
+		@Override
+		public void handleError(Exception exception) {
+			if (futureActorGateway.isCompleted()) {
+				return;
+			}
+
+			futureActorGateway.failure(exception);
+		}
+
+		/**
+		 * @return Future which is completed with the leader's gateway or a failure
+		 */
+		public Future<ActorGateway> getActorGatewayFuture() {
+			return futureActorGateway.future();
+		}
+	}
+
+	/**
+	 * {@link LeaderRetrievalListener} used by the retrieveLeaderConnectionInfo method. It
+	 * completes a future with the leader's akka URL and the leader session ID of the first
+	 * non-empty leader address it is notified about.
+	 */
+	public static class LeaderConnectionInfoListener implements  LeaderRetrievalListener {
+		private final Promise<LeaderConnectionInfo> connectionInfo = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+		/**
+		 * @return Future which is completed with the leader connection info or a failure
+		 */
+		public Future<LeaderConnectionInfo> getLeaderConnectionInfoFuture() {
+			return connectionInfo.future();
+		}
+
+		/**
+		 * Completes the future with the given values. Null or empty addresses and notifications
+		 * after completion are ignored.
+		 */
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			if (leaderAddress == null || leaderAddress.equals("") || connectionInfo.isCompleted()) {
+				return;
+			}
+
+			connectionInfo.success(new LeaderConnectionInfo(leaderAddress, leaderSessionID));
+		}
+
+		/**
+		 * Fails the future with the given exception unless it is already completed.
+		 */
+		@Override
+		public void handleError(Exception exception) {
+			if (connectionInfo.isCompleted()) {
+				return;
+			}
+
+			connectionInfo.failure(exception);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
new file mode 100644
index 0000000..ebb7965
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import scala.Option;
+import scala.Tuple2;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Utility class for creating {@link StandaloneLeaderRetrievalService} instances.
+ */
+public class StandaloneUtils {
+
+	/**
+	 * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration. The
+	 * host and port for the remote Akka URL are retrieved from the provided configuration.
+	 *
+	 * @param configuration Configuration instance containing the host and port information
+	 * @return StandaloneLeaderRetrievalService
+	 * @throws UnknownHostException If the configured JobManager hostname cannot be resolved
+	 */
+	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration)
+		throws UnknownHostException {
+		return createLeaderRetrievalService(configuration, null);
+	}
+
+	/**
+	 * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration and the
+	 * JobManager name. The host and port for the remote Akka URL are retrieved from the provided
+	 * configuration. Instead of using the standard JobManager Akka name, the provided one is used
+	 * for the remote Akka URL.
+	 *
+	 * @param configuration Configuration instance containing the host and port information
+	 * @param jobManagerName Name of the JobManager actor
+	 * @return StandaloneLeaderRetrievalService
+	 * @throws UnknownHostException If the configured JobManager hostname cannot be resolved
+	 */
+	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration,
+			String jobManagerName)
+		throws UnknownHostException {
+		Tuple2<String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration);
+
+		String jobManagerHostname = stringIntPair._1();
+		int jobManagerPort = (Integer) stringIntPair._2();
+		InetSocketAddress hostPort;
+
+		try {
+			InetAddress inetAddress = InetAddress.getByName(jobManagerHostname);
+			hostPort = new InetSocketAddress(inetAddress, jobManagerPort);
+		}
+		catch (UnknownHostException e) {
+			// UnknownHostException has no cause constructor, so attach the original via initCause
+			UnknownHostException resolveFailure = new UnknownHostException(
+					"Cannot resolve the JobManager hostname '" + jobManagerHostname
+					+ "' specified in the configuration");
+			resolveFailure.initCause(e);
+			throw resolveFailure;
+		}
+
+		String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(
+				hostPort,
+				Option.apply(jobManagerName));
+
+		return new StandaloneLeaderRetrievalService(jobManagerAkkaUrl);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
deleted file mode 100644
index c3d9df4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ZooKeeper utilities.
- */
-public class ZooKeeperUtil {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
-
-	public static CuratorFramework createCuratorFramework(Configuration configuration) throws Exception {
-		String zkQuorum = ZooKeeperUtil.getZooKeeperEnsemble(configuration);
-
-		if (zkQuorum == null || zkQuorum.equals("")) {
-			throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
-		}
-
-		int sessionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-
-		int connectionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
-
-		int retryWait = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
-
-		int maxRetryAttempts = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
-
-		String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_ZNODE_ROOT);
-
-		LOG.info("Using '{}' as root namespace.", root);
-
-		CuratorFramework cf = CuratorFrameworkFactory.builder()
-				.connectString(zkQuorum)
-				.sessionTimeoutMs(sessionTimeout)
-				.connectionTimeoutMs(connectionTimeout)
-				.retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
-				// Curator prepends a '/' manually and throws an Exception if the
-				// namespace starts with a '/'.
-				.namespace(root.startsWith("/") ? root.substring(1) : root)
-				.build();
-
-		try {
-			cf.start();
-		}
-		catch (Exception e) {
-			throw new Exception("Could not start CuratorFramework.", e);
-		}
-
-		return cf;
-	}
-
-	/**
-	 * Returns whether high availability is enabled (<=> ZooKeeper quorum configured).
-	 */
-	public static boolean isJobManagerHighAvailabilityEnabled(Configuration flinkConf) {
-		return flinkConf.containsKey(ConfigConstants.ZOOKEEPER_QUORUM_KEY);
-	}
-
-	/**
-	 * Returns the configured ZooKeeper quorum (and removes whitespace, because ZooKeeper does not
-	 * tolerate it).
-	 */
-	public static String getZooKeeperEnsemble(Configuration flinkConf)
-			throws IllegalConfigurationException {
-
-		String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, null);
-
-		if (zkQuorum == null || zkQuorum.equals("")) {
-			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
-		}
-
-		// Remove all whitespace
-		zkQuorum = zkQuorum.replaceAll("\\s+", "");
-
-		return zkQuorum;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
new file mode 100644
index 0000000..14afb7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for working with ZooKeeper: starting a {@link CuratorFramework} client from
+ * a Flink {@link Configuration} and creating the ZooKeeper based leader election and leader
+ * retrieval services.
+ */
+public class ZooKeeperUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
+
+	/**
+	 * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper
+	 * quorum.
+	 *
+	 * <p>All whitespace is removed from the configured quorum string before it is handed to
+	 * Curator, mirroring {@link #getZooKeeperEnsemble(Configuration)}.
+	 *
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @return {@link CuratorFramework} instance
+	 * @throws RuntimeException if no ZooKeeper quorum has been specified
+	 */
+	public static CuratorFramework startCuratorFramework(Configuration configuration) {
+		String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+
+		if (zkQuorum != null) {
+			// Same normalization as in getZooKeeperEnsemble: ZooKeeper does not tolerate
+			// whitespace in the connect string.
+			zkQuorum = zkQuorum.replaceAll("\\s+", "");
+		}
+
+		if (zkQuorum == null || zkQuorum.isEmpty()) {
+			throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
+		}
+
+		int sessionTimeout = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+
+		int connectionTimeout = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+
+		int retryWait = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+
+		int maxRetryAttempts = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+
+		String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
+
+		LOG.info("Using '{}' as root namespace.", root);
+
+		CuratorFramework cf = CuratorFrameworkFactory.builder()
+				.connectString(zkQuorum)
+				.sessionTimeoutMs(sessionTimeout)
+				.connectionTimeoutMs(connectionTimeout)
+				.retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
+				// Curator prepends a '/' manually and throws an Exception if the
+				// namespace starts with a '/'.
+				.namespace(root.startsWith("/") ? root.substring(1) : root)
+				.build();
+
+		cf.start();
+
+		return cf;
+	}
+
+	/**
+	 * Returns whether the configured recovery mode is ZooKeeper. The configured value is
+	 * converted to upper case before it is compared with the name of
+	 * {@link RecoveryMode#ZOOKEEPER}.
+	 *
+	 * @param flinkConf {@link Configuration} object containing the recovery mode
+	 * @return true if the recovery mode is ZooKeeper, otherwise false
+	 */
+	public static boolean isZooKeeperHighAvailabilityEnabled(Configuration flinkConf) {
+		String recoveryMode = flinkConf.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
+
+		return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name());
+	}
+
+	/**
+	 * Returns the configured ZooKeeper quorum (and removes whitespace, because ZooKeeper does not
+	 * tolerate it).
+	 *
+	 * @param flinkConf {@link Configuration} object containing the ZooKeeper quorum
+	 * @return Configured ZooKeeper quorum without any whitespace
+	 * @throws IllegalConfigurationException if no ZooKeeper quorum has been configured
+	 */
+	public static String getZooKeeperEnsemble(Configuration flinkConf)
+			throws IllegalConfigurationException {
+
+		String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+
+		if (zkQuorum == null || zkQuorum.equals("")) {
+			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
+		}
+
+		// Remove all whitespace
+		zkQuorum = zkQuorum.replaceAll("\\s+", "");
+
+		return zkQuorum;
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderRetrievalService} instance. A new
+	 * {@link CuratorFramework} client is started for the service.
+	 *
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration) throws Exception {
+		CuratorFramework client = startCuratorFramework(configuration);
+		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+
+		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderElectionService} instance. A new
+	 * {@link CuratorFramework} client is started for the service.
+	 *
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @return {@link ZooKeeperLeaderElectionService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderElectionService createLeaderElectionService(
+			Configuration configuration) throws Exception {
+		CuratorFramework client = startCuratorFramework(configuration);
+
+		String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
+		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+
+		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
deleted file mode 100644
index c6793ed..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime
-
-import java.util.UUID
-
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID
-
-/** [[MessageDecorator]] which wraps [[RequiresLeaderSessionID]] messages in a
-  * [[LeaderSessionMessage]] with the given leader session ID.
-  *
-  * @param leaderSessionID Leader session ID which is associated with the
-  *                        [[RequiresLeaderSessionID]] message
-  */
-class LeaderSessionMessageDecorator(val leaderSessionID: Option[UUID]) extends MessageDecorator {
-
-  /** Wraps [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]].
-    *
-    * @param message Message to decorate
-    * @return Decorated message
-    */
-  override def decorate(message: Any): Any = {
-    message match {
-      case msg: RequiresLeaderSessionID =>
-        LeaderSessionMessage(leaderSessionID, msg)
-      case msg => msg
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
new file mode 100644
index 0000000..72db258
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime
+
+import java.util.UUID
+
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+
+/** Mixin to filter out [[LeaderSessionMessage]] which contain an invalid leader session id.
+  * Messages which contain a valid leader session ID are unwrapped and forwarded to the actor.
+  *
+  * Messages which require a leader session ID but arrive without being wrapped in a
+  * [[LeaderSessionMessage]] cause an exception; all other messages are forwarded unchanged.
+  */
+trait LeaderSessionMessageFilter extends FlinkActor {
+  /** Leader session ID against which incoming messages are checked and with which outgoing
+    * [[RequiresLeaderSessionID]] messages are wrapped.
+    */
+  protected def leaderSessionID: Option[UUID]
+
+  abstract override def receive: Receive = {
+    case leaderMessage@LeaderSessionMessage(msgID, msg) =>
+      // Option(msgID) maps a null ID to None, so a message without an ID is only accepted
+      // while no leader session ID is set.
+      if (leaderSessionID.equals(Option(msgID))) {
+        super.receive(msg)
+      } else {
+        handleDiscardedMessage(leaderSessionID, leaderMessage)
+      }
+    case msg: RequiresLeaderSessionID =>
+      throw new Exception(s"Received a message $msg without a leader session ID, even though" +
+        s" the message requires a leader session ID.")
+    case msg =>
+      super.receive(msg)
+  }
+
+  /** Logs a warning for a message which is dropped because of a leader session ID mismatch.
+    *
+    * @param expectedLeaderSessionID Leader session ID the message was checked against
+    * @param msg Discarded message
+    */
+  private def handleDiscardedMessage(
+      expectedLeaderSessionID: Option[UUID],
+      msg: LeaderSessionMessage)
+    : Unit = {
+    // Each fragment ends with a space so that the IDs do not run into the preceding words.
+    log.warn(s"Discard message $msg because the expected leader session ID " +
+      s"$expectedLeaderSessionID did not equal the received leader session ID " +
+      s"${msg.leaderSessionID}.")
+  }
+
+  /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]
+    *
+    * @param message The message to decorate
+    * @return The decorated message
+    */
+  override def decorateMessage(message: Any): Any = {
+    message match {
+      case msg: RequiresLeaderSessionID =>
+        // orNull yields a null ID if no leader session ID is set
+        LeaderSessionMessage(leaderSessionID.orNull, super.decorateMessage(msg))
+
+      case msg => super.decorateMessage(msg)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
deleted file mode 100644
index d54926d..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime
-
-import java.util.UUID
-
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID
-
-/** Mixin to filter out [[LeaderSessionMessage]] which contain an invalid leader session id.
-  * Messages which contain a valid leader session ID are unwrapped and forwarded to the actor.
-  *
-  */
-trait LeaderSessionMessages extends FlinkActor {
-  protected def leaderSessionID: Option[UUID]
-
-  abstract override def receive: Receive = {
-    case LeaderSessionMessage(id, msg) =>
-      // Filter out messages which have not the correct leader session ID
-      (leaderSessionID, id) match {
-        case (Some(currentID), Some(msgID)) =>
-          if(currentID.equals(msgID)) {
-            // correct leader session ID
-            super.receive(msg)
-          } else {
-            // discard message because of incorrect leader session ID
-            handleDiscardedMessage(msg)
-          }
-
-        case _ => handleDiscardedMessage(msg)
-      }
-    case msg: RequiresLeaderSessionID =>
-      throw new Exception(s"Received a message $msg without a leader session ID, even though" +
-        " it requires to have one.")
-    case msg =>
-      // pass the message to the parent's receive method for further processing
-      super.receive(msg)
-  }
-
-  private def handleDiscardedMessage(msg: Any): Unit = {
-    log.debug(s"Discard message $msg because the leader session ID was not correct.")
-  }
-
-  /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]
-    *
-    * @param message The message to decorate
-    * @return The decorated message
-    */
-  override def decorateMessage(message: Any): Any = {
-    message match {
-      case msg: RequiresLeaderSessionID =>
-        LeaderSessionMessage(leaderSessionID, super.decorateMessage(msg))
-
-      case msg => super.decorateMessage(msg)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
deleted file mode 100644
index 5b1700f..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime
-
-/** Base trait for message decorators
-  *
-  */
-trait MessageDecorator {
-
-  /** Decorates a message
-    *
-    * @param message Message to decorate
-    * @return Decorated message
-    */
-  def decorate(message: Any): Any
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b8cce41..8007ef6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.InetAddress
+import java.io.IOException
+import java.net.{InetSocketAddress, InetAddress}
 import java.util.concurrent.{TimeUnit, Callable}
 
 import akka.actor._
@@ -27,7 +28,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
 import org.slf4j.LoggerFactory
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -324,18 +325,68 @@ object AkkaUtils {
     }
   }
 
-  def getChild(parent: ActorRef, child: String,
-               system: ActorSystem,
-               timeout: FiniteDuration): Future[ActorRef] = {
+  /** Returns a [[Future]] to the [[ActorRef]] of the child of a given actor. The child is specified
+    * by providing its actor name.
+    *
+    * @param parent [[ActorRef]] to the parent of the child to be retrieved
+    * @param child Name of the child actor
+    * @param system [[ActorSystem]] to be used
+    * @param timeout Maximum timeout for the future
+    * @return [[Future]] to the [[ActorRef]] of the child actor
+    */
+  def getChild(
+      parent: ActorRef,
+      child: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : Future[ActorRef] = {
     system.actorSelection(parent.path / child).resolveOne()(timeout)
   }
 
-  def getReference(path: String, system:
-                   ActorSystem,
-                   timeout: FiniteDuration): Future[ActorRef] = {
+  /** Returns a [[Future]] to the [[ActorRef]] of an actor. The actor is specified by its path.
+    *
+    * @param path Path to the actor to be retrieved
+    * @param system [[ActorSystem]] to be used
+    * @param timeout Maximum timeout for the future
+    * @return [[Future]] to the [[ActorRef]] of the actor
+    */
+  def getActorRefFuture(
+      path: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : Future[ActorRef] = {
     system.actorSelection(path).resolveOne()(timeout)
   }
 
+  /** Returns an [[ActorRef]] for the actor specified by the path parameter. The call blocks
+    * until the lookup future has completed or the timeout has elapsed.
+    *
+    * @param path Path to the actor to be retrieved
+    * @param system [[ActorSystem]] to be used
+    * @param timeout Maximum timeout for the future
+    * @throws java.io.IOException if the actor cannot be found, the lookup times out or an
+    *                             [[IOException]] occurs during the lookup
+    * @return [[ActorRef]] of the requested [[Actor]]
+    */
+  @throws(classOf[IOException])
+  def getActorRef(
+      path: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : ActorRef = {
+    try {
+      // The same timeout bounds both the lookup itself and the blocking wait for its result.
+      val future = AkkaUtils.getActorRefFuture(path, system, timeout)
+      Await.result(future, timeout)
+    }
+    catch {
+      // Lookup failures and timeouts are reported as IOException with the original as cause.
+      case e @ (_ : ActorNotFound | _ : TimeoutException) =>
+        throw new IOException(
+          s"Actor at $path not reachable. " +
+            "Please make sure that the actor is running and its port is reachable.", e)
+
+      case e: IOException =>
+        throw new IOException(s"Could not connect to the actor at $path", e)
+    }
+  }
+
 
   /**
    * Utility function to construct a future which tries multiple times to execute itself if it
@@ -421,4 +472,64 @@ object AkkaUtils {
     val duration = Duration(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT)
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
+
+  /** Retrieves the [[Address]] of an [[ActorSystem]], i.e. the host and the port under which
+    * the actor system is reachable.
+    *
+    * @param system [[ActorSystem]] whose [[Address]] is requested
+    * @return [[Address]] of the given [[ActorSystem]]
+    */
+  def getAddress(system: ActorSystem): Address = {
+    val remoteAddressExtension = RemoteAddressExtension(system)
+    remoteAddressExtension.address
+  }
+
+  /** Builds the string representation of an [[ActorRef]]'s path which includes the host and
+    * port of the [[ActorSystem]] in which the actor is running.
+    *
+    * @param system [[ActorSystem]] in which the given [[ActorRef]] is running
+    * @param actor [[ActorRef]] of the [[Actor]] for which the URL has to be generated
+    * @return String containing the [[ActorSystem]] independent URL of the [[Actor]]
+    */
+  def getAkkaURL(system: ActorSystem, actor: ActorRef): String = {
+    val systemAddress = getAddress(system)
+    val actorPath = actor.path
+
+    actorPath.toStringWithAddress(systemAddress)
+  }
+
+  /** Builds the Akka URL of an [[Actor]] by appending the given actor path to the string
+    * representation of the [[ActorSystem]]'s address.
+    *
+    * @param system [[ActorSystem]] in which the given [[Actor]] is running
+    * @param path Path describing an [[Actor]] for which the URL has to be generated
+    * @return String containing the [[ActorSystem]] independent URL of an [[Actor]] specified by
+    *         path.
+    */
+  def getAkkaURL(system: ActorSystem, path: String): String = {
+    val systemAddress = getAddress(system)
+    s"$systemAddress$path"
+  }
+
+  /** Extracts the hostname and the port of the remote actor system from the given Akka URL. The
+    * result is an [[InetSocketAddress]] instance containing the extracted hostname and port. If
+    * the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
+    * provided, then an [[Exception]] is thrown.
+    *
+    * @param akkaURL Akka URL from which the hostname and the port are extracted
+    * @throws java.lang.Exception if the Akka URL contains no non-empty hostname and port
+    * @return [[InetSocketAddress]] created from the extracted hostname and port
+    */
+  @throws(classOf[Exception])
+  def getInetSockeAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
+    // AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL.
+    // Host and port must both be non-empty; an empty port would otherwise surface as a
+    // NumberFormatException from toInt instead of the descriptive exception below.
+    val hostPortRegex = """@([^/:]+):(\d+)""".r
+
+    hostPortRegex.findFirstMatchIn(akkaURL) match {
+      case Some(m) =>
+        val host = m.group(1)
+        val port = m.group(2).toInt
+
+        new InetSocketAddress(host, port)
+      case None => throw new Exception("Could not retrieve InetSocketAddress from " +
+        s"Akka URL $akkaURL")
+    }
+  }
 }


Mime
View raw message