flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure
Date Wed, 07 Oct 2015 16:47:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947187#comment-14947187 ]

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41414806
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.data.Stat;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.ConcurrentModificationException;
    +import java.util.List;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * State handles backed by ZooKeeper.
    + *
    + * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
    + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
    + * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
    + *
    + * <p>State modifications require some care, because it is possible that certain failures bring
    + * the state handle backend and ZooKeeper out of sync.
    + *
    + * <p>ZooKeeper holds the ground truth about state handles, i.e. the following holds:
    + *
    + * <pre>
    + * State handle in ZooKeeper => State handle exists
    + * </pre>
    + *
    + * But not:
    + *
    + * <pre>
    + * State handle exists => State handle in ZooKeeper
    + * </pre>
    + *
    + * There can be lingering state handles when failures happen during operation. They
    + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
    + * FLINK-2513</a> about a possible way to overcome this).
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandleStore<T extends Serializable> {
    +
    +	/** Curator ZooKeeper client */
    +	private final CuratorFramework client;
    +
    +	/** State handle provider */
    +	private final StateHandleProvider<T> stateHandleProvider;
    +
    +	/**
    +	 * Creates a {@link ZooKeeperStateHandleStore}.
    +	 *
    +	 * @param client              The Curator ZooKeeper client. <strong>Important:</strong> It is
    +	 *                            expected that the client's namespace ensures that the root
    +	 *                            path is exclusive for all state handles managed by this
    +	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
    +	 * @param stateHandleProvider The state handle provider for the state
    +	 */
    +	public ZooKeeperStateHandleStore(
    +			CuratorFramework client,
    +			StateHandleProvider<T> stateHandleProvider) {
    +
    +		this.client = checkNotNull(client, "Curator client");
    +		this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
    +	}
    +
    +	/**
    +	 * Creates a state handle and stores it in ZooKeeper with create mode {@link
    +	 * CreateMode#PERSISTENT}.
    +	 *
    +	 * @see #add(String, Serializable, CreateMode)
    +	 */
    +	public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
    +		return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
    +	}
    +
    +	/**
    +	 * Creates a state handle and stores it in ZooKeeper.
    +	 *
    +	 * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
    +	 * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
    +	 * makes sure that data in ZooKeeper is small.
    +	 *
    +	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
    +	 *                        start with a '/')
    +	 * @param state           State to be added
    +	 * @param createMode      The create mode for the new path in ZooKeeper
    +	 * @return Created {@link ZooKeeperStateHandle}
    +	 * @throws Exception
    +	 */
    +	public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
    +		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +		checkNotNull(state, "State");
    +
    +		// Create the state handle. Nothing persisted yet.
    +		StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
    +
    +		boolean success = false;
    +
    +		try {
    +			// Serialize the state handle. This writes the state to the backend.
    +			byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
    +
    +			// Write state handle (not the actual state) to ZooKeeper. This is expected to be
    +			// smaller than the state itself. This level of indirection makes sure that data in
    +			// ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
    +			// the state can be larger.
    +			client
    +					.create()
    +					.withMode(createMode)
    +					.forPath(pathInZooKeeper, serializedStateHandle);
    +
    +			success = true;
    +
    +			return new ZooKeeperStateHandle<>(stateHandle, pathInZooKeeper);
    +		}
    +		finally {
    +			if (!success) {
    +				// Cleanup the state handle if it was not written to ZooKeeper.
    +				if (stateHandle != null) {
    +					stateHandle.discardState();
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Replaces a state handle in ZooKeeper and discards the old state handle.
    +	 *
    +	 * <p><strong>Important</strong>: This method will only discard the state handle and not the
    +	 * state itself. Don't forget to run custom cleanup code of the state, if necessary.
    +	 *
    +	 * <pre>
    +	 * T state = get(path).getState();
    +	 * state.discard(); // Custom clean up
    +	 * replace(path, version, newState)
    +	 * </pre>
    +	 *
    +	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/')
    +	 * @param expectedVersion Expected version of the node to replace
    +	 * @param state           The new state to replace the old one
    +	 */
    +	public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
    +		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +		checkNotNull(state, "State");
    +
    +		StateHandle<T> oldStateHandle = get(pathInZooKeeper);
    +
    +		StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
    +
    +		boolean success = false;
    +
    +		try {
    +			// Serialize the new state handle. This writes the state to the backend.
    +			byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
    +
    +			// Replace state handle in ZooKeeper.
    +			client.setData()
    +					.withVersion(expectedVersion)
    +					.forPath(pathInZooKeeper, serializedStateHandle);
    +
    +			success = true;
    +		}
    +		finally {
    +			if (success) {
    +				oldStateHandle.discardState();
    +			}
    +			else {
    +				stateHandle.discardState();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Returns the version of the node if it exists or <code>-1</code> if it doesn't.
    +	 *
    +	 * @param pathInZooKeeper Path in ZooKeeper to check
    +	 * @return Version of the ZNode if the path exists, <code>-1</code> otherwise.
    +	 * @throws Exception
    +	 */
    +	public int exists(String pathInZooKeeper) throws Exception {
    +		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +
    +		Stat stat = client.checkExists().forPath(pathInZooKeeper);
    +
    +		if (stat != null) {
    +			return stat.getVersion();
    +		}
    +
    +		return -1;
    +	}
    +
    +	/**
    +	 * Gets a state handle from ZooKeeper.
    +	 *
    +	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
    +	 *                        exist and start with a '/').
    +	 * @return The state handle
    +	 * @throws Exception
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public StateHandle<T> get(String pathInZooKeeper) throws Exception {
    --- End diff --
    
    Resolved
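
    For readers following the replace() hunk above: the version check can be
    pictured with a small in-memory model. This is only a sketch under stated
    assumptions, not the code from the pull request. The class and method names
    (VersionedReplaceSketch, Node) are hypothetical, a HashMap stands in for
    ZooKeeper, and a version mismatch returns false here, whereas a real
    ZooKeeper setData with a stale version fails with a BadVersionException.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for versioned ZNodes; not the PR's implementation. */
public class VersionedReplaceSketch {

    /** Node data plus a version that increases on every successful write. */
    static final class Node {
        final String data;
        final int version;

        Node(String data, int version) {
            this.data = data;
            this.version = version;
        }
    }

    final Map<String, Node> nodes = new HashMap<>();

    /** Creates a node with version 0; fails if the path already exists. */
    void create(String path, String data) {
        if (nodes.putIfAbsent(path, new Node(data, 0)) != null) {
            throw new IllegalStateException("Path already exists: " + path);
        }
    }

    /** Returns the node version, or -1 if the path does not exist. */
    int exists(String path) {
        Node node = nodes.get(path);
        return node == null ? -1 : node.version;
    }

    /** Replaces the data only if the caller's expected version is still current. */
    boolean replace(String path, int expectedVersion, String data) {
        Node current = nodes.get(path);
        if (current == null || current.version != expectedVersion) {
            return false; // A concurrent writer got there first (or the node is gone).
        }
        nodes.put(path, new Node(data, current.version + 1));
        return true;
    }

    public static void main(String[] args) {
        VersionedReplaceSketch zk = new VersionedReplaceSketch();
        zk.create("/job-1", "handle-a");

        int seen = zk.exists("/job-1");
        System.out.println("first replace applied:  " + zk.replace("/job-1", seen, "handle-b"));
        // Reusing the stale version must not overwrite the newer data.
        System.out.println("second replace applied: " + zk.replace("/job-1", seen, "handle-c"));
    }
}
```

    The point of the version argument is that a writer who read the node before
    someone else updated it cannot silently overwrite the newer handle.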


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 0.10
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the leading job manager loses all running jobs when it fails. After a new leading job manager is elected, it is not possible to recover any previously running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the job graph to a state backend, and 2) a reference to the respective state handle to ZooKeeper. In general, job graphs can become large (multiple MBs, because they include closures etc.). ZooKeeper is not designed for data of this size. The level of indirection via the reference to the state backend keeps the data in ZooKeeper small.
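
The indirection described in the solution can be sketched with two in-memory maps. This is a minimal illustration under stated assumptions, not Flink's implementation: the class name IndirectionSketch and its fields are hypothetical, one map stands in for the state backend and the other for ZooKeeper, and a random UUID plays the role of the serialized state handle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of "large payload in backend, small reference in ZooKeeper". */
public class IndirectionSketch {

    // Stand-in for the state backend; holds the (potentially large) job graph bytes.
    final Map<String, byte[]> backend = new HashMap<>();

    // Stand-in for ZooKeeper; holds only a small reference per path.
    final Map<String, String> coordination = new HashMap<>();

    /** Writes the payload to the backend first, then publishes a reference under the path. */
    String add(String path, byte[] jobGraph) {
        String handle = UUID.randomUUID().toString();
        backend.put(handle, jobGraph);

        boolean success = false;
        try {
            if (coordination.putIfAbsent(path, handle) != null) {
                throw new IllegalStateException("Path already exists: " + path);
            }
            success = true;
            return handle;
        } finally {
            if (!success) {
                // Publishing the reference failed, so discard the orphaned payload.
                backend.remove(handle);
            }
        }
    }

    /** Resolves the reference stored under the path; returns null if the path is unknown. */
    byte[] get(String path) {
        String handle = coordination.get(path);
        return handle == null ? null : backend.get(handle);
    }

    public static void main(String[] args) {
        IndirectionSketch store = new IndirectionSketch();
        store.add("/currentJobs/job-1", new byte[4 * 1024 * 1024]);

        System.out.println("reference size (chars): "
                + store.coordination.get("/currentJobs/job-1").length());
        System.out.println("payload size (bytes):   "
                + store.get("/currentJobs/job-1").length);
    }
}
```

The write order matters: the payload is stored before the reference is published, and it is discarded again if publishing fails, so a reference never points at missing data.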
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs between job managers. The currentJobs node needs to satisfy the following invariant: There is a reference to a job graph with id i IFF the respective job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if resubmitted). The next step is to back up the runtime state handles of checkpoints in a similar manner.
> ---
> This work will be based on [~trohrmann@apache.org]'s implementation of FLINK-2291. The leader election service notifies the job manager about granted/revoked leadership. This notification happens via Akka and thus serially *per* job manager, but results in eventually consistent state between job managers. For short periods of time, a new leader can be granted leadership before the old leader's leadership has been revoked.
> [~trohrmann@apache.org], can you confirm that leadership does not guarantee mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to coordinate the access to currentJobs. The lock needs to be acquired on leadership.
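
How a lock would change the interleaving listed above can be replayed in a single-process model. This is a sketch under stated assumptions: it does not use Curator, the class CurrentJobsLockSketch and its methods are hypothetical, and the lock is a plain owner field rather than a distributed lock. It only shows the ordering the lock enforces: JM 2 cannot read or modify currentJobs until JM 1 has released the lock.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single-process model of lock-guarded access to 'currentJobs'. */
public class CurrentJobsLockSketch {

    final Set<String> currentJobs = new HashSet<>();

    // Name of the job manager holding the lock, or null if the lock is free.
    String lockOwner;

    /** Takes the lock if it is free; returns true if the caller holds it afterwards. */
    boolean tryAcquire(String jobManager) {
        if (lockOwner == null) {
            lockOwner = jobManager;
            return true;
        }
        return lockOwner.equals(jobManager);
    }

    /** Releases the lock if the caller holds it. */
    void release(String jobManager) {
        if (jobManager.equals(lockOwner)) {
            lockOwner = null;
        }
    }

    /** Returns the jobs to recover, or null if the caller does not hold the lock. */
    Set<String> recover(String jobManager) {
        return jobManager.equals(lockOwner) ? new HashSet<>(currentJobs) : null;
    }

    /** Removes a finished job; only the lock holder may do so. */
    boolean remove(String jobManager, String jobId) {
        return jobManager.equals(lockOwner) && currentJobs.remove(jobId);
    }

    public static void main(String[] args) {
        CurrentJobsLockSketch zk = new CurrentJobsLockSketch();

        // JM 1 is leader, holds the lock, and job i is running.
        zk.tryAcquire("JM1");
        zk.currentJobs.add("i");

        // JM 2 is granted leadership before JM 1 is revoked, but cannot take the lock yet.
        System.out.println("JM2 acquires early: " + zk.tryAcquire("JM2"));

        // Job i finishes; JM 1 still holds the lock and removes the node.
        zk.remove("JM1", "i");

        // JM 1 is finally revoked and releases; only now can JM 2 recover.
        zk.release("JM1");
        zk.tryAcquire("JM2");
        System.out.println("JM2 recovers: " + zk.recover("JM2"));
    }
}
```

With the lock in place, JM 2 sees currentJobs only after JM 1's removal, so it does not resubmit a job that has already finished.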
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting interleavings
> - Process failure integration tests with single and multiple running jobs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
