brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [5/9] git commit: switch incomplete tasks to be tracked by ID, rather than a simple count, so we can GC those which are interrupted (and so their afterEnd method is never called), ensuring incomplete counts are accurate modulo interrupted-but-not-garbage-collected tasks
Date Fri, 17 Oct 2014 10:24:15 GMT
Switch incomplete tasks to be tracked by ID, rather than by a simple count, so that we can GC
those which are interrupted (and whose afterEnd method is therefore never called).
This keeps incomplete counts accurate, modulo interrupted-but-not-yet-garbage-collected tasks.
Also log a warning if a task is removed while it is still active.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e5945223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e5945223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e5945223

Branch: refs/heads/master
Commit: e5945223e91cfa8c6ecaa76e6c85f1df1b10c60b
Parents: cf86b82
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Thu Oct 16 11:52:11 2014 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Thu Oct 16 17:44:39 2014 +0100

----------------------------------------------------------------------
 .../util/task/BasicExecutionManager.java        | 116 +++++++++++++------
 .../src/main/java/brooklyn/util/task/Tasks.java |  18 ++-
 2 files changed, 87 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e5945223/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
index 6098c5a..87e18b6 100644
--- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
+++ b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -44,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,20 +96,24 @@ public class BasicExecutionManager implements ExecutionManager {
     // TODO Could have a set of all knownTasks; but instead we're having a separate set per
tag,
     // so the same task could be listed multiple times if it has multiple tags...
 
-    //access to the below is synchronized in code in this class, to allow us to preserve
order while guaranteeing thread-safe
+    //access to this field AND to members in this field is synchronized, 
+    //to allow us to preserve order while guaranteeing thread-safe
     //(but more testing is needed before we are sure it is thread-safe!)
     //synch blocks are as finely grained as possible for efficiency
     //Not using a CopyOnWriteArraySet for each, because profiling showed this being a massive
perf bottleneck.
-    private ConcurrentMap<Object,Set<Task<?>>> tasksByTag = new ConcurrentHashMap<Object,Set<Task<?>>>();
+    private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>();
     
     private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>();
 
     private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object,
TaskScheduler>();
-    
+
+    /** count of all tasks submitted, including finished */
     private final AtomicLong totalTaskCount = new AtomicLong();
     
-    private final AtomicInteger incompleteTaskCount = new AtomicInteger();
+    /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd)
*/
+    private Set<String> incompleteTaskIds = new ConcurrentHashSet<String>();
     
+    /** tasks started but not yet finished */
     private final AtomicInteger activeTaskCount = new AtomicInteger();
     
     private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>();
@@ -173,7 +179,10 @@ public class BasicExecutionManager implements ExecutionManager {
      * a reference to it as a tag.
      */
     public void deleteTag(Object tag) {
-        Set<Task<?>> tasks = tasksByTag.remove(tag);
+        Set<Task<?>> tasks;
+        synchronized (tasksByTag) {
+            tasks = tasksByTag.remove(tag);
+        }
         if (tasks != null) {
             for (Task<?> task : tasks) {
                 deleteTask(task);
@@ -196,10 +205,21 @@ public class BasicExecutionManager implements ExecutionManager {
     protected boolean deleteTaskNonRecursive(Task<?> task) {
         Set<?> tags = checkNotNull(task, "task").getTags();
         for (Object tag : tags) {
-            Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
-            if (tasks != null) tasks.remove(task);
+            synchronized (tasksByTag) {
+                Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
+                if (tasks != null) {
+                    tasks.remove(task);
+                    if (tasks.isEmpty()) {
+                        tasksByTag.remove(tag);
+                    }
+                }
+            }
         }
         Task<?> removed = tasksById.remove(task.getId());
+        incompleteTaskIds.remove(task.getId());
+        if (removed!=null && removed.isSubmitted() && !removed.isDone())
{
+            log.warn("Deleting submitted but incomplete task (cancel it first): "+removed);
+        }
         return removed != null;
     }
 
@@ -214,7 +234,7 @@ public class BasicExecutionManager implements ExecutionManager {
     
     /** count of tasks submitted but not ended */
     public long getNumIncompleteTasks() {
-        return incompleteTaskCount.get();
+        return incompleteTaskIds.size();
     }
     
     /** count of tasks started but not ended */
@@ -227,16 +247,24 @@ public class BasicExecutionManager implements ExecutionManager {
         return tasksById.size();
     }
 
-    private Set<Task<?>> tasksWithTagLiveNonNull(Object tag) {
+    private Set<Task<?>> tasksWithTagCreating(Object tag) {
         Preconditions.checkNotNull(tag);
-        tasksByTag.putIfAbsent(tag, Collections.synchronizedSet(new LinkedHashSet<Task<?>>()));
-        return tasksWithTagLiveOrNull(tag);
+        synchronized (tasksByTag) {
+            Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+            if (result==null) {
+                result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>());
+                tasksByTag.put(tag, result);
+            }
+            return result;
+        }
     }
 
     /** exposes live view, for internal use only */
     @Beta
     public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
-        return tasksByTag.get(tag);
+        synchronized (tasksByTag) {
+            return tasksByTag.get(tag);
+        }
     }
 
     @Override
@@ -255,7 +283,8 @@ public class BasicExecutionManager implements ExecutionManager {
     
     @Override
     public Set<Task<?>> getTasksWithTag(Object tag) {
-        Set<Task<?>> result = tasksWithTagLiveNonNull(tag);
+        Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+        if (result==null) return Collections.emptySet();
         synchronized (result) {
             return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result));
         }
@@ -266,12 +295,17 @@ public class BasicExecutionManager implements ExecutionManager {
         Set<Task<?>> result = new LinkedHashSet<Task<?>>();
         Iterator<?> ti = tags.iterator();
         while (ti.hasNext()) {
-            result.addAll(getTasksWithTag(ti.next()));
+            Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next());
+            if (tasksForTag!=null) {
+                synchronized (tasksForTag) {
+                    result.addAll(tasksForTag);
+                }
+            }
         }
         return Collections.unmodifiableSet(result);
     }
 
-    /** only works with at least one tag */
+    /** only works with at least one tag; returns empty if no tags */
     @Override
     public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
         //NB: for this method retrieval for multiple tags could be made (much) more efficient
(if/when it is used with multiple tags!)
@@ -296,7 +330,11 @@ public class BasicExecutionManager implements ExecutionManager {
     @Beta
     public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
     
-    public Set<Object> getTaskTags() { return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet()));
}
+    public Set<Object> getTaskTags() { 
+        synchronized (tasksByTag) {
+            return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet()));

+        }
+    }
 
     public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1),
