brooklyn-dev mailing list archives

From sjcorbett <...@git.apache.org>
Subject [GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...
Date Mon, 28 Nov 2016 16:09:49 GMT
GitHub user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89813930
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -1040,14 +1077,81 @@ protected void discardNode(Entity entity) {
     
         protected void stopAndRemoveNode(Entity member) {
             removeMember(member);
    -
             try {
                 if (member instanceof Startable) {
    -                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
    +                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
    +                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                     task.getUnchecked();
                 }
             } finally {
                 Entities.unmanage(member);
             }
         }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
    +        return newThrottledEffectorTask(target, effector, arguments, false);
    +    }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
    +        final Semaphore permit = childTaskSemaphore;
    +        final Task<?> toSubmit;
    +        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
    +        if (permit != null) {
    +            // Acquire the permit now for the privileged task and just queue the effector invocation.
    +            // If it's unprivileged then queue a task to obtain a permit first.
    +            final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
    +            final Runnable obtain = new ObtainPermit(permit, description);
    +            if (isPrivileged) {
    +                obtain.run();
    +                toSubmit = effectorTask;
    +            } else {
    +                // acquire semaphore here if privileged?
    +                Task<?> obtainMutex = Tasks.builder()
    +                        .description(description)
    +                        .body(new ObtainPermit(permit, description))
    +                        .build();
    +                toSubmit = Tasks.sequential(obtainMutex, effectorTask);
    +            }
    +            toSubmit.addListener(new ReleasePermit(permit), MoreExecutors.sameThreadExecutor());
    --- End diff --
    
    Do you think it's better to queue the release as another task?
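
    For comparison, here is a minimal plain-Java sketch of the "release as its own step" shape, as opposed to releasing from a listener. It uses only java.util.concurrent and does not model Brooklyn's task framework; `ThrottledSteps` and `runThrottled` are hypothetical names chosen for illustration. The point it tries to show is that an explicit release step has to run even when the throttled work fails, which a try/finally guarantees here. Whether a queued release task would still run after a failed effector depends on the task framework's semantics, which this sketch does not cover.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

// Hypothetical illustration only: not Brooklyn code and not the PR's implementation.
public class ThrottledSteps {

    /**
     * Runs acquire, body and release as three sequential steps on the calling thread.
     * The release step sits in a finally block so the permit is returned even if
     * the body throws.
     */
    public static <T> T runThrottled(Semaphore permits, Callable<T> body) throws Exception {
        permits.acquire();          // step 1: block until a permit is available
        try {
            return body.call();     // step 2: the throttled work (e.g. a stop call)
        } finally {
            permits.release();      // step 3: explicit release, runs on success or failure
        }
    }

    public static void main(String[] args) throws Exception {
        Semaphore permits = new Semaphore(1);
        String result = runThrottled(permits, () -> "stopped");
        System.out.println(result + ", permits available: " + permits.availablePermits());
    }
}
```

    With a listener, the release is attached to the task's completion rather than being a visible step in the sequence; with a dedicated step, it shows up alongside the acquire, at the cost of needing a finally-style guarantee.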


