flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
Date Thu, 09 Jul 2015 16:11:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620756#comment-14620756 ]

ASF GitHub Bot commented on FLINK-2292:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34272101
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         if (!isConnected) {
           log.debug(s"Dropping message $message because the TaskManager is currently " +
             "not connected to a JobManager.")
    -    }
    +    } else {
     
    -    // we order the messages by frequency, to make sure the code paths for matching
    -    // are as short as possible
    -    message match {
    +      // we order the messages by frequency, to make sure the code paths for matching
    +      // are as short as possible
    +      message match {
    +
    +        // tell the task about the availability of a new input partition
    +        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    +          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +
    +        // tell the task about the availability of some new input partitions
    +        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    +          updateTaskInputPartitions(executionID, partitionInfos)
    +
    +        // discards intermediate result partitions of a task execution on this TaskManager
    +        case FailIntermediateResultPartitions(executionID) =>
    +          log.info("Discarding the results produced by task execution " + executionID)
    +          if (network.isAssociated) {
    +            try {
    +              network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +            } catch {
    +              case t: Throwable => killTaskManagerFatal(
    +                "Fatal leak: Unable to release intermediate result partition data", t)
    +            }
    +          }
     
    -      // tell the task about the availability of a new input partition
    -      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    -        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +        // notifies the TaskManager that the state of a task has changed.
    +        // the TaskManager informs the JobManager and cleans up in case the transition
    +        // was into a terminal state, or in case the JobManager cannot be informed of the
    +        // state transition
     
    -      // tell the task about the availability of some new input partitions
    -      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -        updateTaskInputPartitions(executionID, partitionInfos)
    +        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
     
    -      // discards intermediate result partitions of a task execution on this TaskManager
    -      case FailIntermediateResultPartitions(executionID) =>
    -        log.info("Discarding the results produced by task execution " + executionID)
    -        if (network.isAssociated) {
    -          try {
    -            network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -          } catch {
    -            case t: Throwable => killTaskManagerFatal(
    -                "Fatal leak: Unable to release intermediate result partition data", t)
    -          }
    -        }
    +          // we receive these from our tasks and forward them to the JobManager
    --- End diff --
    
    There is a lot of changed code here that seems to have been edited without need (it has
    nothing to do with the accumulators). Since this is pretty sensitive code, I am very
    hesitant to commit such massive edits. What was the reason for these changes in the first place?
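
Judging only from this hunk, the one structural change is the new `} else {` after the `isConnected` check: in the old code the `message match` follows the `if` block, so it still ran after the "Dropping message" debug line. Moving the match into the `else` branch shifts every case one indentation level deeper, which makes almost every line show up as changed. The Scala sketch below is hypothetical (names such as `SketchTaskManager` and `dispatch` are illustrative, the message types are simplified, and actors and logging are left out); it shows the same drop-when-disconnected guard with the match kept in its own method, one way to change the guard without re-indenting the cases.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-ins for two of the TaskManager messages in the diff.
sealed trait TaskMessage
final case class UpdateTaskSinglePartitionInfo(executionId: String, resultId: String) extends TaskMessage
final case class FailIntermediateResultPartitions(executionId: String) extends TaskMessage

// Hypothetical handler illustrating the guard pattern; this is not Flink's TaskManager.
final class SketchTaskManager(var isConnected: Boolean) {
  val handled: ListBuffer[String] = ListBuffer.empty[String]           // processed messages
  val dropped: ListBuffer[TaskMessage] = ListBuffer.empty[TaskMessage] // ignored while disconnected

  def handleTaskMessage(message: TaskMessage): Unit =
    if (!isConnected) {
      // plays the role of the "Dropping message ..." debug log; nothing else runs
      dropped += message
    } else {
      dispatch(message)
    }

  // Keeping the match in a separate method means changes to the guard above
  // do not re-indent any of the cases.
  private def dispatch(message: TaskMessage): Unit = message match {
    case UpdateTaskSinglePartitionInfo(executionId, resultId) =>
      handled += s"update $executionId/$resultId"
    case FailIntermediateResultPartitions(executionId) =>
      handled += s"discard results of $executionId"
  }
}
```

With this split, a diff that only adds the `else` touches the few guard lines rather than the whole `match`.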


> Report accumulators periodically while job is running
> -----------------------------------------------------
>
>                 Key: FLINK-2292
>                 URL: https://issues.apache.org/jira/browse/FLINK-2292
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends metrics.
> This allows them to be updated in real time.
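
The heartbeat-based reporting described in the issue can be sketched roughly as follows. This is a hypothetical, simplified model, not Flink's actual API: `Heartbeat`, `TaskManagerAccumulators`, and `JobManagerAccumulatorView` are illustrative names, accumulators are assumed to be `Long` counters that can be summed across TaskManagers, and the periodic timer and actor messaging are left out. Each heartbeat carries an immutable snapshot of the TaskManager's local accumulator values next to its metrics report, and the JobManager keeps only the latest snapshot per TaskManager.

```scala
import scala.collection.mutable

// Hypothetical heartbeat that piggybacks accumulator snapshots on the metrics report.
// accumulators: jobId -> (accumulator name -> local value on this TaskManager)
final case class Heartbeat(
    taskManagerId: String,
    metricsReport: String,
    accumulators: Map[String, Map[String, Long]])

// TaskManager side: local counters updated by the running tasks.
final class TaskManagerAccumulators(val taskManagerId: String) {
  private val values = mutable.Map.empty[String, mutable.Map[String, Long]]

  def add(jobId: String, name: String, delta: Long): Unit = {
    val perJob = values.getOrElseUpdate(jobId, mutable.Map.empty[String, Long])
    perJob(name) = perJob.getOrElse(name, 0L) + delta
  }

  // Called once per heartbeat interval; copies the mutable state into an immutable snapshot.
  def heartbeat(metricsReport: String): Heartbeat =
    Heartbeat(taskManagerId, metricsReport,
      values.map { case (job, accs) => job -> accs.toMap }.toMap)
}

// JobManager side: stores the latest snapshot per TaskManager and merges on request.
final class JobManagerAccumulatorView {
  private val latest = mutable.Map.empty[String, Map[String, Map[String, Long]]]

  // A newer snapshot replaces the older one, so repeated heartbeats never double-count.
  def onHeartbeat(hb: Heartbeat): Unit =
    latest(hb.taskManagerId) = hb.accumulators

  // Current value of every accumulator of one job, summed over all TaskManagers.
  def currentValues(jobId: String): Map[String, Long] =
    latest.values.foldLeft(Map.empty[String, Long]) { (acc, perTaskManager) =>
      perTaskManager.getOrElse(jobId, Map.empty[String, Long]).foldLeft(acc) {
        case (merged, (name, value)) => merged.updated(name, merged.getOrElse(name, 0L) + value)
      }
    }
}
```

Sending full snapshots rather than deltas keeps the JobManager side simple: assuming heartbeats from one TaskManager arrive in order, a lost heartbeat only delays the update until the next one, and a duplicated one changes nothing.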



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
