brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [20/29] git commit: refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentConfiguration, with DependentConfiguration now relying on an internal class and: * checking for unmanaged (and now this is the defau
Date Mon, 03 Nov 2014 15:52:04 GMT
refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentConfiguration,
with DependentConfiguration now relying on an internal class and:
* checking for unmanaged (and now this is the default)
* supporting a timeout
* ensuring that all values put into the subscription queue are read (previously it could miss a value if subscriptions were updated twice in succession)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/14f17bc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/14f17bc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/14f17bc9

Branch: refs/heads/master
Commit: 14f17bc94d76693eadafe7d33a34e4c5fad06ff9
Parents: 7f09a80
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Tue Oct 28 00:42:36 2014 -0700
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Fri Oct 31 09:39:50 2014 -0500

----------------------------------------------------------------------
 .../java/brooklyn/entity/basic/EntityTasks.java |  72 +--
 .../event/basic/DependentConfiguration.java     | 468 +++++++++++++++----
 .../src/main/java/brooklyn/util/task/Tasks.java |  28 +-
 .../entity/basic/EntityPredicatesTest.java      |   4 +-
 .../test/java/brooklyn/util/task/TasksTest.java |  29 ++
 .../BrooklynClusterUpgradeEffectorBody.java     |  16 +-
 .../BrooklynNodeUpgradeEffectorBody.java        |   2 +-
 .../util/collections/CollectionFunctionals.java |   2 +-
 .../util/exceptions/NotManagedException.java    |  36 ++
 .../util/exceptions/TimeoutException.java       |  36 ++
 .../java/brooklyn/util/repeat/Repeater.java     |   4 +
 11 files changed, 547 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
index 37114c5..99c5ca3 100644
--- a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
+++ b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
@@ -20,44 +20,62 @@ package brooklyn.entity.basic;
 
 import brooklyn.entity.Entity;
 import brooklyn.event.AttributeSensor;
-import brooklyn.management.TaskAdaptable;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.management.Task;
 import brooklyn.util.collections.CollectionFunctionals;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
 
 import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicates;
 
 /** Generally useful tasks related to entities */
 public class EntityTasks {
 
     /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate,
-     * with an optional timeout */
-    public static <T> TaskAdaptable<Boolean> awaitingAttribute(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName())
-                .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-//                TODO abort if entity is unmanaged
-                .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), entity)),
-                true)
-            .description("waiting on "+entity+" "+sensor.getName()+" "+condition+
-                (timeout!=null ? ", timeout "+timeout : "")).build();
+     * returning false if it times out or becomes unmanaged */
+    public static <T> Task<Boolean> testingAttributeEventually(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReady(entity, sensor)
+            .readiness(condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutReturn(false)
+            .onUnmanagedReturn(false)
+            .build();
     }
 
-    /** as {@link #awaitingAttribute(Entity, AttributeSensor, Predicate, Duration)} for multiple entities */
-    public static <T> TaskAdaptable<Boolean> awaitingAttribute(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName())
-                .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-//                TODO abort if entity is unmanaged
-                .until(Functionals.callable(Functions.forPredicate(
-                    CollectionFunctionals.all(EntityPredicates.attributeSatisfies(sensor, condition))), entities)),
-                true)
-            .description("waiting on "+Iterables.size(entities)+", "+sensor.getName()+" "+condition+
-                (timeout!=null ? ", timeout "+timeout : "")+
-                ": "+entities).build();
+    /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate,
+     * throwing if it times out or becomes unmanaged */
+    public static <T> Task<Boolean> requiringAttributeEventually(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReady(entity, sensor)
+            .readiness(condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutThrow()
+            .onUnmanagedThrow()
+            .build();
+    }
+
+    /** as {@link #testingAttributeEventually(Entity, AttributeSensor, Predicate, Duration) for multiple entities */
+    public static <T> Task<Boolean> testingAttributeEventually(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutReturn(false)
+            .onUnmanagedReturn(false)
+            .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true)))
+            .build();
     }
