brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjcorb...@apache.org
Subject [2/3] brooklyn-server git commit: Adds maxConcurrentChildCommands parameter to DynamicCluster
Date Thu, 01 Dec 2016 12:01:04 GMT
Adds maxConcurrentChildCommands parameter to DynamicCluster

The option configures the maximum number of simultaneous Startable
effector invocations that will be made on members of the group.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4e30074c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4e30074c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4e30074c

Branch: refs/heads/master
Commit: 4e30074ca9b09a49c6a4b052d6f28d05608b36eb
Parents: 5b9f896
Author: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Authored: Tue Nov 29 16:11:12 2016 +0000
Committer: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Committed: Tue Nov 29 16:11:12 2016 +0000

----------------------------------------------------------------------
 .../apache/brooklyn/core/entity/Entities.java   |   2 +-
 .../brooklyn/entity/group/DynamicCluster.java   |  11 +-
 .../entity/group/DynamicClusterImpl.java        | 153 ++++++++++++++++++-
 .../entity/group/DynamicClusterRebindTest.java  |  54 +++++++
 .../entity/group/DynamicClusterTest.java        | 132 ++++++++++++++++
 5 files changed, 343 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 2821652..69670ec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -625,7 +625,7 @@ public class Entities {
     /**
      * Return all descendants of given entity matching the given predicate and optionally the entity itself.
      * 
-     * @see {@link EntityPredicates} for useful second arguments.
+     * @see EntityPredicates
      */
     @SuppressWarnings("unused")
     public static Iterable<Entity> descendants(Entity root, Predicate<? super Entity> matching, boolean includeSelf) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f2112e8..3f62f82 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -103,7 +103,7 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
             "dynamiccluster.restartMode", 
             "How this cluster should handle restarts; "
             + "by default it is disallowed, but this key can specify a different mode. "
-            + "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. "
+            + "Modes supported by dynamic cluster are 'off', 'sequential', or 'parallel'. "
             + "However subclasses can define their own modes or may ignore this.", null);
 
     @SetFromFlag("quarantineFailedEntities")
@@ -183,6 +183,15 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
     ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
             "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
 
+    @Beta
+    @SetFromFlag("maxConcurrentChildCommands")
+    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
+            .name("dynamiccluster.maxConcurrentChildCommands")
+            .description("[Beta] The maximum number of effector invocations that will be made on children at once " +
+                    "(e.g. start, stop, restart). Any value null or less than or equal to zero means invocations are unbounded")
+            .defaultValue(0)
+            .build();
+
     AttributeSensor<List<Location>> SUB_LOCATIONS = new BasicAttributeSensor<List<Location>>(
             new TypeToken<List<Location>>() {},
             "dynamiccluster.subLocations", "Locations for each availability zone to use");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 8725b12..4ed0ac0 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
@@ -98,6 +100,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * A cluster of entities that can dynamically increase or decrease the number of entities.
@@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
             "next.cluster.member.id", "Returns the ID number of the next member to be added");
 
+    /**
+     * Controls the maximum number of effector invocations the cluster will make on members at once.
+     * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured.
+     */
+    private transient Semaphore childTaskSemaphore;
+
     private volatile FunctionFeed clusterOneAndAllMembersUp;
 
     // TODO better mechanism for arbitrary class name to instance type coercion
@@ -212,9 +222,16 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements
DynamicClus
     public void init() {
         super.init();
         initialiseMemberId();
+        initialiseTaskPermitSemaphore();
         connectAllMembersUp();
     }
 
+    @Override
+    public void rebind() {
+        super.rebind();
+        initialiseTaskPermitSemaphore();
+    }
+
     private void initialiseMemberId() {
         synchronized (mutex) {
             if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
@@ -223,6 +240,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements
DynamicClus
         }
     }
 
+    private void initialiseTaskPermitSemaphore() {
+        synchronized (mutex) {
+            if (getChildTaskSemaphore() == null) {
+                Integer maxChildTasks = config().get(MAX_CONCURRENT_CHILD_COMMANDS);
+                if (maxChildTasks != null && maxChildTasks > 0) {
+                    childTaskSemaphore = new Semaphore(maxChildTasks);
+                }
+            }
+        }
+    }
+
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
                 .entity(this)
@@ -551,8 +579,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
                 Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class),
EntityPredicates.isManaged()))));
         } else if ("parallel".equalsIgnoreCase(mode)) {
             ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
-            DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null, 
-                Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class),
EntityPredicates.isManaged()))));
+            for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class),
EntityPredicates.isManaged()))) {
+                DynamicTasks.queue(newThrottledEffectorTask(member, Startable.RESTART, Collections.emptyMap()));
+            }
         } else {
             throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'");
         }
@@ -788,7 +817,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements
DynamicClus
 
         // FIXME symmetry in order of added as child, managed, started, and added to group
         final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>)
