Date: Thu, 6 Jul 2017 10:24:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-7103) Implement skeletal structure of dispatcher component

    [ https://issues.apache.org/jira/browse/FLINK-7103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076312#comment-16076312 ]

ASF GitHub Bot commented on FLINK-7103:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125863295

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    +
    +		private final JobID jobId;
    +
    +		private DispatcherOnCompleteActions(JobID jobId) {
    +			this.jobId = Preconditions.checkNotNull(jobId);
    +		}
    +
    +		@Override
    +		public void jobFinished(JobExecutionResult result) {
    +			LOG.info("Job {} finished.", jobId);
    +
    +			runAsync(new Runnable() {
    --- End diff --

    We could move this runnable into a separate class or a getter method to reduce duplication.


> Implement skeletal structure of dispatcher component
> ----------------------------------------------------
>
>                 Key: FLINK-7103
>                 URL: https://issues.apache.org/jira/browse/FLINK-7103
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> Implement the skeletal structure of the {{Dispatcher}} component. The initial functionality will support job submissions and listing of jobs.

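For illustration, the "getter method" variant of the review suggestion about the runnable could look roughly like the sketch below. It is not the actual Flink change. It assumes that several completion callbacks each schedule a near-identical Runnable that calls removeJob; the excerpt only shows the start of jobFinished, so the jobFailed callback, the String job id, the inline runAsync stand-in and the class name are all hypothetical and exist only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free stand-in for the Dispatcher callbacks in the diff.
class DispatcherCleanupSketch {

    // Records every removeJob call so the behaviour can be checked.
    final List<String> removed = new ArrayList<>();

    // Stand-in for RpcEndpoint#runAsync (assumption: runs the task inline here).
    private void runAsync(Runnable runnable) {
        runnable.run();
    }

    // Stand-in for Dispatcher#removeJob(JobID, boolean) from the diff.
    private void removeJob(String jobId, boolean cleanupHA) throws Exception {
        removed.add(jobId + ":" + cleanupHA);
    }

    // The "getter method": the only place that builds the cleanup runnable.
    private Runnable removeJobRunnable(final String jobId, final boolean cleanupHA) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    removeJob(jobId, cleanupHA);
                } catch (Exception e) {
                    System.err.println("Could not remove job " + jobId + ": " + e);
                }
            }
        };
    }

    // Each callback now differs only in the arguments it passes.
    void jobFinished(String jobId) {
        runAsync(removeJobRunnable(jobId, true));
    }

    // Assumed second callback; not visible in the quoted excerpt.
    void jobFailed(String jobId) {
        runAsync(removeJobRunnable(jobId, false));
    }
}
```

With this shape the try/catch around removeJob exists once, and a new callback only has to choose its cleanupHA flag. The flag values used above are illustrative, not taken from the pull request.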
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)