+    
+    /** as {@link #requiringAttributeEventually(Entity, AttributeSensor, Predicate, Duration) for multiple entities */
+    public static <T> Task<Boolean> requiringAttributeEventually(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutThrow()
+            .onUnmanagedThrow()
+            .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true)))
+            .build();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
index de45514..51af110 100644
--- a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
+++ b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
@@ -19,18 +19,18 @@
 package brooklyn.event.basic;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import groovy.lang.Closure;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -41,6 +41,7 @@ import brooklyn.config.ConfigKey;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.basic.EntityLocal;
 import brooklyn.entity.basic.Lifecycle;
@@ -53,9 +54,15 @@ import brooklyn.management.Task;
 import brooklyn.management.TaskAdaptable;
 import brooklyn.management.TaskFactory;
 import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.collections.MutableList;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.exceptions.CompoundRuntimeException;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.NotManagedException;
+import brooklyn.util.exceptions.TimeoutException;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.guava.Maybe;
 import brooklyn.util.task.BasicExecutionContext;
 import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.DeferredSupplier;
@@ -63,6 +70,9 @@ import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.ParallelTask;
 import brooklyn.util.task.TaskInternal;
 import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
@@ -105,7 +115,7 @@ public class DependentConfiguration {
         return attributeWhenReady(source, sensor, readyPredicate);
     }
     
-    /** returns a {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value;
+    /** returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value;
      * particular useful in Entity configuration where config will block until Tasks have a value
      */
     public static <T> Task<T> attributeWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) {
@@ -141,11 +151,19 @@ public class DependentConfiguration {
         return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
     }
     
+    @SuppressWarnings("unchecked")
     public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Function<? super T,V> postProcess) {
-        Builder<T, T> builder = builder().attributeWhenReady(source, sensor);
+        Builder<T,T> builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor);
+        // messy generics here to support null postProcess; would be nice to disallow that here
+        Builder<T,V> builder;
+        if (postProcess != null) {
+            builder = builder1.postProcess(postProcess);
+        } else {
+            builder = (Builder<T,V>)builder1;
+        }
         if (ready != null) builder.readiness(ready);
