brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [16/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming package policy
Date Tue, 18 Aug 2015 11:06:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
new file mode 100644
index 0000000..98d814f
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -0,0 +1,1092 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.apache.brooklyn.core.util.flags.TypeCoercions;
+import org.apache.brooklyn.core.util.task.Tasks;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.policy.autoscaling.SizeHistory.WindowSummary;
+import org.apache.brooklyn.policy.loadbalancing.LoadBalancingPolicy;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to
+ * emitted {@code POOL_COLD} and {@code POOL_HOT} events. Alternatively, the policy can be configured to
+ * keep a given metric within a required range.
+ * <p>
+ * This policy does not itself determine whether the pool is hot or cold, but instead relies on these 
+ * events being emitted by the monitored entity itself, or by another policy that is attached to it; see, 
+ * for example, {@link LoadBalancingPolicy}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@Catalog(name="Auto-scaler", description="Policy that is attached to a Resizable entity and dynamically "
+        + "adjusts its size in response to either keep a metric within a given range, or in response to "
+        + "POOL_COLD and POOL_HOT events")
+public class AutoScalerPolicy extends AbstractPolicy {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(AutoScalerPolicy.class);
+
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static class Builder {
+        private String id;
+        private String name;
+        private AttributeSensor<? extends Number> metric;
+        private Entity entityWithMetric;
+        private Number metricUpperBound;
+        private Number metricLowerBound;
+        private int minPoolSize = 1;
+        private int maxPoolSize = Integer.MAX_VALUE;
+        private Integer resizeDownIterationIncrement;
+        private Integer resizeDownIterationMax;
+        private Integer resizeUpIterationIncrement;
+        private Integer resizeUpIterationMax;
+        private Duration minPeriodBetweenExecs;
+        private Duration resizeUpStabilizationDelay;
+        private Duration resizeDownStabilizationDelay;
+        private ResizeOperator resizeOperator;
+        private Function<Entity,Integer> currentSizeOperator;
+        private BasicNotificationSensor<?> poolHotSensor;
+        private BasicNotificationSensor<?> poolColdSensor;
+        private BasicNotificationSensor<?> poolOkSensor;
+        private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> maxSizeReachedSensor;
+        private Duration maxReachedNotificationDelay;
+        
+        public Builder id(String val) {
+            this.id = val; return this;
+        }
+        public Builder name(String val) {
+            this.name = val; return this;
+        }
+        public Builder metric(AttributeSensor<? extends Number> val) {
+            this.metric = val; return this;
+        }
+        public Builder entityWithMetric(Entity val) {
+            this.entityWithMetric = val; return this;
+        }
+        public Builder metricLowerBound(Number val) {
+            this.metricLowerBound = val; return this;
+        }
+        public Builder metricUpperBound(Number val) {
+            this.metricUpperBound = val; return this;
+        }
+        public Builder metricRange(Number min, Number max) {
+            metricLowerBound = checkNotNull(min);
+            metricUpperBound = checkNotNull(max);
+            return this;
+        }
+        public Builder minPoolSize(int val) {
+            this.minPoolSize = val; return this;
+        }
+        public Builder maxPoolSize(int val) {
+            this.maxPoolSize = val; return this;
+        }
+        public Builder sizeRange(int min, int max) {
+            minPoolSize = min;
+            maxPoolSize = max;
+            return this;
+        }
+        
+        public Builder resizeUpIterationIncrement(Integer val) {
+            this.resizeUpIterationIncrement = val; return this;
+        }
+        public Builder resizeUpIterationMax(Integer val) {
+            this.resizeUpIterationMax = val; return this;
+        }
+        public Builder resizeDownIterationIncrement(Integer val) { // batch size for scaling down; kept separate from the resize-up value
+            this.resizeDownIterationIncrement = val; return this;
+        }
+        public Builder resizeDownIterationMax(Integer val) { // max single-iteration change when scaling down; kept separate from the resize-up value
+            this.resizeDownIterationMax = val; return this;
+        }
+
+        public Builder minPeriodBetweenExecs(Duration val) {
+            this.minPeriodBetweenExecs = val; return this;
+        }
+        public Builder resizeUpStabilizationDelay(Duration val) {
+            this.resizeUpStabilizationDelay = val; return this;
+        }
+        public Builder resizeDownStabilizationDelay(Duration val) {
+            this.resizeDownStabilizationDelay = val; return this;
+        }
+        public Builder resizeOperator(ResizeOperator val) {
+            this.resizeOperator = val; return this;
+        }
+        public Builder currentSizeOperator(Function<Entity, Integer> val) {
+            this.currentSizeOperator = val; return this;
+        }
+        public Builder poolHotSensor(BasicNotificationSensor<?> val) {
+            this.poolHotSensor = val; return this;
+        }
+        public Builder poolColdSensor(BasicNotificationSensor<?> val) {
+            this.poolColdSensor = val; return this;
+        }
+        public Builder poolOkSensor(BasicNotificationSensor<?> val) {
+            this.poolOkSensor = val; return this;
+        }
+        public Builder maxSizeReachedSensor(BasicNotificationSensor<? super MaxPoolSizeReachedEvent> val) {
+            this.maxSizeReachedSensor = val; return this;
+        }
+        public Builder maxReachedNotificationDelay(Duration val) {
+            this.maxReachedNotificationDelay = val; return this;
+        }
+        public AutoScalerPolicy build() {
+            return new AutoScalerPolicy(toFlags());
+        }
+        public PolicySpec<AutoScalerPolicy> buildSpec() {
+            return PolicySpec.create(AutoScalerPolicy.class)
+                    .configure(toFlags());
+        }
+        private Map<String,?> toFlags() {
+            return MutableMap.<String,Object>builder()
+                    .putIfNotNull("id", id)
+                    .putIfNotNull("name", name)
+                    .putIfNotNull("metric", metric)
+                    .putIfNotNull("entityWithMetric", entityWithMetric)
+                    .putIfNotNull("metricUpperBound", metricUpperBound)
+                    .putIfNotNull("metricLowerBound", metricLowerBound)
+                    .putIfNotNull("minPoolSize", minPoolSize)
+                    .putIfNotNull("maxPoolSize", maxPoolSize)
+                    .putIfNotNull("resizeUpIterationMax", resizeUpIterationMax)
+                    .putIfNotNull("resizeUpIterationIncrement", resizeUpIterationIncrement)
+                    .putIfNotNull("resizeDownIterationMax", resizeDownIterationMax)
+                    .putIfNotNull("resizeDownIterationIncrement", resizeDownIterationIncrement)
+                    .putIfNotNull("minPeriodBetweenExecs", minPeriodBetweenExecs)
+                    .putIfNotNull("resizeUpStabilizationDelay", resizeUpStabilizationDelay)
+                    .putIfNotNull("resizeDownStabilizationDelay", resizeDownStabilizationDelay)
+                    .putIfNotNull("resizeOperator", resizeOperator)
+                    .putIfNotNull("currentSizeOperator", currentSizeOperator)
+                    .putIfNotNull("poolHotSensor", poolHotSensor)
+                    .putIfNotNull("poolColdSensor", poolColdSensor)
+                    .putIfNotNull("poolOkSensor", poolOkSensor)
+                    .putIfNotNull("maxSizeReachedSensor", maxSizeReachedSensor)
+                    .putIfNotNull("maxReachedNotificationDelay", maxReachedNotificationDelay)
+                    .build();
+        }
+    }
+    
+    // TODO Is there a nicer pattern for registering such type-coercions? 
+    // Can't put it in the ResizeOperator interface, nor in core TypeCoercions class because interface is defined in policy/.
+    static {
+        TypeCoercions.registerAdapter(Closure.class, ResizeOperator.class, new Function<Closure,ResizeOperator>() {
+            @Override
+            public ResizeOperator apply(final Closure closure) {
+                return new ResizeOperator() {
+                    @Override public Integer resize(Entity entity, Integer input) {
+                        return (Integer) closure.call(entity, input);
+                    }
+                };
+            }
+        });
+    }
+    
+    // Pool workrate notifications.
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_HOT_SENSOR = new BasicNotificationSensor<Map>(
+        Map.class, "resizablepool.hot", "Pool is over-utilized; it has insufficient resource for current workload");
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_COLD_SENSOR = new BasicNotificationSensor<Map>(
+        Map.class, "resizablepool.cold", "Pool is under-utilized; it has too much resource for current workload");
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_OK_SENSOR = new BasicNotificationSensor<Map>( // name must differ from the cold sensor's
+        Map.class, "resizablepool.ok", "Pool utilization is ok; the available resources are fine for the current workload");
+
+    /**
+     * A convenience for policies that want to register a {@code builder.maxSizeReachedSensor(sensor)}.
+     * Note that this "default" is not set automatically; the default is for no sensor to be used (so
+     * no events emitted).
+     */
+    public static BasicNotificationSensor<MaxPoolSizeReachedEvent> DEFAULT_MAX_SIZE_REACHED_SENSOR = new BasicNotificationSensor<MaxPoolSizeReachedEvent>(
+            MaxPoolSizeReachedEvent.class, "resizablepool.maxSizeReached", "Consistently wanted to resize the pool above the max allowed size");
+
+    public static final String POOL_CURRENT_SIZE_KEY = "pool.current.size";
+    public static final String POOL_HIGH_THRESHOLD_KEY = "pool.high.threshold";
+    public static final String POOL_LOW_THRESHOLD_KEY = "pool.low.threshold";
+    public static final String POOL_CURRENT_WORKRATE_KEY = "pool.current.workrate";
+    
+    @SuppressWarnings("serial")
+    @SetFromFlag("metric")
+    public static final ConfigKey<AttributeSensor<? extends Number>> METRIC = BasicConfigKey.builder(new TypeToken<AttributeSensor<? extends Number>>() {})
+            .name("autoscaler.metric")
+            .build();
+
+    @SetFromFlag("entityWithMetric")
+    public static final ConfigKey<Entity> ENTITY_WITH_METRIC = BasicConfigKey.builder(Entity.class)
+            .name("autoscaler.entityWithMetric")
+            .build();
+    
+    @SetFromFlag("metricLowerBound")
+    public static final ConfigKey<Number> METRIC_LOWER_BOUND = BasicConfigKey.builder(Number.class)
+            .name("autoscaler.metricLowerBound")
+            .reconfigurable(true)
+            .build();
+    
+    @SetFromFlag("metricUpperBound")
+    public static final ConfigKey<Number> METRIC_UPPER_BOUND = BasicConfigKey.builder(Number.class)
+            .name("autoscaler.metricUpperBound")
+            .reconfigurable(true)
+            .build();
+    
+    @SetFromFlag("resizeUpIterationIncrement")
+    public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.resizeUpIterationIncrement")
+            .description("Batch size for resizing up; the size will be increased by a multiple of this value")
+            .defaultValue(1)
+            .reconfigurable(true)
+            .build();
+    @SetFromFlag("resizeUpIterationMax")
+    public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.resizeUpIterationMax")
+            .defaultValue(Integer.MAX_VALUE)
+            .description("Maximum change to the size on a single iteration when scaling up")
+            .reconfigurable(true)
+            .build();
+    @SetFromFlag("resizeDownIterationIncrement")
+    public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.resizeDownIterationIncrement")
+            .description("Batch size for resizing down; the size will be decreased by a multiple of this value")
+            .defaultValue(1)
+            .reconfigurable(true)
+            .build();
+    @SetFromFlag("resizeDownIterationMax")
+    public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.resizeDownIterationMax")
+            .defaultValue(Integer.MAX_VALUE)
+            .description("Maximum change to the size on a single iteration when scaling down")
+            .reconfigurable(true)
+            .build();
+
+    @SetFromFlag("minPeriodBetweenExecs")
+    public static final ConfigKey<Duration> MIN_PERIOD_BETWEEN_EXECS = BasicConfigKey.builder(Duration.class)
+            .name("autoscaler.minPeriodBetweenExecs")
+            .defaultValue(Duration.millis(100))
+            .build();
+    
+    @SetFromFlag("resizeUpStabilizationDelay")
+    public static final ConfigKey<Duration> RESIZE_UP_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("autoscaler.resizeUpStabilizationDelay")
+            .defaultValue(Duration.ZERO)
+            .reconfigurable(true)
+            .build();
+    
+    @SetFromFlag("resizeDownStabilizationDelay")
+    public static final ConfigKey<Duration> RESIZE_DOWN_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("autoscaler.resizeDownStabilizationDelay")
+            .defaultValue(Duration.ZERO)
+            .reconfigurable(true)
+            .build();
+
+    @SetFromFlag("minPoolSize")
+    public static final ConfigKey<Integer> MIN_POOL_SIZE = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.minPoolSize")
+            .defaultValue(1)
+            .reconfigurable(true)
+            .build();
+    
+    @SetFromFlag("maxPoolSize")
+    public static final ConfigKey<Integer> MAX_POOL_SIZE = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.maxPoolSize")
+            .defaultValue(Integer.MAX_VALUE)
+            .reconfigurable(true)
+            .build();
+
+    @SetFromFlag("resizeOperator")
+    public static final ConfigKey<ResizeOperator> RESIZE_OPERATOR = BasicConfigKey.builder(ResizeOperator.class)
+            .name("autoscaler.resizeOperator")
+            .defaultValue(new ResizeOperator() {
+                    public Integer resize(Entity entity, Integer desiredSize) {
+                        return ((Resizable)entity).resize(desiredSize);
+                    }})
+            .build();
+    
+    @SuppressWarnings("serial")
+    @SetFromFlag("currentSizeOperator")
+    public static final ConfigKey<Function<Entity,Integer>> CURRENT_SIZE_OPERATOR = BasicConfigKey.builder(new TypeToken<Function<Entity,Integer>>() {})
+            .name("autoscaler.currentSizeOperator")
+            .defaultValue(new Function<Entity,Integer>() {
+                    public Integer apply(Entity entity) {
+                        return ((Resizable)entity).getCurrentSize();
+                    }})
+            .build();
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("poolHotSensor")
+    public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_HOT_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+            .name("autoscaler.poolHotSensor")
+            .defaultValue(DEFAULT_POOL_HOT_SENSOR)
+            .build();
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("poolColdSensor")
+    public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_COLD_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+            .name("autoscaler.poolColdSensor")
+            .defaultValue(DEFAULT_POOL_COLD_SENSOR)
+            .build();
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("poolOkSensor")
+    public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_OK_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+            .name("autoscaler.poolOkSensor")
+            .defaultValue(DEFAULT_POOL_OK_SENSOR)
+            .build();
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("maxSizeReachedSensor")
+    public static final ConfigKey<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>> MAX_SIZE_REACHED_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>>() {})
+            .name("autoscaler.maxSizeReachedSensor")
+            .description("Sensor for which a notification will be emitted (on the associated entity) when " +
+                    "we consistently wanted to resize the pool above the max allowed size, for " +
+                    "maxReachedNotificationDelay milliseconds")
+            .build();
+    
+    @SetFromFlag("maxReachedNotificationDelay")
+    public static final ConfigKey<Duration> MAX_REACHED_NOTIFICATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("autoscaler.maxReachedNotificationDelay")
+            .description("Time that we consistently wanted to go above the maxPoolSize for, after which the " +
+                    "maxSizeReachedSensor (if any) will be emitted")
+            .defaultValue(Duration.ZERO)
+            .build();
+    
+    private Entity poolEntity;
+    
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+    private volatile ScheduledExecutorService executor;
+
+    private SizeHistory recentUnboundedResizes;
+
+    private SizeHistory recentDesiredResizes;
+    
+    private long maxReachedLastNotifiedTime;
+    
+    private final SensorEventListener<Map> utilizationEventHandler = new SensorEventListener<Map>() {
+        public void onEvent(SensorEvent<Map> event) {
+            Map<String, ?> properties = (Map<String, ?>) event.getValue();
+            Sensor<?> sensor = event.getSensor();
+            
+            if (sensor.equals(getPoolColdSensor())) {
+                onPoolCold(properties);
+            } else if (sensor.equals(getPoolHotSensor())) {
+                onPoolHot(properties);
+            } else if (sensor.equals(getPoolOkSensor())) {
+                onPoolOk(properties);
+            } else {
+                throw new IllegalStateException("Unexpected sensor type: "+sensor+"; event="+event);
+            }
+        }
+    };
+
+    private final SensorEventListener<Number> metricEventHandler = new SensorEventListener<Number>() {
+        public void onEvent(SensorEvent<Number> event) {
+            assert event.getSensor().equals(getMetric());
+            onMetricChanged(event.getValue());
+        }
+    };
+
+    public AutoScalerPolicy() {
+        this(MutableMap.<String,Object>of());
+    }
+    
+    public AutoScalerPolicy(Map<String,?> props) {
+        super(props);
+    }
+
+    @Override
+    public void init() {
+        doInit();
+    }
+
+    @Override
+    public void rebind() {
+        doInit();
+    }
+    
+    protected void doInit() { // shared set-up, called from both init() and rebind()
+        long maxReachedNotificationDelay = getMaxReachedNotificationDelay().toMilliseconds();
+        recentUnboundedResizes = new SizeHistory(maxReachedNotificationDelay);
+        // Pass the longer of the two stabilization delays, since this one history covers both resize directions.
+        long maxResizeStabilizationDelay = Math.max(getResizeUpStabilizationDelay().toMilliseconds(), getResizeDownStabilizationDelay().toMilliseconds());
+        recentDesiredResizes = new SizeHistory(maxResizeStabilizationDelay);
+        
+        // TODO Should re-use the execution manager's thread pool, somehow
+        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+    }
+
+    public void setMetricLowerBound(Number val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing metricLowerBound from {} to {}", new Object[] {this, getMetricLowerBound(), val});
+        config().set(METRIC_LOWER_BOUND, checkNotNull(val));
+    }
+    
+    public void setMetricUpperBound(Number val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing metricUpperBound from {} to {}", new Object[] {this, getMetricUpperBound(), val});
+        config().set(METRIC_UPPER_BOUND, checkNotNull(val));
+    }
+    
+    private <T> void setOrDefault(ConfigKey<T> key, T val) { // stores val under key; a null val is replaced by the key's default
+        if (val==null) val = key.getDefaultValue();
+        config().set(key, val);
+    }
+    public int getResizeUpIterationIncrement() { return getConfig(RESIZE_UP_ITERATION_INCREMENT); }
+    public void setResizeUpIterationIncrement(Integer val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationIncrement from {} to {}", new Object[] {this, getResizeUpIterationIncrement(), val});
+        setOrDefault(RESIZE_UP_ITERATION_INCREMENT, val);
+    }
+    public int getResizeDownIterationIncrement() { return getConfig(RESIZE_DOWN_ITERATION_INCREMENT); }
+    public void setResizeDownIterationIncrement(Integer val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationIncrement from {} to {}", new Object[] {this, getResizeDownIterationIncrement(), val});
+        setOrDefault(RESIZE_DOWN_ITERATION_INCREMENT, val);
+    }
+    public int getResizeUpIterationMax() { return getConfig(RESIZE_UP_ITERATION_MAX); }
+    public void setResizeUpIterationMax(Integer val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationMax from {} to {}", new Object[] {this, getResizeUpIterationMax(), val});
+        setOrDefault(RESIZE_UP_ITERATION_MAX, val);
+    }
+    public int getResizeDownIterationMax() { return getConfig(RESIZE_DOWN_ITERATION_MAX); }
+    public void setResizeDownIterationMax(Integer val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationMax from {} to {}", new Object[] {this, getResizeDownIterationMax(), val});
+        setOrDefault(RESIZE_DOWN_ITERATION_MAX, val);
+    }
+
+    public void setMinPeriodBetweenExecs(Duration val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing minPeriodBetweenExecs from {} to {}", new Object[] {this, getMinPeriodBetweenExecs(), val});
+        config().set(MIN_PERIOD_BETWEEN_EXECS, val);
+    }
+
+    public void setResizeUpStabilizationDelay(Duration val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpStabilizationDelay from {} to {}", new Object[] {this, getResizeUpStabilizationDelay(), val});
+        config().set(RESIZE_UP_STABILIZATION_DELAY, val);
+    }
+    
+    public void setResizeDownStabilizationDelay(Duration val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownStabilizationDelay from {} to {}", new Object[] {this, getResizeDownStabilizationDelay(), val});
+        config().set(RESIZE_DOWN_STABILIZATION_DELAY, val);
+    }
+    
+    public void setMinPoolSize(int val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing minPoolSize from {} to {}", new Object[] {this, getMinPoolSize(), val});
+        config().set(MIN_POOL_SIZE, val);
+    }
+    
+    public void setMaxPoolSize(int val) {
+        if (LOG.isInfoEnabled()) LOG.info("{} changing maxPoolSize from {} to {}", new Object[] {this, getMaxPoolSize(), val});
+        config().set(MAX_POOL_SIZE, val);
+    }
+    
+    private AttributeSensor<? extends Number> getMetric() {
+        return getConfig(METRIC);
+    }
+
+    private Entity getEntityWithMetric() {
+        return getConfig(ENTITY_WITH_METRIC);
+    }
+    
+    private Number getMetricLowerBound() {
+        return getConfig(METRIC_LOWER_BOUND);
+    }
+    
+    private Number getMetricUpperBound() {
+        return getConfig(METRIC_UPPER_BOUND);
+    }
+    
+    private Duration getMinPeriodBetweenExecs() {
+        return getConfig(MIN_PERIOD_BETWEEN_EXECS);
+    }
+    
+    private Duration getResizeUpStabilizationDelay() {
+        return getConfig(RESIZE_UP_STABILIZATION_DELAY);
+    }
+    
+    private Duration getResizeDownStabilizationDelay() {
+        return getConfig(RESIZE_DOWN_STABILIZATION_DELAY);
+    }
+    
+    private int getMinPoolSize() {
+        return getConfig(MIN_POOL_SIZE);
+    }
+    
+    private int getMaxPoolSize() {
+        return getConfig(MAX_POOL_SIZE);
+    }
+    
+    private ResizeOperator getResizeOperator() {
+        return getConfig(RESIZE_OPERATOR);
+    }
+    
+    private Function<Entity,Integer> getCurrentSizeOperator() {
+        return getConfig(CURRENT_SIZE_OPERATOR);
+    }
+    
+    private BasicNotificationSensor<? extends Map> getPoolHotSensor() {
+        return getConfig(POOL_HOT_SENSOR);
+    }
+    
+    private BasicNotificationSensor<? extends Map> getPoolColdSensor() {
+        return getConfig(POOL_COLD_SENSOR);
+    }
+    
+    private BasicNotificationSensor<? extends Map> getPoolOkSensor() {
+        return getConfig(POOL_OK_SENSOR);
+    }
+    
+    private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> getMaxSizeReachedSensor() {
+        return getConfig(MAX_SIZE_REACHED_SENSOR);
+    }
+    
+    private Duration getMaxReachedNotificationDelay() {
+        return getConfig(MAX_REACHED_NOTIFICATION_DELAY);
+    }
+
+    /** Reacts to {@code key} being reconfigured to {@code val}; throws UnsupportedOperationException for keys not handled here. */
+    @Override
+    protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+        if (key.equals(RESIZE_UP_STABILIZATION_DELAY)) {
+            Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeDownStabilizationDelay());
+            recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay);
+        } else if (key.equals(RESIZE_DOWN_STABILIZATION_DELAY)) {
+            Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeUpStabilizationDelay());
+            recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay);
+        } else if (key.equals(METRIC_LOWER_BOUND) || key.equals(METRIC_UPPER_BOUND)) {
+            // TODO If recorded what last metric value was then we could recalculate immediately
+            // Rely on next metric-change to trigger recalculation (this applies to both bounds,
+            // and to the iteration settings below).
+        } else if (key.equals(RESIZE_UP_ITERATION_INCREMENT) || key.equals(RESIZE_UP_ITERATION_MAX) || key.equals(RESIZE_DOWN_ITERATION_INCREMENT) || key.equals(RESIZE_DOWN_ITERATION_MAX)) {
+            // no special actions needed
+        } else if (key.equals(MIN_POOL_SIZE)) {
+            int newMin = (Integer) val;
+            if (newMin > getConfig(MAX_POOL_SIZE)) {
+                throw new IllegalArgumentException("Min pool size "+val+" must not be greater than max pool size "+getConfig(MAX_POOL_SIZE));
+            }
+            onPoolSizeLimitsChanged(newMin, getConfig(MAX_POOL_SIZE));
+        } else if (key.equals(MAX_POOL_SIZE)) {
+            int newMax = (Integer) val;
+            if (newMax < getConfig(MIN_POOL_SIZE)) {
+                // val is the proposed max in this branch, so report it against the current min.
+                throw new IllegalArgumentException("Max pool size "+val+" must not be less than min pool size "+getConfig(MIN_POOL_SIZE));
+            }
+            onPoolSizeLimitsChanged(getConfig(MIN_POOL_SIZE), newMax);
+        } else {
+            throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this);
+        }
+    }
+
+    @Override
+    public void suspend() {
+        super.suspend();
+        // TODO unsubscribe from everything? And resubscribe on resume?
+        if (executor != null) executor.shutdownNow();
+    }
+    
+    @Override
+    public void resume() {
+        super.resume();
+        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+    }
+    
+    @Override
+    public void setEntity(EntityLocal entity) { // records the pool entity and registers the sensor subscriptions
+        if (!config().getRaw(RESIZE_OPERATOR).isPresentAndNonNull()) {
+            Preconditions.checkArgument(entity instanceof Resizable, "Provided entity "+entity+" must be an instance of Resizable, because no custom-resizer operator supplied");
+        }
+        super.setEntity(entity);
+        this.poolEntity = entity;
+        // Metric events may come from a separately configured entity; hot/cold/ok events are always read from the pool entity.
+        if (getMetric() != null) {
+            Entity entityToSubscribeTo = (getEntityWithMetric() != null) ? getEntityWithMetric() : entity;
+            subscribe(entityToSubscribeTo, getMetric(), metricEventHandler);
+        }
+        subscribe(poolEntity, getPoolColdSensor(), utilizationEventHandler);
+        subscribe(poolEntity, getPoolHotSensor(), utilizationEventHandler);
+        subscribe(poolEntity, getPoolOkSensor(), utilizationEventHandler);
+    }
+    
+    /**
+     * @return a thread factory whose threads are named {@code brooklyn-autoscalerpolicy-%d}
+     */
+    private ThreadFactory newThreadFactory() {
+        ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+        factoryBuilder.setNameFormat("brooklyn-autoscalerpolicy-%d");
+        return factoryBuilder.build();
+    }
+
+    /**
+     * Forces an immediate resize (without waiting for stabilization etc) if the current size is 
+     * not within the min and max limits. We schedule this so that all resize operations are done
+     * by the same thread.
+     * <p>
+     * Nothing is submitted unless the policy is running and the entity is up.
+     *
+     * @param min the minimum pool size to enforce
+     * @param max the maximum pool size to enforce
+     */
+    private void onPoolSizeLimitsChanged(final int min, final int max) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} checking pool size on limits changed for {} (between {} and {})", new Object[] {this, poolEntity, min, max});
+        
+        if (isRunning() && isEntityUp()) {
+            executor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        int currentSize = getCurrentSizeOperator().apply(entity);
+                        // clamp the current size into [min, max]
+                        int desiredSize = Math.min(max, Math.max(min, currentSize));
+
+                        if (currentSize != desiredSize) {
+                            // a bare "this" here would be the anonymous Runnable, not the policy
+                            if (LOG.isInfoEnabled()) LOG.info("{} resizing pool {} immediateley from {} to {} (due to new pool size limits)", new Object[] {AutoScalerPolicy.this, poolEntity, currentSize, desiredSize});
+                            getResizeOperator().resize(poolEntity, desiredSize);
+                        }
+                        
+                    } catch (Exception e) {
+                        // failures after the policy has stopped are only worth a debug message
+                        if (isRunning()) {
+                            LOG.error("Error resizing: "+e, e);
+                        } else {
+                            if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e);
+                        }
+                    } catch (Throwable t) {
+                        LOG.error("Error resizing: "+t, t);
+                        throw Throwables.propagate(t);
+                    }
+                }});
+        }
+    }
+    
+    /** Direction of a scaling decision. */
+    private enum ScalingType { HOT, COLD }
+
+    /**
+     * One reading of the pool: its size, the metric value and the thresholds to compare against.
+     * A {@code null} scaling mode means the reading may be judged either hot or cold.
+     */
+    private static class ScalingData {
+        ScalingType scalingMode;
+        int currentSize;
+        double currentMetricValue;
+        Double metricUpperBound;
+        Double metricLowerBound;
+        
+        /** @return the metric value multiplied by the current size */
+        public double getCurrentTotalActivity() {
+            return currentMetricValue * currentSize;
+        }
+        
+        /** @return true if the mode permits HOT and the metric exceeds a usable upper bound */
+        public boolean isHot() {
+            if (scalingMode != null && scalingMode != ScalingType.HOT) {
+                return false;
+            }
+            return isValid(metricUpperBound) && currentMetricValue > metricUpperBound;
+        }
+
+        /** @return true if the mode permits COLD and the metric is below a usable lower bound */
+        public boolean isCold() {
+            if (scalingMode != null && scalingMode != ScalingType.COLD) {
+                return false;
+            }
+            return isValid(metricLowerBound) && currentMetricValue < metricLowerBound;
+        }
+
+        /** A bound is usable only when it is present and strictly positive. */
+        private boolean isValid(Double bound) {
+            if (bound == null) {
+                return false;
+            }
+            return bound > 0;
+        }
+    }
+
+    /**
+     * Handles a new value of the configured metric: builds a reading from the value, the current
+     * size and the configured metric bounds, and hands it to {@code analyze}. A null value is ignored.
+     */
+    private void onMetricChanged(Number val) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-metric for {}: {}", new Object[] {this, poolEntity, val});
+
+        if (val != null) {
+            ScalingData reading = new ScalingData();
+            reading.currentMetricValue = val.doubleValue();
+            reading.currentSize = getCurrentSizeOperator().apply(entity);
+            reading.metricUpperBound = getMetricUpperBound().doubleValue();
+            reading.metricLowerBound = getMetricLowerBound().doubleValue();
+            
+            analyze(reading, "pool");
+        } else {
+            // occurs e.g. if using an aggregating enricher who returns null when empty, the sensor has gone away
+            if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {}, inbound metric is null", new Object[] {this, poolEntity});
+        }
+    }
+    
+    /** Handles a pool-cold event by analysing its properties in COLD mode. */
+    private void onPoolCold(Map<String, ?> properties) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties});
+        }
+        analyzeOnHotOrColdSensor(ScalingType.COLD, "cold pool", properties);
+    }
+    
+    /** Handles a pool-hot event by analysing its properties in HOT mode. */
+    private void onPoolHot(Map<String, ?> properties) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties});
+        }
+        analyzeOnHotOrColdSensor(ScalingType.HOT, "hot pool", properties);
+    }
+    
+    /**
+     * Builds a reading from the properties of a hot/cold pool event and analyses it.
+     *
+     * @param scalingMode the only direction this reading may scale in
+     * @param description label used in log messages
+     * @param properties event properties; workrate and thresholds are cast to Double, size to Integer
+     */
+    private void analyzeOnHotOrColdSensor(ScalingType scalingMode, String description, Map<String, ?> properties) {
+        double workrate = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY);
+        int poolSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
+        Double highThreshold = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY);
+        Double lowThreshold = (Double) properties.get(POOL_LOW_THRESHOLD_KEY);
+        
+        ScalingData reading = new ScalingData();
+        reading.scalingMode = scalingMode;
+        reading.currentMetricValue = workrate;
+        reading.currentSize = poolSize;
+        reading.metricUpperBound = highThreshold;
+        reading.metricLowerBound = lowThreshold;
+        
+        analyze(reading, description);
+    }
+    
+    /**
+     * Decides from a single reading whether the pool should grow or shrink, and records the outcome.
+     * <p>
+     * A reading that is neither hot nor cold records the current size via {@code abortResize} and
+     * returns. Otherwise the ideal ("unconstrained") size is computed, limited by the min/max pool
+     * size, and the step is capped at the per-iteration maximum or rounded up to a multiple of the
+     * per-iteration increment. If a change of size remains, it is passed to {@code scheduleResize};
+     * if not, the current size is recorded instead. In both cases the unconstrained size is passed
+     * to {@code onNewUnboundedPoolSize}.
+     *
+     * @param data the reading to analyse; its {@code scalingMode} is updated by this method
+     * @param description label used in log messages
+     */
+    private void analyze(ScalingData data, String description) {
+        int desiredSizeUnconstrained;
+        
+        /* We always scale out (modulo stabilization delay) if:
+         *   currentTotalActivity > currentSize*metricUpperBound
+         * With newDesiredSize the smallest n such that   n*metricUpperBound >= currentTotalActivity
+         * ie  n >= currentTotalActivity/metricUpperBound, thus n := Math.ceil(currentTotalActivity/metricUpperBound)
+         * 
+         * Else consider scale back if:
+         *   currentTotalActivity < currentSize*metricLowerBound
+         * With newDesiredSize normally the largest n such that:  
+         *   n*metricLowerBound <= currentTotalActivity
+         * BUT with an absolute requirement which trumps the above computation
+         * that the newDesiredSize doesn't cause immediate scale out:
+         *   n*metricUpperBound >= currentTotalActivity
+         * thus n := Math.max ( floor(currentTotalActivity/metricLowerBound), ceil(currentTotalActivity/metricUpperBound) )
+         */
+        if (data.isHot()) {
+            // scale out
+            desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound);
+            data.scalingMode = ScalingType.HOT;
+            
+        } else if (data.isCold()) {
+            // scale back
+            desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound);
+            data.scalingMode = ScalingType.COLD;
+            
+        } else {
+            if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound});
+            abortResize(data.currentSize);
+            return; // within the healthy range; no-op
+        }
+        
+        // logged at the same level as the guard checks (was LOG.debug behind an isTraceEnabled check)
+        if (LOG.isTraceEnabled()) LOG.trace("{} detected unconstrained desired size {}", new Object[] {this, desiredSizeUnconstrained});
+        int desiredSize = applyMinMaxConstraints(desiredSizeUnconstrained);
+
+        if ((data.scalingMode==ScalingType.COLD) && (desiredSize < data.currentSize)) {
+
+            int delta = data.currentSize - desiredSize;
+            int scaleIncrement = getResizeDownIterationIncrement();
+            int scaleMax = getResizeDownIterationMax();
+            if (delta>scaleMax) {
+                delta=scaleMax;
+            } else if (delta % scaleIncrement != 0) {
+                // keep scaling to the increment
+                delta += scaleIncrement - (delta % scaleIncrement);
+            }
+            desiredSize = data.currentSize - delta;
+            
+            if (data.metricUpperBound!=null) {
+                // if upper bound supplied, check that this desired scale-back size 
+                // is not going to cause scale-out on next run; i.e. anti-thrashing
+                while (desiredSize < data.currentSize && data.getCurrentTotalActivity() > data.metricUpperBound * desiredSize) {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} when resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, data.currentSize, desiredSize });
+                    desiredSize += scaleIncrement;
+                }
+            }
+            desiredSize = applyMinMaxConstraints(desiredSize);
+            // rounding / anti-thrashing / limits may have removed the shrink altogether
+            if (desiredSize >= data.currentSize) data.scalingMode = null;
+            
+        } else if ((data.scalingMode==ScalingType.HOT) && (desiredSize > data.currentSize)) {
+
+            int delta = desiredSize - data.currentSize;
+            int scaleIncrement = getResizeUpIterationIncrement();
+            int scaleMax = getResizeUpIterationMax();
+            if (delta>scaleMax) {
+                delta=scaleMax;
+            } else if (delta % scaleIncrement != 0) {
+                // keep scaling to the increment
+                delta += scaleIncrement - (delta % scaleIncrement);
+            }
+            desiredSize = data.currentSize + delta;
+            desiredSize = applyMinMaxConstraints(desiredSize);
+            // the limits may have removed the growth altogether
+            if (desiredSize <= data.currentSize) data.scalingMode = null;
+
+        } else {
+            // the min/max limits already rule out any move in the wanted direction
+            data.scalingMode = null;
+        }
+    
+        if (data.scalingMode!=null) {
+            if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing {} {} from {} to {} ({} < {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, desiredSizeUnconstrained});
+            scheduleResize(desiredSize);
+        } else {
+            if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained});
+            abortResize(data.currentSize);
+            // but add to the unbounded record for future consideration
+        }
+        
+        onNewUnboundedPoolSize(desiredSizeUnconstrained);
+    }
+
+    /**
+     * Raises the given size to the minimum pool size if needed, then caps it at the maximum pool size.
+     */
+    private int applyMinMaxConstraints(int desiredSize) {
+        int atLeastMin = Math.max(getMinPoolSize(), desiredSize);
+        return Math.min(getMaxPoolSize(), atLeastMin);
+    }
+
+    /** Handles a pool-ok event: records the reported current size rather than requesting a resize. */
+    private void onPoolOk(Map<String, ?> properties) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{} recording pool-ok for {}: {}", new Object[] {this, poolEntity, properties});
+        }
+        
+        int reportedSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
+        
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{} not resizing ok pool {} from {}", new Object[] {this, poolEntity, reportedSize});
+        }
+        abortResize(reportedSize);
+    }
+
+    /**
+     * Records the requested size and schedules a resize, unless one is already queued up. The
+     * queued job resizes to whatever the latest value turns out to be when it executes, rather
+     * than to the value it was given when it was queued.
+     *
+     * @param newSize the size being requested
+     */
+    private void scheduleResize(final int newSize) {
+        // remember the request first; the queued job works from the recorded history
+        recentDesiredResizes.add(newSize);
+        scheduleResize();
+    }
+
+    /**
+     * Records what the pool size would be without the min/max caps, and schedules a check of
+     * whether that size is sustained. Does nothing when no max-size-reached sensor is configured.
+     * 
+     * Piggy-backs off the existing scheduleResize execution, which also checks whether the
+     * notification needs to be sent.
+     *
+     * @param val the unconstrained desired pool size
+     */
+    private void onNewUnboundedPoolSize(final int val) {
+        if (getMaxSizeReachedSensor() == null) {
+            return;
+        }
+        recentUnboundedResizes.add(val);
+        scheduleResize();
+    }
+    
+    /**
+     * Records the current size in both the desired-size and the unbounded-size histories.
+     *
+     * @param currentSize the pool size to record
+     */
+    private void abortResize(final int currentSize) {
+        this.recentDesiredResizes.add(currentSize);
+        this.recentUnboundedResizes.add(currentSize);
+    }
+
+    /**
+     * @return false if there is no entity; otherwise the entity's SERVICE_UP attribute (must be
+     *         exactly TRUE) when its type declares that sensor, or true when it does not
+     */
+    private boolean isEntityUp() {
+        if (entity == null) {
+            return false;
+        }
+        boolean declaresServiceUp = entity.getEntityType().getSensors().contains(Startable.SERVICE_UP);
+        if (!declaresServiceUp) {
+            return true;
+        }
+        return Boolean.TRUE.equals(entity.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Queues a single resize job on the executor, provided the policy is running, the entity is up
+     * and no job is already queued. The job is delayed so that it starts no sooner than the
+     * minimum period between executions after the previous job started.
+     */
+    private void scheduleResize() {
+        // TODO Make scale-out calls concurrent, rather than waiting for first resize to entirely 
+        // finish. On ec2 for example, this can cause us to grow very slowly if first request is for
+        // just one new VM to be provisioned.
+        
+        if (!isRunning() || !isEntityUp() || !executorQueued.compareAndSet(false, true)) {
+            return;
+        }
+        
+        long now = System.currentTimeMillis();
+        long earliestStart = executorTime + getMinPeriodBetweenExecs().toMilliseconds();
+        long delay = Math.max(0, earliestStart - now);
+        if (LOG.isTraceEnabled()) LOG.trace("{} scheduling resize in {}ms", this, delay);
+        
+        Runnable resizeJob = new Runnable() {
+            @Override public void run() {
+                try {
+                    executorTime = System.currentTimeMillis();
+                    // clear the flag before working, so requests arriving meanwhile can queue a new job
+                    executorQueued.set(false);
+
+                    resizeNow();
+                    notifyMaxReachedIfRequiredNow();
+                    
+                } catch (Exception e) {
+                    if (isRunning()) {
+                        LOG.error("Error resizing: "+e, e);
+                    } else {
+                        if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e);
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Error resizing: "+t, t);
+                    throw Throwables.propagate(t);
+                }
+            }};
+        executor.schedule(resizeJob, delay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Looks at the recent "unbounded pool size" values (i.e. the sizes wanted if the minSize and
+     * maxSize caps are ignored) within the "maxReachedNotificationDelay" time window, and emits the
+     * max-size-reached sensor event when even the smallest of them exceeds the max pool size.
+     * <p>
+     * Does nothing if no sensor is configured or if a notification has already been sent. If only
+     * some values in the window exceed the max, another check is scheduled.
+     */
+    private void notifyMaxReachedIfRequiredNow() {
+        BasicNotificationSensor<? super MaxPoolSizeReachedEvent> sensor = getMaxSizeReachedSensor();
+        if (sensor == null) {
+            return;
+        }
+        
+        WindowSummary window = recentUnboundedResizes.summarizeWindow(getMaxReachedNotificationDelay());
+        long windowMillis = getMaxReachedNotificationDelay().toMilliseconds();
+        long poolSizeNow = getCurrentSizeOperator().apply(poolEntity);
+        int maxAllowed = getMaxPoolSize();
+        long sustainedUnbounded = window.min; // The sustained maximum (i.e. the smallest it's dropped down to)
+        long latestUnbounded = window.latest;
+        
+        if (maxReachedLastNotifiedTime > 0) {
+            // already notified the listener; don't do it again
+            // TODO Could have max period for notifications, or a step increment to warn when exceeded by ever bigger amounts
+            return;
+        }
+        
+        if (sustainedUnbounded > maxAllowed) {
+            // We have consistently wanted to be bigger than the max allowed; tell the listener
+            if (LOG.isDebugEnabled()) LOG.debug("{} notifying listener of max pool size reached; current {}, max {}, unbounded current {}, unbounded max {}", 
+                    new Object[] {this, poolSizeNow, maxAllowed, latestUnbounded, sustainedUnbounded});
+            
+            maxReachedLastNotifiedTime = System.currentTimeMillis();
+            MaxPoolSizeReachedEvent.Builder eventBuilder = MaxPoolSizeReachedEvent.builder();
+            eventBuilder.currentPoolSize(poolSizeNow);
+            eventBuilder.maxAllowed(maxAllowed);
+            eventBuilder.currentUnbounded(latestUnbounded);
+            eventBuilder.maxUnbounded(sustainedUnbounded);
+            eventBuilder.timeWindow(windowMillis);
+            entity.emit(sensor, eventBuilder.build());
+            return;
+        }
+        
+        if (window.max > maxAllowed) {
+            // We temporarily wanted to be bigger than the max allowed; check back later to see if consistent
+            // TODO Could check if there has been anything bigger than "min" since min happened (would be more efficient)
+            if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling max-reached check for {}, as unbounded size not stable (min {}, max {}, latest {})", 
+                    new Object[] {this, poolEntity, window.min, window.max, window.latest});
+            scheduleResize();
+        }
+        
+        // otherwise nothing to write home about; continually below maxAllowed
+    }
+
+    /**
+     * Resizes the pool to the currently calculated desired size, blocking until the submitted
+     * task has ended. If the desired size is not yet stable, a further check is scheduled first;
+     * if the desired size equals the current size, nothing is submitted.
+     */
+    private void resizeNow() {
+        final long sizeBefore = getCurrentSizeOperator().apply(poolEntity);
+        CalculatedDesiredPoolSize target = calculateDesiredPoolSize(sizeBefore);
+        final long desiredPoolSize = target.size;
+        
+        if (!target.stable) {
+            // the desired size fluctuations are not stable; ensure we check again later (due to time-window)
+            // even if no additional events have been received
+            // (note we continue now with as "good" a resize as we can given the instability)
+            if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...", 
+                    new Object[] {this, poolEntity, sizeBefore, desiredPoolSize});
+            scheduleResize();
+        }
+        
+        if (sizeBefore == desiredPoolSize) {
+            if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}", 
+                    new Object[] {this, poolEntity, sizeBefore, desiredPoolSize});
+            return;
+        }
+        
+        if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}", 
+                new Object[] {this, desiredPoolSize, sizeBefore, getMinPoolSize(), getMaxPoolSize()});
+        
+        Callable<Void> resizeBody = new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // TODO Should we use int throughout, rather than casting here?
+                getResizeOperator().resize(poolEntity, (int) desiredPoolSize);
+                return null;
+            }
+        };
+        
+        Entities.submit(entity, Tasks.<Void>builder().name("Auto-scaler")
+            .description("Auto-scaler recommending resize from "+sizeBefore+" to "+desiredPoolSize)
+            .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+            .body(resizeBody)
+            .build())
+            .blockUntilEnded();
+    }
+    
+    /**
+     * Complicated logic for stabilization-delay...
+     * Only grow if we have consistently been asked to grow for the resizeUpStabilizationDelay period;
+     * Only shrink if we have consistently been asked to shrink for the resizeDownStabilizationDelay period.
+     * 
+     * @return tuple of desired pool size, and whether this is "stable" (i.e. if we receive no more events 
+     *         will this continue to be the desired pool size)
+     */
+    private CalculatedDesiredPoolSize calculateDesiredPoolSize(long currentPoolSize) {
+        long now = System.currentTimeMillis();
+        WindowSummary downsizeSummary = recentDesiredResizes.summarizeWindow(getResizeDownStabilizationDelay());
+        WindowSummary upsizeSummary = recentDesiredResizes.summarizeWindow(getResizeUpStabilizationDelay());
+        
+        // this is the _sustained_ growth value; the smallest size that has been requested in the "stable-for-growing" period
+        long maxDesiredPoolSize = upsizeSummary.min;
+        boolean stableForGrowing = upsizeSummary.stableForGrowth;
+        
+        // this is the _sustained_ shrink value; largest size that has been requested in the "stable-for-shrinking" period:
+        long minDesiredPoolSize = downsizeSummary.max;
+        boolean stableForShrinking = downsizeSummary.stableForShrinking;
+        
+        // (it is a logical consequence of the above that minDesired >= maxDesired -- this is correct, if confusing:
+        // think of minDesired as the minimum size we are allowed to resize to, and similarly for maxDesired; 
+        // if min > max we can scale to max if current < max, or scale to min if current > min)
+
+        long desiredPoolSize;
+        
+        boolean stable;
+        
+        if (currentPoolSize < maxDesiredPoolSize) {
+            // we have valid request to grow 
+            // (we'll never have a valid request to grow and a valid to shrink simultaneously, btw)
+            desiredPoolSize = maxDesiredPoolSize;
+            stable = stableForGrowing;
+        } else if (currentPoolSize > minDesiredPoolSize) {
+            // we have valid request to shrink
+            desiredPoolSize = minDesiredPoolSize;
+            stable = stableForShrinking;
+        } else {
+            desiredPoolSize = currentPoolSize;
+            stable = stableForGrowing && stableForShrinking;
+        }
+
+        if (LOG.isTraceEnabled()) LOG.trace("{} calculated desired pool size: from {} to {}; minDesired {}, maxDesired {}; " +
+                "stable {}; now {}; downsizeHistory {}; upsizeHistory {}", 
+                new Object[] {this, currentPoolSize, desiredPoolSize, minDesiredPoolSize, maxDesiredPoolSize, stable, now, downsizeSummary, upsizeSummary});
+        
+        return new CalculatedDesiredPoolSize(desiredPoolSize, stable);
+    }
+    
+    /** Immutable pair: a desired pool size and whether that size is considered stable. */
+    private static class CalculatedDesiredPoolSize {
+        /** The desired pool size. */
+        final long size;
+        /** Whether the desired size is stable. */
+        final boolean stable;
+        
+        CalculatedDesiredPoolSize(long desiredSize, boolean isStable) {
+            size = desiredSize;
+            stable = isStable;
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
new file mode 100644
index 0000000..6e97771
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Immutable, serializable event carrying five numbers: the maximum allowed pool size, the current
+ * pool size, the current and maximum "unbounded" sizes, and a time window. Instances are created
+ * through {@link #builder()}.
+ */
+public class MaxPoolSizeReachedEvent implements Serializable {
+    private static final long serialVersionUID = 1602627701360505190L;
+
+    private final long maxAllowed;
+    private final long currentPoolSize;
+    private final long currentUnbounded;
+    private final long maxUnbounded;
+    private final long timeWindow;
+
+    /** @return a new builder whose values all start at zero */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Copies every value from the given builder. */
+    protected MaxPoolSizeReachedEvent(Builder builder) {
+        this.maxAllowed = builder.maxAllowed;
+        this.currentPoolSize = builder.currentPoolSize;
+        this.currentUnbounded = builder.currentUnbounded;
+        this.maxUnbounded = builder.maxUnbounded;
+        this.timeWindow = builder.timeWindow;
+    }
+
+    public long getMaxAllowed() {
+        return this.maxAllowed;
+    }
+
+    public long getCurrentPoolSize() {
+        return this.currentPoolSize;
+    }
+
+    public long getCurrentUnbounded() {
+        return this.currentUnbounded;
+    }
+
+    public long getMaxUnbounded() {
+        return this.maxUnbounded;
+    }
+
+    public long getTimeWindow() {
+        return this.timeWindow;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("maxAllowed", maxAllowed)
+                .add("currentPoolSize", currentPoolSize)
+                .add("currentUnbounded", currentUnbounded)
+                .add("maxUnbounded", maxUnbounded)
+                .add("timeWindow", timeWindow)
+                .toString();
+    }
+
+    /** Fluent builder; every setter stores its value and returns this builder. */
+    public static class Builder {
+        protected long maxAllowed;
+        protected long currentPoolSize;
+        protected long currentUnbounded;
+        protected long maxUnbounded;
+        protected long timeWindow;
+
+        public Builder maxAllowed(long val) {
+            this.maxAllowed = val;
+            return this;
+        }
+
+        public Builder currentPoolSize(long val) {
+            this.currentPoolSize = val;
+            return this;
+        }
+
+        public Builder currentUnbounded(long val) {
+            this.currentUnbounded = val;
+            return this;
+        }
+
+        public Builder maxUnbounded(long val) {
+            this.maxUnbounded = val;
+            return this;
+        }
+
+        public Builder timeWindow(long val) {
+            this.timeWindow = val;
+            return this;
+        }
+
+        /** @return a new event holding this builder's current values */
+        public MaxPoolSizeReachedEvent build() {
+            return new MaxPoolSizeReachedEvent(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
new file mode 100644
index 0000000..4f4fbb0
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+/**
+ * Strategy for changing the size of an entity.
+ */
+public interface ResizeOperator {
+
+    /**
+     * Resizes the given entity to the desired size, if possible.
+     * 
+     * @param entity the entity to resize
+     * @param desiredSize the size being requested
+     * @return the new size of the entity
+     */
+    Integer resize(Entity entity, Integer desiredSize);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
new file mode 100644
index 0000000..0aa8801
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import java.util.List;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.TimeWindowedList;
+import brooklyn.util.collections.TimestampedValue;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+
+/**
+ * Using a {@link TimeWindowedList}, tracks the recent history of values to allow a summary of 
+ * those values to be obtained. 
+ *   
+ * @author aled
+ */
+public class SizeHistory {
+
+    /** Immutable summary of the values seen within a time window. */
+    public static class WindowSummary {
+        /** The most recent value (or -1 if there has been no value) */
+        public final long latest;
+        
+        /** The minimum value within the given time period */
+        public final long min;
+        
+        /** The maximum value within the given time period */
+        public final long max;
+        
+        /** true if, since that max value, there have not been any higher values */
+        public final boolean stableForGrowth;
+        
+        /** true if, since that low value, there have not been any lower values */
+        public final boolean stableForShrinking;
+        
+        public WindowSummary(long latest, long min, long max, boolean stableForGrowth, boolean stableForShrinking) {
+            this.latest = latest;
+            this.min = min;
+            this.max = max;
+            this.stableForGrowth = stableForGrowth;
+            this.stableForShrinking = stableForShrinking;
+        }
+        
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this)
+                    .add("latest", latest)
+                    .add("min", min)
+                    .add("max", max)
+                    .add("stableForGrowth", stableForGrowth)
+                    .add("stableForShrinking", stableForShrinking)
+                    .toString();
+        }
+    }
+    
+    private final TimeWindowedList<Number> recentDesiredResizes;
+    
+    /**
+     * @param windowSize passed to the underlying {@link TimeWindowedList} as its "timePeriod";
+     *        "minExpiredVals" is always set to 1
+     */
+    public SizeHistory(long windowSize) {
+        this.recentDesiredResizes = new TimeWindowedList<Number>(
+                MutableMap.of("timePeriod", windowSize, "minExpiredVals", 1));
+    }
+
+    /** Appends a value to the history. */
+    public void add(final int val) {
+        this.recentDesiredResizes.add(val);
+    }
+
+    /** Sets the time period of the underlying list to the given duration. */
+    public void setWindowSize(Duration newWindowSize) {
+        this.recentDesiredResizes.setTimePeriod(newWindowSize);
+    }
+    
+    /**
+     * Summarises the history of values in this time window, with a few special things:
+     * <ul>
+     *   <li>If entire time-window is not covered by the given values, then min is Integer.MIN_VALUE and max is Integer.MAX_VALUE 
+     *   <li>If no values, then latest is -1
+     *   <li>If no recent values, then keeps last-seen value (no matter how old), to use that
+     *   <li>"stable for growth" means that since that max value, there have not been any higher values
+     *   <li>"stable for shrinking" means that since that low value, there have not been any lower values
+     * </ul>
+     * Both stability flags are currently computed the same way: true only when min equals max.
+     *
+     * @param windowSize how far back from now to look
+     */
+    public WindowSummary summarizeWindow(Duration windowSize) {
+        long now = System.currentTimeMillis();
+        List<TimestampedValue<Number>> valuesInWindow = recentDesiredResizes.getValuesInWindow(now, windowSize);
+        
+        Number newest = latestInWindow(valuesInWindow);
+        long latest = -1;
+        if (newest != null) {
+            latest = newest.longValue();
+        }
+        long max = maxInWindow(valuesInWindow, windowSize).longValue();
+        long min = minInWindow(valuesInWindow, windowSize).longValue();
+        
+        // TODO Could do more sophisticated "stable" check; this is the easiest code - correct but not most efficient
+        // in terms of the caller having to schedule additional stability checks.
+        boolean unchangedAcrossWindow = (min == max);
+        
+        return new WindowSummary(latest, min, max, unchangedAcrossWindow, unchangedAcrossWindow);
+    }
+    
+    /**
+     * If the entire time-window is not covered by the given values, then returns Integer.MAX_VALUE.
+     */
+    private <T extends Number> T maxInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) {
+        // TODO bad casting from Integer default result to T
+        long now = System.currentTimeMillis();
+        long epoch = now - timeWindow.toMilliseconds();
+        T result = null;
+        double resultAsDouble = Integer.MAX_VALUE;
+        for (TimestampedValue<T> val : vals) {
+            T valAsNum = val.getValue();
+            double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
+            if (result == null && val.getTimestamp() > epoch) {
+                result = withDefault(null, Integer.MAX_VALUE);
+                resultAsDouble = result.doubleValue();
+            }
+            if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) {
+                result = valAsNum;
+                resultAsDouble = valAsDouble;
+            }
+        }
+        return withDefault(result, Integer.MAX_VALUE);
+    }
+    
+    /**
+     * If the entire time-window is not covered by the given values, then returns Integer.MIN_VALUE
+     */
+    private <T extends Number> T minInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) {
+        long now = System.currentTimeMillis();
+        long epoch = now - timeWindow.toMilliseconds();
+        T result = null;
+        double resultAsDouble = Integer.MIN_VALUE;
+        for (TimestampedValue<T> val : vals) {
+            T valAsNum = val.getValue();
+            double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
+            if (result == null && val.getTimestamp() > epoch) {
+                result = withDefault(null, Integer.MIN_VALUE);
+                resultAsDouble = result.doubleValue();
+            }
+            if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) {
+                result = valAsNum;
+                resultAsDouble = valAsDouble;
+            }
+        }
+        return withDefault(result, Integer.MIN_VALUE);
+    }
+
+    /**
+     * Returns {@code result} unless it is null, in which case {@code defaultValue} is returned
+     * through an unchecked cast to {@code T}.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T withDefault(T result, Integer defaultValue) {
+        if (result == null) {
+            return (T) defaultValue;
+        }
+        return result;
+    }
+    /**
+     * Picks the value of the last element of the given list.
+     *
+     * @return null if empty, or the most recent value
+     */
+    private <T extends Number> T latestInWindow(List<TimestampedValue<T>> vals) {
+        if (vals.isEmpty()) {
+            return null;
+        }
+        int lastIndex = vals.size() - 1;
+        return vals.get(lastIndex).getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
new file mode 100644
index 0000000..fef3d7f
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.location.basic.AbstractLocation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Default {@link FollowTheSunModel} that keeps all of its state in memory.
+ * <p>
+ * It records which container each item is on, which location each container (and hence each
+ * item) is in, the usage reported for each item, and which items have been flagged immovable.
+ * The backing maps are concurrent maps, which cannot store null; an absent container or location
+ * is therefore stored as an internal sentinel and translated back to null by the getters.
+ *
+ * @param <ContainerType> the type of the containers that items live on
+ * @param <ItemType> the type of the items being tracked
+ */
+public class DefaultFollowTheSunModel<ContainerType, ItemType> implements FollowTheSunModel<ContainerType, ItemType> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultFollowTheSunModel.class);
+
+    // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item
+    private static final String NULL = "null-val";
+    private static final Location NULL_LOCATION = new AbstractLocation(newHashMap("name","null-location")) {};
+
+    private final String name;
+    private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
+    private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
+    private final Map<ContainerType, Location> containerToLocation = new ConcurrentHashMap<ContainerType, Location>();
+    private final Map<ItemType, Location> itemToLocation = new ConcurrentHashMap<ItemType, Location>();
+    /** Keyed by target item; each value maps a source item to the usage it directs at that target. */
+    private final Map<ItemType, Map<? extends ItemType, Double>> itemUsage = new ConcurrentHashMap<ItemType, Map<? extends ItemType,Double>>();
+    private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
+
+    /**
+     * @param name the name reported by {@link #getName()}
+     */
+    public DefaultFollowTheSunModel(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the key set of the item-to-container map; this is a view of the internal map, not a copy
+     */
+    @Override
+    public Set<ItemType> getItems() {
+        return itemToContainer.keySet();
+    }
+
+    /**
+     * @return the container recorded for the item, or null if the item is unknown or was
+     *         recorded without a container
+     */
+    @Override
+    public ContainerType getItemContainer(ItemType item) {
+        ContainerType result = itemToContainer.get(item);
+        return (isNull(result) ? null : result);
+    }
+
+    /**
+     * @return the location recorded for the item, or null if the item is unknown or its
+     *         location is not known
+     */
+    @Override
+    public Location getItemLocation(ItemType item) {
+        Location result = itemToLocation.get(item);
+        return (isNull(result) ? null : result);
+    }
+
+    /**
+     * @return the location recorded for the container, or null if the container is unknown or
+     *         was recorded without a location
+     */
+    @Override
+    public Location getContainerLocation(ContainerType container) {
+        Location result = containerToLocation.get(container);
+        return (isNull(result) ? null : result);
+    }
+
+    // Provider methods.
+
+    @Override public String getName() {
+        return name;
+    }
+
+    // TODO: delete?
+    @Override public String getName(ItemType item) {
+        return item.toString();
+    }
+
+    @Override public boolean isItemMoveable(ItemType item) {
+        // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
+        return hasItem(item) && !immovableItems.contains(item);
+    }
+
+    @Override public boolean isItemAllowedIn(ItemType item, Location location) {
+        return true; // TODO?
+    }
+
+    @Override public boolean hasActiveMigration(ItemType item) {
+        return false; // TODO?
+    }
+
+    /**
+     * Aggregates the recorded usage by the location of the sending item.
+     * <p>
+     * For every target item that has at least one recorded source, the result holds a map from
+     * source location to the total usage sent from items in that location. Sources whose
+     * location is unknown, and usage an item sends to itself, are left out. A null usage value
+     * for a source is counted as zero.
+     *
+     * @return a new map, keyed by target item, of usage totals per source location
+     */
+    @Override
+    // FIXME Too expensive to compute; store in a different data structure?
+    public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation() {
+        Map<ItemType, Map<Location, Double>> result = new LinkedHashMap<ItemType, Map<Location,Double>>(getNumItems());
+
+        for (Map.Entry<ItemType, Map<? extends ItemType, Double>> entry : itemUsage.entrySet()) {
+            ItemType targetItem = entry.getKey();
+            Map<? extends ItemType, Double> sources = entry.getValue();
+            if (sources.isEmpty()) continue; // no-one talking to us
+
+            Map<Location, Double> targetUsageByLocation = new LinkedHashMap<Location, Double>();
+            result.put(targetItem, targetUsageByLocation);
+
+            for (Map.Entry<? extends ItemType, Double> sourceEntry : sources.entrySet()) {
+                ItemType sourceItem = sourceEntry.getKey();
+                Location sourceLocation = getItemLocation(sourceItem);
+                // Guard the per-source value itself: the usage map is caller-supplied, and
+                // unboxing a null Double would throw a NullPointerException.
+                Double usage = sourceEntry.getValue();
+                double usageVal = (usage != null) ? usage : 0d;
+                if (sourceLocation == null) continue; // don't know where to attribute this load; e.g. item may have just terminated
+                if (sourceItem.equals(targetItem)) continue; // ignore msgs to self
+
+                Double usageValTotal = targetUsageByLocation.get(sourceLocation);
+                double newUsageValTotal = (usageValTotal != null ? usageValTotal : 0d) + usageVal;
+                targetUsageByLocation.put(sourceLocation, newUsageValTotal);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Returns every container recorded in the given location; the item is not consulted.
+     *
+     * @throws NullPointerException if {@code location} is null
+     */
+    @Override
+    public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location) {
+        checkNotNull(location);
+        return getContainersInLocation(location);
+    }
+
+
+    // Mutators.
+
+    @Override
+    public void onItemMoved(ItemType item, ContainerType newContainer) {
+        // idempotent, as may be called multiple times
+        recordItemPlacement(item, newContainer);
+    }
+
+    @Override
+    public void onContainerAdded(ContainerType container, Location location) {
+        containers.add(container);
+        recordContainerLocation(container, location);
+    }
+
+    /**
+     * Forgets the container and its location. Items recorded on the container are left as they are.
+     */
+    @Override
+    public void onContainerRemoved(ContainerType container) {
+        containers.remove(container);
+        containerToLocation.remove(container);
+    }
+
+    @Override
+    public void onContainerLocationUpdated(ContainerType container, Location location) {
+        if (!containers.contains(container)) {
+            // unknown container; probably just stopped?
+            // If this overtook onContainerAdded, then assume we'll lookup the location and get it right in onContainerAdded
+            if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of location for unknown container {}, to {}", container, location);
+            return;
+        }
+        recordContainerLocation(container, location);
+    }
+
+    @Override
+    public void onItemAdded(ItemType item, ContainerType container, boolean immovable) {
+        // idempotent, as may be called multiple times
+
+        if (immovable) {
+            immovableItems.add(item);
+        }
+        recordItemPlacement(item, container);
+    }
+
+    @Override
+    public void onItemRemoved(ItemType item) {
+        itemToContainer.remove(item);
+        itemToLocation.remove(item);
+        itemUsage.remove(item);
+        immovableItems.remove(item);
+    }
+
+    @Override
+    public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValue) {
+        if (hasItem(item)) {
+            itemUsage.put(item, newValue);
+        } else {
+            // Can happen when item removed - get notification of removal and workrate from group and item
+            // respectively, so can overtake each other
+            if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of usage for unknown item {}, to {}", item, newValue);
+        }
+    }
+
+
+    // Additional methods for tests.
+
+    /**
+     * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
+     */
+    @VisibleForTesting
+    public String itemDistributionToString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        dumpItemDistribution(new PrintStream(baos));
+        return new String(baos.toByteArray());
+    }
+
+    @VisibleForTesting
+    public void dumpItemDistribution() {
+        dumpItemDistribution(System.out);
+    }
+
+    /**
+     * Prints every known location, the containers in it, the items on each container and the
+     * inbound usage of each item, then flushes the stream.
+     */
+    @VisibleForTesting
+    public void dumpItemDistribution(PrintStream out) {
+        Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = getDirectSendsToItemByLocation();
+
+        out.println("Follow-The-Sun dump: ");
+        for (Location location: getLocations()) {
+            out.println("\t"+"Location "+location);
+            for (ContainerType container : getContainersInLocation(location)) {
+                out.println("\t\t"+"Container "+container);
+                for (ItemType item : getItemsOnContainer(container)) {
+                    Map<Location, Double> inboundUsage = directSendsToItemByLocation.get(item);
+                    // itemUsage is keyed by target item, so this is also inbound usage (per source item)
+                    Map<? extends ItemType, Double> inboundUsageByActor = itemUsage.get(item);
+                    double totalInboundByLocation = (inboundUsage != null) ? sum(inboundUsage.values()) : 0d;
+                    double totalInboundByActor = (inboundUsageByActor != null) ? sum(inboundUsageByActor.values()) : 0d;
+                    out.println("\t\t\t"+"Item "+item);
+                    out.println("\t\t\t\t"+"Inbound-by-location: "+totalInboundByLocation+": "+inboundUsage);
+                    out.println("\t\t\t\t"+"Inbound-by-actor: "+totalInboundByActor+": "+inboundUsageByActor);
+                }
+            }
+        }
+        out.flush();
+    }
+
+    /** Stores the item's container, and the location currently recorded for that container, using sentinels for null. */
+    private void recordItemPlacement(ItemType item, ContainerType container) {
+        Location location = (container != null) ? containerToLocation.get(container) : null;
+        itemToContainer.put(item, toNonNullContainer(container));
+        itemToLocation.put(item, toNonNullLocation(location));
+    }
+
+    /** Stores the container's location and applies the same location to every item recorded on that container. */
+    private void recordContainerLocation(ContainerType container, Location location) {
+        Location locationNonNull = toNonNullLocation(location);
+        containerToLocation.put(container, locationNonNull);
+        for (ItemType item : getItemsOnContainer(container)) {
+            itemToLocation.put(item, locationNonNull);
+        }
+    }
+
+    private boolean hasItem(ItemType item) {
+        return itemToContainer.containsKey(item);
+    }
+
+    private Set<Location> getLocations() {
+        return ImmutableSet.copyOf(containerToLocation.values());
+    }
+
+    private Set<ContainerType> getContainersInLocation(Location location) {
+        Set<ContainerType> result = new LinkedHashSet<ContainerType>();
+        for (Map.Entry<ContainerType, Location> entry : containerToLocation.entrySet()) {
+            if (location.equals(entry.getValue())) {
+                result.add(entry.getKey());
+            }
+        }
+        return result;
+    }
+
+    private Set<ItemType> getItemsOnContainer(ContainerType container) {
+        Set<ItemType> result = new LinkedHashSet<ItemType>();
+        for (Map.Entry<ItemType, ContainerType> entry : itemToContainer.entrySet()) {
+            if (container.equals(entry.getValue())) {
+                result.add(entry.getKey());
+            }
+        }
+        return result;
+    }
+
+    private int getNumItems() {
+        return itemToContainer.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    private ContainerType nullContainer() {
+        return (ContainerType) NULL; // relies on erasure
+    }
+
+    private Location nullLocation() {
+        return NULL_LOCATION;
+    }
+
+    private ContainerType toNonNullContainer(ContainerType val) {
+        return (val != null) ? val : nullContainer();
+    }
+
+    private Location toNonNullLocation(Location val) {
+        return (val != null) ? val : nullLocation();
+    }
+
+    /** True if the value is one of the two sentinels; compared by identity on purpose. */
+    private boolean isNull(Object val) {
+        return val == NULL || val == NULL_LOCATION;
+    }
+
+    // TODO Move to utils; or stop AbstractLocation from removing things from the map!
+    /**
+     * @return a new mutable, insertion-ordered map containing just the given entry
+     */
+    public static <K,V> Map<K,V> newHashMap(K k, V v) {
+        Map<K,V> result = Maps.newLinkedHashMap();
+        result.put(k, v);
+        return result;
+    }
+
+    /**
+     * Adds up the given numbers as doubles; null elements are skipped (counted as zero).
+     *
+     * @return the total, or 0 for an empty collection
+     */
+    public static double sum(Collection<? extends Number> values) {
+        double total = 0;
+        for (Number d : values) {
+            if (d != null) {
+                total += d.doubleValue();
+            }
+        }
+        return total;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
new file mode 100644
index 0000000..07c6ed0
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.location.Location;
+
+/**
+ * Captures the state of items, containers and locations for the purpose of moving items around
+ * to minimise latency. For consumption by a {@link FollowTheSunStrategy}.
+ *
+ * @param <ContainerType> the type of container that holds items
+ * @param <ItemType> the type of item that can be moved between containers
+ */
+public interface FollowTheSunModel<ContainerType, ItemType> {
+
+    // Attributes of the pool.
+
+    /** @return the name of the pool */
+    String getName();
+
+    // Attributes of containers and items.
+
+    /** @return a name for the given item */
+    String getName(ItemType item);
+
+    /** @return the items known to this model */
+    Set<ItemType> getItems();
+
+    /** @return per item, a map from location to a usage figure */
+    Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation();
+
+    /** @return the location of the given item */
+    Location getItemLocation(ItemType item);
+
+    /** @return the container of the given item */
+    ContainerType getItemContainer(ItemType item);
+
+    /** @return the location of the given container */
+    Location getContainerLocation(ContainerType container);
+
+    /** @return whether the given item has an active migration */
+    boolean hasActiveMigration(ItemType item);
+
+    /** @return the containers available for the given item in the given location */
+    Set<ContainerType> getAvailableContainersFor(ItemType item, Location location);
+
+    /** @return whether the given item is moveable */
+    boolean isItemMoveable(ItemType item);
+
+    /** @return whether the given item is allowed in the given location */
+    boolean isItemAllowedIn(ItemType item, Location location);
+
+    // Mutators for keeping the model in-sync with the observed world
+
+    /** Notifies the model that a container was added in the given location. */
+    void onContainerAdded(ContainerType container, Location location);
+
+    /** Notifies the model that a container was removed. */
+    void onContainerRemoved(ContainerType container);
+
+    /** Notifies the model that a container's location was updated. */
+    void onContainerLocationUpdated(ContainerType container, Location location);
+
+    /** Notifies the model that an item was added under the given parent container. */
+    void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable);
+
+    /** Notifies the model that an item was removed. */
+    void onItemRemoved(ItemType item);
+
+    /** Notifies the model that the usage values of an item were updated. */
+    void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValues);
+
+    /** Notifies the model that an item was moved to a new container. */
+    void onItemMoved(ItemType item, ContainerType newContainer);
+}


Mime
View raw message