brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [01/22] incubator-brooklyn git commit: yaml-friendly enrichers for delta and rolling avg
Date Wed, 24 Jun 2015 08:05:12 GMT
Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master b08fe4eca -> 45dd54940


yaml-friendly enrichers for delta and rolling avg


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/089fe862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/089fe862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/089fe862

Branch: refs/heads/master
Commit: 089fe862677e5588b68b9c084fefe88fb2cc2a1d
Parents: 78cbc22
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Fri Jun 19 04:56:39 2015 -0700
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Wed Jun 24 00:40:32 2015 -0700

----------------------------------------------------------------------
 .../enricher/basic/AbstractTransformer.java     | 106 +++++++++++
 .../brooklyn/enricher/basic/Transformer.java    |  64 +------
 .../YamlRollingTimeWindowMeanEnricher.java      | 178 +++++++++++++++++++
 .../basic/YamlTimeWeightedDeltaEnricher.java    |  81 +++++++++
 .../YamlRollingTimeWindowMeanEnricherTest.java  | 178 +++++++++++++++++++
 .../YamlTimeWeightedDeltaEnricherTest.java      | 107 +++++++++++
 .../enricher/RollingTimeWindowMeanEnricher.java |   4 +-
 .../enricher/TimeWeightedDeltaEnricher.java     |   3 +
 8 files changed, 658 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
new file mode 100644
index 0000000..28a6de1
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class);
+
+    public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+    public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
+
+    public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+    
+    protected Entity producer;
+    protected Sensor<T> sourceSensor;
+    protected Sensor<U> targetSensor;
+
+    public AbstractTransformer() {
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        Function<SensorEvent<T>, U> transformation = getTransformation();
+        this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER);
+        this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
+        Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR);
+        this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor;
+        if (producer.equals(entity) && targetSensorSpecified==null) {
+            LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+
+                producer+"."+sourceSensor+" (computing "+transformation+")");
+            // we don't throw because this error may manifest itself after a lengthy deployment, 
+            // and failing it at that point simply because of an enricher is not very pleasant
+            // (at least not until we have good re-run support across the board)
+            return;
+        }
+        
+        subscribe(producer, sourceSensor, this);
+        
+        if (sourceSensor instanceof AttributeSensor) {
+            Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
+            // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
+            if (value!=null) {
+                onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
+            }
+        }
+    }
+
+    /** returns a function for transformation, for immediate use only (not for caching, as it may change) */
+    protected abstract Function<SensorEvent<T>, U> getTransformation();
+
+    @Override
+    public void onEvent(SensorEvent<T> event) {
+        emit(targetSensor, compute(event));
+    }
+
+    protected Object compute(SensorEvent<T> event) {
+        // transformation is not going to change, but this design makes it easier to support changing config in future. 
+        // if it's an efficiency hole we can switch to populate the transformation at start.
+        U result = getTransformation().apply(event);
+        if (LOG.isTraceEnabled())
+            LOG.trace("Enricher "+this+" computed "+result+" from "+event);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
index c6c88a6..2fa85fe 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
@@ -24,14 +24,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import brooklyn.config.ConfigKey;
-import brooklyn.entity.Entity;
 import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityLocal;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.Sensor;
 import brooklyn.event.SensorEvent;
-import brooklyn.event.SensorEventListener;
-import brooklyn.event.basic.BasicSensorEvent;
 import brooklyn.util.collections.MutableSet;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
@@ -41,7 +35,7 @@ import com.google.common.reflect.TypeToken;
 
 //@Catalog(name="Transformer", description="Transforms attributes of an entity; see Enrichers.builder().transforming(...)")
 @SuppressWarnings("serial")
-public class Transformer<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+public class Transformer<T,U> extends AbstractTransformer<T,U> {
 
     private static final Logger LOG = LoggerFactory.getLogger(Transformer.class);
 
@@ -50,50 +44,11 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis
     public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_VALUE = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation");
     public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_EVENT = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation.fromevent");
     
-    public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
-
-    public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
-
-    public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
-    
-    protected Entity producer;
-    protected Sensor<T> sourceSensor;
-    protected Sensor<U> targetSensor;
-
     public Transformer() {
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    public void setEntity(EntityLocal entity) {
-        super.setEntity(entity);
-
-        Function<SensorEvent<T>, U> transformation = getTransformation();
-        this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER);
-        this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
-        Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR);
-        this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor;
-        if (producer.equals(entity) && targetSensorSpecified==null) {
-            LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+
-                producer+"."+sourceSensor+" (computing "+transformation+")");
-            // we don't throw because this error may manifest itself after a lengthy deployment, 
-            // and failing it at that point simply because of an enricher is not very pleasant
-            // (at least not until we have good re-run support across the board)
-            return;
-        }
-        
-        subscribe(producer, sourceSensor, this);
-        
-        if (sourceSensor instanceof AttributeSensor) {
-            Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
-            // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
-            if (value!=null) {
-                onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
-            }
-        }
-    }
-
     /** returns a function for transformation, for immediate use only (not for caching, as it may change) */