-        if (postProcess != null) builder.postProcess(postProcess);
-        return ((Builder)builder).build();
+        
+        return builder.build();
     }
 
     public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) {
@@ -158,49 +176,83 @@ public class DependentConfiguration {
     }
     
     // TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp)
-    // and TODO would be nice to have it stop when source is unmanaged (with ability to define post-processing)
-    // probably using the builder for both of these...
+    
     public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
+        return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call();
+    }
+    
+    protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> {
+
+        /* This is a change since before Oct 2014. Previously it would continue to poll,
+         * (maybe finding a different error) if the target entity becomes unmanaged. 
+         * Now it actively checks unmanaged by default, and still throws although it might 
+         * now find a different problem. */
+        private final static boolean DEFAULT_IGNORE_UNMANAGED = false;
         
-        T value = source.getAttribute(sensor);
-        final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
-
-        // return immediately if either the ready predicate or the abort conditions hold
-        if (ready==null) ready = GroovyJavaMethods.truthPredicate();
-        if (ready.apply(value)) return value;
-        for (AttributeAndSensorCondition abortCondition : abortConditions) {
-            Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
-            if (abortCondition.predicate.apply(abortValue)) {
-                abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
-            }
+        final Entity source;
+        final AttributeSensor<T> sensor;
+        final Predicate<? super T> ready;
+        final List<AttributeAndSensorCondition<?>> abortSensorConditions;
+        final String blockingDetails;
+        final Function<? super T,? extends V> postProcess;
+        final Duration timeout;
+        final Maybe<V> onTimeout;
+        final boolean ignoreUnmanaged;
+        final Maybe<V> onUnmanaged;
+        // TODO onError Continue / Throw / Return(V)
+        
+        protected WaitInTaskForAttributeReady(Builder<T, V> builder) {
+            this.source = builder.source;
+            this.sensor = builder.sensor;
+            this.ready = builder.readiness;
+            this.abortSensorConditions = builder.abortSensorConditions;
+            this.blockingDetails = builder.blockingDetails;
+            this.postProcess = builder.postProcess;
+            this.timeout = builder.timeout;
+            this.onTimeout = builder.onTimeout;
+            this.ignoreUnmanaged = builder.ignoreUnmanaged;
+            this.onUnmanaged = builder.onUnmanaged;
         }
-        if (abortionExceptions.size() > 0) {
-            throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
+        
+        private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready,
+                List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
+            this.source = source;
+            this.sensor = sensor;
+            this.ready = ready;
+            this.abortSensorConditions = abortConditions;
+            this.blockingDetails = blockingDetails;
+            
+            this.timeout = Duration.PRACTICALLY_FOREVER;
+            this.onTimeout = Maybe.absent();
+            this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
+            this.onUnmanaged = Maybe.absent();
+            this.postProcess = null;
         }
 
-        TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
-        if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
-        Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
-        if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
-                current+" has no entity tag ("+current.getStatusDetail(false)+")");
-        final AtomicReference<T> data = new AtomicReference<T>();
-        final Semaphore semaphore = new Semaphore(0); // could use Exchanger
-        SubscriptionHandle subscription = null;
-        List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
-        try {
-            subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener<T>() {
-                @Override public void onEvent(SensorEvent<T> event) {
-                    data.set(event.getValue());
-                    semaphore.release();
-                }});
-            for (final AttributeAndSensorCondition abortCondition : abortConditions) {
-                abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
-                    @Override public void onEvent(SensorEvent<Object> event) {
-                        if (abortCondition.predicate.apply(event.getValue())) {
-                            abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
-                            semaphore.release();
-                        }
-                    }}));
+        @SuppressWarnings("unchecked")
+        protected V postProcess(T value) {
+            if (this.postProcess!=null) return postProcess.apply(value);
+            // if no post-processing assume the types are correct
+            return (V) value;
+        }
+        
+        protected boolean ready(T value) {
+            if (ready!=null) return ready.apply(value);
+            return GroovyJavaMethods.truth(value);
+        }
+        
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public V call() {
+            T value = source.getAttribute(sensor);
+
+            // return immediately if either the ready predicate or the abort conditions hold
+            if (ready(value)) return postProcess(value);
+
+            final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
+            long start = System.currentTimeMillis();
+            
+            for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
                 Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
                 if (abortCondition.predicate.apply(abortValue)) {
                     abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
@@ -209,31 +261,101 @@ public class DependentConfiguration {
             if (abortionExceptions.size() > 0) {
                 throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
             }
+
+            TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
+            if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
+            Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
+            if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
+                current+" has no entity tag ("+current.getStatusDetail(false)+")");
+            
+            final LinkedList<T> publishedValues = new LinkedList<T>();
+            final Semaphore semaphore = new Semaphore(0); // could use Exchanger
+            SubscriptionHandle subscription = null;
+            List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
             
-            value = source.getAttribute(sensor);
-            while (!ready.apply(value)) {
-                String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
-                try {
-                    semaphore.acquire();
-                } finally {
-                    current.setBlockingDetails(prevBlockingDetails);
+            try {
+                subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener<T>() {
+                    @Override public void onEvent(SensorEvent<T> event) {
+                        synchronized (publishedValues) { publishedValues.add(event.getValue()); }
+                        semaphore.release();
+                    }});
+                for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
+                    abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
+                        @Override public void onEvent(SensorEvent<Object> event) {
+                            if (abortCondition.predicate.apply(event.getValue())) {
+                                abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
+                                semaphore.release();
+                            }
+                        }}));
+                    Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
+                    if (abortCondition.predicate.apply(abortValue)) {
+                        abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
+                    }
                 }
-                
                 if (abortionExceptions.size() > 0) {
                     throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
                 }
