Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7ADB8200B9C for ; Mon, 10 Oct 2016 10:43:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 79656160AE1; Mon, 10 Oct 2016 08:43:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 95EF5160ACA for ; Mon, 10 Oct 2016 10:43:29 +0200 (CEST) Received: (qmail 59170 invoked by uid 500); 10 Oct 2016 08:43:28 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 59160 invoked by uid 99); 10 Oct 2016 08:43:28 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2016 08:43:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 370ABC27DB for ; Mon, 10 Oct 2016 08:43:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 2nMyo49THuWD for ; Mon, 10 Oct 2016 08:43:26 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail 
Server at mx1-lw-eu.apache.org) with SMTP id C5C915F610 for ; Mon, 10 Oct 2016 08:43:25 +0000 (UTC) Received: (qmail 58731 invoked by uid 99); 10 Oct 2016 08:42:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2016 08:42:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B02A4DFE65; Mon, 10 Oct 2016 08:42:59 +0000 (UTC) From: tillrohrmann To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #2594: [FLINK-4738] [TaskManager] Port TaskManager logic ... Content-Type: text/plain Message-Id: <20161010084259.B02A4DFE65@git1-us-west.apache.org> Date: Mon, 10 Oct 2016 08:42:59 +0000 (UTC) archived-at: Mon, 10 Oct 2016 08:43:30 -0000 Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2594#discussion_r82563262 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -127,12 +195,423 @@ public void start() { } } + /** + * Called to shut down the TaskManager. The method closes all TaskManager services. 
+ */ + @Override + public void shutDown() { + log.info("Stopping TaskManager {}.", getAddress()); + + if (resourceManagerConnection.isConnected()) { + try { + resourceManagerConnection.close(); + } catch (Exception e) { + log.error("Could not cleanly close the ResourceManager connection.", e); + } + } + + try { + ioManager.shutdown(); + } catch (Exception e) { + log.error("IOManager did not shut down properly.", e); + } + + try { + memoryManager.shutdown(); + } catch (Exception e) { + log.error("MemoryManager did not shut down properly.", e); + } + + try { + networkEnvironment.shutdown(); + } catch (Exception e) { + log.error("Network environment did not shut down properly.", e); + } + + try { + fileCache.shutdown(); + } catch (Exception e) { + log.error("File cache did not shut down properly.", e); + } + + try { + metricRegistry.shutdown(); + } catch (Exception e) { + log.error("MetricRegistry did not shut down properly.", e); + } + + log.info("Stopped TaskManager {}.", getAddress()); + } + + // ======================================================================== + // RPC methods + // ======================================================================== + + // ---------------------------------------------------------------------- + // Task lifecycle RPCs + // ---------------------------------------------------------------------- + + @RpcMethod + public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException { + + JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID); + + if (jobManagerConnection == null) { + final String message = "Could not submit task because JobManager " + jobManagerID + + " was not associated."; + + log.debug(message); + throw new TaskSubmissionException(message); + } + + TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID()); + + if (taskSlot == null) { + final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.'; + 
log.debug(message); + throw new TaskSubmissionException(message); + } + + TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd); + + InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( + jobManagerConnection.getJobManagerGateway(), + tdd.getJobID(), + tdd.getVertexID(), + tdd.getExecutionId(), + taskManagerConfiguration.getTimeout()); + + TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); + CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); + LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); + PartitionStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); + + Task task = new Task( + tdd, + memoryManager, + ioManager, + networkEnvironment, + broadcastVariableManager, + taskManagerActions, + inputSplitProvider, + checkpointResponder, + libraryCache, + fileCache, + taskManagerRuntimeInfo, + taskMetricGroup, + resultPartitionConsumableNotifier, + partitionStateChecker, + getRpcService().getExecutor()); + + log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); + + if(taskSlot.add(task)) { + TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot); + + taskSlotMappings.put(task.getExecutionId(), taskSlotMapping); + task.startTaskThread(); + + return Acknowledge.get(); + } else { + final String message = "TaskManager already contains a task for id " + + task.getExecutionId() + '.'; + + log.debug(message); + throw new TaskSubmissionException(message); + } + } + + @RpcMethod + public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException { + final Task task = getTask(executionAttemptID); + + if (task != null) { + try { + task.cancelExecution(); + return Acknowledge.get(); + } catch (Throwable t) { + throw new 
TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t); + } + } else { + final String message = "Cannot find task to stop for execution " + executionAttemptID + '.'; + + log.debug(message); + throw new TaskException(message); + } + } + + @RpcMethod + public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException { + final Task task = getTask(executionAttemptID); + + if (task != null) { + try { + task.stopExecution(); + return Acknowledge.get(); + } catch (Throwable t) { + throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t); + } + } else { + final String message = "Cannot find task to stop for execution " + executionAttemptID + '.'; + + log.debug(message); + throw new TaskException(message); + } + } + + // ---------------------------------------------------------------------- + // Partition lifecycle RPCs + // ---------------------------------------------------------------------- + + @RpcMethod + public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection partitionInfos) throws PartitionException { + final Task task = getTask(executionAttemptID); + + if (task != null) { + for (final PartitionInfo partitionInfo: partitionInfos) { + IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); + + final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); + + if (singleInputGate != null) { + // Run asynchronously because it might be blocking + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); + } catch (IOException | InterruptedException e) { + log.error("Could not update input data location for task {}. 
Trying to fail task.", task.getTaskInfo().getTaskName(), e); + + try { + task.failExternally(e); + } catch (RuntimeException re) { + // TODO: Check whether we need this or make exception in failExtenally checked + log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); + } + } + } + }); + } else { + throw new PartitionException("No reader with ID " + + intermediateResultPartitionID + " for task " + executionAttemptID + + " was found."); + } + } + + return Acknowledge.get(); + } else { + log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID); + return Acknowledge.get(); + } + } + + @RpcMethod + public void failPartition(ExecutionAttemptID executionAttemptID) { + log.info("Discarding the results produced by task execution {}.", executionAttemptID); + + try { + networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID); + } catch (Throwable t) { + // TODO: Do we still need this catch branch? 
+ onFatalError(t); + } + + // TODO: Maybe it's better to return an Acknowledge here to notify the JM about the success/failure with an Exception + } + + // ---------------------------------------------------------------------- + // Checkpointing RPCs + // ---------------------------------------------------------------------- + + @RpcMethod + public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException { + log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); + + final Task task = getTask(executionAttemptID); + + if (task != null) { + task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp); + + return Acknowledge.get(); + } else { + final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.'; + + log.debug(message); + throw new CheckpointException(message); + } + } + --- End diff -- Good catch. Will fix it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---