brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [11/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util
Date Mon, 17 Aug 2015 19:17:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
new file mode 100644
index 0000000..cdaf433
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.osgi;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import org.apache.brooklyn.core.util.osgi.Osgis;
+import org.apache.brooklyn.core.util.osgi.Osgis.VersionedName;
+import org.osgi.framework.Version;
+import org.testng.annotations.Test;
+
+public class OsgisTest {
+
+    @Test
+    public void testParseOsgiIdentifier() throws Exception {
+        assertEquals(Osgis.parseOsgiIdentifier("a.b").get(), new VersionedName("a.b", null));
+        assertEquals(Osgis.parseOsgiIdentifier("a.b:0.1.2").get(), new VersionedName("a.b", Version.parseVersion("0.1.2")));
+        assertEquals(Osgis.parseOsgiIdentifier("a.b:0.0.0.SNAPSHOT").get(), new VersionedName("a.b", Version.parseVersion("0.0.0.SNAPSHOT")));
+        assertFalse(Osgis.parseOsgiIdentifier("a.b:0.notanumber.2").isPresent()); // invalid version
+        assertFalse(Osgis.parseOsgiIdentifier("a.b:0.1.2:3.4.5").isPresent());    // too many colons
+        assertFalse(Osgis.parseOsgiIdentifier("a.b:0.0.0_SNAPSHOT").isPresent()); // invalid version
+        assertFalse(Osgis.parseOsgiIdentifier("").isPresent());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
new file mode 100644
index 0000000..77f91f6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.ssh;
+
+import static brooklyn.util.ssh.BashCommands.sudo;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+
+public class BashCommandsIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(BashCommandsIntegrationTest.class);
+    
+    private ManagementContext mgmt;
+    private BasicExecutionContext exec;
+    
+    private File destFile;
+    private File sourceNonExistantFile;
+    private File sourceFile1;
+    private File sourceFile2;
+    private String sourceNonExistantFileUrl;
+    private String sourceFileUrl1;
+    private String sourceFileUrl2;
+    private SshMachineLocation loc;
+
+    private String localRepoFilename = "localrepofile.txt";
+    private File localRepoBasePath;
+    private File localRepoEntityBasePath;
+    private String localRepoEntityVersionPath;
+    private File localRepoEntityFile;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        mgmt = new LocalManagementContextForTests();
+        exec = new BasicExecutionContext(mgmt.getExecutionManager());
+        
+        destFile = Os.newTempFile(getClass(), "commoncommands-test-dest.txt");
+        
+        sourceNonExistantFile = new File("/this/does/not/exist/ERQBETJJIG1234");
+        sourceNonExistantFileUrl = sourceNonExistantFile.toURI().toString();
+        
+        sourceFile1 = Os.newTempFile(getClass(), "commoncommands-test.txt");
+        sourceFileUrl1 = sourceFile1.toURI().toString();
+        Files.write("mysource1".getBytes(), sourceFile1);
+        
+        sourceFile2 = Os.newTempFile(getClass(), "commoncommands-test2.txt");
+        sourceFileUrl2 = sourceFile2.toURI().toString();
+        Files.write("mysource2".getBytes(), sourceFile2);
+
+        localRepoEntityVersionPath = JavaClassNames.simpleClassName(this)+"-test-dest-"+Identifiers.makeRandomId(8);
+        localRepoBasePath = new File(format("%s/.brooklyn/repository", System.getProperty("user.home")));
+        localRepoEntityBasePath = new File(localRepoBasePath, localRepoEntityVersionPath);
+        localRepoEntityFile = new File(localRepoEntityBasePath, localRepoFilename);
+        localRepoEntityBasePath.mkdirs();
+        Files.write("mylocal1".getBytes(), localRepoEntityFile);
+
+        loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (sourceFile1 != null) sourceFile1.delete();
+        if (sourceFile2 != null) sourceFile2.delete();
+        if (destFile != null) destFile.delete();
+        if (localRepoEntityFile != null) localRepoEntityFile.delete();
+        if (localRepoEntityBasePath != null) FileUtils.deleteDirectory(localRepoEntityBasePath);
+        if (loc != null) loc.close();
+        if (mgmt != null) Entities.destroyAll(mgmt);
+    }
+    
+    @Test(groups="Integration")
+    public void testSudo() throws Exception {
+        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        ByteArrayOutputStream errStream = new ByteArrayOutputStream();
+        String cmd = sudo("whoami");
+        int exitcode = loc.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "test", ImmutableList.of(cmd));
+        String outstr = new String(outStream.toByteArray());
+        String errstr = new String(errStream.toByteArray());
+        
+        assertEquals(exitcode, 0, "out="+outstr+"; err="+errstr);
+        assertTrue(outstr.contains("root"), "out="+outstr+"; err="+errstr);
+    }
+    
+    public void testDownloadUrl() throws Exception {
+        List<String> cmds = BashCommands.commandsToDownloadUrlsAs(
+                ImmutableList.of(sourceFileUrl1), 
+                destFile.getAbsolutePath());
+        int exitcode = loc.execCommands("test", cmds);
+        
+        assertEquals(0, exitcode);
+        assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1"));
+    }
+    
+    @Test(groups="Integration")
+    public void testDownloadFirstSuccessfulFile() throws Exception {
+        List<String> cmds = BashCommands.commandsToDownloadUrlsAs(
+                ImmutableList.of(sourceNonExistantFileUrl, sourceFileUrl1, sourceFileUrl2), 
+                destFile.getAbsolutePath());
+        int exitcode = loc.execCommands("test", cmds);
+        
+        assertEquals(0, exitcode);
+        assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1"));
+    }
+    
+    @Test(groups="Integration")
+    public void testDownloadToStdout() throws Exception {
+        ProcessTaskWrapper<String> t = SshTasks.newSshExecTaskFactory(loc, 
+                "cd "+destFile.getParentFile().getAbsolutePath(),
+                BashCommands.downloadToStdout(Arrays.asList(sourceFileUrl1))+" | sed s/my/your/")
+            .requiringZeroAndReturningStdout().newTask();
+
+        String result = exec.submit(t).get();
+        assertTrue(result.trim().equals("yoursource1"), "Wrong contents of stdout download: "+result);
+    }
+
+    @Test(groups="Integration")
+    public void testAlternativesWhereFirstSucceeds() throws Exception {
+        ProcessTaskWrapper<Integer> t = SshTasks.newSshExecTaskFactory(loc)
+                .add(BashCommands.alternatives(Arrays.asList("echo first", "exit 88")))
+                .newTask();
+
+        Integer returnCode = exec.submit(t).get();
+        String stdout = t.getStdout();
+        String stderr = t.getStderr();
+        log.info("alternatives for good first command gave: "+returnCode+"; err="+stderr+"; out="+stdout);
+        assertTrue(stdout.contains("first"), "errcode="+returnCode+"; stdout="+stdout+"; stderr="+stderr);
+        assertEquals(returnCode, (Integer)0);
+    }
+
+    @Test(groups="Integration")
+    public void testAlternatives() throws Exception {
+        ProcessTaskWrapper<Integer> t = SshTasks.newSshExecTaskFactory(loc)
+                .add(BashCommands.alternatives(Arrays.asList("asdfj_no_such_command_1", "exit 88")))
+                .newTask();
+
+        Integer returnCode = exec.submit(t).get();
+        log.info("alternatives for bad commands gave: "+returnCode+"; err="+new String(t.getStderr())+"; out="+new String(t.getStdout()));
+        assertEquals(returnCode, (Integer)88);
+    }
+
+    @Test(groups="Integration")
+    public void testRequireTestHandlesFailure() throws Exception {
+        ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+            .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(),
+                    "The requested file does not exist")).newTask();
+
+        exec.submit(t).get();
+        assertNotEquals(t.getExitCode(), (Integer)0);
+        assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr());
+        assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout());
+    }
+
+    @Test(groups="Integration")
+    public void testRequireTestHandlesSuccess() throws Exception {
+        ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+            .add(BashCommands.requireTest("-f "+sourceFile1.getPath(),
+                    "The requested file does not exist")).newTask();
+
+        exec.submit(t).get();
+        assertEquals(t.getExitCode(), (Integer)0);
+        assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr());
+    }
+
+    @Test(groups="Integration")
+    public void testRequireFileHandlesFailure() throws Exception {
+        ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+            .add(BashCommands.requireFile(sourceNonExistantFile.getPath())).newTask();
+
+        exec.submit(t).get();
+        assertNotEquals(t.getExitCode(), (Integer)0);
+        assertTrue(t.getStderr().contains("required file"), "Expected message in: "+t.getStderr());
+        assertTrue(t.getStderr().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStderr());
+        assertTrue(t.getStdout().contains("required file"), "Expected message in: "+t.getStdout());
+        assertTrue(t.getStdout().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStdout());
+    }
+
+    @Test(groups="Integration")
+    public void testRequireFileHandlesSuccess() throws Exception {
+        ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+            .add(BashCommands.requireFile(sourceFile1.getPath())).newTask();
+
+        exec.submit(t).get();
+        assertEquals(t.getExitCode(), (Integer)0);
+        assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr());
+    }
+
+    @Test(groups="Integration")
+    public void testRequireFailureExitsImmediately() throws Exception {
+        ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+            .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(),
+                    "The requested file does not exist"))
+            .add("echo shouldnae come here").newTask();
+
+        exec.submit(t).get();
+        assertNotEquals(t.getExitCode(), (Integer)0);
+        assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr());
+        assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout());
+        Assert.assertFalse(t.getStdout().contains("shouldnae"), "Expected message in: "+t.getStdout());
+    }
+
+    @Test(groups="Integration")
+    public void testPipeMultiline() throws Exception {
+        String output = execRequiringZeroAndReturningStdout(loc,
+                BashCommands.pipeTextTo("hello world\n"+"and goodbye\n", "wc")).get();
+
+        assertEquals(Strings.replaceAllRegex(output, "\\s+", " ").trim(), "3 4 25");
+    }
+
+    @Test(groups="Integration")
+    public void testWaitForFileContentsWhenAbortingOnFail() throws Exception {
+        String fileContent = "mycontents";
+        String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, true);
+
+        int exitcode = loc.execCommands("test", ImmutableList.of(cmd));
+        assertEquals(exitcode, 1);
+        
+        Files.write(fileContent, destFile, Charsets.UTF_8);
+        int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd));
+        assertEquals(exitcode2, 0);
+    }
+
+    @Test(groups="Integration")
+    public void testWaitForFileContentsWhenNotAbortingOnFail() throws Exception {
+        String fileContent = "mycontents";
+        String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, false);
+
+        String output = execRequiringZeroAndReturningStdout(loc, cmd).get();
+        assertTrue(output.contains("Couldn't find"), "output="+output);
+
+        Files.write(fileContent, destFile, Charsets.UTF_8);
+        String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get();
+        assertFalse(output2.contains("Couldn't find"), "output="+output2);
+    }
+    
+    @Test(groups="Integration")
+    public void testWaitForFileContentsWhenContentsAppearAfterStart() throws Exception {
+        String fileContent = "mycontents";
+
+        String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.THIRTY_SECONDS, false);
+        ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd);
+        exec.submit(t);
+        
+        // sleep for long enough to ensure the ssh command is definitely executing
+        Thread.sleep(5*1000);
+        assertFalse(t.isDone());
+        
+        Files.write(fileContent, destFile, Charsets.UTF_8);
+        String output = t.get();
+        assertFalse(output.contains("Couldn't find"), "output="+output);
+    }
+    
+    @Test(groups="Integration", dependsOnMethods="testSudo")
+    public void testWaitForPortFreeWhenAbortingOnTimeout() throws Exception {
+        ServerSocket serverSocket = openServerSocket();
+        try {
+            int port = serverSocket.getLocalPort();
+            String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, true);
+    
+            int exitcode = loc.execCommands("test", ImmutableList.of(cmd));
+            assertEquals(exitcode, 1);
+            
+            serverSocket.close();
+            assertTrue(Networking.isPortAvailable(port));
+            int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd));
+            assertEquals(exitcode2, 0);
+        } finally {
+            serverSocket.close();
+        }
+    }
+
+    @Test(groups="Integration", dependsOnMethods="testSudo")
+    public void testWaitForPortFreeWhenNotAbortingOnTimeout() throws Exception {
+        ServerSocket serverSocket = openServerSocket();
+        try {
+            int port = serverSocket.getLocalPort();
+            String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, false);
+    
+            String output = execRequiringZeroAndReturningStdout(loc, cmd).get();
+            assertTrue(output.contains(port+" still in use"), "output="+output);
+    
+            serverSocket.close();
+            assertTrue(Networking.isPortAvailable(port));
+            String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get();
+            assertFalse(output2.contains("still in use"), "output="+output2);
+        } finally {
+            serverSocket.close();
+        }
+    }
+    
+    @Test(groups="Integration", dependsOnMethods="testSudo")
+    public void testWaitForPortFreeWhenFreedAfterStart() throws Exception {
+        ServerSocket serverSocket = openServerSocket();
+        try {
+            int port = serverSocket.getLocalPort();
+    
+            String cmd = BashCommands.waitForPortFree(port, Duration.THIRTY_SECONDS, false);
+            ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd);
+            exec.submit(t);
+            
+            // sleep for long enough to ensure the ssh command is definitely executing
+            Thread.sleep(5*1000);
+            assertFalse(t.isDone());
+            
+            serverSocket.close();
+            assertTrue(Networking.isPortAvailable(port));
+            String output = t.get();
+            assertFalse(output.contains("still in use"), "output="+output);
+        } finally {
+            serverSocket.close();
+        }
+    }
+
+    
+    // Disabled by default because of risk of overriding /etc/hosts in really bad way if doesn't work properly!
+    // As a manual visual inspection test, consider first manually creating /etc/hostname and /etc/sysconfig/network
+    // so that it looks like debian+ubuntu / CentOS/RHEL.
+    @Test(groups={"Integration"}, enabled=false)
+    public void testSetHostnameUnqualified() throws Exception {
+        runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, false);
+    }
+
+    @Test(groups={"Integration"}, enabled=false)
+    public void testSetHostnameQualified() throws Exception {
+        runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase()+".brooklyn.incubator.apache.org", null, false);
+    }
+
+    @Test(groups={"Integration"}, enabled=false)
+    public void testSetHostnameNullDomain() throws Exception {
+        runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, true);
+    }
+
+    @Test(groups={"Integration"}, enabled=false)
+    public void testSetHostnameNonNullDomain() throws Exception {
+        runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), "brooklyn.incubator.apache.org", true);
+    }
+
+    protected void runSetHostname(String newHostname, String newDomain, boolean includeDomain) throws Exception {
+        String fqdn = (includeDomain && Strings.isNonBlank(newDomain)) ? newHostname + "." + newDomain : newHostname;
+        
+        LocalManagementContextForTests mgmt = new LocalManagementContextForTests();
+        SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+
+        execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testSetHostname")).get();
+        execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname", sudo("cp /etc/hostname /etc/hostname-orig-testSetHostname"))).get();
+        execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network", sudo("cp /etc/sysconfig/network /etc/sysconfig/network-orig-testSetHostname"))).get();
+        
+        String origHostname = getHostnameNoArgs(loc);
+        assertTrue(Strings.isNonBlank(origHostname));
+        
+        try {
+            List<String> cmd = (includeDomain) ? BashCommands.setHostname(newHostname, newDomain) : BashCommands.setHostname(newHostname);
+            execRequiringZeroAndReturningStdout(loc, cmd).get();
+
+            String actualHostnameUnqualified = getHostnameUnqualified(loc);
+            String actualHostnameFullyQualified = getHostnameFullyQualified(loc);
+
+            // TODO On OS X at least, we aren't actually setting the domain name; we're just letting 
+            //      the user pass in what the domain name is. We do add this properly to /etc/hosts
+            //      (e.g. first line is "127.0.0.1 br-g4x5wgx8.brooklyn.incubator.apache.org br-g4x5wgx8 localhost")
+            //      but subsequent calls to `hostname -f` returns the unqualified. Similarly, `domainname` 
+            //      returns blank. Therefore we can't assert that it equals our expected val (because we just made  
+            //      it up - "brooklyn.incubator.apache.org").
+            //      assertEquals(actualHostnameFullyQualified, fqdn);
+            assertEquals(actualHostnameUnqualified, Strings.getFragmentBetween(newHostname, null, "."));
+            execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameUnqualified).get();
+            execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameFullyQualified).get();
+            
+            String result = execRequiringZeroAndReturningStdout(loc, "grep -n "+fqdn+" /etc/hosts").get();
+            assertTrue(result.contains("localhost"), "line="+result);
+            log.info("result="+result);
+            
+        } finally {
+            execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testSetHostname /etc/hosts")).get();
+            execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname-orig-testSetHostname", sudo("cp /etc/hostname-orig-testSetHostname /etc/hostname"))).get();
+            execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network-orig-testSetHostname", sudo("cp /etc/sysconfig/network-orig-testSetHostname /etc/sysconfig/network"))).get();
+            execRequiringZeroAndReturningStdout(loc, sudo("hostname "+origHostname)).get();
+        }
+    }
+
+    // Marked disabled because not safe to run on your normal machine! It modifies /etc/hosts, which is dangerous if things go wrong!
+    @Test(groups={"Integration"}, enabled=false)
+    public void testModifyEtcHosts() throws Exception {
+        LocalManagementContextForTests mgmt = new LocalManagementContextForTests();
+        SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+
+        execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testModifyEtcHosts")).get();
+        int numLinesOrig = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]);
+        
+        try {
+            String cmd = BashCommands.prependToEtcHosts("1.2.3.4", "myhostnamefor1234.at.start", "myhostnamefor1234b");
+            execRequiringZeroAndReturningStdout(loc, cmd).get();
+            
+            String cmd2 = BashCommands.appendToEtcHosts("5.6.7.8", "myhostnamefor5678.at.end", "myhostnamefor5678");
+            execRequiringZeroAndReturningStdout(loc, cmd2).get();
+            
+            String grepFirst = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor1234 /etc/hosts").get();
+            String grepLast = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor5678 /etc/hosts").get();
+            int numLinesAfter = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]);
+            log.info("result: numLinesBefore="+numLinesOrig+"; numLinesAfter="+numLinesAfter+"; first="+grepFirst+"; last="+grepLast);
+            
+            assertTrue(grepFirst.startsWith("1:") && grepFirst.contains("1.2.3.4 myhostnamefor1234.at.start myhostnamefor1234"), "first="+grepFirst);
+            assertTrue(grepLast.startsWith((numLinesOrig+2)+":") && grepLast.contains("5.6.7.8 myhostnamefor5678.at.end myhostnamefor5678"), "last="+grepLast);
+            assertEquals(numLinesOrig + 2, numLinesAfter, "lines orig="+numLinesOrig+", after="+numLinesAfter);
+        } finally {
+            execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testModifyEtcHosts /etc/hosts")).get();
+        }
+    }
+    
+    private String getHostnameNoArgs(SshMachineLocation machine) {
+        String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname; echo AFTMARKER").get();
+        return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+    }
+
+    private String getHostnameUnqualified(SshMachineLocation machine) {
+        String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname -s 2> /dev/null || hostname; echo AFTMARKER").get();
+        return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+    }
+
+    private String getHostnameFullyQualified(SshMachineLocation machine) {
+        String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname --fqdn 2> /dev/null || hostname -f; echo AFTMARKER").get();
+        return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+    }
+
+    private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, Collection<String> cmds) {
+        return execRequiringZeroAndReturningStdout(loc, cmds.toArray(new String[cmds.size()]));
+    }
+    
+    private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, String... cmds) {
+        ProcessTaskWrapper<String> t = SshTasks.newSshExecTaskFactory(loc, cmds)
+                .requiringZeroAndReturningStdout().newTask();
+        exec.submit(t);
+        return t;
+    }
+
+    private ServerSocket openServerSocket() {
+        int lowerBound = 40000;
+        int upperBound = 40100;
+        for (int i = lowerBound; i < upperBound; i++) {
+            try {
+                return new ServerSocket(i);
+            } catch (IOException e) {
+                // try next number
+            }
+        }
+        throw new IllegalStateException("No ports available in range "+lowerBound+" to "+upperBound);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
new file mode 100644
index 0000000..71d2586
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Callables;
+
+/**
+ * Test the operation of the {@link BasicTask} class.
+ *
+ * TODO clarify test purpose
+ */
+public class BasicTaskExecutionPerformanceTest {
+    private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionPerformanceTest.class);
+ 
+    private static final int TIMEOUT_MS = 10*1000;
+    
+    private BasicExecutionManager em;
+
+    public static final int MAX_OVERHEAD_MS = 1500; // was 750ms but saw 1.3s on buildhive
+    public static final int EARLY_RETURN_GRACE = 25; // saw 13ms early return on jenkins!
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        em = new BasicExecutionManager("mycontext");
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (em != null) em.shutdownNow();
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testScheduledTaskExecutedAfterDelay() throws Exception {
+        int delay = 100;
+        final CountDownLatch latch = new CountDownLatch(1);
+        
+        Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+            @Override public Task<?> call() {
+                return new BasicTask<Void>(new Runnable() {
+                    @Override public void run() {
+                        latch.countDown();
+                    }});
+            }};
+        ScheduledTask t = new ScheduledTask(taskFactory).delay(delay);
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        em.submit(t);
+        
+        assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        long actualDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+        
+        assertTrue(actualDelay > (delay-EARLY_RETURN_GRACE), "actualDelay="+actualDelay+"; delay="+delay);
+        assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testScheduledTaskExecutedAtRegularPeriod() throws Exception {
+        final int period = 100;
+        final int numTimestamps = 4;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList());
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        
+        Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+            @Override public Task<?> call() {
+                return new BasicTask<Void>(new Runnable() {
+                    @Override public void run() {
+                        timestamps.add(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+                        if (timestamps.size() >= numTimestamps) latch.countDown();
+                    }});
+            }};
+        ScheduledTask t = new ScheduledTask(taskFactory).delay(1).period(period);
+        em.submit(t);
+        
+        assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        
+        synchronized (timestamps) {
+            long prev = timestamps.get(0);
+            for (long timestamp : timestamps.subList(1, timestamps.size())) {
+                assertTrue(timestamp > prev+period-EARLY_RETURN_GRACE, "timestamps="+timestamps);
+                assertTrue(timestamp < prev+period+MAX_OVERHEAD_MS, "timestamps="+timestamps);
+                prev = timestamp;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCanCancelScheduledTask() throws Exception {
+        final int period = 1;
+        final long checkPeriod = 250;
+        final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList());
+        
+        Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+            @Override public Task<?> call() {
+                return new BasicTask<Void>(new Runnable() {
+                    @Override public void run() {
+                        timestamps.add(System.currentTimeMillis());
+                    }});
+            }};
+        ScheduledTask t = new ScheduledTask(taskFactory).period(period);
+        em.submit(t);
+
+        t.cancel();
+        long cancelTime = System.currentTimeMillis();
+        int countImmediatelyAfterCancel = timestamps.size();
+        Thread.sleep(checkPeriod);
+        int countWellAfterCancel = timestamps.size();
+
+        // should have at most 1 more execution after cancel
+        log.info("testCanCancelScheduledTask saw "+countImmediatelyAfterCancel+" then cancel then "+countWellAfterCancel+" total");                
+        assertTrue(countWellAfterCancel - countImmediatelyAfterCancel <= 2, "timestamps="+timestamps+"; cancelTime="+cancelTime);
+    }
+
+    // Previously, when we used a CopyOnWriteArraySet, performance for submitting new tasks was
+    // terrible, and it degraded significantly as the number of previously executed tasks increased
+    // (e.g. 9s for first 1000; 26s for next 1000; 42s for next 1000).
+    @Test
+    public void testExecutionManagerPerformance() throws Exception {
+        // Was fixed at 1000 tasks, but was running out of virtual memory due to excessive thread creation
+        // on machines which were not able to execute the threads quickly.
+        final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+        final int NUM_TIMES = 10;
+        final int MAX_ACCEPTABLE_TIME = 7500; // saw 5601ms on buildhive
+        
+        long tWarmup = execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A"));
+        
+        List<Long> times = Lists.newArrayList();
+        for (int i = 1; i <= NUM_TIMES; i++) {
+            times.add(execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A")));
+        }
+        
+        Long toobig = Iterables.find(
+                times, 
+                new Predicate<Long>() {
+                    public boolean apply(Long input) {
+                        return input > MAX_ACCEPTABLE_TIME;
+                    }},
+                null);
+        assertNull(toobig, "warmup="+tWarmup+"; times="+times);
+    }
+    
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private long execTasksAndWaitForDone(int numTasks, List<?> tags) throws Exception {
+        List<Task<?>> tasks = Lists.newArrayList();
+        long startTimestamp = System.currentTimeMillis();
+        for (int i = 1; i < numTasks; i++) {
+            Task<?> t = new BasicTask(Callables.returning(null)); // no-op
+            em.submit(MutableMap.of("tags", tags), t);
+            tasks.add(t);
+        }
+        long submittedTimestamp = System.currentTimeMillis();
+
+        for (Task t : tasks) {
+            t.get();
+        }
+        long endTimestamp = System.currentTimeMillis();
+        long submitTime = submittedTimestamp - startTimestamp;
+        long totalTime = endTimestamp - startTimestamp;
+        
+        log.info("Executed {} tasks; {}ms total; {}ms to submit", new Object[] {numTasks, totalTime, submitTime});
+
+        return totalTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
new file mode 100644
index 0000000..c730738
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Callables;
+
+/**
+ * Test the operation of the {@link BasicTask} class.
+ *
+ * TODO clarify test purpose
+ */
+public class BasicTaskExecutionTest {
+    private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionTest.class);
+ 
+    private static final int TIMEOUT_MS = 10*1000;
+    
+    private BasicExecutionManager em;
+    private Map<Object, Object> data;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() {
+        em = new BasicExecutionManager("mycontext");
+        data = Collections.synchronizedMap(new HashMap<Object, Object>());
+        data.clear();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (em != null) em.shutdownNow();
+        if (data != null) data.clear();
+    }
+    
+    @Test
+    public void runSimpleBasicTask() throws Exception {
+        BasicTask<Object> t = new BasicTask<Object>(newPutCallable(1, "b"));
+        data.put(1, "a");
+        Task<Object> t2 = em.submit(MutableMap.of("tag", "A"), t);
+        assertEquals("a", t.get());
+        assertEquals("a", t2.get());
+        assertEquals("b", data.get(1));
+    }
+    
+    @Test
+    public void runSimpleRunnable() throws Exception {
+        data.put(1, "a");
+        Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutRunnable(1, "b"));
+        assertEquals(null, t.get());
+        assertEquals("b", data.get(1));
+    }
+
+    @Test
+    public void runSimpleCallable() throws Exception {
+        data.put(1, "a");
+        Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutCallable(1, "b"));
+        assertEquals("a", t.get());
+        assertEquals("b", data.get(1));
+    }
+
+    @Test
+    public void runBasicTaskWithWaits() throws Exception {
+        final CountDownLatch signalStarted = new CountDownLatch(1);
+        final CountDownLatch allowCompletion = new CountDownLatch(1);
+        final BasicTask<Object> t = new BasicTask<Object>(new Callable<Object>() {
+            public Object call() throws Exception {
+                Object result = data.put(1, "b");
+                signalStarted.countDown();
+                assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+                return result;
+            }});
+        data.put(1, "a");
+
+        Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+        assertEquals(t, t2);
+        assertFalse(t.isDone());
+        
+        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        assertEquals("b", data.get(1));
+        assertFalse(t.isDone());
+        
+        log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));
+        
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                String status = t.getStatusDetail(false);
+                assertTrue(status != null && status.toLowerCase().contains("waiting"), "status="+status);
+            }});
+        
+        allowCompletion.countDown();
+        assertEquals("a", t.get());
+    }
+
+    @Test
+    public void runMultipleBasicTasks() throws Exception {
+        data.put(1, 1);
+        BasicExecutionManager em = new BasicExecutionManager("mycontext");
+        for (int i = 0; i < 2; i++) {
+            em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1)));
+            em.submit(MutableMap.of("tag", "B"), new BasicTask<Integer>(newIncrementCallable((1))));
+        }
+        int total = 0;
+        for (Object tag : em.getTaskTags()) {
+                log.debug("tag {}", tag);
+                for (Task<?> task : em.getTasksWithTag(tag)) {
+                    log.debug("BasicTask {}, has {}", task, task.get());
+                    total += (Integer)task.get();
+                }
+            }
+        assertEquals(10, total);
+        //now that all have completed:
+        assertEquals(5, data.get(1));
+    }
+
+    @Test
+    public void runMultipleBasicTasksMultipleTags() throws Exception {
+        data.put(1, 1);
+        Collection<Task<Integer>> tasks = Lists.newArrayList();
+        tasks.add(em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1))));
+        tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("A","B")), new BasicTask<Integer>(newIncrementCallable(1))));
+        tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("B","C")), new BasicTask<Integer>(newIncrementCallable(1))));
+        tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("D")), new BasicTask<Integer>(newIncrementCallable(1))));
+        int total = 0;
+
+        for (Task<Integer> t : tasks) {
+            log.debug("BasicTask {}, has {}", t, t.get());
+            total += t.get();
+            }
+        assertEquals(10, total);
+ 
+        //now that all have completed:
+        assertEquals(data.get(1), 5);
+        assertEquals(em.getTasksWithTag("A").size(), 2);
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")).size(), 2);
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")).size(), 2);
+
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")).size(), 3);
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")).size(), 1);
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("B", "C")).size(), 1);
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "D")).size(), 3);
+    }
+
+    @Test
+    public void testGetTaskById() throws Exception {
+        Task<?> t = new BasicTask<Void>(newNoop());
+        em.submit(MutableMap.of("tag", "A"), t);
+        assertEquals(em.getTask(t.getId()), t);
+    }
+
+    @Test
+    public void testRetrievingTasksWithTagsReturnsExpectedTask() throws Exception {
+        Task<?> t = new BasicTask<Void>(newNoop());
+        em.submit(MutableMap.of("tag", "A"), t);
+        t.get();
+
+        assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(t));
+    }
+
+    @Test
+    public void testRetrievingTasksWithTagsExcludesNonMatchingTasks() throws Exception {
+        Task<?> t = new BasicTask<Void>(newNoop());
+        em.submit(MutableMap.of("tag", "A"), t);
+        t.get();
+
+        assertEquals(em.getTasksWithTag("B"), ImmutableSet.of());
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableSet.of());
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableSet.of());
+    }
+    
+    @Test
+    public void testRetrievingTasksWithMultipleTags() throws Exception {
+        Task<?> t = new BasicTask<Void>(newNoop());
+        em.submit(MutableMap.of("tags", ImmutableList.of("A", "B")), t);
+        t.get();
+
+        assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t));
+        assertEquals(em.getTasksWithTag("B"), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(t));
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("B")), ImmutableList.of(t));
+    }
+
+    // ENGR-1796: if nothing matched first tag, then returned whatever matched second tag!
+    @Test
+    public void testRetrievingTasksWithAllTagsWhenFirstNotMatched() throws Exception {
+        Task<?> t = new BasicTask<Void>(newNoop());
+        em.submit(MutableMap.of("tags", ImmutableList.of("A")), t);
+        t.get();
+
+        assertEquals(em.getTasksWithAllTags(ImmutableList.of("not_there","A")), ImmutableSet.of());
+    }
+    
+    @Test
+    public void testRetrievedTasksIncludesTasksInProgress() throws Exception {
+        final CountDownLatch runningLatch = new CountDownLatch(1);
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+        Task<Void> t = new BasicTask<Void>(new Callable<Void>() {
+            public Void call() throws Exception {
+                runningLatch.countDown();
+                finishLatch.await();
+                return null;
+            }});
+        em.submit(MutableMap.of("tags", ImmutableList.of("A")), t);
+        
+        try {
+            runningLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    
+            assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t));
+        } finally {
+            finishLatch.countDown();
+        }
+    }
+    
+    @Test
+    public void cancelBeforeRun() throws Exception {
+        final CountDownLatch blockForever = new CountDownLatch(1);
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+            public Integer call() throws Exception {
+                blockForever.await(); return 42;
+            }});
+        t.cancel(true);
+        assertTrue(t.isCancelled());
+        assertTrue(t.isDone());
+        assertTrue(t.isError());
+        em.submit(MutableMap.of("tag", "A"), t);
+        try {
+            t.get();
+            fail("get should have failed due to cancel");
+        } catch (CancellationException e) {
+            // expected
+        }
+        assertTrue(t.isCancelled());
+        assertTrue(t.isDone());
+        assertTrue(t.isError());
+        
+        log.debug("cancelBeforeRun status: {}", t.getStatusDetail(false));
+        assertTrue(t.getStatusDetail(false).toLowerCase().contains("cancel"));
+    }
+
+    @Test
+    public void cancelDuringRun() throws Exception {
+        final CountDownLatch signalStarted = new CountDownLatch(1);
+        final CountDownLatch blockForever = new CountDownLatch(1);
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+            public Integer call() throws Exception {
+                synchronized (data) {
+                    signalStarted.countDown();
+                    blockForever.await();
+                }
+                return 42;
+            }});
+        em.submit(MutableMap.of("tag", "A"), t);
+        assertFalse(t.isCancelled());
+        assertFalse(t.isDone());
+        assertFalse(t.isError());
+        
+        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        t.cancel(true);
+        
+        assertTrue(t.isCancelled());
+        assertTrue(t.isError());
+        try {
+            t.get();
+            fail("get should have failed due to cancel");
+        } catch (CancellationException e) {
+            // expected
+        }
+        assertTrue(t.isCancelled());
+        assertTrue(t.isDone());
+        assertTrue(t.isError());
+    }
+    
+    @Test
+    public void cancelAfterRun() throws Exception {
+        BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(42));
+        em.submit(MutableMap.of("tag", "A"), t);
+
+        assertEquals(t.get(), (Integer)42);
+        t.cancel(true);
+        assertFalse(t.isCancelled());
+        assertFalse(t.isError());
+        assertTrue(t.isDone());
+    }
+    
+    @Test
+    public void errorDuringRun() throws Exception {
+        BasicTask<Void> t = new BasicTask<Void>(new Callable<Void>() {
+            public Void call() throws Exception {
+                throw new IllegalStateException("Simulating failure in errorDuringRun");
+            }});
+        
+        em.submit(MutableMap.of("tag", "A"), t);
+        
+        try {
+            t.get();
+            fail("get should have failed due to error"); 
+        } catch (Exception eo) { 
+            Throwable e = Throwables.getRootCause(eo);
+            assertEquals("Simulating failure in errorDuringRun", e.getMessage());
+        }
+        
+        assertFalse(t.isCancelled());
+        assertTrue(t.isError());
+        assertTrue(t.isDone());
+        
+        log.debug("errorDuringRun status: {}", t.getStatusDetail(false));
+        assertTrue(t.getStatusDetail(false).contains("Simulating failure in errorDuringRun"), "details="+t.getStatusDetail(false));
+    }
+
+    @Test
+    public void fieldsSetForSimpleBasicTask() throws Exception {
+        final CountDownLatch signalStarted = new CountDownLatch(1);
+        final CountDownLatch allowCompletion = new CountDownLatch(1);
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+            public Integer call() throws Exception {
+                signalStarted.countDown();
+                allowCompletion.await();
+                return 42;
+            }});
+        assertEquals(null, t.getSubmittedByTask());
+        assertEquals(-1, t.submitTimeUtc);
+        assertNull(t.getInternalFuture());
+
+        em.submit(MutableMap.of("tag", "A"), t);
+        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        
+        assertTrue(t.submitTimeUtc > 0);
+        assertTrue(t.startTimeUtc >= t.submitTimeUtc);
+        assertNotNull(t.getInternalFuture());
+        assertEquals(-1, t.endTimeUtc);
+        assertEquals(false, t.isCancelled());
+        
+        allowCompletion.countDown();
+        assertEquals(t.get(), (Integer)42);
+        assertTrue(t.endTimeUtc >= t.startTimeUtc);
+
+        log.debug("BasicTask duration (millis): {}", (t.endTimeUtc - t.submitTimeUtc));
+    }
+
+    @Test
+    public void fieldsSetForBasicTaskSubmittedBasicTask() throws Exception {
+        //submitted BasicTask B is started by A, and waits for A to complete
+        BasicTask<Integer> t = new BasicTask<Integer>(MutableMap.of("displayName", "sample", "description", "some descr"), new Callable<Integer>() {
+            public Integer call() throws Exception {
+                em.submit(MutableMap.of("tag", "B"), new Callable<Integer>() {
+                    public Integer call() throws Exception {
+                        assertEquals(45, em.getTasksWithTag("A").iterator().next().get());
+                        return 46;
+                    }});
+                return 45;
+            }});
+        em.submit(MutableMap.of("tag", "A"), t);
+
+        t.blockUntilEnded();
+ 
+//        assertEquals(em.getAllTasks().size(), 2
+        
+        BasicTask<?> tb = (BasicTask<?>) em.getTasksWithTag("B").iterator().next();
+        assertEquals( 46, tb.get() );
+        assertEquals( t, em.getTasksWithTag("A").iterator().next() );
+        assertNull( t.getSubmittedByTask() );
+        
+        BasicTask<?> submitter = (BasicTask<?>) tb.getSubmittedByTask();
+        assertNotNull(submitter);
+        assertEquals("sample", submitter.displayName);
+        assertEquals("some descr", submitter.description);
+        assertEquals(t, submitter);
+        
+        assertTrue(submitter.submitTimeUtc <= tb.submitTimeUtc);
+        assertTrue(submitter.endTimeUtc <= tb.endTimeUtc);
+        
+        log.debug("BasicTask {} was submitted by {}", tb, submitter);
+    }
+    
+    private Callable<Object> newPutCallable(final Object key, final Object val) {
+        return new Callable<Object>() {
+            public Object call() {
+                return data.put(key, val);
+            }
+        };
+    }
+    
+    private Callable<Integer> newIncrementCallable(final Object key) {
+        return new Callable<Integer>() {
+            public Integer call() {
+                synchronized (data) {
+                    return (Integer) data.put(key, (Integer)data.get(key) + 1);
+                }
+            }
+        };
+    }
+    
+    private Runnable newPutRunnable(final Object key, final Object val) {
+        return new Runnable() {
+            public void run() {
+                data.put(key, val);
+            }
+        };
+    }
+    
+    private Runnable newNoop() {
+        return new Runnable() {
+            public void run() {
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
new file mode 100644
index 0000000..020a98c
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+
+public class BasicTasksFutureTest {
+
+    private static final Logger log = LoggerFactory.getLogger(BasicTasksFutureTest.class);
+    
+    private BasicExecutionManager em;
+    private BasicExecutionContext ec;
+    private Map<Object,Object> data;
+    private ExecutorService ex;
+    private Semaphore started;
+    private Semaphore waitInTask;
+    private Semaphore cancelledWhileSleeping;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() {
+        em = new BasicExecutionManager("mycontext");
+        ec = new BasicExecutionContext(em);
+        ex = Executors.newCachedThreadPool();
+        data = Collections.synchronizedMap(new LinkedHashMap<Object,Object>());
+        started = new Semaphore(0);
+        waitInTask = new Semaphore(0);
+        cancelledWhileSleeping = new Semaphore(0);
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (em != null) em.shutdownNow();
+        if (ex != null) ex.shutdownNow();
+    }
+
+    @Test
+    public void testBlockAndGetWithTimeoutsAndListenableFuture() throws InterruptedException {
+        Task<String> t = waitForSemaphore(Duration.FIVE_SECONDS, true, "x");
+        
+        Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
+        Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
+        boolean didNotThrow = false;
+        
+        try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
+        catch (Exception e) { /* expected */ }
+        Assert.assertFalse(didNotThrow);
+        
+        try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
+        catch (Exception e) { /* expected */ }
+        Assert.assertFalse(didNotThrow);
+
+        addFutureListener(t, "before");
+        ec.submit(t);
+        
+        Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
+        Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
+        
+        try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
+        catch (Exception e) { /* expected */ }
+        Assert.assertFalse(didNotThrow);
+        
+        try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
+        catch (Exception e) { /* expected */ }
+        Assert.assertFalse(didNotThrow);
+
+        addFutureListener(t, "during");
+            
+        synchronized (data) {
+            // now let it finish
+            waitInTask.release();
+            Assert.assertTrue(t.blockUntilEnded(Duration.TEN_SECONDS));
+
+            Assert.assertEquals(t.getUnchecked(Duration.millis(1)), "x");
+            Assert.assertEquals(t.getUnchecked(Duration.ZERO), "x");
+            
+            Assert.assertNull(data.get("before"));
+            Assert.assertNull(data.get("during"));
+            // can't set the data(above) until we release the lock (in assert call below)
+            assertSoonGetsData("before");
+            assertSoonGetsData("during");
+        }
+
+        // and see that a listener added late also runs
+        synchronized (data) {
+            addFutureListener(t, "after");
+            Assert.assertNull(data.get("after"));
+            assertSoonGetsData("after");
+        }
+    }
+
+    private void addFutureListener(Task<String> t, final String key) {
+        t.addListener(new Runnable() { public void run() {
+            synchronized (data) {
+                log.info("notifying for "+key);
+                data.notifyAll();
+                data.put(key, true);
+            }
+        }}, ex);
+    }
+
+    private void assertSoonGetsData(String key) throws InterruptedException {
+        for (int i=0; i<10; i++) {
+            if (Boolean.TRUE.equals(data.get(key))) {
+                log.info("got data for "+key);
+                return;
+            }
+            data.wait(Duration.ONE_SECOND.toMilliseconds());
+        }
+        Assert.fail("did not get data for '"+key+"' in time");
+    }
+
+    private <T> Task<T> waitForSemaphore(final Duration time, final boolean requireSemaphore, final T result) {
+        return Tasks.<T>builder().body(new Callable<T>() {
+            public T call() { 
+                try {
+                    started.release();
+                    log.info("waiting up to "+time+" to acquire before returning "+result);
+                    if (!waitInTask.tryAcquire(time.toMilliseconds(), TimeUnit.MILLISECONDS)) {
+                        log.info("did not get semaphore");
+                        if (requireSemaphore) Assert.fail("task did not get semaphore");
+                    } else {
+                        log.info("got semaphore");
+                    }
+                } catch (Exception e) {
+                    log.info("cancelled before returning "+result);
+                    cancelledWhileSleeping.release();
+                    throw Exceptions.propagate(e);
+                }
+                log.info("task returning "+result);
+                return result; 
+            }
+        }).build();
+    }
+
+    @Test
+    public void testCancelAfterStartTriggersListenableFuture() throws Exception {
+        doTestCancelTriggersListenableFuture(Duration.millis(50));
+    }
+    @Test
+    public void testCancelImmediateTriggersListenableFuture() throws Exception {
+        // if cancel fires after submit but before it passes to the executor,
+        // that needs handling separately; this doesn't guarantee this code path,
+        // but it happens sometimes (and it should be handled)
+        doTestCancelTriggersListenableFuture(Duration.ZERO);
+    }
+    public void doTestCancelTriggersListenableFuture(Duration delay) throws Exception {
+        Task<String> t = waitForSemaphore(Duration.TEN_SECONDS, true, "x");
+        addFutureListener(t, "before");
+
+        Stopwatch watch = Stopwatch.createStarted();
+        ec.submit(t);
+        
+        addFutureListener(t, "during");
+
+        log.info("test cancelling "+t+" ("+t.getClass()+") after "+delay);
+        // NB: two different code paths (callers to this method) for notifying futures 
+        // depending whether task is started 
+        Time.sleep(delay);
+
+        synchronized (data) {
+            t.cancel(true);
+            
+            assertSoonGetsData("before");
+            assertSoonGetsData("during");
+
+            addFutureListener(t, "after");
+            Assert.assertNull(data.get("after"));
+            assertSoonGetsData("after");
+        }
+        
+        Assert.assertTrue(t.isDone());
+        Assert.assertTrue(t.isCancelled());
+        try {
+            t.get();
+            Assert.fail("should have thrown CancellationException");
+        } catch (CancellationException e) { /* expected */ }
+        
+        Assert.assertTrue(watch.elapsed(TimeUnit.MILLISECONDS) < Duration.FIVE_SECONDS.toMilliseconds(), 
+            Time.makeTimeStringRounded(watch.elapsed(TimeUnit.MILLISECONDS))+" is too long; should have cancelled very quickly");
+
+        if (started.tryAcquire())
+            // if the task is begun, this should get released
+            Assert.assertTrue(cancelledWhileSleeping.tryAcquire(5, TimeUnit.SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
new file mode 100644
index 0000000..89bde95
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.CompoundTask;
+import org.apache.brooklyn.core.util.task.ParallelTask;
+import org.apache.brooklyn.core.util.task.SequentialTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Test the operation of the {@link CompoundTask} class.
+ */
+public class CompoundTaskExecutionTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompoundTaskExecutionTest.class);
+
+    BasicExecutionManager em;
+    BasicExecutionContext ec;
+
+    @BeforeClass
+    public void setup() {
+        em = new BasicExecutionManager("mycontext");
+        ec = new BasicExecutionContext(em);
+    }
+
+    @AfterClass
+    public void teardown() {
+        if (em != null) em.shutdownNow();
+        em = null;
+    }
+
+    private BasicTask<String> taskReturning(final String val) {
+        return new BasicTask<String>(new Callable<String>() {
+                @Override public String call() {
+                    return val;
+                }
+            });
+    }
+
+    private BasicTask<String> slowTaskReturning(final String val, final Duration pauseTime) {
+        return new BasicTask<String>(new Callable<String>() {
+                @Override public String call() {
+                    Time.sleep(pauseTime);
+                    return val;
+                }
+            });
+    }
+
+
+    @Test
+    public void runSequenceTask() throws Exception {
+        BasicTask<String> t1 = taskReturning("a");
+        BasicTask<String> t2 = taskReturning("b");
+        BasicTask<String> t3 = taskReturning("c");
+        BasicTask<String> t4 = taskReturning("d");
+        Task<List<String>> tSequence = ec.submit(new SequentialTask<String>(t1, t2, t3, t4));
+        assertEquals(tSequence.get(), ImmutableList.of("a", "b", "c", "d"));
+    }
+
+    @Test
+    public void testSequentialTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
+        BasicTask<String> t1 = taskReturning("a");
+        BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
+                @Override public String call() throws Exception {
+                    throw new IllegalArgumentException("forced exception");
+                }
+            });
+        BasicTask<String> t3 = taskReturning("c");
+        SequentialTask<String> task = new SequentialTask<String>(t1, t2, t3);
+        Task<List<String>> tSequence = ec.submit(task);
+
+        try {
+            tSequence.get();
+            fail("t2 should have thrown an exception");
+        } catch (Exception e) {}
+
+        assertTrue(task.isDone());
+        assertTrue(task.isError());
+        assertTrue(t1.isDone());
+        assertFalse(t1.isError());
+        assertTrue(t2.isDone());
+        assertTrue(t2.isError());
+        // t3 not run because of t2 exception
+        assertFalse(t3.isDone());
+        assertFalse(t3.isBegun());
+    }
+
+    @Test
+    public void testParallelTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
+        // differs from test above of SequentialTask in that expect t3 to be executed,
+        // despite t2 failing.
+        // TODO Do we expect tSequence.get() to block for everything to either fail or complete,
+        // and then to throw exception? Currently it does *not* do that so test was previously failing.
+
+        BasicTask<String> t1 = taskReturning("a");
+        BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
+                @Override public String call() throws Exception {
+                    throw new IllegalArgumentException("forced exception");
+                }
+            });
+        BasicTask<String> t3 = slowTaskReturning("c", Duration.millis(100));
+        ParallelTask<String> task = new ParallelTask<String>(t1, t2, t3);
+        Task<List<String>> tSequence = ec.submit(task);
+
+        try {
+            tSequence.get();
+            fail("t2 should have thrown an exception");
+        } catch (Exception e) {}
+
+        assertTrue(task.isDone());
+        assertTrue(task.isError());
+        assertTrue(t1.isDone());
+        assertFalse(t1.isError());
+        assertTrue(t2.isDone());
+        assertTrue(t2.isError());
+        assertTrue(t3.isBegun());
+        assertTrue(t3.isDone());
+        assertFalse(t3.isError());
+    }
+
+    @Test
+    public void runParallelTask() throws Exception {
+        BasicTask<String> t1 = taskReturning("a");
+        BasicTask<String> t2 = taskReturning("b");
+        BasicTask<String> t3 = taskReturning("c");
+        BasicTask<String> t4 = taskReturning("d");
+        Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3));
+        assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d"));
+    }
+
+    @Test
+    public void runParallelTaskWithDelay() throws Exception {
+        final Semaphore locker = new Semaphore(0);
+        BasicTask<String> t1 = new BasicTask<String>(new Callable<String>() {
+                @Override public String call() {
+                    try {
+                        locker.acquire();
+                    } catch (InterruptedException e) {
+                        throw Throwables.propagate(e);
+                    }
+                    return "a";
+                }
+            });
+        BasicTask<String> t2 = taskReturning("b");
+        BasicTask<String> t3 = taskReturning("c");
+        BasicTask<String> t4 = taskReturning("d");
+        final Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3));
+
+        assertEquals(ImmutableSet.of(t2.get(), t3.get(), t4.get()), ImmutableSet.of("b", "c", "d"));
+        assertFalse(t1.isDone());
+        assertFalse(tSequence.isDone());
+
+        // get blocks until tasks have completed
+        Thread t = new Thread() {
+            @Override public void run() {
+                try {
+                    tSequence.get();
+                } catch (Exception e) {
+                    throw Throwables.propagate(e);
+                }
+                locker.release();
+            }
+        };
+        t.start();
+        Thread.sleep(30);
+        assertTrue(t.isAlive());
+
+        locker.release();
+
+        assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d"));
+        assertTrue(t1.isDone());
+        assertTrue(tSequence.isDone());
+
+        locker.acquire();
+    }
+
+    @Test
+    public void testComplexOrdering() throws Exception {
+        List<String> data = new CopyOnWriteArrayList<String>();
+        SequentialTask<String> taskA = new SequentialTask<String>(
+                appendAfterDelay(data, "a1"), appendAfterDelay(data, "a2"), appendAfterDelay(data, "a3"), appendAfterDelay(data, "a4"));
+        SequentialTask<String> taskB = new SequentialTask<String>(
+                appendAfterDelay(data, "b1"), appendAfterDelay(data, "b2"), appendAfterDelay(data, "b3"), appendAfterDelay(data, "b4"));
+        Task<List<String>> t = ec.submit(new ParallelTask<String>(taskA, taskB));
+        t.get();
+
+        LOG.debug("Tasks happened in order: {}", data);
+        assertEquals(data.size(), 8);
+        assertEquals(new HashSet<String>(data), ImmutableSet.of("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4"));
+
+        // a1, ..., a4 should be in order
+        List<String> as = Lists.newArrayList(), bs = Lists.newArrayList();
+        for (String value : data) {
+            ((value.charAt(0) == 'a') ? as : bs).add(value);
+        }
+        assertEquals(as, ImmutableList.of("a1", "a2", "a3", "a4"));
+        assertEquals(bs, ImmutableList.of("b1", "b2", "b3", "b4"));
+    }
+
+    private BasicTask<String> appendAfterDelay(final List<String> list, final String value) {
+        return new BasicTask<String>(new Callable<String>() {
+                @Override public String call() {
+                    try {
+                        Thread.sleep((int) (100 * Math.random()));
+                    } catch (InterruptedException e) {
+                        throw Throwables.propagate(e);
+                    }
+                    LOG.debug("running {}", value);
+                    list.add(value);
+                    return value;
+                }
+            });
+    }
+
+}


Mime
View raw message