Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1437C18F6F for ; Tue, 20 Oct 2015 07:58:53 +0000 (UTC) Received: (qmail 77057 invoked by uid 500); 20 Oct 2015 07:58:53 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 76961 invoked by uid 500); 20 Oct 2015 07:58:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 76661 invoked by uid 99); 20 Oct 2015 07:58:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Oct 2015 07:58:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40A31E03D0; Tue, 20 Oct 2015 07:58:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 20 Oct 2015 07:58:56 -0000 Message-Id: <8ea3a179fd754ec5b0d4a35b6fbcb8d2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java new file mode 100644 index 0000000..660f8bc 
--- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static 
com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + *

Each job graph creates ZNode: + *

+ * +----O /flink/jobgraphs/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/<job-id> N [persistent]
+ * 
+ * + *

The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. */ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** + * Cache to monitor all children. This is used to detect races with other instances working + * on the same state. + */ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphStore( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. 
+ this.client = checkNotNull(client, "Curator client"); + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsPath); + + this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); + + this.pathCache = new PathChildrenCache(client, "/", false); + pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener()); + } + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + synchronized (cacheLock) { + if (!isRunning) { + this.jobGraphListener = jobGraphListener; + + pathCache.start(); + + isRunning = true; + } + } + } + + @Override + public void stop() throws Exception { + synchronized (cacheLock) { + if (isRunning) { + jobGraphListener = null; + + pathCache.close(); + + client.close(); + + isRunning = false; + } + } + } + + @Override + public List recoverJobGraphs() throws Exception { + synchronized (cacheLock) { + verifyIsRunning(); + + List, String>> submitted; + + while (true) { + try { + submitted = jobGraphsInZooKeeper.getAll(); + break; + } + catch (ConcurrentModificationException e) { + LOG.warn("Concurrent modification while reading from ZooKeeper. 
Retrying."); + } + } + + if (submitted.size() != 0) { + List jobGraphs = new ArrayList<>(submitted.size()); + + for (Tuple2, String> jobStateHandle : submitted) { + SubmittedJobGraph jobGraph = jobStateHandle + .f0.getState(ClassLoader.getSystemClassLoader()); + + addedJobGraphs.add(jobGraph.getJobId()); + + jobGraphs.add(jobGraph); + } + + LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs); + return jobGraphs; + } + else { + LOG.info("No job graph to recover."); + return Collections.emptyList(); + } + } + } + + @Override + public Option recoverJobGraph(JobID jobId) throws Exception { + checkNotNull(jobId, "Job ID"); + String path = getPathForJob(jobId); + + synchronized (cacheLock) { + verifyIsRunning(); + + try { + StateHandle jobStateHandle = jobGraphsInZooKeeper.get(path); + + SubmittedJobGraph jobGraph = jobStateHandle + .getState(ClassLoader.getSystemClassLoader()); + + addedJobGraphs.add(jobGraph.getJobId()); + + LOG.info("Recovered {}.", jobGraph); + + return Option.apply(jobGraph); + } + catch (KeeperException.NoNodeException ignored) { + return Option.empty(); + } + } + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + checkNotNull(jobGraph, "Job graph"); + String path = getPathForJob(jobGraph.getJobId()); + + boolean success = false; + + while (!success) { + synchronized (cacheLock) { + verifyIsRunning(); + + int currentVersion = jobGraphsInZooKeeper.exists(path); + + if (currentVersion == -1) { + try { + jobGraphsInZooKeeper.add(path, jobGraph); + + addedJobGraphs.add(jobGraph.getJobId()); + + LOG.info("Added {} to ZooKeeper.", jobGraph); + + success = true; + } + catch (KeeperException.NodeExistsException ignored) { + } + } + else if (addedJobGraphs.contains(jobGraph.getJobId())) { + try { + jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph); + LOG.info("Updated {} in ZooKeeper.", jobGraph); + + success = true; + } + catch (KeeperException.NoNodeException ignored) { + } + } + else { 
+ throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " + + "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before."); + } + } + } + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + checkNotNull(jobId, "Job ID"); + String path = getPathForJob(jobId); + + synchronized (cacheLock) { + if (addedJobGraphs.contains(jobId)) { + jobGraphsInZooKeeper.removeAndDiscardState(path); + + addedJobGraphs.remove(jobId); + LOG.info("Removed job graph {} from ZooKeeper.", jobId); + } + } + } + + /** + * Monitors ZooKeeper for changes. + * + *

Detects modifications from other job managers in corner situations. The event + * notifications fire for changes from this job manager as well. + */ + private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener { + + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) + throws Exception { + + if (LOG.isDebugEnabled()) { + if (event.getData() != null) { + LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath()); + } + else { + LOG.debug("Received {} event", event.getType()); + } + } + + switch (event.getType()) { + case CHILD_ADDED: + synchronized (cacheLock) { + try { + JobID jobId = fromEvent(event); + if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) { + try { + // Whoa! This has been added by someone else. Or we were fast + // to remove it (false positive). + jobGraphListener.onAddedJobGraph(jobId); + } + catch (Throwable t) { + LOG.error("Error in callback", t); + } + } + } + catch (Exception e) { + LOG.error("Error in SubmittedJobGraphsPathCacheListener", e); + } + } + + break; + + case CHILD_UPDATED: + // Nothing to do + break; + + case CHILD_REMOVED: + synchronized (cacheLock) { + try { + JobID jobId = fromEvent(event); + if (jobGraphListener != null && addedJobGraphs.contains(jobId)) { + try { + // Oh oh. Someone else removed one of our job graphs. Mean! + jobGraphListener.onRemovedJobGraph(jobId); + } + catch (Throwable t) { + LOG.error("Error in callback", t); + } + } + + break; + } + catch (Exception e) { + LOG.error("Error in SubmittedJobGraphsPathCacheListener", e); + } + } + break; + + case CONNECTION_SUSPENDED: + LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " + + "graphs are not monitored (temporarily)."); + + case CONNECTION_LOST: + LOG.warn("ZooKeeper connection LOST. 
Changes to the submitted job " + + "graphs are not monitored (permanently)."); + break; + + case CONNECTION_RECONNECTED: + LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " + + "graphs are monitored again."); + + case INITIALIZED: + LOG.info("SubmittedJobGraphsPathCacheListener initialized"); + break; + } + } + + /** + * Returns a JobID for the event's path. + */ + private JobID fromEvent(PathChildrenCacheEvent event) { + return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath())); + } + } + + /** + * Verifies that the state is running. + */ + private void verifyIsRunning() { + checkState(isRunning, "Not running. Forgot to call start()?"); + } + + /** + * Returns the JobID as a String (with leading slash). + */ + public static String getPathForJob(JobID jobId) { + checkNotNull(jobId, "Job ID"); + return String.format("/%s", jobId); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java index b6223ee..6cba141 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java @@ -67,4 +67,5 @@ public interface LeaderElectionService { * @return true if the associated {@link LeaderContender} is the leader, otherwise false */ boolean hasLeadership(); + } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index ae3f0e6..811037c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -43,6 +43,7 @@ import java.util.UUID; * ZooKeeper as well. */ public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener { + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class); /** Client to the ZooKeeper quorum */ http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java new file mode 100644 index 0000000..7aa1ccf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; + +public enum StateBackend { + JOBMANAGER, FILESYSTEM; + + /** + * Returns the configured {@link StateBackend}. + * + * @param config The config to parse + * @return Configured state backend or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not + * configured. + */ + public static StateBackend fromConfig(Configuration config) { + return StateBackend.valueOf(config.getString( + ConfigConstants.STATE_BACKEND, + ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java new file mode 100644 index 0000000..0086ac6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import java.io.Serializable; + +/** + * State handler provider factory. + * + *

This is going to be superseded soon. + */ +public class StateHandleProviderFactory { + + /** + * Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at + * the configured recovery path. + */ + public static StateHandleProvider createRecoveryFileStateHandleProvider( + Configuration config) { + + StateBackend stateBackend = StateBackend.fromConfig(config); + + if (stateBackend == StateBackend.FILESYSTEM) { + String recoveryPath = config.getString( + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); + + if (recoveryPath.equals("")) { + throw new IllegalConfigurationException("Missing recovery path. Specify via " + + "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'."); + } + else { + return FileStateHandle.createProvider(recoveryPath); + } + } + else { + throw new IllegalConfigurationException("Unexpected state backend configuration " + + stateBackend); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java deleted file mode 100644 index 5f867a5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobmanager.RecoveryMode; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; - -/** - * Utility class to help working with {@link LeaderElectionService} class. - */ -public final class LeaderElectionUtils { - - /** - * Creates a {@link LeaderElectionService} based on the provided {@link Configuration} object. - * - * @param configuration Configuration object - * @return {@link LeaderElectionService} which was created based on the provided Configuration - * @throws Exception - */ - public static LeaderElectionService createLeaderElectionService(Configuration configuration) throws Exception { - RecoveryMode recoveryMode = RecoveryMode.valueOf(configuration.getString( - ConfigConstants.RECOVERY_MODE, - ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase() - ); - - LeaderElectionService leaderElectionService; - - switch(recoveryMode) { - case STANDALONE: - leaderElectionService = new StandaloneLeaderElectionService(); - break; - case ZOOKEEPER: - leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration); - break; - default: - throw new Exception("Unknown RecoveryMode " + recoveryMode); - } - - return leaderElectionService; - } - - /** - * Private constructor to prevent instantiation. 
- */ - private LeaderElectionUtils() { - throw new RuntimeException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index d2d3945..79b9b7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -21,19 +21,27 @@ package org.apache.flink.runtime.util; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore; import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; +import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.state.StateHandleProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Utility class to work with Apache Zookeeper for Flink 
runtime. - */ -public final class ZooKeeperUtils { +import static com.google.common.base.Preconditions.checkNotNull; + +public class ZooKeeperUtils { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class); @@ -47,8 +55,10 @@ public final class ZooKeeperUtils { public static CuratorFramework startCuratorFramework(Configuration configuration) { String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, ""); - if(zkQuorum == null || zkQuorum.equals("")) { - throw new RuntimeException("No valid ZooKeeper quorum has been specified."); + if (zkQuorum == null || zkQuorum.equals("")) { + throw new RuntimeException("No valid ZooKeeper quorum has been specified. " + + "You can specify the quorum via the configuration key '" + + ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'."); } int sessionTimeout = configuration.getInteger( @@ -59,7 +69,7 @@ public final class ZooKeeperUtils { ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT); - int retryWait = configuration.getInteger ( + int retryWait = configuration.getInteger( ConfigConstants.ZOOKEEPER_RETRY_WAIT, ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT); @@ -88,14 +98,10 @@ public final class ZooKeeperUtils { } /** - * Returns whether high availability is enabled (<=> ZooKeeper quorum configured). + * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured. 
*/ - public static boolean isZooKeeperHighAvailabilityEnabled(Configuration flinkConf) { - String recoveryMode = flinkConf.getString( - ConfigConstants.RECOVERY_MODE, - ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase(); - - return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name()); + public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) { + return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER); } /** @@ -125,7 +131,7 @@ public final class ZooKeeperUtils { * @throws Exception */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( - Configuration configuration) throws Exception{ + Configuration configuration) throws Exception { CuratorFramework client = startCuratorFramework(configuration); String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); @@ -134,7 +140,8 @@ public final class ZooKeeperUtils { } /** - * Creates a {@link ZooKeeperLeaderElectionService} instance. + * Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link + * CuratorFramework} client. * * @param configuration {@link Configuration} object containing the configuration values * @return {@link ZooKeeperLeaderElectionService} instance. @@ -142,8 +149,24 @@ public final class ZooKeeperUtils { */ public static ZooKeeperLeaderElectionService createLeaderElectionService( Configuration configuration) throws Exception { + CuratorFramework client = startCuratorFramework(configuration); + return createLeaderElectionService(client, configuration); + } + + /** + * Creates a {@link ZooKeeperLeaderElectionService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @return {@link ZooKeeperLeaderElectionService} instance. 
+ * @throws Exception + */ + public static ZooKeeperLeaderElectionService createLeaderElectionService( + CuratorFramework client, + Configuration configuration) throws Exception { + String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH); String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH, @@ -153,6 +176,89 @@ public final class ZooKeeperUtils { } /** + * Creates a {@link ZooKeeperSubmittedJobGraphStore} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object + * @return {@link ZooKeeperSubmittedJobGraphStore} instance + */ + public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs( + CuratorFramework client, + Configuration configuration) throws Exception { + + checkNotNull(configuration, "Configuration"); + + StateHandleProvider stateHandleProvider = + StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration); + + // ZooKeeper submitted jobs root dir + String zooKeeperSubmittedJobsPath = configuration.getString( + ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + + return new ZooKeeperSubmittedJobGraphStore( + client, zooKeeperSubmittedJobsPath, stateHandleProvider); + } + + /** + * Creates a {@link ZooKeeperCompletedCheckpointStore} instance. 
+ * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object + * @param jobId ID of job to create the instance for + * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain + * @param userClassLoader User code class loader + * @return {@link ZooKeeperCompletedCheckpointStore} instance + */ + public static CompletedCheckpointStore createCompletedCheckpoints( + CuratorFramework client, + Configuration configuration, + JobID jobId, + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader) throws Exception { + + checkNotNull(configuration, "Configuration"); + + StateHandleProvider stateHandleProvider = + StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration); + + String completedCheckpointsPath = configuration.getString( + ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); + + completedCheckpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); + + return new ZooKeeperCompletedCheckpointStore( + maxNumberOfCheckpointsToRetain, + userClassLoader, + client, + completedCheckpointsPath, + stateHandleProvider); + } + + /** + * Creates a {@link ZooKeeperCheckpointIDCounter} instance. 
+ * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object + * @param jobId ID of job to create the instance for + * @return {@link ZooKeeperCheckpointIDCounter} instance + */ + public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter( + CuratorFramework client, + Configuration configuration, + JobID jobId) throws Exception { + + String checkpointIdCounterPath = configuration.getString( + ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); + + checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); + + return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath); + } + + /** * Private constructor to prevent instantiation. */ private ZooKeeperUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java new file mode 100644 index 0000000..936fe1b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + *

Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + *

State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + *

ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + *

+ * State handle in ZooKeeper => State handle exists
+ * 
+ * + * But not: + * + *
+ * State handle exists => State handle in ZooKeeper
+ * 
+ * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** + * Creates a {@link ZooKeeperStateHandleStore}. + * + * @param client The Curator ZooKeeper client. Important: It is + * expected that the client's namespace ensures that the root + * path is exclusive for all state handles managed by this + * instance, e.g. client.usingNamespace("/stateHandles") + * @param stateHandleProvider The state handle provider for the state + */ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** + * Creates a state handle and stores it in ZooKeeper with create mode {@link + * CreateMode#PERSISTENT}. + * + * @see #add(String, Serializable, CreateMode) + */ + public StateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** + * Creates a state handle and stores it in ZooKeeper. + * + *

Important: This will not store the actual state in + * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection + * makes sure that data in ZooKeeper is small. + * + * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and + * start with a '/') + * @param state State to be added + * @param createMode The create mode for the new path in ZooKeeper + * @return Created {@link StateHandle} + * @throws Exception If a ZooKeeper or state handle operation fails + */ + public StateHandle add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + checkNotNull(state, "State"); + + // Create the state handle. Nothing persisted yet. + StateHandle stateHandle = stateHandleProvider.createStateHandle(state); + + boolean success = false; + + try { + // Serialize the state handle. This writes the state to the backend. + byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle); + + // Write state handle (not the actual state) to ZooKeeper. This is expected to be + // smaller than the state itself. This level of indirection makes sure that data in + // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but + // the state can be larger. + client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle); + + success = true; + + return stateHandle; + } + finally { + if (!success) { + // Cleanup the state handle if it was not written to ZooKeeper. + if (stateHandle != null) { + stateHandle.discardState(); + } + } + } + } + + /** + * Replaces a state handle in ZooKeeper and discards the old state handle.
+ * + * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/') + * @param expectedVersion Expected version of the node to replace + * @param state The new state to replace the old one + * @throws Exception If a ZooKeeper or state handle operation fails + */ + public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + checkNotNull(state, "State"); + + StateHandle oldStateHandle = get(pathInZooKeeper); + + StateHandle stateHandle = stateHandleProvider.createStateHandle(state); + + boolean success = false; + + try { + // Serialize the new state handle. This writes the state to the backend. + byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle); + + // Replace state handle in ZooKeeper. + client.setData() + .withVersion(expectedVersion) + .forPath(pathInZooKeeper, serializedStateHandle); + + success = true; + } + finally { + if (success) { + oldStateHandle.discardState(); + } + else { + stateHandle.discardState(); + } + } + } + + /** + * Returns the version of the node if it exists or -1 if it doesn't. + * + * @param pathInZooKeeper Path in ZooKeeper to check + * @return Version of the ZNode if the path exists, -1 otherwise. + * @throws Exception If the ZooKeeper operation fails + */ + public int exists(String pathInZooKeeper) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + + Stat stat = client.checkExists().forPath(pathInZooKeeper); + + if (stat != null) { + return stat.getVersion(); + } + + return -1; + } + + /** + * Gets a state handle from ZooKeeper. + * + * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to + * exist and start with a '/').
+ * @return The state handle + * @throws Exception If a ZooKeeper or state handle operation fails + */ + @SuppressWarnings("unchecked") + public StateHandle get(String pathInZooKeeper) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + + byte[] data = client.getData().forPath(pathInZooKeeper); + + return (StateHandle) InstantiationUtil + .deserializeObject(data, ClassLoader.getSystemClassLoader()); + } + + /** + * Gets all available state handles from ZooKeeper. + * + *

If there is a concurrent modification, the operation is retried until it succeeds. Every + * retry starts from scratch, so the result never contains duplicate or stale entries. + * + * @return All state handles from ZooKeeper (empty if the root node does not exist). + * @throws Exception If a ZooKeeper or state handle operation fails + */ + @SuppressWarnings("unchecked") + public List, String>> getAll() throws Exception { + final List, String>> stateHandles = new ArrayList<>(); + + boolean success = false; + + retry: + while (!success) { + // Drop the results of a previous (aborted) attempt. Otherwise a retry after a + // concurrent deletion or a cVersion mismatch would return duplicate or stale handles. + stateHandles.clear(); + + Stat rootStat = client.checkExists().forPath("/"); + + if (rootStat == null) { + // The root node does not exist (yet), hence there are no state handles. + break; + } + + // Initial cVersion (number of changes to the children of this node) + int initialCVersion = rootStat.getCversion(); + + List children = client.getChildren().forPath("/"); + + for (String path : children) { + path = "/" + path; + + try { + final StateHandle stateHandle = get(path); + stateHandles.add(new Tuple2<>(stateHandle, path)); + } + catch (KeeperException.NoNodeException ignored) { + // Concurrent deletion, retry + continue retry; + } + } + + int finalCVersion = client.checkExists().forPath("/").getCversion(); + + // Check for concurrent modifications + success = initialCVersion == finalCVersion; + } + + return stateHandles; + } + + /** + * Gets all available state handles from ZooKeeper sorted by name (ascending). + * + *

If there is a concurrent modification, the operation is retried until it succeeds. Every + * retry starts from scratch, so the result never contains duplicate or stale entries. + * + * @return All state handles in ZooKeeper (empty if the root node does not exist). + * @throws Exception If a ZooKeeper or state handle operation fails + */ + @SuppressWarnings("unchecked") + public List, String>> getAllSortedByName() throws Exception { + final List, String>> stateHandles = new ArrayList<>(); + + boolean success = false; + + retry: + while (!success) { + // Drop the results of a previous (aborted) attempt. Otherwise a retry after a + // concurrent deletion or a cVersion mismatch would return duplicate or stale handles. + stateHandles.clear(); + + Stat rootStat = client.checkExists().forPath("/"); + + if (rootStat == null) { + // The root node does not exist (yet), hence there are no state handles. + break; + } + + // Initial cVersion (number of changes to the children of this node) + int initialCVersion = rootStat.getCversion(); + + List children = ZKPaths.getSortedChildren( + client.getZookeeperClient().getZooKeeper(), + ZKPaths.fixForNamespace(client.getNamespace(), "/")); + + for (String path : children) { + path = "/" + path; + + try { + final StateHandle stateHandle = get(path); + stateHandles.add(new Tuple2<>(stateHandle, path)); + } + catch (KeeperException.NoNodeException ignored) { + // Concurrent deletion, retry + continue retry; + } + } + + int finalCVersion = client.checkExists().forPath("/").getCversion(); + + // Check for concurrent modifications + success = initialCVersion == finalCVersion; + } + + return stateHandles; + } + + /** + * Removes a state handle from ZooKeeper. + * + *

Important: this does not discard the state handle. If you want to + * discard the state handle call {@link #removeAndDiscardState(String)}. + * + * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/') + * @throws Exception If the ZooKeeper operation fails + */ + public void remove(String pathInZooKeeper) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + + client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper); + } + + /** + * Removes a state handle from ZooKeeper asynchronously. + * + *

Important: this does not discard the state handle. If you want to + * discard the state handle call {@link #removeAndDiscardState(String)}. + * + * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/') + * @param callback The callback after the operation finishes + * @throws Exception If the ZooKeeper operation fails + */ + public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + checkNotNull(callback, "Background callback"); + + client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper); + } + + /** + * Discards a state handle and removes it from ZooKeeper. + * + *

If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}. + * + * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/') + * @throws Exception If the ZooKeeper or state handle operation fails + */ + public void removeAndDiscardState(String pathInZooKeeper) throws Exception { + checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + + StateHandle stateHandle = get(pathInZooKeeper); + + // Delete the state handle from ZooKeeper first + client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper); + + // Discard the state handle only after it has been successfully deleted from ZooKeeper. + // Otherwise we might enter an illegal state after failures (with a state handle in + // ZooKeeper, which has already been discarded). + stateHandle.discardState(); + } + + /** + * Discards all available state handles and removes them from ZooKeeper. + * + * @throws Exception If a ZooKeeper or state handle operation fails + */ + public void removeAndDiscardAllState() throws Exception { + final List, String>> allStateHandles = getAll(); + + ZKPaths.deleteChildren( + client.getZookeeperClient().getZooKeeper(), + ZKPaths.fixForNamespace(client.getNamespace(), "/"), + false); + + // Discard the state handles only after they have been successfully deleted from ZooKeeper.
+ for (Tuple2, String> stateHandleAndPath : allStateHandles) { + stateHandleAndPath.f0.discardState(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 75ad20f..67d7a06 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorRef - +import org.apache.flink.runtime.akka.ListeningBehaviour /** * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor @@ -27,11 +27,20 @@ import akka.actor.ActorRef * Additionally, it stores whether the job was started in the detached mode. Detached means that * the submitting actor does not wait for the job result once the job has terminated. * + * Important: This class is serializable, but needs to be deserialized in the context of an actor + * system in order to resolve the client [[ActorRef]]. It is possible to serialize the Akka URL + * manually, but it is cumbersome and complicates testing in certain scenarios, where you need to + * make sure to resolve the correct [[ActorRef]]s when submitting jobs (RepointableActorRef vs.
+ * * @param client Actor which submitted the job * @param start Starting time */ -class JobInfo(val client: ActorRef, val start: Long, - val sessionTimeout: Long) { +class JobInfo( + val client: ActorRef, + val listeningBehaviour: ListeningBehaviour, + val start: Long, + val sessionTimeout: Long) extends Serializable { var sessionAlive = sessionTimeout > 0 @@ -49,12 +58,16 @@ class JobInfo(val client: ActorRef, val start: Long, } } + override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)" + def setLastActive() = lastActive = System.currentTimeMillis() } object JobInfo{ - def apply(client: ActorRef, start: Long, - sessionTimeout: Long) = - new JobInfo(client, start, sessionTimeout) + def apply( + client: ActorRef, + listeningBehaviour: ListeningBehaviour, + start: Long, + sessionTimeout: Long) = new JobInfo(client, listeningBehaviour, start, sessionTimeout) } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 95637bb..f3e4054 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,31 +19,39 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import java.lang.reflect.{InvocationTargetException, Constructor} +import java.lang.reflect.{Constructor, InvocationTargetException} import java.net.InetSocketAddress import java.util.UUID import akka.actor.Status.Failure -import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem} +import akka.actor._ import akka.pattern.ask - import grizzled.slf4j.Logger - import 
org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.accumulators.AccumulatorSnapshot +import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer +import org.apache.flink.runtime.checkpoint.{CheckpointRecoveryFactory, StandaloneCheckpointRecoveryFactory, ZooKeeperCheckpointRecoveryFactory} import org.apache.flink.runtime.client._ +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} +import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator -import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} +import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService} import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged +import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} +import org.apache.flink.runtime.messages.RegistrationMessages._ +import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} -import 
org.apache.flink.runtime.messages.accumulators._ +import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint} import org.apache.flink.runtime.messages.webmonitor._ import org.apache.flink.runtime.process.ProcessReaper @@ -51,25 +59,16 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ -import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor} -import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter} -import org.apache.flink.runtime.LogMessages -import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils} -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} -import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus} -import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} -import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat} -import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil} +import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode} +import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils} +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global import 
scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool import scala.language.postfixOps -import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext.Implicits.global /** @@ -110,17 +109,22 @@ class JobManager( protected val delayBetweenRetries: Long, protected val timeout: FiniteDuration, protected val mode: StreamingMode, - protected val leaderElectionService: LeaderElectionService) + protected val leaderElectionService: LeaderElectionService, + protected val submittedJobGraphs : SubmittedJobGraphStore, + protected val checkpointRecoveryFactory : CheckpointRecoveryFactory) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging with LogMessages // mixin order is important, we want first logging - with LeaderContender { + with LeaderContender + with SubmittedJobGraphListener { override val log = Logger(getClass) /** Either running or not yet archived jobs (session hasn't been ended). 
*/ protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration) + var leaderSessionID: Option[UUID] = None /** @@ -138,6 +142,22 @@ class JobManager( "start.", e) throw new RuntimeException("Could not start the leader election service.", e) } + + try { + submittedJobGraphs.start(this) + } catch { + case e: Exception => + log.error("Could not start the submitted job graphs service.", e) + throw new RuntimeException("Could not start the submitted job graphs service.", e) + } + + try { + checkpointRecoveryFactory.start() + } catch { + case e: Exception => + log.error("Could not start the checkpoint recovery service.", e) + throw new RuntimeException("Could not start the checkpoint recovery service.", e) + } } override def postStop(): Unit = { @@ -159,6 +179,18 @@ class JobManager( case e: Exception => log.error("Could not properly shutdown the leader election service.") } + try { + submittedJobGraphs.stop() + } catch { + case e: Exception => log.error("Could not properly stop the submitted job graphs service.") + } + + try { + checkpointRecoveryFactory.stop() + } catch { + case e: Exception => log.error("Could not properly stop the checkpoint recovery service.") + } + if (archive != ActorRef.noSender) { archive ! decorateMessage(PoisonPill) } @@ -191,12 +223,21 @@ class JobManager( // confirming the leader session ID might be blocking, thus do it in a future future{ leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull) + + // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task + // managers etc.) 
+ if (recoveryMode != RecoveryMode.STANDALONE) { + context.system.scheduler.scheduleOnce(new FiniteDuration(delayBetweenRetries, + MILLISECONDS), self, decorateMessage(RecoverAllJobs))(context.dispatcher) + } }(context.dispatcher) case RevokeLeadership => log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") - cancelAndClearEverything(new Exception("JobManager is no longer the leader.")) + future { + cancelAndClearEverything(new Exception("JobManager is no longer the leader.")) + }(context.dispatcher) // disconnect the registered task managers instanceManager.getAllRegisteredInstances.asScala.foreach { @@ -269,7 +310,62 @@ class JobManager( sender ! decorateMessage(instanceManager.getTotalNumberOfSlots) case SubmitJob(jobGraph, listeningBehaviour) => - submitJob(jobGraph, listeningBehaviour) + val client = sender() + + val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), + jobGraph.getSessionTimeout) + + future { + submitJob(jobGraph, jobInfo) + }(context.dispatcher) + + case RecoverJob(jobId) => + future { + // The ActorRef, which is part of the submitted job graph can only be deserialized in the + // scope of an actor system. + akka.serialization.JavaSerializer.currentSystem.withValue( + context.system.asInstanceOf[ExtendedActorSystem]) { + + log.info(s"Attempting to recover job $jobId.") + + val jobGraph = submittedJobGraphs.recoverJobGraph(jobId) + + if (jobGraph.isDefined) { + if (!leaderElectionService.hasLeadership()) { + // we've lost leadership. mission: abort. + log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") + } + else { + recoverJobGraph(jobGraph.get) + } + } + else { + log.warn(s"Failed to recover job graph ${jobId}.") + } + } + }(context.dispatcher) + + case RecoverAllJobs => + future { + // The ActorRef, which is part of the submitted job graph can only be deserialized in the + // scope of an actor system. 
+ akka.serialization.JavaSerializer.currentSystem.withValue( + context.system.asInstanceOf[ExtendedActorSystem]) { + + val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala + + if (!leaderElectionService.hasLeadership()) { + // we've lost leadership. mission: abort. + log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " + + s"jobs.") + } + else { + log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.") + + jobGraphs.foreach(recoverJobGraph(_)) + } + } + }(context.dispatcher) case CancelJob(jobID) => log.info(s"Trying to cancel job with ID $jobID.") @@ -377,10 +473,27 @@ class JobManager( if (newJobStatus.isTerminalState()) { jobInfo.end = timeStamp - // is the client waiting for the job result? - if (jobInfo.client != ActorRef.noSender) { - newJobStatus match { - case JobStatus.FINISHED => + future { + // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will + // linger around and potentially be recovered at a later time. There is nothing we + // can do about that, but it should be communicated with the Client. + if (jobInfo.sessionAlive) { + jobInfo.setLastActive() + val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { + removeJob(jobID) + } + } + } else { + removeJob(jobID) + } + + // is the client waiting for the job result? + if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { + newJobStatus match { + case JobStatus.FINISHED => try { val accumulatorResults = executionGraph.getAccumulatorsSerialized() val result = new SerializedJobExecutionResult( @@ -398,47 +511,37 @@ class JobManager( jobInfo.client ! 
decorateMessage(JobResultFailure( new SerializedThrowable(exception))) } - case JobStatus.CANCELED => - // the error may be packed as a serialized throwable - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) - - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable( - new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))) - - case JobStatus.FAILED => - val unpackedError = SerializedThrowable.get( - error, executionGraph.getUserClassLoader()) - - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable( - new JobExecutionException(jobID, "Job execution failed.", unpackedError)))) - - case x => - val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") - jobInfo.client ! decorateMessage(JobResultFailure( - new SerializedThrowable(exception))) - throw exception - } - } - if (jobInfo.sessionAlive) { - jobInfo.setLastActive() - val lastActivity = jobInfo.lastActive - context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { - // remove only if no activity occurred in the meantime - if (lastActivity == jobInfo.lastActive) { - removeJob(jobID) + case JobStatus.CANCELED => + // the error may be packed as a serialized throwable + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) + + jobInfo.client ! decorateMessage(JobResultFailure( + new SerializedThrowable( + new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))) + + case JobStatus.FAILED => + val unpackedError = SerializedThrowable.get( + error, executionGraph.getUserClassLoader()) + + jobInfo.client ! decorateMessage(JobResultFailure( + new SerializedThrowable( + new JobExecutionException(jobID, "Job execution failed.", unpackedError)))) + + case x => + val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") + jobInfo.client ! 
decorateMessage(JobResultFailure( + new SerializedThrowable(exception))) + throw exception } } - } else { - removeJob(jobID) - } - + }(context.dispatcher) } case None => - removeJob(jobID) + future { + removeJob(jobID) + }(context.dispatcher) } case ScheduleOrUpdateConsumers(jobId, partitionId) => @@ -600,11 +703,12 @@ class JobManager( * graph and the execution vertices are queued for scheduling. * * @param jobGraph representing the Flink job - * @param listeningBehaviour specifies the listening behaviour of the sender. + * @param jobInfo the job info + * @param isRecovery Flag indicating whether this is a recovery or initial submission */ - private def submitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour): Unit = { + private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { if (jobGraph == null) { - sender() ! decorateMessage(JobResultFailure( + jobInfo.client ! decorateMessage(JobResultFailure( new SerializedThrowable( new JobSubmissionException(null, "JobGraph must not be null.") ) @@ -615,7 +719,7 @@ class JobManager( val jobName = jobGraph.getName var executionGraph: ExecutionGraph = null - log.info(s"Received job ${jobId} (${jobName}).") + log.info(s"Submitting job $jobId ($jobName)" + (if (isRecovery) " (Recovery)" else "") + ".") try { // Important: We need to make sure that the library registration is the first action, @@ -628,7 +732,7 @@ class JobManager( catch { case t: Throwable => throw new JobSubmissionException(jobId, - "Cannot set up the user code libraries: " + t.getMessage, t) + "Cannot set up the user code libraries: " + t.getMessage, t) } val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) @@ -641,18 +745,10 @@ class JobManager( throw new JobSubmissionException(jobId, "The given job is empty") } - val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) { - // The client does not want to receive the SerializedJobExecutionResult - ActorRef.noSender - } else 
{ - // Send the job execution result back to the sender - sender - } - // see if there already exists an ExecutionGraph for the corresponding job ID executionGraph = currentJobs.get(jobGraph.getJobID) match { - case Some((graph, jobInfo)) => - jobInfo.setLastActive() + case Some((graph, currentJobInfo)) => + currentJobInfo.setLastActive() graph case None => val graph = new ExecutionGraph( @@ -664,11 +760,7 @@ class JobManager( jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths, userCodeLoader) - val jobInfo = JobInfo( - client, - System.currentTimeMillis(), - jobGraph.getSessionTimeout) - currentJobs.put(jobGraph.getJobID, (graph, jobInfo)) + graph } @@ -682,7 +774,7 @@ class JobManager( executionGraph.setDelayBeforeRetrying(delayBetweenRetries) executionGraph.setScheduleMode(jobGraph.getScheduleMode()) executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()) - + try { executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)) } @@ -691,7 +783,7 @@ class JobManager( log.warn("Cannot create JSON plan for job", t) executionGraph.setJsonPlan("{}") } - + // initialize the vertices that have a master initialization hook // file output formats create directories here, input formats create splits if (log.isDebugEnabled) { @@ -701,62 +793,67 @@ class JobManager( val numSlots = scheduler.getTotalNumberOfSlots() for (vertex <- jobGraph.getVertices.asScala) { - val executableClass = vertex.getInvokableClassName if (executableClass == null || executableClass.length == 0) { throw new JobSubmissionException(jobId, s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.") } - if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { - vertex.setParallelism(numSlots) - } + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { + vertex.setParallelism(numSlots) + } - try { - vertex.initializeOnMaster(userCodeLoader) - } - catch { + try { + vertex.initializeOnMaster(userCodeLoader) + } + catch { case t: 
Throwable => throw new JobExecutionException(jobId, "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t) - } - } + } + } - // topologically sort the job vertices and attach the graph to the existing one - val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources() - if (log.isDebugEnabled) { - log.debug(s"Adding ${sortedTopology.size()} vertices from " + - s"job graph ${jobId} (${jobName}).") - } - executionGraph.attachJobGraph(sortedTopology) + // topologically sort the job vertices and attach the graph to the existing one + val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources() + if (log.isDebugEnabled) { + log.debug(s"Adding ${sortedTopology.size()} vertices from " + + s"job graph ${jobId} (${jobName}).") + } + executionGraph.attachJobGraph(sortedTopology) - if (log.isDebugEnabled) { - log.debug("Successfully created execution graph from job " + - s"graph ${jobId} (${jobName}).") - } + if (log.isDebugEnabled) { + log.debug("Successfully created execution graph from job " + + s"graph ${jobId} (${jobName}).") + } - // configure the state checkpointing - val snapshotSettings = jobGraph.getSnapshotSettings - if (snapshotSettings != null) { + // configure the state checkpointing + val snapshotSettings = jobGraph.getSnapshotSettings + if (snapshotSettings != null) { + val jobId = jobGraph.getJobID() - val idToVertex: JobVertexID => ExecutionJobVertex = id => { - val vertex = executionGraph.getJobVertex(id) - if (vertex == null) { - throw new JobSubmissionException(jobId, - "The snapshot checkpointing settings refer to non-existent vertex " + id) - } - vertex - } + val idToVertex: JobVertexID => ExecutionJobVertex = id => { + val vertex = executionGraph.getJobVertex(id) + if (vertex == null) { + throw new JobSubmissionException(jobId, + "The snapshot checkpointing settings refer to non-existent vertex " + id) + } + vertex + } - val triggerVertices: java.util.List[ExecutionJobVertex] = + val triggerVertices: 
java.util.List[ExecutionJobVertex] = snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava - val ackVertices: java.util.List[ExecutionJobVertex] = + val ackVertices: java.util.List[ExecutionJobVertex] = snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava - val confirmVertices: java.util.List[ExecutionJobVertex] = + val confirmVertices: java.util.List[ExecutionJobVertex] = snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava + val completedCheckpoints = checkpointRecoveryFactory + .createCompletedCheckpoints(jobId, userCodeLoader) + + val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId) + executionGraph.enableSnapshotCheckpointing( snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout, @@ -764,23 +861,39 @@ class JobManager( ackVertices, confirmVertices, context.system, - leaderSessionID.orNull) + leaderSessionID.orNull, + checkpointIdCounter, + completedCheckpoints, + recoveryMode) } // get notified about job status changes executionGraph.registerJobStatusListener( new AkkaActorGateway(self, leaderSessionID.orNull)) - if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { + if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) { // the sender wants to be notified about state changes - val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull) + val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull) executionGraph.registerExecutionListener(gateway) executionGraph.registerJobStatusListener(gateway) } + if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() + } + else { + submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) + } + + // Add the job graph only after everything is finished. Otherwise there can be races in + // tests, which check the currentJobs (for example before killing a JM). 
+ if (!currentJobs.contains(jobId)) { + currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo)) + } + // done with submitting the job - sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) + jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) } catch { case t: Throwable => @@ -799,33 +912,61 @@ class JobManager( new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t) } - sender() ! decorateMessage(JobResultFailure(new SerializedThrowable(rt))) + jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt))) return } - // NOTE: Scheduling the job for execution is a separate action from the job submission. - // The success of submitting the job must be independent from the success of scheduling - // the job. - try { - log.info(s"Scheduling job ${executionGraph.getJobName}.") - executionGraph.scheduleForExecution(scheduler) - } - catch { - case t: Throwable => try { - executionGraph.fail(t) + if (leaderElectionService.hasLeadership) { + // There is a small chance that multiple job managers schedule the same job after if they + // try to recover at the same time. This will eventually be noticed, but can not be ruled + // out from the beginning. + + // NOTE: Scheduling the job for execution is a separate action from the job submission. + // The success of submitting the job must be independent from the success of scheduling + // the job. + try { + log.info(s"Scheduling job $jobId ($jobName).") + + executionGraph.scheduleForExecution(scheduler) } catch { - case tt: Throwable => { - log.error("Error while marking ExecutionGraph as failed.", tt) + case t: Throwable => try { + executionGraph.fail(t) + } + catch { + case tt: Throwable => { + log.error("Error while marking ExecutionGraph as failed.", tt) + } } } } + else { + // Remove the job graph. Otherwise it will be lingering around and possibly removed from + // ZooKeeper by this JM. 
+ currentJobs.remove(jobId) + + log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " + + "this. I am not scheduling the job for execution.") + } + } + } + + /** + * Submits the job if it is not already one of our current jobs. + * + * @param jobGraph Job to recover + */ + private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = { + if (!currentJobs.contains(jobGraph.getJobId)) { + future { + submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true) + }(context.dispatcher) } } /** * Dedicated handler for checkpoint messages. - * + * * @param actorMessage The checkpoint actor message. */ private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = { @@ -836,13 +977,15 @@ class JobManager( case Some((graph, _)) => val coordinator = graph.getCheckpointCoordinator() if (coordinator != null) { - try { - coordinator.receiveAcknowledgeMessage(ackMessage) - } - catch { - case t: Throwable => - log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t) - } + future { + try { + coordinator.receiveAcknowledgeMessage(ackMessage) + } + catch { + case t: Throwable => + log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t) + } + }(context.dispatcher) } else { log.error( @@ -1020,30 +1163,46 @@ class JobManager( } /** - * Removes the job and sends it to the MemoryArchivist + * Removes the job and sends it to the MemoryArchivist. + * + * This should be called asynchronously. Removing the job from the [[SubmittedJobGraphStore]] + * might block. Therefore be careful not to block the actor thread. + * * @param jobID ID of the job to remove and archive */ private def removeJob(jobID: JobID): Unit = { currentJobs.synchronized { - currentJobs.remove(jobID) match { + // Don't remove the job yet... 
+ currentJobs.get(jobID) match { case Some((eg, _)) => try { + // ...otherwise, we can have lingering resources when there is a concurrent shutdown + // and the ZooKeeper client is closed. Not removing the job immediately allow the + // shutdown to release all resources. + submittedJobGraphs.removeJobGraph(jobID) + } catch { + case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) + } + + try { eg.prepareForArchiving() + archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg)) } catch { case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + "archiving.", t) } + currentJobs.remove(jobID) case None => } + } - try { - libraryCacheManager.unregisterJob(jobID) - } catch { - case t: Throwable => - log.error(s"Could not properly unregister job $jobID form the library cache.", t) - } + try { + libraryCacheManager.unregisterJob(jobID) + } catch { + case t: Throwable => + log.error(s"Could not properly unregister job $jobID form the library cache.", t) } } @@ -1053,17 +1212,21 @@ class JobManager( * @param cause Cause for the cancelling. */ private def cancelAndClearEverything(cause: Throwable) { - for((jobID, (eg, jobInfo)) <- currentJobs) { + for ((jobID, (eg, jobInfo)) <- currentJobs) { + try { + submittedJobGraphs.removeJobGraph(jobID) + } + catch { + case t: Throwable => { + log.error("Error during submitted job graph clean up.", t) + } + } + eg.fail(cause) - if(jobInfo.client != ActorRef.noSender) { + if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { jobInfo.client ! decorateMessage( - Failure( - new JobExecutionException( - jobID, - "All jobs are cancelled and cleared.", - cause) - )) + Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))) } } @@ -1079,6 +1242,25 @@ class JobManager( self ! decorateMessage(RevokeLeadership) } + override def onAddedJobGraph(jobId: JobID): Unit = { + if (leaderSessionID.isDefined && !currentJobs.contains(jobId)) { + self ! 
decorateMessage(RecoverJob(jobId)) + } + } + + override def onRemovedJobGraph(jobId: JobID): Unit = { + if (leaderSessionID.isDefined) { + currentJobs.get(jobId).foreach( + job => + future { + // Fail the execution graph + job._1.fail(new IllegalStateException("Another JobManager removed the job from " + + "ZooKeeper.")) + }(context.dispatcher) + ) + } + } + override def getAddress: String = { AkkaUtils.getAkkaURL(context.system, self) } @@ -1166,7 +1348,7 @@ object JobManager { System.exit(STARTUP_FAILURE_RETURN_CODE) } - if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) { + if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) { // address and will not be reachable from anyone remote if (listeningPort != 0) { val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + @@ -1227,7 +1409,7 @@ object JobManager { * * @param configuration The configuration object for the JobManager. * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an - * additional TaskManager in the same process. + * an additional TaskManager in the same process. * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only) * @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages. 
@@ -1480,7 +1662,7 @@ object JobManager { // high availability mode val port: Int = - if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) { + if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) { LOG.info("Starting JobManager in High-Availability Mode") configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) @@ -1524,7 +1706,9 @@ object JobManager { Long, // delay between retries FiniteDuration, // timeout Int, // number of archived jobs - LeaderElectionService) = { + LeaderElectionService, + SubmittedJobGraphStore, + CheckpointRecoveryFactory) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -1588,10 +1772,31 @@ object JobManager { } } - val leaderElectionService = leaderElectionServiceOption match { - case Some(les) => les - case None => LeaderElectionUtils.createLeaderElectionService(configuration) - } + // Create recovery related components + val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) = + RecoveryMode.fromConfig(configuration) match { + case RecoveryMode.STANDALONE => + val leaderElectionService = leaderElectionServiceOption match { + case Some(les) => les + case None => new StandaloneLeaderElectionService() + } + + (leaderElectionService, + new StandaloneSubmittedJobGraphStore(), + new StandaloneCheckpointRecoveryFactory()) + + case RecoveryMode.ZOOKEEPER => + val client = ZooKeeperUtils.startCuratorFramework(configuration) + + val leaderElectionService = leaderElectionServiceOption match { + case Some(les) => les + case None => ZooKeeperUtils.createLeaderElectionService(client, configuration) + } + + (leaderElectionService, + ZooKeeperUtils.createSubmittedJobGraphs(client, configuration), + new ZooKeeperCheckpointRecoveryFactory(client, configuration)) + } (executionContext, instanceManager, @@ -1599,9 +1804,11 @@ object JobManager { libraryCacheManager, executionRetries, delayBetweenRetries, - timeout, - archiveCount, - leaderElectionService) + timeout, + 
archiveCount, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) } /** @@ -1633,6 +1840,7 @@ object JobManager { jobManagerClass, archiveClass) } + /** * Starts the JobManager and job archiver based on the given configuration, in the * given actor system. @@ -1646,28 +1854,30 @@ object JobManager { * @param streamingMode The mode to run the system in (streaming vs. batch-only) * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the MemoryArchivist to be started - * + * * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors( - configuration: Configuration, - actorSystem: ActorSystem, - jobMangerActorName: Option[String], - archiveActorName: Option[String], - streamingMode: StreamingMode, - jobManagerClass: Class[_ <: JobManager], - archiveClass: Class[_ <: MemoryArchivist]) - : (ActorRef, ActorRef) = { + configuration: Configuration, + actorSystem: ActorSystem, + jobMangerActorName: Option[String], + archiveActorName: Option[String], + streamingMode: StreamingMode, + jobManagerClass: Class[_ <: JobManager], + archiveClass: Class[_ <: MemoryArchivist]) + : (ActorRef, ActorRef) = { val (executionContext, - instanceManager, - scheduler, - libraryCacheManager, - executionRetries, - delayBetweenRetries, - timeout, - archiveCount, - leaderElectionService) = createJobManagerComponents( + instanceManager, + scheduler, + libraryCacheManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) = createJobManagerComponents( configuration, None) @@ -1691,7 +1901,9 @@ object JobManager { delayBetweenRetries, timeout, streamingMode, - leaderElectionService) + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) 
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index c29df88..d776622 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -66,6 +66,18 @@ object JobManagerMessages { extends RequiresLeaderSessionID /** + * Triggers the recovery of the job with the given ID. + * + * @param jobId ID of the job to recover + */ + case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID + + /** + * Triggers recovery of all available jobs. + */ + case class RecoverAllJobs() extends RequiresLeaderSessionID + + /** * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is * sent back to the sender as a [[CancellationResponse]] message. 
* @@ -354,6 +366,10 @@ object JobManagerMessages { // -------------------------------------------------------------------------- // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- + + def getRequestJobStatus(jobId : JobID) : AnyRef = { + RequestJobStatus(jobId) + } def getRequestNumberRegisteredTaskManager : AnyRef = { RequestNumberRegisteredTaskManager http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 29add0e..2df3437 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -94,9 +94,7 @@ abstract class FlinkMiniCluster( implicit val timeout = AkkaUtils.getTimeout(userConfiguration) - val recoveryMode = RecoveryMode.valueOf(configuration.getString( - ConfigConstants.RECOVERY_MODE, - ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase) + val recoveryMode = RecoveryMode.fromConfig(configuration) val numJobManagers = getNumberOfJobManagers