flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor
Date Thu, 11 Feb 2016 19:49:54 GMT
Repository: flink
Updated Branches:
  refs/heads/master ed7d3da7f -> dcea46e89


[FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor

This closes #1622.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcea46e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcea46e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcea46e8

Branch: refs/heads/master
Commit: dcea46e891a1479205fdfe939858d340cde87d57
Parents: ed7d3da
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Feb 11 14:58:35 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Feb 11 20:49:28 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 22 +++++++++++---------
 .../JobManagerLeaderElectionTest.java           |  6 +++++-
 .../runtime/testingUtils/TestingCluster.scala   |  6 ++++--
 .../testingUtils/TestingJobManager.scala        |  8 ++++---
 .../runtime/testingUtils/TestingUtils.scala     |  3 ++-
 .../flink/yarn/TestingYarnJobManager.scala      |  8 ++++---
 .../org/apache/flink/yarn/YarnJobManager.scala  |  8 ++++---
 7 files changed, 38 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d96575f..78612c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -113,7 +113,8 @@ class JobManager(
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
-    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    protected val savepointStore: SavepointStore)
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -151,9 +152,6 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
     ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
-  protected val savepointStore : SavepointStore =
-    SavepointStoreFactory.createFromConfig(flinkConfiguration)
-
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -2040,7 +2038,8 @@ object JobManager {
     Int, // number of archived jobs
     LeaderElectionService,
     SubmittedJobGraphStore,
-    CheckpointRecoveryFactory) = {
+    CheckpointRecoveryFactory,
+    SavepointStore) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -2078,8 +2077,6 @@ object JobManager {
           }
       }
 
-    
-
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
@@ -2140,6 +2137,8 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
+    val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
+
     (executorService,
       instanceManager,
       scheduler,
@@ -2150,7 +2149,8 @@ object JobManager {
       archiveCount,
       leaderElectionService,
       submittedJobGraphs,
-      checkpointRecoveryFactory)
+      checkpointRecoveryFactory,
+      savepointStore)
   }
 
   /**
@@ -2212,7 +2212,8 @@ object JobManager {
     archiveCount,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) = createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = createJobManagerComponents(
       configuration,
       None)
 
@@ -2237,7 +2238,8 @@ object JobManager {
       timeout,
       leaderElectionService,
       submittedJobGraphs,
-      checkpointRecoveryFactory)
+      checkpointRecoveryFactory,
+      savepointStore)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index f50a0a0..73c7646 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.SavepointStoreFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -179,6 +181,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		// We don't need recovery in this test
 		SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
 		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);
 
 		return Props.create(
 				TestingJobManager.class,
@@ -193,7 +196,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				leaderElectionService,
 				submittedJobGraphStore,
-				checkpointRecoveryFactory
+				checkpointRecoveryFactory,
+				savepointStore
 		);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 22b0d29..cfb1192 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -102,7 +102,8 @@ class TestingCluster(
     archiveCount,
     leaderElectionService,
     submittedJobsGraphs,
-    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = JobManager.createJobManagerComponents(
       config,
       createLeaderElectionService())
 
@@ -122,7 +123,8 @@ class TestingCluster(
         timeout,
         leaderElectionService,
         submittedJobsGraphs,
-        checkpointRecoveryFactory))
+        checkpointRecoveryFactory,
+        savepointStore))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 0c0ca40..98d8863 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -48,7 +48,8 @@ class TestingJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore : SavepointStore)
   extends JobManager(
     flinkConfiguration,
       executorService,
@@ -61,5 +62,6 @@ class TestingJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory)
+    checkpointRecoveryFactory,
+    savepointStore)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 679dc71..6057f65 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -329,7 +329,8 @@ object TestingUtils {
     archiveCount,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+    checkpointRecoveryFactory,
+    savepointStore) = JobManager.createJobManagerComponents(
       configuration,
       None
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 2d50407..c7cd205 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
@@ -62,7 +62,8 @@ class TestingYarnJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore)
   extends YarnJobManager(
     flinkConfiguration,
     executorService,
@@ -75,7 +76,8 @@ class TestingYarnJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory)
+    checkpointRecoveryFactory,
+    savepointStore)
   with TestingJobManagerLike {
 
   override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 8dfa22d..ec1fb81 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -32,7 +32,7 @@ import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
@@ -91,7 +91,8 @@ class YarnJobManager(
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory)
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore)
   extends JobManager(
     flinkConfiguration,
     executorService,
@@ -104,7 +105,8 @@ class YarnJobManager(
     timeout,
     leaderElectionService,
     submittedJobGraphs,
-    checkpointRecoveryFactory) {
+    checkpointRecoveryFactory,
+    savepointStore) {
 
   import context._
   import scala.collection.JavaConverters._


Mime
View raw message