apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-201) Reported latency is wrong when a downstream operator is behind more than 1000 windows
Date Sat, 23 Jan 2016 00:48:39 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15113416#comment-15113416
] 

ASF GitHub Bot commented on APEXCORE-201:
-----------------------------------------

Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50610441
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap,
Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new operator after
partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp -
upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS)
{
    -          LOG.warn("Latency calculation for this operator may not be correct because
upstream end window timestamp is greater than this operator's end window timestamp: {} ({})
> {} ({}). Please verify that the system clocks are in sync in your cluster. You can also
try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to
{}).",
    -                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp,
oper, this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited,
leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = new CriticalPathInfo();
    +      findCriticalPathHelper(leafOperator, cpi);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap,
Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    -    }
    -    for (PTOperator.PTInput input : maxOperator.getInputs()) {
    -      if (null != input.source.source) {
    -        operators.add(input.source.source);
    -      }
    +  private void findCriticalPathHelper(PTOperator operator, CriticalPathInfo cpi)
    +  {
    +    cpi.latency += operator.stats.getLatencyMA();
    --- End diff --
    
    Can this be optimized so that upstream operators that have already been visited are not visited again?
    E.g.  If I have following dag
    <pre>
              |-->C 
    A--> B -->|-->D
    </pre>
    During the critical-path calculation, A and B are traversed once for leaf C and again for leaf D. Could we store the intermediate values computed for A and B during the first traversal (whether it starts from C or from D) and reuse them for the other leaf?


> Reported latency is wrong when a downstream operator is behind more than 1000 windows
> -------------------------------------------------------------------------------------
>
>                 Key: APEXCORE-201
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-201
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: David Yan
>            Assignee: David Yan
>
> We should probably estimate the latency in that case from the number of windows the downstream operator is behind. Right now it reports a stale latency.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message