brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [14/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming package policy
Date Tue, 18 Aug 2015 11:06:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
new file mode 100644
index 0000000..80feb4c
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceState;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.BasicNotificationSensor;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+/** 
+ * Emits {@link HASensors#ENTITY_FAILED} whenever the parent's default logic ({@link ComputeServiceState}) would detect a problem,
+ * and similarly {@link HASensors#ENTITY_RECOVERED} when recovered.
+ * <p>
+ * Also gives more control over suppressing {@link Lifecycle#ON_FIRE} for some period of time
+ * (see {@link #SERVICE_ON_FIRE_STABILIZATION_DELAY}), or until another process manually sets
+ * {@link Attributes#SERVICE_STATE_ACTUAL} to {@link Lifecycle#ON_FIRE},
+ * which this enricher will not clear until all problems have gone away.
+ * <p>
+ * Notifications can be debounced with {@link #ENTITY_FAILED_STABILIZATION_DELAY} and
+ * {@link #ENTITY_RECOVERED_STABILIZATION_DELAY}; a failure can be re-emitted periodically via
+ * {@link #ENTITY_FAILED_REPUBLISH_TIME}.
+ */
+//@Catalog(name="Service Failure Detector", description="HA policy for detecting failure of a service")
+public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceState {
+
+    // TODO Remove duplication between this and MemberFailureDetectionPolicy.
+    // The latter could be re-written to use this. Or could even be deprecated
+    // in favour of this.
+
+    /** Which notification this detector most recently emitted. */
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetector.class);
+
+    /** Minimum gap between the start of one scheduled re-evaluation and the next; see {@link #recomputeAfterDelay(long)}. */
+    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+
+    public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = HASensors.ENTITY_FAILED;
+
+    @SetFromFlag("onlyReportIfPreviouslyUp")
+    public static final ConfigKey<Boolean> ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP = ConfigKeys.newBooleanConfigKey("onlyReportIfPreviouslyUp", 
+        "Prevents the policy from emitting ENTITY_FAILED if the entity fails on startup (ie has never been up)", true);
+    
+    public static final ConfigKey<Boolean> MONITOR_SERVICE_PROBLEMS = ConfigKeys.newBooleanConfigKey("monitorServiceProblems", 
+        "Whether to monitor service problems, and emit on failures there (if set to false, this monitors only service up)", true);
+
+    @SetFromFlag("serviceOnFireStabilizationDelay")
+    public static final ConfigKey<Duration> SERVICE_ON_FIRE_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("serviceOnFire.stabilizationDelay")
+            .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before concluding ON_FIRE")
+            .defaultValue(Duration.ZERO)
+            .build();
+
+    @SetFromFlag("entityFailedStabilizationDelay")
+    public static final ConfigKey<Duration> ENTITY_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("entityFailed.stabilizationDelay")
+            .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before emitting ENTITY_FAILED")
+            .defaultValue(Duration.ZERO)
+            .build();
+
+    @SetFromFlag("entityRecoveredStabilizationDelay")
+    public static final ConfigKey<Duration> ENTITY_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("entityRecovered.stabilizationDelay")
+            .description("For a failed entity, time period for which the service must be consistently up for (e.g. doesn't report up-down-up) before emitting ENTITY_RECOVERED")
+            .defaultValue(Duration.ZERO)
+            .build();
+
+    @SetFromFlag("entityFailedRepublishTime")
+    public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
+            .name("entityFailed.republishTime")
+            .description("Publish failed state periodically at the specified intervals, null to disable.")
+            .build();
+
+    /** When SERVICE_UP was first seen to be true (epoch millis), or null if it never has been. */
+    protected Long firstUpTime;
+    
+    // Start (epoch millis) of the in-progress failing / recovering period, or null if none.
+    protected Long currentFailureStartTime = null;
+    protected Long currentRecoveryStartTime = null;
+    
+    // Deadlines (epoch millis) for emitting ENTITY_FAILED / ENTITY_RECOVERED and for setting ON_FIRE; null if not pending.
+    protected Long publishEntityFailedTime = null;
+    protected Long publishEntityRecoveredTime = null;
+    protected Long setEntityOnFireTime = null;
+    
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    // Whether a re-evaluation task is already queued, and when the most recent one started running.
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    /**
+     * TODO Really don't want this mutex!
+     * ServiceStateLogic.setExpectedState() will call into `onEvent(null)`, so could get concurrent calls.
+     * How to handle that? I don't think `ServiceStateLogic.setExpectedState` should be making the call, but
+     * presumably that is there to remove a race condition so it is set before method returns. Caller shouldn't
+     * rely on that though.
+     * e.g. see `ServiceFailureDetectorTest.testNotifiedOfFailureOnStateOnFire`, where we get two notifications.
+     */
+    private final Object mutex = new Object();
+    
+    public ServiceFailureDetector() {
+        this(new ConfigBag());
+    }
+    
+    public ServiceFailureDetector(Map<String,?> flags) {
+        this(new ConfigBag().putAll(flags));
+    }
+    
+    public ServiceFailureDetector(ConfigBag configBag) {
+        // TODO hierarchy should use ConfigBag, and not change flags
+        super(configBag.getAllConfigMutable());
+    }
+    
+    /**
+     * Records when the service is first seen up (consulted by {@link #ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP}),
+     * then delegates to the default service-state computation.
+     */
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        if (firstUpTime==null) {
+            if (event!=null && Attributes.SERVICE_UP.equals(event.getSensor()) && Boolean.TRUE.equals(event.getValue())) {
+                firstUpTime = event.getTimestamp();
+            } else if (event == null && Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+                // If this enricher is registered after the entity is up, then we'll get a "synthetic" onEvent(null) 
+                firstUpTime = System.currentTimeMillis();
+            }
+        }
+        
+        super.onEvent(event);
+    }
+    
+    /**
+     * Intercepts the state computed by the parent logic: tracks failure/recovery periods, emits ENTITY_FAILED /
+     * ENTITY_RECOVERED once their stabilization delays have elapsed, and defers setting ON_FIRE by
+     * {@link #SERVICE_ON_FIRE_STABILIZATION_DELAY}; schedules a re-evaluation while any such deadline is pending.
+     */
+    @Override
+    protected void setActualState(Lifecycle state) {
+        long now = System.currentTimeMillis();
+
+        synchronized (mutex) {
+            if (state==Lifecycle.ON_FIRE) {
+                if (lastPublished == LastPublished.FAILED) {
+                    if (currentRecoveryStartTime != null) {
+                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was recovering, now failing: {}", new Object[] {this, entity, getExplanation(state)});
+                        currentRecoveryStartTime = null;
+                        publishEntityRecoveredTime = null;
+                    } else {
+                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still failed: {}", new Object[] {this, entity, getExplanation(state)});
+                    }
+                } else {
+                    if (firstUpTime == null && getConfig(ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP)) {
+                        // suppress; won't publish
+                    } else if (currentFailureStartTime == null) {
+                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now failing: {}", new Object[] {this, entity, getExplanation(state)});
+                        currentFailureStartTime = now;
+                        publishEntityFailedTime = currentFailureStartTime + getConfig(ENTITY_FAILED_STABILIZATION_DELAY).toMilliseconds();
+                    } else {
+                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing failing: {}", new Object[] {this, entity, getExplanation(state)});
+                    }
+                }
+                if (setEntityOnFireTime == null) {
+                    setEntityOnFireTime = now + getConfig(SERVICE_ON_FIRE_STABILIZATION_DELAY).toMilliseconds();
+                }
+                currentRecoveryStartTime = null;
+                publishEntityRecoveredTime = null;
+                
+            } else if (state == Lifecycle.RUNNING) {
+                if (lastPublished == LastPublished.FAILED) {
+                    if (currentRecoveryStartTime == null) {
+                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now recovering: {}", new Object[] {this, entity, getExplanation(state)});
+                        currentRecoveryStartTime = now;
+                        publishEntityRecoveredTime = currentRecoveryStartTime + getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY).toMilliseconds();
+                    } else {
+                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing recovering: {}", new Object[] {this, entity, getExplanation(state)});
+                    }
+                } else {
+                    if (currentFailureStartTime != null) {
+                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was failing, now healthy: {}", new Object[] {this, entity, getExplanation(state)});
+                    } else {
+                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still healthy: {}", new Object[] {this, entity, getExplanation(state)});
+                    }
+                }
+                currentFailureStartTime = null;
+                publishEntityFailedTime = null;
+                setEntityOnFireTime = null;
+                
+            } else {
+                if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, in unconfirmed state: {}", new Object[] {this, entity, getExplanation(state)});
+            }
+
+            long recomputeIn = Long.MAX_VALUE; // For whether to call recomputeAfterDelay
+            
+            if (publishEntityFailedTime != null) {
+                long delayBeforeCheck = publishEntityFailedTime - now;
+                if (delayBeforeCheck<=0) {
+                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={})", 
+                            new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
+                    Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
+                    if (republishDelay == null) {
+                        publishEntityFailedTime = null;
+                    } else {
+                        publishEntityFailedTime = now + republishDelay.toMilliseconds();
+                        recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
+                    }
+                    lastPublished = LastPublished.FAILED;
+                    entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
+                } else {
+                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
+                }
+            } else if (publishEntityRecoveredTime != null) {
+                long delayBeforeCheck = publishEntityRecoveredTime - now;
+                if (delayBeforeCheck<=0) {
+                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing recovered (state={}; currentRecoveryStartTime={}; now={})", 
+                            new Object[] {this, state, Time.makeDateString(currentRecoveryStartTime), Time.makeDateString(now)});
+                    publishEntityRecoveredTime = null;
+                    lastPublished = LastPublished.RECOVERED;
+                    entity.emit(HASensors.ENTITY_RECOVERED, new HASensors.FailureDescriptor(entity, null));
+                } else {
+                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
+                }
+            }
+            
+            if (setEntityOnFireTime != null) {
+                long delayBeforeCheck = setEntityOnFireTime - now;
+                if (delayBeforeCheck<=0) {
+                    if (LOG.isDebugEnabled()) LOG.debug("{} setting on-fire, now that deferred period has passed (state={})", 
+                            new Object[] {this, state});
+                    setEntityOnFireTime = null;
+                    super.setActualState(state);
+                } else {
+                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
+                }
+            } else {
+                super.setActualState(state);
+            }
+            
+            if (recomputeIn < Long.MAX_VALUE) {
+                recomputeAfterDelay(recomputeIn);
+            }
+        }
+    }
+
+    /** Builds a human-readable summary of this detector's state, used in log messages. */
+    protected String getExplanation(Lifecycle state) {
+        Duration serviceFailedStabilizationDelay = getConfig(ENTITY_FAILED_STABILIZATION_DELAY);
+        Duration serviceRecoveredStabilizationDelay = getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY);
+
+        return String.format("location=%s; status=%s; lastPublished=%s; timeNow=%s; "+
+                    "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
+                entity.getLocations(), 
+                (state != null ? state : "<unreported>"),
+                lastPublished,
+                Time.makeDateString(System.currentTimeMillis()),
+                (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
+                (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
+    }
+    
+    /**
+     * Describes the current failure for the ENTITY_FAILED notification: the first service problem (if any),
+     * otherwise whether service-up is false, followed by how long the failure has persisted (if at all).
+     */
+    private String getFailureDescription(long now) {
+        String description;
+        Map<String, Object> serviceProblems = entity.getAttribute(Attributes.SERVICE_PROBLEMS);
+        if (serviceProblems!=null && !serviceProblems.isEmpty()) {
+            Entry<String, Object> problem = serviceProblems.entrySet().iterator().next();
+            description = problem.getKey()+": "+problem.getValue();
+            if (serviceProblems.size()>1) {
+                description = serviceProblems.size()+" service problems, including "+description;
+            } else {
+                description = "service problem: "+description;
+            }
+        } else if (Boolean.FALSE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+            description = "service not up";
+        } else {
+            description = "service failure detected";
+        }
+        // Append how long the failure persisted before being reported; with no stabilization delay, now == start.
+        if (currentFailureStartTime != null && now > currentFailureStartTime) {
+            description += " (stabilized for "+Duration.of(now - currentFailureStartTime, TimeUnit.MILLISECONDS)+")";
+        }
+        return description;
+    }
+    
+    /**
+     * Schedules a single re-evaluation ({@code onEvent(null)}) after about {@code delay} millis, provided this enricher
+     * is running and none is already queued; successive runs start at least MIN_PERIOD_BETWEEN_EXECS_MILLIS apart.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected void recomputeAfterDelay(long delay) {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
+            
+            Runnable job = new Runnable() {
+                @Override public void run() {
+                    try {
+                        executorTime = System.currentTimeMillis();
+                        executorQueued.set(false);
+
+                        onEvent(null);
+                        
+                    } catch (Exception e) {
+                        // qualified 'this': a bare 'this' here would be the anonymous Runnable, not the enricher
+                        if (isRunning()) {
+                            LOG.error("Error in enricher "+ServiceFailureDetector.this+": "+e, e);
+                        } else {
+                            if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+ServiceFailureDetector.this+" (but no longer running): "+e, e);
+                        }
+                    } catch (Throwable t) {
+                        LOG.error("Error in enricher "+ServiceFailureDetector.this+": "+t, t);
+                        throw Exceptions.propagate(t);
+                    }
+                }
+            };
+            
+            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
+            ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+    
+    /** Rounded, human-readable time elapsed since {@code time} (epoch millis), or null if {@code time} is null. */
+    private String getTimeStringSince(Long time) {
+        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
new file mode 100644
index 0000000..d6989d9
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
+import brooklyn.entity.group.StopFailedRuntimeException;
+import brooklyn.entity.trait.MemberReplaceable;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+
+/**
+ * Attaches to a {@link MemberReplaceable} group (e.g. a DynamicCluster) and replaces a failed member in response to
+ * {@link #FAILURE_SENSOR_TO_MONITOR} being emitted by that member: by default
+ * {@link ServiceRestarter#ENTITY_RESTART_FAILED}, or another sensor such as {@link HASensors#ENTITY_FAILED}.
+ * <p>
+ * If replacement fails, emits {@link #ENTITY_REPLACEMENT_FAILED} and, when {@link #SET_ON_FIRE_ON_FAILURE} is true,
+ * records a service problem on the group. Replacement is skipped once it has failed
+ * {@link #FAIL_ON_NUM_RECURRING_FAILURES} times within {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION}.
+ */
+@Catalog(name="Service Replacer", description="HA policy for replacing a failed member of a group")
+public class ServiceReplacer extends AbstractPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceReplacer.class);
+
+    // TODO if there are multiple failures perhaps we should abort quickly
+    
+    public static final BasicNotificationSensor<FailureDescriptor> ENTITY_REPLACEMENT_FAILED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.entityFailed.replacement", "Indicates that an entity replacement attempt has failed");
+
+    @SetFromFlag("setOnFireOnFailure")
+    public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
+    
+    /** monitors this sensor, by default ENTITY_RESTART_FAILED */
+    @SetFromFlag("failureSensorToMonitor")
+    @SuppressWarnings("rawtypes")
+    public static final ConfigKey<Sensor> FAILURE_SENSOR_TO_MONITOR = new BasicConfigKey<Sensor>(Sensor.class, "failureSensorToMonitor", "", ServiceRestarter.ENTITY_RESTART_FAILED); 
+
+    /** time window (millis) within which replacement failures are counted towards {@link #FAIL_ON_NUM_RECURRING_FAILURES} */
+    @SetFromFlag("failOnRecurringFailuresInThisDuration")
+    public static final ConfigKey<Long> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newLongConfigKey(
+            "failOnRecurringFailuresInThisDuration", 
+            "abandon replace if replacement has failed many times within this time interval",
+            5*60*1000L);
+
+    /** skips replace once replacement has failed this many times within {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION} */
+    @SetFromFlag("failOnNumRecurringFailures")
+    public static final ConfigKey<Integer> FAIL_ON_NUM_RECURRING_FAILURES = ConfigKeys.newIntegerConfigKey(
+            "failOnNumRecurringFailures", 
+            "abandon replace if replacement has failed this many times (100% of attempts) within the time interval",
+            5);
+
+    @SetFromFlag("ticker")
+    public static final ConfigKey<Ticker> TICKER = ConfigKeys.newConfigKey(Ticker.class,
+            "ticker", 
+            "A time source (defaults to system-clock, which is almost certainly what's wanted, except in tests)",
+            null);
+
+    // Times (from currentTimeMillis()) of consecutive replacement failures, appended as they occur; cleared on success.
+    protected final List<Long> consecutiveReplacementFailureTimes = Lists.newCopyOnWriteArrayList();
+    
+    public ServiceReplacer() {
+        this(new ConfigBag());
+    }
+    
+    public ServiceReplacer(Map<String,?> flags) {
+        this(new ConfigBag().putAll(flags));
+    }
+    
+    public ServiceReplacer(ConfigBag configBag) {
+        // TODO hierarchy should use ConfigBag, and not change flags
+        super(configBag.getAllConfigMutable());
+    }
+    
+    public ServiceReplacer(Sensor<?> failureSensorToMonitor) {
+        this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor));
+    }
+
+    /** Attaches to a {@link MemberReplaceable} group and subscribes to {@link #FAILURE_SENSOR_TO_MONITOR} on its members. */
+    @Override
+    public void setEntity(final EntityLocal entity) {
+        checkArgument(entity instanceof MemberReplaceable, "ServiceReplacer must take a MemberReplaceable, not %s", entity);
+        Sensor<?> failureSensorToMonitor = checkNotNull(getConfig(FAILURE_SENSOR_TO_MONITOR), "failureSensorToMonitor");
+        
+        super.setEntity(entity);
+
+        subscribeToMembers((Group)entity, failureSensorToMonitor, new SensorEventListener<Object>() {
+                @Override public void onEvent(final SensorEvent<Object> event) {
+                    // Must execute in another thread - if we called entity.replaceMember in the event-listener's thread
+                    // then we'd block all other events being delivered to this entity's other subscribers.
+                    // Relies on synchronization of `onDetectedFailure`.
+                    // See same pattern used in ServiceRestarter.
+                    
+                    // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
+                    // events being received in rapid succession, and onDetectedFailure being executed out-of-order
+                    // for them; or could write events to a blocking queue and have onDetectedFailure read from that.
+                    
+                    if (isRunning()) {
+                        LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
+                        ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
+                            @Override public void run() {
+                                onDetectedFailure(event);
+                            }});
+                    } else {
+                        LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+event.getSource()+" ("+event.getValue()+", child of "+entity+")");
+                    }
+                }
+            });
+    }
+    
+    // TODO semaphores would be better to allow at-most-one-blocking behaviour
+    /**
+     * Replaces the failed member (the event's source) by invoking {@link MemberReplaceable#REPLACE_MEMBER} on the group,
+     * unless this policy is suspended or replacement has recently been failing too often. NOTE: the effector call runs
+     * in a separately submitted task, so it is not serialized by this method's lock.
+     */
+    protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
+        final Entity failedEntity = event.getSource();
+        final Object reason = event.getValue();
+        
+        if (isSuspended()) {
+            LOG.warn("ServiceReplacer suspended, so not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
+            return;
+        }
+
+        if (isRepeatedlyFailingTooMuch()) {
+            LOG.error("ServiceReplacer not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+"), because too many recent replacement failures");
+            return;
+        }
+        
+        LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
+        ((EntityInternal)entity).getManagementSupport().getExecutionContext().submit(MutableMap.of(), new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
+                    consecutiveReplacementFailureTimes.clear();
+                } catch (Exception e) {
+                    if (Exceptions.getFirstThrowableOfType(e, StopFailedRuntimeException.class) != null) {
+                        LOG.info("ServiceReplacer: ignoring error reported from stopping failed node "+failedEntity);
+                        return;
+                    }
+                    onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
+                }
+            }
+        });
+    }
+
+    /**
+     * Discards failure times older than {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION} ago and reports whether
+     * at least {@link #FAIL_ON_NUM_RECURRING_FAILURES} recent replacement failures remain.
+     */
+    private boolean isRepeatedlyFailingTooMuch() {
+        Integer failOnNumRecurringFailures = getConfig(FAIL_ON_NUM_RECURRING_FAILURES);
+        long failOnRecurringFailuresInThisDuration = getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION);
+        long oldestPermitted = currentTimeMillis() - failOnRecurringFailuresInThisDuration;
+        
+        // Trim old ones. The list is a CopyOnWriteArrayList, whose iterators do not support remove() (it throws
+        // UnsupportedOperationException), so collect the expired entries and remove them afterwards.
+        List<Long> expired = Lists.newArrayList();
+        for (Iterator<Long> iter = consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) {
+            Long timestamp = iter.next();
+            if (timestamp >= oldestPermitted) {
+                break;
+            }
+            expired.add(timestamp);
+        }
+        consecutiveReplacementFailureTimes.removeAll(expired);
+        
+        return (consecutiveReplacementFailureTimes.size() >= failOnNumRecurringFailures);
+    }
+
+    /** Current time in millis, from the configured {@link #TICKER} if set, otherwise the system clock. */
+    protected long currentTimeMillis() {
+        Ticker ticker = getConfig(TICKER);
+        return (ticker == null) ? System.currentTimeMillis() : TimeUnit.NANOSECONDS.toMillis(ticker.read());
+    }
+    
+    /** Records a replacement failure, optionally flags a service problem on the group, and emits {@link #ENTITY_REPLACEMENT_FAILED}. */
+    protected void onReplacementFailed(String msg) {
+        LOG.warn("ServiceReplacer failed for "+entity+": "+msg);
+        consecutiveReplacementFailureTimes.add(currentTimeMillis());
+        
+        if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
+            ServiceProblemsLogic.updateProblemsIndicator(entity, "ServiceReplacer", "replacement failed: "+msg);
+        }
+        entity.emit(ENTITY_REPLACEMENT_FAILED, new FailureDescriptor(entity, msg));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
new file mode 100644
index 0000000..82f8664
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Attaches to a SoftwareProcess (or anything {@link Startable}, emitting ENTITY_FAILED or another configurable
+ * sensor) and invokes restart on failure.
+ * <p>
+ * If there is a subsequent failure within a configurable time interval, or if the restart fails,
+ * this gives up and emits {@link #ENTITY_RESTART_FAILED}.
+ */
+@Catalog(name="Service Restarter", description="HA policy for restarting a service automatically, "
+        + "and for emitting an events if the service repeatedly fails")
+public class ServiceRestarter extends AbstractPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceRestarter.class);
+
+    /**
+     * Emitted on the entity when a restart is abandoned. This happens either because the restart failed or
+     * because the entity failed again too soon after the previous failure.
+     */
+    public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RESTART_FAILED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.entityFailed.restart", "Indicates that an entity restart attempt has failed");
+
+    /** Skips the restart (and reports failure) if a failure re-occurs within this time interval of the previous one. */
+    @SetFromFlag("failOnRecurringFailuresInThisDuration")
+    public static final ConfigKey<Duration> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newConfigKey(
+            Duration.class, 
+            "failOnRecurringFailuresInThisDuration", 
+            "Reports entity as failed if it fails two or more times in this time window", 
+            Duration.minutes(3));
+
+    /** Whether to set the entity's expected state to ON_FIRE when a restart is abandoned (default true). */
+    @SetFromFlag("setOnFireOnFailure")
+    public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
+
+    /** The sensor monitored for failures; defaults to {@link HASensors#ENTITY_FAILED}. */
+    @SetFromFlag("failureSensorToMonitor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> FAILURE_SENSOR_TO_MONITOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, "failureSensorToMonitor", "", HASensors.ENTITY_FAILED); 
+    
+    /** Time (from {@link System#currentTimeMillis()}) of the most recently handled failure, or null if none yet. */
+    protected final AtomicReference<Long> lastFailureTime = new AtomicReference<Long>();
+
+    /** Creates a restarter with default configuration. */
+    public ServiceRestarter() {
+        this(new ConfigBag());
+    }
+    
+    /** Creates a restarter configured from the given flags. */
+    public ServiceRestarter(Map<String,?> flags) {
+        this(new ConfigBag().putAll(flags));
+    }
+    
+    /**
+     * Creates a restarter from the given config.
+     * <p>
+     * The unique tag is derived from this class's simple name and the monitored sensor's name.
+     */
+    public ServiceRestarter(ConfigBag configBag) {
+        // TODO hierarchy should use ConfigBag, and not change flags
+        super(configBag.getAllConfigMutable());
+        uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName();
+    }
+    
+    /** Creates a restarter that monitors the given failure sensor. */
+    public ServiceRestarter(Sensor<?> failureSensorToMonitor) {
+        this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor));
+    }
+
+    /**
+     * Attaches to the entity and subscribes to the configured failure sensor.
+     *
+     * @throws IllegalArgumentException if the entity is not {@link Startable}
+     */
+    @Override
+    public void setEntity(final EntityLocal entity) {
+        Preconditions.checkArgument(entity instanceof Startable, "Restarter must take a Startable, not "+entity);
+        
+        super.setEntity(entity);
+        
+        subscribe(entity, getConfig(FAILURE_SENSOR_TO_MONITOR), new SensorEventListener<Object>() {
+                @Override public void onEvent(final SensorEvent<Object> event) {
+                    // Must execute in another thread - if we called entity.restart in the event-listener's thread
+                    // then we'd block all other events being delivered to this entity's other subscribers.
+                    // Relies on synchronization of `onDetectedFailure`.
+                    // See same pattern used in ServiceReplacer.
+
+                    // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
+                    // events being received in rapid succession, and onDetectedFailure being executed out-of-order
+                    // for them; or could write events to a blocking queue and have onDetectedFailure read from that.
+                    
+                    if (isRunning()) {
+                        LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
+                        ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
+                            @Override public void run() {
+                                onDetectedFailure(event);
+                            }});
+                    } else {
+                        LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
+                    }
+                }
+            });
+    }
+    
+    /**
+     * Handles a detected failure by restarting the entity, with two exceptions:
+     * <ul>
+     * <li>if the policy is suspended, the event is ignored;</li>
+     * <li>if the previous failure was within {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION}, the restart is
+     * abandoned via {@link #onRestartFailed(String)}.</li>
+     * </ul>
+     * <p>
+     * This is invoked from a task submitted by the listener registered in {@link #setEntity(EntityLocal)}, so the
+     * blocking restart does not run on the subscription-delivery thread. It is {@code synchronized} so that
+     * failures are handled one at a time.
+     */
+    // TODO semaphores would be better to allow at-most-one-blocking behaviour
+    protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
+        if (isSuspended()) {
+            LOG.warn("ServiceRestarter suspended, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
+            return;
+        }
+
+        LOG.warn("ServiceRestarter acting on failure detected at "+entity+" ("+event.getValue()+")");
+        long current = System.currentTimeMillis();
+        Long last = lastFailureTime.getAndSet(current);
+        long elapsed = last==null ? -1 : current-last;
+        if (elapsed>=0 && elapsed <= getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION).toMilliseconds()) {
+            onRestartFailed("Restart failure (failed again after "+Time.makeTimeStringRounded(elapsed)+") at "+entity+": "+event.getValue());
+            return;
+        }
+        try {
+            ServiceStateLogic.setExpectedState(entity, Lifecycle.STARTING);
+            Entities.invokeEffector(entity, entity, Startable.RESTART).get();
+        } catch (Exception e) {
+            onRestartFailed("Restart failure (error "+e+") at "+entity+": "+event.getValue());
+            if (e instanceof InterruptedException) {
+                // The wait in get() was interrupted. Throwing InterruptedException cleared the thread's
+                // interrupt status, so restore it and let the executing task/thread still see the interruption.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Gives up on restarting.
+     * <p>
+     * Logs a warning and, if {@link #SET_ON_FIRE_ON_FAILURE} is set, sets the entity's expected state to
+     * {@link Lifecycle#ON_FIRE}. It then emits {@link #ENTITY_RESTART_FAILED}.
+     *
+     * @param msg description of the failure, included in the log and in the emitted descriptor
+     */
+    protected void onRestartFailed(String msg) {
+        LOG.warn("ServiceRestarter failed for "+entity+": "+msg);
+        if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
+            ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
+        }
+        entity.emit(ENTITY_RESTART_FAILED, new FailureDescriptor(entity, msg));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/SshMachineFailureDetector.java
new file mode 100644
index 0000000..e679309
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.location.basic.Machines;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Failure detector that periodically checks SSH connectivity to the entity's unique {@link SshMachineLocation}.
+ * <p>
+ * Each check runs the command {@code exit} with a single try, bounded by {@link #CONNECT_TIMEOUT}. An exit code
+ * of 0 counts as healthy; any other exit code, or an exception, counts as failed. If the entity has no unique
+ * SSH machine location, it is reported as healthy.
+ */
+@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, "
+        + "emitting an event if the connection is lost/restored")
+public class SshMachineFailureDetector extends AbstractFailureDetector {
+    private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+    public static final String DEFAULT_UNIQUE_TAG = "failureDetector.sshMachine.tag";
+
+    /** Default sensor emitted when connectivity is lost. */
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
+
+    /** Default sensor emitted when connectivity is restored. */
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
+
+    /** Used as both the SSH connect timeout and the session timeout for each check. */
+    public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey(
+            "ha.sshConnection.timeout", "How long to wait for connection before declaring failure", Duration.TEN_SECONDS);
+
+    /**
+     * Fills in defaults for any config not explicitly set:
+     * <ul>
+     * <li>the failed sensor defaults to {@link #CONNECTION_FAILED};</li>
+     * <li>the recovered sensor defaults to {@link #CONNECTION_RECOVERED};</li>
+     * <li>the poll period defaults to one minute.</li>
+     * </ul>
+     * It then sets the unique tag to {@link #DEFAULT_UNIQUE_TAG}.
+     */
+    @Override
+    public void init() {
+        super.init();
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
+        }
+        if (config().getRaw(POLL_PERIOD).isAbsent()) {
+            config().set(POLL_PERIOD, Duration.ONE_MINUTE);
+        }
+        uniqueTag = DEFAULT_UNIQUE_TAG;
+    }
+
+    /**
+     * Performs one connectivity check.
+     *
+     * @return a healthy status if {@code exit} returned 0 or there is no unique SSH machine; otherwise a failed
+     *         status whose description is the exception message (or the exception's string form when it has no
+     *         message)
+     */
+    @Override
+    protected CalculatedStatus calculateStatus() {
+        Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations());
+        if (sshMachineOption.isPresent()) {
+            SshMachineLocation sshMachine = sshMachineOption.get();
+            try {
+                Duration timeout = config().get(CONNECT_TIMEOUT);
+                Map<String, ?> flags = ImmutableMap.of(
+                        SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SSH_TRIES.getName(), 1);
+                int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
+                return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString());
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                // Log the first failure of a run at debug level and repeats only at trace level, to avoid log spam.
+                boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null;
+                if (isFirstFailure) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed connecting to machine " + sshMachine, e);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Failed connecting to machine " + sshMachine, e);
+                    }
+                }
+                // Many exceptions carry no message. Fall back to the exception's string form so the status
+                // description is never null.
+                String description = (e.getMessage() != null) ? e.getMessage() : e.toString();
+                return new BasicCalculatedStatus(false, description);
+            }
+        } else {
+            return new BasicCalculatedStatus(true, "no machine started, not complaining");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableContainer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableContainer.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableContainer.java
new file mode 100644
index 0000000..451d150
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableContainer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicNotificationSensor;
+import brooklyn.util.collections.QuorumCheck;
+import brooklyn.util.collections.QuorumCheck.QuorumChecks;
+
+/**
+ * A container of worker items that can be moved between this container and others in order to balance load.
+ * <p>
+ * Being a member of a balanceable container does not imply a parent-child relationship in the Brooklyn
+ * management sense.
+ *
+ * @param <ItemType> the type of {@link Movable} item held by this container
+ */
+public interface BalanceableContainer<ItemType extends Movable> extends Entity, AbstractGroup {
+
+    // Interface fields are implicitly public static final; interface methods are implicitly public abstract.
+
+    /** Notification sensor indicating that a movable item was added to this container. */
+    BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>(
+            Entity.class, "balanceablecontainer.item.added", "Movable item added to balanceable container");
+
+    /** Notification sensor indicating that a movable item was removed from this container. */
+    BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>(
+            Entity.class, "balanceablecontainer.item.removed", "Movable item removed from balanceable container");
+
+    /**
+     * Overrides the default of {@link AbstractGroup#UP_QUORUM_CHECK} with a check that always passes.
+     * An empty container, or one with failed members, therefore does not prevent service-up.
+     */
+    ConfigKey<QuorumCheck> UP_QUORUM_CHECK = ConfigKeys.newConfigKeyWithDefault(AbstractGroup.UP_QUORUM_CHECK, 
+        "Up check from members; default one for container overrides usual check to always return true, "
+        + "i.e. not block service up simply because the container is empty or something in the container has failed",
+        QuorumChecks.alwaysTrue());
+
+    /** @return the items currently held by this container */
+    Set<ItemType> getBalanceableItems();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceablePoolModel.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
new file mode 100644
index 0000000..72793b1
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.location.Location;
+
+/**
+ * Captures the state of a balanceable cluster of containers and all of their constituent items, including
+ * workrates, for consumption by a {@link BalancingStrategy}.
+ *
+ * @param <ContainerType> the type used to represent containers in the pool
+ * @param <ItemType> the type used to represent items hosted by the containers
+ */
+public interface BalanceablePoolModel<ContainerType, ItemType> {
+
+    // ---- Attributes of the pool as a whole ----
+
+    String getName();
+
+    int getPoolSize();
+
+    Set<ContainerType> getPoolContents();
+
+    double getPoolLowThreshold();
+
+    double getPoolHighThreshold();
+
+    double getCurrentPoolWorkrate();
+
+    boolean isHot();
+
+    boolean isCold();
+
+    // ---- Attributes of individual containers and items ----
+
+    String getName(ContainerType container);
+
+    Location getLocation(ContainerType container);
+
+    /** @return the container's low threshold, or -1 if not known / invalid */
+    double getLowThreshold(ContainerType container);
+
+    /** @return the container's high threshold, or -1 if not known / invalid */
+    double getHighThreshold(ContainerType container);
+
+    /** @return the container's total workrate, or -1 if not known / invalid */
+    double getTotalWorkrate(ContainerType container);
+
+    /** @return the workrate of each container; contains -1 for entries whose workrate is unknown */
+    Map<ContainerType, Double> getContainerWorkrates();
+
+    /**
+     * @return the workrate of each item in the given container. The map contains -1 instead of the actual
+     *         workrate for items that cannot be moved. The result may be null if the node is prevented from
+     *         reporting and/or being adjusted, or has no data yet.
+     */
+    Map<ItemType, Double> getItemWorkrates(ContainerType container);
+
+    boolean isItemMoveable(ItemType item);
+
+    boolean isItemAllowedIn(ItemType item, Location location);
+
+    // ---- Mutators that keep the model in sync with the observed world ----
+
+    void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold);
+
+    void onContainerRemoved(ContainerType oldContainer);
+
+    void onItemAdded(ItemType item, ContainerType parentContainer);
+
+    void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable);
+
+    void onItemRemoved(ItemType item);
+
+    void onItemWorkrateUpdated(ItemType item, double newValue);
+
+    void onItemMoved(ItemType item, ContainerType targetContainer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
new file mode 100644
index 0000000..d600bc4
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import brooklyn.entity.trait.Resizable;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+/**
+ * An elastic group of "container" entities, each able to host "item" entities that perform work and consume the
+ * container's available resources (e.g. CPU or bandwidth).
+ * <p>
+ * Auto-scaling and load-balancing policies can be attached to this pool to provide dynamic elasticity based on
+ * the workrates reported by the individual item entities.
+ * <p>
+ * Containers must be "up" in order to receive work. They must therefore NOT follow the default enricher pattern
+ * for groups, which says that the group must be up to receive work.
+ */
+@ImplementedBy(BalanceableWorkerPoolImpl.class)
+public interface BalanceableWorkerPool extends Entity, Resizable {
+
+    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
+    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
+
+    /**
+     * Pairs an item with a container.
+     * <p>
+     * This is the payload of the {@code ITEM_ADDED}, {@code ITEM_REMOVED} and {@code ITEM_MOVED} sensors.
+     * (Member types of an interface are implicitly public and static.)
+     */
+    class ContainerItemPair implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        /** The container; not null-checked, so it may be null. */
+        public final BalanceableContainer<?> container;
+
+        /** The item; never null. */
+        public final Entity item;
+
+        /**
+         * @param container the container, may be null
+         * @param item the item, must not be null
+         * @throws NullPointerException if {@code item} is null
+         */
+        public ContainerItemPair(BalanceableContainer<?> container, Entity item) {
+            this.item = checkNotNull(item);
+            this.container = container;
+        }
+
+        @Override
+        public String toString() {
+            return String.valueOf(item) + " @ " + container;
+        }
+    }
+
+    // ---- Pool constituent notifications (interface fields are implicitly public static final) ----
+
+    /** Notification sensor indicating that a container joined the pool. */
+    BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>(
+        Entity.class, "balanceablepool.container.added", "Container added to balanceable pool");
+
+    /** Notification sensor indicating that a container left the pool. */
+    BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>(
+        Entity.class, "balanceablepool.container.removed", "Container removed from balanceable pool");
+
+    /** Notification sensor indicating that an item joined the pool. */
+    BasicNotificationSensor<ContainerItemPair> ITEM_ADDED = new BasicNotificationSensor<ContainerItemPair>(
+        ContainerItemPair.class, "balanceablepool.item.added", "Item added to balanceable pool");
+
+    /** Notification sensor indicating that an item left the pool. */
+    BasicNotificationSensor<ContainerItemPair> ITEM_REMOVED = new BasicNotificationSensor<ContainerItemPair>(
+        ContainerItemPair.class, "balanceablepool.item.removed", "Item removed from balanceable pool");
+
+    /** Notification sensor indicating that an item moved to the container given in the payload. */
+    BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>(
+        ContainerItemPair.class, "balanceablepool.item.moved", "Item moved in balanceable pool to the given container");
+
+    /** Sets the {@link Resizable} used to resize the pool. */
+    void setResizable(Resizable resizable);
+
+    /** Sets the groups whose members are the pool's containers and items respectively. */
+    void setContents(Group containerGroup, Group itemGroup);
+
+    /** @return the group whose members are the pool's containers */
+    Group getContainerGroup();
+
+    /** @return the group whose members are the pool's items */
+    Group getItemGroup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
new file mode 100644
index 0000000..888b957
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+
+/**
+ * Default implementation of {@link BalanceableWorkerPool}.
+ * <p>
+ * Watches membership of a container group and an item group, and re-emits changes as this pool's own
+ * {@code CONTAINER_ADDED}/{@code CONTAINER_REMOVED} and {@code ITEM_ADDED}/{@code ITEM_REMOVED}/{@code ITEM_MOVED}
+ * notifications. A container is treated as part of the pool only while its {@code SERVICE_UP} is true, or
+ * unconditionally if it is not {@link Startable}.
+ *
+ * @see BalanceableWorkerPool
+ */
+public class BalanceableWorkerPoolImpl extends AbstractEntity implements BalanceableWorkerPool {
+
+    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
+    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
+    
+    private static final Logger LOG = LoggerFactory.getLogger(BalanceableWorkerPool.class);
+    
+    private Group containerGroup;
+    private Group itemGroup;
+    private Resizable resizable;
+    
+    /** Containers currently counted as part of the pool (i.e. those that are up). */
+    private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>());
+    /** Items currently tracked by the pool. */
+    private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>());
+    
+    /** Dispatches group-membership, service-up and item-container events to the handlers below. */
+    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
+        @Override
+        public void onEvent(SensorEvent<Object> event) {
+            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", BalanceableWorkerPoolImpl.this, event);
+            Entity source = event.getSource();
+            Object value = event.getValue();
+            Sensor<?> sensor = event.getSensor();
+            
+            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
+                if (source.equals(containerGroup)) {
+                    onContainerAdded((BalanceableContainer<?>) value);
+                } else if (source.equals(itemGroup)) {
+                    onItemAdded((Entity)value);
+                } else {
+                    throw new IllegalStateException("unexpected event source="+source);
+                }
+            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
+                if (source.equals(containerGroup)) {
+                    onContainerRemoved((BalanceableContainer<?>) value);
+                } else if (source.equals(itemGroup)) {
+                    onItemRemoved((Entity) value);
+                } else {
+                    throw new IllegalStateException("unexpected event source="+source);
+                }
+            } else if (sensor.equals(Startable.SERVICE_UP)) {
+                // TODO What if start has failed? Is there a sensor to indicate that?
+                // Guard against a null value: unboxing it with `(Boolean) value` would throw NullPointerException.
+                // Anything other than TRUE is treated as "not up", consistent with onContainerAdded.
+                if (Boolean.TRUE.equals(value)) {
+                    onContainerUp((BalanceableContainer<?>) source);
+                } else {
+                    onContainerDown((BalanceableContainer<?>) source);
+                }
+            } else if (sensor.equals(Movable.CONTAINER)) {
+                onItemMoved(source, (BalanceableContainer<?>) value);
+            } else {
+                throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
+            }
+        }
+    };
+    
+    public BalanceableWorkerPoolImpl() {
+    }
+
+    @Override
+    public void setResizable(Resizable resizable) {
+        this.resizable = resizable;
+    }
+    
+    /**
+     * Sets the container and item groups.
+     * <p>
+     * Subscribes to their membership changes and processes the members they already have. If no
+     * {@link Resizable} was set and the container group is itself resizable, the container group is used for
+     * resizing.
+     */
+    @Override
+    public void setContents(Group containerGroup, Group itemGroup) {
+        this.containerGroup = containerGroup;
+        this.itemGroup = itemGroup;
+        if (resizable == null && containerGroup instanceof Resizable) resizable = (Resizable) containerGroup;
+        
+        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+        subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+        subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+        
+        // Process extant containers and items
+        for (Entity existingContainer : containerGroup.getMembers()) {
+            onContainerAdded((BalanceableContainer<?>)existingContainer);
+        }
+        for (Entity existingItem : itemGroup.getMembers()) {
+            onItemAdded(existingItem);
+        }
+    }
+    
+    @Override
+    public Group getContainerGroup() {
+        return containerGroup;
+    }
+    
+    @Override
+    public Group getItemGroup() {
+        return itemGroup;
+    }
+
+    /** @return the current size of the container group */
+    @Override
+    public Integer getCurrentSize() {
+        return containerGroup.getCurrentSize();
+    }
+    
+    /**
+     * Delegates to the configured {@link Resizable}.
+     *
+     * @throws UnsupportedOperationException if no resizable was supplied and the container group is not resizable
+     */
+    @Override
+    public Integer resize(Integer desiredSize) {
+        if (resizable != null) return resizable.resize(desiredSize);
+        
+        throw new UnsupportedOperationException("Container group is not resizable, and no resizable supplied: "+containerGroup+" of type "+(containerGroup != null ? containerGroup.getClass().getCanonicalName() : null));
+    }
+    
+    /**
+     * Watches the new container's {@code SERVICE_UP}, and adds it immediately if it is already up or is not
+     * {@link Startable}.
+     */
+    private void onContainerAdded(BalanceableContainer<?> newContainer) {
+        subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
+        if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
+            onContainerUp(newContainer);
+        }
+    }
+    
+    /** Adds the container to the pool, emitting {@code CONTAINER_ADDED} only if it was not already present. */
+    private void onContainerUp(BalanceableContainer<?> newContainer) {
+        if (containers.add(newContainer)) {
+            emit(CONTAINER_ADDED, newContainer);
+        }
+    }
+    
+    /** Removes the container from the pool, emitting {@code CONTAINER_REMOVED} only if it was present. */
+    private void onContainerDown(BalanceableContainer<?> oldContainer) {
+        if (containers.remove(oldContainer)) {
+            emit(CONTAINER_REMOVED, oldContainer);
+        }
+    }
+    
+    /** Stops watching the container and removes it from the pool. */
+    private void onContainerRemoved(BalanceableContainer<?> oldContainer) {
+        unsubscribe(oldContainer);
+        onContainerDown(oldContainer);
+    }
+    
+    /**
+     * Starts tracking a new item.
+     * <p>
+     * Subscribes to its {@code Movable.CONTAINER} and emits {@code ITEM_ADDED} with its current container.
+     */
+    private void onItemAdded(Entity item) {
+        if (items.add(item)) {
+            subscribe(item, Movable.CONTAINER, eventHandler);
+            emit(ITEM_ADDED, new ContainerItemPair(item.getAttribute(Movable.CONTAINER), item));
+        }
+    }
+    
+    /** Stops tracking an item and emits {@code ITEM_REMOVED} with a null container. */
+    private void onItemRemoved(Entity item) {
+        if (items.remove(item)) {
+            unsubscribe(item);
+            emit(ITEM_REMOVED, new ContainerItemPair(null, item));
+        }
+    }
+    
+    /** Emits {@code ITEM_MOVED} for the item and its new container. */
+    private void onItemMoved(Entity item, BalanceableContainer<?> container) {
+        emit(ITEM_MOVED, new ContainerItemPair(container, item));
+    }
+}


Mime
View raw message