-                value = data.get();
-            }
-            if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
-            return value;
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
-        } finally {
-            if (subscription != null) {
-                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription);
-            }
-            for (SubscriptionHandle handle : abortSubscriptions) {
-                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle);
+
+                CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
+                Duration maxPeriod = Duration.millis(200);
+                Duration nextPeriod = Duration.millis(10);
+                while (true) {
+                    // check the source on initial run (could be done outside the loop) 
+                    // and also (optionally) on each iteration in case it is more recent 
+                    value = source.getAttribute(sensor);
+                    if (ready(value)) break;
+
+                    if (timer!=null) {
+                        if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
+                            nextPeriod = timer.getDurationRemaining();
+                        }
+                        if (timer.isExpired()) {
+                            if (onTimeout.isPresent()) return onTimeout.get();
+                            throw new TimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
+                        }
+                    }
+
+                    String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
+                    try {
+                        if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
+                            // immediately release so we are available for the next check
+                            semaphore.release();
+                            // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
+                            semaphore.drainPermits();
+                        }
+                    } finally {
+                        current.setBlockingDetails(prevBlockingDetails);
+                    }
+
+                    // check any subscribed values which have come in first
+                    while (!publishedValues.isEmpty()) {
+                        synchronized (publishedValues) { value = publishedValues.pop(); }
+                        if (ready(value)) break;
+                    }
+
+                    // if unmanaged then ignore the other abort conditions
+                    if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
+                        if (onTimeout.isPresent()) return onTimeout.get();
+                        throw new NotManagedException(entity);                        
+                    }
+                    
+                    if (abortionExceptions.size() > 0) {
+                        throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
+                    }
+
+                    nextPeriod = nextPeriod.times(2).maximum(maxPeriod);
+                }
+                if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
+                return postProcess(value);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                if (subscription != null) {
+                    ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription);
+                }
+                for (SubscriptionHandle handle : abortSubscriptions) {
+                    ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle);
+                }
             }
         }
     }
