brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [1/3] incubator-brooklyn git commit: fix time-sensitive sensor derivation when rebinding to old sensors
Date Tue, 10 Feb 2015 20:09:11 GMT
Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master f2de6ecfb -> 692eb0c87


fix time-sensitive sensor derivation when rebinding to old sensors

before this, the autoscaler might scale out on rebind: the last recorded total
request count might be 100 and, if that data is very old, the current value might be 10000;
because a recent timestamp was attached to the initial value, it would compute 9900 new
requests in a 1s window.

this forces an invalid timestamp (-1) on the initial value passed to enrichers, and
time-based sensors are now stricter (configurably) about how they see windows and which
timestamps they exclude.

this may change behaviour in some places, particularly where sensors were being fired based
on incomplete or stale data.

a better fix would be to store the timestamp on the sensors themselves


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/87604e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/87604e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/87604e28

Branch: refs/heads/master
Commit: 87604e28cb6f3bb860a051aef579361d4ee7b2ef
Parents: 8c5dc9d
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Mon Feb 2 12:37:55 2015 +0000
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Mon Feb 2 17:31:50 2015 +0000

----------------------------------------------------------------------
 .../basic/AbstractTypeTransformingEnricher.java |  2 +-
 .../brooklyn/enricher/basic/AddingEnricher.java |  2 +-
 .../java/brooklyn/enricher/basic/Combiner.java  |  2 +-
 .../brooklyn/enricher/basic/Transformer.java    |  2 +-
 .../brooklyn/event/basic/BasicSensorEvent.java  |  9 +--
 .../enricher/RollingTimeWindowMeanEnricher.java | 83 ++++++++++++++++----
 .../enricher/TimeWeightedDeltaEnricher.java     |  8 +-
 .../policy/autoscaling/AutoScalerPolicy.java    |  4 +-
 .../brooklyn/enricher/RebindEnricherTest.java   |  3 +
 .../RollingTimeWindowMeanEnricherTest.groovy    | 10 ++-
 10 files changed, 94 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
index 27eac93..ab31ebc 100644
--- a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
+++ b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
@@ -61,7 +61,7 @@ public abstract class AbstractTypeTransformingEnricher<T,U> extends
AbstractEnri
             Object value = producer.getAttribute((AttributeSensor)source);
             // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex)
             if (value!=null)
