heron-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] ashvina closed pull request #2821: Update Dhalion dependency version
Date Mon, 09 Apr 2018 18:09:42 GMT
ashvina closed pull request #2821: Update Dhalion dependency version
URL: https://github.com/apache/incubator-heron/pull/2821
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/WORKSPACE b/WORKSPACE
index 3a59293828..80059f0c7b 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -458,7 +458,7 @@ maven_jar(
 
 maven_jar(
   name = "com_microsoft_dhalion",
-  artifact = "com.microsoft.dhalion:dhalion:0.0.1_2",
+  artifact = "com.microsoft.dhalion:dhalion:0.2.1",
 )
 
 maven_jar(
@@ -466,6 +466,21 @@ maven_jar(
   artifact = "org.apache.commons:commons-math3:3.6.1"
 )
 
+maven_jar(
+  name = "tech_tablesaw",
+  artifact = "tech.tablesaw:tablesaw-core:0.11.4"
+)
+
+maven_jar(
+  name = "it_unimi_dsi_fastutil",
+  artifact = "it.unimi.dsi:fastutil:8.1.1"
+)
+
+maven_jar(
+  name = "org_roaringbitmap",
+  artifact = "org.roaringbitmap:RoaringBitmap:0.6.51"
+)
+
 # Google Cloud
 maven_jar(
   name = "google_api_services_storage",
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 04db9b817f..3f36e5d743 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -498,7 +498,7 @@ def _get_healthmgr_cmd(self):
                      "--cluster", self.cluster,
                      "--role", self.role,
                      "--environment", self.environment,
-                     "--topology_name", self.topology_name, "--verbose"]
+                     "--topology_name", self.topology_name]
 
     return healthmgr_cmd
 
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index f6155c05d9..5b661e7f04 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -131,7 +131,7 @@ def get_expected_healthmgr_command():
              "-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \
              "-cp scheduler_classpath:healthmgr_classpath " \
              "com.twitter.heron.healthmgr.HealthManager --cluster cluster --role role " \
-             "--environment environ --topology_name topname --verbose"
+             "--environment environ --topology_name topname"
 
   def get_expected_instance_command(component_name, instance_id, container_id):
     instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
diff --git a/heron/healthmgr/src/java/BUILD b/heron/healthmgr/src/java/BUILD
index 2587c8f4ea..c081a081e8 100644
--- a/heron/healthmgr/src/java/BUILD
+++ b/heron/healthmgr/src/java/BUILD
@@ -42,6 +42,9 @@ healthmgr_deps_files = [
     "@com_microsoft_dhalion//jar",
     "@aopalliance_aopalliance//jar",
     "@org_apache_commons_commons_math3//jar",
+    "@tech_tablesaw//jar",
+    "@it_unimi_dsi_fastutil//jar",
+    "@org_roaringbitmap//jar",
 ]
 
 filegroup(
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
deleted file mode 100644
index 6a85188c50..0000000000
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-
-import com.twitter.heron.healthmgr.sensors.BaseSensor;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-
-
-/**
- * A helper class to compute and hold metrics derived from component metrics
- */
-public class ComponentMetricsHelper {
-  private final ComponentMetrics componentMetrics;
-
-  private List<InstanceMetrics> boltsWithBackpressure = new ArrayList<>();
-  private double maxBufferChangeRate = 0;
-  private double totalBackpressure = 0;
-
-  public ComponentMetricsHelper(ComponentMetrics compMetrics) {
-    this.componentMetrics = compMetrics;
-  }
-
-  public void computeBpStats() {
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      Double bpValue = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (bpValue != null && bpValue > 0) {
-        boltsWithBackpressure.add(instanceMetrics);
-        totalBackpressure += bpValue;
-      }
-    }
-  }
-
-  public void computeBufferSizeTrend() {
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      Map<Instant, Double> bufferMetrics
-          = instanceMetrics.getMetrics().get(METRIC_BUFFER_SIZE.text());
-      if (bufferMetrics == null || bufferMetrics.size() < 3) {
-        // missing of insufficient data for creating a trend line
-        continue;
-      }
-
-      SimpleRegression simpleRegression = new SimpleRegression(true);
-      for (Instant timestamp : bufferMetrics.keySet()) {
-        simpleRegression.addData(timestamp.getEpochSecond(), bufferMetrics.get(timestamp));
-      }
-
-      double slope = simpleRegression.getSlope();
-      instanceMetrics.addMetric(METRIC_WAIT_Q_GROWTH_RATE.text(), slope);
-
-      if (maxBufferChangeRate < slope) {
-        maxBufferChangeRate = slope;
-      }
-    }
-  }
-
-  public MetricsStats computeMinMaxStats(BaseSensor.MetricName metric) {
-    return computeMinMaxStats(metric.text());
-  }
-
-  public MetricsStats computeMinMaxStats(String metric) {
-    double metricMax = 0;
-    double metricMin = Double.MAX_VALUE;
-    for (InstanceMetrics instance : componentMetrics.getMetrics().values()) {
-
-      Double metricValue = instance.getMetricValueSum(metric);
-      if (metricValue == null) {
-        continue;
-      }
-      metricMax = metricMax < metricValue ? metricValue : metricMax;
-      metricMin = metricMin > metricValue ? metricValue : metricMin;
-    }
-    return new MetricsStats(metricMin, metricMax);
-  }
-
-  public double getTotalBackpressure() {
-    return totalBackpressure;
-  }
-
-  public double getMaxBufferChangeRate() {
-    return maxBufferChangeRate;
-  }
-
-  public List<InstanceMetrics> getBoltsWithBackpressure() {
-    return boltsWithBackpressure;
-  }
-}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
index 5a207eef68..7c81b5b010 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
@@ -14,7 +14,10 @@
 
 package com.twitter.heron.healthmgr.common;
 
-import com.microsoft.dhalion.resolver.Action;
+import java.time.Instant;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Action;
 
 public class HealthManagerEvents {
 
@@ -22,8 +25,8 @@
    * This event is created when a resolver executes topology update action
    */
   public static class TopologyUpdate extends Action {
-    public TopologyUpdate() {
-      super(TopologyUpdate.class.getSimpleName());
+    public TopologyUpdate(Instant timestamp, Collection<String> assignments) {
+      super(TopologyUpdate.class.getSimpleName(), timestamp, assignments, null);
     }
   }
 
@@ -31,8 +34,8 @@ public TopologyUpdate() {
    * This event is created when a resolver executes restart container action
    */
   public static class ContainerRestart extends Action {
-    public ContainerRestart() {
-      super(ContainerRestart.class.getSimpleName());
+    public ContainerRestart(Instant timestamp, Collection<String> assignments) {
+      super(ContainerRestart.class.getSimpleName(), timestamp, assignments, null);
     }
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
index b23bc2a212..12f12f08ac 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -54,7 +54,7 @@ public PhysicalPlanProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
       @Override
       public synchronized void onEvent(TopologyUpdate event) {
         LOG.info(
-            "Received topology update event, invalidating cached PhysicalPlan: " + event.getName());
+            "Received topology update event, invalidating cached PhysicalPlan: " + event.type());
         physicalPlan = null;
       }
     });
@@ -64,8 +64,8 @@ public synchronized void onEvent(TopologyUpdate event) {
        */
       @Override
       public synchronized void onEvent(ContainerRestart event) {
-        LOG.info("Received conatiner restart event, invalidating cached PhysicalPlan: "
-            + event.getName());
+        LOG.info("Received container restart event, invalidating cached PhysicalPlan: "
+            + event.type());
         physicalPlan = null;
       }
     });
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
index fd3b50bc9b..74845a8572 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
@@ -15,34 +15,33 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
-public class BackPressureDetector implements IDetector {
-  public static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
+public class BackPressureDetector extends BaseDetector {
+  static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
 
   private static final Logger LOG = Logger.getLogger(BackPressureDetector.class.getName());
-  private final BackPressureSensor bpSensor;
   private final int noiseFilterMillis;
 
   @Inject
-  BackPressureDetector(BackPressureSensor bpSensor,
-                       HealthPolicyConfig policyConfig) {
-    this.bpSensor = bpSensor;
+  BackPressureDetector(HealthPolicyConfig policyConfig) {
     noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
   }
 
@@ -50,23 +49,33 @@
    * Detects all components initiating backpressure above the configured limit. Normally there
    * will be only one component
    *
-   * @return A collection of all components causing backpressure.
+   * @return A collection of symptoms, each one corresponding to a component with backpressure.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
-    Map<String, ComponentMetrics> backpressureMetrics = bpSensor.get();
-    for (ComponentMetrics compMetrics : backpressureMetrics.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      compStats.computeBpStats();
-      if (compStats.getTotalBackpressure() > noiseFilterMillis) {
-        LOG.info(String.format("Detected back pressure for %s, total back pressure is %f",
-            compMetrics.getName(), compStats.getTotalBackpressure()));
-        result.add(new Symptom(SYMPTOM_BACK_PRESSURE.text(), compMetrics));
+    MeasurementsTable bpMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_BACK_PRESSURE.text());
+    for (String component : bpMetrics.uniqueComponents()) {
+      double compBackPressure = bpMetrics.component(component).sum();
+      if (compBackPressure > noiseFilterMillis) {
+        LOG.info(String.format("Detected component back-pressure for %s, total back pressure is %f",
+            component, compBackPressure));
+        List<String> addresses = Collections.singletonList(component);
+        result.add(new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, addresses));
+      }
+    }
+    for (String instance : bpMetrics.uniqueInstances()) {
+      double totalBP = bpMetrics.instance(instance).sum();
+      if (totalBP > noiseFilterMillis) {
+        LOG.info(String.format("Detected instance back-pressure for %s, total back pressure is %f",
+            instance, totalBP));
+        List<String> addresses = Collections.singletonList(instance);
+        result.add(new Symptom(SYMPTOM_INSTANCE_BACK_PRESSURE.text(), now, addresses));
       }
     }
-
     return result;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
index 9af1ef7d38..334857e54d 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
@@ -15,18 +15,22 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import com.microsoft.dhalion.api.IDetector;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 public abstract class BaseDetector implements IDetector {
-  public enum SymptomName {
-    SYMPTOM_BACK_PRESSURE(BackPressureDetector.class.getSimpleName()),
+  protected ExecutionContext context;
+
+  public enum SymptomType {
+    SYMPTOM_COMP_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Component"),
+    SYMPTOM_INSTANCE_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Instance"),
     SYMPTOM_GROWING_WAIT_Q(GrowingWaitQueueDetector.class.getSimpleName()),
     SYMPTOM_LARGE_WAIT_Q(LargeWaitQueueDetector.class.getSimpleName()),
     SYMPTOM_PROCESSING_RATE_SKEW(ProcessingRateSkewDetector.class.getSimpleName()),
-    SYMPTOM_WAIT_Q_DISPARITY(WaitQueueDisparityDetector.class.getSimpleName());
+    SYMPTOM_WAIT_Q_SIZE_SKEW(WaitQueueSkewDetector.class.getSimpleName());
 
     private String text;
 
-    SymptomName(String name) {
+    SymptomType(String name) {
       this.text = name;
     }
 
@@ -39,4 +43,9 @@ public String toString() {
       return text();
     }
   }
+
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
index 0cf305e64f..855cd12417 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
@@ -16,34 +16,33 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 
-public class GrowingWaitQueueDetector implements IDetector {
-  static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
+public class GrowingWaitQueueDetector extends BaseDetector {
+  static final String CONF_LIMIT
+      = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
 
   private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final double rateLimit;
 
   @Inject
-  GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-                           HealthPolicyConfig policyConfig) {
-    this.pendingBufferSensor = pendingBufferSensor;
+  GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) {
     rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0);
   }
 
@@ -51,23 +50,53 @@
    * Detects all components unable to keep up with input load, hence having a growing pending buffer
    * or wait queue
    *
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms, each one corresponding to a component executing slower
+   * than input rate.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
-    for (ComponentMetrics compMetrics : bufferSizes.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      compStats.computeBufferSizeTrend();
-      if (compStats.getMaxBufferChangeRate() > rateLimit) {
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable waitQueueMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+
+    for (String component : waitQueueMetrics.uniqueComponents()) {
+      double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component));
+      if (maxSlope > rateLimit) {
         LOG.info(String.format("Detected growing wait queues for %s, max rate %f",
-            compMetrics.getName(), compStats.getMaxBufferChangeRate()));
-        result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics));
+            component, maxSlope));
+        Collection<String> addresses = Collections.singletonList(component);
+        result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses));
       }
     }
 
     return result;
   }
