river-dev mailing list archives

From Dan Creswell <dan.cresw...@gmail.com>
Subject Re: test failure repeatability - TaskManager
Date Wed, 03 Apr 2013 21:04:40 GMT
I'm with you. My first step was going to be to review where runAfter is
used, how often, etc. First I'd like to be convinced that all the ordering
constraints are actually required and can't be circumvented or dropped.

On 3 April 2013 22:01, Patricia Shanahan <pats@acm.org> wrote:

> I agree with the idea of understanding the use cases before designing the
> solution, and with using standard API classes as much as possible. The
> table I sent you was intended as a first step towards that.
>
> I'm not convinced that the right solution is a single TaskManager
> successor. Different TaskManager instances may have different use cases,
> and separating them may lead to several simpler solutions, each of which has
> a narrower set of requirements.
>
> Patricia
>
>
>
> On 4/3/2013 1:55 PM, Peter wrote:
>
>> My gut feeling is that the solution will be executor-based, so you're
>> asking good questions. I think we need to understand the use cases
>> better and probably redesign the dependent code too.
>>
>> In one example of retry, the task keeps attempting to retry for
>> an entire day.
>>
>> We might need some kind of delay queue, where dependent tasks can
>> signal to the tasks that follow them when it's OK to execute.
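One possible reading of that idea, as a minimal sketch: each held-back task counts the predecessors it is still waiting for, each predecessor signals its dependents when it finishes, and the last signal hands the task to an ordinary `Executor`. This is a dependency counter, not `java.util.concurrent.DelayQueue` (which releases elements by elapsed time). The `GatedTask` class and its `after`, `start` and `signal` methods are hypothetical; the sketch assumes the whole graph is wired up before any task starts, and it ignores failure and retry handling.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical task that is held back until every predecessor has signalled
 * completion, then released to an ordinary Executor.
 * Assumes the dependency graph is fully wired (via after()) before any task starts.
 */
final class GatedTask implements Runnable {
    private final Runnable body;
    private final Executor executor;
    private final AtomicInteger pending = new AtomicInteger(); // unfinished predecessors
    private final List<GatedTask> dependents = new CopyOnWriteArrayList<GatedTask>();

    GatedTask(Runnable body, Executor executor) {
        this.body = body;
        this.executor = executor;
    }

    /** Declares that this task may only run after each of the given tasks. */
    GatedTask after(GatedTask... predecessors) {
        for (GatedTask p : predecessors) {
            pending.incrementAndGet();
            p.dependents.add(this);
        }
        return this;
    }

    /** Releases a root task (one with no predecessors); a no-op otherwise. */
    void start() {
        if (pending.get() == 0) {
            executor.execute(this);
        }
    }

    /** Called by a finished predecessor; the last signal releases this task. */
    private void signal() {
        if (pending.decrementAndGet() == 0) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        body.run();                        // if the body throws, dependents stay held back
        for (GatedTask d : dependents) {
            d.signal();
        }
    }
}
```

For example, given `GatedTask`s `a` and `b`, `new GatedTask(body, exec).after(a, b)` releases `body` only once both have finished, in whichever order they complete; only `a.start()` and `b.start()` need to be called.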
>>
>> ----- Original message -----
>>
>>> I am not clear on the semantics for runAfter, but maybe this can
>>> be achieved by wrapping a Runnable within another Runnable such
>>> that the 2nd runnable is automatically scheduled after the first
>>> has succeeded? Likewise, it is possible to wrap a Runnable in order
>>> to automatically retry if it throws an exception.
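A minimal sketch of both wrapping ideas, assuming a plain `java.util.concurrent.Executor`. The `Wrapping` class and its `chain` and `retrying` methods are hypothetical names, not part of River's TaskManager or the JDK. `chain` submits the second task only if the first returns normally; `retrying` re-runs a task in the same thread until it succeeds or a maximum number of attempts is reached.

```java
import java.util.concurrent.Executor;

/**
 * Hypothetical helpers (not River or java.util.concurrent API) showing
 * run-after and retry implemented by wrapping one Runnable in another.
 */
final class Wrapping {
    private Wrapping() {}

    /**
     * Returns a Runnable that runs {@code first} and, only if it returns
     * normally, hands {@code second} to {@code executor}.
     */
    static Runnable chain(final Runnable first, final Runnable second,
                          final Executor executor) {
        return new Runnable() {
            @Override
            public void run() {
                first.run();               // if this throws, 'second' is never scheduled
                executor.execute(second);
            }
        };
    }

    /**
     * Returns a Runnable that re-runs {@code task} in the calling thread until
     * it returns normally, rethrowing the last RuntimeException after
     * {@code maxAttempts} failed attempts.
     */
    static Runnable retrying(final Runnable task, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return new Runnable() {
            @Override
            public void run() {
                for (int attempt = 1; ; attempt++) {
                    try {
                        task.run();
                        return;            // success: stop retrying
                    } catch (RuntimeException e) {
                        if (attempt >= maxAttempts) {
                            throw e;       // out of attempts: propagate the failure
                        }
                    }
                }
            }
        };
    }
}
```

This retry loop has no pause between attempts; retrying after a delay needs a scheduler, which is what the ScheduledExecutorService paragraph below addresses.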
>>>
>>> There are people who are experts at these patterns, but an example
>>> is given (below my signature) of an Executor that wraps another
>>> Executor and queues Runnable instances with limited parallelism.
>>> It hooks each Runnable by wrapping it in a Runnable whose run()
>>> method starts the next queued task when the original finishes.
>>>
>>> If you use a ScheduledExecutorService, you can queue a task to run
>>> after an initial delay and then repeatedly, either with a fixed delay
>>> between runs or at a fixed rate. The task will be rescheduled
>>> *unless* it throws an exception. This could be reused to
>>> periodically re-try a task after a timeout if we convert an error
>>> thrown in the task into "no error" (so it runs again after the fixed
>>> delay) and throw a known exception if there is no error (to
>>> terminate the retry of the task). A bit of a hack, but it leverages
>>> existing code for re-running a task with a fixed delay.
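A sketch of that hack, assuming the task reports failure by throwing a `RuntimeException`. `scheduleWithFixedDelay` suppresses all later runs once one run throws, so the wrapper swallows failures (the task runs again after the delay) and throws a private marker exception once the task succeeds. The `ScheduledFuture` then fails with an `ExecutionException` whose cause is the marker, and `awaitSuccess` turns that back into a normal result. `FixedDelayRetry`, `Done`, `retryUntilSuccess` and `awaitSuccess` are hypothetical names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical retry-until-success helper built on scheduleWithFixedDelay. */
final class FixedDelayRetry {
    private FixedDelayRetry() {}

    /** Marker thrown after a successful run so the executor stops rescheduling. */
    private static final class Done extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Runs {@code task} immediately and then again {@code delay} after each
     * failed run, until one run returns without throwing a RuntimeException.
     */
    static ScheduledFuture<?> retryUntilSuccess(ScheduledExecutorService ses,
            final Runnable task, long delay, TimeUnit unit) {
        return ses.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (RuntimeException failure) {
                    return;                // "no error": the executor runs it again after the delay
                }
                throw new Done();          // success: an exception suppresses further runs
            }
        }, 0L, delay, unit);
    }

    /**
     * Blocks until the task has succeeded. Returns false if the future was
     * cancelled first (for example, to give up after a deadline).
     */
    static boolean awaitSuccess(ScheduledFuture<?> future) throws InterruptedException {
        try {
            future.get();                  // a periodic task never completes normally
            throw new IllegalStateException("periodic task completed normally");
        } catch (ExecutionException e) {
            if (e.getCause() instanceof Done) {
                return true;
            }
            throw new IllegalStateException("task failed with an Error", e.getCause());
        } catch (CancellationException e) {
            return false;
        }
    }
}
```

To bound the retry period (for instance, the day-long retry mentioned earlier in the thread), a caller could use `ses.schedule(...)` to call `future.cancel(false)` at the deadline; `awaitSuccess` then returns false.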
>>>
>>> Thanks, Bryan
>>>
>>> package com.bigdata.util.concurrent;
>>>
>>> import java.util.concurrent.BlockingQueue;
>>> import java.util.concurrent.Callable;
>>> import java.util.concurrent.Executor;
>>> import java.util.concurrent.ExecutorService;
>>> import java.util.concurrent.Future;
>>> import java.util.concurrent.FutureTask;
>>> import java.util.concurrent.LinkedBlockingDeque;
>>> import java.util.concurrent.RejectedExecutionException;
>>> import java.util.concurrent.Semaphore;
>>>
>>> import org.apache.log4j.Logger;
>>>
>>> /**
>>>  * A fly weight helper class that runs tasks either sequentially or with
>>>  * limited parallelism against some thread pool. Deadlock can arise when
>>>  * limited parallelism is applied if there are dependencies among the tasks.
>>>  * Limited parallelism is enforced using a counting {@link Semaphore}. New
>>>  * tasks can start iff the latch is non-zero. The maximum parallelism is the
>>>  * minimum of the value specified to the constructor and the potential
>>>  * parallelism of the delegate service.
>>>  * <p>
>>>  * Note: The pattern for running tasks on this service is generally to
>>>  * {@link #execute(Runnable)} a {@link Runnable} and to make that
>>>  * {@link Runnable} a {@link FutureTask} if you want to await the
>>>  * {@link Future} of a {@link Runnable} or {@link Callable} or otherwise
>>>  * manage its execution.
>>>  * <p>
>>>  * Note: This class can NOT be trivially wrapped as an
>>>  * {@link ExecutorService} since the resulting delegation pattern for
>>>  * submit() winds up invoking execute() on the delegate
>>>  * {@link ExecutorService} rather than on this class.
>>>  *
>>>  * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
>>>  * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z thompsonbry $
>>>  */
>>> public class LatchedExecutor implements Executor {
>>>
>>>     private static final transient Logger log = Logger
>>>             .getLogger(LatchedExecutor.class);
>>>
>>>     /** The delegate executor. */
>>>     private final Executor executor;
>>>
>>>     /**
>>>      * This is used to limit the concurrency with which tasks submitted to
>>>      * this class may execute on the delegate {@link #executor}.
>>>      */
>>>     private final Semaphore semaphore;
>>>
>>>     /**
>>>      * A thread-safe blocking queue of pending tasks.
>>>      *
>>>      * @todo The capacity of this queue does not of necessity need to be
>>>      *       unbounded.
>>>      */
>>>     private final BlockingQueue<Runnable> queue =
>>>             new LinkedBlockingDeque<Runnable>(/* unbounded */);
>>>
>>>     private final int nparallel;
>>>
>>>     /**
>>>      * Return the maximum parallelism allowed by this {@link Executor}.
>>>      */
>>>     public int getNParallel() {
>>>         return nparallel;
>>>     }
>>>
>>>     public LatchedExecutor(final Executor executor, final int nparallel) {
>>>         if (executor == null)
>>>             throw new IllegalArgumentException();
>>>         if (nparallel < 1)
>>>             throw new IllegalArgumentException();
>>>         this.executor = executor;
>>>         this.nparallel = nparallel;
>>>         this.semaphore = new Semaphore(nparallel);
>>>     }
>>>
>>>     public void execute(final Runnable r) {
>>>         if (!queue.offer(new Runnable() {
>>>             /*
>>>              * Wrap the Runnable in a class that will start the next
>>>              * Runnable from the queue when it completes.
>>>              */
>>>             public void run() {
>>>                 try {
>>>                     r.run();
>>>                 } finally {
>>>                     scheduleNext();
>>>                 }
>>>             }
>>>         })) {
>>>             // The queue is full.
>>>             throw new RejectedExecutionException();
>>>         }
>>>         if (semaphore.tryAcquire()) {
>>>             // We were able to obtain a permit, so start another task.
>>>             scheduleNext();
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Schedule the next task if one is available (non-blocking).
>>>      * <p>
>>>      * Pre-condition: The caller has a permit.
>>>      */
>>>     private void scheduleNext() {
>>>         while (true) {
>>>             final Runnable next = queue.poll();
>>>             if (next != null) {
>>>                 try {
>>>                     executor.execute(next);
>>>                     return;
>>>                 } catch (RejectedExecutionException ex) {
>>>                     // log error and poll the queue again.
>>>                     log.error(ex, ex);
>>>                     continue;
>>>                 }
>>>             }
>>>             // The queue is empty, so give back our permit.
>>>             semaphore.release();
>>>             // A task may have been queued after poll() returned null but
>>>             // before release(), while no permit was free; if so, take a
>>>             // permit back and run it rather than leave it stranded.
>>>             if (queue.isEmpty() || !semaphore.tryAcquire()) {
>>>                 return;
>>>             }
>>>         }
>>>     }
>>>
>>> }
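The class comment's recommended pattern, wrapping work in a `FutureTask` and passing it to `execute(Runnable)` so the caller can still await a result, works with any `Executor`. This minimal sketch uses a thread pool from `Executors` in its test rather than `LatchedExecutor`, so that it runs on its own; `FutureTaskPattern.submitVia` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

/** Hypothetical helper showing execute(FutureTask) followed by Future.get(). */
final class FutureTaskPattern {
    private FutureTaskPattern() {}

    /** Hands the callable to the executor as a FutureTask and returns that task as the Future. */
    static <T> FutureTask<T> submitVia(Executor executor, Callable<T> work) {
        FutureTask<T> task = new FutureTask<T>(work); // a FutureTask is both a Runnable and a Future
        executor.execute(task);                       // only execute() is needed from the executor
        return task;
    }
}
```

Because only `execute()` is used, the same call works with `LatchedExecutor`: its wrapper calls `FutureTask.run()`, which records the result or exception for `get()`.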
>>>
>>>
>>>
>>
>>
>