-                onEvent(new BasicSensorEvent(source, producer, value));
+                onEvent(new BasicSensorEvent(source, producer, value, -1));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
index 11254bf..868240d 100644
--- a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
+++ b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
@@ -61,7 +61,7 @@ public class AddingEnricher extends AbstractEnricher implements SensorEventListe
             if (source instanceof AttributeSensor) {
                 Object value = entity.getAttribute((AttributeSensor)source);
                 if (value!=null)
-                    onEvent(new BasicSensorEvent(source, entity, value));
+                    onEvent(new BasicSensorEvent(source, entity, value, -1));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Combiner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Combiner.java b/core/src/main/java/brooklyn/enricher/basic/Combiner.java
index cd410a8..cc37d8c 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Combiner.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Combiner.java
@@ -103,7 +103,7 @@ public class Combiner<T,U> extends AbstractEnricher implements SensorEventListen
                 // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex)
                 //      Unfortunately not yet!
                 if (value != null) {
-                    onEvent(new BasicSensorEvent(sourceSensor, producer, value));
+                    onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
index d8244c3..d126307 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
@@ -98,7 +98,7 @@ public class Transformer<T,U> extends AbstractEnricher implements
SensorEventLis
             Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
             // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
             if (value!=null) {
-                onEvent(new BasicSensorEvent(sourceSensor, producer, value));
+                onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
index da912a8..b571eb7 100644
--- a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
+++ b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
@@ -51,19 +51,14 @@ public class BasicSensorEvent<T> implements SensorEvent<T>
{
 
     /** arguments should not be null (except in certain limited testing situations) */
     public BasicSensorEvent(Sensor<T> sensor, Entity source, T value) {
-        this(sensor, source, value, 0);
+        this(sensor, source, value, System.currentTimeMillis());
     }
     
     public BasicSensorEvent(Sensor<T> sensor, Entity source, T value, long timestamp)
{
         this.sensor = sensor;
         this.source = source;
         this.value = value;
-
-        if (timestamp > 0) {
-            this.timestamp = timestamp;
-        } else {
-            this.timestamp = System.currentTimeMillis();
-        }
+        this.timestamp = timestamp;
     }
     
     public static <T> SensorEvent<T> of(Sensor<T> sensor, Entity source,
T value, long timestamp) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
index ea70445..3c9247b 100644
--- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
@@ -21,11 +21,10 @@ package brooklyn.enricher;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import com.google.common.base.Preconditions;
-
-import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
 import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
 import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.Sensor;
 import brooklyn.event.SensorEvent;
@@ -33,6 +32,8 @@ import brooklyn.util.flags.SetFromFlag;
 import brooklyn.util.javalang.JavaClassNames;
 import brooklyn.util.time.Duration;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Transforms {@link Sensor} data into a rolling average based on a time window.
  * 
@@ -59,6 +60,15 @@ import brooklyn.util.time.Duration;
 //@Catalog(name="Rolling Mean in Time Window", description="Transforms a sensor's data into
a rolling average "
 //        + "based on a time window.")
 public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTypeTransformingEnricher<T,Double>
{
+    
+    public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("confidenceRequired",
+        "Minimum confidence level (ie period covered) required to publish a rolling average",
0.8d);
+
+    // without this, we will refuse to publish if the server time differs from the publisher
time (in a distributed setup);
+    // also we won't publish if a lot of time is spent actually doing the computation
+    public static ConfigKey<Duration> TIMESTAMP_GRACE_TIME = ConfigKeys.newConfigKey(Duration.class,
"timestampGraceTime",
+        "When computing windowed average, allow this much slippage time between published
metrics and local clock", Duration.millis(500));
+
     public static class ConfidenceQualifiedNumber {
         final Double value;
         final double confidence;
@@ -67,6 +77,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends
AbstractTyp
             this.value = value;
             this.confidence = confidence;
         }
+        
+        @Override
+        public String toString() {
+            return ""+value+" ("+(int)(confidence*100)+"%)";
+        } 
+        
     }
     
     private final LinkedList<T> values = new LinkedList<T>();
@@ -103,31 +119,69 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends
AbstractTyp
     public void onEvent(SensorEvent<T> event, long eventTime) {
         values.addLast(event.getValue());
         timestamps.addLast(eventTime);
-        pruneValues(eventTime);
-        entity.setAttribute((AttributeSensor<Double>)target, getAverage(eventTime).value);
//TODO this can potentially go stale... maybe we need to timestamp as well?
+        if (eventTime>0) {
+            ConfidenceQualifiedNumber average = getAverage(eventTime, 0);
+
+            if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { 
+                // without confidence, we might publish wildly varying estimates,
+                // causing spurious resizes, so allow it to be configured, and
+                // by default require a high value
+
+                // TODO would be nice to include timestamp, etc
+                entity.setAttribute((AttributeSensor<Double>)target, average.value);

+            }
+        }
     }
     
     public ConfidenceQualifiedNumber getAverage() {
-        return getAverage(System.currentTimeMillis());
+        return getAverage(System.currentTimeMillis(), getConfig(TIMESTAMP_GRACE_TIME).toMilliseconds());
     }
     
-    public ConfidenceQualifiedNumber getAverage(long now) {
-        pruneValues(now);
+    public ConfidenceQualifiedNumber getAverage(long fromTimeExact) {
+        return getAverage(fromTimeExact, 0);
+    }
+    
+    public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) {
         if (timestamps.isEmpty()) {
             return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d);
         }
+        
+        // (previously there was an old comment here, pre-Jul-2014,  
+        // saying "grkvlt - see email to development list";
+        // but i can't find that email)
+        // some of the more recent confidence and bogus-timestamp + exclusion logic might
fix this though
+        
+        long firstTimestamp = -1;
+        Iterator<Long> ti = timestamps.iterator();
+        while (ti.hasNext()) {
+            firstTimestamp = ti.next();
+            if (firstTimestamp>0) break;
+        }
+        if (firstTimestamp<=0) {
+            // no values with reasonable timestamps
+            return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(),
0.0d);
+        }
 
-        // XXX grkvlt - see email to development list
+        long lastTimestamp = timestamps.get(timestamps.size()-1);
 
+        long now = fromTime;
+        if (lastTimestamp > fromTime - graceAllowed) {
+            // without this, if the computation takes place X seconds after the publish,
+            // we treat X seconds as time for which we have no confidence in the data
+            now = lastTimestamp;
+        }
+        pruneValues(now);
         
-        long lastTimestamp = timestamps.get(timestamps.size()-1);
-        Double confidence = ((double)(timePeriod.toMilliseconds() - (now - lastTimestamp)))
/ timePeriod.toMilliseconds();
-        if (confidence <= 0.0d) {
+        long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp);
+        long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp);
+        Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds();
+        if (confidence <= 0.0000001d) {
+            // not enough timestamps in window 
             double lastValue = values.get(values.size()-1).doubleValue();
             return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d);
         }
         
-        long start = (now - timePeriod.toMilliseconds());
+        long start = windowStart;
         long end;
         double weightedAverage = 0.0d;
         
@@ -151,7 +205,8 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends
AbstractTyp
      * Discards out-of-date values, but keeps at least one value.
      */
     private void pruneValues(long now) {
-        while(timestamps.size() > 1 && timestamps.get(0) < (now - timePeriod.toMilliseconds()))
{
+        // keep one value from before the period, so that we can tell the window's start
time 
+        while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds()))
{
             timestamps.removeFirst();
             values.removeFirst();
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
index 0d4ff84..b746edd 100644
--- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
@@ -23,7 +23,6 @@ import groovy.lang.Closure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import brooklyn.catalog.Catalog;
 import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
 import brooklyn.entity.Entity;
 import brooklyn.event.AttributeSensor;
@@ -102,8 +101,8 @@ public class TimeWeightedDeltaEnricher<T extends Number> extends
AbstractTypeTra
             return;
         }
         
-        if (eventTime > lastTime) {
-            if (lastValue == null) {
+        if (eventTime > 0 && eventTime > lastTime) {
+            if (lastValue == null || lastTime <= 0) {
                 // cannot calculate time-based delta with a single value
                 if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value
so will not emit, null -> {} at {}", new Object[] {this, current, eventTime}); 
             } else {
@@ -116,6 +115,9 @@ public class TimeWeightedDeltaEnricher<T extends Number> extends
AbstractTypeTra
             }
             lastValue = current;
             lastTime = eventTime;
+        } else if (lastTime<0) {
+            lastValue = current;
+            lastTime = -1;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index a733fb6..a5f0ded 100644
--- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -690,7 +690,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
             unboundedSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD);
             desiredSize = toBoundedDesiredPoolSize(unboundedSize);
             if (desiredSize > currentSize) {
-                if (LOG.isTraceEnabled()) LOG.trace("{} resizing out pool {} from {} to {}
({} > {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricUpperBoundD});
+                if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing out pool {}
from {} to {} ({} > {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD,
metricUpperBoundD});
                 scheduleResize(desiredSize);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({}
> {} > {}, but scale-out blocked eg by bounds/check)", new Object[] {this, poolEntity,
currentSize, currentMetricD, metricUpperBoundD, metricLowerBoundD});
@@ -708,7 +708,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
                 desiredSize = toBoundedDesiredPoolSize(desiredSize);
             }
             if (desiredSize < currentSize) {
-                if (LOG.isTraceEnabled()) LOG.trace("{} resizing back pool {} from {} to
{} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD,
metricLowerBoundD});
+                if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing back pool
{} from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize,
currentMetricD, metricLowerBoundD});
                 scheduleResize(desiredSize);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({}
< {} < {}, but scale-back blocked eg by bounds/check)", new Object[] {this, poolEntity,
currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD});

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
index bcfc651..dc08e19 100644
--- a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
+++ b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
@@ -35,6 +35,7 @@ import brooklyn.test.EntityTestUtils;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.util.http.BetterMockWebServer;
 import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
 import com.google.mockwebserver.MockResponse;
 
@@ -108,6 +109,8 @@ public class RebindEnricherTest extends RebindTestFixtureWithApp {
         TestApplication newApp = rebind();
 
         newApp.setAttribute(INT_METRIC, 10);
+        Time.sleep(Duration.millis(10));
+        newApp.setAttribute(INT_METRIC, 10);
         EntityTestUtils.assertAttributeEqualsEventually(newApp, DOUBLE_METRIC, 10d);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
index 1a54302..ae45081 100644
--- a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
+++ b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
@@ -93,9 +93,17 @@ class RollingTimeWindowMeanEnricherTest {
     }
     
     @Test
-    public void testSingleValueAverage() {
+    public void testSingleValueTimeAverage() {
         averager.onEvent(intSensor.newEvent(producer, 10), 1000)
         average = averager.getAverage(1000)
+        assertEquals(average.confidence, 0d)
+    }
+    
+    @Test
+    public void testTwoValueAverageForPeriod() {
+        averager.onEvent(intSensor.newEvent(producer, 10), 1000)
+        averager.onEvent(intSensor.newEvent(producer, 10), 2000)
+        average = averager.getAverage(2000)
         assertEquals(average.value, 10 /1d)
         assertEquals(average.confidence, 1d)
     }


Mime
View raw message