brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [18/31] git commit: provide some detail and better tostrings when tasks are backing up
Date Tue, 29 Jul 2014 19:32:13 GMT
provide some detail and better tostrings when tasks are backing up


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c4b29907
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c4b29907
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c4b29907

Branch: refs/heads/master
Commit: c4b29907a39f39d03a2cd896e62df76adf29acb3
Parents: 2ecb968
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Wed Jul 23 13:21:16 2014 -0700
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Tue Jul 29 10:42:08 2014 -0400

----------------------------------------------------------------------
 .../util/task/BasicExecutionManager.java        | 163 +++++++++++--------
 .../util/task/SingleThreadedScheduler.java      |  11 +-
 2 files changed, 102 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c4b29907/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
index a9d6a46..1a89501 100644
--- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
+++ b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
@@ -300,47 +300,64 @@ public class BasicExecutionManager implements ExecutionManager {
 		task.submitTimeUtc = System.currentTimeMillis();
 		tasksById.put(task.getId(), task);
 		if (!task.isDone()) {
-			task.result = delayedRunner.schedule(new Callable<Object>() { @SuppressWarnings("rawtypes")
-            public Object call() {
-				if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis();
-				try {
-				    beforeStart(flags, task);
-				    final TaskInternal<?> taskScheduled = (TaskInternal<?>) task.newTask();
-				    taskScheduled.setSubmittedByTask(task);
-				    final Callable<?> oldJob = taskScheduled.getJob();
-				    taskScheduled.setJob(new Callable() { public Object call() {
-				        task.recentRun = taskScheduled;
-				        synchronized (task) {
-				            task.notifyAll();
-				        }
-				        Object result;
-				        try {
-				            result = oldJob.call();
-				        } catch (Exception e) {
-				            log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e);
-				            throw Exceptions.propagate(e);
-				        }
-				        task.runCount++;
-				        if (task.period!=null && !task.isCancelled()) {
-				            task.delay = task.period;
-				            submitNewScheduledTask(flags, task);
-				        }
-				        return result;
-				    }});
-				    task.nextRun = taskScheduled;
-				    BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
-				    if (ec!=null) return ec.submit(taskScheduled);
-				    else return submit(taskScheduled);
-				} finally {
-				    afterEnd(flags, task);
-				}
-			}},
-			task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
+			task.result = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
+			    task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
 		} else {
 			task.endTimeUtc = System.currentTimeMillis();
 		}
 		return task;
 	}
+	
+	protected class ScheduledTaskCallable implements Callable<Object> {
+	    public ScheduledTask task;
+	    public Map<?,?> flags;
+	    
+	    public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
+	        this.task = task;
+	        this.flags = flags;
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public Object call() {
+            if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis();
+            try {
+                beforeStart(flags, task);
+                final TaskInternal<?> taskScheduled = (TaskInternal<?>) task.newTask();
+                taskScheduled.setSubmittedByTask(task);
+                final Callable<?> oldJob = taskScheduled.getJob();
+                taskScheduled.setJob(new Callable() { public Object call() {
+                    task.recentRun = taskScheduled;
+                    synchronized (task) {
+                        task.notifyAll();
+                    }
+                    Object result;
+                    try {
+                        result = oldJob.call();
+                    } catch (Exception e) {
+                        log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e);
+                        throw Exceptions.propagate(e);
+                    }
+                    task.runCount++;
+                    if (task.period!=null && !task.isCancelled()) {
+                        task.delay = task.period;
+                        submitNewScheduledTask(flags, task);
+                    }
+                    return result;
+                }});
+                task.nextRun = taskScheduled;
+                BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
+                if (ec!=null) return ec.submit(taskScheduled);
+                else return submit(taskScheduled);
+            } finally {
+                afterEnd(flags, task);
+            }
+        }
+        
+        @Override
+        public String toString() {
+            return "ScheduledTaskCallable["+task+","+flags+"]";
+        }
+	}
 
     @SuppressWarnings("unchecked")
     protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
@@ -355,43 +372,49 @@ public class BasicExecutionManager implements ExecutionManager {
         if (((TaskInternal<T>)task).getJob() == null) 
             throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied.");
         
-        Callable<T> job = new Callable<T>() { public T call() {
-          try {
-            T result = null;
-            Throwable error = null;
-            String oldThreadName = Thread.currentThread().getName();
-            try {
-                if (RENAME_THREADS) {
-                    String newThreadName = oldThreadName+"-"+task.getDisplayName()+
-                            "["+task.getId().substring(0, 8)+"]";
-                    Thread.currentThread().setName(newThreadName);
-                }
-                beforeStart(flags, task);
-                if (!task.isCancelled()) {
-                    result = ((TaskInternal<T>)task).getJob().call();
-                } else throw new CancellationException();
-            } catch(Throwable e) {
-                error = e;
-            } finally {
-                if (RENAME_THREADS) {
-                    Thread.currentThread().setName(oldThreadName);
+        Callable<T> job = new Callable<T>() { 
+            public T call() {
+                try {
+                    T result = null;
+                    Throwable error = null;
+                    String oldThreadName = Thread.currentThread().getName();
+                    try {
+                        if (RENAME_THREADS) {
+                            String newThreadName = oldThreadName+"-"+task.getDisplayName()+
+                                "["+task.getId().substring(0, 8)+"]";
+                            Thread.currentThread().setName(newThreadName);
+                        }
+                        beforeStart(flags, task);
+                        if (!task.isCancelled()) {
+                            result = ((TaskInternal<T>)task).getJob().call();
+                        } else throw new CancellationException();
+                    } catch(Throwable e) {
+                        error = e;
+                    } finally {
+                        if (RENAME_THREADS) {
+                            Thread.currentThread().setName(oldThreadName);
+                        }
+                        afterEnd(flags, task);
+                    }
+                    if (error!=null) {
+                        if (log.isDebugEnabled()) {
+                            // debug only here, because we rethrow
+                            log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                            if (log.isTraceEnabled())
+                                log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                        }
+                        throw Exceptions.propagate(error);
+                    }
+                    return result;
+                } finally {
+                    ((TaskInternal<?>)task).runListeners();
                 }
-                afterEnd(flags, task);
             }
-            if (error!=null) {
-                if (log.isDebugEnabled()) {
-                    // debug only here, because we rethrow
-                    log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
-                    if (log.isTraceEnabled())
-                        log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
-                }
-                throw Exceptions.propagate(error);
+            @Override
+            public String toString() {
+                return "BasicExecutionManager.submitNewTask.Callable["+task+","+flags+"]";
             }
-            return result;
-          } finally {
-              ((TaskInternal<?>)task).runListeners();
-          }
-        }};
+        };
         
         // If there's a scheduler then use that; otherwise execute it directly
         Set<TaskScheduler> schedulers = null;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c4b29907/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
index c53dc95..4c28369 100644
--- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
+++ b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
@@ -18,7 +18,6 @@
  */
 package brooklyn.util.task;
 
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -77,10 +76,18 @@ public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
             return executeNow(c);
         } else {
             WrappingFuture<T> f = new WrappingFuture<T>();
-            order.add(new QueuedSubmission<T>(c, f));
+            order.add(new QueuedSubmission<T>(c, f) {
+                @Override
+                public String toString() {
+                    return "QueuedSubmission["+c+"]";
+                }
+            });
             int size = order.size();
             if (size>0 && (size == 50 || (size<=500 && (size%100)==0) || (size%1000)==0) && size!=lastSizeWarn) {
                 LOG.warn("{} is backing up, {} tasks queued", this, size);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
+                }
                 lastSizeWarn = size;
             }
             return f;


Mime
View raw message