apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-60) Iterative processing support
Date Tue, 05 Jan 2016 21:04:40 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083782#comment-15083782
] 

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48895909
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds)
{
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues)
{
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start)
or END_STREAM
    --- End diff --
    
    fixed


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>              Labels: roadmap
>             Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the graph (known
as DAG now, but no longer if we support iterative processing).
> Initial idea is as follow:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams to A and B.  The input ports of A and B that C connects
to will have a special attribute on how many window IDs ahead the incoming windows should
be treated as, and A and B will be responsible for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have C generate
the initial data on its output ports to A and B.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message