Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1FB3010031 for ; Mon, 3 Nov 2014 15:52:22 +0000 (UTC) Received: (qmail 59933 invoked by uid 500); 3 Nov 2014 15:52:22 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 59910 invoked by uid 500); 3 Nov 2014 15:52:22 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 59901 invoked by uid 99); 3 Nov 2014 15:52:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Nov 2014 15:52:21 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 03 Nov 2014 15:51:49 +0000 Received: (qmail 57106 invoked by uid 99); 3 Nov 2014 15:51:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Nov 2014 15:51:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1A37099BA65; Mon, 3 Nov 2014 15:51:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aledsage@apache.org To: commits@brooklyn.incubator.apache.org Date: Mon, 03 Nov 2014 15:52:04 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/29] git commit: refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentCongifuration, with 
DependentConfiguration now relying on an internal class and: * checking for unmanaged (and now this is the defau X-Virus-Checked: Checked by ClamAV on apache.org refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentConfiguration, with DependentConfiguration now relying on an internal class and: * checking for unmanaged (and now this is the default) * supporting a timeout * ensuring that all values put into the subscription queue are read (previously it could miss a value if subscriptions were updated twice in succession) Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/14f17bc9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/14f17bc9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/14f17bc9 Branch: refs/heads/master Commit: 14f17bc94d76693eadafe7d33a34e4c5fad06ff9 Parents: 7f09a80 Author: Alex Heneveld Authored: Tue Oct 28 00:42:36 2014 -0700 Committer: Alex Heneveld Committed: Fri Oct 31 09:39:50 2014 -0500 ---------------------------------------------------------------------- .../java/brooklyn/entity/basic/EntityTasks.java | 72 +-- .../event/basic/DependentConfiguration.java | 468 +++++++++++++++---- .../src/main/java/brooklyn/util/task/Tasks.java | 28 +- .../entity/basic/EntityPredicatesTest.java | 4 +- .../test/java/brooklyn/util/task/TasksTest.java | 29 ++ .../BrooklynClusterUpgradeEffectorBody.java | 16 +- .../BrooklynNodeUpgradeEffectorBody.java | 2 +- .../util/collections/CollectionFunctionals.java | 2 +- .../util/exceptions/NotManagedException.java | 36 ++ .../util/exceptions/TimeoutException.java | 36 ++ .../java/brooklyn/util/repeat/Repeater.java | 4 + 11 files changed, 547 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/entity/basic/EntityTasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java index 37114c5..99c5ca3 100644 --- a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java +++ b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java @@ -20,44 +20,62 @@ package brooklyn.entity.basic; import brooklyn.entity.Entity; import brooklyn.event.AttributeSensor; -import brooklyn.management.TaskAdaptable; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.management.Task; import brooklyn.util.collections.CollectionFunctionals; -import brooklyn.util.guava.Functionals; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; import com.google.common.base.Functions; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; +import com.google.common.base.Predicates; /** Generally useful tasks related to entities */ public class EntityTasks { /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate, - * with an optional timeout */ - public static TaskAdaptable awaitingAttribute(Entity entity, AttributeSensor sensor, Predicate condition, Duration timeout) { - return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName()) - .backoff(Duration.millis(10), 1.5, Duration.millis(200)) - .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout) -// TODO abort if entity is unmanaged - .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), entity)), - true) - .description("waiting on "+entity+" "+sensor.getName()+" "+condition+ - (timeout!=null ? 
", timeout "+timeout : "")).build(); + * returning false if it times out or becomes unmanaged */ + public static Task testingAttributeEventually(Entity entity, AttributeSensor sensor, Predicate condition, Duration timeout) { + return DependentConfiguration.builder().attributeWhenReady(entity, sensor) + .readiness(condition) + .postProcess(Functions.constant(true)) + .timeout(timeout) + .onTimeoutReturn(false) + .onUnmanagedReturn(false) + .build(); } - /** as {@link #awaitingAttribute(Entity, AttributeSensor, Predicate, Duration)} for multiple entities */ - public static TaskAdaptable awaitingAttribute(Iterable entities, AttributeSensor sensor, Predicate condition, Duration timeout) { - return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName()) - .backoff(Duration.millis(10), 1.5, Duration.millis(200)) - .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout) -// TODO abort if entity is unmanaged - .until(Functionals.callable(Functions.forPredicate( - CollectionFunctionals.all(EntityPredicates.attributeSatisfies(sensor, condition))), entities)), - true) - .description("waiting on "+Iterables.size(entities)+", "+sensor.getName()+" "+condition+ - (timeout!=null ? 
", timeout "+timeout : "")+ - ": "+entities).build(); + /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate, + * throwing if it times out or becomes unmanaged */ + public static Task requiringAttributeEventually(Entity entity, AttributeSensor sensor, Predicate condition, Duration timeout) { + return DependentConfiguration.builder().attributeWhenReady(entity, sensor) + .readiness(condition) + .postProcess(Functions.constant(true)) + .timeout(timeout) + .onTimeoutThrow() + .onUnmanagedThrow() + .build(); + } + + /** as {@link #testingAttributeEventually(Entity, AttributeSensor, Predicate, Duration) for multiple entities */ + public static Task testingAttributeEventually(Iterable entities, AttributeSensor sensor, Predicate condition, Duration timeout) { + return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition) + .postProcess(Functions.constant(true)) + .timeout(timeout) + .onTimeoutReturn(false) + .onUnmanagedReturn(false) + .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true))) + .build(); } + + /** as {@link #requiringAttributeEventually(Entity, AttributeSensor, Predicate, Duration) for multiple entities */ + public static Task requiringAttributeEventually(Iterable entities, AttributeSensor sensor, Predicate condition, Duration timeout) { + return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition) + .postProcess(Functions.constant(true)) + .timeout(timeout) + .onTimeoutThrow() + .onUnmanagedThrow() + .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true))) + .build(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java 
b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java index de45514..51af110 100644 --- a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java +++ b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java @@ -19,18 +19,18 @@ package brooklyn.event.basic; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import groovy.lang.Closure; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -41,6 +41,7 @@ import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; import brooklyn.entity.basic.EntityLocal; import brooklyn.entity.basic.Lifecycle; @@ -53,9 +54,15 @@ import brooklyn.management.Task; import brooklyn.management.TaskAdaptable; import brooklyn.management.TaskFactory; import brooklyn.util.GroovyJavaMethods; +import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.collections.MutableList; import brooklyn.util.collections.MutableMap; import brooklyn.util.exceptions.CompoundRuntimeException; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.NotManagedException; +import brooklyn.util.exceptions.TimeoutException; +import brooklyn.util.guava.Functionals; +import brooklyn.util.guava.Maybe; import brooklyn.util.task.BasicExecutionContext; import brooklyn.util.task.BasicTask; import brooklyn.util.task.DeferredSupplier; @@ -63,6 +70,9 @@ import brooklyn.util.task.DynamicTasks; 
import brooklyn.util.task.ParallelTask; import brooklyn.util.task.TaskInternal; import brooklyn.util.task.Tasks; +import brooklyn.util.text.Strings; +import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; import com.google.common.annotations.Beta; import com.google.common.base.Function; @@ -105,7 +115,7 @@ public class DependentConfiguration { return attributeWhenReady(source, sensor, readyPredicate); } - /** returns a {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value; + /** returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value; * particular useful in Entity configuration where config will block until Tasks have a value */ public static Task attributeWhenReady(final Entity source, final AttributeSensor sensor, final Predicate ready) { @@ -141,11 +151,19 @@ public class DependentConfiguration { return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.functionFromClosure(postProcess)); } + @SuppressWarnings("unchecked") public static Task attributePostProcessedWhenReady(final Entity source, final AttributeSensor sensor, final Predicate ready, final Function postProcess) { - Builder builder = builder().attributeWhenReady(source, sensor); + Builder builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor); + // messy generics here to support null postProcess; would be nice to disallow that here + Builder builder; + if (postProcess != null) { + builder = builder1.postProcess(postProcess); + } else { + builder = (Builder)builder1; + } if (ready != null) builder.readiness(ready); - if (postProcess != null) builder.postProcess(postProcess); - return ((Builder)builder).build(); + + return builder.build(); } public static T waitInTaskForAttributeReady(Entity source, AttributeSensor sensor, Predicate ready) { @@ -158,49 
+176,83 @@ public class DependentConfiguration { } // TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp) - // and TODO would be nice to have it stop when source is unmanaged (with ability to define post-processing) - // probably using the builder for both of these... + public static T waitInTaskForAttributeReady(final Entity source, final AttributeSensor sensor, Predicate ready, List> abortConditions, String blockingDetails) { + return new WaitInTaskForAttributeReady(source, sensor, ready, abortConditions, blockingDetails).call(); + } + + protected static class WaitInTaskForAttributeReady implements Callable { + + /* This is a change since before Oct 2014. Previously it would continue to poll, + * (maybe finding a different error) if the target entity becomes unmanaged. + * Now it actively checks unmanaged by default, and still throws although it might + * now find a different problem. */ + private final static boolean DEFAULT_IGNORE_UNMANAGED = false; - T value = source.getAttribute(sensor); - final List abortionExceptions = Lists.newCopyOnWriteArrayList(); - - // return immediately if either the ready predicate or the abort conditions hold - if (ready==null) ready = GroovyJavaMethods.truthPredicate(); - if (ready.apply(value)) return value; - for (AttributeAndSensorCondition abortCondition : abortConditions) { - Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); - if (abortCondition.predicate.apply(abortValue)) { - abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); - } + final Entity source; + final AttributeSensor sensor; + final Predicate ready; + final List> abortSensorConditions; + final String blockingDetails; + final Function postProcess; + final Duration timeout; + final Maybe onTimeout; + final boolean ignoreUnmanaged; + final Maybe onUnmanaged; + // TODO onError Continue / Throw / Return(V) + + protected 
WaitInTaskForAttributeReady(Builder builder) { + this.source = builder.source; + this.sensor = builder.sensor; + this.ready = builder.readiness; + this.abortSensorConditions = builder.abortSensorConditions; + this.blockingDetails = builder.blockingDetails; + this.postProcess = builder.postProcess; + this.timeout = builder.timeout; + this.onTimeout = builder.onTimeout; + this.ignoreUnmanaged = builder.ignoreUnmanaged; + this.onUnmanaged = builder.onUnmanaged; } - if (abortionExceptions.size() > 0) { - throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + + private WaitInTaskForAttributeReady(Entity source, AttributeSensor sensor, Predicate ready, + List> abortConditions, String blockingDetails) { + this.source = source; + this.sensor = sensor; + this.ready = ready; + this.abortSensorConditions = abortConditions; + this.blockingDetails = blockingDetails; + + this.timeout = Duration.PRACTICALLY_FOREVER; + this.onTimeout = Maybe.absent(); + this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED; + this.onUnmanaged = Maybe.absent(); + this.postProcess = null; } - TaskInternal current = (TaskInternal) Tasks.current(); - if (current == null) throw new IllegalStateException("Should only be invoked in a running task"); - Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current); - if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+ - current+" has no entity tag ("+current.getStatusDetail(false)+")"); - final AtomicReference data = new AtomicReference(); - final Semaphore semaphore = new Semaphore(0); // could use Exchanger - SubscriptionHandle subscription = null; - List abortSubscriptions = Lists.newArrayList(); - try { - subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - data.set(event.getValue()); - semaphore.release(); - }}); - 
for (final AttributeAndSensorCondition abortCondition : abortConditions) { - abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - if (abortCondition.predicate.apply(event.getValue())) { - abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); - semaphore.release(); - } - }})); + @SuppressWarnings("unchecked") + protected V postProcess(T value) { + if (this.postProcess!=null) return postProcess.apply(value); + // if no post-processing assume the types are correct + return (V) value; + } + + protected boolean ready(T value) { + if (ready!=null) return ready.apply(value); + return GroovyJavaMethods.truth(value); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public V call() { + T value = source.getAttribute(sensor); + + // return immediately if either the ready predicate or the abort conditions hold + if (ready(value)) return postProcess(value); + + final List abortionExceptions = Lists.newCopyOnWriteArrayList(); + long start = System.currentTimeMillis(); + + for (AttributeAndSensorCondition abortCondition : abortSensorConditions) { Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); if (abortCondition.predicate.apply(abortValue)) { abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); @@ -209,31 +261,101 @@ public class DependentConfiguration { if (abortionExceptions.size() > 0) { throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); } + + TaskInternal current = (TaskInternal) Tasks.current(); + if (current == null) throw new IllegalStateException("Should only be invoked in a running task"); + Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current); + if (entity == null) throw new 
IllegalStateException("Should only be invoked in a running task with an entity tag; "+ + current+" has no entity tag ("+current.getStatusDetail(false)+")"); + + final LinkedList publishedValues = new LinkedList(); + final Semaphore semaphore = new Semaphore(0); // could use Exchanger + SubscriptionHandle subscription = null; + List abortSubscriptions = Lists.newArrayList(); - value = source.getAttribute(sensor); - while (!ready.apply(value)) { - String prevBlockingDetails = current.setBlockingDetails(blockingDetails); - try { - semaphore.acquire(); - } finally { - current.setBlockingDetails(prevBlockingDetails); + try { + subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener() { + @Override public void onEvent(SensorEvent event) { + synchronized (publishedValues) { publishedValues.add(event.getValue()); } + semaphore.release(); + }}); + for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) { + abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener() { + @Override public void onEvent(SensorEvent event) { + if (abortCondition.predicate.apply(event.getValue())) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + semaphore.release(); + } + }})); + Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); + if (abortCondition.predicate.apply(abortValue)) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + } } - if (abortionExceptions.size() > 0) { throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); } - value = data.get(); - } - if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source); - return value; - } catch (InterruptedException e) { - throw 
Exceptions.propagate(e); - } finally { - if (subscription != null) { - ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription); - } - for (SubscriptionHandle handle : abortSubscriptions) { - ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle); + + CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null; + Duration maxPeriod = Duration.millis(200); + Duration nextPeriod = Duration.millis(10); + while (true) { + // check the source on initial run (could be done outside the loop) + // and also (optionally) on each iteration in case it is more recent + value = source.getAttribute(sensor); + if (ready(value)) break; + + if (timer!=null) { + if (timer.getDurationRemaining().isShorterThan(nextPeriod)) { + nextPeriod = timer.getDurationRemaining(); + } + if (timer.isExpired()) { + if (onTimeout.isPresent()) return onTimeout.get(); + throw new TimeoutException("Unsatisfied after "+Duration.sinceUtc(start)); + } + } + + String prevBlockingDetails = current.setBlockingDetails(blockingDetails); + try { + if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) { + // immediately release so we are available for the next check + semaphore.release(); + // if other permits have been made available (e.g. 
multiple notifications) drain them all as no point running multiple times + semaphore.drainPermits(); + } + } finally { + current.setBlockingDetails(prevBlockingDetails); + } + + // check any subscribed values which have come in first + while (!publishedValues.isEmpty()) { + synchronized (publishedValues) { value = publishedValues.pop(); } + if (ready(value)) break; + } + + // if unmanaged then ignore the other abort conditions + if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) { + if (onTimeout.isPresent()) return onTimeout.get(); + throw new NotManagedException(entity); + } + + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + nextPeriod = nextPeriod.times(2).maximum(maxPeriod); + } + if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source); + return postProcess(value); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + if (subscription != null) { + ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription); + } + for (SubscriptionHandle handle : abortSubscriptions) { + ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle); + } } } } @@ -268,6 +390,7 @@ public class DependentConfiguration { } /** @see #transform(Task, Function) */ + @SuppressWarnings({ "rawtypes" }) public static Task transform(final Map flags, final TaskAdaptable task, final Function transformer) { return new BasicTask(flags, new Callable() { public T call() throws Exception { @@ -286,19 +409,23 @@ public class DependentConfiguration { } /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public static Task transformMultiple(Closure transformer, TaskAdaptable ...tasks) { return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks); } /** @see #transformMultiple(Function, TaskAdaptable...) 
*/ + @SuppressWarnings({ "unchecked", "rawtypes" }) public static Task transformMultiple(Map flags, Closure transformer, TaskAdaptable ...tasks) { return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks); } /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "rawtypes" }) public static Task transformMultiple(Map flags, final Function,T> transformer, TaskAdaptable ...tasks) { return transformMultiple(flags, transformer, Arrays.asList(tasks)); } + @SuppressWarnings({ "rawtypes" }) public static Task transformMultiple(Map flags, final Function,T> transformer, Collection> tasks) { if (tasks.size()==1) { return transform(flags, Iterables.getOnlyElement(tasks), new Function() { @@ -411,18 +538,26 @@ public class DependentConfiguration { public static class ProtoBuilder { /** * Will wait for the attribute on the given entity. - * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. + * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. */ public Builder attributeWhenReady(Entity source, AttributeSensor sensor) { - return new Builder().attributeWhenReady(source, sensor); + return new Builder(source, sensor).abortIfOnFire(); } - /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, + /** + * Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}. 
+ */ + public Builder attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor sensor) { + return new Builder(source, sensor); + } + + /** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity, * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ @Beta public MultiBuilder> attributeWhenReadyFromMultiple(Iterable sources, AttributeSensor sensor) { return attributeWhenReadyFromMultiple(sources, sensor, GroovyJavaMethods.truthPredicate()); } + /** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */ @Beta public MultiBuilder> attributeWhenReadyFromMultiple(Iterable sources, AttributeSensor sensor, Predicate readiness) { return new MultiBuilder>(sources, sensor, readiness); @@ -432,24 +567,33 @@ public class DependentConfiguration { /** * Builder for producing variants of attributeWhenReady. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Beta public static class Builder { protected Entity source; protected AttributeSensor sensor; protected Predicate readiness; protected Function postProcess; - protected List> abortConditions = Lists.newArrayList(); + protected List> abortSensorConditions = Lists.newArrayList(); protected String blockingDetails; + protected Duration timeout; + protected Maybe onTimeout; + protected boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED; + protected Maybe onUnmanaged; + + protected Builder(Entity source, AttributeSensor sensor) { + this.source = source; + this.sensor = sensor; + } /** * Will wait for the attribute on the given entity. - * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. + * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. 
+ * @deprecated since 0.7.0 use {@link DependentConfiguration#builder()} then {@link ProtoBuilder#attributeWhenReady(Entity, AttributeSensor)} then {@link #abortIfOnFire()} */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public Builder attributeWhenReady(Entity source, AttributeSensor sensor) { this.source = checkNotNull(source, "source"); this.sensor = (AttributeSensor) checkNotNull(sensor, "sensor"); - abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE)); + abortIfOnFire(); return (Builder) this; } public Builder readiness(Closure val) { @@ -460,10 +604,12 @@ public class DependentConfiguration { this.readiness = checkNotNull(val, "ready"); return this; } + @SuppressWarnings({ "unchecked", "rawtypes" }) public Builder postProcess(Closure val) { this.postProcess = (Function) GroovyJavaMethods.functionFromClosure(checkNotNull(val, "postProcess")); return (Builder) this; } + @SuppressWarnings({ "unchecked", "rawtypes" }) public Builder postProcess(final Function val) { this.postProcess = (Function) checkNotNull(val, "postProcess"); return (Builder) this; @@ -472,28 +618,71 @@ public class DependentConfiguration { return abortIf(source, sensor, GroovyJavaMethods.truthPredicate()); } public Builder abortIf(Entity source, AttributeSensor sensor, Predicate predicate) { - abortConditions.add(new AttributeAndSensorCondition(source, sensor, predicate)); + abortSensorConditions.add(new AttributeAndSensorCondition(source, sensor, predicate)); + return this; + } + public Builder abortIfOnFire() { + abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE)); return this; } public Builder blockingDetails(String val) { blockingDetails = val; return this; } + /** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */ + public Builder timeout(Duration val) { + timeout = val; + return this; + } + public Builder onTimeoutReturn(V val) { + onTimeout = Maybe.of(val); 
+ return this; + } + public Builder onTimeoutThrow() { + onTimeout = Maybe.absent(); + return this; + } + public Builder onUnmanagedReturn(V val) { + onUnmanaged = Maybe.of(val); + return this; + } + public Builder onUnmanagedThrow() { + onUnmanaged = Maybe.absent(); + return this; + } + /** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required + * (I can't see why it is; polling will likely give errors, once it is unmanaged this will never completed, + * and before management the current code will continue, so long as there are no other errors) */ @Deprecated + public Builder onUnmanagedContinue() { + ignoreUnmanaged = true; + return this; + } + /** take advantage of the fact that this builder can build multiple times, allowing subclasses + * to change the source along the way */ + protected Builder source(Entity source) { + this.source = source; + return this; + } + /** as {@link #source(Entity)} */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected Builder sensor(AttributeSensor sensor) { + this.sensor = (AttributeSensor) sensor; + return this; + } public Task build() { validate(); - return new BasicTask( - MutableMap.of("tag", "attributeWhenReady", "displayName", "retrieving sensor "+sensor.getName()+" from "+source.getDisplayName()), - new Callable() { - @Override public V call() { - T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails); - return postProcess.apply(result); - } - }); + + return Tasks.builder().dynamic(false) + .name("waiting on "+sensor.getName()) + .description("Waiting on sensor "+sensor.getName()+" from "+source) + .tag("attributeWhenReady") + .body(new WaitInTaskForAttributeReady(this)) + .build(); } + public V runNow() { validate(); - T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails); - return postProcess.apply(result); + return new WaitInTaskForAttributeReady(this).call(); } private void 
validate() { checkNotNull(source, "Entity source"); @@ -509,7 +698,12 @@ public class DependentConfiguration { @SuppressWarnings({ "unchecked", "rawtypes" }) @Beta public static class MultiBuilder { - protected List> multiSource = Lists.newArrayList(); + protected final String name; + protected final String descriptionBase; + protected final Builder builder; + // if desired, the use of this multiSource could allow different conditions; + // but probably an easier API just for the caller to build the parallel task + protected final List> multiSource = Lists.newArrayList(); protected Function, ? extends V2> postProcessFromMultiple; /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, @@ -520,34 +714,106 @@ public class DependentConfiguration { } @Beta protected MultiBuilder(Iterable sources, AttributeSensor sensor, Predicate readiness) { + builder = new Builder(null, sensor); + builder.readiness(readiness); + for (Entity s : checkNotNull(sources, "sources")) { - AttributeAndSensorCondition condition = new AttributeAndSensorCondition(s, sensor, readiness); - multiSource.add(condition); + multiSource.add(new AttributeAndSensorCondition(s, sensor, readiness)); } + this.name = "waiting on "+sensor.getName(); + this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness + +" from "+Iterables.size(sources)+" entit"+Strings.ies(sources); } + + /** Apply post-processing to the entire list of results */ public MultiBuilder postProcessFromMultiple(final Function, V2b> val) { - this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcess"); + this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple"); return (MultiBuilder) this; } + /** Apply post-processing to the entire list of results + * See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(brooklyn.util.collections.QuorumCheck, Predicate) + * which allow useful arguments. 
*/ + public MultiBuilder postProcessFromMultiple(final Predicate> val) { + return postProcessFromMultiple(Functions.forPredicate(val)); + } + + public MultiBuilder postProcess(Closure val) { + builder.postProcess(val); + return (MultiBuilder) this; + } + public MultiBuilder postProcess(final Function val) { + builder.postProcess(val); + return (MultiBuilder) this; + } + public MultiBuilder abortIf(Entity source, AttributeSensor sensor) { + builder.abortIf(source, sensor); + return this; + } + public MultiBuilder abortIf(Entity source, AttributeSensor sensor, Predicate predicate) { + builder.abortIf(source, sensor, predicate); + return this; + } + public MultiBuilder abortIfOnFire() { + builder.abortIfOnFire(); + return this; + } + public MultiBuilder blockingDetails(String val) { + builder.blockingDetails(val); + return this; + } + public MultiBuilder timeout(Duration val) { + builder.timeout(val); + return this; + } + public MultiBuilder onTimeoutReturn(V val) { + builder.onTimeoutReturn(val); + return this; + } + public MultiBuilder onTimeoutThrow() { + builder.onTimeoutThrow(); + return this; + } + public MultiBuilder onUnmanagedReturn(V val) { + builder.onUnmanagedReturn(val); + return this; + } + public MultiBuilder onUnmanagedThrow() { + builder.onUnmanagedThrow(); + return this; + } + public Task build() { - checkState(multiSource.size() > 0, "Entity sources must be set: multiSource=%s", multiSource); + List> tasks = MutableList.of(); + for (AttributeAndSensorCondition source: multiSource) { + builder.source(source.source); + builder.sensor((AttributeSensor)source.sensor); + builder.readiness((Predicate)source.predicate); + tasks.add(builder.build()); + } + final Task> parallelTask = Tasks.>builder().parallel(true).addAll(tasks) + .name(name) + .description(descriptionBase+ + (builder.timeout!=null ? ", timeout "+builder.timeout : "")) + .build(); - // TODO Do we really want to try to support the list-of-entities? 
- final Task> task = (Task>) new ParallelTask(Iterables.transform(multiSource, new Function, Task>() { - @Override public Task apply(AttributeAndSensorCondition it) { - return (Task) builder().attributeWhenReady(it.source, it.sensor).readiness((Predicate)it.predicate).build(); - } - })); if (postProcessFromMultiple == null) { - return (Task) task; + // V2 should be the right type in normal operations + return (Task) parallelTask; } else { - return new BasicTask(new Callable() { - @Override public V2 call() throws Exception { - List prePostProgress = DynamicTasks.queueIfPossible(task).orSubmitAndBlock().getTask().get(); - return postProcessFromMultiple.apply(prePostProgress); - } - }); + return Tasks.builder().name(name).description(descriptionBase) + .tag("attributeWhenReady") + .body(new Callable() { + @Override public V2 call() throws Exception { + List prePostProgress = DynamicTasks.queue(parallelTask).get(); + return DynamicTasks.queue( + Tasks.builder().name("post-processing").description("Applying "+postProcessFromMultiple) + .body(Functionals.,V2>callable((Function)postProcessFromMultiple, prePostProgress)) + .build()).get(); + } + }) + .build(); } } } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/util/task/Tasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java index 0edd07b..fd33985 100644 --- a/core/src/main/java/brooklyn/util/task/Tasks.java +++ b/core/src/main/java/brooklyn/util/task/Tasks.java @@ -445,16 +445,28 @@ public class Tasks { return false; } } - - /** creates an (unsubmitted) task which waits for the given repeater, optionally failing if it does not complete with success */ - public static TaskAdaptable awaiting(Repeater repeater, boolean requireTrue) { - return awaitingBuilder(repeater, requireTrue).build(); + + /** @return a {@link TaskBuilder} 
which tests whether the repeater terminates with success in its configured timeframe, + * returning true or false depending on whether repeater succeed */ + public static TaskBuilder testing(Repeater repeater) { + return Tasks.builder().body(new WaitForRepeaterCallable(repeater, false)) + .name("waiting for condition") + .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription()); } - /** creates a partially instantiated builder which waits for the given repeater, optionally failing if it does not complete with success, - * for further task customization and then {@link TaskBuilder#build()} */ - public static TaskBuilder awaitingBuilder(Repeater repeater, boolean requireTrue) { - return Tasks.builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue)); + /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe, + * throwing if it does not */ + public static TaskBuilder requiring(Repeater repeater) { + return Tasks.builder().body(new WaitForRepeaterCallable(repeater, true)) + .name("waiting for condition") + .description("Requiring " + getTimeoutString(repeater) + ": "+repeater); + } + + private static String getTimeoutString(Repeater repeater) { + Duration timeout = repeater.getTimeLimit(); + if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout)) + return "eventually"; + return "in "+timeout; } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java index b4c144f..9394b85 100644 --- a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java +++ b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java 
@@ -110,7 +110,7 @@ public class EntityPredicatesTest extends BrooklynAppUnitTestSupport { @Test public void testWithLocation() throws Exception { entity.addLocations(ImmutableList.of(loc)); - assertTrue(EntityPredicates.locationsInclude(loc).apply(entity)); - assertFalse(EntityPredicates.locationsInclude(loc).apply(app)); + assertTrue(EntityPredicates.locationsIncludes(loc).apply(entity)); + assertFalse(EntityPredicates.locationsIncludes(loc).apply(app)); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/util/task/TasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java index 9ee9970..68aa2ea 100644 --- a/core/src/test/java/brooklyn/util/task/TasksTest.java +++ b/core/src/test/java/brooklyn/util/task/TasksTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals; import java.util.Map; import java.util.Set; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -34,12 +35,14 @@ import brooklyn.management.Task; import brooklyn.test.entity.TestApplication; import brooklyn.test.entity.TestEntity; import brooklyn.util.guava.Functionals; +import brooklyn.util.repeat.Repeater; import brooklyn.util.time.Duration; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Callables; public class TasksTest extends BrooklynAppUnitTestSupport { @@ -127,4 +130,30 @@ public class TasksTest extends BrooklynAppUnitTestSupport { assertResolvesValue(v, Object.class, "foo"); } + @Test + public void testRepeater() throws Exception { + Task t; + + t = 
Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + t.get(Duration.ONE_SECOND); + + t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + Assert.assertEquals(t.get(Duration.ONE_SECOND), true); + + t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + try { + t.get(Duration.ONE_SECOND); + Assert.fail("Should have failed"); + } catch (Exception e) { + // expected + } + + t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + Assert.assertEquals(t.get(Duration.ONE_SECOND), false); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java index b815ee4..48553b3 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java @@ -22,14 +22,12 @@ import java.io.File; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import brooklyn.config.ConfigKey; import brooklyn.entity.Effector; 
import brooklyn.entity.Entity; import brooklyn.entity.Group; @@ -91,9 +89,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody imple specCfg.putAll(ConfigBag.newInstance(parameters.get(EXTRA_CONFIG)).getAllConfigAsConfigKeyMap()); - Map, Object> cfgLive = memberSpec.getConfigLive(); - cfgLive.clear(); - cfgLive.putAll(specCfg.getAllConfigAsConfigKeyMap()); + memberSpec.clearConfig(); + memberSpec.configure(specCfg.getAllConfigAsConfigKeyMap()); // not necessary, but good practice entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec); log.debug("Upgrading "+entity()+", new "+BrooklynCluster.MEMBER_SPEC+": "+memberSpec+" / "+specCfg); @@ -101,9 +98,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody imple upgrade(parameters); } catch (Exception e) { log.debug("Upgrading "+entity()+" failed, will rethrow after restoring "+BrooklynCluster.MEMBER_SPEC+" to: "+origSpecCfg); - Map, Object> cfgLive = memberSpec.getConfigLive(); - cfgLive.clear(); - cfgLive.putAll(origSpecCfg.getAllConfigAsConfigKeyMap()); + memberSpec.clearConfig(); + memberSpec.configure(origSpecCfg.getAllConfigAsConfigKeyMap()); // not necessary, but good practice entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec); @@ -187,7 +183,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody imple //2. Wait for them to be RUNNING (or at least STARTING to have completed) // (should already be the case, because above is synchronous and, we think, it will fail if start does not succeed) - DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, Attributes.SERVICE_STATE_ACTUAL, + DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, Attributes.SERVICE_STATE_ACTUAL, Predicates.not(Predicates.equalTo(Lifecycle.STARTING)), Duration.minutes(30))); //3. Set HOT_STANDBY in case it is not enabled on the command line ... 
@@ -197,7 +193,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody imple newNodes)).asTask().getUnchecked(); //... and wait until all of the nodes change state // TODO fail quicker if state changes to FAILED - DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, + DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES)); //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well) http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java index 299bd5b..00ab7bc 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java @@ -128,7 +128,7 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody { DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY)); // 2 confirm hot standby status - DynamicTasks.queue(EntityTasks.awaitingAttribute(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, + DynamicTasks.queue(EntityTasks.requiringAttributeEventually(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES)); // 3 stop new version 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java index a42091e..9ac7202 100644 --- a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java +++ b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java @@ -181,7 +181,7 @@ public class CollectionFunctionals { // --------- public static > Predicate all(Predicate attributeSatisfies) { - return new QuorumSatisfies(QuorumChecks.all(), attributeSatisfies); + return quorum(QuorumChecks.all(), attributeSatisfies); } public static > Predicate quorum(QuorumCheck quorumCheck, Predicate attributeSatisfies) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java new file mode 100644 index 0000000..9550288 --- /dev/null +++ b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.util.exceptions; + +public class NotManagedException extends IllegalStateException { + + private static final long serialVersionUID = -3359163414517503809L; + + public NotManagedException(Object object) { + super(object+" is not managed"); + } + + public NotManagedException(String message) { + super(message); + } + + public NotManagedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java new file mode 100644 index 0000000..c31512f --- /dev/null +++ b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.util.exceptions; + +public class TimeoutException extends IllegalStateException { + + private static final long serialVersionUID = -3359163414517503809L; + + public TimeoutException() { + super("timeout"); + } + + public TimeoutException(String message) { + super(message); + } + + public TimeoutException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java index 1e3e188..bd76ea8 100644 --- a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java +++ b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java @@ -386,5 +386,9 @@ public class Repeater { public String getDescription() { return description; } + + public Duration getTimeLimit() { + return timeLimit; + } }