@@ -268,6 +390,7 @@ public class DependentConfiguration {
     }
     
     /** @see #transform(Task, Function) */
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) {
         return new BasicTask<T>(flags, new Callable<T>() {
             public T call() throws Exception {
@@ -286,19 +409,23 @@ public class DependentConfiguration {
     }
 
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Closure transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks);
     }
 
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, Closure transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks);
     }
     
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(flags, transformer, Arrays.asList(tasks));
     }
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, Collection<? extends TaskAdaptable<U>> tasks) {
         if (tasks.size()==1) {
             return transform(flags, Iterables.getOnlyElement(tasks), new Function<U,T>() {
@@ -411,18 +538,26 @@ public class DependentConfiguration {
     public static class ProtoBuilder {
         /**
          * Will wait for the attribute on the given entity.
-         * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. 
+         * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. 
          */
         public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) {
-            return new Builder<T2,T2>().attributeWhenReady(source, sensor);
+            return new Builder<T2,T2>(source, sensor).abortIfOnFire();
         }
 
-        /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, 
+        /**
+         * Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}.
+         */
+        public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) {
+            return new Builder<T2,T2>(source, sensor);
+        }
+
+        /** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity, 
          * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ 
         @Beta
         public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) {
             return attributeWhenReadyFromMultiple(sources, sensor, GroovyJavaMethods.truthPredicate());
         }
+        /** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */
         @Beta
         public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
             return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness);
@@ -432,24 +567,33 @@ public class DependentConfiguration {
     /**
      * Builder for producing variants of attributeWhenReady.
      */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Beta
     public static class Builder<T,V> {
         protected Entity source;
         protected AttributeSensor<T> sensor;
         protected Predicate<? super T> readiness;
         protected Function<? super T, ? extends V> postProcess;
-        protected List<AttributeAndSensorCondition<?>> abortConditions = Lists.newArrayList();
+        protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList();
         protected String blockingDetails;
+        protected Duration timeout;
+        protected Maybe<V> onTimeout;
+        protected  boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED;
+        protected Maybe<V> onUnmanaged;
+
+        protected Builder(Entity source, AttributeSensor<T> sensor) {
+            this.source = source;
+            this.sensor = sensor;
+        }
         
         /**
          * Will wait for the attribute on the given entity.
-         * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. 
+         * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort.
+         * @deprecated since 0.7.0 use {@link DependentConfiguration#builder()} then {@link ProtoBuilder#attributeWhenReady(Entity, AttributeSensor)} then {@link #abortIfOnFire()} 
          */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) {
             this.source = checkNotNull(source, "source");
             this.sensor = (AttributeSensor) checkNotNull(sensor, "sensor");
-            abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
+            abortIfOnFire();
             return (Builder<T2, T2>) this;
         }
         public Builder<T,V> readiness(Closure<Boolean> val) {
@@ -460,10 +604,12 @@ public class DependentConfiguration {
             this.readiness = checkNotNull(val, "ready");
             return this;
         }
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <V2> Builder<T,V2> postProcess(Closure<V2> val) {
             this.postProcess = (Function) GroovyJavaMethods.<T,V2>functionFromClosure(checkNotNull(val, "postProcess"));
             return (Builder<T,V2>) this;
         }
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <V2> Builder<T,V2> postProcess(final Function<? super T, V2>  val) {
             this.postProcess = (Function) checkNotNull(val, "postProcess");
             return (Builder<T,V2>) this;
@@ -472,28 +618,71 @@ public class DependentConfiguration {
             return abortIf(source, sensor, GroovyJavaMethods.truthPredicate());
         }
         public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
-            abortConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
+            abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
+            return this;
+        }
+        public Builder<T,V> abortIfOnFire() {
+            abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
             return this;
         }
         public Builder<T,V> blockingDetails(String val) {
             blockingDetails = val;
             return this;
         }
+        /** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */
+        public Builder<T,V> timeout(Duration val) {
+            timeout = val;
+            return this;
+        }
+        public Builder<T,V> onTimeoutReturn(V val) {
+            onTimeout = Maybe.of(val);
+            return this;
+        }
+        public Builder<T,V> onTimeoutThrow() {
+            onTimeout = Maybe.<V>absent();
+            return this;
+        }
+        public Builder<T,V> onUnmanagedReturn(V val) {
+            onUnmanaged = Maybe.of(val);
+            return this;
+        }
+        public Builder<T,V> onUnmanagedThrow() {
+            onUnmanaged = Maybe.<V>absent();
+            return this;
+        }
+        /** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required
+         * (I can't see why it is; polling will likely give errors, once it is unmanaged this will never complete,
+         * and before management the current code will continue, so long as there are no other errors) */ @Deprecated
+        public Builder<T,V> onUnmanagedContinue() {
+            ignoreUnmanaged = true;
+            return this;
+        }
+        /** take advantage of the fact that this builder can build multiple times, allowing subclasses 
+         * to change the source along the way */
+        protected Builder<T,V> source(Entity source) {
+            this.source = source;
+            return this;
+        }
+        /** as {@link #source(Entity)} */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        protected Builder<T,V> sensor(AttributeSensor<? extends T> sensor) {
+            this.sensor = (AttributeSensor) sensor;
+            return this;
+        }
         public Task<V> build() {
             validate();
-            return new BasicTask<V>(
-                    MutableMap.of("tag", "attributeWhenReady", "displayName", "retrieving sensor "+sensor.getName()+" from "+source.getDisplayName()), 
-                    new Callable<V>() {
-                        @Override public V call() {
-                            T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails);
-                            return postProcess.apply(result);
-                        }
-                    });
+            
+            return Tasks.<V>builder().dynamic(false)
+                .name("waiting on "+sensor.getName())
+                .description("Waiting on sensor "+sensor.getName()+" from "+source)
+                .tag("attributeWhenReady")
+                .body(new WaitInTaskForAttributeReady<T,V>(this))
+                .build();
         }
+        
         public V runNow() {
             validate();
-            T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails);
-            return postProcess.apply(result);
+            return new WaitInTaskForAttributeReady<T,V>(this).call();
         }
         private void validate() {
             checkNotNull(source, "Entity source");
@@ -509,7 +698,12 @@ public class DependentConfiguration {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Beta
     public static class MultiBuilder<T, V, V2> {
-        protected List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList();
+        protected final String name;
+        protected final String descriptionBase;
+        protected final Builder<T,V> builder;
+        // if desired, the use of this multiSource could allow different conditions; 
+        // but probably an easier API just for the caller to build the parallel task  
+        protected final List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList();
         protected Function<? super List<V>, ? extends V2> postProcessFromMultiple;
         
         /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, 
@@ -520,34 +714,106 @@ public class DependentConfiguration {
         }
         @Beta
         protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
+            builder = new Builder<T,V>(null, sensor);
+            builder.readiness(readiness);
+            
             for (Entity s : checkNotNull(sources, "sources")) {
-                AttributeAndSensorCondition<T> condition = new AttributeAndSensorCondition<T>(s, sensor, readiness);
-                multiSource.add(condition);
+                multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness));
             }
+            this.name = "waiting on "+sensor.getName();
+            this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness
+                +" from "+Iterables.size(sources)+" entit"+Strings.ies(sources);
         }
+        
+        /** Apply post-processing to the entire list of results */
         public <V2b> MultiBuilder<T, V, V2b> postProcessFromMultiple(final Function<? super List<V>, V2b> val) {
-            this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcess");
+            this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple");
             return (MultiBuilder<T,V, V2b>) this;
         }
+        /** Apply post-processing to the entire list of results.
+         * See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(brooklyn.util.collections.QuorumCheck, Predicate)}
+         * which allow useful arguments. */
+        public MultiBuilder<T, V, Boolean> postProcessFromMultiple(final Predicate<? super List<V>> val) {
+            return postProcessFromMultiple(Functions.forPredicate(val));
+        }
+        
+        public <V1> MultiBuilder<T, V1, V2> postProcess(Closure<V1> val) {
+            builder.postProcess(val);
+            return (MultiBuilder<T, V1, V2>) this;
+        }
+        public <V1> MultiBuilder<T, V1, V2> postProcess(final Function<? super T, V1>  val) {
+            builder.postProcess(val);
+            return (MultiBuilder<T, V1, V2>) this;
+        }
+        public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor) {
+            builder.abortIf(source, sensor);
+            return this;
+        }
+        public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
+            builder.abortIf(source, sensor, predicate);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> abortIfOnFire() {
+            builder.abortIfOnFire();
+            return this;
+        }
+        public MultiBuilder<T, V, V2> blockingDetails(String val) {
+            builder.blockingDetails(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> timeout(Duration val) {
+            builder.timeout(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onTimeoutReturn(V val) {
+            builder.onTimeoutReturn(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onTimeoutThrow() {
+            builder.onTimeoutThrow();
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onUnmanagedReturn(V val) {
+            builder.onUnmanagedReturn(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onUnmanagedThrow() {
+            builder.onUnmanagedThrow();
+            return this;
+        }
+        
         public Task<V2> build() {
-            checkState(multiSource.size() > 0, "Entity sources must be set: multiSource=%s", multiSource);
+            List<Task<V>> tasks = MutableList.of();
+            for (AttributeAndSensorCondition<?> source: multiSource) {
+                builder.source(source.source);
+                builder.sensor((AttributeSensor)source.sensor);
+                builder.readiness((Predicate)source.predicate);
+                tasks.add(builder.build());
+            }
+            final Task<List<V>> parallelTask = Tasks.<List<V>>builder().parallel(true).addAll(tasks)
+                .name(name)
+                .description(descriptionBase+
+                    (builder.timeout!=null ? ", timeout "+builder.timeout : ""))
+                .build();
             
-            // TODO Do we really want to try to support the list-of-entities?
-            final Task<List<V>> task = (Task<List<V>>) new ParallelTask<V>(Iterables.transform(multiSource, new Function<AttributeAndSensorCondition<?>, Task<T>>() {
-                @Override public Task<T> apply(AttributeAndSensorCondition<?> it) {
-                    return (Task) builder().attributeWhenReady(it.source, it.sensor).readiness((Predicate)it.predicate).build();
-                }
-            }));
             if (postProcessFromMultiple == null) {
-                return (Task<V2>) task;
+                // V2 should be the right type in normal operations
+                return (Task<V2>) parallelTask;
             } else {
-                return new BasicTask(new Callable<V2>() {
-                    @Override public V2 call() throws Exception {
-                        List<V> prePostProgress = DynamicTasks.queueIfPossible(task).orSubmitAndBlock().getTask().get();
-                        return postProcessFromMultiple.apply(prePostProgress);
-                    }
-                });
+                return Tasks.<V2>builder().name(name).description(descriptionBase)
+                    .tag("attributeWhenReady")
+                    .body(new Callable<V2>() {
+                        @Override public V2 call() throws Exception {
+                            List<V> prePostProgress = DynamicTasks.queue(parallelTask).get();
+                            return DynamicTasks.queue(
+                                Tasks.<V2>builder().name("post-processing").description("Applying "+postProcessFromMultiple)
+                                    .body(Functionals.<List<V>,V2>callable((Function)postProcessFromMultiple, prePostProgress))
+                                    .build()).get();
+                        }
+                    })
+                    .build();
             }
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java
index 0edd07b..fd33985 100644
--- a/core/src/main/java/brooklyn/util/task/Tasks.java
+++ b/core/src/main/java/brooklyn/util/task/Tasks.java
@@ -445,16 +445,28 @@ public class Tasks {
             return false;
         }
     }
-    
-    /** creates an (unsubmitted) task which waits for the given repeater, optionally failing if it does not complete with success */
-    public static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
-        return awaitingBuilder(repeater, requireTrue).build();
+
+    /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
+     * returning true or false depending on whether the repeater succeeds */
+    public static TaskBuilder<Boolean> testing(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false))
+            .name("waiting for condition")
+            .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
     }
 
-    /** creates a partially instantiated builder which waits for the given repeater, optionally failing if it does not complete with success,
-     * for further task customization and then {@link TaskBuilder#build()} */
-    public static TaskBuilder<Boolean> awaitingBuilder(Repeater repeater, boolean requireTrue) {
-        return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue));
+    /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
+     * throwing if it does not */
+    public static TaskBuilder<?> requiring(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true))
+            .name("waiting for condition")
+            .description("Requiring " + getTimeoutString(repeater) + ": "+repeater);
+    }
+    
+    private static String getTimeoutString(Repeater repeater) {
+        Duration timeout = repeater.getTimeLimit();
+        if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout))
+            return "eventually";
+        return "in "+timeout;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
index b4c144f..9394b85 100644
--- a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
+++ b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
@@ -110,7 +110,7 @@ public class EntityPredicatesTest extends BrooklynAppUnitTestSupport {
     @Test
     public void testWithLocation() throws Exception {
         entity.addLocations(ImmutableList.of(loc));
-        assertTrue(EntityPredicates.locationsInclude(loc).apply(entity));
-        assertFalse(EntityPredicates.locationsInclude(loc).apply(app));
+        assertTrue(EntityPredicates.locationsIncludes(loc).apply(entity));
+        assertFalse(EntityPredicates.locationsIncludes(loc).apply(app));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/util/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java
index 9ee9970..68aa2ea 100644
--- a/core/src/test/java/brooklyn/util/task/TasksTest.java
+++ b/core/src/test/java/brooklyn/util/task/TasksTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals;
 import java.util.Map;
 import java.util.Set;
 
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -34,12 +35,14 @@ import brooklyn.management.Task;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.test.entity.TestEntity;
 import brooklyn.util.guava.Functionals;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.time.Duration;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
 
 
 public class TasksTest extends BrooklynAppUnitTestSupport {
@@ -127,4 +130,30 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
         assertResolvesValue(v, Object.class, "foo");
     }
 
+    @Test
+    public void testRepeater() throws Exception {
+        Task<?> t;
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        t.get(Duration.ONE_SECOND);
+        
+        t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.ONE_SECOND), true);
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        try {
+            t.get(Duration.ONE_SECOND);
+            Assert.fail("Should have failed");
+        } catch (Exception e) {
+            // expected
+        }
+
+        t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.ONE_SECOND), false);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
index b815ee4..48553b3 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
@@ -22,14 +22,12 @@ import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.Group;
@@ -91,9 +89,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
             
             specCfg.putAll(ConfigBag.newInstance(parameters.get(EXTRA_CONFIG)).getAllConfigAsConfigKeyMap());
             
-            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
-            cfgLive.clear();
-            cfgLive.putAll(specCfg.getAllConfigAsConfigKeyMap());
+            memberSpec.clearConfig();
+            memberSpec.configure(specCfg.getAllConfigAsConfigKeyMap());
             // not necessary, but good practice
             entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
             log.debug("Upgrading "+entity()+", new "+BrooklynCluster.MEMBER_SPEC+": "+memberSpec+" / "+specCfg);
@@ -101,9 +98,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
             upgrade(parameters);
         } catch (Exception e) {
             log.debug("Upgrading "+entity()+" failed, will rethrow after restoring "+BrooklynCluster.MEMBER_SPEC+" to: "+origSpecCfg);
-            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
-            cfgLive.clear();
-            cfgLive.putAll(origSpecCfg.getAllConfigAsConfigKeyMap());
+            memberSpec.clearConfig();
+            memberSpec.configure(origSpecCfg.getAllConfigAsConfigKeyMap());
             // not necessary, but good practice
             entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
             
@@ -187,7 +183,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
 
         //2. Wait for them to be RUNNING (or at least STARTING to have completed)
         // (should already be the case, because above is synchronous and, we think, it will fail if start does not succeed)
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, Attributes.SERVICE_STATE_ACTUAL, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, Attributes.SERVICE_STATE_ACTUAL, 
                 Predicates.not(Predicates.equalTo(Lifecycle.STARTING)), Duration.minutes(30)));
 
         //3. Set HOT_STANDBY in case it is not enabled on the command line ...
@@ -197,7 +193,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
                 newNodes)).asTask().getUnchecked();
         //... and wait until all of the nodes change state
         // TODO fail quicker if state changes to FAILED
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, 
                 Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