+
+
+  private double computeWaitQueueSizeTrend(MeasurementsTable metrics) {
+    double maxSlope = 0;
+    for (String instance : metrics.uniqueInstances()) {
+
+      if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) {
+        // missing or insufficient data for creating a trend line
+        continue;
+      }
+
+      Collection<Measurement> measurements
+          = metrics.instance(instance).sort(false, MeasurementsTable.SortKey.TIME_STAMP).get();
+      SimpleRegression simpleRegression = new SimpleRegression(true);
+
+      for (Measurement m : measurements) {
+        simpleRegression.addData(m.instant().getEpochSecond(), m.value());
+      }
+
+      double slope = simpleRegression.getSlope();
+
+      if (maxSlope < slope) {
+        maxSlope = slope;
+      }
+    }
+    return maxSlope;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
index 9a720e0aea..bd5a93ccf2 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
@@ -16,59 +16,65 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_LARGE_WAIT_Q;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_LARGE_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
-public class LargeWaitQueueDetector implements IDetector {
+public class LargeWaitQueueDetector extends BaseDetector {
   static final String CONF_SIZE_LIMIT = "LargeWaitQueueDetector.limit";
 
   private static final Logger LOG = Logger.getLogger(LargeWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final int sizeLimit;
 
   @Inject
-  LargeWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-                         HealthPolicyConfig policyConfig) {
-    this.pendingBufferSensor = pendingBufferSensor;
+  LargeWaitQueueDetector(HealthPolicyConfig policyConfig) {
     sizeLimit = (int) policyConfig.getConfig(CONF_SIZE_LIMIT, 1000);
   }
 
   /**
-   * Detects all components unable to keep up with input load, hence having a large pending buffer
-   * or wait queue
+   * Detects all components having a large pending buffer or wait queue
    *
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms each one corresponding to components with
+   * large wait queues.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
-    for (ComponentMetrics compMetrics : bufferSizes.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      MetricsStats stats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE.text());
-      if (stats.getMetricMin() > sizeLimit) {
-        LOG.info(String.format("Detected large wait queues for %s, smallest queue is %f",
-            compMetrics.getName(), stats.getMetricMin()));
-        result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), compMetrics));
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable waitQueueMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+    for (String component : waitQueueMetrics.uniqueComponents()) {
+      Set<String> addresses = new HashSet<>();
+      MeasurementsTable instanceMetrics = waitQueueMetrics.component(component);
+      for (String instance : instanceMetrics.uniqueInstances()) {
+        double avgWaitQSize = instanceMetrics.instance(instance).mean();
+        if (avgWaitQSize > sizeLimit) {
+          LOG.info(String.format("Detected large wait queues for instance"
+              + "%s, smallest queue is + %f", instance, avgWaitQSize));
+          addresses.add(instance);
+        }
+      }
+      if (addresses.size() > 0) {
+        result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), context.checkpoint(), addresses));
       }
     }
 
     return result;
   }
 }
+
+
+
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
index 78c0ca7af7..f32c7be81c 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
@@ -18,16 +18,15 @@
 import javax.inject.Inject;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class ProcessingRateSkewDetector extends SkewDetector {
   public static final String CONF_SKEW_RATIO = "ProcessingRateSkewDetector.skewRatio";
 
   @Inject
-  ProcessingRateSkewDetector(ExecuteCountSensor exeCountSensor,
-                             HealthPolicyConfig policyConfig) {
-    super(exeCountSensor,
-        (double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
-        BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW);
+  ProcessingRateSkewDetector(HealthPolicyConfig policyConfig) {
+    super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
+        BaseSensor.MetricName.METRIC_EXE_COUNT,
+        BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
index fa26c320da..473435df7d 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
@@ -15,53 +15,100 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
 import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class SkewDetector extends BaseDetector {
-  private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName());
-  private final BaseSensor sensor;
   private final double skewRatio;
-  private final BaseDetector.SymptomName symptomName;
+  private final String metricName;
+  private final BaseDetector.SymptomType symptomType;
 
   @Inject
-  SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) {
-    this.sensor = sensor;
+  SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType
+      symptomType) {
     this.skewRatio = skewRatio;
-    this.symptomName = symptom;
+    this.metricName = metricName.text();
+    this.symptomType = symptomType;
   }
 
   /**
-   * Detects components experiencing data skew, instances with vastly different execute counts.
+   * Detects components experiencing skew on a specific metric
    *
-   * @return A collection of affected components
+   * @return Up to three symptoms for each affected component -- one for the component itself and
+   * one each for its instances showing positive and negative skew
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> metrics = sensor.get();
-    for (ComponentMetrics compMetrics : metrics.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName());
-      if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
-        LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
-            compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax()));
-        result.add(new Symptom(symptomName.text(), compMetrics));
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName);
+    Instant now = context.checkpoint();
+    for (String component : metrics.uniqueComponents()) {
+      Set<String> addresses = new HashSet<>();
+      Set<String> positiveAddresses = new HashSet<>();
+      Set<String> negativeAddresses = new HashSet<>();
+
+      double componentMax = getMaxOfAverage(metrics.component(component));
+      double componentMin = getMinOfAverage(metrics.component(component));
+      if (componentMax > skewRatio * componentMin) {
+        //there is skew
+        addresses.add(component);
+        result.add(new Symptom(symptomType.text(), now, addresses));
+
+        for (String instance : metrics.component(component).uniqueInstances()) {
+          if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+            positiveAddresses.add(instance);
+          }
+          if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+            negativeAddresses.add(instance);
+          }
+        }
+
+        if (!positiveAddresses.isEmpty()) {
+          result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses));
+        }
+        if (!negativeAddresses.isEmpty()) {
+          result.add(new Symptom("NEGATIVE " + symptomType.text(), now, negativeAddresses));
+        }
       }
-    }
 
+    }
     return result;
   }
+
+  @VisibleForTesting
+  double getMaxOfAverage(MeasurementsTable table) {
+    double max = 0;
+    for (String instance : table.uniqueInstances()) {
+      double instanceMean = table.instance(instance).mean();
+      if (instanceMean > max) {
+        max = instanceMean;
+      }
+    }
+    return max;
+  }
+
+  @VisibleForTesting
+  double getMinOfAverage(MeasurementsTable table) {
+    double min = Double.MAX_VALUE;
+    for (String instance : table.uniqueInstances()) {
+      double instanceMean = table.instance(instance).mean();
+      if (instanceMean < min) {
+        min = instanceMean;
+      }
+    }
+    return min;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
similarity index 59%
rename from heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
rename to heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
index c8e62d284d..133e342368 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
@@ -18,16 +18,15 @@
 import javax.inject.Inject;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
-public class WaitQueueDisparityDetector extends SkewDetector {
-  public static final String CONF_DISPARITY_RATIO = "WaitQueueDisparityDetector.disparityRatio";
+public class WaitQueueSkewDetector extends SkewDetector {
+  static final String CONF_SKEW_RATIO = "WaitQueueSkewDetector.skewRatio";
 
   @Inject
-  WaitQueueDisparityDetector(BufferSizeSensor pendingBufferSensor,
-                             HealthPolicyConfig policyConfig) {
-    super(pendingBufferSensor,
-        (double) policyConfig.getConfig(CONF_DISPARITY_RATIO, 20.0),
-        BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY);
+  WaitQueueSkewDetector(HealthPolicyConfig policyConfig) {
+    super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 20.0),
+        BaseSensor.MetricName.METRIC_WAIT_Q_SIZE,
+        BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
index 107598466c..b56e4887b6 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
@@ -14,26 +14,19 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import com.microsoft.dhalion.api.IDiagnoser;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
 
 public abstract class BaseDiagnoser implements IDiagnoser {
-  public enum DiagnosisName {
-    SYMPTOM_UNDER_PROVISIONING("SYMPTOM_UNDER_PROVISIONING"),
-    SYMPTOM_DATA_SKEW("SYMPTOM_DATA_SKEW"),
-    SYMPTOM_SLOW_INSTANCE("SYMPTOM_SLOW_INSTANCE"),
+  protected ExecutionContext context;
+
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  public enum DiagnosisType {
 
     DIAGNOSIS_UNDER_PROVISIONING(UnderProvisioningDiagnoser.class.getSimpleName()),
     DIAGNOSIS_SLOW_INSTANCE(SlowInstanceDiagnoser.class.getSimpleName()),
@@ -41,7 +34,7 @@
 
     private String text;
 
-    DiagnosisName(String name) {
+    DiagnosisType(String name) {
       this.text = name;
     }
 
@@ -54,37 +47,4 @@ public String toString() {
       return text();
     }
   }
-
-  List<Symptom> getBackPressureSymptoms(List<Symptom> symptoms) {
-    return getFilteredSymptoms(symptoms, SYMPTOM_BACK_PRESSURE);
-  }
-
-  Map<String, ComponentMetrics> getProcessingRateSkewComponents(List<Symptom> symptoms) {
-    return getFilteredComponents(symptoms, SYMPTOM_PROCESSING_RATE_SKEW);
-  }
-
-  Map<String, ComponentMetrics> getWaitQDisparityComponents(List<Symptom> symptoms) {
-    return getFilteredComponents(symptoms, SYMPTOM_WAIT_Q_DISPARITY);
-  }
-
-  private List<Symptom> getFilteredSymptoms(List<Symptom> symptoms, SymptomName type) {
-    List<Symptom> result = new ArrayList<>();
-    for (Symptom symptom : symptoms) {
-      if (symptom.getName().equals(type.text())) {
-        result.add(symptom);
-      }
-    }
-    return result;
-  }
-
-  private Map<String, ComponentMetrics> getFilteredComponents(List<Symptom> symptoms,
-                                                              SymptomName type) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (Symptom symptom : symptoms) {
-      if (symptom.getName().equals(type.text())) {
-        result.putAll(symptom.getComponents());
-      }
-    }
-    return result;
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
index 0082f2d159..564dcfbc07 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
@@ -15,73 +15,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_DATA_SKEW;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class DataSkewDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(DataSkewDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-    if (bpSymptoms.isEmpty() || processingRateSkewComponents.isEmpty()
-        || waitQDisparityComponents.isEmpty()) {
-      // Since there is no back pressure or disparate execute count, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
+
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
 
     // verify data skew, larger queue size and back pressure for the same component exists
-    ComponentMetrics exeCountMetrics = processingRateSkewComponents.get(bpMetrics.getName());
-    ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
-    if (exeCountMetrics == null || pendingBufferMetrics == null) {
-      // no processing rate skew and buffer size skew
-      // for the component with back pressure. This is not a data skew case
-      return null;
+    if (waitQSkew.assignment(bpComponent).size() == 0
+        || processingRateSkew.assignment(bpComponent).size() == 0) {
+      return diagnoses;
     }
 
-    ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics,
-        ComponentMetrics.merge(exeCountMetrics, pendingBufferMetrics));
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-    compStats.computeBpStats();
-    MetricsStats exeStats = compStats.computeMinMaxStats(METRIC_EXE_COUNT);
-    MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+    Collection<String> assignments = new ArrayList<>();
 
-    Symptom resultSymptom = null;
-    for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-      double exeCount = boltMetrics.getMetricValueSum(METRIC_EXE_COUNT.text());
-      double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
-      double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (exeStats.getMetricMax() < 1.10 * exeCount
-          && bufferStats.getMetricMax() < 2 * bufferSize) {
-        LOG.info(String.format("DataSkew: %s back-pressure(%s), high execution count: %s and "
-            + "high buffer size %s", boltMetrics.getName(), bpValue, exeCount, bufferSize));
-        resultSymptom = new Symptom(SYMPTOM_DATA_SKEW.text(), mergedData);
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable measurements = context.measurements()
+        .between(oldest, newest)
+        .component(bpComponent);
+
+    for (String instance : measurements.uniqueInstances()) {
+      MeasurementsTable instanceMeasurements = measurements.instance(instance);
+      double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+      double processingRate = instanceMeasurements.type(METRIC_EXE_COUNT.text()).mean();
+      if ((measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2)
+          && (measurements.type(METRIC_EXE_COUNT.text()).max() < 1.10 * processingRate)) {
+        assignments.add(instance);
+        LOG.info(String.format("DataSkew: %s back-pressure, high execution count: %s and "
+            + "high buffer size %s", instance, processingRate, waitQSize));
       }
     }
 
-    return resultSymptom != null ? new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), resultSymptom) : null;
+    if (assignments.size() > 0) {
+      diagnoses.add(new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), context.checkpoint(), assignments));
+    }
+
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
index af1ae4c8eb..673c3b6ff4 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
@@ -14,69 +14,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class SlowInstanceDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-    if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
-        || !processingRateSkewComponents.isEmpty()) {
-      // Since there is no back pressure or disparate wait count or similar
-      // execution count, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
 
-    // verify wait Q disparity and back pressure for the same component exists
-    ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
-    if (pendingBufferMetrics == null) {
-      // no wait Q disparity for the component with back pressure. There is no slow instance
-      return null;
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+    // verify wait Q disparity, similar processing rates and back pressure for the same component
+    // exist
+    if (waitQSkew.assignment(bpComponent).size() == 0
+        || processingRateSkew.assignment(bpComponent).size() > 0) {
+      // TODO in a short window rate skew could exist
+      return diagnoses;
     }
 
-    ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, pendingBufferMetrics);
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-    compStats.computeBpStats();
-    MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+    Collection<String> assignments = new ArrayList<>();
+
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable measurements = context.measurements()
+        .between(oldest, newest)
+        .component(bpComponent);
 
-    Symptom resultSymptom = null;
-    for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-      double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
-      double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (bufferStats.getMetricMax() < bufferSize * 2) {
-        LOG.info(String.format("SLOW: %s back-pressure(%s) and high buffer size: %s "
+    for (String instance : measurements.uniqueInstances()) {
+      MeasurementsTable instanceMeasurements = measurements.instance(instance);
+      double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+      if (measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2) {
+        assignments.add(instance);
+        LOG.info(String.format("SLOW: %s back-pressure and high buffer size: %s "
                 + "and similar processing rates",
-            boltMetrics.getName(), bpValue, bufferSize));
-        resultSymptom = new Symptom(SYMPTOM_SLOW_INSTANCE.text(), mergedData);
+            instance, waitQSize));
       }
     }
 
-    return resultSymptom != null
-        ? new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), resultSymptom) : null;
+    if (assignments.size() > 0) {
+      Instant now = context.checkpoint();
+      diagnoses.add(new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), now, assignments));
+    }
+
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
index 033002f638..445a188693 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
@@ -15,48 +15,56 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 
 public class UnderProvisioningDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
-
-    if (bpSymptoms.isEmpty() || !processingRateSkewComponents.isEmpty()
-        || !waitQDisparityComponents.isEmpty()) {
-      // Since there is no back pressure or similar processing rates
-      // and buffer sizes, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
 
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(bpMetrics);
-    compStats.computeBpStats();
-    LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure(%s) and similar processing rates "
-            + "and buffer sizes",
-        bpMetrics.getName(), compStats.getTotalBackpressure()));
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
+
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+
+    if (waitQSkew.assignment(bpComponent).size() != 0
+        || processingRateSkew.assignment(bpComponent).size() != 0) {
+      return diagnoses;
+    }
+
+    Collection<String> assignments = Collections.singletonList(bpComponent);
+    LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure and similar processing rates "
+        + "and wait queue sizes", bpComponent));
+
+    diagnoses.add(
+        new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), context.checkpoint(), assignments));
 
+    // TODO verify large wait queue for all instances
 
-    Symptom resultSymptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), bpMetrics);
-    return new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), resultSymptom);
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
index 57e18b9a98..3dc1ef795b 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
@@ -15,36 +15,27 @@
 
 package com.twitter.heron.healthmgr.policy;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
 import com.microsoft.dhalion.events.EventHandler;
 import com.microsoft.dhalion.events.EventManager;
 import com.microsoft.dhalion.policy.HealthPolicyImpl;
 
-import com.twitter.heron.common.basics.TypeUtils;
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
 import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
-import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
-import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
 import com.twitter.heron.healthmgr.resolvers.RestartContainerResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
 import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
 
 /**
  * This Policy class
  * 1. detector: find out the container that has been in backpressure
- *              state for long time, which we believe the container cannot recover.
+ * state for a long time, from which we believe the container cannot recover.
  * 2. resolver: try to restart the backpressure container so as to be rescheduled.
  */
 public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
@@ -56,44 +47,29 @@
       Logger.getLogger(AutoRestartBackpressureContainerPolicy.class.getName());
 
   private final HealthPolicyConfig policyConfig;
-  private final RestartContainerResolver restartContainerResolver;
 
   @Inject
-  AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig, EventManager eventManager,
-      BackPressureDetector backPressureDetector,
-      ProcessingRateSkewDetector processingRateSkewDetector,
-      WaitQueueDisparityDetector waitQueueDisparityDetector,
-      SlowInstanceDiagnoser slowInstanceDiagnoser,
-      RestartContainerResolver restartContainerResolver) {
+  AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig,
+                                         EventManager eventManager,
+                                         BackPressureSensor backPressureSensor,
+                                         BackPressureDetector backPressureDetector,
+                                         RestartContainerResolver restartContainerResolver) {
     this.policyConfig = policyConfig;
-    this.restartContainerResolver = restartContainerResolver;
 
-    registerDetectors(backPressureDetector, waitQueueDisparityDetector, processingRateSkewDetector);
-    registerDiagnosers(slowInstanceDiagnoser);
+    registerSensors(backPressureSensor);
+    registerDetectors(backPressureDetector);
+    registerResolvers(restartContainerResolver);
 
-    setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
-        TypeUtils.getInteger(policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
+    setPolicyExecutionInterval(
+        Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
 
     eventManager.addEventListener(ContainerRestart.class, this);
   }
 
-  @Override
-  public IResolver selectResolver(List<Diagnosis> diagnosis) {
-    Map<String, Diagnosis> diagnosisMap =
-        diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
-
-    if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
-      return restartContainerResolver;
-    }
-
-    LOG.warning("Unknown diagnoses. None resolver selected.");
-    return null;
-  }
-
   @Override
   public void onEvent(ContainerRestart event) {
     int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
     LOG.info("Received container restart action event: " + event);
-    setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+    setOneTimeDelay(Duration.ofMillis(interval));
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
index 61c00481e4..b060b40ad1 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
@@ -15,16 +15,15 @@
 
 package com.twitter.heron.healthmgr.policy;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Collection;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
 import com.microsoft.dhalion.events.EventHandler;
 import com.microsoft.dhalion.events.EventManager;
 import com.microsoft.dhalion.policy.HealthPolicyImpl;
@@ -34,16 +33,19 @@
 import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
 import com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector;
 import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
+import com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector;
 import com.twitter.heron.healthmgr.diagnosers.DataSkewDiagnoser;
 import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
 import com.twitter.heron.healthmgr.diagnosers.UnderProvisioningDiagnoser;
 import com.twitter.heron.healthmgr.resolvers.ScaleUpResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
+import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
 
 import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 
 public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
     implements EventHandler<TopologyUpdate> {
@@ -59,10 +61,13 @@
   @Inject
   DynamicResourceAllocationPolicy(HealthPolicyConfig policyConfig,
                                   EventManager eventManager,
+                                  BackPressureSensor backPressureSensor,
+                                  BufferSizeSensor bufferSizeSensor,
+                                  ExecuteCountSensor executeCountSensor,
                                   BackPressureDetector backPressureDetector,
                                   LargeWaitQueueDetector largeWaitQueueDetector,
                                   ProcessingRateSkewDetector dataSkewDetector,
-                                  WaitQueueDisparityDetector waitQueueDisparityDetector,
+                                  WaitQueueSkewDetector waitQueueSkewDetector,
                                   UnderProvisioningDiagnoser underProvisioningDiagnoser,
                                   DataSkewDiagnoser dataSkewDiagnoser,
                                   SlowInstanceDiagnoser slowInstanceDiagnoser,
@@ -70,26 +75,27 @@
     this.policyConfig = policyConfig;
     this.scaleUpResolver = scaleUpResolver;
 
+    registerSensors(backPressureSensor, bufferSizeSensor, executeCountSensor);
     registerDetectors(backPressureDetector, largeWaitQueueDetector,
-        waitQueueDisparityDetector, dataSkewDetector);
+        waitQueueSkewDetector, dataSkewDetector);
     registerDiagnosers(underProvisioningDiagnoser, dataSkewDiagnoser, slowInstanceDiagnoser);
+    registerResolvers(scaleUpResolver);
 
-    setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
-        (int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000));
+    setPolicyExecutionInterval(
+        Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
 
     eventManager.addEventListener(TopologyUpdate.class, this);
   }
 
   @Override
-  public IResolver selectResolver(List<Diagnosis> diagnosis) {
-    Map<String, Diagnosis> diagnosisMap
-        = diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
+  public IResolver selectResolver(Collection<Diagnosis> diagnosis) {
+    DiagnosisTable diagnosisTable = DiagnosisTable.of(diagnosis);
 
-    if (diagnosisMap.containsKey(DIAGNOSIS_DATA_SKEW.text())) {
+    if (diagnosisTable.type(DIAGNOSIS_DATA_SKEW.text()).size() > 0) {
       LOG.warning("Data Skew diagnoses. This diagnosis does not have any resolver.");
-    } else if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
+    } else if (diagnosisTable.type(DIAGNOSIS_SLOW_INSTANCE.text()).size() > 0) {
       LOG.warning("Slow Instance diagnoses. This diagnosis does not have any resolver.");
-    } else if (diagnosisMap.containsKey(DIAGNOSIS_UNDER_PROVISIONING.text())) {
+    } else if (diagnosisTable.type(DIAGNOSIS_UNDER_PROVISIONING.text()).size() > 0) {
       return scaleUpResolver;
     }
 
@@ -100,6 +106,6 @@ public IResolver selectResolver(List<Diagnosis> diagnosis) {
   public void onEvent(TopologyUpdate event) {
     int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
     LOG.info("Received topology update action event: " + event);
-    setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+    setOneTimeDelay(Duration.ofMillis(interval));
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
index b7b7a316d3..7bc7833093 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
@@ -13,7 +13,10 @@
 // limitations under the License.
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -21,89 +24,85 @@
 import javax.inject.Named;
 
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.SymptomsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
-import com.twitter.heron.healthmgr.common.PhysicalPlanProvider;
 import com.twitter.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
 import com.twitter.heron.scheduler.client.ISchedulerClient;
 
 import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
-import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
 
 public class RestartContainerResolver implements IResolver {
   private static final Logger LOG = Logger.getLogger(RestartContainerResolver.class.getName());
 
-  private final PhysicalPlanProvider physicalPlanProvider;
   private final EventManager eventManager;
   private final String topologyName;
   private final ISchedulerClient schedulerClient;
-  private final int noiseFilterMillis;
+  private ExecutionContext context;
 
   @Inject
   public RestartContainerResolver(@Named(CONF_TOPOLOGY_NAME) String topologyName,
-      PhysicalPlanProvider physicalPlanProvider, EventManager eventManager,
-      ISchedulerClient schedulerClient, HealthPolicyConfig policyConfig) {
+                                  EventManager eventManager,
+                                  ISchedulerClient schedulerClient) {
     this.topologyName = topologyName;
-    this.physicalPlanProvider = physicalPlanProvider;
     this.eventManager = eventManager;
     this.schedulerClient = schedulerClient;
-    this.noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
   }
 
   @Override
-  public List<Action> resolve(List<Diagnosis> diagnosis) {
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  @Override
+  public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
     List<Action> actions = new ArrayList<>();
 
-    for (Diagnosis diagnoses : diagnosis) {
-      Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_SLOW_INSTANCE.text());
-      if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
-        // nothing to fix as there is no back pressure
-        continue;
-      }
-
-      if (bpSymptom.getComponents().size() > 1) {
-        throw new UnsupportedOperationException("Multiple components with back pressure symptom");
-      }
-
-      // want to know which stmgr has backpressure
-      String stmgrId = null;
-      for (InstanceMetrics im : bpSymptom.getComponent().getMetrics().values()) {
-        if (im.hasMetricAboveLimit(METRIC_BACK_PRESSURE.text(), noiseFilterMillis)) {
-          String instanceId = im.getName();
-          int fromIndex = instanceId.indexOf('_') + 1;
-          int toIndex = instanceId.indexOf('_', fromIndex);
-          stmgrId = instanceId.substring(fromIndex, toIndex);
-          break;
-        }
-      }
+    // find all back pressure measurements reported in this execution cycle
+    Instant current = context.checkpoint();
+    Instant previous = context.previousCheckpoint();
+    SymptomsTable bpSymptoms = context.symptoms()
+        .type(SYMPTOM_INSTANCE_BACK_PRESSURE.text())
+        .between(previous, current);
+
+    if (bpSymptoms.size() == 0) {
+      LOG.fine("No back-pressure measurements found, ending as there's nothing to fix");
+      return actions;
+    }
+
+    Collection<String> allBpInstances = new HashSet<>();
+    bpSymptoms.get().forEach(symptom -> allBpInstances.addAll(symptom.assignments()));
+
+    LOG.info(String.format("%d instances caused back-pressure", allBpInstances.size()));
+
+    Collection<String> stmgrIds = new HashSet<>();
+    allBpInstances.forEach(instanceId -> {
+      LOG.info("Id of instance causing back-pressure: " + instanceId);
+      int fromIndex = instanceId.indexOf('_') + 1;
+      int toIndex = instanceId.indexOf('_', fromIndex);
+      String stmgrId = instanceId.substring(fromIndex, toIndex);
+      stmgrIds.add(stmgrId);
+    });
+
+    stmgrIds.forEach(stmgrId -> {
       LOG.info("Restarting container: " + stmgrId);
       boolean b = schedulerClient.restartTopology(
           RestartTopologyRequest.newBuilder()
-          .setContainerIndex(Integer.valueOf(stmgrId))
-          .setTopologyName(topologyName)
-          .build());
+              .setContainerIndex(Integer.valueOf(stmgrId))
+              .setTopologyName(topologyName)
+              .build());
       LOG.info("Restarted container result: " + b);
+    });
 
-      ContainerRestart action = new ContainerRestart();
-      LOG.info("Broadcasting container restart event");
-      eventManager.onEvent(action);
-
-      actions.add(action);
-      return actions;
-    }
-
+    LOG.info("Broadcasting container restart event");
+    ContainerRestart action = new ContainerRestart(current, stmgrIds);
+    eventManager.onEvent(action);
+    actions.add(action);
     return actions;
   }
-
-  @Override
-  public void close() {
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
index 4f374898bd..3ed7ef2fc6 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
@@ -13,8 +13,11 @@
 // limitations under the License.
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,12 +27,12 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
+import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import com.twitter.heron.api.generated.TopologyAPI.Topology;
 import com.twitter.heron.common.basics.SysUtils;
@@ -46,7 +49,7 @@
 import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
 import com.twitter.heron.spi.utils.ReflectionUtils;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 
@@ -58,6 +61,7 @@
   private ISchedulerClient schedulerClient;
   private EventManager eventManager;
   private Config config;
+  private ExecutionContext context;
 
   @Inject
   public ScaleUpResolver(TopologyProvider topologyProvider,
@@ -73,75 +77,85 @@ public ScaleUpResolver(TopologyProvider topologyProvider,
   }
 
   @Override
-  public List<Action> resolve(List<Diagnosis> diagnosis) {
-    for (Diagnosis diagnoses : diagnosis) {
-      Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_UNDER_PROVISIONING.text());
-      if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
-        // nothing to fix as there is no back pressure
-        continue;
-      }
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
 
-      if (bpSymptom.getComponents().size() > 1) {
-        throw new UnsupportedOperationException("Multiple components with back pressure symptom");
-      }
+  @Override
+  public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
+    List<Action> actions = new ArrayList<>();
 
-      ComponentMetrics bpComponent = bpSymptom.getComponent();
-      int newParallelism = computeScaleUpFactor(bpComponent);
-      Map<String, Integer> changeRequest = new HashMap<>();
-      changeRequest.put(bpComponent.getName(), newParallelism);
+    DiagnosisTable table = DiagnosisTable.of(diagnosis);
+    table = table.type(DIAGNOSIS_UNDER_PROVISIONING.text());
 
-      PackingPlan currentPackingPlan = packingPlanProvider.get();
-      PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
-      if (newPlan == null) {
-        return null;
-      }
+    if (table.size() == 0) {
+      LOG.fine("No under-previsioning diagnosis present, ending as there's nothing to fix");
+      return actions;
+    }
 
-      Scheduler.UpdateTopologyRequest updateTopologyRequest =
-          Scheduler.UpdateTopologyRequest.newBuilder()
-              .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
-              .setProposedPackingPlan(getSerializedPlan(newPlan))
-              .build();
+    // Scale the first assigned component
+    Diagnosis diagnoses = table.first();
+    // verify diagnoses instance is valid
+    if (diagnoses.assignments().isEmpty()) {
+      LOG.warning(String.format("Diagnosis %s is missing assignments", diagnoses.id()));
+      return actions;
+    }
+    String component = diagnoses.assignments().iterator().next();
 
-      LOG.info("Sending Updating topology request: " + updateTopologyRequest);
-      if (!schedulerClient.updateTopology(updateTopologyRequest)) {
-        throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
-            + "updateTopologyRequest=%s", updateTopologyRequest));
-      }
+    int newParallelism = computeScaleUpFactor(component);
+    Map<String, Integer> changeRequest = new HashMap<>();
+    changeRequest.put(component, newParallelism);
 
-      TopologyUpdate action = new TopologyUpdate();
-      LOG.info("Broadcasting topology update event");
-      eventManager.onEvent(action);
+    PackingPlan currentPackingPlan = packingPlanProvider.get();
+    PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
+    if (newPlan == null) {
+      return null;
+    }
 
-      LOG.info("Scheduler updated topology successfully.");
+    Scheduler.UpdateTopologyRequest updateTopologyRequest =
+        Scheduler.UpdateTopologyRequest.newBuilder()
+            .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
+            .setProposedPackingPlan(getSerializedPlan(newPlan))
+            .build();
 
-      List<Action> actions = new ArrayList<>();
-      actions.add(action);
-      return actions;
+    LOG.info("Sending Updating topology request: " + updateTopologyRequest);
+    if (!schedulerClient.updateTopology(updateTopologyRequest)) {
+      throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
+          + "updateTopologyRequest=%s", updateTopologyRequest));
     }
+    LOG.info("Scheduler updated topology successfully.");
+
+    LOG.info("Broadcasting topology update event");
+    TopologyUpdate action
+        = new TopologyUpdate(context.checkpoint(), Collections.singletonList(component));
+    eventManager.onEvent(action);
 
-    return null;
+    actions.add(action);
+    return actions;
   }
 
   @VisibleForTesting
-  int computeScaleUpFactor(ComponentMetrics componentMetrics) {
-    double totalCompBpTime = 0;
-    String compName = componentMetrics.getName();
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      double instanceBp = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      LOG.info(String.format("Instance:%s, bpTime:%.0f", instanceMetrics.getName(), instanceBp));
-      totalCompBpTime += instanceBp;
-    }
-
+  int computeScaleUpFactor(String compName) {
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable table = context.measurements()
+        .component(compName)
+        .type(METRIC_BACK_PRESSURE.text())
+        .between(oldest, newest);
+
+    double totalCompBpTime = table.sum();
     LOG.info(String.format("Component: %s, bpTime: %.0f", compName, totalCompBpTime));
+
     if (totalCompBpTime >= 1000) {
       totalCompBpTime = 999;
+      LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
     }
-    LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
 
     double unusedCapacity = (1.0 * totalCompBpTime) / (1000 - totalCompBpTime);
+
     // scale up fencing: do not scale more than 4 times the current size
     unusedCapacity = unusedCapacity > 4.0 ? 4.0 : unusedCapacity;
-    int parallelism = (int) Math.ceil(componentMetrics.getMetrics().size() * (1 + unusedCapacity));
+    int parallelism = (int) Math.ceil(table.uniqueInstances().size() * (1 + unusedCapacity));
     LOG.info(String.format("Component's, %s, unused capacity is: %.3f. New parallelism: %d",
         compName, unusedCapacity, parallelism));
     return parallelism;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
index 275084f51b..76829cfbde 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
@@ -16,16 +16,15 @@
 package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
@@ -34,8 +33,6 @@
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 public class BackPressureSensor extends BaseSensor {
-  private static final Logger LOG = Logger.getLogger(BackPressureSensor.class.getName());
-
   private final MetricsProvider metricsProvider;
   private final PackingPlanProvider packingPlanProvider;
   private final TopologyProvider topologyProvider;
@@ -51,72 +48,46 @@ public BackPressureSensor(PackingPlanProvider packingPlanProvider,
     this.metricsProvider = metricsProvider;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> get(String... components) {
-    return get();
-  }
-
   /**
    * Computes the average (millis/sec) back-pressure caused by instances in the configured window
    *
-   * @return the average value
+   * @return the average value measurements
    */
-  public Map<String, ComponentMetrics> get() {
-    Map<String, ComponentMetrics> result = new HashMap<>();
+  @Override
+  public Collection<Measurement> fetch() {
+    Collection<Measurement> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
     String[] boltComponents = topologyProvider.getBoltNames();
-    for (String boltComponent : boltComponents) {
-      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+    Duration duration = getDuration();
+    for (String component : boltComponents) {
+      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
 
-      Duration duration = getDuration();
-      Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
-      for (String boltInstanceName : boltInstanceNames) {
-        String metric = getMetricName() + boltInstanceName;
-        Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
-            metric, duration, COMPONENT_STMGR);
+      for (String instance : boltInstanceNames) {
+        String metric = getMetricName() + instance;
 
-        if (stmgrResult.get(COMPONENT_STMGR) == null) {
+        Collection<Measurement> stmgrResult
+            = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+        if (stmgrResult.isEmpty()) {
           continue;
         }
 
-        HashMap<String, InstanceMetrics> streamManagerResult =
-            stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
-        if (streamManagerResult.isEmpty()) {
+        MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+        if (table.size() == 0) {
           continue;
         }
-
-        // since a bolt instance belongs to one stream manager,
-        // for tracker rest api: expect just one metrics manager instance in the result;
-        // for tmaster/metricscache stat interface: expect a list
-        Double valueSum = 0.0;
-        for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
-            it.hasNext();) {
-          InstanceMetrics stmgrInstanceResult = it.next();
-
-          Double val = stmgrInstanceResult.getMetricValueSum(metric);
-          if (val == null) {
-            continue;
-          } else {
-            valueSum += val;
-          }
-        }
-        double averageBp = valueSum / duration.getSeconds();
+        double averageBp = table.type(metric).sum() / duration.getSeconds();
 
         // The maximum value of averageBp should be 1000, i.e. 1000 millis of BP per second. Due to
         // a bug in Heron (Issue: 1753), this value could be higher in some cases. The following
         // check partially corrects the reported BP value
         averageBp = averageBp > 1000 ? 1000 : averageBp;
-        InstanceMetrics boltInstanceMetric
-            = new InstanceMetrics(boltInstanceName, getMetricName(), averageBp);
 
-        instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+        Measurement measurement
+            = new Measurement(component, instance, getMetricName(), now, averageBp);
+        result.add(measurement);
       }
-
-      ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
-      result.put(boltComponent, componentMetrics);
     }
-
     return result;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
index 61e2b0b4b0..18b175a096 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
@@ -15,8 +15,11 @@
 package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.ISensor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
@@ -24,12 +27,13 @@
 public abstract class BaseSensor implements ISensor {
   static final Duration DEFAULT_METRIC_DURATION = Duration.ofSeconds(300);
   static final String COMPONENT_STMGR = "__stmgr__";
+  protected ExecutionContext context;
 
   public enum MetricName {
     METRIC_EXE_COUNT("__execute-count/default"),
     METRIC_BACK_PRESSURE("__time_spent_back_pressure_by_compid/"),
-    METRIC_BUFFER_SIZE("__connection_buffer_by_instanceid/"),
-    METRIC_BUFFER_SIZE_SUFFIX("/bytes"),
+    METRIC_WAIT_Q_SIZE("__connection_buffer_by_instanceid/"),
+    METRIC_WAIT_Q_SIZE_SUFFIX("/bytes"),
     METRIC_WAIT_Q_GROWTH_RATE("METRIC_WAIT_Q_GROWTH_RATE");
 
     private String text;
@@ -78,7 +82,17 @@ private Duration getDurationFromConfig(String prefix) {
     return value;
   }
 
-  public String getMetricName() {
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  @Override
+  public Collection<String> getMetricTypes() {
+    return Collections.singletonList(metricName);
+  }
+
+  String getMetricName() {
     return metricName;
   }
 
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
index 2657e62715..aa81b37f1e 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
@@ -15,24 +15,22 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class BufferSizeSensor extends BaseSensor {
   private final MetricsProvider metricsProvider;
@@ -44,80 +42,46 @@ public BufferSizeSensor(HealthPolicyConfig policyConfig,
                           PackingPlanProvider packingPlanProvider,
                           TopologyProvider topologyProvider,
                           MetricsProvider metricsProvider) {
-    super(policyConfig, METRIC_BUFFER_SIZE.text(), BufferSizeSensor.class.getSimpleName());
+    super(policyConfig, METRIC_WAIT_Q_SIZE.text(), BufferSizeSensor.class.getSimpleName());
     this.packingPlanProvider = packingPlanProvider;
     this.topologyProvider = topologyProvider;
     this.metricsProvider = metricsProvider;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> get() {
-    return get(topologyProvider.getBoltNames());
-  }
-
   /**
    * The buffer size as provided by tracker
    *
-   * @return buffer size
+   * @return buffer size measurements
    */
-  public Map<String, ComponentMetrics> get(String... desiredBoltNames) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-
-    Set<String> boltNameFilter = new HashSet<>();
-    if (desiredBoltNames.length > 0) {
-      boltNameFilter.addAll(Arrays.asList(desiredBoltNames));
-    }
+  @Override
+  public Collection<Measurement> fetch() {
+    Collection<Measurement> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
     String[] boltComponents = topologyProvider.getBoltNames();
-    for (String boltComponent : boltComponents) {
-      if (!boltNameFilter.isEmpty() && !boltNameFilter.contains(boltComponent)) {
-        continue;
-      }
+    Duration duration = getDuration();
 
-      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+    for (String component : boltComponents) {
+      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
+      for (String instance : boltInstanceNames) {
+        String metric = getMetricName() + instance + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
 
-      Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
-      for (String boltInstanceName : boltInstanceNames) {
-        String metric = getMetricName() + boltInstanceName + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
-
-        Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
-            metric,
-            getDuration(),
-            COMPONENT_STMGR);
-
-        if (stmgrResult.get(COMPONENT_STMGR) == null) {
+        Collection<Measurement> stmgrResult
+            = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+        if (stmgrResult.isEmpty()) {
           continue;
         }
 
-        HashMap<String, InstanceMetrics> streamManagerResult =
-            stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
-        if (streamManagerResult.isEmpty()) {
+        MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+        if (table.size() == 0) {
           continue;
         }
+        double totalSize = table.type(metric).sum();
 
-        // since a bolt instance belongs to one stream manager, expect just one metrics
-        // manager instance in the result
-        Double stmgrInstanceResult = 0.0;
-        for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
-            it.hasNext();) {
-          InstanceMetrics iMetrics = it.next();
-          Double val = iMetrics.getMetricValueSum(metric);
-          if (val == null) {
-            continue;
-          } else {
-            stmgrInstanceResult += val;
-          }
-        }
-
-        InstanceMetrics boltInstanceMetric =
-            new InstanceMetrics(boltInstanceName, getMetricName(), stmgrInstanceResult);
-
-        instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+        Measurement measurement
+            = new Measurement(component, instance, getMetricName(), now, totalSize);
+        result.add(measurement);
       }
-
-      ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
-      result.put(boltComponent, componentMetrics);
     }
 
     return result;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
index 47119c11e7..21ee98af99 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
@@ -15,12 +15,15 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Map;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
@@ -30,6 +33,7 @@
 public class ExecuteCountSensor extends BaseSensor {
   private final TopologyProvider topologyProvider;
   private final MetricsProvider metricsProvider;
+  private Instant now;
 
   @Inject
   ExecuteCountSensor(TopologyProvider topologyProvider,
@@ -40,14 +44,10 @@
     this.metricsProvider = metricsProvider;
   }
 
-  public Map<String, ComponentMetrics> get() {
-    String[] boltNames = topologyProvider.getBoltNames();
-    return get(boltNames);
-  }
-
-  public Map<String, ComponentMetrics> get(String... boltNames) {
-    return metricsProvider.getComponentMetrics(getMetricName(),
-        getDuration(),
-        boltNames);
+  @Override
+  public Collection<Measurement> fetch() {
+    List<String> bolts = Arrays.asList(topologyProvider.getBoltNames());
+    now = context.checkpoint();
+    return metricsProvider.getMeasurements(now, getDuration(), getMetricTypes(), bolts);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
index f3e7024426..44728ae9f9 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
@@ -18,8 +18,8 @@
 import java.net.HttpURLConnection;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,8 +29,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import com.twitter.heron.proto.system.Common.StatusCode;
 import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -51,7 +50,6 @@
   private final SchedulerStateManagerAdaptor stateManagerAdaptor;
   private final String topologyName;
 
-  private Clock clock = new Clock();
   private String metricsCacheLocation;
 
   @Inject
@@ -64,35 +62,29 @@ public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdap
   }
 
   @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Instant startTime,
-                                                           Duration duration,
-                                                           String... components) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (String component : components) {
-      TopologyMaster.MetricResponse response =
-          getMetricsFromMetricsCache(metric, component, startTime, duration);
-
-      Map<String, InstanceMetrics> metrics = parse(response, component, metric, startTime);
-      ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
-      result.put(component, componentMetric);
+  public Collection<Measurement> getMeasurements(Instant startTime,
+                                                 Duration duration,
+                                                 Collection<String> metricNames,
+                                                 Collection<String> components) {
+    Collection<Measurement> result = new ArrayList<>();
+    for (String metric : metricNames) {
+      for (String component : components) {
+        TopologyMaster.MetricResponse response =
+            getMetricsFromMetricsCache(metric, component, startTime, duration);
+        Collection<Measurement> measurements = parse(response, component, metric, startTime);
+        LOG.fine(String.format("%d measurements received for %s/%s",
+            measurements.size(), component, metric));
+        result.addAll(measurements);
+      }
     }
     return result;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Duration duration,
-                                                           String... components) {
-    Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
-    return getComponentMetrics(metric, start, duration, components);
-  }
-
   @VisibleForTesting
   @SuppressWarnings("unchecked")
-  Map<String, InstanceMetrics> parse(
+  Collection<Measurement> parse(
       TopologyMaster.MetricResponse response, String component, String metric, Instant startTime) {
-    Map<String, InstanceMetrics> metricsData = new HashMap<>();
+    Collection<Measurement> metricsData = new ArrayList();
 
     if (response == null || !response.getStatus().getStatus().equals(StatusCode.OK)) {
       LOG.info(String.format(
@@ -109,29 +101,32 @@ public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdap
     // convert heron.protobuf.taskMetrics to dhalion.InstanceMetrics
     for (TaskMetric tm : response.getMetricList()) {
       String instanceId = tm.getInstanceId();
-      InstanceMetrics instanceMetrics = new InstanceMetrics(instanceId);
-
       for (IndividualMetric im : tm.getMetricList()) {
         String metricName = im.getName();
-        Map<Instant, Double> values = new HashMap<>();
 
         // case 1
         for (IntervalValue iv : im.getIntervalValuesList()) {
           MetricInterval mi = iv.getInterval();
           String value = iv.getValue();
-          values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
+          Measurement measurement = new Measurement(
+              component,
+              instanceId,
+              metricName,
+              Instant.ofEpochSecond(mi.getStart()),
+              Double.parseDouble(value));
+          metricsData.add(measurement);
         }
         // case 2
         if (im.hasValue()) {
-          values.put(startTime, Double.parseDouble(im.getValue()));
-        }
-
-        if (!values.isEmpty()) {
-          instanceMetrics.addMetric(metricName, values);
+          Measurement measurement = new Measurement(
+              component,
+              instanceId,
+              metricName,
+              startTime,
+              Double.parseDouble(im.getValue()));
+          metricsData.add(measurement);
         }
       }
-
-      metricsData.put(instanceId, instanceMetrics);
     }
 
     return metricsData;
@@ -145,8 +140,8 @@ public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdap
         .setComponentName(component)
         .setExplicitInterval(
             MetricInterval.newBuilder()
-                .setStart(start.getEpochSecond())
-                .setEnd(start.plus(duration).getEpochSecond())
+                .setStart(start.minus(duration).getEpochSecond())
+                .setEnd(start.getEpochSecond())
                 .build())
         .addMetric(metric)
         .build();
@@ -179,11 +174,6 @@ public MetricsCacheMetricsProvider(SchedulerStateManagerAdaptor stateManagerAdap
     }
   }
 
-  @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
   /* returns last known location of metrics cache
    */
   private synchronized String getCacheLocation() {
@@ -200,10 +190,4 @@ private synchronized String getCacheLocation() {
   private synchronized void resetCacheLocation() {
     metricsCacheLocation = null;
   }
-
-  static class Clock {
-    long currentTime() {
-      return System.currentTimeMillis();
-    }
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
index f311842992..8964a67093 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
@@ -17,7 +17,8 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -34,8 +35,7 @@
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import net.minidev.json.JSONArray;
 
@@ -49,8 +49,6 @@
   private static final Logger LOG = Logger.getLogger(TrackerMetricsProvider.class.getName());
   private final WebTarget baseTarget;
 
-  private Clock clock = new Clock();
-
   @Inject
   public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
                                 @Named(CONF_TOPOLOGY_NAME) String topologyName,
@@ -68,35 +66,26 @@ public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
   }
 
   @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Instant startTime,
-                                                           Duration duration,
-                                                           String... components) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (String component : components) {
-      String response = getMetricsFromTracker(metric, component, startTime, duration);
-      Map<String, InstanceMetrics> metrics = parse(response, component, metric);
-      ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
-      result.put(component, componentMetric);
+  public Collection<Measurement> getMeasurements(Instant startTime,
+                                                 Duration duration,
+                                                 Collection<String> metricNames,
+                                                 Collection<String> components) {
+    Collection<Measurement> result = new ArrayList<>();
+    for (String metric : metricNames) {
+      for (String component : components) {
+        String response = getMetricsFromTracker(metric, component, startTime, duration);
+        Collection<Measurement> measurements = parse(response, component, metric);
+        LOG.fine(String.format("%d measurements received for %s/%s",
+            measurements.size(), component, metric));
+        result.addAll(measurements);
+      }
     }
     return result;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Duration duration,
-                                                           String... components) {
-    Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
-    return getComponentMetrics(metric, start, duration, components);
-  }
-
   @SuppressWarnings("unchecked")
-  private Map<String, InstanceMetrics> parse(String response, String component, String metric) {
-    Map<String, InstanceMetrics> metricsData = new HashMap<>();
-
-    if (response == null || response.isEmpty()) {
-      return metricsData;
-    }
+  private Collection<Measurement> parse(String response, String component, String metric) {
+    Collection<Measurement> metricsData = new ArrayList();
 
     DocumentContext result = JsonPath.parse(response);
     JSONArray jsonArray = result.read("$.." + metric);
@@ -113,14 +102,15 @@ public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
 
     for (String instanceName : metricsMap.keySet()) {
       Map<String, String> tmpValues = (Map<String, String>) metricsMap.get(instanceName);
-      Map<Instant, Double> values = new HashMap<>();
       for (String timeStamp : tmpValues.keySet()) {
-        values.put(Instant.ofEpochSecond(Long.parseLong(timeStamp)),
+        Measurement measurement = new Measurement(
+            component,
+            instanceName,
+            metric,
+            Instant.ofEpochSecond(Long.parseLong(timeStamp)),
             Double.parseDouble(tmpValues.get(timeStamp)));
+        metricsData.add(measurement);
       }
-      InstanceMetrics instanceMetrics = new InstanceMetrics(instanceName);
-      instanceMetrics.addMetric(metric, values);
-      metricsData.put(instanceName, instanceMetrics);
     }
 
     return metricsData;
@@ -131,23 +121,12 @@ String getMetricsFromTracker(String metric, String component, Instant start, Dur
     WebTarget target = baseTarget
         .queryParam("metricname", metric)
         .queryParam("component", component)
-        .queryParam("starttime", start.getEpochSecond())
-        .queryParam("endtime", start.getEpochSecond() + duration.getSeconds());
+        .queryParam("starttime", start.getEpochSecond() - duration.getSeconds())
+        .queryParam("endtime", start.getEpochSecond());
 
     LOG.log(Level.FINE, "Tracker Query URI: {0}", target.getUri());
 
     Response r = target.request(MediaType.APPLICATION_JSON_TYPE).get();
     return r.readEntity(String.class);
   }
-
-  @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  static class Clock {
-    long currentTime() {
-      return System.currentTimeMillis();
-    }
-  }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
deleted file mode 100644
index b75c8039bd..0000000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
-
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
-
-public class TestUtils {
-  public static List<Symptom> createBpSymptomList(int... bpValues) {
-    return createListFromSymptom(createBPSymptom(bpValues));
-  }
-
-  public static Symptom createExeCountSymptom(int... exeCounts) {
-    return createSymptom(SYMPTOM_PROCESSING_RATE_SKEW, METRIC_EXE_COUNT, exeCounts);
-  }
-
-  public static Symptom createWaitQueueDisparitySymptom(int... bufferSizes) {
-    return createSymptom(SYMPTOM_WAIT_Q_DISPARITY, METRIC_BUFFER_SIZE, bufferSizes);
-  }
-
-  private static Symptom createBPSymptom(int... bpValues) {
-    return createSymptom(SYMPTOM_BACK_PRESSURE, METRIC_BACK_PRESSURE, bpValues);
-  }
-
-  private static void addInstanceMetric(ComponentMetrics metrics, int i, double val, String metric) {
-    InstanceMetrics instanceMetric = new InstanceMetrics("container_1_bolt_" + i, metric, val);
-    metrics.addInstanceMetric(instanceMetric);
-  }
-
-  private static Symptom createSymptom(SymptomName symptom, MetricName metric, int... values) {
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    for (int i = 0; i < values.length; i++) {
-      addInstanceMetric(compMetrics, i, values[i], metric.text());
-    }
-    return new Symptom(symptom.text(), compMetrics);
-  }
-
-  private static List<Symptom> createListFromSymptom(Symptom symptom) {
-    List<Symptom> symptoms = new ArrayList<>();
-    symptoms.add(symptom);
-    return symptoms;
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
deleted file mode 100644
index 7df7fce3fa..0000000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-import static org.junit.Assert.assertEquals;
-
-public class ComponentMetricsHelperTest {
-
-  @Test
-  public void detectsMultipleCompIncreasingBuffer() {
-    ComponentMetrics compMetrics;
-    InstanceMetrics instanceMetrics;
-    Map<Instant, Double> bufferSizes;
-
-    compMetrics = new ComponentMetrics("bolt");
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892210), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 600.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 900.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 1200.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    instanceMetrics = new InstanceMetrics("i2");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 180.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 360.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 540.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    ComponentMetricsHelper helper = new ComponentMetricsHelper(compMetrics);
-    helper.computeBufferSizeTrend();
-    assertEquals(5, helper.getMaxBufferChangeRate(), 0.1);
-
-    HashMap<String, InstanceMetrics> metrics = compMetrics.getMetrics();
-    assertEquals(1, metrics.get("i1").getMetrics().get(METRIC_WAIT_Q_GROWTH_RATE.text()).size());
-    assertEquals(5, metrics.get("i1").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
-    assertEquals(3, metrics.get("i2").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
index 3d45ec97da..231641605a 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
@@ -35,7 +35,7 @@
 import static org.mockito.Mockito.when;
 
 public class PackingPlanProviderTest {
-  String topologyName = "topologyName";
+  private String topologyName = "topologyName";
   private EventManager eventManager = new EventManager();
 
   @Test
@@ -67,7 +67,7 @@ public void refreshesPackingPlanOnUpdate() {
     PackingPlan packing = provider.get();
     Assert.assertEquals(1, packing.getContainers().size());
 
-    provider.onEvent(new TopologyUpdate());
+    provider.onEvent(new TopologyUpdate(null, null));
     provider.get();
     verify(adaptor, times(2)).getPackingPlan(topologyName);
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
index 3b7a538e54..bff793b897 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
@@ -58,7 +58,7 @@ public void refreshesPackingPlanOnUpdate() {
     Assert.assertEquals(2, provider.get().getBoltsCount());
 
     // once fetched it is cached
-    provider.onEvent(new TopologyUpdate());
+    provider.onEvent(new TopologyUpdate(null, null));
     provider.get();
     verify(adaptor, times(2)).getPhysicalPlan(topology);
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
index b5dd2f2fda..ad6c266d35 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
@@ -14,51 +14,82 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
 import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class BackPressureDetectorTest {
+  Instant now;
+
+  @Before
+  public void setup() {
+    now = Instant.now();
+  }
   @Test
   public void testConfigAndFilter() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_NOISE_FILTER, 20)).thenReturn(50);
 
-    ComponentMetrics compMetrics =
-        new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 55);
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
 
-    BackPressureSensor sensor = mock(BackPressureSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 55);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+    Measurement measurement3
+        = new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+
+    BackPressureDetector detector = new BackPressureDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    Assert.assertEquals(2, symptoms.size());
+    SymptomsTable compSymptom = SymptomsTable.of(symptoms).type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    Assert.assertEquals(1,compSymptom.size());
+    Assert.assertEquals(1, compSymptom.get().iterator().next().assignments().size());
 
-    BackPressureDetector detector = new BackPressureDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    SymptomsTable instanceSymptom
+        = SymptomsTable.of(symptoms).type(SYMPTOM_INSTANCE_BACK_PRESSURE.text());
+    Assert.assertEquals(1, instanceSymptom.size());
+    Assert.assertEquals(1, instanceSymptom.get().iterator().next().assignments().size());
 
-    Assert.assertEquals(1, symptoms.size());
+    Symptom symptom = symptoms.iterator().next();
 
-    compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 45);
-    topologyMetrics.put("bolt", compMetrics);
 
-    sensor = mock(BackPressureSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 45);
+    measurement2
+        = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    detector = new BackPressureDetector(sensor, config);
-    symptoms = detector.detect();
+    detector = new BackPressureDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
 
     Assert.assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
index 8508e748b1..26b48bf644 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
@@ -15,21 +15,19 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
 import static com.twitter.heron.healthmgr.detectors.GrowingWaitQueueDetector.CONF_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -40,52 +38,64 @@ public void testDetector() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_LIMIT, 10.0)).thenReturn(5.0);
 
-    ComponentMetrics compMetrics;
-    InstanceMetrics instanceMetrics;
-    Map<Instant, Double> bufferSizes;
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 700.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 1000.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 1300.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    topologyMetrics.put("bolt", compMetrics);
-
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 0.0);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892270), 300.0);
+    Measurement measurement3
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892330), 700.0);
+    Measurement measurement4
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892390), 1000.0);
+    Measurement measurement5
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892450), 1300.0);
+
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+
+    GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
 
     assertEquals(1, symptoms.size());
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 200.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 400.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 600.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 800.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    topologyMetrics.put("bolt", compMetrics);
-
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    detector = new GrowingWaitQueueDetector(sensor, config);
-    symptoms = detector.detect();
+    assertEquals(1, symptoms.iterator().next().assignments().size());
+    
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 0.0);
+    measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892270), 200.0);
+    measurement3
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892330), 400.0);
+    measurement4
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892390), 600.0);
+    measurement5
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892450), 800.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+
+    detector = new GrowingWaitQueueDetector(config);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
index a081c9e402..1e1bb6ab07 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
@@ -14,20 +14,20 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
 import static com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector.CONF_SIZE_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -38,28 +38,40 @@ public void testConfigAndFilter() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SIZE_LIMIT, 1000)).thenReturn(20);
 
-    ComponentMetrics compMetrics
-        = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 21);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 21);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892322), 21);
 
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    LargeWaitQueueDetector detector = new LargeWaitQueueDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    LargeWaitQueueDetector detector = new LargeWaitQueueDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
 
     assertEquals(1, symptoms.size());
+    assertEquals(1, symptoms.iterator().next().assignments().size());
+
 
-    compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 19);
-    topologyMetrics.put("bolt", compMetrics);
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 11);
+    measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892322), 10);
 
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    detector = new LargeWaitQueueDetector(sensor, config);
-    symptoms = detector.detect();
+    detector = new LargeWaitQueueDetector(config);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
index edf811565d..f97dcd2e0d 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
@@ -14,59 +14,111 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
 
 import static com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector.CONF_SKEW_RATIO;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ProcessingRateSkewDetectorTest {
+
   @Test
-  public void testConfigAndFilter() {
+  public void testGetMaxMin() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
 
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892122), 3000);
+    Measurement measurement3
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+    Measurement measurement4
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 400.0);
 
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
 
-    ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
+    MeasurementsTable metricsTable = MeasurementsTable.of(metrics);
 
-    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
 
-    assertEquals(1, symptoms.size());
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
 
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
-    topologyMetrics.put("bolt", compMetrics);
+    assertEquals(2000, (int) detector.getMaxOfAverage(metricsTable));
+    assertEquals(300, (int) detector.getMinOfAverage(metricsTable));
 
-    sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+  }
 
-    detector = new ProcessingRateSkewDetector(sensor, config);
-    symptoms = detector.detect();
+  @Test
+  public void testConfigAndFilter() {
+    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+    when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
+
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(3, symptoms.size());
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 500.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    detector = new ProcessingRateSkewDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
@@ -76,43 +128,60 @@ public void testReturnsMultipleComponents() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
 
-    ComponentMetrics compMetrics1 = new ComponentMetrics("bolt-1");
-    compMetrics1.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics1.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
-    ComponentMetrics compMetrics2 = new ComponentMetrics("bolt-2");
-    compMetrics2.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics2.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
-    ComponentMetrics compMetrics3 = new ComponentMetrics("bolt-3");
-    compMetrics3.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics3.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
-
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt-1", compMetrics1);
-    topologyMetrics.put("bolt-2", compMetrics2);
-    topologyMetrics.put("bolt-3", compMetrics3);
-
-    ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
-
-    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
-
-    assertEquals(2, symptoms.size());
-    for (Symptom symptom : symptoms) {
-      if (symptom.getComponent().getName().equals("bolt-1")) {
-        compMetrics1 = null;
-      } else if (symptom.getComponent().getName().equals("bolt-2")) {
-        compMetrics2 = null;
-      } else if (symptom.getComponent().getName().equals("bolt-3")) {
-        compMetrics3 = null;
-      }
-    }
-
-    assertNull(compMetrics1);
-    assertNull(compMetrics2);
-    assertNotNull(compMetrics3);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+
+    Measurement measurement3
+        = new Measurement("bolt2", "i3", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement4
+        = new Measurement("bolt2", "i4", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+
+    Measurement measurement5
+        = new Measurement("bolt3", "i5", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement6
+        = new Measurement("bolt3", "i6", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 500.0);
+
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+    metrics.add(measurement6);
+
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(6, symptoms.size());
+
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(2, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(2, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i3").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i4").size());
+
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
deleted file mode 100644
index a22e303c1e..0000000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.detectors;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
-
-import static com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector.CONF_DISPARITY_RATIO;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class WaitQueueDisparityDetectorTest {
-  @Test
-  public void testConfigAndFilter() {
-    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
-    when(config.getConfig(CONF_DISPARITY_RATIO, 20.0)).thenReturn(15.0);
-
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1501));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 100));
-
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
-
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_BUFFER_SIZE.text());
-
-    WaitQueueDisparityDetector detector = new WaitQueueDisparityDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
-
-    assertEquals(1, symptoms.size());
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1500));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 110));
-    topologyMetrics.put("bolt", compMetrics);
-
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    detector = new WaitQueueDisparityDetector(sensor, config);
-    symptoms = detector.detect();
-
-    assertEquals(0, symptoms.size());
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
new file mode 100644
index 0000000000..e73d1c69ef
--- /dev/null
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.healthmgr.detectors;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+
+import org.junit.Test;
+
+import com.twitter.heron.healthmgr.HealthPolicyConfig;
+
+import static com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector.CONF_SKEW_RATIO;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WaitQueueSkewDetectorTest {
+  @Test
+  public void testConfigAndFilter() {
+    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+    when(config.getConfig(CONF_SKEW_RATIO, 20.0)).thenReturn(15.0);
+
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 1501);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 100.0);
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    WaitQueueSkewDetector detector = new WaitQueueSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(3, symptoms.size());
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i2").size());
+
+
+     measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 1500);
+     measurement2
+        = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 110.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    detector = new WaitQueueSkewDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
+
+    assertEquals(0, symptoms.size());
+  }
+}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
index 8a1b48162e..3602548dd0 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
@@ -14,62 +14,122 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DataSkewDiagnoserTest {
+  private final String comp = "comp";
+  private Instant now = Instant.now();
+  private Collection<Measurement> measurements = new ArrayList<>();
+  private ExecutionContext context;
+  private IDiagnoser diagnoser;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+    measurements = new ArrayList<>();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new DataSkewDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
   public void failsIfNoDataSkewSymptom() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123);
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosis1DataSkewInstance() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
-
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNotNull(result);
-    assertEquals(DIAGNOSIS_DATA_SKEW.text(), result.getName());
-    assertEquals(1, result.getSymptoms().size());
-    Symptom symptom = result.getSymptoms().values().iterator().next();
-
-    assertEquals(123, symptom.getComponent()
-        .getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text()).intValue());
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_DATA_SKEW.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals("i1", diagnoses.assignments().iterator().next());
+    // TODO
+//    assertEquals(1, diagnoses.symptoms().size());
   }
 
   @Test
   public void diagnosisNoDataSkewLowBufferSize() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1, 500, 500));
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 1, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
 
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosisNoDataSkewLowRate() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(100, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 100, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
+  }
 
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+  private void addMeasurements(MetricName metricExeCount, int i1, int i2, int i3) {
+    measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+    measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+    measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
index 50aab9d166..cb6a3cb06f 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
@@ -14,47 +14,99 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class SlowInstanceDiagnoserTest {
+  private final String comp = "comp";
+  private IDiagnoser diagnoser;
+  private Instant now = Instant.now();
+  private Collection<Measurement> measurements = new ArrayList<>();
+  private ExecutionContext context;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+    measurements = new ArrayList<>();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new SlowInstanceDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
-  public void failsIfNoBufferSizeDiaparity() {
-    SlowInstanceDiagnoser diagnoser = new SlowInstanceDiagnoser();
-    Diagnosis result = diagnoser.diagnose(TestUtils.createBpSymptomList(123));
-    assertNull(result);
+  public void failsIfNoBufferSizeDisparity() {
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosis1of3SlowInstances() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1000, 20, 20));
-
-    Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
-    assertEquals(1, result.getSymptoms().size());
-    ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
-    assertEquals(123,
-        data.getMetricValueSum("container_1_bolt_0",METRIC_BACK_PRESSURE.text())
-            .intValue());
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 1000, 20, 20);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_SLOW_INSTANCE.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals("i1", diagnoses.assignments().iterator().next());
+    // TODO
+//    assertEquals(1, diagnoses.symptoms().size());
   }
 
   @Test
   public void failIfInstanceWithBpHasSmallBuffer() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Symptom exeDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom, exeDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
