apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-60) Iterative processing support
Date Tue, 22 Dec 2015 03:47:46 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15067504#comment-15067504 ]

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48219663
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -940,17 +942,17 @@ private void updateStreamMappings(PMapping m)
                     PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                       sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                     StreamMapping.addInput(slidingUnifier, sourceOut, null);
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                     sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
                   }
                   else {
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                   }
                   oper.inputs.add(input);
                 }
               }
             }
    -      } else {
    +      } else if (sourceMapping != null) {
    --- End diff --
    
    Why is this additional check required? sourceMapping is also used in the `if` condition,
    where this check is not made.


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>
> We would like to support iterative processing by introducing cycles in the graph (currently
> known as a DAG, but no longer one if we support iterative processing).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams to A and B.  The input ports of A and B that C connects
> to will have a special attribute on how many window IDs ahead the incoming windows should
> be treated as, and A and B will be responsible for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have C generate
> the initial data on its output ports to A and B.
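
The window-ID shift described in the issue can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions and does not use any Apex API: the class `FeedbackLoopSketch`, the method `run`, and the parameters `windowsAhead` and `initialValue` are hypothetical names, the source is assumed to emit the value w + 1 in window w, and the loop is reduced to a single backward stream from C to B. Tuples sent back in window w are treated as belonging to window w + `windowsAhead`, and the first `windowsAhead` windows are seeded with initial data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a feedback loop B -> C -> B; not Apex code.
class FeedbackLoopSketch {

    /**
     * Simulates `windows` streaming windows and returns what C forwards to D.
     * windowsAhead: how many window IDs ahead a fed-back tuple is treated as.
     * initialValue: data supplied for the windows that have no feedback yet.
     */
    static List<Integer> run(int windows, int windowsAhead, int initialValue) {
        if (windowsAhead < 1) {
            // With no shift, B would wait on data that C can only produce
            // after B has finished the same window, so the cycle cannot progress.
            throw new IllegalArgumentException("windowsAhead must be at least 1");
        }

        // Feedback tuples keyed by the window ID in which B will consume them.
        Map<Integer, Integer> feedbackByWindow = new HashMap<>();
        for (int w = 0; w < windowsAhead; w++) {
            feedbackByWindow.put(w, initialValue); // initial data for the loop
        }

        List<Integer> forwardedToD = new ArrayList<>();
        for (int w = 0; w < windows; w++) {
            int fromSource = w + 1;                    // assumed upstream input
            int feedback = feedbackByWindow.remove(w); // always present, see seeding above
            int result = fromSource + feedback;        // B combines both inputs
            forwardedToD.add(result);                  // C passes the result on to D
            feedbackByWindow.put(w + windowsAhead, result); // C feeds it back, shifted ahead
        }
        return forwardedToD;
    }
}
```

With a shift of one window and an initial value of 0, each window adds its input to the previous window's result, so `run(4, 1, 0)` yields a running sum. Whether the shift and the initial data belong to the receiving input port or to C's output port (the two ideas in the description) makes no difference to this simulation.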



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
