brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [21/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util
Date Mon, 17 Aug 2015 19:17:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java
new file mode 100644
index 0000000..7fc112b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.core.util.flags.TypeCoercions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Durations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/** 
+ * Resolves a given object, as follows:
+ * <li> If it is a {@link Task} (or other {@link Future}) or a {@link DeferredSupplier} then get its contents
+ * <li> If it's a map (or a set or other iterable) and {@link #deep(boolean)} is requested, it applies resolution to contents
+ * <li> It applies coercion
+ * <p>
+ * Fluent-style API exposes a number of other options.
+ */
+public class ValueResolver<T> implements DeferredSupplier<T> {
+
+    /** 
+     * Period to wait if we're expected to return real quick 
+     * but we want fast things to have time to finish.
+     * <p>
+     * Timings are always somewhat arbitrary but this at least
+     * allows some intention to be captured in code rather than arbitrary values. */
+    public static Duration REAL_QUICK_WAIT = Duration.millis(50);
+    /** 
+     * Period to wait if we're expected to return quickly 
+     * but we want to be a bit more generous for things to finish,
+     * without letting a caller get annoyed. 
+     * <p>
+     * See {@link #REAL_QUICK_WAIT}. */
+    public static Duration PRETTY_QUICK_WAIT = Duration.millis(200);
+    
+    /** Period to wait when we have to poll but want to give the illusion of no wait.
+     * See {@link Repeater#DEFAULT_REAL_QUICK_PERIOD} */ 
+    public static Duration REAL_QUICK_PERIOD = Repeater.DEFAULT_REAL_QUICK_PERIOD;
+    
+    private static final Logger log = LoggerFactory.getLogger(ValueResolver.class);
+    
+    final Object value;
+    final Class<T> type;
+    ExecutionContext exec;
+    String description;
+    boolean forceDeep;
+    /** null means do it if you can; true means always, false means never */
+    Boolean embedResolutionInTask;
+    /** timeout on execution, if possible, or if embedResolutionInTask is true */
+    Duration timeout;
+    boolean isTransientTask = true;
+    
+    T defaultValue = null;
+    boolean returnDefaultOnGet = false;
+    boolean swallowExceptions = false;
+    
+    // internal fields
+    final Object parentOriginalValue;
+    final CountdownTimer parentTimer;
+    AtomicBoolean started = new AtomicBoolean(false);
+    boolean expired;
+    
+    ValueResolver(Object v, Class<T> type) {
+        this.value = v;
+        this.type = type;
+        checkTypeNotNull();
+        parentOriginalValue = null;
+        parentTimer = null;
+    }
+    
+    ValueResolver(Object v, Class<T> type, ValueResolver<?> parent) {
+        this.value = v;
+        this.type = type;
+        checkTypeNotNull();
+        
+        exec = parent.exec;
+        description = parent.description;
+        forceDeep = parent.forceDeep;
+        embedResolutionInTask = parent.embedResolutionInTask;
+
+        parentOriginalValue = parent.getOriginalValue();
+
+        timeout = parent.timeout;
+        parentTimer = parent.parentTimer;
+        if (parentTimer!=null && parentTimer.isExpired())
+            expired = true;
+        
+        // default value and swallow exceptions do not need to be nested
+    }
+
+    public static class ResolverBuilderPretype {
+        final Object v;
+        public ResolverBuilderPretype(Object v) {
+            this.v = v;
+        }
+        public <T> ValueResolver<T> as(Class<T> type) {
+            return new ValueResolver<T>(v, type);
+        }
+    }
+
+    /** returns a copy of this resolver which can be queried, even if the original (single-use instance) has already been used */
+    public ValueResolver<T> clone() {
+        ValueResolver<T> result = new ValueResolver<T>(value, type)
+            .context(exec).description(description)
+            .embedResolutionInTask(embedResolutionInTask)
+            .deep(forceDeep)
+            .timeout(timeout);
+        if (returnDefaultOnGet) result.defaultValue(defaultValue);
+        if (swallowExceptions) result.swallowExceptions();
+        return result;
+    }
+    
+    /** execution context to use when resolving; required if resolving unsubmitted tasks or running with a time limit */
+    public ValueResolver<T> context(ExecutionContext exec) {
+        this.exec = exec;
+        return this;
+    }
+    /** as {@link #context(ExecutionContext)} for use from an entity */
+    public ValueResolver<T> context(Entity entity) {
+        return context(entity!=null ? ((EntityInternal)entity).getExecutionContext() : null);
+    }
+    
+    /** sets a message which will be displayed in status reports while it waits (e.g. the name of the config key being looked up) */
+    public ValueResolver<T> description(String description) {
+        this.description = description;
+        return this;
+    }
+    
+    /** sets a default value which will be returned on a call to {@link #get()} if the task does not complete
+     * or completes with an error
+     * <p>
+     * note that {@link #getMaybe()} returns an absent object even in the presence of
+     * a default, so that any error can still be accessed */
+    public ValueResolver<T> defaultValue(T defaultValue) {
+        this.defaultValue = defaultValue;
+        this.returnDefaultOnGet = true;
+        return this;
+    }
+
+    /** indicates that no default value should be returned on a call to {@link #get()}, and instead it should throw
+     * (this is the default; this method is provided to undo a call to {@link #defaultValue(Object)}) */
+    public ValueResolver<T> noDefaultValue() {
+        this.returnDefaultOnGet = false;
+        this.defaultValue = null;
+        return this;
+    }
+    
+    /** indicates that exceptions in resolution should not be thrown on a call to {@link #getMaybe()}, 
+     * but rather used as part of the {@link Maybe#get()} if it's absent, 
+     * and swallowed altogether on a call to {@link #get()} in the presence of a {@link #defaultValue(Object)} */
+    public ValueResolver<T> swallowExceptions() {
+        this.swallowExceptions = true;
+        return this;
+    }
+    
+    /** whether the task should be marked as transient; defaults true */
+    public ValueResolver<T> transientTask(boolean isTransientTask) {
+        this.isTransientTask = isTransientTask;
+        return this;
+    }
+    
+    public Maybe<T> getDefault() {
+        if (returnDefaultOnGet) return Maybe.of(defaultValue);
+        else return Maybe.absent("No default value set");
+    }
+    
+    /** causes nested structures (maps, lists) to be descended and nested unresolved values resolved */
+    public ValueResolver<T> deep(boolean forceDeep) {
+        this.forceDeep = forceDeep;
+        return this;
+    }
+
+    /** if true, forces execution of a deferred supplier to be run in a task;
+     * if false, it prevents it (meaning time limits may not be applied);
+     * if null, the default, it runs in a task if an execution context is available or a time limit is applied.
+     * <p>
+     * running inside a task is required for some {@link DeferredSupplier}
+     * instances which look up a task {@link ExecutionContext}. */
+    public ValueResolver<T> embedResolutionInTask(Boolean embedResolutionInTask) {
+        this.embedResolutionInTask = embedResolutionInTask;
+        return this;
+    }
+    
+    /** sets a time limit on executions
+     * <p>
+     * used for {@link Task} and {@link DeferredSupplier} instances.
+     * may require an execution context at runtime. */
+    public ValueResolver<T> timeout(Duration timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+    
+    protected void checkTypeNotNull() {
+        if (type==null) 
+            throw new NullPointerException("type must be set to resolve, for '"+value+"'"+(description!=null ? ", "+description : ""));
+    }
+
+    public T get() {
+        Maybe<T> m = getMaybe();
+        if (m.isPresent()) return m.get();
+        if (returnDefaultOnGet) return defaultValue;
+        return m.get();
+    }
+    
+    public Maybe<T> getMaybe() {
+        Maybe<T> result = getMaybeInternal();
+        if (log.isTraceEnabled()) {
+            log.trace(this+" evaluated as "+result);
+        }
+        return result;
+    }
+    
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected Maybe<T> getMaybeInternal() {
+        if (started.getAndSet(true))
+            throw new IllegalStateException("ValueResolver can only be used once");
+        
+        if (expired) return Maybe.absent("Nested resolution of "+getOriginalValue()+" did not complete within "+timeout);
+        
+        ExecutionContext exec = this.exec;
+        if (exec==null) {
+            // if execution context not specified, take it from the current task if present
+            exec = BasicExecutionContext.getCurrentExecutionContext();
+        }
+        
+        CountdownTimer timerU = parentTimer;
+        if (timerU==null && timeout!=null)
+            timerU = timeout.countdownTimer();
+        final CountdownTimer timer = timerU;
+        if (timer!=null && !timer.isRunning())
+            timer.start();
+        
+        checkTypeNotNull();
+        Object v = this.value;
+        
+        //if the expected type is a closure or map and that's what we have, we're done (or if it's null);
+        //but not allowed to return a future or DeferredSupplier as the resolved value
+        if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v)))
+            return Maybe.of((T) v);
+        
+        try {
+            //if it's a task or a future, we wait for the task to complete
+            if (v instanceof TaskAdaptable<?>) {
+                //if it's a task, we make sure it is submitted
+                if (!((TaskAdaptable<?>) v).asTask().isSubmitted() ) {
+                    if (exec==null)
+                        return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available");
+                    exec.submit(((TaskAdaptable<?>) v).asTask());
+                }
+            }
+
+            if (v instanceof Future) {
+                final Future<?> vfuture = (Future<?>) v;
+
+                //including tasks, above
+                if (!vfuture.isDone()) {
+                    Callable<Maybe> callable = new Callable<Maybe>() {
+                        public Maybe call() throws Exception {
+                            return Durations.get(vfuture, timer);
+                        } };
+
+                    String description = getDescription();
+                    Maybe vm = Tasks.withBlockingDetails("Waiting for "+description, callable);
+                    if (vm.isAbsent()) return vm;
+                    v = vm.get();
+
+                } else {
+                    v = vfuture.get();
+                    
+                }
+
+            } else if (v instanceof DeferredSupplier<?>) {
+                final Object vf = v;
+
+                if ((!Boolean.FALSE.equals(embedResolutionInTask) && (exec!=null || timeout!=null)) || Boolean.TRUE.equals(embedResolutionInTask)) {
+                    if (exec==null)
+                        return Maybe.absent("Embedding in task needed for '"+getDescription()+"' but no execution context available");
+                        
+                    Callable<Object> callable = new Callable<Object>() {
+                        public Object call() throws Exception {
+                            try {
+                                Tasks.setBlockingDetails("Retrieving "+vf);
+                                return ((DeferredSupplier<?>) vf).get();
+                            } finally {
+                                Tasks.resetBlockingDetails();
+                            }
+                        } };
+                    String description = getDescription();
+                    TaskBuilder<Object> vb = Tasks.<Object>builder().body(callable).name("Resolving dependent value").description(description);
+                    if (isTransientTask) vb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+                    Task<Object> vt = exec.submit(vb.build());
+                    // TODO to handle immediate resolution, it would be nice to be able to submit 
+                    // so it executes in the current thread,
+                    // or put a marker in the target thread or task while it is running that the task 
+                    // should never wait on anything other than another value being resolved 
+                    // (though either could recurse infinitely) 
+                    Maybe<Object> vm = Durations.get(vt, timer);
+                    vt.cancel(true);
+                    if (vm.isAbsent()) return (Maybe<T>)vm;
+                    v = vm.get();
+                    
+                } else {
+                    try {
+                        Tasks.setBlockingDetails("Retrieving (non-task) "+vf);
+                        v = ((DeferredSupplier<?>) vf).get();
+                    } finally {
+                        Tasks.resetBlockingDetails();
+                    }
+                }
+
+            } else if (v instanceof Map) {
+                //and if a map or list we look inside
+                Map result = Maps.newLinkedHashMap();
+                for (Map.Entry<?,?> entry : ((Map<?,?>)v).entrySet()) {
+                    Maybe<?> kk = new ValueResolver(entry.getKey(), type, this)
+                        .description( (description!=null ? description+", " : "") + "map key "+entry.getKey() )
+                        .getMaybe();
+                    if (kk.isAbsent()) return (Maybe<T>)kk;
+                    Maybe<?> vv = new ValueResolver(entry.getValue(), type, this)
+                        .description( (description!=null ? description+", " : "") + "map value for key "+kk.get() )
+                        .getMaybe();
+                    if (vv.isAbsent()) return (Maybe<T>)vv;
+                    result.put(kk.get(), vv.get());
+                }
+                return Maybe.of((T) result);
+
+            } else if (v instanceof Set) {
+                Set result = Sets.newLinkedHashSet();
+                int count = 0;
+                for (Object it : (Set)v) {
+                    Maybe<?> vv = new ValueResolver(it, type, this)
+                        .description( (description!=null ? description+", " : "") + "entry "+count )
+                        .getMaybe();
+                    if (vv.isAbsent()) return (Maybe<T>)vv;
+                    result.add(vv.get());
+                    count++;
+                }
+                return Maybe.of((T) result);
+
+            } else if (v instanceof Iterable) {
+                List result = Lists.newArrayList();
+                int count = 0;
+                for (Object it : (Iterable)v) {
+                    Maybe<?> vv = new ValueResolver(it, type, this)
+                        .description( (description!=null ? description+", " : "") + "entry "+count )
+                        .getMaybe();
+                    if (vv.isAbsent()) return (Maybe<T>)vv;
+                    result.add(vv.get());
+                    count++;
+                }
+                return Maybe.of((T) result);
+
+            } else {
+                return TypeCoercions.tryCoerce(v, TypeToken.of(type));
+            }
+
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            
+            IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e);
+            if (swallowExceptions) {
+                if (log.isDebugEnabled())
+                    log.debug("Resolution of "+this+" failed, swallowing and returning: "+e);
+                return Maybe.absent(problem);
+            }
+            if (log.isDebugEnabled())
+                log.debug("Resolution of "+this+" failed, throwing: "+e);
+            throw problem;
+        }
+        
+        return new ValueResolver(v, type, this).getMaybe();
+    }
+
+    protected String getDescription() {
+        return description!=null ? description : ""+value;
+    }
+    protected Object getOriginalValue() {
+        if (parentOriginalValue!=null) return parentOriginalValue;
+        return value;
+    }
+    
+    @Override
+    public String toString() {
+        return JavaClassNames.cleanSimpleClassName(this)+"["+JavaClassNames.cleanSimpleClassName(type)+" "+value+"]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java
new file mode 100644
index 0000000..a38e305
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it 
+public class SshFetchTaskFactory implements TaskFactory<SshFetchTaskWrapper> {
+    
+    private static final Logger log = LoggerFactory.getLogger(SshFetchTaskFactory.class);
+    
+    private boolean dirty = false;
+    
+    protected SshMachineLocation machine;
+    protected String remoteFile;
+    protected final ConfigBag config = ConfigBag.newInstance();
+
+    /** constructor where machine will be added later */
+    public SshFetchTaskFactory(String remoteFile) {
+        remoteFile(remoteFile);
+    }
+
+    /** convenience constructor to supply machine immediately */
+    public SshFetchTaskFactory(SshMachineLocation machine, String remoteFile) {
+        machine(machine);
+        remoteFile(remoteFile);
+    }
+
+    protected SshFetchTaskFactory self() { return this; }
+
+    protected void markDirty() {
+        dirty = true;
+    }
+    
+    public SshFetchTaskFactory machine(SshMachineLocation machine) {
+        markDirty();
+        this.machine = machine;
+        return self();
+    }
+        
+    public SshMachineLocation getMachine() {
+        return machine;
+    }
+    
+    public SshFetchTaskFactory remoteFile(String remoteFile) {
+        this.remoteFile = remoteFile;
+        return self();
+    }
+
+    public ConfigBag getConfig() {
+        return config;
+    }
+    
+    @Override
+    public SshFetchTaskWrapper newTask() {
+        dirty = false;
+        return new SshFetchTaskWrapper(this);
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        // help let people know of API usage error
+        if (dirty)
+            log.warn("Task "+this+" was modified but modification was never used");
+        super.finalize();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java
new file mode 100644
index 0000000..b6c2931
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.task.TaskBuilder;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.os.Os;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+/**
+ * As {@link ProcessTaskWrapper}, but fetching a file from the remote machine
+ * 
+ * @since 0.6.0
+ */
+@Beta
+public class SshFetchTaskWrapper implements TaskWrapper<String> {
+
+    private final Task<String> task;
+
+    private final String remoteFile;
+    private final SshMachineLocation machine;
+    private File backingFile;
+    private final ConfigBag config;
+    
+    
+    // package private as only SshFetchTaskFactory should invoke
+    SshFetchTaskWrapper(SshFetchTaskFactory factory) {
+        this.remoteFile = Preconditions.checkNotNull(factory.remoteFile, "remoteFile");
+        this.machine = Preconditions.checkNotNull(factory.machine, "machine");
+        TaskBuilder<String> tb = TaskBuilder.<String>builder().dynamic(false).name("ssh fetch "+factory.remoteFile);
+        task = tb.body(new SshFetchJob()).build();
+        config = factory.getConfig();
+    }
+    
+    @Override
+    public Task<String> asTask() {
+        return getTask();
+    }
+    
+    @Override
+    public Task<String> getTask() {
+        return task;
+    }
+    
+    public String getRemoteFile() {
+        return remoteFile;
+    }
+    
+    public SshMachineLocation getMachine() {
+        return machine;
+    }
+        
+    private class SshFetchJob implements Callable<String> {
+        @Override
+        public String call() throws Exception {
+            int result = -1;
+            try {
+                Preconditions.checkNotNull(getMachine(), "machine");
+                backingFile = Os.newTempFile("brooklyn-ssh-fetch-", FilenameUtils.getName(remoteFile));
+                backingFile.deleteOnExit();
+                
+                result = getMachine().copyFrom(config.getAllConfig(), remoteFile, backingFile.getPath());
+            } catch (Exception e) {
+                throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned threw exception, in "+Tasks.current()+": "+e, e);
+            }
+            if (result!=0) {
+                throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned non-zero exit code  "+result+", in "+Tasks.current());
+            }
+            return FileUtils.readFileToString(backingFile);
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString()+"["+task+"]";
+    }
+
+    /** blocks, returns the fetched file as a string, throwing if there was an exception */
+    public String get() {
+        return getTask().getUnchecked();
+    }
+    
+    /** blocks, returns the fetched file as bytes, throwing if there was an exception */
+    public byte[] getBytes() {
+        block();
+        try {
+            return FileUtils.readFileToByteArray(backingFile);
+        } catch (IOException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    
+    /** blocks until the task completes; does not throw */
+    public SshFetchTaskWrapper block() {
+        getTask().blockUntilEnded();
+        return this;
+    }
+ 
+    /** true iff the ssh job has completed (with or without failure) */
+    public boolean isDone() {
+        return getTask().isDone();
+    }   
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java
new file mode 100644
index 0000000..05cc42e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.io.InputStream;
+import java.io.Reader;
+
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.stream.KnownSizeInputStream;
+import brooklyn.util.stream.ReaderInputStream;
+
+import com.google.common.base.Suppliers;
+
+// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it 
+public class SshPutTaskFactory extends SshPutTaskStub implements TaskFactory<SshPutTaskWrapper> {
+    
+    private static final Logger log = LoggerFactory.getLogger(SshPutTaskFactory.class);
+    
+    private boolean dirty = false;
+
+    /** constructor where machine will be added later */
+    public SshPutTaskFactory(String remoteFile) {
+        remoteFile(remoteFile);
+    }
+
+    /** convenience constructor to supply machine immediately */
+    public SshPutTaskFactory(SshMachineLocation machine, String remoteFile) {
+        machine(machine);
+        remoteFile(remoteFile);
+    }
+
+    protected SshPutTaskFactory self() { return this; }
+
+    protected void markDirty() {
+        dirty = true;
+    }
+    
+    public SshPutTaskFactory machine(SshMachineLocation machine) {
+        markDirty();
+        this.machine = machine;
+        return self();
+    }
+        
+    public SshPutTaskFactory remoteFile(String remoteFile) {
+        this.remoteFile = remoteFile;
+        return self();
+    }
+
+    public SshPutTaskFactory summary(String summary) {
+        markDirty();
+        this.summary = summary;
+        return self();
+    }
+
+    public SshPutTaskFactory contents(String contents) {
+        markDirty();
+        this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents));  
+        return self();
+    }
+
+    public SshPutTaskFactory contents(byte[] contents) {
+        markDirty();
+        this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents));  
+        return self();
+    }
+
+    public SshPutTaskFactory contents(InputStream stream) {
+        markDirty();
+        this.contents = Suppliers.ofInstance(stream);  
+        return self();
+    }
+
+    public SshPutTaskFactory contents(Reader reader) {
+        markDirty();
+        this.contents = Suppliers.ofInstance(new ReaderInputStream(reader));  
+        return self();
+    }
+
+    public SshPutTaskFactory allowFailure() {
+        markDirty();
+        allowFailure = true;
+        return self();
+    }
+    
+    public SshPutTaskFactory createDirectory() {
+        markDirty();
+        createDirectory = true;
+        return self();
+    }
+    
+    public SshPutTaskWrapper newTask() {
+        dirty = false;
+        return new SshPutTaskWrapper(this);
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        // help let people know of API usage error
+        if (dirty)
+            log.warn("Task "+this+" was modified but modification was never used");
+        super.finalize();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java
new file mode 100644
index 0000000..4e3a024
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.io.InputStream;
+
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Supplier;
+
+public class SshPutTaskStub { // plain holder for the settings of an SSH file put; extended by SshPutTaskWrapper
+
+    protected String remoteFile; // destination path on the remote machine
+    protected SshMachineLocation machine; // machine the file is put on
+    protected Supplier<? extends InputStream> contents; // supplies the stream whose bytes are uploaded
+    protected String summary; // optional description; see getSummary() for the fallback
+    protected String permissions; // optional; SshPutTaskWrapper passes it on as SshTool.PROP_PERMISSIONS
+    protected boolean allowFailure = false; // when true SshPutTaskWrapper returns instead of throwing on failure
+    protected boolean createDirectory = false; // when true SshPutTaskWrapper runs mkdir -p for the parent directory first
+    protected final ConfigBag config = ConfigBag.newInstance(); // extra settings; every stub instance owns its own bag
+
+    protected SshPutTaskStub() {
+    }
+    
+    protected SshPutTaskStub(SshPutTaskStub constructor) { // copy constructor; "constructor" is the stub being copied from
+        this.remoteFile = constructor.remoteFile;
+        this.machine = constructor.machine;
+        this.contents = constructor.contents; // the supplier itself is shared, not duplicated
+        this.summary = constructor.summary;
+        this.allowFailure = constructor.allowFailure;
+        this.createDirectory = constructor.createDirectory;
+        this.permissions = constructor.permissions;
+        this.config.copy(constructor.config); // presumably copies the source's entries into this instance's own bag - confirm
+    }
+
+    public String getRemoteFile() {
+        return remoteFile;
+    }
+    
+    public String getSummary() {
+        if (summary!=null) return summary;
+        return "scp put: "+remoteFile; // default description when none was supplied
+    }
+
+    public SshMachineLocation getMachine() {
+        return machine;
+    }
+    
+    protected ConfigBag getConfig() {
+        return config; // the live bag, not a copy
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java
new file mode 100644
index 0000000..13449d0
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.brooklyn.core.util.task.TaskBuilder;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+/** As {@link ProcessTaskWrapper}, but putting a file on the remote machine */
+@Beta
+public class SshPutTaskWrapper extends SshPutTaskStub implements TaskWrapper<Void> {
+
+    private static final Logger log = LoggerFactory.getLogger(SshPutTaskWrapper.class);
+    
+    private final Task<Void> task;
+
+    protected Integer exitCodeOfCopy = null; // stays null until copyTo has returned
+    protected Exception exception = null; // last exception seen by the job, if any
+    protected boolean successful = false; // set to true only at the very end of a clean run
+    
+    // package private as only SshPutTaskFactory.newTask() should invoke
+    SshPutTaskWrapper(SshPutTaskFactory constructor) {
+        super(constructor); // snapshot the factory's settings
+        TaskBuilder<Void> tb = TaskBuilder.<Void>builder().dynamic(false).name(getSummary());
+        task = tb.body(new SshPutJob()).build();
+    }
+    
+    @Override
+    public Task<Void> asTask() {
+        return getTask();
+    }
+    
+    @Override
+    public Task<Void> getTask() {
+        return task;
+    }
+        
+    // TODO:
+    //   verify
+    //   copyAsRoot
+    //   owner
+    //   lastModificationDate - see {@link #PROP_LAST_MODIFICATION_DATE}; not supported by all SshTool implementations
+    //   lastAccessDate - see {@link #PROP_LAST_ACCESS_DATE}; not supported by all SshTool implementations
+
+    private class SshPutJob implements Callable<Void> { // task body: optional mkdir -p, then the copy, then failure handling
+        @Override
+        public Void call() throws Exception {
+            try {
+                Preconditions.checkNotNull(getMachine(), "machine"); // inside the try, so a missing machine goes through the failure handling below
+                
+                String remoteFile = getRemoteFile();
+
+                if (createDirectory) {
+                    String remoteDir = remoteFile;
+                    int exitCodeOfCreate = -1; // stays -1 if execCommands throws
+                    try {
+                        int li = remoteDir.lastIndexOf("/");
+                        if (li>=0) {
+                            remoteDir = remoteDir.substring(0, li+1); // keep everything up to and including the last slash
+                            exitCodeOfCreate = getMachine().execCommands("creating directory for "+getSummary(), 
+                                    Arrays.asList("mkdir -p "+remoteDir));
+                        } else {
+                            // nothing to create
+                            exitCodeOfCreate = 0;
+                        }
+                    } catch (Exception e) {
+                        if (log.isDebugEnabled())
+                            log.debug("SSH put "+getRemoteFile()+" (create dir, in task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
+                        exception = e;
+                    }
+                    if (exception!=null || !((Integer)0).equals(exitCodeOfCreate)) {
+                        if (!allowFailure) {
+                            if (exception != null) {
+                                throw new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" for SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
+                            }
+                            if (exitCodeOfCreate!=0) {
+                                exception = new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" SSH put task) ended with exit code "+exitCodeOfCreate+", in "+Tasks.current());
+                                throw exception;
+                            }
+                        }
+                        // not successful, but allowed: the copy is skipped and successful stays false
+                        return null;
+                    }
+                }
+                
+                ConfigBag config = ConfigBag.newInstanceCopying(getConfig()); // work on a copy so the permissions entry does not leak into the stub's bag
+                if (permissions!=null) config.put(SshTool.PROP_PERMISSIONS, permissions);
+                
+                exitCodeOfCopy = getMachine().copyTo(config.getAllConfig(), contents.get(), remoteFile);
+
+                if (log.isDebugEnabled())
+                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" completed with exit code "+exitCodeOfCopy);
+            } catch (Exception e) { // also catches the IllegalStateException thrown by the create-directory step, which is then wrapped again below
+                if (log.isDebugEnabled())
+                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
+                exception = e;
+            }
+            
+            if (exception!=null || !((Integer)0).equals(exitCodeOfCopy)) {
+                if (!allowFailure) {
+                    if (exception != null) {
+                        throw new IllegalStateException(getSummary()+" (SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
+                    }
+                    if (exitCodeOfCopy!=0) {
+                        exception = new IllegalStateException(getSummary()+" (SSH put task) ended with exit code "+exitCodeOfCopy+", in "+Tasks.current());
+                        throw exception;
+                    }
+                }
+                // not successful, but allowed
+                return null;
+            }
+            
+            // TODO verify
+
+            successful = (exception==null && ((Integer)0).equals(exitCodeOfCopy)); // always true here: the failure branch above has already returned or thrown
+            return null;
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString()+"["+task+"]";
+    }
+
+    /** blocks, throwing if there was an exception */
+    public Void get() {
+        return getTask().getUnchecked();
+    }
+    
+    /** returns the exit code from the copy, 0 on success; 
+     * null if it has not completed or threw exception
+     * (not sure if this is ever a non-zero integer or if it is meaningful)
+     * <p>
+     * most callers will want the simpler {@link #isSuccessful()} */
+    public Integer getExitCode() {
+        return exitCodeOfCopy;
+    }
+    
+    /** returns any exception encountered in the operation */
+    public Exception getException() {
+        return exception;
+    }
+    
+    /** blocks until the task completes; does not throw */
+    public SshPutTaskWrapper block() {
+        getTask().blockUntilEnded();
+        return this;
+    }
+ 
+    /** true iff the ssh job has completed (with or without failure) */
+    public boolean isDone() {
+        return getTask().isDone();
+    }
+
+    /** true iff the scp has completed successfully; guaranteed to be set before {@link #isDone()} or {@link #block()} are satisfied */
+    public boolean isSuccessful() {
+        return successful;
+    }
+    
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java
new file mode 100644
index 0000000..5f8d735
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.core.util.ResourceUtils;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.brooklyn.core.util.task.DynamicTasks;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ssh.internal.PlainSshExecTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.config.ConfigUtils;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+
+import org.apache.brooklyn.location.basic.AbstractLocation;
+import org.apache.brooklyn.location.basic.LocationInternal;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.net.Urls;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Conveniences for generating {@link Task} instances to perform SSH activities on an {@link SshMachineLocation}.
+ * <p>
+ * To infer the {@link SshMachineLocation} and take properties from entities and global management context the
+ * {@link SshEffectorTasks} should be preferred over this class.
+ *  
+ * @see SshEffectorTasks
+ * @since 0.6.0
+ */
+@Beta
+public class SshTasks {
+
+    private static final Logger log = LoggerFactory.getLogger(SshTasks.class);
+        
+    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, String ...commands) {
+        return newSshExecTaskFactory(machine, true, commands); // machine config is applied by default
+    }
+    
+    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String ...commands) {
+        return new PlainSshExecTaskFactory<Integer>(machine, commands) {
+            { // instance initializer of the anonymous subclass; putIfAbsent presumably leaves keys that are already set untouched
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, String remoteFile) {
+        return newSshPutTaskFactory(machine, true, remoteFile); // machine config is applied by default
+    }
+    
+    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
+        return new SshPutTaskFactory(machine, remoteFile) {
+            { // instance initializer of the anonymous subclass; putIfAbsent presumably leaves keys that are already set untouched
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, String remoteFile) {
+        return newSshFetchTaskFactory(machine, true, remoteFile); // machine config is applied by default
+    }
+    
+    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
+        return new SshFetchTaskFactory(machine, remoteFile) {
+            { // instance initializer of the anonymous subclass; putIfAbsent presumably leaves keys that are already set untouched
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    private static Map<String, Object> getSshFlags(Location location) { // collects the ssh-tool settings that apply to the location, keyed without their prefix
+        ConfigBag allConfig = ConfigBag.newInstance();
+        
+        if (location instanceof AbstractLocation) {
+            ManagementContext mgmt = ((AbstractLocation)location).getManagementContext();
+            if (mgmt!=null)
+                allConfig.putAll(mgmt.getConfig().getAllConfig());
+        }
+        
+        allConfig.putAll(((LocationInternal)location).config().getBag()); // added after the management config, so presumably the location's values win on duplicate keys
+        
+        Map<String, Object> result = Maps.newLinkedHashMap();
+        for (String keyS : allConfig.getAllConfig().keySet()) {
+            ConfigKey<?> key = ConfigKeys.newConfigKey(Object.class, keyS);
+            if (key.getName().startsWith(SshTool.BROOKLYN_CONFIG_KEY_PREFIX)) { // keep only the ssh-tool keys, stored under their unprefixed name
+                result.put(ConfigUtils.unprefixedKey(SshTool.BROOKLYN_CONFIG_KEY_PREFIX, key).getName(), allConfig.get(key));
+            }
+        }
+        return result;
+    }
+
+    @Beta
+    public static enum OnFailingTask { 
+        FAIL,
+        /** issues a warning, sometimes implemented as marking the task inessential and failing it if it appears
+         * we are in a dynamic {@link TaskQueueingContext};
+         * useful because this way the warning appears to the user;
+         * but note that the check is done against the calling thread so use with some care
+         * (and thus this enum is currently here rather than elsewhere) */
+        WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL,
+        /** issues a warning in the log if the task fails, otherwise swallows it */
+        WARN_IN_LOG_ONLY, 
+        /** not even a warning if the task fails (the caller is expected to handle it as appropriate) */
+        IGNORE }
+    
+    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, final boolean failIfCantSudo) {
+        return dontRequireTtyForSudo(machine, failIfCantSudo ? OnFailingTask.FAIL : OnFailingTask.WARN_IN_LOG_ONLY);
+    }
+    /** creates a task which modifies sudoers to ensure non-tty access is permitted, returning whether sudo is working;
+     * also gives nice warnings if sudo is not permitted */
+    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, OnFailingTask onFailingTaskRequested) {
+        final OnFailingTask onFailingTask;
+        if (onFailingTaskRequested==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+            if (DynamicTasks.getTaskQueuingContext()!=null) // checked on the calling thread, at factory-creation time
+                onFailingTask = onFailingTaskRequested;
+            else
+                onFailingTask = OnFailingTask.WARN_IN_LOG_ONLY; // no queueing context: downgrade to a log warning
+        } else {
+            onFailingTask = onFailingTaskRequested;
+        }
+        
+        final String id = Identifiers.makeRandomId(6); // random marker so the success message can be recognised in stdout
+        return newSshExecTaskFactory(machine, 
+                BashCommands.dontRequireTtyForSudo(),
+                // strange quotes are to ensure we don't match against echoed stdin
+                BashCommands.sudo("echo \"sudo\"-is-working-"+id))
+            .summary("setting up sudo")
+            .configure(SshTool.PROP_ALLOCATE_PTY, true)
+            .allowingNonZeroExitCode()
+            .returning(new Function<ProcessTaskWrapper<?>,Boolean>() { public Boolean apply(ProcessTaskWrapper<?> task) {
+                if (task.getExitCode()==0 && task.getStdout().contains("sudo-is-working-"+id)) return true;
+                Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); // only used to enrich the messages below; may be null
+                
+                
+                if (onFailingTask!=OnFailingTask.IGNORE) {
+                    // TODO if in a queueing context can we mark this task inessential and throw?
+                    // that way user sees the message...
+                    String message = "Error setting up sudo for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+" "+
+                        " (exit code "+task.getExitCode()+(entity!=null ? ", entity "+entity : "")+")"; // NOTE(review): the two literals leave a double space before "(exit code"
+                    DynamicTasks.queueIfPossible(Tasks.warning(message, null));
+                }
+                Streams.logStreamTail(log, "STDERR of sudo setup problem", Streams.byteArrayOfString(task.getStderr()), 1024); // logged for every mode, including IGNORE
+                
+                if (onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+                    Tasks.markInessential();
+                }
+                if (onFailingTask==OnFailingTask.FAIL || onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+                    throw new IllegalStateException("Passwordless sudo is required for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+
+                            (entity!=null ? " ("+entity+")" : ""));
+                }
+                return false; 
+            } });
+    }
+
+    /** Function for use in {@link ProcessTaskFactory#returning(Function)} which logs all information, optionally requires zero exit code, 
+     * and then returns stdout */
+    public static Function<ProcessTaskWrapper<?>, String> returningStdoutLoggingInfo(final Logger logger, final boolean requireZero) {
+        return new Function<ProcessTaskWrapper<?>, String>() {
+          public String apply(@Nullable ProcessTaskWrapper<?> input) { // NOTE(review): annotated @Nullable but dereferenced unconditionally below
+            if (logger!=null) logger.info(input+" COMMANDS:\n"+Strings.join(input.getCommands(),"\n"));
+            if (logger!=null) logger.info(input+" STDOUT:\n"+input.getStdout());
+            if (logger!=null) logger.info(input+" STDERR:\n"+input.getStderr());
+            if (requireZero && input.getExitCode()!=0) 
+                throw new IllegalStateException("non-zero exit code in "+input.getSummary()+": see log for more details!");
+            return input.getStdout();
+          }
+        };
+    }
+
+    /** task to install a file given a url, where the url is resolved remotely first then locally */
+    public static TaskFactory<?> installFromUrl(final SshMachineLocation location, final String url, final String destPath) {
+        return installFromUrl(ResourceUtils.create(SshTasks.class), ImmutableMap.<String,Object>of(), location, url, destPath); // default resource loader and no extra properties
+    }
+    /** task to install a file given a url, where the url is resolved remotely first then locally */
+    public static TaskFactory<?> installFromUrl(final ResourceUtils utils, final Map<String, ?> props, final SshMachineLocation location, final String url, final String destPath) {
+        return new TaskFactory<TaskAdaptable<?>>() {
+            @Override
+            public TaskAdaptable<?> newTask() {
+                return Tasks.<Void>builder().name("installing "+Urls.getBasename(url)).description("installing "+url+" to "+destPath).body(new Runnable() {
+                    @Override
+                    public void run() {
+                        int result = location.installTo(utils, props, url, destPath);
+                        if (result!=0) 
+                            throw new IllegalStateException("Failed to install '"+url+"' to '"+destPath+"' at "+location+": exit code "+result);
+                    }
+                }).build();
+            }
+        };
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
new file mode 100644
index 0000000..45600d5
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh.internal;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.core.util.task.system.internal.AbstractProcessTaskFactory;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it 
+public abstract class AbstractSshExecTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends AbstractProcessTaskFactory<T,RET> implements ProcessTaskFactory<RET> {
+    
+    /** constructor where machine will be added later */
+    public AbstractSshExecTaskFactory(String ...commands) {
+        super(commands);
+    }
+
+    /** convenience constructor to supply machine immediately */
+    public AbstractSshExecTaskFactory(SshMachineLocation machine, String ...commands) {
+        this(commands);
+        machine(machine);
+    }
+    
+    @Override
+    public ProcessTaskWrapper<RET> newTask() { // builds a wrapper that runs the commands over SSH on the configured machine
+        dirty = false; // the factory's settings are being consumed by the task created here
+        return new ProcessTaskWrapper<RET>(this) {
+            protected void run(ConfigBag config) {
+                Preconditions.checkNotNull(getMachine(), "machine"); // the machine may have been left for later by the first constructor
+                if (Boolean.FALSE.equals(this.runAsScript)) { // only an explicit FALSE selects execCommands
+                    this.exitCode = getMachine().execCommands(config.getAllConfig(), getSummary(), commands, shellEnvironment);
+                } else { // runAsScript = null or TRUE
+                    this.exitCode = getMachine().execScript(config.getAllConfig(), getSummary(), commands, shellEnvironment);
+                }
+            }
+            protected String taskTypeShortName() { return "SSH"; }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java
new file mode 100644
index 0000000..4d5dfce
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh.internal;
+
+import java.util.List;
+
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Function;
+
+/** the "Plain" class exists purely so we can massage return types for callers' convenience */
+public class PlainSshExecTaskFactory<RET> extends AbstractSshExecTaskFactory<PlainSshExecTaskFactory<RET>,RET> {
+    /** constructor where machine will be added later */
+    public PlainSshExecTaskFactory(String ...commands) {
+        super(commands);
+    }
+
+    /** convenience constructor to supply machine immediately */
+    public PlainSshExecTaskFactory(SshMachineLocation machine, String ...commands) {
+        this(commands);
+        machine(machine);
+    }
+
+    /** constructor where machine will be added later, taking the commands as a list */
+    public PlainSshExecTaskFactory(List<String> commands) {
+        this(commands.toArray(new String[commands.size()]));
+    }
+
+    /** convenience constructor to supply machine immediately, taking the commands as a list */
+    public PlainSshExecTaskFactory(SshMachineLocation machine, List<String> commands) {
+        this(machine, commands.toArray(new String[commands.size()]));
+    }
+
+    @Override
+    public <T2> PlainSshExecTaskFactory<T2> returning(ScriptReturnType type) {
+        return (PlainSshExecTaskFactory<T2>) super.<T2>returning(type); // unchecked cast; presumably the superclass returns this same factory re-typed - TODO confirm
+    }
+
+    @Override
+    public <RET2> PlainSshExecTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) {
+        return (PlainSshExecTaskFactory<RET2>) super.returning(resultTransformation); // unchecked cast; presumably the superclass returns this same factory re-typed - TODO confirm
+    }
+    
+    @Override
+    public PlainSshExecTaskFactory<Boolean> returningIsExitCodeZero() {
+        return (PlainSshExecTaskFactory<Boolean>) super.returningIsExitCodeZero(); // cast only narrows the declared return type for callers
+    }
+    
+    @Override
+    public PlainSshExecTaskFactory<String> requiringZeroAndReturningStdout() {
+        return (PlainSshExecTaskFactory<String>) super.requiringZeroAndReturningStdout(); // cast only narrows the declared return type for callers
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java
new file mode 100644
index 0000000..f66e1ea
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskStub.ScriptReturnType;
+
+import brooklyn.config.ConfigKey;
+
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+
+public interface ProcessTaskFactory<T> extends TaskFactory<ProcessTaskWrapper<T>> {
+    public ProcessTaskFactory<T> machine(SshMachineLocation machine);
+    public ProcessTaskFactory<T> add(String ...commandsToAdd);
+    public ProcessTaskFactory<T> add(Iterable<String> commandsToAdd);
+    public ProcessTaskFactory<T> requiringExitCodeZero();
+    public ProcessTaskFactory<T> requiringExitCodeZero(String extraErrorMessage);
+    public ProcessTaskFactory<T> allowingNonZeroExitCode();
+    public ProcessTaskFactory<String> requiringZeroAndReturningStdout();
+    public ProcessTaskFactory<Boolean> returningIsExitCodeZero();
+    public <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type);
+    public <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation);
+    public ProcessTaskFactory<T> runAsCommand();
+    public ProcessTaskFactory<T> runAsScript();
+    public ProcessTaskFactory<T> runAsRoot();
+    public ProcessTaskFactory<T> environmentVariable(String key, String val);
+    public ProcessTaskFactory<T> environmentVariables(Map<String,String> vars);
+    public ProcessTaskFactory<T> summary(String summary);
+    
+    /** allows setting config-key based properties for specific underlying tools */
+    @Beta
+    public <V> ProcessTaskFactory<T> configure(ConfigKey<V> key, V value);
+
+    /** allows setting config-key/flag based properties for specific underlying tools;
+     * but note that if any are prefixed with {@link SshTool#BROOKLYN_CONFIG_KEY_PREFIX}
+     * these should normally be filtered out */
+    @Beta
+    public ProcessTaskFactory<T> configure(Map<?,?> flags);
+
+    /** adds a listener which will be notified of (otherwise) successful completion,
+     * typically used to invalidate the result (ie throw exception, to promote a string in the output to an exception);
+     * invoked even if return code is zero, so a better error can be thrown */
+    public ProcessTaskFactory<T> addCompletionListener(Function<ProcessTaskWrapper<?>, Void> function);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java
new file mode 100644
index 0000000..1937d15
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Holds the settings which describe a process task: the commands, the target machine,
+ * how the result is to be returned, and how the exit code is to be treated.
+ * <p>
+ * The collection-valued getters return immutable copies, so the state of a stub can
+ * only be changed through the stub (or its subclasses) itself.
+ */
+public class ProcessTaskStub {
+
+    /** the commands to run, in the order they were added */
+    protected final List<String> commands = new ArrayList<String>();
+    /** null for localhost */
+    protected SshMachineLocation machine;
+
+    // config data
+    /** explicit summary; when null, {@link #getSummary()} derives one from the commands */
+    protected String summary;
+    protected final ConfigBag config = ConfigBag.newInstance();
+
+    public static enum ScriptReturnType { CUSTOM, EXIT_CODE, STDOUT_STRING, STDOUT_BYTES, STDERR_STRING, STDERR_BYTES }
+    /** transformation producing the result; consulted when {@link #returnType} is {@link ScriptReturnType#CUSTOM} */
+    protected Function<ProcessTaskWrapper<?>, ?> returnResultTransformation = null;
+    protected ScriptReturnType returnType = ScriptReturnType.EXIT_CODE;
+
+    /** tri-state: null means no explicit choice between script and command has been made */
+    protected Boolean runAsScript = null;
+    protected boolean runAsRoot = false;
+    /** tri-state: true requires zero, false allows non-zero, null means not explicitly specified */
+    protected Boolean requireExitCodeZero = null;
+    protected String extraErrorMessage = null;
+    protected Map<String,String> shellEnvironment = new MutableMap<String, String>();
+    protected final List<Function<ProcessTaskWrapper<?>, Void>> completionListeners = new ArrayList<Function<ProcessTaskWrapper<?>,Void>>();
+
+    public ProcessTaskStub() {}
+
+    /**
+     * Copy constructor: takes over all the settings of the given stub.
+     * Collections are copied, so later changes to {@code source} do not affect the new instance.
+     * Note the summary is taken from {@link #getSummary()}, so a summary derived from the
+     * source's commands becomes the explicit summary of the copy.
+     */
+    protected ProcessTaskStub(ProcessTaskStub source) {
+        commands.addAll(source.getCommands());
+        machine = source.getMachine();
+        summary = source.getSummary();
+        config.copy(source.getConfig());
+        returnResultTransformation = source.returnResultTransformation;
+        returnType = source.returnType;
+        runAsScript = source.runAsScript;
+        runAsRoot = source.runAsRoot;
+        requireExitCodeZero = source.requireExitCodeZero;
+        extraErrorMessage = source.extraErrorMessage;
+        shellEnvironment.putAll(source.getShellEnvironment());
+        completionListeners.addAll(source.getCompletionListeners());
+    }
+
+    /**
+     * @return the explicit summary if one has been set, otherwise the commands joined
+     * with {@code " ; "} and passed through {@code Strings.maxlen(..., 160)}
+     */
+    public String getSummary() {
+        if (summary!=null) return summary;
+        return Strings.maxlen(Strings.join(commands, " ; "), 160);
+    }
+
+    /** null for localhost */
+    public SshMachineLocation getMachine() {
+        return machine;
+    }
+
+    /** @return an immutable copy of the shell environment variables */
+    public Map<String, String> getShellEnvironment() {
+        return ImmutableMap.copyOf(shellEnvironment);
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()+"["+getSummary()+"]";
+    }
+
+    /** @return an immutable copy of the commands */
+    public List<String> getCommands() {
+        return ImmutableList.copyOf(commands);
+    }
+
+    /**
+     * @return an immutable copy of the completion listeners; like the other getters here
+     * this does not expose the internal list, so callers cannot modify the stub through it
+     */
+    public List<Function<ProcessTaskWrapper<?>, Void>> getCompletionListeners() {
+        return ImmutableList.copyOf(completionListeners);
+    }
+
+    protected ConfigBag getConfig() { return config; }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java
new file mode 100644
index 0000000..045b3c9
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.internal.ssh.ShellTool;
+import org.apache.brooklyn.core.util.task.TaskBuilder;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.system.internal.AbstractProcessTaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+
+/** Wraps a fully constructed process task, and allows callers to inspect status. 
+ * Note that methods in here such as {@link #getStdout()} will return partially completed streams while the task is ongoing
+ * (and exit code will be null). You can {@link #block()} or {@link #get()} as conveniences on the underlying {@link #getTask()}. */ 
+public abstract class ProcessTaskWrapper<RET> extends ProcessTaskStub implements TaskWrapper<RET> {
+
+    private static final Logger log = LoggerFactory.getLogger(ProcessTaskWrapper.class);
+    
+    private final Task<RET> task;
+
+    // execution details
+    protected ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+    protected ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+    protected Integer exitCode = null;
+    
+    @SuppressWarnings("unchecked")
+    protected ProcessTaskWrapper(AbstractProcessTaskFactory<?,RET> constructor) {
+        super(constructor);
+        TaskBuilder<Object> tb = constructor.constructCustomizedTaskBuilder();
+        if (stdout!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, stdout));
+        if (stderr!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, stderr));
+        task = (Task<RET>) tb.body(new ProcessTaskInternalJob()).build();
+    }
+    
+    @Override
+    public Task<RET> asTask() {
+        return getTask();
+    }
+    
+    @Override
+    public Task<RET> getTask() {
+        return task;
+    }
+    
+    public Integer getExitCode() {
+        return exitCode;
+    }
+    
+    public byte[] getStdoutBytes() {
+        if (stdout==null) return null;
+        return stdout.toByteArray();
+    }
+    
+    public byte[] getStderrBytes() {
+        if (stderr==null) return null;
+        return stderr.toByteArray();
+    }
+    
+    public String getStdout() {
+        if (stdout==null) return null;
+        return stdout.toString();
+    }
+    
+    public String getStderr() {
+        if (stderr==null) return null;
+        return stderr.toString();
+    }
+
+    protected class ProcessTaskInternalJob implements Callable<Object> {
+        @Override
+        public Object call() throws Exception {
+            run( getConfigForRunning() );
+            
+            for (Function<ProcessTaskWrapper<?>, Void> listener: completionListeners) {
+                try {
+                    listener.apply(ProcessTaskWrapper.this);
+                } catch (Exception e) {
+                    logWithDetailsAndThrow("Error in "+taskTypeShortName()+" task "+getSummary()+": "+e, e);                    
+                }
+            }
+            
+            if (exitCode!=0 && !Boolean.FALSE.equals(requireExitCodeZero)) {
+                if (Boolean.TRUE.equals(requireExitCodeZero)) {
+                    logWithDetailsAndThrow(taskTypeShortName()+" task ended with exit code "+exitCode+" when 0 was required, in "+Tasks.current()+": "+getSummary(), null);
+                } else {
+                    // warn, but allow, on non-zero not explicitly allowed
+                    log.warn(taskTypeShortName()+" task ended with exit code "+exitCode+" when non-zero was not explicitly allowed (error may be thrown in future), in "
+                            +Tasks.current()+": "+getSummary());
+                }
+            }
+            switch (returnType) {
+            case CUSTOM: return returnResultTransformation.apply(ProcessTaskWrapper.this);
+            case STDOUT_STRING: return stdout.toString();
+            case STDOUT_BYTES: return stdout.toByteArray();
+            case STDERR_STRING: return stderr.toString();
+            case STDERR_BYTES: return stderr.toByteArray();
+            case EXIT_CODE: return exitCode;
+            }
+
+            throw new IllegalStateException("Unknown return type for "+taskTypeShortName()+" job "+getSummary()+": "+returnType);
+        }
+
+        protected void logWithDetailsAndThrow(String message, Throwable optionalCause) {
+            message = (extraErrorMessage!=null ? extraErrorMessage+": " : "") + message;
+            log.warn(message+" (throwing)");
+            logProblemDetails("STDERR", stderr, 1024);
+            logProblemDetails("STDOUT", stdout, 1024);
+            logProblemDetails("STDIN", Streams.byteArrayOfString(Strings.join(commands,"\n")), 4096);
+            if (optionalCause!=null) throw new IllegalStateException(message, optionalCause);
+            throw new IllegalStateException(message);
+        }
+        
+        protected void logProblemDetails(String streamName, ByteArrayOutputStream stream, int max) {
+            Streams.logStreamTail(log, streamName+" for problem in "+Tasks.current(), stream, max);
+        }
+
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString()+"["+task+"]";
+    }
+
+    /** blocks and gets the result, throwing if there was an exception */
+    public RET get() {
+        return getTask().getUnchecked();
+    }
+    
+    /** blocks until the task completes; does not throw */
+    public ProcessTaskWrapper<RET> block() {
+        getTask().blockUntilEnded();
+        return this;
+    }
+ 
+    /** true iff the process has completed (with or without failure) */
+    public boolean isDone() {
+        return getTask().isDone();
+    }
+
+    /** for overriding */
+    protected ConfigBag getConfigForRunning() {
+        ConfigBag config = ConfigBag.newInstanceCopying(ProcessTaskWrapper.this.config);
+        if (stdout!=null) config.put(ShellTool.PROP_OUT_STREAM, stdout);
+        if (stderr!=null) config.put(ShellTool.PROP_ERR_STREAM, stderr);
+        
+        if (!config.containsKey(ShellTool.PROP_NO_EXTRA_OUTPUT))
+            // by default no extra output (so things like cat, etc work as expected)
+            config.put(ShellTool.PROP_NO_EXTRA_OUTPUT, true);
+
+        if (runAsRoot)
+            config.put(ShellTool.PROP_RUN_AS_ROOT, true);
+        return config;
+    }
+
+    protected abstract void run(ConfigBag config);
+    
+    protected abstract String taskTypeShortName();
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java
new file mode 100644
index 0000000..d05b09c
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import org.apache.brooklyn.core.util.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory;
+
+/** Static entry point for creating process task factories backed by {@link ConcreteSystemProcessTaskFactory}. */
+public class SystemTasks {
+
+    /**
+     * Creates a factory for a task which runs the given commands.
+     *
+     * @param commands the commands to pass to the new factory
+     * @return a new {@link ConcreteSystemProcessTaskFactory} declared as returning an {@link Integer}
+     */
+    public static ProcessTaskFactory<Integer> exec(String ...commands) {
+        ProcessTaskFactory<Integer> factory = new ConcreteSystemProcessTaskFactory<Integer>(commands);
+        return factory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java
new file mode 100644
index 0000000..8b90263
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system.internal;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+
+import org.apache.brooklyn.core.util.task.TaskBuilder;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskStub;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+public abstract class AbstractProcessTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends ProcessTaskStub implements ProcessTaskFactory<RET> {
+    
+    private static final Logger log = LoggerFactory.getLogger(AbstractProcessTaskFactory.class);
+    
+    protected boolean dirty = false;
+    
+    public AbstractProcessTaskFactory(String ...commands) {
+        this.commands.addAll(Arrays.asList(commands));
+    }
+
+    @SuppressWarnings("unchecked")
+    protected T self() { return (T)this; }
+    
+    protected void markDirty() {
+        dirty = true;
+    }
+    
+    @Override
+    public T add(String ...commandsToAdd) {
+        markDirty();
+        for (String commandToAdd: commandsToAdd) this.commands.add(commandToAdd);
+        return self();
+    }
+
+    @Override
+    public T add(Iterable<String> commandsToAdd) {
+        Iterables.addAll(this.commands, commandsToAdd);
+        return self();
+    }
+    
+    @Override
+    public T machine(SshMachineLocation machine) {
+        markDirty();
+        this.machine = machine;
+        return self();
+    }
+
+    @Override
+    public T requiringExitCodeZero() {
+        markDirty();
+        requireExitCodeZero = true;
+        return self();
+    }
+    
+    @Override
+    public T requiringExitCodeZero(String extraErrorMessage) {
+        markDirty();
+        requireExitCodeZero = true;
+        this.extraErrorMessage = extraErrorMessage;
+        return self();
+    }
+    
+    @Override
+    public T allowingNonZeroExitCode() {
+        markDirty();
+        requireExitCodeZero = false;
+        return self();
+    }
+
+    @Override
+    public ProcessTaskFactory<Boolean> returningIsExitCodeZero() {
+        if (requireExitCodeZero==null) allowingNonZeroExitCode();
+        return returning(new Function<ProcessTaskWrapper<?>,Boolean>() {
+            public Boolean apply(ProcessTaskWrapper<?> input) {
+                return input.getExitCode()==0;
+            }
+        });
+    }
+
+    @Override
+    public ProcessTaskFactory<String> requiringZeroAndReturningStdout() {
+        requiringExitCodeZero();
+        return this.<String>returning(ScriptReturnType.STDOUT_STRING);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type) {
+        markDirty();
+        returnType = Preconditions.checkNotNull(type);
+        return (ProcessTaskFactory<RET2>) self();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) {
+        markDirty();
+        returnType = ScriptReturnType.CUSTOM;
+        this.returnResultTransformation = resultTransformation;
+        return (ProcessTaskFactory<RET2>) self();
+    }
+    
+    @Override
+    public T runAsCommand() {
+        markDirty();
+        runAsScript = false;
+        return self();
+    }
+
+    @Override
+    public T runAsScript() {
+        markDirty();
+        runAsScript = true;
+        return self();
+    }
+
+    @Override
+    public T runAsRoot() {
+        markDirty();
+        runAsRoot = true;
+        return self();
+    }
+    
+    @Override
+    public T environmentVariable(String key, String val) {
+        markDirty();
+        shellEnvironment.put(key, val);
+        return self();
+    }
+
+    @Override
+    public T environmentVariables(Map<String,String> vars) {
+        if (vars!=null) {
+            markDirty();
+            shellEnvironment.putAll(vars);
+        }
+        return self();
+    }
+
+    /** creates the TaskBuilder which can be further customized; typically invoked by the initial {@link #newTask()} */
+    public TaskBuilder<Object> constructCustomizedTaskBuilder() {
+        TaskBuilder<Object> tb = TaskBuilder.builder().dynamic(false).name("ssh: "+getSummary());
+        
+        tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDIN, 
+                Streams.byteArrayOfString(Strings.join(commands, "\n"))));
+        tb.tag(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, shellEnvironment));
+        
+        return tb;
+    }
+    
+    @Override
+    public T summary(String summary) {
+        markDirty();
+        this.summary = summary;
+        return self();
+    }
+
+    @Override
+    public <V> T configure(ConfigKey<V> key, V value) {
+        config.configure(key, value);
+        return self();
+    }
+    
+    @Override
+    public T configure(Map<?, ?> flags) {
+        if (flags!=null)
+            config.putAll(flags);
+        return self();
+    }
+ 
+    @Override
+    public T addCompletionListener(Function<ProcessTaskWrapper<?>, Void> listener) {
+        completionListeners.add(listener);
+        return self();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        // help let people know of API usage error
+        if (dirty)
+            log.warn("Task "+this+" was modified but modification was never used");
+        super.finalize();
+    }
+}



Mime
View raw message