+  }
 
-    Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
-    assertNull(result);
+  private void addMeasurements(BaseSensor.MetricName metricExeCount, int i1, int i2, int i3) {
+    measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+    measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+    measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
index 0355308dc8..e180ae6075 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
@@ -14,51 +14,83 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class UnderProvisioningDiagnoserTest {
+  private final String comp = "comp";
+  private IDiagnoser diagnoser;
+  private Instant now = Instant.now();
+  private ExecutionContext context;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new UnderProvisioningDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
   public void diagnosisWhen1Of1InstanceInBP() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0);
-    //symptoms.add(TestUtils.createLargeWaitQSymptom(5000));
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
     validateDiagnosis(result);
   }
 
   @Test
   public void diagnosisFailsNotSimilarQueueSizes() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosisFailsNotSimilarProcessingRates() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(100, 500, 500));
+    // TODO: the BP instance should be the same as the one with the high processing rate
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
 
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
-  private void validateDiagnosis(Diagnosis result) {
-    assertEquals(1, result.getSymptoms().size());
-    ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
-    assertEquals(123,
-        data.getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text())
-            .intValue());
+  private void validateDiagnosis(Collection<Diagnosis> result) {
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_UNDER_PROVISIONING.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals(comp, diagnoses.assignments().iterator().next());
+    // TODO: re-enable the symptom-count assertion below
+//    Assert.assertEquals(1, result.getSymptoms().size());
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
index 91c634fa52..6bceaf1bf5 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
@@ -14,21 +14,26 @@
 
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.utils.topology.TopologyTests;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 import com.twitter.heron.packing.roundrobin.RoundRobinPacking;
@@ -38,9 +43,8 @@
 import com.twitter.heron.spi.common.Key;
 import com.twitter.heron.spi.packing.IRepacking;
 import com.twitter.heron.spi.packing.PackingPlan;
-import com.twitter.heron.common.utils.topology.TopologyTests;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
@@ -67,20 +71,26 @@ public void testResolve() {
     ISchedulerClient scheduler = mock(ISchedulerClient.class);
     when(scheduler.updateTopology(any(UpdateTopologyRequest.class))).thenReturn(true);
 
-    ComponentMetrics metrics
-        = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 123);
-    Symptom symptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), metrics);
-    List<Diagnosis> diagnosis = new ArrayList<>();
-    diagnosis.add(new Diagnosis("test", symptom));
+    Instant now = Instant.now();
+    Collections.singletonList(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 123));
+
+    List<String> assignments = Collections.singletonList("bolt");
+    Diagnosis diagnoses =
+        new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), now, assignments, null);
+    List<Diagnosis> diagnosis = Collections.singletonList(diagnoses);
+
+    ExecutionContext context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
 
     ScaleUpResolver resolver
         = new ScaleUpResolver(null, packingPlanProvider, scheduler, eventManager, null);
