brooklyn-dev mailing list archives

From aledsage <...@git.apache.org>
Subject [GitHub] brooklyn-server pull request #816: Tasks code improvements - prep for better...
Date Thu, 28 Sep 2017 12:52:22 GMT
GitHub user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/816#discussion_r141578971
  
    --- Diff: core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java ---
    @@ -110,48 +248,53 @@ public ExecutionManager getExecutionManager() {
         @SuppressWarnings("unchecked")
         @Override
         public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
    -        BasicTask<?> fakeTaskForContext;
    +        BasicTask<T> fakeTaskForContext;
             if (callableOrSupplier instanceof BasicTask) {
    -            fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
    +            fakeTaskForContext = (BasicTask<T>)callableOrSupplier;
                 if (fakeTaskForContext.isQueuedOrSubmitted()) {
                     if (fakeTaskForContext.isDone()) {
    -                    return Maybe.of((T)fakeTaskForContext.getUnchecked());
    +                    return Maybe.of(fakeTaskForContext.getUnchecked());
                     } else {
                         throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
                     }
                 }
                 callableOrSupplier = fakeTaskForContext.getJob();
    +        } else if (callableOrSupplier instanceof TaskAdaptable) {
    +            return getImmediately( ((TaskAdaptable<T>)callableOrSupplier).asTask() );
             } else {
    -            fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
    +            fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "Immediate evaluation"));
             }
    -        fakeTaskForContext.tags.addAll(tags);
    +        final ImmediateSupplier<T> job = callableOrSupplier instanceof ImmediateSupplier ? (ImmediateSupplier<T>) callableOrSupplier
    +            : InterruptingImmediateSupplier.<T>of(callableOrSupplier);
             fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
             fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
    -        
    -        Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
    -        BasicExecutionContext oldExecutionContext = getCurrentExecutionContext();
    -        registerPerThreadExecutionContext();
     
    -        if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
    -        fakeTaskForContext.cancel();
    +        ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(fakeTaskForContext, Collections.emptyList(), true);
    +        if (switchContextWrapper!=null) {
    +            return switchContextWrapper.context.getImmediately(switchContextWrapper.wrapperTask);
    +        }
    +
             try {
    -            BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
    -            
    -            if (!(callableOrSupplier instanceof ImmediateSupplier)) {
    -                callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
    -            }
    -            boolean wasAlreadyInterrupted = Thread.interrupted();
    -            try {
    -                return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
    -            } finally {
    -                if (wasAlreadyInterrupted) {
    -                    Thread.currentThread().interrupt();
    -                }
    -            }
    - 
    -        } finally {
    -            BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
    -            perThreadExecutionContext.set(oldExecutionContext);
    +            return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
    --- End diff --
    
    I don't understand how this works (or more accurately what it's supposed to do).
    
    My expectation for what `getImmediately(task)` should do is: try to evaluate the task immediately, but if it can't, leave the task in a good state so that if someone else later calls `task.get()` it is still usable. However, that is not what happens if you call it on an unsubmitted task.
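    
    As a rough illustration of that expectation, here is a minimal plain-JDK sketch; it is not Brooklyn code. `ImmediateSketch`, `tryImmediately` and `runNormally` are hypothetical names, and pre-setting the interrupt flag is only an assumption about how a non-blocking attempt could be made (the diff refers to `InterruptingImmediateSupplier`, but its internals are not shown here). The point is that a failed immediate attempt touches only the job, so a task wrapping the same job can still run normally afterwards.

    ```java
    import java.util.Optional;
    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    public class ImmediateSketch {

        /**
         * Hypothetical helper: runs the job in the calling thread with the interrupt
         * flag pre-set, so any blocking call (sleep, wait, ...) fails straight away.
         * Returns empty if the job needed to block. No task object is touched.
         * Assumes the job never returns null.
         */
        public static <T> Optional<T> tryImmediately(Callable<T> job) {
            boolean wasInterrupted = Thread.interrupted(); // remember and clear the caller's flag
            try {
                Thread.currentThread().interrupt(); // make blocking calls fail fast
                return Optional.of(job.call());
            } catch (InterruptedException e) {
                return Optional.empty(); // the job tried to block
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                Thread.interrupted(); // clear the flag set above
                if (wasInterrupted) Thread.currentThread().interrupt(); // restore the caller's flag
            }
        }

        /** Runs the job as an ordinary task, blocking until it completes. */
        public static <T> T runNormally(Callable<T> job) {
            FutureTask<T> task = new FutureTask<>(job);
            task.run();
            try {
                return task.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) {
            Callable<String> slow = () -> { Thread.sleep(100); return "done"; };
            // The immediate attempt cannot complete because the job sleeps.
            System.out.println("immediate: " + tryImmediately(slow));
            // A normal, blocking run of the same job still succeeds afterwards.
            System.out.println("later: " + runNormally(slow));
        }
    }
    ```

    In this sketch the job is invoked twice (once for the failed immediate attempt and once for the real run), so it suits only jobs that are safe to restart.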
    
    In the simple test below:
    ```
        @Test
        public void testGetImmediatelyForUnsubmittedTask() throws Exception {
            ExecutionContext executionContext = ((EntityInternal)e).getExecutionContext();
            
            Task<String> task = new BasicTask<String>(new Callable<String>() {
                @Override public String call() throws Exception {
                    Time.sleep(Duration.ONE_SECOND);
                    return "done";
                }});
            Maybe<Object> result = executionContext.getImmediately(task);
            
            System.out.println("result="+result);
            System.out.println("task="+task+"; begun="+task.isBegun()+"; done="+task.isDone());
            System.out.println("context.get(task)="+executionContext.get(task));
            System.out.println("task.get="+task.get());
            System.out.println("context.submit(task).get()="+executionContext.submit(task).get());
        }
    ```
    The `context.get(task)` call throws a `CancellationException`. If you comment that out and try `task.get()`, the same thing happens. If you comment out both of those and try the submit, it again fails with a `CancellationException`.
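    
    For comparison, a plain `java.util.concurrent.FutureTask` shows the same symptom once it has been cancelled: `run()` becomes a no-op and `get()` throws `CancellationException`. The sketch below is only a JDK analogy (`CancelledTaskDemo` and `describeGet` are hypothetical names). It makes no claim about what `BasicTask` or `getImmediately` do internally, although the behaviour reported above would be consistent with the task having ended up cancelled.

    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.FutureTask;

    public class CancelledTaskDemo {

        /** Describes what get() does on the given task without propagating exceptions. */
        public static String describeGet(FutureTask<String> task) {
            try {
                return "value=" + task.get();
            } catch (CancellationException e) {
                return "CancellationException";
            } catch (Exception e) {
                return "other: " + e;
            }
        }

        public static void main(String[] args) {
            Callable<String> job = () -> "done";

            // Cancelled before it ever ran: later attempts to run it do nothing.
            FutureTask<String> cancelled = new FutureTask<>(job);
            cancelled.cancel(true);
            cancelled.run();
            System.out.println("cancelled task: " + describeGet(cancelled));

            // Never cancelled: runs and returns its value as usual.
            FutureTask<String> fresh = new FutureTask<>(job);
            fresh.run();
            System.out.println("fresh task: " + describeGet(fresh));
        }
    }
    ```

    If something similar is happening here, one option might be for `getImmediately` to work on a separate wrapper task rather than the caller's task, so that the caller's task is never marked cancelled.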


---
