river-dev mailing list archives

From Patricia Shanahan <p...@acm.org>
Subject Re: test failure repeatability - TaskManager
Date Wed, 03 Apr 2013 21:01:35 GMT
I agree with the idea of understanding the use cases before designing 
the solution, and with using standard API classes as much as possible. 
The table I sent you was intended as a first step towards that.

I'm not convinced that the right solution is a single TaskManager 
successor. Different TaskManager instances may have different use cases, 
and separating them may lead to several simpler solutions, each of which 
has a narrower set of requirements.

Patricia


On 4/3/2013 1:55 PM, Peter wrote:
> Gut feeling suggests the solution will be executor-based, so you're
> asking good questions. I think we need to understand the use cases
> better and probably redesign dependent code too.
>
> One example of retry: the task will continue attempting to retry for
> an entire day.
>
> We might need some kind of delay queue, where dependent tasks can
> signal to following tasks when it's ok to execute.
>
> ----- Original message -----
>> I am not clear on the semantics for runAfter, but maybe this can
>> be achieved by wrapping a Runnable within another Runnable such
>> that the 2nd runnable is automatically scheduled after the first
>> has succeeded? Likewise, it is possible to wrap a Runnable in order
>> to automatically retry if it throws an exception.
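
A minimal sketch of the two wrapping ideas above, assuming plain java.util.concurrent types. The names TaskWrappers, runAfter and retrying are hypothetical and are not River or TaskManager API. "Succeeded" is taken here to mean that run() returned without throwing, and the cap on attempts is an added assumption rather than anything stated in the thread.

```java
import java.util.concurrent.Executor;

/** Hypothetical helper; none of these names come from River or the post. */
final class TaskWrappers {

    private TaskWrappers() {}

    /**
     * Returns a Runnable that runs {@code first} and, only if it returns
     * normally, hands {@code second} to the given executor.
     */
    static Runnable runAfter(final Runnable first, final Runnable second,
            final Executor executor) {
        return new Runnable() {
            public void run() {
                first.run(); // an exception here skips the next line
                executor.execute(second);
            }
        };
    }

    /**
     * Returns a Runnable that re-runs {@code task} until it returns normally
     * or has been attempted {@code maxAttempts} times. The last failure is
     * rethrown once the attempts are used up.
     */
    static Runnable retrying(final Runnable task, final int maxAttempts) {
        if (maxAttempts < 1)
            throw new IllegalArgumentException();
        return new Runnable() {
            public void run() {
                RuntimeException last = null;
                for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                    try {
                        task.run();
                        return; // success, stop retrying
                    } catch (RuntimeException ex) {
                        last = ex; // remember the failure and try again
                    }
                }
                throw last;
            }
        };
    }
}
```

The two wrappers compose, for example executor.execute(TaskWrappers.runAfter(TaskWrappers.retrying(a, 3), b, executor)). This version retries immediately on the calling thread; a delay between attempts would need a scheduler.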
>>
>> There are people who are experts at these patterns, but an example
>> is given (below my signature) for an Executor that wraps an
>> ExecutorService and queues Runnable instances with limited
>> parallelism.  It hooks the Runnable in its own run() method.
>>
>> If you use a ScheduledExecutorService, you can queue a task to run
>> with an initial and repeated delay (or at a repeated interval).
>> The task will be rescheduled *unless* it throws an exception.  This
>> could be reused to periodically retry a task after a timeout if
>> we convert an error thrown in the task into "no error" (hence run
>> after a fixed delay) and throw out a known exception if there is no
>> error (to terminate the retry of the task).  A bit of a hack, but
>> it leverages existing code for re-running a task with a fixed
>> delay.
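
A rough sketch of the fixed-delay hack described above, assuming the standard ScheduledExecutorService.scheduleWithFixedDelay contract (an execution that throws suppresses all later executions). FixedDelayRetry, Succeeded, retryUntilSuccess and awaitSuccess are hypothetical names made up for illustration. Only RuntimeException is swallowed here, so an Error would still end the schedule.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating retry via a periodic schedule. */
final class FixedDelayRetry {

    private FixedDelayRetry() {}

    /** Hypothetical marker exception thrown to stop the periodic schedule. */
    static final class Succeeded extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Runs {@code task} immediately and then again after each {@code delay}
     * until one run returns normally.
     */
    static ScheduledFuture<?> retryUntilSuccess(
            final ScheduledExecutorService ses, final Runnable task,
            final long delay, final TimeUnit unit) {
        return ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (RuntimeException ex) {
                    // Convert the failure into "no error" so the executor
                    // runs the task again after the fixed delay.
                    return;
                }
                // No error: throw the known exception to end the schedule.
                throw new Succeeded();
            }
        }, 0L, delay, unit);
    }

    /**
     * Blocks until the schedule ends. Returns true if it ended because the
     * task succeeded, false if the wait was interrupted or the schedule
     * ended for another reason. A cancelled future throws
     * CancellationException from get().
     */
    static boolean awaitSuccess(final ScheduledFuture<?> future) {
        try {
            future.get();
            return false; // not reached for a periodic task
        } catch (ExecutionException ex) {
            return ex.getCause() instanceof Succeeded;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller that wants to give up after a day, as in the retry example mentioned in this thread, could cancel the returned future from a second one-shot schedule.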
>>
>> Thanks, Bryan
>>
>> package com.bigdata.util.concurrent;
>>
>> import java.util.concurrent.BlockingQueue;
>> import java.util.concurrent.Callable;
>> import java.util.concurrent.Executor;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Future;
>> import java.util.concurrent.FutureTask;
>> import java.util.concurrent.LinkedBlockingDeque;
>> import java.util.concurrent.RejectedExecutionException;
>> import java.util.concurrent.Semaphore;
>>
>> import org.apache.log4j.Logger;
>>
>> /**
>>  * A fly weight helper class that runs tasks either sequentially or with
>>  * limited parallelism against some thread pool. Deadlock can arise when
>>  * limited parallelism is applied if there are dependencies among the
>>  * tasks. Limited parallelism is enforced using a counting
>>  * {@link Semaphore}. New tasks can start iff the latch is non-zero. The
>>  * maximum parallelism is the minimum of the value specified to the
>>  * constructor and the potential parallelism of the delegate service.
>>  * <p>
>>  * Note: The pattern for running tasks on this service is generally to
>>  * {@link #execute(Runnable)} a {@link Runnable} and to make that
>>  * {@link Runnable} a {@link FutureTask} if you want to await the
>>  * {@link Future} of a {@link Runnable} or {@link Callable} or otherwise
>>  * manage its execution.
>>  * <p>
>>  * Note: This class can NOT be trivially wrapped as an
>>  * {@link ExecutorService} since the resulting delegation pattern for
>>  * submit() winds up invoking execute() on the delegate
>>  * {@link ExecutorService} rather than on this class.
>>  *
>>  * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
>>  * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z thompsonbry $
>>  */
>> public class LatchedExecutor implements Executor {
>>
>>     private static final transient Logger log = Logger
>>             .getLogger(LatchedExecutor.class);
>>
>>     /**
>>      * The delegate executor.
>>      */
>>     private final Executor executor;
>>
>>     /**
>>      * This is used to limit the concurrency with which tasks submitted to
>>      * this class may execute on the delegate {@link #executor}.
>>      */
>>     private final Semaphore semaphore;
>>
>>     /**
>>      * A thread-safe blocking queue of pending tasks.
>>      *
>>      * @todo The capacity of this queue does not of necessity need to be
>>      *       unbounded.
>>      */
>>     private final BlockingQueue<Runnable> queue =
>>             new LinkedBlockingDeque<Runnable>(/*unbounded*/);
>>
>>     private final int nparallel;
>>
>>     /**
>>      * Return the maximum parallelism allowed by this {@link Executor}.
>>      */
>>     public int getNParallel() {
>>         return nparallel;
>>     }
>>
>>     public LatchedExecutor(final Executor executor, final int nparallel) {
>>         if (executor == null)
>>             throw new IllegalArgumentException();
>>         if (nparallel < 1)
>>             throw new IllegalArgumentException();
>>         this.executor = executor;
>>         this.nparallel = nparallel;
>>         this.semaphore = new Semaphore(nparallel);
>>     }
>>
>>     public void execute(final Runnable r) {
>>         if (!queue.offer(new Runnable() {
>>             /*
>>              * Wrap the Runnable in a class that will start the next
>>              * Runnable from the queue when it completes.
>>              */
>>             public void run() {
>>                 try {
>>                     r.run();
>>                 } finally {
>>                     scheduleNext();
>>                 }
>>             }
>>         })) {
>>             // The queue is full.
>>             throw new RejectedExecutionException();
>>         }
>>         if (semaphore.tryAcquire()) {
>>             // We were able to obtain a permit, so start another task.
>>             scheduleNext();
>>         }
>>     }
>>
>>     /**
>>      * Schedule the next task if one is available (non-blocking).
>>      * <p>
>>      * Pre-condition: The caller has a permit.
>>      */
>>     private void scheduleNext() {
>>         while (true) {
>>             Runnable next = null;
>>             if ((next = queue.poll()) != null) {
>>                 try {
>>                     executor.execute(next);
>>                     return;
>>                 } catch (RejectedExecutionException ex) {
>>                     // log error and poll the queue again.
>>                     log.error(ex, ex);
>>                     continue;
>>                 }
>>             } else {
>>                 semaphore.release();
>>                 return;
>>             }
>>         }
>>     }
>>
>> }
>>
>>
>
>

