Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9D12D18707 for ; Thu, 11 Feb 2016 19:49:54 +0000 (UTC) Received: (qmail 43884 invoked by uid 500); 11 Feb 2016 19:49:54 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 43841 invoked by uid 500); 11 Feb 2016 19:49:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 43832 invoked by uid 99); 11 Feb 2016 19:49:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2016 19:49:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B2FFE0534; Thu, 11 Feb 2016 19:49:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Message-Id: <4cf6b8336b4342d4a65db5b18a97946c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor Date: Thu, 11 Feb 2016 19:49:54 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master ed7d3da7f -> dcea46e89 [FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor This closes #1622. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcea46e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcea46e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcea46e8 Branch: refs/heads/master Commit: dcea46e891a1479205fdfe939858d340cde87d57 Parents: ed7d3da Author: Ufuk Celebi Authored: Thu Feb 11 14:58:35 2016 +0100 Committer: Ufuk Celebi Committed: Thu Feb 11 20:49:28 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 22 +++++++++++--------- .../JobManagerLeaderElectionTest.java | 6 +++++- .../runtime/testingUtils/TestingCluster.scala | 6 ++++-- .../testingUtils/TestingJobManager.scala | 8 ++++--- .../runtime/testingUtils/TestingUtils.scala | 3 ++- .../flink/yarn/TestingYarnJobManager.scala | 8 ++++--- .../org/apache/flink/yarn/YarnJobManager.scala | 8 ++++--- 7 files changed, 38 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d96575f..78612c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -113,7 +113,8 @@ class JobManager( protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, - protected val checkpointRecoveryFactory : CheckpointRecoveryFactory) + protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, + protected val savepointStore: SavepointStore) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging with LogMessages // mixin order is important, we want first logging @@ -151,9 +152,6 @@ class JobManager( val webMonitorPort : Int = flinkConfiguration.getInteger( ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1) - protected val savepointStore : SavepointStore = - SavepointStoreFactory.createFromConfig(flinkConfiguration) - /** * Run when the job manager is started. Simply logs an informational message. * The method also starts the leader election service. @@ -2040,7 +2038,8 @@ object JobManager { Int, // number of archived jobs LeaderElectionService, SubmittedJobGraphStore, - CheckpointRecoveryFactory) = { + CheckpointRecoveryFactory, + SavepointStore) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -2078,8 +2077,6 @@ object JobManager { } } - - var blobServer: BlobServer = null var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null @@ -2140,6 +2137,8 @@ object JobManager { new ZooKeeperCheckpointRecoveryFactory(client, configuration)) } + val savepointStore = SavepointStoreFactory.createFromConfig(configuration) + (executorService, instanceManager, scheduler, @@ -2150,7 +2149,8 @@ object JobManager { archiveCount, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) } /** @@ -2212,7 +2212,8 @@ object JobManager { archiveCount, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) = createJobManagerComponents( + checkpointRecoveryFactory, + savepointStore) = createJobManagerComponents( configuration, None) @@ -2237,7 +2238,8 @@ object JobManager { timeout, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index f50a0a0..73c7646 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -31,6 +31,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.SavepointStoreFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.instance.InstanceManager; @@ -179,6 +181,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { // We don't need recovery in this test SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore(); CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); + SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration); return Props.create( TestingJobManager.class, @@ -193,7 +196,8 @@ public class JobManagerLeaderElectionTest extends TestLogger { AkkaUtils.getDefaultTimeout(), leaderElectionService, submittedJobGraphStore, - checkpointRecoveryFactory + checkpointRecoveryFactory, + savepointStore ); } } http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 22b0d29..cfb1192 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -102,7 +102,8 @@ class TestingCluster( archiveCount, leaderElectionService, submittedJobsGraphs, - checkpointRecoveryFactory) = JobManager.createJobManagerComponents( + checkpointRecoveryFactory, + savepointStore) = JobManager.createJobManagerComponents( config, createLeaderElectionService()) @@ -122,7 +123,8 @@ class TestingCluster( timeout, leaderElectionService, submittedJobsGraphs, - checkpointRecoveryFactory)) + checkpointRecoveryFactory, + savepointStore)) val dispatcherJobManagerProps = if (synchronousDispatcher) { // disable asynchronous futures (e.g. accumulator update in Heartbeat) http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 0c0ca40..98d8863 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler @@ -48,7 +48,8 @@ class TestingJobManager( timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory) + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore : SavepointStore) extends JobManager( flinkConfiguration, executorService, @@ -61,5 +62,6 @@ class TestingJobManager( timeout, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 679dc71..6057f65 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -329,7 +329,8 @@ object TestingUtils { archiveCount, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) = JobManager.createJobManagerComponents( + checkpointRecoveryFactory, + savepointStore) = JobManager.createJobManagerComponents( configuration, None ) http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 2d50407..c7cd205 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore @@ -62,7 +62,8 @@ class TestingYarnJobManager( timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory) + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore) extends YarnJobManager( flinkConfiguration, executorService, @@ -75,7 +76,8 @@ class TestingYarnJobManager( timeout, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) with TestingJobManagerLike { override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner] http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 8dfa22d..ec1fb81 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -32,7 +32,7 @@ import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.leaderelection.LeaderElectionService @@ -91,7 +91,8 @@ class YarnJobManager( timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory) + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore) extends JobManager( flinkConfiguration, executorService, @@ -104,7 +105,8 @@ class YarnJobManager( timeout, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) { + checkpointRecoveryFactory, + savepointStore) { import context._ import scala.collection.JavaConverters._