+    resolver.initialize(context);
     ScaleUpResolver spyResolver = spy(resolver);
 
-    doReturn(2).when(spyResolver).computeScaleUpFactor(metrics);
+    doReturn(2).when(spyResolver).computeScaleUpFactor("bolt");
     doReturn(currentPlan).when(spyResolver).buildNewPackingPlan(any(HashMap.class), eq(currentPlan));
 
-    List<Action> result = spyResolver.resolve(diagnosis);
+    Collection<Action> result = spyResolver.resolve(diagnosis);
     verify(scheduler, times(1)).updateTopology(any(UpdateTopologyRequest.class));
     assertEquals(1, result.size());
   }
@@ -141,28 +151,35 @@ private TopologyProvider createTopologyProvider(TopologyAPI.Topology topology) {
 
   @Test
   public void testScaleUpFactorComputation() {
-    ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+    Instant now = Instant.now();
+    Collection<Measurement> result = new ArrayList<>();
 
-    ComponentMetrics metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 500));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
+    ExecutionContext context = Mockito.mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    when(context.previousCheckpoint()).thenReturn(now);
 
-    int result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(4, result);
-
-    metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 750));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
-
-    result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(8, result);
-
-    metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 400));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 100));
-    metrics.addInstanceMetric(new InstanceMetrics("i3", METRIC_BACK_PRESSURE.text(), 0));
-
-    result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(6, result);
+    ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+    resolver.initialize(context);
+
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 500));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    int factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(4, factor);
+
+    result.clear();
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 750));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(8, factor);
+
+    result.clear();
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 400));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 100));
+    result.add(new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(6, factor);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
index 75256f01b1..9091175409 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
@@ -14,19 +14,27 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -49,27 +57,44 @@ public void providesBackPressureMetricForBolts() {
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
     for (String boltId : boltIds) {
-      String metric = MetricName.METRIC_BACK_PRESSURE + boltId;
+      String metric = METRIC_BACK_PRESSURE + boltId;
       // the back pressure sensor will return average bp per second, so multiply by duration
-      BufferSizeSensorTest.registerStMgrInstanceMetricResponse(metricsProvider,
+      registerStMgrInstanceMetricResponse(metricsProvider,
           metric,
           boltId.length() * DEFAULT_METRIC_DURATION.getSeconds());
     }
 
+
     BackPressureSensor backPressureSensor =
         new BackPressureSensor(packingPlanProvider, topologyProvider, null, metricsProvider);
 
-    Map<String, ComponentMetrics> componentMetrics = backPressureSensor.get();
-    assertEquals(2, componentMetrics.size());
+    ExecutionContext context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    backPressureSensor.initialize(context);
+
+    Collection<Measurement> componentMetrics = backPressureSensor.fetch();
+    assertEquals(3, componentMetrics.size());
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(1, table.component("bolt-1").size());
+    assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+
+    assertEquals(2, table.component("bolt-2").size());
+    assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+    assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+  }
 
-    assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
-    assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+  static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
+                                                  String metric,
+                                                  long value) {
+    Instant instant = Instant.ofEpochSecond(10);
+    Measurement measurement = new Measurement("__stmgr__", "stmgr-1", metric, instant, value);
+    Collection<Measurement> result = Collections.singletonList(measurement);
 
-    assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
-    assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
-    assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+    when(metricsProvider.getMeasurements(
+        any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(metric), eq("__stmgr__")))
+        .thenReturn(result);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
index 22f62fe47b..b395d9c9fb 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
@@ -14,12 +14,13 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
@@ -27,7 +28,7 @@
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -51,39 +52,31 @@ public void providesBufferSizeMetricForBolts() {
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
     for (String boltId : boltIds) {
-      String metric = MetricName.METRIC_BUFFER_SIZE
-          + boltId + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
-      registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
+      String metric = METRIC_WAIT_Q_SIZE
+          + boltId + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
+      BackPressureSensorTest
+          .registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
     }
 
     BufferSizeSensor bufferSizeSensor =
         new BufferSizeSensor(null, packingPlanProvider, topologyProvider, metricsProvider);
 
-    Map<String, ComponentMetrics> componentMetrics = bufferSizeSensor.get();
-    assertEquals(2, componentMetrics.size());
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    bufferSizeSensor.initialize(context);
 
-    assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
-    assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
+    Collection<Measurement> componentMetrics = bufferSizeSensor.fetch();
+    assertEquals(3, componentMetrics.size());
 
-    assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
-    assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
-    assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
-  }
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(1, table.component("bolt-1").size());
+    assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
 
-  static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
-                                                  String metric,
-                                                  long value) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    ComponentMetrics metrics = new ComponentMetrics("__stmgr__");
-    InstanceMetrics instanceMetrics = new InstanceMetrics("stmgr-1");
-    instanceMetrics.addMetric(metric, value);
-    metrics.addInstanceMetric(instanceMetrics);
-    result.put("__stmgr__", metrics);
-
-    when(metricsProvider.getComponentMetrics(metric, DEFAULT_METRIC_DURATION, "__stmgr__"))
-        .thenReturn(result);
+    assertEquals(2, table.component("bolt-2").size());
+    assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
+    assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
index 2f5a6cc609..88fcc03dfc 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
@@ -14,12 +14,16 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
@@ -28,54 +32,48 @@
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ExecuteCountSensorTest {
   @Test
   public void providesBoltExecutionCountMetrics() {
+    Instant now = Instant.now();
+    String metric = METRIC_EXE_COUNT.text();
     TopologyProvider topologyProvider = mock(TopologyProvider.class);
     when(topologyProvider.getBoltNames()).thenReturn(new String[]{"bolt-1", "bolt-2"});
 
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
-    Map<String, ComponentMetrics> result = new HashMap<>();
+    Collection<Measurement> result = new ArrayList<>();
+    result.add(new Measurement("bolt-1", "container_1_bolt-1_1", metric, now, 123));
+    result.add(new Measurement("bolt-1", "container_1_bolt-1_2", metric, now, 345));
+    result.add(new Measurement("bolt-2", "container_1_bolt-2_3", metric, now, 321));
+    result.add(new Measurement("bolt-2", "container_1_bolt-2_4", metric, now, 543));
 
-    ComponentMetrics metrics = new ComponentMetrics("bolt-1");
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_1", 123));
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_2", 345));
-    result.put("bolt-1", metrics);
-
-    metrics = new ComponentMetrics("bolt-2");
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_3", 321));
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_4", 543));
-    result.put("bolt-2", metrics);
-
-    when(metricsProvider
-        .getComponentMetrics(METRIC_EXE_COUNT.text(), DEFAULT_METRIC_DURATION, "bolt-1", "bolt-2"))
+    Collection<String> comps = Arrays.asList("bolt-1", "bolt-2");
+    when(metricsProvider.getMeasurements(
+        any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(Collections.singletonList(metric)), eq(comps)))
         .thenReturn(result);
 
     ExecuteCountSensor executeCountSensor
         = new ExecuteCountSensor(topologyProvider, null, metricsProvider);
-    Map<String, ComponentMetrics> componentMetrics = executeCountSensor.get();
-    assertEquals(2, componentMetrics.size());
-    assertEquals(123, componentMetrics.get("bolt-1")
-        .getMetrics("container_1_bolt-1_1")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(345, componentMetrics.get("bolt-1")
-        .getMetrics("container_1_bolt-1_2")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(321, componentMetrics.get("bolt-2")
-        .getMetrics("container_1_bolt-2_3")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(543, componentMetrics.get("bolt-2")
-        .getMetrics("container_1_bolt-2_4")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-  }
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    executeCountSensor.initialize(context);
 
-  private InstanceMetrics createTestInstanceMetric(String name, int value) {
-    InstanceMetrics instanceMetrics = new InstanceMetrics(name);
-    instanceMetrics.addMetric(METRIC_EXE_COUNT.text(), value);
-    return instanceMetrics;
+    Collection<Measurement> componentMetrics = executeCountSensor.fetch();
+    assertEquals(4, componentMetrics.size());
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(123, table.component("bolt-1").instance("container_1_bolt-1_1")
+        .type(metric).sum(), 0.01);
+    assertEquals(345, table.component("bolt-1").instance("container_1_bolt-1_2")
+        .type(metric).sum(), 0.01);
+    assertEquals(321, table.component("bolt-2").instance("container_1_bolt-2_3")
+        .type(metric).sum(), 0.01);
+    assertEquals(543, table.component("bolt-2").instance("container_1_bolt-2_4")
+        .type(metric).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
index ea087c39bb..60f7ef6eb4 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
@@ -17,11 +17,16 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+
+import org.junit.Test;
+import org.mockito.Mockito;
 
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
 import com.twitter.heron.proto.system.Common.Status;
 import com.twitter.heron.proto.system.Common.StatusCode;
 import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -32,18 +37,14 @@
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
 import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
 
-import org.junit.Test;
-import org.mockito.Mockito;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 public class MetricsCacheMetricsProviderTest {
   @Test
-  public void provides1Comp2InstanceMetricsFromeMetricsCache() {
+  public void provides1Comp2InstanceMetricsFromMetricsCache() {
     MetricsCacheMetricsProvider spyMetricsProvider = createMetricsProviderSpy();
 
     String metric = "count";
@@ -83,18 +84,20 @@ public void provides1Comp2InstanceMetricsFromeMetricsCache() {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(2, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(104,
-        componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
-    assertEquals(17,
-        componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+    assertEquals(2, table.uniqueInstances().size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(1, table.instance("container_1_bolt_1").size());
+    assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+    assertEquals(3, table.instance("container_1_bolt_2").size());
+    assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
   }
 
   @Test
@@ -146,19 +149,22 @@ public void providesMultipleComponentMetricsFromMetricsCache() {
     doReturn(response2).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
-    assertEquals(2, metrics.size());
-    assertNotNull(metrics.get(comp1));
-    assertEquals(1, metrics.get(comp1).getMetrics().size());
-    assertEquals(104,
-        metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
-    assertNotNull(metrics.get(comp2));
-    assertEquals(1, metrics.get(comp2).getMetrics().size());
-    assertEquals(17,
-        metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            Collections.singletonList(metric),
+            Arrays.asList(comp1, comp2));
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(2, table.uniqueComponents().size());
+    assertEquals(1, table.component(comp1).size());
+    assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+    assertEquals(3, table.component(comp2).size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+    assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
   }
 
   @Test
@@ -182,15 +188,16 @@ public void parsesBackPressureMetric() {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
     assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(1, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(1, table.component(comp).size());
+    assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
   }
 
   @Test
@@ -205,12 +212,13 @@ public void handleMissingData() {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(0, metrics.get(comp).getMetrics().size());
+    assertEquals(0, metrics.size());
   }
 
   private MetricsCacheMetricsProvider createMetricsProviderSpy() {
@@ -228,9 +236,7 @@ private MetricsCacheMetricsProvider createMetricsProviderSpy() {
     MetricsCacheMetricsProvider metricsProvider
         = new MetricsCacheMetricsProvider(stateMgr, "testTopo");
 
-    MetricsCacheMetricsProvider spyMetricsProvider = spy(metricsProvider);
-    spyMetricsProvider.setClock(new TestClock(70000));
-    return spyMetricsProvider;
+    return spy(metricsProvider);
   }
 
   @Test
@@ -274,44 +280,24 @@ public void testGetTimeLineMetrics() {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider
-            .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    ComponentMetrics componentMetrics = metrics.get(comp);
-    assertNotNull(componentMetrics);
-    assertEquals(2, componentMetrics.getMetrics().size());
-
-    InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(1, metricValues.size());
-    assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
-    instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(3, metricValues.size());
-    assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
-    assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
-    assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
-  }
-
-  private class TestClock extends MetricsCacheMetricsProvider.Clock {
-    long timeStamp;
-
-    TestClock(long timeStamp) {
-      this.timeStamp = timeStamp;
-    }
-
-    @Override
-    long currentTime() {
-      return timeStamp;
-    }
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+
+    MeasurementsTable result = table.instance("container_1_bolt_1");
+    assertEquals(1, result.size());
+    assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+    result = table.instance("container_1_bolt_2");
+    assertEquals(3, result.size());
+    assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+    assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+    assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
index 01c71d2c12..2e7625fa74 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
@@ -17,16 +17,16 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
@@ -48,18 +48,21 @@ public void provides1Comp2InstanceMetricsFromTracker() {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(2, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(104,
-        componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
-    assertEquals(17,
-        componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+    assertEquals(2, table.uniqueInstances().size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(1, table.instance("container_1_bolt_1").size());
+    assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+    assertEquals(3, table.instance("container_1_bolt_2").size());
+    assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
   }
 
   @Test
@@ -88,19 +91,22 @@ public void providesMultipleComponentMetricsFromTracker() {
     doReturn(response2).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
-    assertEquals(2, metrics.size());
-    assertNotNull(metrics.get(comp1));
-    assertEquals(1, metrics.get(comp1).getMetrics().size());
-    assertEquals(104,
-        metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
-    assertNotNull(metrics.get(comp2));
-    assertEquals(1, metrics.get(comp2).getMetrics().size());
-    assertEquals(17,
-        metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            Collections.singletonList(metric),
+            Arrays.asList(comp1, comp2));
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(2, table.uniqueComponents().size());
+    assertEquals(1, table.component(comp1).size());
+    assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+    assertEquals(3, table.component(comp2).size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+    assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
   }
 
   @Test
@@ -118,15 +124,17 @@ public void parsesBackPressureMetric() {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(1, metrics.get(comp).getMetrics().size());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+    assertEquals(1, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(1, table.component(comp).size());
+    assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
   }
 
   @Test
@@ -141,12 +149,14 @@ public void handleMissingData() {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(0, metrics.get(comp).getMetrics().size());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(0, metrics.size());
   }
 
   private TrackerMetricsProvider createMetricsProviderSpy() {
@@ -154,7 +164,6 @@ private TrackerMetricsProvider createMetricsProviderSpy() {
         = new TrackerMetricsProvider("127.0.0.1", "topology", "dev", "env");
 
     TrackerMetricsProvider spyMetricsProvider = spy(metricsProvider);
-    spyMetricsProvider.setClock(new TestClock(70000));
     return spyMetricsProvider;
   }
 
@@ -175,44 +184,24 @@ public void testGetTimeLineMetrics() {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider
-            .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    ComponentMetrics componentMetrics = metrics.get(comp);
-    assertNotNull(componentMetrics);
-    assertEquals(2, componentMetrics.getMetrics().size());
-
-    InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(1, metricValues.size());
-    assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
-    instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(3, metricValues.size());
-    assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
-    assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
-    assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
-  }
-
-  private class TestClock extends TrackerMetricsProvider.Clock {
-    long timeStamp;
-
-    TestClock(long timeStamp) {
-      this.timeStamp = timeStamp;
-    }
-
-    @Override
-    long currentTime() {
-      return timeStamp;
-    }
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+
+    MeasurementsTable result = table.instance("container_1_bolt_1");
+    assertEquals(1, result.size());
+    assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+    result = table.instance("container_1_bolt_2");
+    assertEquals(3, result.size());
+    assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+    assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+    assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message