flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [05/12] flink git commit: [hotfix] Let ClusterClient only shut down own HaServices
Date Fri, 02 Mar 2018 07:53:05 GMT
[hotfix] Let ClusterClient only shut down own HaServices


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b25d3007
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b25d3007
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b25d3007

Branch: refs/heads/master
Commit: b25d30074c3f2b9474fc0597973e50a0b7b8f4e7
Parents: 4c84994
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Mar 1 17:05:44 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 2 08:52:33 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/ClusterClient.java    | 14 ++++++++++----
 .../flink/client/program/MiniClusterClient.java       |  2 +-
 .../flink/client/program/StandaloneClusterClient.java |  4 ++--
 .../flink/client/cli/CliFrontendModifyTest.java       |  2 +-
 .../flink/client/cli/CliFrontendSavepointTest.java    |  5 +++--
 .../flink/client/program/ClientConnectionTest.java    |  2 +-
 .../flink/client/program/ClusterClientTest.java       |  4 ++--
 .../apache/flink/test/util/MiniClusterResource.java   |  2 +-
 .../checkpointing/AbstractLocalRecoveryITCase.java    | 12 ++++++------
 .../flink/test/example/client/JobRetrievalITCase.java |  2 +-
 10 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e2efbac..1cf2bc2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -117,6 +117,8 @@ public abstract class ClusterClient<T> {
 	/** Service factory for high available. */
 	protected final HighAvailabilityServices highAvailabilityServices;
 
+	private final boolean sharedHaServices;
+
 	/** Flag indicating whether to sysout print execution updates. */
 	private boolean printStatusDuringExecution = true;
 
@@ -144,11 +146,13 @@ public abstract class ClusterClient<T> {
 	 * @throws Exception we cannot create the high availability services
 	 */
 	public ClusterClient(Configuration flinkConfig) throws Exception {
-		this(flinkConfig,
+		this(
+			flinkConfig,
 			HighAvailabilityServicesUtils.createHighAvailabilityServices(
 				flinkConfig,
 				Executors.directExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
+			false);
 	}
 
 	/**
@@ -158,8 +162,9 @@ public abstract class ClusterClient<T> {
 	 *
 	 * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
 	 * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
+	 * @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
 	 */
-	public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices) {
+	public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
 
@@ -173,6 +178,7 @@ public abstract class ClusterClient<T> {
 			log);
 
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.sharedHaServices = sharedHaServices;
 	}
 
 	// ------------------------------------------------------------------------
@@ -265,7 +271,7 @@ public abstract class ClusterClient<T> {
 		synchronized (this) {
 			actorSystemLoader.shutdown();
 
-			if (highAvailabilityServices != null) {
+			if (!sharedHaServices && highAvailabilityServices != null) {
 				highAvailabilityServices.close();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index b98e895..aca75e0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -52,7 +52,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 	private final MiniCluster miniCluster;
 
 	public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull MiniCluster miniCluster) throws Exception {
-		super(configuration, miniCluster.getHighAvailabilityServices());
+		super(configuration, miniCluster.getHighAvailabilityServices(), true);
 
 		this.miniCluster = miniCluster;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 0b91ed4..ee8ad44 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -47,8 +47,8 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId>
 		super(config);
 	}
 
-	public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices) {
-		super(config, highAvailabilityServices);
+	public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
+		super(config, highAvailabilityServices, sharedHaServices);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
index 50d87ba..00d5241 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
@@ -121,7 +121,7 @@ public class CliFrontendModifyTest extends TestLogger {
 		private final CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture;
 
 		public TestingClusterClient(CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture) throws Exception {
-			super(new Configuration(), new TestingHighAvailabilityServices());
+			super(new Configuration(), new TestingHighAvailabilityServices(), false);
 
 			this.rescaleJobFuture = rescaleJobFuture;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
index d730344..f4c66eb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
@@ -138,7 +138,8 @@ public class CliFrontendSavepointTest extends TestLogger {
 		try {
 			CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient(
 				new Configuration(),
-				new TestingHighAvailabilityServices()));
+				new TestingHighAvailabilityServices(),
+				false));
 
 			String[] parameters = { "invalid job id" };
 			try {
@@ -288,7 +289,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 		private final BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction;
 
 		DisposeSavepointClusterClient(BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction) throws Exception {
-			super(new Configuration(), new TestingHighAvailabilityServices());
+			super(new Configuration(), new TestingHighAvailabilityServices(), false);
 
 			this.disposeSavepointFunction = Preconditions.checkNotNull(disposeSavepointFunction);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 1dd4787..2b8abb1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -140,7 +140,7 @@ public class ClientConnectionTest extends TestLogger {
 
 			highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, settableLeaderRetrievalService);
 
-			StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
+			StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices, true);
 
 			ActorGateway gateway = client.getJobManagerGateway();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index e2eb88d..f30fd19 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -71,7 +71,7 @@ public class ClusterClientTest extends TestLogger {
 		Configuration config = new Configuration();
 		HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
 
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices);
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices, false);
 
 		clusterClient.shutdown();
 
@@ -333,7 +333,7 @@ public class ClusterClientTest extends TestLogger {
 		private final ActorGateway jobmanagerGateway;
 
 		TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
-			super(config, new TestingHighAvailabilityServices());
+			super(config, new TestingHighAvailabilityServices(), false);
 			this.jobmanagerGateway = jobmanagerGateway;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index a1ce647..954b06f 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -161,7 +161,7 @@ public class MiniClusterResource extends ExternalResource {
 			true);
 
 		jobExecutorService = flinkMiniCluster;
-		clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices());
+		clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
 	}
 
 	private void startFlip6MiniCluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
index a02e902..13040c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -78,20 +78,20 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger {
 	private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
 		delegate.name = testName;
 		try {
-			delegate.miniClusterResource.before();
+			delegate.setupTestCluster();
 			try {
 				delegate.testTumblingTimeWindow();
-				delegate.miniClusterResource.after();
+				delegate.stopTestCluster();
 			} catch (Exception e) {
-				delegate.miniClusterResource.after();
+				delegate.stopTestCluster();
 			}
 
-			delegate.miniClusterResource.before();
+			delegate.setupTestCluster();
 			try {
 				delegate.testSlidingTimeWindow();
-				delegate.miniClusterResource.after();
+				delegate.stopTestCluster();
 			} catch (Exception e) {
-				delegate.miniClusterResource.after();
+				delegate.stopTestCluster();
 			}
 		} finally {
 			delegate.tempFolder.delete();

http://git-wip-us.apache.org/repos/asf/flink/blob/b25d3007/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 221f3fa..d34b6c33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -79,7 +79,7 @@ public class JobRetrievalITCase extends TestLogger {
 
 		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
 
-		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
+		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
 
 		// acquire the lock to make sure that the job cannot complete until the job client
 		// has been attached in resumingThread


Mime
View raw message