index 299bd5b..00ab7bc 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -128,7 +128,7 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void> {
         DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
 
         // 2 confirm hot standby status
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
             Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         // 3 stop new version

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
index a42091e..9ac7202 100644
--- a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
+++ b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
@@ -181,7 +181,7 @@ public class CollectionFunctionals {
     // ---------
     
     public static <T,TT extends Iterable<T>> Predicate<TT> all(Predicate<T> attributeSatisfies) {
-        return new QuorumSatisfies<T, TT>(QuorumChecks.all(), attributeSatisfies);
+        return quorum(QuorumChecks.all(), attributeSatisfies);
     }
 
     public static <T,TT extends Iterable<T>> Predicate<TT> quorum(QuorumCheck quorumCheck, Predicate<T> attributeSatisfies) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
new file mode 100644
index 0000000..9550288
--- /dev/null
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.exceptions;
+
+public class NotManagedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3359163414517503809L;
+
+    public NotManagedException(Object object) {
+        super(object+" is not managed");
+    }
+    
+    public NotManagedException(String message) {
+        super(message);
+    }
+    
+    public NotManagedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
new file mode 100644
index 0000000..c31512f
--- /dev/null
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.exceptions;
+
+public class TimeoutException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3359163414517503809L;
+
+    public TimeoutException() {
+        super("timeout");
+    }
+    
+    public TimeoutException(String message) {
+        super(message);
+    }
+    
+    public TimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
index 1e3e188..bd76ea8 100644
--- a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
+++ b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
@@ -386,5 +386,9 @@ public class Repeater {
     public String getDescription() {
         return description;
     }
+
+    public Duration getTimeLimit() {
+        return timeLimit;
+    }
     
 }


Mime
View raw message