flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [FLINK-4776] [distributed coordination] Move ExecutionGraph initialization into the dedicated class ExecutionGraphBuilder
Date Mon, 10 Oct 2016 10:25:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5e30ba384 -> 33c36e62a


[FLINK-4776] [distributed coordination] Move ExecutionGraph initialization into the dedicated
class ExecutionGraphBuilder


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed96cb53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed96cb53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed96cb53

Branch: refs/heads/master
Commit: ed96cb53bfcbe96dd36248839ec4f2bffe60971b
Parents: 5e30ba3
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Oct 7 19:58:24 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Oct 10 12:17:03 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   4 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionGraphBuilder.java   | 262 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 168 ++----------
 4 files changed, 297 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed96cb53/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4428427..e95afe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -171,7 +171,7 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			SavepointStore savepointStore,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker) {
 
 		// sanity checks
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -207,7 +207,7 @@ public class CheckpointCoordinator {
 			// issues a blocking call to ZooKeeper.
 			checkpointIDCounter.start();
 		} catch (Throwable t) {
-			throw new Exception("Failed to start checkpoint ID counter: " + t.getMessage(), t);
+			throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(),
t);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed96cb53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6023205..cf98ca6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -359,7 +359,7 @@ public class ExecutionGraph {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			SavepointStore savepointStore,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker) {
 
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
@@ -374,7 +374,11 @@ public class ExecutionGraph {
 		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
 
 		// disable to make sure existing checkpoint coordinators are cleared
-		disableSnaphotCheckpointing();
+		try {
+			disableSnaphotCheckpointing();
+		} catch (Throwable t) {
+			LOG.error("Error while shutting down checkpointer.");
+		}
 
 		checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed96cb53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
new file mode 100644
index 0000000..1c6eb8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link
JobGraph}.
+ */
+public class ExecutionGraphBuilder {
+
+	/**
+	 * Builds the ExecutionGraph from the JobGraph.
+	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
+	 * graph exists, then the JobGraph will become attach to a new emoty execution graph.
+	 */
+	public static ExecutionGraph buildGraph(
+			@Nullable ExecutionGraph prior,
+			JobGraph jobGraph,
+			Configuration jobManagerConfig,
+			Executor executor,
+			ClassLoader classLoader,
+			CheckpointRecoveryFactory recoveryFactory,
+			SavepointStore savepointStore,
+			Time timeout,
+			RestartStrategy restartStrategy,
+			MetricGroup metrics,
+			int parallelismForAutoMax,
+			Logger log)
+			throws JobExecutionException, JobException
+	{
+		final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor);
+		
+		return buildGraph(prior, jobGraph, jobManagerConfig, executionContext,
+				classLoader, recoveryFactory, savepointStore, timeout, restartStrategy,
+				metrics, parallelismForAutoMax, log);
+	}
+
+	/**
+	 * Builds the ExecutionGraph from the JobGraph.
+	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
+	 * graph exists, then the JobGraph will become attach to a new emoty execution graph.
+	 */
+	public static ExecutionGraph buildGraph(
+			@Nullable ExecutionGraph prior,
+			JobGraph jobGraph,
+			Configuration jobManagerConfig,
+			ExecutionContext executionContext,
+			ClassLoader classLoader,
+			CheckpointRecoveryFactory recoveryFactory,
+			SavepointStore savepointStore,
+			Time timeout,
+			RestartStrategy restartStrategy,
+			MetricGroup metrics,
+			int parallelismForAutoMax,
+			Logger log)
+		throws JobExecutionException, JobException
+	{
+		checkNotNull(jobGraph, "job graph cannot be null");
+
+		final String jobName = jobGraph.getName();
+		final JobID jobId = jobGraph.getJobID();
+
+		// create a new execution graph, if none exists so far
+		final ExecutionGraph executionGraph = (prior != null) ? prior :
+				new ExecutionGraph(
+						executionContext,
+						jobId,
+						jobName,
+						jobGraph.getJobConfiguration(),
+						jobGraph.getSerializedExecutionConfig(),
+						new FiniteDuration(timeout.getSize(), timeout.getUnit()),
+						restartStrategy,
+						jobGraph.getUserJarBlobKeys(),
+						jobGraph.getClasspaths(),
+						classLoader,
+						metrics);
+
+		// set the basic properties
+
+		executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+		executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
+
+		try {
+			executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
+		}
+		catch (Throwable t) {
+			log.warn("Cannot create JSON plan for job", t);
+			// give the graph an empty plan
+			executionGraph.setJsonPlan("{}");
+		}
+
+		// initialize the vertices that have a master initialization hook
+		// file output formats create directories here, input formats create splits
+
+		final long initMasterStart = System.nanoTime();
+		log.info("Running initialization on master for job {} ({}).", jobName, jobId);
+
+		for (JobVertex vertex : jobGraph.getVertices()) {
+			String executableClass = vertex.getInvokableClassName();
+			if (executableClass == null || executableClass.isEmpty()) {
+				throw new JobSubmissionException(jobId,
+						"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
+			}
+
+			if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+				vertex.setParallelism(parallelismForAutoMax);
+			}
+
+			try {
+				vertex.initializeOnMaster(classLoader);
+			}
+			catch (Throwable t) {
+					throw new JobExecutionException(jobId,
+							"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
+			}
+		}
+
+		log.info("Successfully ran initialization on master in {} ms.",
+				(System.nanoTime() - initMasterStart) / 1_000_000);
+
+		// topologically sort the job vertices and attach the graph to the existing one
+		List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
+		if (log.isDebugEnabled()) {
+			log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName,
jobId);
+		}
+		executionGraph.attachJobGraph(sortedTopology);
+
+		if (log.isDebugEnabled()) {
+			log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
+		}
+
+		// configure the state checkpointing
+		JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
+		if (snapshotSettings != null) {
+
+			List<ExecutionJobVertex> triggerVertices = 
+					idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
+
+			List<ExecutionJobVertex> ackVertices =
+					idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
+
+			List<ExecutionJobVertex> confirmVertices =
+					idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
+
+			CompletedCheckpointStore completedCheckpoints;
+			CheckpointIDCounter checkpointIdCounter;
+			try {
+				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader);
+				checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
+			}
+			catch (Exception e) {
+				throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint
handler", e);
+			}
+
+			// Checkpoint stats tracker
+			boolean isStatsDisabled = jobManagerConfig.getBoolean(
+					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+
+			CheckpointStatsTracker checkpointStatsTracker;
+			if (isStatsDisabled) {
+				checkpointStatsTracker = new DisabledCheckpointStatsTracker();
+			}
+			else {
+				int historySize = jobManagerConfig.getInteger(
+						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+
+				checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
+			}
+
+			executionGraph.enableSnapshotCheckpointing(
+					snapshotSettings.getCheckpointInterval(),
+					snapshotSettings.getCheckpointTimeout(),
+					snapshotSettings.getMinPauseBetweenCheckpoints(),
+					snapshotSettings.getMaxConcurrentCheckpoints(),
+					triggerVertices,
+					ackVertices,
+					confirmVertices,
+					checkpointIdCounter,
+					completedCheckpoints,
+					savepointStore,
+					checkpointStatsTracker);
+		}
+
+		return executionGraph;
+	}
+
+	private static List<ExecutionJobVertex> idToVertex(
+			List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException
{
+
+		List<ExecutionJobVertex> result = new ArrayList<>(jobVertices.size());
+
+		for (JobVertexID id : jobVertices) {
+			ExecutionJobVertex vertex = executionGraph.getJobVertex(id);
+			if (vertex != null) {
+				result.add(vertex);
+			} else {
+				throw new IllegalArgumentException(
+						"The snapshot checkpointing settings refer to non-existent vertex " + id);
+			} 
+		}
+
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not supposed to be instantiated */
+	private ExecutionGraphBuilder() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed96cb53/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 01f9cec..e90f2d2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,7 +29,8 @@ import akka.actor.Status.{Failure, Success}
 import akka.actor._
 import akka.pattern.ask
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
@@ -49,11 +50,10 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, ExecutionGraph, ExecutionJobVertex,
StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
@@ -1114,7 +1114,7 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+            .map(RestartStrategyFactory.createRestartStrategy) match {
             case Some(strategy) => strategy
             case None => restartStrategyFactory.createRestartStrategy()
           }
@@ -1131,148 +1131,34 @@ class JobManager(
             new UnregisteredMetricsGroup()
         }
 
+        val numSlots = scheduler.getTotalNumberOfSlots()
+
         // see if there already exists an ExecutionGraph for the corresponding job ID
-        executionGraph = currentJobs.get(jobGraph.getJobID) match {
+        val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
           case Some((graph, currentJobInfo)) =>
+            executionGraph = graph
             currentJobInfo.setLastActive()
-            graph
+            false
           case None =>
-            val graph = new ExecutionGraph(
-              executionContext,
-              jobGraph.getJobID,
-              jobGraph.getName,
-              jobGraph.getJobConfiguration,
-              jobGraph.getSerializedExecutionConfig,
-              timeout,
-              restartStrategy,
-              jobGraph.getUserJarBlobKeys,
-              jobGraph.getClasspaths,
-              userCodeLoader,
-              jobMetrics)
-
-            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
-            graph
-        }
-
-        executionGraph.setScheduleMode(jobGraph.getScheduleMode())
-        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
-
-        try {
-          executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
-        }
-        catch {
-          case t: Throwable =>
-            log.warn("Cannot create JSON plan for job", t)
-            executionGraph.setJsonPlan("{}")
-        }
-
-        // initialize the vertices that have a master initialization hook
-        // file output formats create directories here, input formats create splits
-        if (log.isDebugEnabled) {
-          log.debug(s"Running initialization on master for job $jobId ($jobName).")
-        }
-
-        val numSlots = scheduler.getTotalNumberOfSlots()
-
-        for (vertex <- jobGraph.getVertices.asScala) {
-          val executableClass = vertex.getInvokableClassName
-          if (executableClass == null || executableClass.length == 0) {
-            throw new JobSubmissionException(jobId,
-              s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
-          }
-
-          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-            vertex.setParallelism(numSlots)
-          }
-
-          try {
-            vertex.initializeOnMaster(userCodeLoader)
-          }
-          catch {
-            case t: Throwable =>
-              throw new JobExecutionException(jobId,
-                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
-          }
+            true
         }
 
-        // topologically sort the job vertices and attach the graph to the existing one
-        val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
-        if (log.isDebugEnabled) {
-          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-            s"job graph $jobId ($jobName).")
-        }
-        executionGraph.attachJobGraph(sortedTopology)
-
-        if (log.isDebugEnabled) {
-          log.debug("Successfully created execution graph from job " +
-            s"graph $jobId ($jobName).")
-        }
-
-        // configure the state checkpointing
-        val snapshotSettings = jobGraph.getSnapshotSettings
-        if (snapshotSettings != null) {
-          val jobId = jobGraph.getJobID()
-
-          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
-            val vertex = executionGraph.getJobVertex(id)
-            if (vertex == null) {
-              throw new JobSubmissionException(jobId,
-                "The snapshot checkpointing settings refer to non-existent vertex " + id)
-            }
-            vertex
-          }
-
-          val triggerVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
-
-          val ackVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
-
-          val confirmVertices: java.util.List[ExecutionJobVertex] =
-            snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
-
-          val completedCheckpoints = checkpointRecoveryFactory
-            .createCheckpointStore(jobId, userCodeLoader)
-
-          val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
-
-          // Checkpoint stats tracker
-          val isStatsDisabled: Boolean = flinkConfiguration.getBoolean(
-            ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-            ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE)
-
-          val checkpointStatsTracker : CheckpointStatsTracker =
-            if (isStatsDisabled) {
-              new DisabledCheckpointStatsTracker()
-            } else {
-              val historySize: Int = flinkConfiguration.getInteger(
-                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
-
-              new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics)
-            }
-
-          val jobParallelism = jobGraph.getSerializedExecutionConfig
-            .deserializeValue(userCodeLoader).getParallelism()
-
-          val parallelism = if (jobParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-            numSlots
-          } else {
-            jobParallelism
-          }
-
-          executionGraph.enableSnapshotCheckpointing(
-            snapshotSettings.getCheckpointInterval,
-            snapshotSettings.getCheckpointTimeout,
-            snapshotSettings.getMinPauseBetweenCheckpoints,
-            snapshotSettings.getMaxConcurrentCheckpoints,
-            triggerVertices,
-            ackVertices,
-            confirmVertices,
-            checkpointIdCounter,
-            completedCheckpoints,
-            savepointStore,
-            checkpointStatsTracker)
+        executionGraph = ExecutionGraphBuilder.buildGraph(
+          executionGraph,
+          jobGraph,
+          flinkConfiguration,
+          executionContext,
+          userCodeLoader,
+          checkpointRecoveryFactory,
+          savepointStore,
+          Time.of(timeout.length, timeout.unit),
+          restartStrategy,
+          jobMetrics,
+          numSlots,
+          log.logger)
+        
+        if (registerNewGraph) {
+          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
         }
 
         // get notified about job status changes


Mime
View raw message