heron-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
Date Fri, 30 Mar 2018 22:50:47 GMT
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178398628
 
 

 ##########
 File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
 ##########
 @@ -15,53 +15,100 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
 import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class SkewDetector extends BaseDetector {
-  private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName());
-  private final BaseSensor sensor;
   private final double skewRatio;
-  private final BaseDetector.SymptomName symptomName;
+  private final String metricName;
+  private final BaseDetector.SymptomType symptomType;
 
   @Inject
-  SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) {
-    this.sensor = sensor;
+  SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType
+      symptomType) {
     this.skewRatio = skewRatio;
-    this.symptomName = symptom;
+    this.metricName = metricName.text();
+    this.symptomType = symptomType;
   }
 
   /**
-   * Detects components experiencing data skew, instances with vastly different execute counts.
+   * Detects components experiencing skew on a specific metric
    *
-   * @return A collection of affected components
+   * @return At most two symptoms corresponding to each affected component -- one for positive
skew
+   * and one for negative skew
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> metrics = sensor.get();
-    for (ComponentMetrics compMetrics : metrics.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName());
-      if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
-        LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
-            compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax()));
-        result.add(new Symptom(symptomName.text(), compMetrics));
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName);
+    Instant now = context.checkpoint();
+    for (String component : metrics.uniqueComponents()) {
+      Set<String> addresses = new HashSet<>();
+      Set<String> positiveAddresses = new HashSet<>();
+      Set<String> negativeAddresses = new HashSet<>();
+
+      double componentMax = getMaxOfAverage(metrics.component(component));
+      double componentMin = getMinOfAverage(metrics.component(component));
+      if (componentMax > skewRatio * componentMin) {
+        //there is skew
+        addresses.add(component);
+        result.add(new Symptom(symptomType.text(), now, addresses));
+
+        for (String instance : metrics.component(component).uniqueInstances()) {
+          if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+            positiveAddresses.add(instance);
+          }
+          if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+            negativeAddresses.add(instance);
+          }
+        }
+
+        if (!positiveAddresses.isEmpty()) {
+          result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses));
+        }
+        if (!negativeAddresses.isEmpty()) {
+          result.add(new Symptom("NEGATIVE " + symptomType.text(), now, negativeAddresses));
+        }
       }
-    }
 
+    }
     return result;
   }
+
+  @VisibleForTesting
+  double getMaxOfAverage(MeasurementsTable table) {
 
 Review comment:
  Why not put the two helper methods in `MeasurementsTable`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message