Iterables.filter(removedEntities, Startable.class);
-        Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP,
Collections.<String,Object>emptyMap());
+        ImmutableList.Builder<Task<?>> tasks = ImmutableList.builder();
+        for (Entity member : removedStartables) {
+            tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
+        }
+        Task<?> invoke = Tasks.parallel(tasks.build());
+        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
             invoke.get();
             return removedEntities;
@@ -826,8 +860,11 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements
DynamicClus
             addedEntities.add(entity);
             addedEntityLocations.put(entity, loc);
             if (entity instanceof Startable) {
+                // First members are used when subsequent members need some attributes from
them
+                // before they start; make sure they're in the first batch.
+                boolean privileged = Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER));
                 Map<String, ?> args = ImmutableMap.of("locations", MutableList.builder().addIfNotNull(loc).buildImmutable());
-                Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask();
+                Task<?> task = newThrottledEffectorTask(entity, Startable.START, args,
privileged);
                 tasks.put(entity, task);
             }
         }
@@ -1041,14 +1078,116 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements
DynamicClus
 
     protected void stopAndRemoveNode(Entity member) {
         removeMember(member);
-
         try {
             if (member instanceof Startable) {
-                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
+                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String,
Object>emptyMap());
+                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                 task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);
         }
     }
+
+    @Nullable
+    protected Semaphore getChildTaskSemaphore() {
+        return childTaskSemaphore;
+    }
+
+    /**
+     * @return An unprivileged effector task.
+     * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean)
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
+        return newThrottledEffectorTask(target, effector, arguments, false);
+    }
+
+    /**
+     * Creates tasks that obtain permits from {@link #childTaskSemaphore} before invoking <code>effector</code>
+     * on <code>target</code>. Permits are released in a {@link ListenableFuture#addListener listener}. No
+     * permits are obtained if {@link #childTaskSemaphore} is <code>null</code>.
+     * @param target Entity to invoke effector on
+     * @param effector Effector to invoke on target
+     * @param arguments Effector arguments
+     * @param isPrivileged If true the method obtains a permit from {@link #childTaskSemaphore}
+     *                     immediately and returns the effector invocation task, otherwise it
+     *                     returns a task that sequentially obtains a permit then runs the effector.
+     * @return An unsubmitted task.
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+        final Task<?> toSubmit;
+        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
+        if (getChildTaskSemaphore() != null) {
+            // permitObtained communicates to the release task whether the permit should
really be released
+            // or not. ObtainPermit sets it to true when a permit is acquired.
+            final AtomicBoolean permitObtained = new AtomicBoolean();
+            final String description = "Waiting for permit to run " + effector.getName()
+ " on " + target;
+            final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), description,
permitObtained);
+            // Acquire the permit now for the privileged task and just queue the effector
invocation.
+            // If it's unprivileged then queue a task to obtain a permit first.
+            if (isPrivileged) {
+                obtain.run();
+                toSubmit = effectorTask;
+            } else {
+                Task<?> obtainMutex = Tasks.builder()
+                        .description(description)
+                        .body(new ObtainPermit(getChildTaskSemaphore(), description, permitObtained))
+                        .build();
+                toSubmit = Tasks.sequential(
+                        "Waiting for permit then running " + effector.getName() + " on "
+ target,
+                        obtainMutex, effectorTask);
+            }
+            toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), permitObtained),
MoreExecutors.sameThreadExecutor());
+        } else {
+            toSubmit = effectorTask;
+        }
+        return toSubmit;
+    }
+
+    private static class ObtainPermit implements Runnable {
+        private final Semaphore permit;
+        private final String description;
+        private final AtomicBoolean hasObtainedPermit;
+
+        private ObtainPermit(Semaphore permit, String description, AtomicBoolean hasObtainedPermit)
{
+            this.permit = permit;
+            this.description = description;
+            this.hasObtainedPermit = hasObtainedPermit;
+        }
+
+        @Override
+        public void run() {
+            String oldDetails = Tasks.setBlockingDetails(description);
+            LOG.debug("{} acquiring permit from {}", this, permit);
+            try {
+                permit.acquire();
+                hasObtainedPermit.set(true);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                Tasks.setBlockingDetails(oldDetails);
+            }
+        }
+    }
+
+    private static class ReleasePermit implements Runnable {
+        private final Semaphore permit;
+        private final AtomicBoolean wasPermitObtained;
+
+        private ReleasePermit(Semaphore permit, AtomicBoolean wasPermitObtained) {
+            this.permit = permit;
+            this.wasPermitObtained = wasPermitObtained;
+        }
+
+        @Override
+        public void run() {
+            if (wasPermitObtained.get()) {
+                LOG.debug("{} releasing permit from {}", this, permit);
+                permit.release();
+            } else {
+                LOG.debug("{} not releasing a permit from {} because it appears one was never obtained", this, permit);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
new file mode 100644
index 0000000..bbf3a2a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.entity.group;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
+
+    @Test
+    public void testThrottleAppliesAfterRebind() throws Exception {
+        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class))
+                        .configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new
AtomicInteger()));
+        app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation()));
+        EntityAsserts.assertAttributeEquals(cluster, DynamicCluster.GROUP_SIZE, 1);
+
+        rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        cluster = Entities.descendants(app(), DynamicCluster.class).iterator().next();
+        cluster.resize(10);
+        EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE,
10);
+        EntityAsserts.assertAttributeEquals(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 36d3c39..c3e7d7f 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -41,12 +42,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
@@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.FailingEntity;
 import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
@@ -67,10 +73,15 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
@@ -1225,4 +1236,125 @@ public class DynamicClusterTest extends BrooklynAppUnitTestSupport
{
         assertEquals(found.size(), expectedNonFirstCount);
     }
 
+    @DataProvider
+    public Object[][] maxConcurrentCommandsTestProvider() {
+        return new Object[][]{{1}, {2}, {3}};
+    }
+
+    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
+    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands)
{
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger());
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)
+                .configure(DynamicCluster.INITIAL_SIZE, 10)
+                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
+        app.start(ImmutableList.of(app.newSimulatedLocation()));
+        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+    }
+
+    // Tests handling of the first member of a cluster by asserting that a group, whose
+    // other members wait for the first, always starts.
+    @Test
+    public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exception
{
+        final AtomicInteger counter = new AtomicInteger();
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+
+        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
+                .body(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster,
DynamicCluster.FIRST);
+                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
+                        final Entity source = first.get();
+                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source,
Attributes.SERVICE_UP);
+                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
+                        return booleanTask.get();
+                    }
+                })
+                .build();
+
+        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
+
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
+
+        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
+        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
+
+        // app.start blocks so in the failure case this test would block forever.
+        Asserts.assertReturnsEventually(new Runnable() {
+            public void run() {
+                app.start(ImmutableList.of(app.newSimulatedLocation()));
+                EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.RUNNING);
+            }
+        }, Asserts.DEFAULT_LONG_TIMEOUT);
+    }
+
+    @Test
+    public void testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission()
{
+        // Tests that permits are not released when their start task is cancelled.
+        // Expected behaviour is:
+        // - permit obtained for first member. cancelled task submitted. permit released.
+        // - no permit obtained for second member. cancelled task submitted. no permit released.
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(CancelEffectorInvokeCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 2)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1));
+        final DynamicClusterImpl clusterImpl = DynamicClusterImpl.class.cast(Entities.deproxy(cluster));
+        assertNotNull(clusterImpl.getChildTaskSemaphore());
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1);
+        try {
+            app.start(ImmutableList.<Location>of(app.newSimulatedLocation()));
+            Asserts.shouldHaveFailedPreviously("Cluster start should have failed because the member start was cancelled");
+        } catch (Exception e) {
+            // ignored.
+        }
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1);
+    }
+
+    @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
+    public interface ThrowOnAsyncStartEntity extends TestEntity {
+        ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newConfigKey(Integer.class,
"concurrency", "max concurrency", 1);
+        ConfigKey<AtomicInteger> COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class,
"counter");
+        ConfigKey<Boolean> START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch");
+    }
+
+    public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity
{
+        private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
+        @Override
+        public void start(Collection<? extends Location> locs) {
+            int count = config().get(COUNTER).incrementAndGet();
+            try {
+                LOG.debug("{} starting (first={})", new Object[]{this, sensors().get(AbstractGroup.FIRST_MEMBER)});
+                config().get(START_LATCH);
+                // Throw if more than one entity is starting at the same time as this.
+                assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " + count
+ " <= " + config().get(MAX_CONCURRENCY));
+                super.start(locs);
+            } finally {
+                config().get(COUNTER).decrementAndGet();
+            }
+        }
+    }
+
+    /** Used in {@link #testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}.
*/
+    @ImplementedBy(CancelEffectorInvokeClusterImpl.class)
+    public interface CancelEffectorInvokeCluster extends DynamicCluster {}
+
+    /** Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to cancel each task
before it's submitted. */
+    public static class CancelEffectorInvokeClusterImpl extends DynamicClusterImpl implements
CancelEffectorInvokeCluster {
+        @Override
+        protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T>
effector, Map<?, ?> arguments, boolean isPrivileged) {
+            Task<?> unsubmitted = super.newThrottledEffectorTask(target, effector,
arguments, isPrivileged);
+            unsubmitted.cancel(true);
+            return unsubmitted;
+        }
+    }
+
 }


Mime
View raw message