brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [20/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util
Date Tue, 18 Aug 2015 11:00:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
new file mode 100644
index 0000000..c776e4d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
@@ -0,0 +1,892 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.util.JavaGroovyEquivalents.asString;
+import static brooklyn.util.JavaGroovyEquivalents.elvisString;
+import groovy.lang.Closure;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Callables;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * The basic concrete implementation of a {@link Task} to be executed.
+ *
+ * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or
+ * {@link Callable} and will run in its own {@link Thread}.
+ * <p>
+ * The task can be given an optional displayName and description in its constructor (as named
+ * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called
+ * once whenever the task starts running and once again when the task is about to complete. Due to
+ * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we
+ * notify just before completion, and then expect the user to call either {@link #get()} (which will throw
+ * an error if the underlying job did so) or {@link #blockUntilEnded()} (which will not throw errors).
+ */
+public class BasicTask<T> implements TaskInternal<T> {
+    private static final Logger log = LoggerFactory.getLogger(BasicTask.class);
+
+    private String id = Identifiers.makeRandomId(8);
+    protected Callable<T> job;
+    public final String displayName;
+    public final String description;
+
+    protected final Set<Object> tags = Sets.newConcurrentHashSet();
+    // for debugging, to record where tasks were created
+//    { tags.add(new Throwable("Creation stack trace")); }
+    
+    protected Task<?> proxyTargetTask = null;
+
+    protected String blockingDetails = null;
+    protected Task<?> blockingTask = null;
+    Object extraStatusText = null;
+
+    /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */
+    protected final ExecutionList listeners = new ExecutionList();
+    
+    /**
+     * Constructor needed to prevent confusion in groovy stubs when looking for default constructor,
+     *
+     * The generics on {@link Closure} break it if that is first constructor.
+     */
+    protected BasicTask() { this(Collections.emptyMap()); }
+    /** Creates a task with a null job, configured from the given flags (see the flags-and-job constructor). */
+    protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); }
+
+    /** Creates a task for the given job, using an empty flags map. */
+    public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); }
+    
+    /**
+     * Creates a task for the given job, reading optional settings from {@code flags}.
+     * <p>
+     * The following keys are read, and removed from the supplied map as they are read:
+     * <ul>
+     * <li>{@code tag}: a single tag to attach (a null value is ignored)
+     * <li>{@code tags}: an {@link Iterable} of tags to attach; a non-iterable value is still
+     *     accepted as a single tag, but this use is logged as deprecated
+     * <li>{@code description}: the task description, defaulting to the empty string
+     * <li>{@code displayName}: the task display name, defaulting to the empty string
+     * </ul>
+     *
+     * @param flags settings for this task; entries for the keys above are removed from it
+     * @param job the work this task wraps; other constructors in this class pass null here
+     */
+    public BasicTask(Map<?,?> flags, Callable<T> job) {
+        this.job = job;
+
+        // the tag set is concurrent and so cannot hold null; skip a missing or null 'tag' flag
+        Object ftag = flags.remove("tag");
+        if (ftag!=null) tags.add(ftag);
+
+        Object ftags = flags.remove("tags");
+        if (ftags!=null) {
+            if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags);
+            else {
+                // the Throwable is passed only so that the log shows where the discouraged call came from
+                log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-collection tags argument"));
+                tags.add(ftags);
+            }
+        }
+
+        description = elvisString(flags.remove("description"), "");
+        String d = asString(flags.remove("displayName"));
+        displayName = (d==null ? "" : d);
+    }
+
+    // Convenience constructors: each converts the Runnable or Closure to a Callable using
+    // GroovyJavaMethods and then delegates to the corresponding Callable-based constructor.
+    public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); }
+    public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); }
+    public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); }
+    public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); }
+
+    /** Returns this task's id, which is generated as a random 8-character identifier when the task is created. */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /** Hash code derived from the task id only, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Task)
+            return ((Task<?>)obj).getId().equals(getId());
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings 
+        return "Task["+
+            (Strings.isNonEmpty(displayName) ? 
+                displayName : 
+                (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
+            ":"+getId()+"]";
+    }
+
+    /** Returns this instance, as it is already a {@link Task}. */
+    @Override
+    public Task<T> asTask() {
+        return this;
+    }
+    
+    // housekeeping --------------------
+
+    /*
+     * These flags are set by BasicExecutionManager.submit.
+     *
+     * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed.
+     * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!)
+     *
+     * # submitter, submit time set, tags and other submit-time fields set
+     *
+     * # thread set, ThreadLocal getCurrentTask set
+     * # start time set, isBegun is true
+     * # task start callback run, if supplied
+     *
+     * # task runs
+     *
+     * # task end callback run, if supplied
+     * # end time set
+     * # thread cleared, ThreadLocal getCurrentTask cleared
+     * # Task.notifyAll()
+     * # Task.get() (result.get()) available, Task.isDone is true
+     *
+     * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly.
+     * Tests should catch most things, but be careful if you change any of the above semantics.
+     */
+
+    protected long queuedTimeUtc = -1;
+    protected long submitTimeUtc = -1;
+    protected long startTimeUtc = -1;
+    protected long endTimeUtc = -1;
+    protected Maybe<Task<?>> submittedByTask;
+
+    protected volatile Thread thread = null;
+    private volatile boolean cancelled = false;
+    /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
+    protected volatile Future<T> internalFuture = null;
+    
+    /**
+     * Supplies the future which will carry this task's result.
+     * <p>
+     * May be called only once. After storing the future this wakes every thread waiting on this
+     * task's monitor; the blocking methods in this class wait there for the future to become non-null.
+     *
+     * @param result the future to use for this task
+     * @throws IllegalStateException if a future has already been set
+     */
+    @Override
+    public synchronized void initInternalFuture(ListenableFuture<T> result) {
+        if (this.internalFuture != null) 
+            throw new IllegalStateException("task "+this+" is being given a result twice");
+        this.internalFuture = result;
+        notifyAll();
+    }
+
+    // metadata accessors ------------
+
+    /** Returns an unmodifiable snapshot copy of the tags; later changes to the task's tags are not reflected in it. */
+    @Override
+    public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); }
+    
+    /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
+     * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
+    @Override
+    public long getQueuedTimeUtc() { return queuedTimeUtc; }
+    
+    /** Returns the recorded submit time, or -1 if none has been set. */
+    @Override
+    public long getSubmitTimeUtc() { return submitTimeUtc; }
+    
+    /** Returns the recorded start time, or -1 if none has been set. */
+    @Override
+    public long getStartTimeUtc() { return startTimeUtc; }
+    
+    /** Returns the recorded end time, or -1 if none has been set. */
+    @Override
+    public long getEndTimeUtc() { return endTimeUtc; }
+
+    /** Returns the underlying future, or null if one has not yet been supplied. */
+    @Override
+    public Future<T> getInternalFuture() { return internalFuture; }
+    
+    /** Returns the task recorded as having submitted this one, or null if none has been recorded. */
+    @Override
+    public Task<?> getSubmittedByTask() { 
+        if (submittedByTask==null) return null;
+        return submittedByTask.orNull(); 
+    }
+
+    /** the thread where the task is running, if it is running */
+    @Override
+    public Thread getThread() { return thread; }
+
+    // basic fields --------------------
+
+    /** True once a queued time has been recorded (see {@link #markQueued()}). */
+    @Override
+    public boolean isQueued() {
+        return (queuedTimeUtc >= 0);
+    }
+
+    /** True if the task has been queued, or submitted, or both. */
+    @Override
+    public boolean isQueuedOrSubmitted() {
+        return isQueued() || isSubmitted();
+    }
+
+    /** True if the task has been queued but no submit time has been recorded yet. */
+    @Override
+    public boolean isQueuedAndNotSubmitted() {
+        return isQueued() && (!isSubmitted());
+    }
+
+    /** True once a submit time has been recorded. */
+    @Override
+    public boolean isSubmitted() {
+        return submitTimeUtc >= 0;
+    }
+
+    /** True once a start time has been recorded. */
+    @Override
+    public boolean isBegun() {
+        return startTimeUtc >= 0;
+    }
+
+    /**
+     * Marks the task as queued for execution, recording the current time as the queued time.
+     * Has no effect if a queued time has already been recorded.
+     */
+    @Override
+    public void markQueued() {
+        if (queuedTimeUtc >= 0) {
+            return;
+        }
+        queuedTimeUtc = System.currentTimeMillis();
+    }
+
+    /** Cancels this task, permitting interruption if it is running; equivalent to {@code cancel(true)}. */
+    @Override
+    public final synchronized boolean cancel() { return cancel(true); }
+
+    /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted;
+     * probably going to be removed and perhaps some mechanism for running again made available
+     * @return whether the local cancelled flag was set before this call
+     * @since 0.7.0  */
+    @Beta
+    public synchronized boolean uncancel() {
+        boolean wasCancelled = cancelled;
+        cancelled = false; 
+        return wasCancelled;
+    }
+    
+    /**
+     * Cancels this task unless it is already done.
+     * <p>
+     * Sets the local cancelled flag, passes the cancellation on to the underlying future if there
+     * is one, and wakes every thread waiting on this task's monitor.
+     *
+     * @param mayInterruptIfRunning passed through to the underlying future's cancel
+     * @return false if the task was already done; otherwise the result of cancelling the
+     *         underlying future, or true if no future has been set yet
+     */
+    @Override
+    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        if (isDone()) return false;
+        boolean cancel = true;
+        cancelled = true;
+        if (internalFuture!=null) { 
+            cancel = internalFuture.cancel(mayInterruptIfRunning);
+        }
+        // wake threads blocked waiting for the future, so that they can observe the cancellation
+        notifyAll();
+        return cancel;
+    }
+
+    /** True if this task has been cancelled directly, or if its underlying future reports itself cancelled. */
+    @Override
+    public boolean isCancelled() {
+        return cancelled || (internalFuture!=null && internalFuture.isCancelled());
+    }
+
+    /**
+     * True if this task has been cancelled, or its underlying future is done, or an end time
+     * greater than zero has been recorded.
+     */
+    @Override
+    public boolean isDone() {
+        // if endTime is set, result might not be completed yet, but it will be set very soon 
+        // (the two values are set close in time, result right after the endTime;
+        // but callback hooks might not see the result yet)
+        return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0;
+    }
+
+    /**
+     * Returns true if the task has had an error.
+     *
+     * Only true if calling {@link #get()} will throw an exception when it completes (including cancel).
+     * Implementations may set this true before completion if they have that insight, or
+     * (the default) they may compute it lazily after completion (returning false before completion).
+     * <p>
+     * This implementation returns false while the task is not done and true if it is cancelled;
+     * otherwise it calls {@link #get()} and reports whether that call threw anything.
+     */
+    @Override
+    public boolean isError() {
+        if (!isDone()) return false;
+        if (isCancelled()) return true;
+        try {
+            get();
+            return false;
+        } catch (Throwable t) {
+            // anything thrown while fetching the result counts as the task having failed
+            return true;
+        }
+    }
+
+    // future value --------------------
+
+    /**
+     * Waits, without any time limit, for this task's result.
+     * <p>
+     * If this task is not yet done it is first registered through {@code Tasks.setBlockingTask}
+     * (presumably so the calling task can report what it is waiting on -- confirm in Tasks).
+     * The registration is reset on the way out regardless of whether this call made it.
+     *
+     * @return the value produced by the underlying future
+     * @throws InterruptedException declared; as thrown by the underlying future's get
+     * @throws ExecutionException as thrown by the underlying future's get
+     * @throws CancellationException if this task is cancelled while waiting for the future to be set
+     */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        try {
+            if (!isDone())
+                Tasks.setBlockingTask(this);
+            // the future does not exist until it has been supplied; wait for it first
+            blockUntilStarted();
+            return internalFuture.get();
+        } finally {
+            Tasks.resetBlockingTask();
+        }
+    }
+
+    /** As {@link #get()}, but any exception is passed through {@code Exceptions.propagate} and rethrown. */
+    @Override
+    public T getUnchecked() {
+        try {
+            return get();
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    
+    /** Waits without time limit until this task's underlying future has been set; see {@link #blockUntilStarted(Duration)}. */
+    @Override
+    public synchronized void blockUntilStarted() {
+        blockUntilStarted(null);
+    }
+    
+    /**
+     * Waits until this task's underlying future has been set, or the timeout expires.
+     * <p>
+     * Note that "started" here means only that the internal future is non-null; this method
+     * does not look at the start time.
+     *
+     * @param timeout maximum time to wait, or null to wait indefinitely
+     * @return true once the future has been set; false if the timeout expired first
+     * @throws CancellationException if this task's cancelled flag is found set
+     */
+    @Override
+    public synchronized boolean blockUntilStarted(Duration timeout) {
+        // absolute deadline in millis; null means wait forever
+        Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
+        while (true) {
+            if (cancelled) throw new CancellationException();
+            if (internalFuture==null)
+                try {
+                    if (timeout==null) {
+                        wait();
+                    } else {
+                        long remaining = endTime - System.currentTimeMillis();
+                        if (remaining>0)
+                            wait(remaining);
+                        else
+                            return false;
+                    }
+                } catch (InterruptedException e) {
+                    // restore the interrupt flag, then rethrow via Throwables.propagate
+                    Thread.currentThread().interrupt();
+                    Throwables.propagate(e);
+                }
+            // re-check after waking: the wake-up may have been spurious or due to a cancel
+            if (internalFuture!=null) return true;
+        }
+    }
+
+    /** Waits without time limit until this task has ended; see {@link #blockUntilEnded(Duration)}. */
+    @Override
+    public void blockUntilEnded() {
+        blockUntilEnded(null);
+    }
+    
+    /**
+     * Waits until this task has ended, or the timeout expires, without throwing the task's errors.
+     * <p>
+     * Fatal throwables (as judged by {@code Exceptions.propagateIfFatal}) are rethrown; any other
+     * throwable raised while waiting is caught and, except for a {@link TimeoutException}, logged at debug.
+     *
+     * @param timeout maximum time to wait, or null to wait indefinitely
+     * @return false if the underlying future was not set within the timeout; otherwise the
+     *         value of {@link #isDone()} when waiting stops
+     */
+    @Override
+    public boolean blockUntilEnded(Duration timeout) {
+        // absolute deadline in millis; only used when a timeout was given
+        Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
+        try { 
+            boolean started = blockUntilStarted(timeout);
+            if (!started) return false;
+            if (timeout==null) {
+                internalFuture.get();
+            } else {
+                // wait only for whatever is left of the timeout after waiting for the start
+                long remaining = endTime - System.currentTimeMillis();
+                if (remaining>0)
+                    internalFuture.get(remaining, TimeUnit.MILLISECONDS);
+            }
+            return isDone();
+        } catch (Throwable t) {
+            Exceptions.propagateIfFatal(t);
+            if (!(t instanceof TimeoutException) && log.isDebugEnabled())
+                log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t);
+            return isDone(); 
+        }
+    }
+
+    /** Waits up to the given time for the result; converts the arguments to a {@link Duration} and delegates to {@link #get(Duration)}. */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return get(new Duration(timeout, unit));
+    }
+    
+    /**
+     * Waits up to the given duration for this task's result.
+     * <p>
+     * The wait has two phases: first for the underlying future to be set on this task, then
+     * for that future to complete. Both phases share the one deadline.
+     *
+     * @param duration maximum time to wait, or null to wait indefinitely
+     * @return the value produced by the underlying future
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException as thrown by the underlying future's get
+     * @throws TimeoutException if the deadline passes before a result is available
+     * @throws CancellationException if this task is found cancelled while waiting for the future to be set
+     */
+    @Override
+    public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
+        long start = System.currentTimeMillis();
+        // absolute deadline in millis; null means no deadline
+        Long end  = duration==null ? null : start + duration.toMillisecondsRoundingUp();
+        while (end==null || end > System.currentTimeMillis()) {
+            if (cancelled) throw new CancellationException();
+            if (internalFuture == null) {
+                synchronized (this) {
+                    // re-check under the monitor: cancel() and initInternalFuture() both notify while
+                    // holding it, so checking here ensures we cannot miss their wake-up
+                    if (internalFuture==null && !cancelled) {
+                        if (end==null) {
+                            // no deadline, so wait until notified (previously this unboxed a null end)
+                            wait();
+                        } else {
+                            long remaining = end - System.currentTimeMillis();
+                            if (remaining>0)
+                                wait(remaining);
+                        }
+                    }
+                }
+            }
+            if (internalFuture != null) break;
+        }
+        if (internalFuture == null) {
+            // the deadline passed (or a cancel arrived) before any future was supplied;
+            // there is nothing to ask for a result, so report that rather than dereference null
+            if (cancelled) throw new CancellationException();
+            throw new TimeoutException();
+        }
+        Long remaining = end==null ? null : end -  System.currentTimeMillis();
+        if (isDone()) {
+            // isDone can be true slightly before the future is completed, hence the short grace period
+            return internalFuture.get(1, TimeUnit.MILLISECONDS);
+        } else if (remaining == null) {
+            return internalFuture.get();
+        } else if (remaining > 0) {
+            return internalFuture.get(remaining, TimeUnit.MILLISECONDS);
+        } else {
+            throw new TimeoutException();
+        }
+    }
+
+    /** As {@link #get(Duration)}, but any exception is passed through {@code Exceptions.propagate} and rethrown. */
+    @Override
+    public T getUnchecked(Duration duration) {
+        try {
+            return get(duration);
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    
+    // ------------------ status ---------------------------
+    
+    /**
+     * Returns a brief status string
+     *
+     * Plain-text format, produced by {@link #getStatusString(int)} at verbosity 0, which will be one of:
+     * <ul>
+     * <li>Not submitted
+     * <li>Submitted for execution
+     * <li>Cancelled
+     * <li>Failed
+     * <li>Completed
+     * <li>In progress
+     * <li>Sleeping
+     * </ul>
+     */
+    @Override
+    public String getStatusSummary() {
+        return getStatusString(0);
+    }
+
+    /**
+     * Returns detailed status, suitable for a hover
+     *
+     * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled.
+     */
+    @Override
+    public String getStatusDetail(boolean multiline) {
+        return getStatusString(multiline?2:1);
+    }
+
+    /**
+     * This method is useful for callers to see the status of a task.
+     *
+     * Also for developers to see best practices for examining status fields etc
+     * <p>
+     * The states are checked in order: not submitted; submitted but not begun (and not cancelled);
+     * done (reported as cancelled, failed, or completed); and otherwise active, which is handled
+     * by {@link #getActiveTaskStatusString(int)}.
+     *
+     * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
+     * @return the plain-text status
+     */
+    protected String getStatusString(int verbosity) {
+//        Thread t = getThread();
+        String rv;
+        if (submitTimeUtc <= 0) rv = "Not submitted";
+        else if (!isCancelled() && startTimeUtc <= 0) {
+            rv = "Submitted for execution";
+            if (verbosity>0) {
+                long elapsed = System.currentTimeMillis() - submitTimeUtc;
+                rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
+            }
+            if (verbosity >= 2 && getExtraStatusText()!=null) {
+                rv += "\n\n"+getExtraStatusText();
+            }
+        } else if (isDone()) {
+            // NOTE(review): if no end time has been recorded (still -1), e.g. after a cancel
+            // before the task ran, this difference is negative -- confirm how that is rendered
+            long elapsed = endTimeUtc - submitTimeUtc;
+            String duration = Time.makeTimeStringRounded(elapsed);
+            if (isCancelled()) {
+                rv = "Cancelled";
+                if (verbosity >= 1) rv+=" after "+duration;
+                
+                if (verbosity >= 2 && getExtraStatusText()!=null) {
+                    rv += "\n\n"+getExtraStatusText();
+                }
+            } else if (isError()) {
+                rv = "Failed";
+                if (verbosity >= 1) {
+                    rv += " after "+duration;
+                    Throwable error = Tasks.getError(this);
+
+                    if (verbosity >= 2 && getExtraStatusText()!=null) {
+                        rv += "\n\n"+getExtraStatusText();
+                    }
+                    
+                    //remove outer ExecException which is reported by the get(), we want the exception the task threw
+                    while (error instanceof ExecutionException) error = error.getCause();
+                    String errorMessage = Exceptions.collapseText(error);
+
+                    // one-line form gets an abbreviated message; the full form gets the message plus stack trace
+                    if (verbosity == 1) rv += ": "+abbreviate(errorMessage);
+                    if (verbosity >= 2) {
+                        rv += ": "+errorMessage;
+                        StringWriter sw = new StringWriter();
+                        ((Throwable)error).printStackTrace(new PrintWriter(sw));
+                        rv += "\n\n"+sw.getBuffer();
+                    }
+                }
+            } else {
+                rv = "Completed";
+                if (verbosity>=1) {
+                    if (verbosity==1) {
+                        // one-line form: abbreviated result, no duration
+                        try {
+                            Object v = get();
+                            rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString()));
+                        } catch (Exception e) {
+                            rv += ", but error accessing result ["+e+"]"; //shouldn't happen
+                        }
+                    } else {
+                        // full form: duration, the whole result, then any extra status text
+                        rv += " after "+duration;
+                        try {
+                            Object v = get();
+                            rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v);
+                        } catch (Exception e) {
+                            rv += " at first\n" +
+                                    "Error accessing result ["+e+"]"; //shouldn't happen
+                        }
+                        if (verbosity >= 2 && getExtraStatusText()!=null) {
+                            rv += "\n\n"+getExtraStatusText();
+                        }
+                    }
+                }
+            }
+        } else {
+            rv = getActiveTaskStatusString(verbosity);
+        }
+        return rv;
+    }
+    
+    /**
+     * Reduces the text to its first line (as given by {@code Strings.getFirstLine}); if that line
+     * is longer than 255 characters it is cut to its first 252 characters followed by "...".
+     */
+    private static String abbreviate(String s) {
+        String firstLine = Strings.getFirstLine(s);
+        if (firstLine.length() <= 255) {
+            return firstLine;
+        }
+        return firstLine.substring(0, 252) + "...";
+    }
+
+    /**
+     * Builds the status text for a task which has begun and is not yet reported as done.
+     * <p>
+     * At verbosity 1, any blocking details (or failing that, the blocking task) are returned on
+     * their own. At verbosity 2 they are followed by the extra status text, this task, its
+     * submitter, its children (if it is a {@link HasTaskChildren}), the state of the thread
+     * running it, and a cleaned-up stack trace of that thread.
+     *
+     * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
+     * @return the plain-text status
+     */
+    protected String getActiveTaskStatusString(int verbosity) {
+        String rv = "";
+        Thread t = getThread();
+    
+        // Normally, it's not possible for thread==null as we were started and not ended
+        
+        // However, there is a race where the task starts and completes between the calls to getThread()
+        // at the start of the method and this call to getThread(), so both return null even though
+        // the intermediate checks returned started==true isDone()==false.
+        if (t == null) {
+            if (isDone()) {
+                return getStatusString(verbosity);
+            } else {
+                //should only happen for repeating task which is not active
+                return "Sleeping";
+            }
+        }
+
+        // stack depth requested grows with verbosity: none, top frame only, or everything
+        ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE));
+        if (getThread()==null)
+            //thread might have moved on to a new task; if so, recompute (it should now say "done")
+            return getStatusString(verbosity);
+        
+        if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) {
+            if (verbosity==1)
+                // short status string will just show blocking details
+                return blockingDetails;
+            //otherwise show the blocking details, then a new line, then additional information
+            rv = blockingDetails + "\n\n";
+        }
+        
+        if (verbosity >= 1 && blockingTask!=null) {
+            if (verbosity==1)
+                // short status string will just show blocking details
+                return "Waiting on "+blockingTask;
+            //otherwise show the blocking task after any blocking details, then a new line, then additional information
+            rv += "Waiting on "+blockingTask + "\n\n";
+        }
+
+        if (verbosity>=2) {
+            if (getExtraStatusText()!=null) {
+                rv += getExtraStatusText()+"\n\n";
+            }
+            
+            rv += ""+toString()+"\n";
+            if (submittedByTask!=null) {
+                rv += "Submitted by "+submittedByTask+"\n";
+            }
+
+            if (this instanceof HasTaskChildren) {
+                // list children tasks for compound tasks
+                try {
+                    Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren();
+                    if (childrenTasks.iterator().hasNext()) {
+                        rv += "Children:\n";
+                        for (Task<?> child: childrenTasks) {
+                            rv += "  "+child+": "+child.getStatusDetail(false)+"\n";
+                        }
+                    }
+                } catch (ConcurrentModificationException exc) {
+                    rv += "  (children not available - currently being modified)\n";
+                }
+            }
+            rv += "\n";
+        }
+        
+        // getThreadInfo gives null for a thread which is no longer alive; in that case we still
+        // report progress, but without any lock, thread-state or stack-trace detail
+        LockInfo lock = ti==null ? null : ti.getLockInfo();
+        rv += "In progress";
+        if (verbosity>=1 && ti!=null) {
+            if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) {
+                //not blocked
+                if (ti.isSuspended()) {
+                    // when does this happen?
+                    rv += ", thread suspended";
+                } else {
+                    if (verbosity >= 2) rv += " ("+ti.getThreadState()+")";
+                }
+            } else {
+                rv +=", thread waiting ";
+                if (ti.getThreadState() == Thread.State.BLOCKED) {
+                    rv += "(mutex) on "+lookup(lock);
+                    //TODO could say who holds it
+                } else if (ti.getThreadState() == Thread.State.WAITING) {
+                    rv += "(notify) on "+lookup(lock);
+                } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) {
+                    rv += "(timed) on "+lookup(lock);
+                } else {
+                    // append like the other branches, so as not to discard what has been built so far
+                    rv += "("+ti.getThreadState()+") on "+lookup(lock);
+                }
+            }
+        }
+        if (verbosity>=2 && ti!=null) {
+            StackTraceElement[] st = ti.getStackTrace();
+            st = brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st);
+            if (st!=null && st.length>0) {
+                rv += "\n" +"At: "+st[0];
+                for (int ii=1; ii<st.length; ii++) {
+                    rv += "\n" +"    "+st[ii];
+                }
+            }
+        }
+        return rv;
+    }
+    
+    /** Describes the given lock for status output, using a placeholder text when there is no lock info. */
+    protected String lookup(LockInfo info) {
+        if (info == null) {
+            return "unknown (sleep)";
+        }
+        return "" + info;
+    }
+
+    /** Returns the display name given at construction, or the empty string if none was given. */
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    /** Returns the description read from the construction flags. */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    
+    /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
+     * and typically cleared immediately afterwards; referenced by management api to inspect a task
+     * which is blocking
+     * @return the blocking details which were previously set, possibly null
+     */
+    @Override
+    public String setBlockingDetails(String blockingDetails) {
+        String old = this.blockingDetails;
+        this.blockingDetails = blockingDetails;
+        return old;
+    }
+    
+    /** Records the task which this task is blocked on, returning the one previously recorded (possibly null). */
+    @Override
+    public Task<?> setBlockingTask(Task<?> blockingTask) {
+        Task<?> old = this.blockingTask;
+        this.blockingTask = blockingTask;
+        return old;
+    }
+    
+    /** Clears any blocking details. */
+    @Override
+    public void resetBlockingDetails() {
+        this.blockingDetails = null;
+    }
+    
+    /** Clears any recorded blocking task. */
+    @Override
+    public void resetBlockingTask() {
+        this.blockingTask = null;
+    }
+
+    /** returns a textual message giving details while the task is blocked */
+    @Override
+    public String getBlockingDetails() {
+        return blockingDetails;
+    }
+    
+    /** returns a task that this task is blocked on */
+    @Override
+    public Task<?> getBlockingTask() {
+        return blockingTask;
+    }
+    
+    /** Sets additional status information, which is included in the most detailed status strings. */
+    @Override
+    public void setExtraStatusText(Object extraStatus) {
+        this.extraStatusText = extraStatus;
+    }
+    
+    /** Returns the additional status information, or null if none has been set. */
+    @Override
+    public Object getExtraStatusText() {
+        return extraStatusText;
+    }
+
+    // ---- add a way to warn if task is not run
+    
+    /** Callback invoked from a task's {@code finalize()} method, i.e. when the task object is being garbage collected. */
+    public interface TaskFinalizer {
+        /** Called with the task which is being finalized. */
+        public void onTaskFinalization(Task<?> t);
+    }
+
+    public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() {
+        @Override
+        public void onTaskFinalization(Task<?> t) {
+            if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) {
+                log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)");
+                log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true));
+                return;
+            }
+            if (!t.isDone()) {
+                // shouldn't happen
+                // TODO But does happen if management context was terminated (e.g. running test suite).
+                //      Should check if Execution Manager is running, and only log if it was not terminated?
+                log.warn("Task "+t+" is being finalized before completion");
+                return;
+            }
+        }
+    };
+
+    /** Finalizer which does nothing, for tasks which may legitimately never be run. */
+    public static final TaskFinalizer NO_OP = new TaskFinalizer() {
+        @Override
+        public void onTaskFinalization(Task<?> t) {
+        }
+    };
+    
+    /** Installs the {@link #NO_OP} finalizer on this task, so that nothing is logged when it is finalized. */
+    public void ignoreIfNotRun() {
+        setFinalizer(NO_OP);
+    }
+    
+    /**
+     * Attaches the given finalizer to this task, by adding it to the task's tags.
+     *
+     * @param f the finalizer to attach
+     * @throws IllegalStateException if a different finalizer is already attached,
+     *         or if the task is already done
+     */
+    public void setFinalizer(TaskFinalizer f) {
+        // any existing finalizer is looked up among the tags via Tasks.tag
+        TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false);
+        if (finalizer!=null && finalizer!=f)
+            throw new IllegalStateException("Cannot apply multiple finalizers");
+        if (isDone())
+            throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished");
+        tags.add(f);
+    }
+
+    /**
+     * Runs the {@link TaskFinalizer} attached to this task, falling back to
+     * {@link #WARN_IF_NOT_RUN} when none is attached.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        TaskFinalizer attached = Tasks.tag(this, TaskFinalizer.class, false);
+        TaskFinalizer toRun = (attached != null) ? attached : WARN_IF_NOT_RUN;
+        toRun.onTaskFinalization(this);
+    }
+    
+    public static class SubmissionErrorCatchingExecutor implements Executor {
+        final Executor target;
+        public SubmissionErrorCatchingExecutor(Executor target) {
+            this.target = target;
+        }
+        @Override
+        public void execute(Runnable command) {
+            if (isShutdown()) {
+                log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown.");
+                return;
+            }
+            try {
+                target.execute(command);
+            } catch (Exception e) {
+                if (isShutdown()) {
+                    log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown.");
+                } else {
+                    log.warn("Execution of task callback hook "+command+" failed: "+e, e);
+                }
+            }
+        }
+        protected boolean isShutdown() {
+            return target instanceof ExecutorService && ((ExecutorService)target).isShutdown();
+        }
+    }
+    
+    /**
+     * Registers a listener on this task's listener list; the supplied executor is wrapped in a
+     * {@link SubmissionErrorCatchingExecutor}.
+     */
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        listeners.add(listener, new SubmissionErrorCatchingExecutor(executor));
+    }
+    
+    /** Executes the listener list. */
+    @Override
+    public void runListeners() {
+        listeners.execute();
+    }
+    
+    /** Records the end time. */
+    @Override
+    public void setEndTimeUtc(long val) {
+        endTimeUtc = val;
+    }
+    
+    /** Records the thread running this task. */
+    @Override
+    public void setThread(Thread thread) {
+        this.thread = thread;
+    }
+    
+    /** Returns the job wrapped by this task, possibly null. */
+    @Override
+    public Callable<T> getJob() {
+        return job;
+    }
+    
+    /** Replaces the job wrapped by this task. */
+    @Override
+    public void setJob(Callable<T> job) {
+        this.job = job;
+    }
+    
+    /** Returns the listener list itself (not a copy). */
+    @Override
+    public ExecutionList getListeners() {
+        return listeners;
+    }
+    
+    /** Records the submit time. */
+    @Override
+    public void setSubmitTimeUtc(long val) {
+        submitTimeUtc = val;
+    }
+    
+    /**
+     * Builds a placeholder task which carries the display name of the given task and a
+     * description saying that the original's details have been forgotten. Its body returns null,
+     * and it is marked so that no warning is logged if it is never run.
+     */
+    private static <T> Task<T> newGoneTaskFor(Task<?> task) {
+        Task<T> t = Tasks.<T>builder().dynamic(false).name(task.getDisplayName())
+            .description("Details of the original task "+task+" have been forgotten.")
+            .body(Callables.returning((T)null)).build();
+        ((BasicTask<T>)t).ignoreIfNotRun();
+        return t;
+    }
+    
+    /**
+     * Records the task which submitted this one.
+     * <p>
+     * The submitter is held through {@code Maybe.softThen}, with a placeholder "gone" task as
+     * the fallback -- presumably a soft reference, so that the submitter can be garbage
+     * collected and the placeholder reported in its place (confirm in Maybe).
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void setSubmittedByTask(Task<?> task) {
+        submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task)));
+    }
+    
+    /** Returns the live tag set itself, so changes made to it affect this task; see {@link #getTags()} for a copy. */
+    @Override
+    public Set<Object> getMutableTags() {
+        return tags;
+    }
+    
+    /** Records the start time. */
+    @Override
+    public void setStartTimeUtc(long val) {
+        startTimeUtc = val;
+    }
+
+    /** Applies the given function to the live tag set; its return value is ignored. */
+    @Override
+    public void applyTagModifier(Function<Set<Object>,Void> modifier) {
+        modifier.apply(tags);
+    }
+
+    /** Returns the proxy target task; null unless a subclass has assigned the field. */
+    @Override
+    public Task<?> getProxyTarget() {
+        return proxyTargetTask;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
new file mode 100644
index 0000000..407a93a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+/** Implemented by objects which allow their name to be set. */
+public interface CanSetName {
+
+    /** Sets the name of this object to the given value. */
+    void setName(String name);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
new file mode 100644
index 0000000..8fdb146
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.util.collections.MutableMap;
+
+
+/**
+ * A {@link Task} that is comprised of other units of work: possibly a heterogeneous mix of {@link Task},
+ * {@link Runnable}, {@link Callable} and {@link Closure} instances.
+ * <p>
+ * This class holds the collection of child tasks, but subclasses have the responsibility of executing them in a
+ * sensible manner by implementing the abstract {@link #runJobs} method.
+ */
+public abstract class CompoundTask<T> extends BasicTask<List<T>> implements HasTaskChildren {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(CompoundTask.class);
+
+    /** the child tasks, one per job supplied at construction, in iteration order of the supplied collection */
+    protected final List<Task<? extends T>> children;
+    /** list allocated at construction with capacity for one entry per job; this class itself never populates it */
+    protected final List<Object> result;
+    
+    /**
+     * Constructs a new compound task containing the specified units of work.
+     * 
+     * @param jobs  A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. 
+     * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types 
+     */
+    public CompoundTask(Object... jobs) {
+        this( Arrays.asList(jobs) );
+    }
+    
+    /**
+     * Constructs a new compound task containing the specified units of work,
+     * using the flags <code>{"tag": "compound"}</code>.
+     * 
+     * @param jobs  A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. 
+     * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types 
+     */
+    public CompoundTask(Collection<?> jobs) {
+        this(MutableMap.of("tag", "compound"), jobs);
+    }
+    
+    /**
+     * Constructs a new compound task containing the specified units of work.
+     * Each job is converted to a {@link Task} (a {@link TaskAdaptable} via {@code asTask()}, anything else by
+     * wrapping it in a {@link BasicTask}), tagged with {@link ManagementContextInternal#SUB_TASK_TAG},
+     * and, once all jobs have been converted, marked as queued.
+     * 
+     * @param flags  passed unchanged to the {@link BasicTask} constructor
+     * @param jobs  A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. 
+     * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types 
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public CompoundTask(Map<String,?> flags, Collection<?> jobs) {
+        super(flags);
+        // the job of this task simply delegates to the subclass-defined strategy
+        super.job = new Callable<List<T>>() {
+            @Override public List<T> call() throws Exception {
+                return runJobs();
+            }
+        };
+        
+        this.result = new ArrayList<Object>(jobs.size());
+        this.children = new ArrayList<Task<? extends T>>(jobs.size());
+        for (Object job : jobs) {
+            Task subtask;
+            if (job instanceof TaskAdaptable) { subtask = ((TaskAdaptable)job).asTask(); }
+            else if (job instanceof Closure)  { subtask = new BasicTask<T>((Closure) job); }
+            else if (job instanceof Callable) { subtask = new BasicTask<T>((Callable) job); }
+            else if (job instanceof Runnable) { subtask = new BasicTask<T>((Runnable) job); }
+            
+            else throw new IllegalArgumentException("Invalid child "+(job == null ? null : job.getClass() + " ("+job+")")+
+                " passed to compound task; must be Runnable, Callable, Closure or Task");
+            
+            BrooklynTaskTags.addTagDynamically(subtask, ManagementContextInternal.SUB_TASK_TAG);
+            children.add(subtask);
+        }
+        
+        // Iterate the field rather than getChildren(): that method is overridable, and an override
+        // invoked from this constructor would run before the subclass has been initialised.
+        // Kept as a second pass so that no child is marked queued if any job is rejected above.
+        for (Task<? extends T> child : children) {
+            ((TaskInternal<?>)child).markQueued();
+        }
+    }
+
+    /**
+     * Runs the child tasks; the strategy and the return value need to be specified by the subclass,
+     * which should also setBlockingDetails.
+     * 
+     * @return the list which becomes the result of this task
+     * @throws ExecutionException declared so that implementations may propagate it, e.g. from blocking on a child
+     * @throws InterruptedException declared so that implementations may propagate it, e.g. from blocking on a child
+     */
+    protected abstract List<T> runJobs() throws InterruptedException, ExecutionException;
+    
+    /**
+     * Submits the given task to the current execution context, unless it has already been submitted.
+     * 
+     * @throws IllegalStateException if the task needs submitting but there is no current execution context
+     */
+    protected void submitIfNecessary(TaskAdaptable<?> task) {
+        if (task.asTask().isSubmitted()) {
+            return;
+        }
+        // look the context up once, so the null-check and the submission use the same instance
+        BasicExecutionContext context = BasicExecutionContext.getCurrentExecutionContext();
+        if (context == null) {
+            throw new IllegalStateException("Compound task ("+task+") launched from "+this+" missing required execution context");
+        }
+        context.submit(task);
+    }
+    
+    /** Returns the list of children held by this task (the list itself, not a copy), with its element type. */
+    public List<Task<? extends T>> getChildrenTyped() {
+        return children;
+    }
+    
+    /** Returns the same list as {@link #getChildrenTyped()}, viewed as a list of wildcard tasks. */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public List<Task<?>> getChildren() {
+        return (List) getChildrenTyped();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
new file mode 100644
index 0000000..ad9416b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import com.google.common.base.Supplier;
+
+/**
+ * A class that supplies objects of a single type. When used as a ConfigKey value,
+ * the evaluation is deferred until getConfig() is called. The returned value will then
+ * be coerced to the correct type. 
+ * 
+ * Subsequent calls to getConfig will result in further calls to deferredProvider.get(), 
+ * rather than reusing the result. If you want to reuse the result, consider instead 
+ * using a Future.
+ * 
+ * Note that this functionality replaces the use of Closure in brooklyn 0.4.0, which 
+ * served the same purpose.
+ */
+public interface DeferredSupplier<T> extends Supplier<T> {
+    /** Supplies the value; see the class description for when and how often this is invoked. */
+    @Override
+    T get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
new file mode 100644
index 0000000..e197705
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableList;
+
+/** Represents a task whose run() method can create other tasks
+ * which are run sequentially, but that sequence runs in parallel to this task
+ * <p>
+ * There is an optional primary job run with this task, along with multiple secondary children.
+ * If any secondary task fails (assuming it isn't {@link Tasks#markInessential()}) then by default
+ * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted).
+ * You can change the behavior of this task with fields in {@link FailureHandlingConfig},
+ * or the convenience {@link TaskQueueingContext#swallowChildrenFailures()}
+ * (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task).
+ * <p>
+ * This synchronizes on secondary tasks when submitting them, in case they may be manually submitted
+ * and the submitter wishes to ensure it is only submitted once.
+ * <p>
+ * Improvements which would be nice to have:
+ * <ul>
+ * <li> unqueued tasks not visible in api; would like that
+ * <li> uses an extra thread (submitted as background task) to monitor the secondary jobs; would be nice to remove this,
+ *      and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with fix to item above)
+ * <li> would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI)
+ * </ul>
+ **/
+public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext {
+
+    // logger is named after this class (it was previously created for CompoundTask, misattributing log output)
+    private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTask.class);
+                
+    /** every secondary task queued on this task, in queueing order; nothing in this class removes entries */
+    protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>();
+    /** secondary tasks queued but not yet polled for submission by the internal manager task */
+    protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>();
+    /** monitor held around state transitions of the primary job and the secondary queue, and notified on such changes */
+    protected final Object jobTransitionLock = new Object();
+    /** set once the job of this task has begun running */
+    protected volatile boolean primaryStarted = false;
+    /** set once the primary job has returned or thrown; queueing is rejected after this */
+    protected volatile boolean primaryFinished = false;
+    /** set when the secondary queue has been aborted following a failure; queueing is rejected while this is set */
+    protected volatile boolean secondaryQueueAborted = false;
+    /** thread running the primary job, or null when it is not running; written while holding {@link #jobTransitionLock} */
+    protected Thread primaryThread;
+    /** the job of this task, wrapping the optional primary job; assigned in the constructor */
+    protected DstJob dstJob;
+    /** policy applied when the primary job or a secondary task fails */
+    protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT;
+
+    /** Immutable set of switches saying how failures of the primary job and of secondary tasks are handled. */
+    @Beta
+    public static class FailureHandlingConfig {
+        /** aborts the secondary queue on a secondary failure, and fails the parent; nothing is cancelled */
+        public static final FailureHandlingConfig DEFAULT = new FailureHandlingConfig(false, false, true, false, false, true);
+        /** every switch off: failures neither abort, cancel nor fail anything */
+        public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false);
+
+        /** whether the secondary queue (which otherwise runs independently of the primary job, submitting and
+         * blocking on each secondary task in order) stops submitting further tasks if the primary job fails */
+        public final boolean abortSecondaryQueueOnPrimaryFailure;
+        /** as {@link #abortSecondaryQueueOnPrimaryFailure} but controls cancelling of the remaining secondary tasks */
+        public final boolean cancelSecondariesOnPrimaryFailure;
+        /** whether the secondary queue stops submitting further tasks if a secondary task fails;
+         * switching this off is unusual, as continuing past a failure is typically arranged with
+         * {@link TaskTags#markInessential(Task)} on the secondary task, in which case the queue is never aborted */
+        public final boolean abortSecondaryQueueOnSecondaryFailure;
+        /** whether secondary tasks not yet submitted (ie those further in the queue) are cancelled if a secondary task fails */
+        public final boolean cancelSecondariesOnSecondaryFailure;
+        /** whether to issue cancel against the primary job if a secondary task fails */
+        public final boolean cancelPrimaryOnSecondaryFailure;
+        /** whether to fail this task if a secondary task fails */
+        public final boolean failParentOnSecondaryFailure;
+
+        /** Creates a configuration; each argument is stored in the field of the same name. */
+        @Beta
+        public FailureHandlingConfig(
+                boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure,
+                boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure,
+                boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) {
+            // reactions to a failure of the primary job
+            this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure;
+            this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure;
+            // reactions to a failure of a secondary task
+            this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure;
+            this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure;
+            this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure;
+            this.failParentOnSecondaryFailure = failParentOnSecondaryFailure;
+        }
+    }
+    
+    /** Thrown by {@link DynamicSequentialTask#queue(Task)} when the secondary queue of the task has been aborted. */
+    public static class QueueAbortedException extends IllegalStateException {
+        private static final long serialVersionUID = -7569362887826818524L;
+        
+        /** @param msg the detail message */
+        public QueueAbortedException(String msg) {
+            super(msg);
+        }
+        /** @param msg the detail message
+         *  @param cause the underlying cause */
+        public QueueAbortedException(String msg, Throwable cause) {
+            super(msg, cause);
+        }
+    }
+
+    /**
+     * Constructs a new task with no primary job (a null main job is passed on);
+     * its work consists solely of whatever tasks are subsequently queued on it.
+     */
+    public DynamicSequentialTask() {
+        this(null);
+    }
+    
+    /**
+     * Constructs a new task with the given primary job, using the flags <code>{"tag": "compound"}</code>.
+     * 
+     * @param mainJob the primary job; may be null, in which case there is no primary job
+     */
+    public DynamicSequentialTask(Callable<T> mainJob) {
+        this(MutableMap.of("tag", "compound"), mainJob);
+    }
+    
+    public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) {
+        super(flags);
+        this.job = dstJob = new DstJob(mainJob);
+    }
+    
+    /**
+     * Adds the given task to the secondary queue of this task, tags it as a sub-task, marks it queued,
+     * and wakes any threads waiting on the job transition lock.
+     * 
+     * @throws IllegalStateException if the primary job has already finished
+     * @throws QueueAbortedException if the secondary queue has been aborted
+     */
+    @Override
+    public void queue(Task<?> t) {
+        synchronized (jobTransitionLock) {
+            if (primaryFinished) {
+                throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")");
+            }
+            if (secondaryQueueAborted) {
+                throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")");
+            }
+
+            // recorded in both queues first, then tagged and marked, then waiters are notified
+            secondaryJobsAll.add(t);
+            secondaryJobsRemaining.add(t);
+            BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG);
+            ((TaskInternal<?>)t).markQueued();
+
+            jobTransitionLock.notifyAll();
+        }
+    }
+
+    /**
+     * Cancels this task and all tasks queued on it; the primary thread is interrupted
+     * exactly when {@code mayInterruptIfRunning} is set.
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
+    }
+    /**
+     * Cancels this task, with control over what else is affected.
+     * 
+     * @param mayInterruptTask passed to the superclass cancel and to the cancel of each child
+     * @param interruptPrimaryThread whether the thread running the primary job (if any) is interrupted
+     * @param alsoCancelChildren whether cancel is also issued against every task queued on this task
+     * @return false if this task was already done; otherwise true if the superclass or any child reported
+     *      a cancellation, or if a primary thread was present
+     */
+    public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
+        if (isDone()) {
+            return false;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("cancelling {}", this);
+        }
+
+        boolean anyCancelled = super.cancel(mayInterruptTask);
+        if (alsoCancelChildren) {
+            for (Task<?> child : secondaryJobsAll) {
+                // non-short-circuit "or": every child is asked to cancel, whatever the result so far
+                anyCancelled = anyCancelled | child.cancel(mayInterruptTask);
+            }
+        }
+
+        synchronized (jobTransitionLock) {
+            if (primaryThread != null) {
+                if (interruptPrimaryThread) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("cancelling {} - interrupting", this);
+                    }
+                    primaryThread.interrupt();
+                }
+                anyCancelled = true;
+            }
+        }
+        return anyCancelled;
+    }
+    
+    /** Clears the aborted marker on the secondary queue, then delegates to the superclass. */
+    @Override
+    public synchronized boolean uncancel() {
+        secondaryQueueAborted = false;
+        return super.uncancel();
+    }
+
+    /** Returns an unmodifiable view of all tasks which have been queued on this task. */
+    @Override
+    public Iterable<Task<?>> getChildren() {
+        return Collections.unmodifiableCollection(secondaryJobsAll);
+    }
+    
+    /**
+     * Submits the indicated task for execution in the current execution context, and returns immediately.
+     * The submission is done while synchronized on the task, and is skipped if the task has already been submitted.
+     * 
+     * @throws IllegalStateException if there is no current execution context, or if the submission fails
+     */
+    protected void submitBackgroundInheritingContext(Task<?> task) {
+        BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
+        if (log.isTraceEnabled()) {
+            log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
+        }
+
+        if (ec==null) {
+            String message;
+            if (Tasks.current()!=null) {
+                // user forgot ExecContext:
+                message = "Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current();
+            } else {
+                // should not happen:
+                message = "Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this;
+            }
+            log.warn(message+" (rethrowing)");
+            throw new IllegalStateException(message);
+        }
+
+        synchronized (task) {
+            if (!task.isSubmitted()) {
+                try {
+                    ec.submit(task);
+                } catch (Exception e) {
+                    Exceptions.propagateIfFatal(e);
+                    // Give some context when the submit fails (happens when the target is already unmanaged)
+                    throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e);
+                }
+            } else if (log.isTraceEnabled()) {
+                log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
+            }
+        }
+    }
+
+    /** Replaces the policy applied when the primary job or a secondary task fails. */
+    public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) {
+        this.failureHandlingConfig = failureHandlingConfig;
+    }
+    /** Switches this task to {@link FailureHandlingConfig#SWALLOWING_CHILDREN_FAILURES}. */
+    @Override
+    public void swallowChildrenFailures() {
+        setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES);
+    }
+    
+    protected class DstJob implements Callable<T> {
+        protected Callable<T> primaryJob;
+        /** currently executing (or just completed) secondary task, or null if none;
+         * with jobTransitionLock notified on change and completion */
+        protected volatile Task<?> currentSecondary = null;
+        protected volatile boolean finishedSecondaries = false;
+        
+        /** @param mainJob the primary job to run; may be null, in which case only queued tasks are run */
+        public DstJob(Callable<T> mainJob) {
+            this.primaryJob = mainJob;
+        }
+
+        /**
+         * Runs the primary job (if any) in the calling thread while an internal background task
+         * works through the secondary queue: it polls each queued task in turn, submits it, and blocks on its result.
+         * <p>
+         * Once the primary job has returned or thrown, and unless this task is cancelled or the thread is
+         * interrupted, this waits for the secondary tasks to finish. If there is no primary job,
+         * the list of secondary results is used as the result.
+         * <p>
+         * An error from the primary job takes precedence over an error from the secondaries; both are passed
+         * to handleException. An error from the primary job which is caused by a {@link QueueAbortedException}
+         * is kept aside and only passed to handleException last, as the failure which aborted the queue
+         * is the more interesting one.
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public T call() throws Exception {
+
+            // record that the primary has started, and which thread runs it (so that cancel can interrupt it)
+            synchronized (jobTransitionLock) {
+                primaryStarted = true;
+                primaryThread = Thread.currentThread();
+                for (Task<?> t: secondaryJobsAll)
+                    ((TaskInternal<?>)t).markQueued();
+            }
+            // TODO overkill having a thread/task for this, but it works
+            // optimisation would either use newTaskEndCallback property on task to submit
+            // or use some kind of single threaded executor for the queued tasks
+            Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
+                    .name("DST manager (internal)")
+                    // TODO marking it transient helps it be GC'd sooner, 
+                    // but ideally we wouldn't have this,
+                    // or else it would be a child
+                    .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+                    .body(new Callable<List<Object>>() {
+
+                @Override
+                public List<Object> call() throws Exception {
+                    List<Object> result = new ArrayList<Object>();
+                    try { 
+                        // keep going until aborted, or until the primary has finished and the queue is empty
+                        while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
+                            synchronized (jobTransitionLock) {
+                                if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
+                                    // nothing to do yet: wait (at most a second) to be notified of a change
+                                    currentSecondary = null;
+                                    jobTransitionLock.wait(1000);
+                                }
+                            }
+                            @SuppressWarnings("rawtypes")
+                            Task secondaryJob = secondaryJobsRemaining.poll();
+                            if (secondaryJob != null) {
+                                synchronized (jobTransitionLock) {
+                                    currentSecondary = secondaryJob;
+                                    submitBackgroundInheritingContext(secondaryJob);
+                                    jobTransitionLock.notifyAll();
+                                }
+                                try {
+                                    // block until this secondary completes before taking the next one
+                                    result.add(secondaryJob.get());
+                                } catch (Exception e) {
+                                    if (TaskTags.isInessential(secondaryJob)) {
+                                        result.add(Tasks.getError(secondaryJob));
+                                        if (log.isDebugEnabled())
+                                            log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
+                                    } else {
+                                        if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
+                                            synchronized (jobTransitionLock) {
+                                                for (Task<?> t: secondaryJobsRemaining)
+                                                    t.cancel(true);
+                                                jobTransitionLock.notifyAll();
+                                            }
+                                        }
+                                        
+                                        if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
+                                            secondaryQueueAborted = true;
+                                            throw e;
+                                        }
+
+                                        if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
+                                            // cancel the parent without interrupting the primary thread and without cancelling children
+                                            cancel(true, false, false);
+                                        }
+                                        
+                                        result.add(Tasks.getError(secondaryJob));
+                                        if (log.isDebugEnabled())
+                                            log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
+                                    }
+                                }
+                            }
+                        }
+                    } finally {
+                        // whatever the outcome, record that the secondaries are finished and wake anyone in join()
+                        synchronized (jobTransitionLock) {
+                            currentSecondary = null;
+                            finishedSecondaries = true;
+                            jobTransitionLock.notifyAll();
+                        }
+                    }
+                    return result;
+                }
+            }).build();
+            ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
+            
+            submitBackgroundInheritingContext(secondaryJobMaster);
+            
+            T result = null;
+            Throwable error = null;
+            Throwable uninterestingSelfError = null;
+            boolean errorIsFromChild = false;
+            try {
+                if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
+                if (primaryJob!=null) result = primaryJob.call();
+            } catch (Throwable selfException) {
+                Exceptions.propagateIfFatal(selfException);
+                if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
+                    // Error was caused by the task already having failed, and this thread calling queue() to try
+                    // to queue more work. The underlying cause will be much more interesting.
+                    // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
+                    // which gets propagated instead of the more interesting child exception.
+                    uninterestingSelfError = selfException;
+                } else {
+                    error = selfException;
+                    errorIsFromChild = false;
+                }
+                if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
+                    if (log.isDebugEnabled())
+                        log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
+                    secondaryQueueAborted = true;
+                }
+                if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
+                    if (log.isDebugEnabled())
+                        log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
+                    synchronized (jobTransitionLock) {
+                        for (Task<?> t: secondaryJobsRemaining)
+                            t.cancel(true);
+                        // do this early to prevent additions; the notifyAll is held off until the finally block just below
+                        primaryThread = null;
+                        primaryFinished = true;
+                    }
+                }
+            } finally {
+                try {
+                    if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
+                    synchronized (jobTransitionLock) {
+                        // semaphore might be nicer here (aled notes as it is this is a little hard to read)
+                        primaryThread = null;
+                        primaryFinished = true;
+                        jobTransitionLock.notifyAll();
+                    }
+                    if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
+                        if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
+                        // wait on tasks sequentially so that blocking information is more interesting
+                        DynamicTasks.waitForLast();
+                        List<Object> result2 = secondaryJobMaster.get();
+                        try {
+                            // with no primary job, the results of the secondaries become the result of this task
+                            if (primaryJob==null) result = (T)result2;
+                        } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
+                    }
+                } catch (Throwable childException) {
+                    Exceptions.propagateIfFatal(childException);
+                    if (error==null) {
+                        error = childException;
+                        errorIsFromChild = true;
+                    } else {
+                        if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
+                    }
+                }
+            }
+            if (error!=null) {
+                // may return normally, if the error is from a child and child failures are being swallowed
+                handleException(error, errorIsFromChild);
+            }
+            if (uninterestingSelfError != null) {
+                handleException(uninterestingSelfError, false);
+            }
+            return result;
+        }
+        
+        /** Identifies this job by the id of the enclosing task. */
+        @Override
+        public String toString() {
+            return "DstJob:"+DynamicSequentialTask.this.getId();
+        }
+
+        /**
+         * Waits for the secondary tasks of this job to complete, or the given time to elapse.
+         * While waiting, the blocking task of the caller is set to whatever is being waited on,
+         * and the blocking details are always reset afterwards, including when the wait is interrupted.
+         * 
+         * @param includePrimary if false, this also returns as soon as there is no current secondary task
+         *      and none remaining in the queue, even though the primary job may go on to queue more
+         * @param optionalTimeout how long to wait at most; if null there is no limit
+         *      (the waiting is then done in slices of one second)
+         * @throws InterruptedException if the calling thread is interrupted while waiting
+         */
+        public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException {
+            CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null;
+            while (true) {
+                Task<?> cs;
+                Duration remaining;
+                synchronized (jobTransitionLock) {
+                    cs = currentSecondary;
+                    if (finishedSecondaries) return;
+                    remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining();
+                    if (!remaining.isPositive()) return;
+                    if (cs==null) {
+                        if (!includePrimary && secondaryJobsRemaining.isEmpty()) return;
+                        // parent still running, no children though
+                        Tasks.setBlockingTask(DynamicSequentialTask.this);
+                        try {
+                            jobTransitionLock.wait(remaining.toMilliseconds());
+                        } finally {
+                            // reset even if interrupted, so stale blocking details are not left behind
+                            Tasks.resetBlockingDetails();
+                        }
+                    }
+                }
+                if (cs!=null) {
+                    // block on the current secondary outside the lock, so that it can make progress
+                    Tasks.setBlockingTask(cs);
+                    try {
+                        cs.blockUntilEnded(remaining);
+                    } finally {
+                        Tasks.resetBlockingDetails();
+                    }
+                }
+            }
+        }
+    }
+
+    /** Returns an immutable snapshot of all tasks which have been queued on this task so far. */
+    @Override
+    public List<Task<?>> getQueue() {
+        return ImmutableList.copyOf(secondaryJobsAll);
+    }
+
+    /**
+     * Handles an error from the primary job or from a child. An error from a child is logged and swallowed
+     * when the failure handling configuration says the parent should not fail on secondary failures;
+     * anything else is passed to {@link #handleException(Throwable)}.
+     * 
+     * @param throwable the error to handle
+     * @param fromChild whether the error came from a secondary task rather than the primary job
+     */
+    public void handleException(Throwable throwable, boolean fromChild) throws Exception {
+        Exceptions.propagateIfFatal(throwable);
+        final boolean swallow = fromChild && !failureHandlingConfig.failParentOnSecondaryFailure;
+        if (swallow) {
+            log.debug("Parent task "+this+" swallowing child error: "+throwable);
+            return;
+        }
+        handleException(throwable);
+    }
+    /**
+     * Rethrows the given error: an {@link Exception} is thrown as it is (allowing checked exceptions to be
+     * passed through), anything else goes through {@code Exceptions.propagate}.
+     */
+    public void handleException(Throwable throwable) throws Exception {
+        Exceptions.propagateIfFatal(throwable);
+        if (!(throwable instanceof Exception)) {
+            throw Exceptions.propagate(throwable);
+        }
+        // allow checked exceptions to be passed through
+        throw (Exception)throwable;
+    }
+
+    /**
+     * Waits for the queue of this task to be worked through, by joining on the job of this task.
+     * 
+     * @param optionalTimeout passed to the join; see {@code DstJob.join}
+     * @param includePrimary passed to the join; see {@code DstJob.join}
+     * @param throwFirstError whether, after the join, to throw the error of this task if it has one,
+     *      or else that of the first queued task which is in error and not marked inessential
+     */
+    @Override
+    public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) {
+        try {
+            dstJob.join(includePrimary, optionalTimeout);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        if (!throwFirstError) {
+            return;
+        }
+
+        if (isError()) {
+            getUnchecked();
+        }
+        for (Task<?> queued : getQueue()) {
+            if (!queued.isError()) {
+                continue;
+            }
+            if (TaskTags.isInessential(queued)) {
+                continue;
+            }
+            queued.getUnchecked();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
new file mode 100644
index 0000000..ed46558
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/** 
+ * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ * 
+ * @since 0.6.0
+ */
+@Beta
+public class DynamicTasks {
+
+    private static final Logger log = LoggerFactory.getLogger(DynamicTasks.class);
+    /** per-thread queueing context; written by {@link #setTaskQueueingContext(TaskQueueingContext)},
+     * read by {@link #getThreadTaskQueuingContext()} and cleared by {@link #removeTaskQueueingContext()} */
+    private static final ThreadLocal<TaskQueueingContext> taskQueueingContext = new ThreadLocal<TaskQueueingContext>();
+    
+    /** stores the given {@link TaskQueueingContext} in the thread-local for the calling thread */
+    public static void setTaskQueueingContext(TaskQueueingContext newTaskQC) {
+        taskQueueingContext.set(newTaskQC);
+    }
+    
+    /** returns the {@link TaskQueueingContext} held in the thread-local for the calling thread
+     * (without consulting the current task), or null if none is set */
+    public static TaskQueueingContext getThreadTaskQueuingContext() {
+        return taskQueueingContext.get();
+    }
+    
+    /** returns the queueing context to use on the calling thread: the thread-local one if set,
+     * otherwise the current task if it is itself a {@link TaskQueueingContext}, otherwise null */
+    public static TaskQueueingContext getTaskQueuingContext() {
+        TaskQueueingContext threadContext = getThreadTaskQueuingContext();
+        if (threadContext == null) {
+            Task<?> current = Tasks.current();
+            return (current instanceof TaskQueueingContext) ? (TaskQueueingContext) current : null;
+        }
+        return threadContext;
+    }
+
+    
+    /** clears the thread-local {@link TaskQueueingContext} for the calling thread */
+    public static void removeTaskQueueingContext() {
+        taskQueueingContext.remove();
+    }
+
+    public static class TaskQueueingResult<T> implements TaskWrapper<T> {
+        private final Task<T> task;
+        private final boolean wasQueued;
+        private ExecutionContext execContext = null;
+        
+        private TaskQueueingResult(TaskAdaptable<T> task, boolean wasQueued) {
+            this.task = task.asTask();
+            this.wasQueued = wasQueued;
+        }
+        @Override
+        public Task<T> asTask() {
+            return task;
+        }
+        @Override
+        public Task<T> getTask() {
+            return task;
+        }
+        /** returns true if the task was queued */
+        public boolean wasQueued() {
+            return wasQueued;
+        }
+        /** returns true if the task either is currently queued or has been submitted */
+        public boolean isQueuedOrSubmitted() {
+            return wasQueued || Tasks.isQueuedOrSubmitted(task);
+        }
+        /** specifies an execContext to use if the task has to be explicitly submitted;
+         * if omitted it will attempt to find one based on the current thread's context */
+        public TaskQueueingResult<T> executionContext(ExecutionContext execContext) {
+            this.execContext = execContext;
+            return this;
+        }
+        /** as {@link #executionContext(ExecutionContext)} but inferring from the entity */
+        public TaskQueueingResult<T> executionContext(Entity entity) {
+            this.execContext = ((EntityInternal)entity).getManagementSupport().getExecutionContext();
+            return this;
+        }
+        private boolean orSubmitInternal() {
+            if (!wasQueued()) {
+                if (isQueuedOrSubmitted()) {
+                    log.warn("Redundant call to execute "+getTask()+"; skipping");
+                    return false;
+                } else {
+                    ExecutionContext ec = execContext;
+                    if (ec==null)
+                        ec = BasicExecutionContext.getCurrentExecutionContext();
+                    if (ec==null)
+                        throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext");
+                    ec.submit(getTask());
+                    return true;
+                }
+            } else {
+                return false;
+            }
+        }
+        /** causes the task to be submitted (asynchronously) if it hasn't already been,
+         * requiring an entity execution context (will try to find a default if not set) */
+        public TaskQueueingResult<T> orSubmitAsync() {
+            orSubmitInternal();
+            return this;
+        }
+        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+        public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
+            executionContext(entity);
+            return orSubmitAsync();
+        }
+        /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
+         * useful in contexts such as libraries where callers may be either on a legacy call path 
+         * (which assumes all commands complete immediately);
+         * requiring an entity execution context (will try to find a default if not set) */
+        public TaskQueueingResult<T> orSubmitAndBlock() {
+            if (orSubmitInternal()) task.getUnchecked();
+            return this;
+        }
+        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+        public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
+            executionContext(entity);
+            return orSubmitAndBlock();
+        }
+        /** blocks for the task to be completed
+         * <p>
+         * needed in any context where subsequent commands assume the task has completed.
+         * not needed in a context where the task is simply being built up and queued.
+         * <p>
+         * throws if there are any errors
+         */
+        public T andWaitForSuccess() {
+            return task.getUnchecked();
+        }
+        public void orCancel() {
+            if (!wasQueued()) {
+                task.cancel(false);
+            }
+        }
+    }
+    
+    /**
+     * Attempts to add the task to the current queueing context, if there is one; otherwise does nothing.
+     * <p>
+     * The returned {@link TaskQueueingResult TaskQueueingResult} records whether queueing succeeded;
+     * call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on it to have the task
+     * submitted through an execution context when it was not queued.
+     */
+    public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) {
+        TaskQueueingContext context = getTaskQueuingContext();
+        boolean queued = (context != null) && Tasks.tryQueueing(context, task);
+        return new TaskQueueingResult<T>(task, queued);
+    }
+
+    /** as {@link #queueIfPossible(TaskAdaptable)}, applied to a new task obtained from the factory's {@code newTask()}
+     * @see #queueIfPossible(TaskAdaptable) */
+    public static <T> TaskQueueingResult<T> queueIfPossible(TaskFactory<? extends TaskAdaptable<T>> task) {
+        return queueIfPossible(task.newTask());
+    }
+
+    /** adds the given task to the nearest task addition context:
+     * first the one from {@link #getTaskQueuingContext()} (thread-local, or the current task),
+     * then the current task and each task up its submitted-by chain which is a {@link TaskQueueingContext}
+     * <p>
+     * throws if it cannot add
+     *
+     * @param task the task to queue; must not be null, and must not yet be queued or submitted
+     * @return the task which was passed in, once it has been queued
+     * @throws NullPointerException if the task is null
+     * @throws IllegalStateException if the task is already queued or submitted,
+     *         or if no context in the hierarchy accepts it */
+    public static <T> Task<T> queueInTaskHierarchy(Task<T> task) {
+        Preconditions.checkNotNull(task, "Task to queue cannot be null");
+        // Guava Preconditions substitutes only %s placeholders (not the slf4j-style {})
+        Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: %s", task);
+        
+        TaskQueueingContext adder = getTaskQueuingContext();
+        if (adder!=null && Tasks.tryQueueing(adder, task)) {
+            log.debug("Queued task {} at context {} (no hierarchy)", task, adder);
+            return task;
+        }
+        
+        Task<?> t = Tasks.current();
+        // use a template so the message is only built if the check fails
+        Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task %s", task);
+        
+        // walk up through the submitting tasks, trying each one which can queue
+        while (t!=null) {
+            if (t instanceof TaskQueueingContext) {
+                if (Tasks.tryQueueing((TaskQueueingContext)t, task)) {
+                    log.debug("Queued task {} at hierarchical context {}", task, t);
+                    return task;
+                }
+            }
+            t = t.getSubmittedByTask();
+        }
+        
+        throw new IllegalStateException("No task addition context available in current task hierarchy for adding task "+task);
+    }
+
+    /**
+     * Queues the given task.
+     * <p/>
+     * This method is only valid within a dynamic task. Use {@link #queueIfPossible(TaskAdaptable)}
+     * and {@link TaskQueueingResult#orSubmitAsync()} if the calling context is a basic task.
+     *
+     * @param task The task to queue
+     * @throws IllegalStateException if no task queueing context is available
+     * @return The queued task
+     */
+    public static <V extends TaskAdaptable<?>> V queue(V task) {
+        try {
+            Preconditions.checkNotNull(task, "Task to queue cannot be null");
+            Preconditions.checkState(!Tasks.isQueued(task), "Task to queue must not yet be queued: %s", task);
+            TaskQueueingContext adder = getTaskQueuingContext();
+            if (adder==null) {
+                throw new IllegalStateException("Task "+task+" cannot be queued here; no queueing context available");
+            }
+            adder.queue(task.asTask());
+            return task;
+        } catch (Throwable e) {
+            log.warn("Error queueing "+task+" (rethrowing): "+e);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /** queues each of the given tasks in argument order, one call to the single-task {@code queue} per task
+     * @see #queue(org.apache.brooklyn.api.management.TaskAdaptable)  */
+    public static void queue(TaskAdaptable<?> task1, TaskAdaptable<?> task2, TaskAdaptable<?> ...tasks) {
+        queue(task1);
+        queue(task2);
+        for (TaskAdaptable<?> task: tasks) queue(task);
+    }
+
+    /** obtains a new task from the factory's {@code newTask()}, queues it, and returns it
+     * @see #queue(org.apache.brooklyn.api.management.TaskAdaptable)  */
+    public static <T extends TaskAdaptable<?>> T queue(TaskFactory<T> taskFactory) {
+        return queue(taskFactory.newTask());
+    }
+
+    /** obtains a new task from each of the given factories, in argument order, queueing each one as it is created
+     * @see #queue(org.apache.brooklyn.api.management.TaskAdaptable)  */
+    public static void queue(TaskFactory<?> task1, TaskFactory<?> task2, TaskFactory<?> ...tasks) {
+        queue(task1.newTask());
+        queue(task2.newTask());
+        for (TaskFactory<?> task: tasks) queue(task.newTask());
+    }
+
+    /** builds a task with the given name and {@link Callable} body using {@code Tasks.builder()}, queues it, and returns it
+     * @see #queue(org.apache.brooklyn.api.management.TaskAdaptable)  */
+    public static <T> Task<T> queue(String name, Callable<T> job) {
+        return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
+    }
+
+    /** builds a task with the given name and {@link Runnable} body using {@code Tasks.builder()}, queues it, and returns it
+     * @see #queue(org.apache.brooklyn.api.management.TaskAdaptable)  */
+    public static <T> Task<T> queue(String name, Runnable job) {
+        return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
+    }
+
+    /** queues the task if needed, i.e. if it is not yet submitted (so it will run), 
+     * or if it is submitted but not queued and we are in a queueing context (so it is available for informational purposes) */
+    public static <T extends TaskAdaptable<?>> T queueIfNeeded(T task) {
+        if (!Tasks.isQueued(task)) {
+            if (Tasks.isSubmitted(task) && getTaskQueuingContext()==null) {
+                // already submitted and not in a queueing context, don't try to queue
+            } else {
+                // needs submitting, put it in the queue
+                // (will throw an error if we are not a queueing context)
+                queue(task);
+            }
+        }
+        return task;
+    }
+    
+    /** queues the given task if needed (see {@link #queueIfNeeded(TaskAdaptable)}), 
+     * then blocks for and returns its result via {@code getUnchecked()};
+     * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */
+    // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks!
+    public static <T> T get(TaskAdaptable<T> t) {
+        return queueIfNeeded(t).asTask().getUnchecked();
+    }
+
+    /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error 
+     * (excluding errors in inessential tasks),
+     * then returning the last task in the queue (which is guaranteed to have finished without error,
+     * if this method returns without throwing)
+     *
+     * @return the last task in the drained context's queue, or null if that queue is empty */
+    public static Task<?> waitForLast() {
+        // drain has already checked that the context is non-null and returns it,
+        // so use that rather than looking the context up (unguarded) a second time;
+        // this call to last is safe, as drain guarantees everything will have run
+        // (on errors drain will throw so we won't come here)
+        List<Task<?>> q = drain(null, true).getQueue();
+        return q.isEmpty() ? null : Iterables.getLast(q);
+    }
+    
+    /** Calls {@link TaskQueueingContext#drain(Duration, boolean, boolean)} on the current task context,
+     * passing {@code false} for its second argument, and returns that context;
+     * fails with a {@link NullPointerException} if there is no current queueing context */
+    public static TaskQueueingContext drain(Duration optionalTimeout, boolean throwFirstError) {
+        TaskQueueingContext context = Preconditions.checkNotNull(
+                DynamicTasks.getTaskQueuingContext(), "Cannot drain when there is no queueing context");
+        context.drain(optionalTimeout, false, throwFirstError);
+        return context;
+    }
+
+    /** as {@link Tasks#swallowChildrenFailures()} but requiring a {@link TaskQueueingContext}:
+     * fails with a {@link NullPointerException} if {@link #getTaskQueuingContext()} returns null. */
+    @Beta
+    public static void swallowChildrenFailures() {
+        Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
+        Tasks.swallowChildrenFailures();
+    }
+
+    /** same as {@link Tasks#markInessential()}, to which this simply delegates
+     * (included here for convenience as it is often used in conjunction with {@link DynamicTasks}) */
+    public static void markInessential() {
+        Tasks.markInessential();
+    }
+
+    /** queues the task if possible, otherwise submits it asynchronously; returns the task for callers to 
+     * {@link Task#getUnchecked()} or {@link Task#blockUntilEnded()}
+     *
+     * @param task the task to queue or submit
+     * @param entity the entity whose execution context is used if the task has to be submitted
+     * @return the task, as given by {@code asTask()} on the queueing result */
+    public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
+        return queueIfPossible(task).orSubmitAsync(entity).asTask();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
new file mode 100644
index 0000000..5341b21
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import org.apache.brooklyn.api.management.Task;
+
+/** Callback interface for being told when a {@link Task} has completed. */
+public interface ExecutionListener {
+
+    /** invoked when a task completes: 
+     * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set,
+     * and {@link Task#get()} should return immediately for most Task implementations
+     * (care has been taken to avoid potential deadlocks here, waiting for a result!)
+     *
+     * @param task the task which has completed */
+    public void onTaskDone(Task<?> task);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
new file mode 100644
index 0000000..be677e3
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+
+/** Static helper for invoking loosely-typed callable objects. */
+public class ExecutionUtils {
+    /**
+     * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
+     * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable).
+     * <p>
+     * The types are tried in this order: {@link Closure} (called with the arguments), {@link Callable}
+     * (arguments ignored; anything it throws is rethrown via {@link Throwables#propagate(Throwable)}),
+     * {@link Runnable} (arguments ignored; returns null), and {@link Function} (only when exactly one
+     * argument is supplied).
+     *
+     * @param callable the object to invoke; if null, nothing is invoked and null is returned
+     * @param args the arguments to pass where the target type accepts them
+     * @return the value returned by the invoked object, or null if it returns nothing
+     * @throws IllegalArgumentException if the object is non-null and none of the supported types applies
+     * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
+     */
+    @Deprecated
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static Object invoke(Object callable, Object ...args) {
+        // null matches none of the instanceof checks below, so handling it first does not change the result
+        if (callable==null) return null;
+        if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
+        if (callable instanceof Callable) {
+            try {
+                return ((Callable<?>)callable).call();
+            } catch (Throwable t) {
+                throw Throwables.propagate(t);
+            }
+        }
+        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
+        if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
+        throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
+    }
+}



Mime
View raw message