From: zentol
To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...
Date: Thu, 6 Jul 2017 10:20:58 +0000 (UTC)

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125862847

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for the Dispatcher component. The Dispatcher component is responsible
+ * for receiving job submissions, persisting them, spawning JobManagers to execute
+ * the jobs and to recover them in case of a master failure. Furthermore, it knows
+ * about the state of the Flink session cluster.
+ */
+public abstract class Dispatcher extends RpcEndpoint {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
+
+	public static final String DISPATCHER_NAME = "dispatcher";
+
+	private final SubmittedJobGraphStore submittedJobGraphStore;
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	private final HighAvailabilityServices highAvailabilityServices;
+	private final BlobServer blobServer;
+	private final HeartbeatServices heartbeatServices;
+	private final MetricRegistry metricRegistry;
+
+	private final FatalErrorHandler fatalErrorHandler;
+
+	private final Map<JobID, JobManagerRunner> jobManagerRunners;
+
+	protected Dispatcher(
+			RpcService rpcService,
+			String endpointId,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(rpcService, endpointId);
+
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.blobServer = Preconditions.checkNotNull(blobServer);
+		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
+		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+
+		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
+		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
+
+		jobManagerRunners = new HashMap<>(16);
+	}
+
+	//------------------------------------------------------
+	// Lifecycle methods
+	//------------------------------------------------------
+
+	@Override
+	public void shutDown() throws Exception {
+		Exception exception = null;
+		// stop all currently running JobManagerRunners
+		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+			jobManagerRunner.shutdown();
+		}
+
+		jobManagerRunners.clear();
+
+		try {
+			submittedJobGraphStore.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
+			super.shutDown();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
+		}
+	}
+
+	//------------------------------------------------------
+	// RPCs
+	//------------------------------------------------------
+
+	@RpcMethod
+	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
+		final JobID jobId = jobGraph.getJobID();
+
+		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
+
+		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
+
+		try {
+			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+		} catch (IOException e) {
+			log.warn("Cannot retrieve job status for {}.", jobId, e);
+			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
+		}
+
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
+			try {
+				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
+			} catch (Exception e) {
+				log.warn("Cannot persist JobGraph.", e);
+				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
+			}
+
+			final JobManagerRunner jobManagerRunner;
+
+			try {
+				jobManagerRunner = createJobManagerRunner(
+					ResourceID.generate(),
+					jobGraph,
+					null,
+					getRpcService(),
+					highAvailabilityServices,
+					blobServer,
+					heartbeatServices,
+					metricRegistry,
+					new DispatcherOnCompleteActions(jobGraph.getJobID()),
+					fatalErrorHandler);
+
+				jobManagerRunner.start();
+			} catch (Exception e) {
+				try {
+					// We should only remove a job from the submitted job graph store
+					// if the initial submission failed. Never in case of a recovery
+					submittedJobGraphStore.removeJobGraph(jobId);
+				} catch (Throwable t) {
+					log.warn("Cannot remove job graph from submitted job graph store.", t);
+					e.addSuppressed(t);
+				}
+
+				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
+			}
+
+			jobManagerRunners.put(jobId, jobManagerRunner);
+
+			return Acknowledge.get();
+		} else {
+			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
+				"is currently in state " + jobSchedulingStatus + '.');
+		}
+	}
+
+	@RpcMethod
+	Collection<JobID> listJobs() {
+		// TODO: return proper list of running jobs
+		return Collections.emptyList();
--- End diff --

Could we not already implement this by taking the key set from the jobManagerRunners map?
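For illustration only (not part of the pull request), a minimal sketch of what that suggestion could look like, assuming the jobManagerRunners map only contains runners for jobs that are still running and that an additional java.util.ArrayList import is added:

    @RpcMethod
    Collection<JobID> listJobs() {
        // Sketch of the reviewer's suggestion: expose the IDs of the jobs for which
        // a JobManagerRunner is currently tracked. The key set is copied so callers
        // do not receive a live view of the internal map.
        return new ArrayList<>(jobManagerRunners.keySet());
    }

Whether this is sufficient depends on when entries are removed from jobManagerRunners; in the skeleton shown above they are only cleared in shutDown(), so finished jobs would still be listed until that is addressed.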