river-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patricia Shanahan <p...@acm.org>
Subject Re: test failure repeatability - TaskManager
Date Tue, 02 Apr 2013 21:32:07 GMT
runAfter is a method in the TaskManager.Task interface, implemented by
each of its tasks:

/**
* Return true if this task must be run after at least one task
* in the given task list with an index less than size (size may be
* less then tasks.size()).  Using List.get will be more efficient
* than List.iterator.
*
* @param tasks the tasks to consider.  A read-only List, with all
* elements instanceof Task.
* @param size elements with index less than size should be considered
*

The notes I sent to Peter were part of an effort on my part to improve
performance. This has O(N^2) tendencies, because whenever a task
finishes the TaskManager has to ask each waiting task whether it still
needs to wait for any older task. I wanted to change it so that
TaskManager would know which task was being waited for. It could then
associate with one task a list of tasks that need to be reconsidered
when it finishes.

I think runAfter has two possible uses, and I'm not sure which cases are 
for which purpose:

1. Mutual exclusion - two tasks should not be running at the same time.
That could be implemented by the younger returning true for a task list
containing the older. In this case the sort of overtaking I described
below does not matter.

2. Order preservation - A task needs a state change to have happened
that will not happen until after an older task has run.

Patricia

On 4/2/2013 2:17 PM, Bryan Thompson wrote:
> I am not clear on the semantics for runAfter, but maybe this can be
> achieved by wrapping a Runnable within another Runnable such that the 2nd
> runnable is automatically scheduled after the first has succeeded?
> Likewise, it is possible to wrap a Runnable in order to automatically
> retry if it throws an exception.
>
> There are people who are experts at these patterns, but an example is
> given (below my signature) for an Executor that wraps an ExecutorService
> and queues Runnable instances with limited parallelism.  It hooks the
> Runnable in its own run() method.
>
> If you use a ScheduledExecutorService, you can queue a task to run with an
> initial and repeated delay (or at a repeated interval).  The task will be
> rescheduled *unless* it throws an exception.  This could be reused to
> periodically run-try a task after a timeout if we convert an error thrown
> in the task into "no error" (hence run after a fixed delay) and throw out
> a known exception if there is no error (to terminate the retry of the
> task).  A bit of a hack, but it leverages existing code for re-running a
> task with a fixed delay.
>
> Thanks,
> Bryan
>
> package com.bigdata.util.concurrent;
>
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executor;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Future;
> import java.util.concurrent.FutureTask;
> import java.util.concurrent.LinkedBlockingDeque;
> import java.util.concurrent.RejectedExecutionException;
> import java.util.concurrent.Semaphore;
>
> import org.apache.log4j.Logger;
>
> /**
>   * A fly weight helper class that runs tasks either sequentially or with
> limited
>   * parallelism against some thread pool. Deadlock can arise when limited
>   * parallelism is applied if there are dependencies among the tasks.
> Limited
>   * parallelism is enforced using a counting {@link Semaphore}. New tasks
> can
>   * start iff the latch is non-zero. The maximum parallelism is the minimum
> of
>   * the value specified to the constructor and the potential parallelism of
> the
>   * delegate service.
>   * <p>
>   * Note: The pattern for running tasks on this service is generally to
>   * {@link #execute(Runnable)} a {@link Runnable} and to make that
>   * {@link Runnable} a {@link FutureTask} if you want to await the {@link
> Future}
>   * of a {@link Runnable} or {@link Callable} or otherwise manage its
> execution.
>   * <p>
>   * Note: This class can NOT be trivially wrapped as an {@link
> ExecutorService}
>   * since the resulting delegation pattern for submit() winds up invoking
>   * execute() on the delegate {@link ExecutorService} rather than on this
> class.
>   *
>   * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan
> Thompson</a>
>   * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z
> thompsonbry $
> */
> public class LatchedExecutor implements Executor {
>
>      private static final transient Logger log = Logger
>              .getLogger(LatchedExecutor.class);
>
>      /**
>       * The delegate executor.
>       */
>      private final Executor executor;
>
>      /**
>       * This is used to limit the concurrency with which tasks submitted to
> this
>       * class may execute on the delegate {@link #executor}.
>       */
>      private final Semaphore semaphore;
>
>      /**
>       * A thread-safe blocking queue of pending tasks.
>       *
>       * @todo The capacity of this queue does not of necessity need to be
>       *       unbounded.
>       */
>      private final BlockingQueue<Runnable> queue = new
> LinkedBlockingDeque<Runnable>(/*unbounded*/);
>
>      private final int nparallel;
>
>      /**
>       * Return the maximum parallelism allowed by this {@link Executor}.
>       */
>      public int getNParallel() {
>      	
>      	return nparallel;
>      	
>      }
>
>      public LatchedExecutor(final Executor executor, final int nparallel) {
>
>          if (executor == null)
>              throw new IllegalArgumentException();
>
>          if (nparallel < 1)
>              throw new IllegalArgumentException();
>
>          this.executor = executor;
>
>          this.nparallel = nparallel;
>
>          this.semaphore = new Semaphore(nparallel);
>
>      }
>
>      public void execute(final Runnable r) {
>          if (!queue.offer(new Runnable() {
>              /*
>               * Wrap the Runnable in a class that will start the next
> Runnable
>               * from the queue when it completes.
>               */
>              public void run() {
>                  try {
>                      r.run();
>                  } finally {
>                      scheduleNext();
>                  }
>              }
>          })) {
>              // The queue is full.
>              throw new RejectedExecutionException();
>          }
>          if (semaphore.tryAcquire()) {
>              // We were able to obtain a permit, so start another task.
>              scheduleNext();
>          }
>      }
>
>      /**
>       * Schedule the next task if one is available (non-blocking).
>       * <p>
>       * Pre-condition: The caller has a permit.
>       */
>      private void scheduleNext() {
>          while (true) {
>              Runnable next = null;
>              if ((next = queue.poll()) != null) {
>                  try {
>                      executor.execute(next);
>                      return;
>                  } catch (RejectedExecutionException ex) {
>                      // log error and poll the queue again.
>                      log.error(ex, ex);
>                      continue;
>                  }
>              } else {
>                  semaphore.release();
>                  return;
>              }
>          }
>      }
>
> }
>
>


Mime
View raw message