apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davidyan74 <...@git.apache.org>
Subject [GitHub] incubator-apex-core pull request: APEXCORE-201
Date Mon, 07 Mar 2016 19:22:01 GMT
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/230#discussion_r55256158
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -919,106 +914,41 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new operator after partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS) {
    -          LOG.warn("Latency calculation for this operator may not be correct because upstream end window timestamp is greater than this operator's end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks are in sync in your cluster. You can also try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
    -                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited, leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    Map<PTOperator, CriticalPathInfo> cache = new HashMap<>();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = findCriticalPathHelper(leafOperator, cache);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    +  private CriticalPathInfo findCriticalPathHelper(PTOperator operator, Map<PTOperator, CriticalPathInfo> cache)
    +  {
    +    if (cache.containsKey(operator)) {
    --- End diff --
    
    @vrozov done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Mime
View raw message