apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-60) Iterative processing support
Date Tue, 12 Jan 2016 04:06:39 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093250#comment-15093250 ]

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49414079
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    It's not working because the recovery checkpoint of the operator where the delay loop
    joins can be older than those of the downstream operators. Therefore, when traversing the
    loop, upstream checkpoints need to be taken into consideration, which is part of the broader
    solution Pramod refers to. I'm looking into this further and would also like to clean up the
    special-case handling for the delay operator.
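To illustrate the problem above, here is a minimal sketch (not the Apex implementation; the class and method names are hypothetical, and the graph is modeled with plain maps instead of PTOperator) of computing recovery checkpoints as the minimum window id over an operator and everything downstream of it, as the diff's `Math.min` does. Instead of a single DFS with a visited set, it relaxes edges until a fixed point is reached, so every operator on a loop, including the one where the loop joins, ends up with the oldest checkpoint in that loop regardless of traversal order. It omits the "recovery window id cannot move backwards" rule for dynamically added operators.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not the Apex PTOperator API: operators are plain names,
// checkpoints are window ids, and edges point from an operator to its sinks.
public class RecoveryCheckpoints {

  // Recovery checkpoint of an operator = minimum checkpoint window id over the
  // operator and everything reachable downstream of it. Relaxing edges until
  // nothing changes gives every operator on a cycle the oldest checkpoint in
  // that cycle, independent of the order in which operators are visited.
  static Map<String, Long> compute(Map<String, Long> checkpoints,
                                   Map<String, List<String>> downstream) {
    Map<String, Long> recovery = new HashMap<>(checkpoints);
    boolean changed = true;
    while (changed) {
      changed = false;
      for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
        String op = e.getKey();
        for (String sink : e.getValue()) {
          long sinkRecovery = recovery.get(sink);
          if (sinkRecovery < recovery.get(op)) {
            recovery.put(op, sinkRecovery);
            changed = true;
          }
        }
      }
    }
    return recovery;
  }

  public static void main(String[] args) {
    // Graph from APEXCORE-60: A -> B -> C -> D with backward streams C -> B and C -> A.
    Map<String, List<String>> downstream = new HashMap<>();
    downstream.put("A", List.of("B"));
    downstream.put("B", List.of("C"));
    downstream.put("C", List.of("D", "B", "A"));
    downstream.put("D", List.of());

    // A is where the outer loop joins and has the oldest checkpoint.
    Map<String, Long> checkpoints = new HashMap<>();
    checkpoints.put("A", 100L);
    checkpoints.put("B", 120L);
    checkpoints.put("C", 130L);
    checkpoints.put("D", 125L);

    // prints {A=100, B=100, C=100, D=125}
    System.out.println(new TreeMap<>(compute(checkpoints, downstream)));
  }
}
```

A single DFS that skips already-visited operators (or skips the edge through the delay operator) can finish C before it has seen the older checkpoint of the join point; the fixed-point loop trades a few extra passes for an answer that does not depend on visit order.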


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>              Labels: roadmap
>             Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the graph (currently known as a DAG, which it would no longer be once iterative processing is supported).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams, to A and to B. The input ports of A and B that C connects to will have a special attribute specifying how many window IDs ahead the incoming windows should be treated as, and A and B will be responsible for providing the initial data on those input ports.
> Another idea is to have C advance the window ID on its output ports and have C generate the initial data on its output ports to A and B.
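A minimal sketch of the first idea, assuming a hypothetical `FeedbackPort` class (not part of the Apex API) and a `delay` measured in windows: tuples that C emits in window w on a backward stream are delivered to the receiving operator (A or B) in window w + delay, and for the first `delay` windows the receiving operator supplies the initial data itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical model of a backward-stream input port (not an Apex API): tuples
// arriving in window w are treated as belonging to window w + delay.
public class FeedbackPort<T> {
  private final long firstWindowId;
  private final int delay;
  // Initial data the receiving operator (A or B) supplies for the first windows.
  private final LongFunction<List<T>> initialData;
  private final Map<Long, List<T>> pending = new HashMap<>();

  public FeedbackPort(long firstWindowId, int delay, LongFunction<List<T>> initialData) {
    this.firstWindowId = firstWindowId;
    this.delay = delay;
    this.initialData = initialData;
  }

  // Called when the upstream operator (C) emits a tuple in windowId.
  public void receive(long windowId, T tuple) {
    pending.computeIfAbsent(windowId + delay, w -> new ArrayList<>()).add(tuple);
  }

  // Tuples the receiving operator should process in windowId.
  public List<T> tuplesFor(long windowId) {
    if (windowId < firstWindowId + delay) {
      // Nothing from C can have arrived yet: use the operator's initial data.
      return initialData.apply(windowId);
    }
    List<T> tuples = pending.remove(windowId);
    if (tuples == null) {
      return List.of();
    }
    return tuples;
  }

  public static void main(String[] args) {
    FeedbackPort<String> port = new FeedbackPort<>(0L, 1, w -> List.of("seed-" + w));
    System.out.println(port.tuplesFor(0L));  // [seed-0]
    port.receive(0L, "x");                   // emitted by C in window 0
    System.out.println(port.tuplesFor(1L));  // [x]
  }
}
```

Because B only needs C's output from an earlier window, B can close window w without waiting for C to finish window w, which is what breaks the deadlock a plain cycle would cause. The second idea moves the same offset and seeding to C's output ports instead of A's and B's input ports.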



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
