apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gauravgopi123 <...@git.apache.org>
Subject [GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...
Date Sat, 23 Jan 2016 00:48:38 GMT
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50610441
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap,
Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new operator after
partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp -
upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS)
{
    -          LOG.warn("Latency calculation for this operator may not be correct because
upstream end window timestamp is greater than this operator's end window timestamp: {} ({})
> {} ({}). Please verify that the system clocks are in sync in your cluster. You can also
try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to
{}).",
    -                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp,
oper, this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited,
leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = new CriticalPathInfo();
    +      findCriticalPathHelper(leafOperator, cpi);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap,
Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    -    }
    -    for (PTOperator.PTInput input : maxOperator.getInputs()) {
    -      if (null != input.source.source) {
    -        operators.add(input.source.source);
    -      }
    +  private void findCriticalPathHelper(PTOperator operator, CriticalPathInfo cpi)
    +  {
    +    cpi.latency += operator.stats.getLatencyMA();
    --- End diff --
    
    Can this be optimized so that upstream operators that have already been visited are
not visited again?
    E.g., if I have the following DAG:
    <pre>
              |-->C 
    A--> B -->|-->D
    </pre>
    While finding the critical path, A and B are traversed for both leaf nodes C and
D. Can we not store the intermediate values for A and B when they are computed from either
C or D, and reuse those values for the other leaf (D or C, respectively)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message