[hotfix] Let ClusterClient only shut down own HaServices
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b25d3007
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b25d3007
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b25d3007
Branch: refs/heads/master
Commit: b25d30074c3f2b9474fc0597973e50a0b7b8f4e7
Parents: 4c84994
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Mar 1 17:05:44 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 2 08:52:33 2018 +0100
----------------------------------------------------------------------
.../apache/flink/client/program/ClusterClient.java | 14 ++++++++++----
.../flink/client/program/MiniClusterClient.java | 2 +-
.../flink/client/program/StandaloneClusterClient.java | 4 ++--
.../flink/client/cli/CliFrontendModifyTest.java | 2 +-
.../flink/client/cli/CliFrontendSavepointTest.java | 5 +++--
.../flink/client/program/ClientConnectionTest.java | 2 +-
.../flink/client/program/ClusterClientTest.java | 4 ++--
.../apache/flink/test/util/MiniClusterResource.java | 2 +-
.../checkpointing/AbstractLocalRecoveryITCase.java | 12 ++++++------
.../flink/test/example/client/JobRetrievalITCase.java | 2 +-
10 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e2efbac..1cf2bc2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -117,6 +117,8 @@ public abstract class ClusterClient<T> {
/** Service factory for high available. */
protected final HighAvailabilityServices highAvailabilityServices;
+ private final boolean sharedHaServices;
+
/** Flag indicating whether to sysout print execution updates. */
private boolean printStatusDuringExecution = true;
@@ -144,11 +146,13 @@ public abstract class ClusterClient<T> {
* @throws Exception we cannot create the high availability services
*/
public ClusterClient(Configuration flinkConfig) throws Exception {
- this(flinkConfig,
+ this(
+ flinkConfig,
HighAvailabilityServicesUtils.createHighAvailabilityServices(
flinkConfig,
Executors.directExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+ HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
+ false);
}
/**
@@ -158,8 +162,9 @@ public abstract class ClusterClient<T> {
*
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure
the optimizer.
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
+ * @param sharedHaServices true if the HighAvailabilityServices are shared and must not
be shut down
*/
- public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices)
{
+ public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices,
boolean sharedHaServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
@@ -173,6 +178,7 @@ public abstract class ClusterClient<T> {
log);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+ this.sharedHaServices = sharedHaServices;
}
// ------------------------------------------------------------------------
@@ -265,7 +271,7 @@ public abstract class ClusterClient<T> {
synchronized (this) {
actorSystemLoader.shutdown();
- if (highAvailabilityServices != null) {
+ if (!sharedHaServices && highAvailabilityServices != null) {
highAvailabilityServices.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index b98e895..aca75e0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -52,7 +52,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
private final MiniCluster miniCluster;
public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster)
throws Exception {
- super(configuration, miniCluster.getHighAvailabilityServices());
+ super(configuration, miniCluster.getHighAvailabilityServices(), true);
this.miniCluster = miniCluster;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 0b91ed4..ee8ad44 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -47,8 +47,8 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId>
super(config);
}
- public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices)
{
- super(config, highAvailabilityServices);
+ public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices,
boolean sharedHaServices) {
+ super(config, highAvailabilityServices, sharedHaServices);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
index 50d87ba..00d5241 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
@@ -121,7 +121,7 @@ public class CliFrontendModifyTest extends TestLogger {
private final CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture;
public TestingClusterClient(CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture)
throws Exception {
- super(new Configuration(), new TestingHighAvailabilityServices());
+ super(new Configuration(), new TestingHighAvailabilityServices(), false);
this.rescaleJobFuture = rescaleJobFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
index d730344..f4c66eb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
@@ -138,7 +138,8 @@ public class CliFrontendSavepointTest extends TestLogger {
try {
CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient(
new Configuration(),
- new TestingHighAvailabilityServices()));
+ new TestingHighAvailabilityServices(),
+ false));
String[] parameters = { "invalid job id" };
try {
@@ -288,7 +289,7 @@ public class CliFrontendSavepointTest extends TestLogger {
private final BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction;
DisposeSavepointClusterClient(BiFunction<String, Time, CompletableFuture<Acknowledge>>
disposeSavepointFunction) throws Exception {
- super(new Configuration(), new TestingHighAvailabilityServices());
+ super(new Configuration(), new TestingHighAvailabilityServices(), false);
this.disposeSavepointFunction = Preconditions.checkNotNull(disposeSavepointFunction);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 1dd4787..2b8abb1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -140,7 +140,7 @@ public class ClientConnectionTest extends TestLogger {
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID,
settableLeaderRetrievalService);
- StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
+ StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices,
true);
ActorGateway gateway = client.getJobManagerGateway();
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index e2eb88d..f30fd19 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -71,7 +71,7 @@ public class ClusterClientTest extends TestLogger {
Configuration config = new Configuration();
HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
- StandaloneClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices);
+ StandaloneClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices,
false);
clusterClient.shutdown();
@@ -333,7 +333,7 @@ public class ClusterClientTest extends TestLogger {
private final ActorGateway jobmanagerGateway;
TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception
{
- super(config, new TestingHighAvailabilityServices());
+ super(config, new TestingHighAvailabilityServices(), false);
this.jobmanagerGateway = jobmanagerGateway;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index a1ce647..954b06f 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -161,7 +161,7 @@ public class MiniClusterResource extends ExternalResource {
true);
jobExecutorService = flinkMiniCluster;
- clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices());
+ clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(),
true);
}
private void startFlip6MiniCluster() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
index a02e902..13040c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -78,20 +78,20 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger {
private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception
{
delegate.name = testName;
try {
- delegate.miniClusterResource.before();
+ delegate.setupTestCluster();
try {
delegate.testTumblingTimeWindow();
- delegate.miniClusterResource.after();
+ delegate.stopTestCluster();
} catch (Exception e) {
- delegate.miniClusterResource.after();
+ delegate.stopTestCluster();
}
- delegate.miniClusterResource.before();
+ delegate.setupTestCluster();
try {
delegate.testSlidingTimeWindow();
- delegate.miniClusterResource.after();
+ delegate.stopTestCluster();
} catch (Exception e) {
- delegate.miniClusterResource.after();
+ delegate.stopTestCluster();
}
} finally {
delegate.tempFolder.delete();
http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 221f3fa..d34b6c33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -79,7 +79,7 @@ public class JobRetrievalITCase extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
- final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(),
cluster.highAvailabilityServices());
+ final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(),
cluster.highAvailabilityServices(), true);
// acquire the lock to make sure that the job cannot complete until the job client
// has been attached in resumingThread
|