+    @Override
     @SuppressWarnings("unchecked")
     protected Function<SensorEvent<T>, U> getTransformation() {
         MutableSet<Object> suppliers = MutableSet.of();
@@ -144,18 +99,5 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis
             }
         };
     }
-
-    @Override
-    public void onEvent(SensorEvent<T> event) {
-        emit(targetSensor, compute(event));
-    }
-
-    protected Object compute(SensorEvent<T> event) {
-        // transformation is not going to change, but this design makes it easier to support changing config in future. 
-        // if it's an efficiency hole we can switch to populate the transformation at start.
-        U result = getTransformation().apply(event);
-        if (LOG.isTraceEnabled())
-            LOG.trace("Enricher "+this+" computed "+result+" from "+event);
-        return result;
-    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
new file mode 100644
index 0000000..64333d4
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+
+/**
+ * Transforms {@link Sensor} data into a rolling average based on a time window.
+ * 
+ * All values within the window are weighted or discarded based on the timestamps associated with
+ * them (discards occur when a new value is added or an average is requested)
+ * <p>
+ * This will not extrapolate figures - it is assumed a value is valid and correct for the entire
+ * time period between it and the previous value. Normally, the average attribute is only updated
+ * when a new value arrives so it can give a fully informed average, but there is a danger of this
+ * going stale.
+ * <p>
+ * When an average is requested, it is likely there will be a segment of the window for which there
+ * isn't a value. Instead of extrapolating a value and providing different extrapolation techniques,
+ * the average is reported with a confidence value which reflects the fraction of the time
+ * window for which the values were valid.
+ * <p>
+ * Consumers of the average may ignore the confidence value and just use the last known average.
+ * They could multiply the returned value by the confidence value to get a decay-type behavior as
+ * the window empties. A third alternative is to, at a certain confidence threshold, report that
+ * the average is no longer meaningful.
+ * <p>
+ * The default average when no data has been received is 0, with a confidence of 0
+ */
+public class YamlRollingTimeWindowMeanEnricher<T extends Number> extends AbstractTransformer<T,Double> {
+    
+    public static ConfigKey<Duration> WINDOW_DURATION = ConfigKeys.newConfigKey(Duration.class, "enricher.window.duration",
+        "Duration for which this window should store data, default one minute", Duration.ONE_MINUTE);
+
+    public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("enricher.window.confidenceRequired",
+        "Minimum confidence level (ie period covered) required to publish a rolling average", 0.8d);
+
+    public static class ConfidenceQualifiedNumber {
+        final Double value;
+        final double confidence;
+        
+        public ConfidenceQualifiedNumber(Double value, double confidence) {
+            this.value = value;
+            this.confidence = confidence;
+        }
+        
+        @Override
+        public String toString() {
+            return ""+value+" ("+(int)(confidence*100)+"%)";
+        } 
+        
+    }
+    
+    private final LinkedList<T> values = new LinkedList<T>();
+    private final LinkedList<Long> timestamps = new LinkedList<Long>();
+    volatile ConfidenceQualifiedNumber lastAverage = new ConfidenceQualifiedNumber(0d,0d);
+    
+    @Override
+    protected Function<SensorEvent<T>, Double> getTransformation() {
+        return new Function<SensorEvent<T>, Double>() {
+            @Override
+            public Double apply(SensorEvent<T> event) {
+                long eventTime = event.getTimestamp();
+                if (event.getValue()==null) {
+                    return null;
+                }
+                values.addLast(event.getValue());
+                timestamps.addLast(eventTime);
+                if (eventTime>0) {
+                    ConfidenceQualifiedNumber average = getAverage(eventTime, 0);
+
+                    if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { 
+                        // without confidence, we might publish wildly varying estimates,
+                        // causing spurious resizes, so allow it to be configured, and
+                        // by default require a high value
+
+                        // TODO would be nice to include timestamp, etc
+                        return average.value; 
+                    }
+                }
+                return null;
+            }
+        };
+    }
+    
+    public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) {
+        if (timestamps.isEmpty()) {
+            return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d);
+        }
+        
+        long firstTimestamp = -1;
+        Iterator<Long> ti = timestamps.iterator();
+        while (ti.hasNext()) {
+            firstTimestamp = ti.next();
+            if (firstTimestamp>0) break;
+        }
+        if (firstTimestamp<=0) {
+            // no values with reasonable timestamps
+            return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d);
+        }
+
+        long lastTimestamp = timestamps.get(timestamps.size()-1);
+
+        long now = fromTime;
+        if (lastTimestamp > fromTime - graceAllowed) {
+            // without this, if the computation takes place X seconds after the publish,
+            // we treat X seconds as time for which we have no confidence in the data
+            now = lastTimestamp;
+        }
+        pruneValues(now);
+        
+        Duration timePeriod = getConfig(WINDOW_DURATION);
+        long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp);
+        long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp);
+        Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds();
+        if (confidence <= 0.0000001d) {
+            // not enough timestamps in window 
+            double lastValue = values.get(values.size()-1).doubleValue();
+            return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d);
+        }
+        
+        long start = windowStart;
+        long end;
+        double weightedAverage = 0.0d;
+        
+        Iterator<T> valuesIter = values.iterator();
+        Iterator<Long> timestampsIter = timestamps.iterator();
+        while (valuesIter.hasNext()) {
+            // Ignores null and out-of-date values (and also values that are received out-of-order, but that shouldn't happen!)
+            Number val = valuesIter.next();
+            Long timestamp = timestampsIter.next();
+            if (val!=null && timestamp >= start) {
+                end = timestamp;
+                weightedAverage += ((end - start) / (confidence * timePeriod.toMilliseconds())) * val.doubleValue();
+                start = timestamp;
+            }
+        }
+        
+        return lastAverage = new ConfidenceQualifiedNumber(weightedAverage, confidence);
+    }
+    
+    /**
+     * Discards out-of-date values, but keeps at least one value.
+     */
+    private void pruneValues(long now) {
+        // keep one value from before the period, so that we can tell the window's start time
+        Duration timePeriod = getConfig(WINDOW_DURATION);
+        while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds())) {
+            timestamps.removeFirst();
+            values.removeFirst();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
new file mode 100644
index 0000000..b515da4
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.enricher.basic.AbstractTransformer;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.SensorEvent;
+import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+
+/**
+ * Converts an absolute count sensor into a delta sensor (i.e. the diff between the current and previous value),
+ * presented as a units/timeUnit based on the event timing.
+ * <p>
+ * For example, given a requests.count sensor, this can make a requests.per_sec sensor with {@link #DELTA_PERIOD} set to "1s" (the default).
+ * <p>
+ * Suitable for configuration from YAML.
+ */
+public class YamlTimeWeightedDeltaEnricher<T extends Number> extends AbstractTransformer<T,Double> {
+    private static final Logger LOG = LoggerFactory.getLogger(YamlTimeWeightedDeltaEnricher.class);
+    
+    Number lastValue;
+    long lastTime = -1;
+    
+    public static ConfigKey<Duration> DELTA_PERIOD = ConfigKeys.newConfigKey(Duration.class, "enricher.delta.period",
+        "Duration that this delta should compute for, default per second", Duration.ONE_SECOND);
+    
+    @Override
+    protected Function<SensorEvent<T>, Double> getTransformation() {
+        return new Function<SensorEvent<T>, Double>() {
+            @Override
+            public Double apply(SensorEvent<T> event) {
+                Number current = TypeCoercions.coerce(event.getValue(), Double.class);
+                
+                if (current == null) return null;
+
+                long eventTime = event.getTimestamp();
+                long unitMillis = getConfig(DELTA_PERIOD).toMilliseconds();
+                Double result = null;
+                
+                if (eventTime > 0 && eventTime > lastTime) {
+                    if (lastValue == null || lastTime < 0) {
+                        // cannot calculate time-based delta with a single value
+                        if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value so will not emit, null -> {} at {}", new Object[] {this, current, eventTime}); 
+                    } else {
+                        double duration = eventTime - lastTime;
+                        result = (current.doubleValue() - lastValue.doubleValue()) / (duration / unitMillis);
+                    }
+                }
+                
+                lastValue = current;
+                lastTime = eventTime;
+                
+                return result;
+            }
+        };
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
new file mode 100644
index 0000000..45b7ec3
--- /dev/null
+++ b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.enricher.basic.YamlRollingTimeWindowMeanEnricher.ConfidenceQualifiedNumber;
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.BasicEntity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.management.SubscriptionContext;
+import brooklyn.policy.EnricherSpec;
+import brooklyn.util.time.Duration;
+
+public class YamlRollingTimeWindowMeanEnricherTest {
+    
+    AbstractApplication app;
+    
+    BasicEntity producer;
+
+    AttributeSensor<Integer> intSensor;
+    AttributeSensor<Double> avgSensor, deltaSensor;
+    
+    Duration timePeriod = Duration.ONE_SECOND;
+    
+    YamlTimeWeightedDeltaEnricher<Double> delta;
+    YamlRollingTimeWindowMeanEnricher<Double> averager;
+    
+    ConfidenceQualifiedNumber average;
+    SubscriptionContext subscription;
+    
+    @SuppressWarnings("unchecked")
+    @BeforeMethod
+    public void before() {
+        app = new AbstractApplication() {};
+        Entities.startManagement(app);
+        producer = app.addChild(EntitySpec.create(BasicEntity.class));
+
+        intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor");
+        deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor");
+        avgSensor = new BasicAttributeSensor<Double>(Double.class, "avg sensor");
+            
+        delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+                .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+                .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+                .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+
+        averager = producer.addEnricher(EnricherSpec.create(YamlRollingTimeWindowMeanEnricher.class)
+                .configure(YamlRollingTimeWindowMeanEnricher.PRODUCER, producer)
+                .configure(YamlRollingTimeWindowMeanEnricher.SOURCE_SENSOR, deltaSensor)
+                .configure(YamlRollingTimeWindowMeanEnricher.TARGET_SENSOR, avgSensor)
+                .configure(YamlRollingTimeWindowMeanEnricher.WINDOW_DURATION, timePeriod));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+        
+    @Test
+    public void testDefaultAverageWhenEmpty() {
+        ConfidenceQualifiedNumber average = averager.getAverage(0, 0);
+        assertEquals(average.value, 0d);
+        assertEquals(average.confidence, 0.0d);
+    }
+    
+    protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) {
+        return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp);
+    }
+    protected BasicSensorEvent<Double> newDeltaSensorEvent(double value, long timestamp) {
+        return new BasicSensorEvent<Double>(deltaSensor, producer, value, timestamp);
+    }
+
+    @Test
+    public void testNoRecentValuesAverage() {
+        averager.onEvent(newDeltaSensorEvent(10, 0));
+        average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0);
+        assertEquals(average.value, 10d);
+        assertEquals(average.confidence, 0d);
+    }
+
+    @Test
+    public void testNoRecentValuesUsesLastForAverage() {
+        averager.onEvent(newDeltaSensorEvent(10, 0));
+        averager.onEvent(newDeltaSensorEvent(20, 10));
+        average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0);
+        assertEquals(average.value, 20d);
+        assertEquals(average.confidence, 0d);
+    }
+
+    @Test
+    public void testSingleValueTimeAverage() {
+        averager.onEvent(newDeltaSensorEvent(10, 1000));
+        average = averager.getAverage(1000, 0);
+        assertEquals(average.confidence, 0d);
+    }
+
+    @Test
+    public void testTwoValueAverageForPeriod() {
+        averager.onEvent(newDeltaSensorEvent(10, 1000));
+        averager.onEvent(newDeltaSensorEvent(10, 2000));
+        average = averager.getAverage(2000, 0);
+        assertEquals(average.value, 10 /1d);
+        assertEquals(average.confidence, 1d);
+    }
+
+    @Test
+    public void testMonospacedAverage() {
+        averager.onEvent(newDeltaSensorEvent(10, 1000));
+        averager.onEvent(newDeltaSensorEvent(20, 1250));
+        averager.onEvent(newDeltaSensorEvent(30, 1500));
+        averager.onEvent(newDeltaSensorEvent(40, 1750));
+        averager.onEvent(newDeltaSensorEvent(50, 2000));
+        average = averager.getAverage(2000, 0);
+        assertEquals(average.value, (20+30+40+50)/4d);
+        assertEquals(average.confidence, 1d);
+    }
+
+    @Test
+    public void testWeightedAverage() {
+        averager.onEvent(newDeltaSensorEvent(10, 1000));
+        averager.onEvent(newDeltaSensorEvent(20, 1100));
+        averager.onEvent(newDeltaSensorEvent(30, 1300));
+        averager.onEvent(newDeltaSensorEvent(40, 1600));
+        averager.onEvent(newDeltaSensorEvent(50, 2000));
+        
+        average = averager.getAverage(2000, 0);
+        assertEquals(average.value, (20*0.1d)+(30*0.2d)+(40*0.3d)+(50*0.4d));
+        assertEquals(average.confidence, 1d);
+    }
+
+    @Test
+    public void testConfidenceDecay() {
+        averager.onEvent(newDeltaSensorEvent(10, 1000));
+        averager.onEvent(newDeltaSensorEvent(20, 1250));
+        averager.onEvent(newDeltaSensorEvent(30, 1500));
+        averager.onEvent(newDeltaSensorEvent(40, 1750));
+        averager.onEvent(newDeltaSensorEvent(50, 2000));
+
+        average = averager.getAverage(2250, 0);
+        assertEquals(average.value, (30+40+50)/3d);
+        assertEquals(average.confidence, 0.75d);
+        average = averager.getAverage(2500, 0);
+        assertEquals(average.value, (40+50)/2d);
+        assertEquals(average.confidence, 0.5d);
+        average = averager.getAverage(2750, 0);
+        assertEquals(average.value, 50d);
+        assertEquals(average.confidence, 0.25d);
+        average = averager.getAverage(3000, 0);
+        assertEquals(average.value, 50d);
+        assertEquals(average.confidence, 0d);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
new file mode 100644
index 0000000..2a7a974
--- /dev/null
+++ b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.BasicEntity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.management.SubscriptionContext;
+import brooklyn.policy.EnricherSpec;
+
+public class YamlTimeWeightedDeltaEnricherTest {
+    
+    AbstractApplication app;
+    
+    BasicEntity producer;
+
+    AttributeSensor<Integer> intSensor;
+    AttributeSensor<Double> avgSensor, deltaSensor;
+    SubscriptionContext subscription;
+    
+    @BeforeMethod
+    public void before() {
+        app = new AbstractApplication() {};
+        Entities.startManagement(app);
+        producer = app.addChild(EntitySpec.create(BasicEntity.class));
+
+        intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor");
+        deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor");
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+    
+    @Test
+    public void testMonospaceTimeWeightedDeltaEnricher() {
+        @SuppressWarnings("unchecked")
+        YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+            .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+            .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+            .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+        
+        delta.onEvent(newIntSensorEvent(0, 0));
+        assertEquals(producer.getAttribute(deltaSensor), null);
+        delta.onEvent(newIntSensorEvent(0, 1000));
+        assertEquals(producer.getAttribute(deltaSensor), 0d);
+        delta.onEvent(newIntSensorEvent(1, 2000));
+        assertEquals(producer.getAttribute(deltaSensor), 1d);
+        delta.onEvent(newIntSensorEvent(3, 3000));
+        assertEquals(producer.getAttribute(deltaSensor), 2d);
+        delta.onEvent(newIntSensorEvent(8, 4000));
+        assertEquals(producer.getAttribute(deltaSensor), 5d);
+    }
+    
+    protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) {
+        return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp);
+    }
+    
+    @Test
+    public void testVariableTimeWeightedDeltaEnricher() {
+        @SuppressWarnings("unchecked")
+        YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+            .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+            .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+            .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+        
+        delta.onEvent(newIntSensorEvent(0, 0));
+        delta.onEvent(newIntSensorEvent(0, 2000));
+        assertEquals(producer.getAttribute(deltaSensor), 0d);
+        delta.onEvent(newIntSensorEvent(3, 5000));
+        assertEquals(producer.getAttribute(deltaSensor), 1d);
+        delta.onEvent(newIntSensorEvent(7, 7000));
+        assertEquals(producer.getAttribute(deltaSensor), 2d);
+        delta.onEvent(newIntSensorEvent(12, 7500));
+        assertEquals(producer.getAttribute(deltaSensor), 10d);
+        delta.onEvent(newIntSensorEvent(15, 9500));
+        assertEquals(producer.getAttribute(deltaSensor), 1.5d);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
index 694977c..7c55a81 100644
--- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
@@ -128,12 +128,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp
         }
     }
     
-    @Deprecated /** @deprecatedsince 0.7.0; not used; use the 2-arg method */
+    @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */
     public ConfidenceQualifiedNumber getAverage() {
         return getAverage(System.currentTimeMillis(), 0);
     }
     
-    @Deprecated /** @deprecated since 0.7.0; not used; use the 2-arg method */
+    @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */
     public ConfidenceQualifiedNumber getAverage(long fromTimeExact) {
         return getAverage(fromTimeExact, 0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
index b746edd..42e418f 100644
--- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
+import brooklyn.enricher.basic.YamlTimeWeightedDeltaEnricher;
 import brooklyn.entity.Entity;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.Sensor;
@@ -41,6 +42,8 @@ import com.google.common.base.Functions;
  * presented as a units/timeUnit based on the event timing.
  * <p>
  * NB for time (e.g. "total milliseconds consumed") use {@link TimeFractionDeltaEnricher}
+ * <p>
+ * See also {@link YamlTimeWeightedDeltaEnricher} designed for use from YAML.
  */
 //@Catalog(name="Time-weighted Delta", description="Converts an absolute sensor into a delta sensor "
 //        + "(i.e. the diff between the current and previous value), presented as a units/timeUnit "


Mime
View raw message