river-dev mailing list archives

From Peter <j...@zeus.net.au>
Subject Re: test failure repeatability - TaskManager
Date Wed, 03 Apr 2013 20:55:50 GMT
Gut feeling suggests the solution will be executor based, so you're asking good questions.
I think we need to understand the use cases better and probably redesign dependent code too.

One example of retry: the task will continue attempting to retry for an entire day.

We might need some kind of delay queue, where dependent tasks can signal to following tasks
when it's ok to execute.
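
A rough sketch of how one task could signal a following task, assuming the wrapping approach suggested in the quoted message below. This is not River code: the class name RunAfterSketch and the method thenRun are hypothetical, and the follow-up is simply handed to whatever Executor is supplied once the first task completes without throwing.

```java
import java.util.concurrent.Executor;

class RunAfterSketch {

    /**
     * Returns a Runnable that runs {@code first} and, only if it completes
     * without throwing, hands {@code next} to the executor.
     */
    static Runnable thenRun(final Runnable first, final Runnable next,
            final Executor executor) {
        return new Runnable() {
            public void run() {
                first.run();            // an exception here skips the follow-up
                executor.execute(next); // the signal that it is ok to proceed
            }
        };
    }
}
```

Usage would be along the lines of executor.execute(RunAfterSketch.thenRun(a, b, executor)). If the first task throws, the follow-up is never queued, so any retry policy would have to be layered on separately.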

----- Original message -----
> I am not clear on the semantics for runAfter, but maybe this can be
> achieved by wrapping a Runnable within another Runnable such that the 2nd
> runnable is automatically scheduled after the first has succeeded?
> Likewise, it is possible to wrap a Runnable in order to automatically
> retry if it throws an exception.
>
> There are people who are experts at these patterns, but an example is
> given (below my signature) for an Executor that wraps an ExecutorService
> and queues Runnable instances with limited parallelism.  It hooks the
> Runnable in its own run() method.
>
> If you use a ScheduledExecutorService, you can queue a task to run with an
> initial and repeated delay (or at a repeated interval).  The task will be
> rescheduled *unless* it throws an exception.  This could be reused to
> periodically re-try a task after a timeout if we convert an error thrown
> in the task into "no error" (hence run after a fixed delay) and throw out
> a known exception if there is no error (to terminate the retry of the
> task).  A bit of a hack, but it leverages existing code for re-running a
> task with a fixed delay.
>
> Thanks,
> Bryan
>
> package com.bigdata.util.concurrent;
>
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executor;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Future;
> import java.util.concurrent.FutureTask;
> import java.util.concurrent.LinkedBlockingDeque;
> import java.util.concurrent.RejectedExecutionException;
> import java.util.concurrent.Semaphore;
>
> import org.apache.log4j.Logger;
>
> /**
>  * A flyweight helper class that runs tasks either sequentially or with
>  * limited parallelism against some thread pool. Deadlock can arise when
>  * limited parallelism is applied if there are dependencies among the tasks.
>  * Limited parallelism is enforced using a counting {@link Semaphore}. New
>  * tasks can start iff the latch is non-zero. The maximum parallelism is the
>  * minimum of the value specified to the constructor and the potential
>  * parallelism of the delegate service.
>  * <p>
>  * Note: The pattern for running tasks on this service is generally to
>  * {@link #execute(Runnable)} a {@link Runnable} and to make that
>  * {@link Runnable} a {@link FutureTask} if you want to await the
>  * {@link Future} of a {@link Runnable} or {@link Callable} or otherwise
>  * manage its execution.
>  * <p>
>  * Note: This class can NOT be trivially wrapped as an
>  * {@link ExecutorService} since the resulting delegation pattern for
>  * submit() winds up invoking execute() on the delegate
>  * {@link ExecutorService} rather than on this class.
>  *
>  * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
>  * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z thompsonbry $
>  */
> public class LatchedExecutor implements Executor {
>
>        private static final transient Logger log = Logger
>                        .getLogger(LatchedExecutor.class);
>       
>        /**
>          * The delegate executor.
>          */
>        private final Executor executor;
>       
>        /**
>          * This is used to limit the concurrency with which tasks submitted to
>          * this class may execute on the delegate {@link #executor}.
>          */
>        private final Semaphore semaphore;
>       
>        /**
>          * A thread-safe blocking queue of pending tasks.
>          *
>          * @todo The capacity of this queue does not of necessity need to be
>          *            unbounded.
>          */
>        private final BlockingQueue<Runnable> queue =
>                        new LinkedBlockingDeque<Runnable>(/*unbounded*/);
>
>        private final int nparallel;
>       
>        /**
>          * Return the maximum parallelism allowed by this {@link Executor}.
>          */
>        public int getNParallel() {
>           
>            return nparallel;
>           
>        }
>       
>        public LatchedExecutor(final Executor executor, final int nparallel) {
>
>                if (executor == null)
>                        throw new IllegalArgumentException();
>
>                if (nparallel < 1)
>                        throw new IllegalArgumentException();
>
>                this.executor = executor;
>
>                this.nparallel = nparallel;
>               
>                this.semaphore = new Semaphore(nparallel);
>
>        }
>
>        public void execute(final Runnable r) {
>                if (!queue.offer(new Runnable() {
>                        /*
>                          * Wrap the Runnable in a class that will start the next
>                          * Runnable from the queue when it completes.
>                          */
>                        public void run() {
>                                try {
>                                        r.run();
>                                } finally {
>                                        scheduleNext();
>                                }
>                        }
>                })) {
>                        // The queue is full.
>                        throw new RejectedExecutionException();
>                }
>                if (semaphore.tryAcquire()) {
>                        // We were able to obtain a permit, so start another task.
>                        scheduleNext();
>                }
>        }
>
>        /**
>          * Schedule the next task if one is available (non-blocking).
>          * <p>
>          * Pre-condition: The caller has a permit.
>          */
>        private void scheduleNext() {
>                while (true) {
>                        Runnable next = null;
>                        if ((next = queue.poll()) != null) {
>                                try {
>                                        executor.execute(next);
>                                        return;
>                                } catch (RejectedExecutionException ex) {
>                                        // log error and poll the queue again.
>                                        log.error(ex, ex);
>                                        continue;
>                                }
>                        } else {
>                                semaphore.release();
>                                return;
>                        }
>                }
>        }
>
> }
>
>
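
For reference, the ScheduledExecutorService retry trick described in the quoted message might look roughly like the sketch below. It is only an illustration under stated assumptions: the names RetrySketch, Done, retrying and retryUntilSuccess are hypothetical, only RuntimeExceptions from the task are treated as "failed, try again", and there is no upper bound on the number of retries.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {

    /** Hypothetical marker thrown on success to stop the periodic rescheduling. */
    static final class Done extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Wraps a task for scheduleWithFixedDelay: a failure is swallowed so the
     * task stays scheduled; success throws Done, which suppresses further runs.
     */
    static Runnable retrying(final Runnable task, final AtomicInteger attempts) {
        return new Runnable() {
            public void run() {
                attempts.incrementAndGet();
                try {
                    task.run();
                } catch (RuntimeException e) {
                    return; // failed: run again after the fixed delay
                }
                throw new Done(); // succeeded: terminate the periodic task
            }
        };
    }

    /** Runs the task until it succeeds and returns the number of attempts made. */
    static int retryUntilSuccess(Runnable task, long delayMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        try {
            // get() on a periodic future only returns by throwing.
            ses.scheduleWithFixedDelay(retrying(task, attempts), 0, delayMillis,
                    TimeUnit.MILLISECONDS).get();
            throw new IllegalStateException("periodic task ended without Done");
        } catch (ExecutionException e) {
            if (e.getCause() instanceof Done) {
                return attempts.get();
            }
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }
}
```

A retry window such as the "entire day" case mentioned above would need an extra deadline check inside the wrapper; that is left out here to keep the sketch close to the trick as described.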