r); }
     public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags,
new BasicTask<Void>(flags, r)); }
@@ -362,25 +400,31 @@ public class BasicExecutionManager implements ExecutionManager {
                 final Callable<?> oldJob = taskScheduled.getJob();
                 final TaskInternal<?> taskScheduledF = taskScheduled;
                 taskScheduled.setJob(new Callable() { public Object call() {
+                    boolean resubmitted = false;
                     task.recentRun = taskScheduledF;
-                    synchronized (task) {
-                        task.notifyAll();
-                    }
-                    Object result;
                     try {
-                        result = oldJob.call();
-                    } catch (Exception e) {
-                        log.warn("Error executing "+oldJob+" (scheduled job of "+task+" -
"+task.getDescription()+"); cancelling scheduled execution", e);
-                        throw Exceptions.propagate(e);
-                    }
-                    task.runCount++;
-                    if (task.period!=null && !task.isCancelled()) {
-                        task.delay = task.period;
-                        submitSubsequentScheduledTask(flags, task);
-                    } else {
-                        afterEndScheduledTaskAllIterations(flags, task);
+                        synchronized (task) {
+                            task.notifyAll();
+                        }
+                        Object result;
+                        try {
+                            result = oldJob.call();
+                        } catch (Exception e) {
+                            log.warn("Error executing "+oldJob+" (scheduled job of "+task+"
- "+task.getDescription()+"); cancelling scheduled execution", e);
+                            throw Exceptions.propagate(e);
+                        }
+                        task.runCount++;
+                        if (task.period!=null && !task.isCancelled()) {
+                            task.delay = task.period;
+                            submitSubsequentScheduledTask(flags, task);
+                            resubmitted = true;
+                        }
+                        return result;
+                    } finally {
+                        // do in finally block in case we were interrupted
+                        if (!resubmitted)
+                            afterEndScheduledTaskAllIterations(flags, task);
                     }
-                    return result;
                 }});
                 task.nextRun = taskScheduled;
                 BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
@@ -551,7 +595,7 @@ public class BasicExecutionManager implements ExecutionManager {
     }
     /** invoked when a task is submitted */
     protected void beforeSubmit(Map<?,?> flags, Task<?> task) {
-        incompleteTaskCount.incrementAndGet();
+        incompleteTaskIds.add(task.getId());
         
         Task<?> currentTask = Tasks.current();
         if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask);
@@ -561,7 +605,7 @@ public class BasicExecutionManager implements ExecutionManager {
         if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
 
         for (Object tag: ((TaskInternal<?>)task).getTags()) {
-            tasksWithTagLiveNonNull(tag).add(task);
+            tasksWithTagCreating(tag).add(task);
         }
     }
 
@@ -617,11 +661,11 @@ public class BasicExecutionManager implements ExecutionManager {
             activeTaskCount.decrementAndGet();
         }
         if (isEndingAllIterations) {
-            incompleteTaskCount.decrementAndGet();
+            incompleteTaskIds.remove(task.getId());
             ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+            ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
         }
 
-        ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
         if (startedInThisThread) {
             PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
             //clear thread _after_ endTime set, so we won't get a null thread when there
is no end-time

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e5945223/core/src/main/java/brooklyn/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java
index 09195e8..77ae161 100644
--- a/core/src/main/java/brooklyn/util/task/Tasks.java
+++ b/core/src/main/java/brooklyn/util/task/Tasks.java
@@ -392,22 +392,18 @@ public class Tasks {
 
         t.blockUntilEnded(timer.getDurationRemaining());
         
-        // TODO when the below is confirmed, delete the code below
-//        Future<?> f = ((BasicTask<?>)t).getInternalFuture();
-//        if (f==null) return;
-//        try {
-//            f.isDone();
-//        } catch (Exception e) {
-//            Exceptions.propagateIfFatal(e);
-//            // ignore
-//        }
         while (true) {
             if (t.getEndTimeUtc()>=0) return true;
             // above should be sufficient; but just in case, trying the below
             Thread tt = t.getThread();
-            if (tt==null || !tt.isAlive()) {
-                log.warn("Internal task thread is dead or null ("+tt+") but task not ended:
"+t.getEndTimeUtc()+" ("+t+")");
+            if (t instanceof ScheduledTask) {
+                ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
                 return true;
+            } else {
+                if (tt==null || !tt.isAlive()) {
+                    log.warn("Internal task thread is dead or null ("+tt+") but task not
ended: "+t.getEndTimeUtc()+" ("+t+")");
+                    return true;
+                }
             }
             if (timer.isExpired())
                 return false;


Mime
View raw message