Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BE12B200BE8 for ; Fri, 23 Dec 2016 21:22:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BCD13160B3E; Fri, 23 Dec 2016 20:22:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EBB58160B46 for ; Fri, 23 Dec 2016 21:22:22 +0100 (CET) Received: (qmail 92653 invoked by uid 500); 23 Dec 2016 20:22:21 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 91996 invoked by uid 99); 23 Dec 2016 20:22:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Dec 2016 20:22:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2605DF2DF1; Fri, 23 Dec 2016 20:22:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 23 Dec 2016 20:22:39 -0000 Message-Id: <6b44b4bc220244218002e0d19a4f6316@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/52] [abbrv] flink git commit: [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary archived-at: Fri, 23 Dec 2016 20:22:24 -0000 [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary Cleanup of the interface HighAvailabilityServices so that only methods which really throw an exception have an exception clause defined. This closes #2679. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3aafa16e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3aafa16e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3aafa16e Branch: refs/heads/master Commit: 3aafa16eae04d8a5a41b84c9a82480b9742c3fb1 Parents: 6f691ad Author: Till Rohrmann Authored: Wed Oct 19 14:09:31 2016 +0200 Committer: Stephan Ewen Committed: Fri Dec 23 20:54:24 2016 +0100 ---------------------------------------------------------------------- .../highavailability/EmbeddedNonHaServices.java | 4 +-- .../HighAvailabilityServices.java | 33 +++++++++++++++----- .../runtime/highavailability/NonHaServices.java | 4 +-- .../highavailability/ZookeeperHaServices.java | 12 +++---- .../nonha/AbstractNonHaServices.java | 10 +++--- .../flink/runtime/jobmaster/JobMaster.java | 17 ++-------- .../flink/runtime/util/ZooKeeperUtils.java | 5 ++- .../TestingHighAvailabilityServices.java | 14 ++++----- 8 files changed, 51 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java index 58da287..523218e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return resourceManagerLeaderService.createLeaderRetrievalService(); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return resourceManagerLeaderService.createLeaderElectionService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index f6db682..360de7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -43,50 +43,67 @@ public interface HighAvailabilityServices { /** * Gets the leader retriever for the cluster's resource manager. */ - LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; + LeaderRetrievalService getResourceManagerLeaderRetriever(); /** * Gets the leader retriever for the job JobMaster which is responsible for the given job * * @param jobID The identifier of the job. - * @return - * @throws Exception + * @return Leader retrieval service to retrieve the job manager for the given job */ - LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID); /** * Gets the leader election service for the cluster's resource manager. + * + * @return Leader election service for the resource manager leader election */ - LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; + LeaderElectionService getResourceManagerLeaderElectionService(); /** * Gets the leader election service for the given job. * * @param jobID The identifier of the job running the election. + * @return Leader election service for the job manager leader election */ - LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception; + LeaderElectionService getJobManagerLeaderElectionService(JobID jobID); /** * Gets the checkpoint recovery factory for the job manager + * + * @return Checkpoint recovery factory */ - CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; + CheckpointRecoveryFactory getCheckpointRecoveryFactory(); /** * Gets the submitted job graph store for the job manager + * + * @return Submitted job graph store + * @throws Exception if the submitted job graph store could not be created */ SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; /** * Gets the registry that holds information about whether jobs are currently running. + * + * @return Running job registry to retrieve running jobs */ - RunningJobsRegistry getRunningJobsRegistry() throws Exception; + RunningJobsRegistry getRunningJobsRegistry(); /** * Creates the BLOB store in which BLOBs are stored in a highly-available fashion. + * + * @return Blob store + * @throws IOException if the blob store could not be created */ BlobStore createBlobStore() throws IOException; // ------------------------------------------------------------------------ + /** + * Shut the high availability service down. + * + * @throws Exception if the shut down fails + */ void shutdown() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 107cbd0..75f44ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return new StandaloneLeaderElectionService(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index e38840b..3e909e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -108,27 +108,27 @@ public class ZookeeperHaServices implements HighAvailabilityServices { // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID)); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID)); } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor); } @@ -138,7 +138,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices { } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { throw new UnsupportedOperationException("not yet implemented"); } http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index 8c15a52..237727f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { checkNotNull(jobID); synchronized (lock) { @@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { checkNotNull(jobID); synchronized (lock) { @@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { checkNotShutdown(); return new StandaloneCheckpointRecoveryFactory(); } @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + public SubmittedJobGraphStore getSubmittedJobGraphStore() { checkNotShutdown(); return new StandaloneSubmittedJobGraphStore(); } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { checkNotShutdown(); return runningJobsRegistry; } http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3c6bbd3..204cd80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -229,21 +228,9 @@ public class JobMaster extends RpcEndpoint { log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - CheckpointRecoveryFactory checkpointRecoveryFactory; - try { - checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - } catch (Exception e) { - log.error("Could not create the access to highly-available checkpoint storage.", e); - throw new Exception("Could not create the access to highly-available checkpoint storage.", e); - } + CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - try { - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); - } catch (Exception e) { - log.error("Could not get the resource manager leader retriever.", e); - throw new JobSubmissionException(jobGraph.getJobID(), - "Could not get the resource manager leader retriever.", e); - } + resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.executionGraph = ExecutionGraphBuilder.buildGraph( null, http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 81609c2..621edcb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -193,7 +193,7 @@ public class ZooKeeperUtils { public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( final CuratorFramework client, final Configuration configuration, - final String pathSuffix) throws Exception + final String pathSuffix) { String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( configuration, @@ -240,12 +240,11 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object containing the configuration values * @param pathSuffix The path suffix which we want to append * @return {@link ZooKeeperLeaderElectionService} instance. - * @throws Exception */ public static ZooKeeperLeaderElectionService createLeaderElectionService( final CuratorFramework client, final Configuration configuration, - final String pathSuffix) throws Exception + final String pathSuffix) { final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 877812b..e0f71ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { LeaderRetrievalService service = this.resourceManagerLeaderRetriever; if (service != null) { return service; @@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; @@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { LeaderElectionService service = resourceManagerLeaderElectionService; if (service != null) { @@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID); if (service != null) { @@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { CheckpointRecoveryFactory factory = checkpointRecoveryFactory; if (factory != null) { @@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + public SubmittedJobGraphStore getSubmittedJobGraphStore() { SubmittedJobGraphStore store = submittedJobGraphStore; if (store != null) { @@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { return new NonHaRegistry(); }