From dev-return-663-archive-asf-public=cust-asf.ponee.io@heron.incubator.apache.org Fri Apr 6 22:47:15 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 51EA3180649 for ; Fri, 6 Apr 2018 22:47:15 +0200 (CEST) Received: (qmail 79966 invoked by uid 500); 6 Apr 2018 20:47:14 -0000 Mailing-List: contact dev-help@heron.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@heron.incubator.apache.org Delivered-To: mailing list dev@heron.incubator.apache.org Received: (qmail 79955 invoked by uid 99); 6 Apr 2018 20:47:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Apr 2018 20:47:14 +0000 From: GitBox To: dev@heron.apache.org Subject: [GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version Message-ID: <152304763390.26434.686878874365330654.gitbox@gitbox.apache.org> Date: Fri, 06 Apr 2018 20:47:13 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r179871874 ########## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java ########## @@ -16,58 +16,89 @@ package com.twitter.heron.healthmgr.detectors; import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.logging.Logger; import javax.inject.Inject; -import com.microsoft.dhalion.api.IDetector; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import 
com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; + +import org.apache.commons.math3.stat.regression.SimpleRegression; import com.twitter.heron.healthmgr.HealthPolicyConfig; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.sensors.BufferSizeSensor; -import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE; -public class GrowingWaitQueueDetector implements IDetector { - static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; +public class GrowingWaitQueueDetector extends BaseDetector { + static final String CONF_LIMIT + = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName()); - private final BufferSizeSensor pendingBufferSensor; private final double rateLimit; @Inject - GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor, - HealthPolicyConfig policyConfig) { - this.pendingBufferSensor = pendingBufferSensor; + GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) { rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0); } /** * Detects all components unable to keep up with input load, hence having a growing pending buffer * or wait queue * - * @return A collection of all components executing slower than input rate. + * @return A collection of symptoms each one corresponding to a components executing slower + * than input rate. 
*/ @Override - public List detect() { - ArrayList result = new ArrayList<>(); - - Map bufferSizes = pendingBufferSensor.get(); - for (ComponentMetrics compMetrics : bufferSizes.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - compStats.computeBufferSizeTrend(); - if (compStats.getMaxBufferChangeRate() > rateLimit) { + public Collection detect(Collection measurements) { + + Collection result = new ArrayList<>(); + + MeasurementsTable waitQueueMetrics = MeasurementsTable.of(measurements).type + (METRIC_WAIT_Q_SIZE.text()); + for (String component : waitQueueMetrics.uniqueComponents()) { + Set addresses = new HashSet<>(); + double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component)); + if (maxSlope > rateLimit) { LOG.info(String.format("Detected growing wait queues for %s, max rate %f", - compMetrics.getName(), compStats.getMaxBufferChangeRate())); - result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics)); + component, maxSlope)); + addresses.add(component); + result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses)); } } - return result; } + + + private double computeWaitQueueSizeTrend(MeasurementsTable metrics) { + double maxSlope = 0; + for (String instance : metrics.uniqueInstances()) { + + if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) { + // missing of insufficient data for creating a trend line + continue; + } + + Collection measurements = metrics.instance(instance).sort(false, + MeasurementsTable.SortKey + .TIME_STAMP).get(); + SimpleRegression simpleRegression = new SimpleRegression(true); + + for (Measurement m : measurements) { + simpleRegression.addData(m.instant().getEpochSecond(), m.value()); Review comment: sgtm ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at users@infra.apache.org. With regards, Apache Git Services