Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C5B9D1855E for ; Fri, 21 Aug 2015 21:23:27 +0000 (UTC) Received: (qmail 15290 invoked by uid 500); 21 Aug 2015 21:23:27 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 15258 invoked by uid 500); 21 Aug 2015 21:23:27 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 15249 invoked by uid 99); 21 Aug 2015 21:23:27 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 21:23:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 13A911AA9AD for ; Fri, 21 Aug 2015 21:23:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.795 X-Spam-Level: * X-Spam-Status: No, score=1.795 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id JI6r56TSXDLY for ; Fri, 21 Aug 2015 21:22:48 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id ACF9A276FA for ; Fri, 21 Aug 2015 21:22:29 +0000 (UTC) Received: (qmail 9400 invoked by uid 99); 21 Aug 2015 21:22:25 -0000 Received: from git1-us-west.apache.org (HELO 
git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 21:22:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C4247E6B10; Fri, 21 Aug 2015 21:22:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.incubator.apache.org Date: Fri, 21 Aug 2015 21:23:03 -0000 Message-Id: In-Reply-To: <2436fee0ae564f7ea484456c1907c152@git.apache.org> References: <2436fee0ae564f7ea484456c1907c152@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [39/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java deleted file mode 100644 index 268a3fe..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java +++ /dev/null @@ -1,309 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Channel.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version - 25aug1998 dl added peek -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * Main interface for buffers, queues, pipes, conduits, etc. - *

- * A Channel represents anything that you can put items - * into and take them out of. As with the Sync - * interface, both - * blocking (put(x), take), - * and timeouts (offer(x, msecs), poll(msecs)) policies - * are provided. Using a - * zero timeout for offer and poll results in a pure balking policy. - *

- * To aid in efforts to use Channels in a more typesafe manner, - * this interface extends Puttable and Takable. You can restrict - * arguments or instance variables to one of these types as a way of - * guaranteeing that producers never try to take, or consumers put. - * For example: - *

- * class Producer implements Runnable {
- *   final Puttable chan;
- *   Producer(Puttable channel) { chan = channel; }
- *   public void run() {
- *     try {
- *       for(;;) { chan.put(produce()); }
- *     }
- *     catch (InterruptedException ex) {}
- *   }
- *   Object produce() { ... }
- * }
- *
- *
- * class Consumer implements Runnable {
- *   final Takable chan;
- *   Consumer(Takable channel) { chan = channel; }
- *   public void run() {
- *     try {
- *       for(;;) { consume(chan.take()); }
- *     }
- *     catch (InterruptedException ex) {}
- *   }
- *   void consume(Object x) { ... }
- * }
- *
- * class Setup {
- *   void main() {
- *     Channel chan = new SomeChannelImplementation();
- *     Producer p = new Producer(chan);
- *     Consumer c = new Consumer(chan);
- *     new Thread(p).start();
- *     new Thread(c).start();
- *   }
- * }
- * 
- *

- * A given channel implementation might or might not have bounded - * capacity or other insertion constraints, so in general, you cannot tell if - * a given put will block. However, - * Channels that are designed to - * have an element capacity (and so always block when full) - * should implement the - * BoundedChannel - * subinterface. - *

- * Channels may hold any kind of item. However, - * insertion of null is not in general supported. Implementations - * may (all currently do) throw IllegalArgumentExceptions upon attempts to - * insert null. - *

- * By design, the Channel interface does not support any methods to determine - * the current number of elements being held in the channel. - * This decision reflects the fact that in - * concurrent programming, such methods are so rarely useful - * that including them invites misuse; at best they could - * provide a snapshot of current - * state, that could change immediately after being reported. - * It is better practice to instead use poll and offer to try - * to take and put elements without blocking. For example, - * to empty out the current contents of a channel, you could write: - *

- *  try {
- *    for (;;) {
- *       Object item = channel.poll(0);
- *       if (item != null)
- *         process(item);
- *       else
- *         break;
- *    }
- *  }
- *  catch(InterruptedException ex) { ... }
- * 
- *

- * However, it is possible to determine whether an item - * exists in a Channel via peek, which returns - * but does NOT remove the next item that can be taken (or null - * if there is no such item). The peek operation has a limited - * range of applicability, and must be used with care. Unless it - * is known that a given thread is the only possible consumer - * of a channel, and that no time-out-based offer operations - * are ever invoked, there is no guarantee that the item returned - * by peek will be available for a subsequent take. - *

- * When appropriate, you can define an isEmpty method to - * return whether peek returns null. - *

- * Also, as a compromise, even though it does not appear in the interface, - * implementation classes that can readily compute the number - * of elements support a size() method. This allows careful - * use, for example in queue length monitors, appropriate to the - * particular implementation constraints and properties. - *

- * All channels allow multiple producers and/or consumers. - * They do not support any kind of close method - * to shut down operation or indicate completion of particular - * producer or consumer threads. - * If you need to signal completion, one way to do it is to - * create a class such as - *

- * class EndOfStream { 
- *    // Application-dependent field/methods
- * }
- * 
- * And to have producers put an instance of this class into - * the channel when they are done. The consumer side can then - * check this via - *
- *   Object x = aChannel.take();
- *   if (x instanceof EndOfStream) 
- *     // special actions; perhaps terminate
- *   else
- *     // process normally
- * 
- *

- * In time-out based methods (poll(msecs) and offer(x, msecs)), - * time bounds are interpreted in - * a coarse-grained, best-effort fashion. Since there is no - * way in Java to escape out of a wait for a synchronized - * method/block, time bounds can sometimes be exceeded when - * there is a lot of contention for the channel. Additionally, - * some Channel semantics entail a ``point of - * no return'' where, once some parts of the operation have completed, - * others must follow, regardless of time bound. - *

- * Interruptions are in general handled as early as possible - * in all methods. Normally, InterruptedExceptions are thrown - * in put/take and offer(msec)/poll(msec) if interruption - * is detected upon entry to the method, as well as in any - * later context surrounding waits. - *

- * If a put returns normally, an offer - * returns true, or a take or poll returns non-null, the operation - * completed successfully. - * In all other cases, the operation fails cleanly -- the - * element is not put or taken. - *

- * As with Sync classes, spinloops are not directly supported, - * are not particularly recommended for routine use, but are not hard - * to construct. For example, here is an exponential backoff version: - *

- * Object backOffTake(Channel q) throws InterruptedException {
- *   long waitTime = 0;
- *   for (;;) {
- *      Object x = q.poll(0);
- *      if (x != null)
- *        return x;
- *      else {
- *        Thread.sleep(waitTime);
- *        waitTime = 3 * waitTime / 2 + 1;
- *      }
- *    }
- * }
- *

- * Sample Usage. Here is a producer/consumer design - * where the channel is used to hold Runnable commands representing - * background tasks. - *

- * class Service {
- *   private final Channel channel = ... some Channel implementation;
- *  
- *   private void backgroundTask(int taskParam) { ... }
- *
- *   public void action(final int arg) {
- *     Runnable command = 
- *       new Runnable() {
- *         public void run() { backgroundTask(arg); }
- *       };
- *     try { channel.put(command); }
- *     catch (InterruptedException ex) {
- *       Thread.currentThread().interrupt(); // ignore but propagate
- *     }
- *   }
- * 
- *   public Service() {
- *     Runnable backgroundLoop = 
- *       new Runnable() {
- *         public void run() {
- *           for (;;) {
- *             try {
- *               Runnable task = (Runnable)(channel.take());
- *               task.run();
- *             }
- *             catch (InterruptedException ex) { return; }
- *           }
- *         }
- *       };
- *     new Thread(backgroundLoop).start();
- *   }
- * }
- *    
- * 
- *

[ Introduction to this package. ] - * @see Sync - * @see BoundedChannel -**/ - -public interface Channel extends Puttable, Takable { - - /** - * Place item in the channel, possibly waiting indefinitely until - * it can be accepted. Channels implementing the BoundedChannel - * subinterface are generally guaranteed to block on puts upon - * reaching capacity, but other implementations may or may not block. - * @param item the element to be inserted. Should be non-null. - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case the element is guaranteed not - * to be inserted. Otherwise, on normal return, the element is guaranteed - * to have been inserted. - **/ - public void put(Object item) throws InterruptedException; - - /** - * Place item in channel only if it can be accepted within - * msecs milliseconds. The time bound is interpreted in - * a coarse-grained, best-effort fashion. - * @param item the element to be inserted. Should be non-null. - * @param msecs the number of milliseconds to wait. If less than - * or equal to zero, the method does not perform any timed waits, - * but might still require - * access to a synchronization lock, which can impose unbounded - * delay if there is a lot of contention for the channel. - * @return true if accepted, else false - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case the element is guaranteed not - * to be inserted (i.e., is equivalent to a false return). - **/ - public boolean offer(Object item, long msecs) throws InterruptedException; - - /** - * Return and remove an item from channel, - * possibly waiting indefinitely until - * such an item exists. - * @return some item from the channel. 
Different implementations - * may guarantee various properties (such as FIFO) about that item - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case state of the channel is unchanged. - * - **/ - public Object take() throws InterruptedException; - - - /** - * Return and remove an item from channel only if one is available within - * msecs milliseconds. The time bound is interpreted in a coarse - * grained, best-effort fashion. - * @param msecs the number of milliseconds to wait. If less than - * or equal to zero, the operation does not perform any timed waits, - * but might still require - * access to a synchronization lock, which can impose unbounded - * delay if there is a lot of contention for the channel. - * @return some item, or null if the channel is empty. - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case state of the channel is unchanged - * (i.e., equivalent to a null return). - **/ - - public Object poll(long msecs) throws InterruptedException; - - /** - * Return, but do not remove object at head of Channel, - * or null if it is empty. - **/ - - public Object peek(); - - -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java deleted file mode 100644 index d21e929..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java +++ /dev/null @@ -1,403 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. 
on - * $Date$ - **/ -/* - File: ClockDaemon.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 29Aug1998 dl created initial public version - 17dec1998 dl null out thread after shutdown -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -import java.util.Date; - -/** - * A general-purpose time-based daemon, vaguely similar in functionality - * to common system-level utilities such as at - * (and the associated crond) in Unix. - * Objects of this class maintain a single thread and a task queue - * that may be used to execute Runnable commands in any of three modes -- - * absolute (run at a given time), relative (run after a given delay), - * and periodic (cyclically run with a given delay). - *

- * All commands are executed by the single background thread. - * The thread is not actually started until the first - * request is encountered. Also, if the - * thread is stopped for any reason, one is started upon encountering - * the next request, or when restart() is invoked. - *

- * If you would instead like commands run in their own threads, you can - * use as arguments Runnable commands that start their own threads - * (or perhaps wrap within ThreadedExecutors). - *

- * You can also use multiple - * daemon objects, each using a different background thread. However, - * one of the reasons for using a time daemon is to pool together - * processing of infrequent tasks using a single background thread. - *

- * Background threads are created using a ThreadFactory. The - * default factory does not - * automatically setDaemon status. - *

- * The class uses Java timed waits for scheduling. These can vary - * in precision across platforms, and provide no real-time guarantees - * about meeting deadlines. - *

[ Introduction to this package. ] - **/ - -public class ClockDaemon extends ThreadFactoryUser { - - - /** tasks are maintained in a standard priority queue **/ - protected final Heap heap_ = new Heap(DefaultChannelCapacity.get()); - - - protected static class TaskNode implements Comparable { - final Runnable command; // The command to run - final long period; // The cycle period, or -1 if not periodic - private long timeToRun_; // The time to run command - - // Cancellation does not immediately remove node, it just - // sets up lazy deletion bit, so is thrown away when next - // encountered in run loop - - private boolean cancelled_ = false; - - // Access to cancellation status and and run time needs sync - // since they can be written and read in different threads - - synchronized void setCancelled() { cancelled_ = true; } - synchronized boolean getCancelled() { return cancelled_; } - - synchronized void setTimeToRun(long w) { timeToRun_ = w; } - synchronized long getTimeToRun() { return timeToRun_; } - - - public int compareTo(Object other) { - long a = getTimeToRun(); - long b = ((TaskNode)(other)).getTimeToRun(); - return (a < b)? -1 : ((a == b)? 0 : 1); - } - - @Override - public boolean equals(Object o) { // GemStoneAddition - if (o == null || !(o instanceof TaskNode)) return false; - return this.compareTo(o) == 0; - } - - @Override - public int hashCode() { // GemStoneAddition - return (int)getTimeToRun(); - } - - TaskNode(long w, Runnable c, long p) { - timeToRun_ = w; command = c; period = p; - } - - TaskNode(long w, Runnable c) { this(w, c, -1); } - } - - - /** - * Execute the given command at the given time. - * @param date -- the absolute time to run the command, expressed - * as a java.util.Date. - * @param command -- the command to run at the given time. 
- * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executeAt(Date date, Runnable command) { - TaskNode task = new TaskNode(date.getTime(), command); - heap_.insert(task); - restart(); - return task; - } - - /** - * Execute the given command after waiting for the given delay. - *

- * Sample Usage. - * You can use a ClockDaemon to arrange timeout callbacks to break out - * of stuck IO. For example (code sketch): - *

-   * class X {   ...
-   * 
-   *   ClockDaemon timer = ...
-   *   Thread readerThread;
-   *   FileInputStream datafile;
-   * 
-   *   void startReadThread() {
-   *     datafile = new FileInputStream("data", ...);
-   * 
-   *     readerThread = new Thread(new Runnable() {
-   *      public void run() {
-   *        for(;;) {
-   *          // try to gracefully exit before blocking
-   *         if (Thread.currentThread().isInterrupted()) {
-   *           quietlyWrapUpAndReturn();
-   *         }
-   *         else {
-   *           try {
-   *             int c = datafile.read();
-   *             if (c == -1) break;
-   *             else process(c);
-   *           }
-   *           catch (IOException ex) {
-   *            cleanup();
-   *            return;
-   *          }
-   *       }
-   *     } } });
-   *
-   *    readerThread.start();
-   *
-   *    // establish callback to cancel after 60 seconds
-   *    timer.executeAfterDelay(60000, new Runnable() {
-   *      readerThread.interrupt();    // try to interrupt thread
-   *      datafile.close(); // force thread to lose its input file 
-   *    });
-   *   } 
-   * }
-   * 
- * @param millisecondsToDelay -- the number of milliseconds - * from now to run the command. - * @param command -- the command to run after the delay. - * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executeAfterDelay(long millisecondsToDelay, Runnable command) { - long runtime = System.currentTimeMillis() + millisecondsToDelay; - TaskNode task = new TaskNode(runtime, command); - heap_.insert(task); - restart(); - return task; - } - - /** - * Execute the given command every period milliseconds. - * If startNow is true, execution begins immediately, - * otherwise, it begins after the first period delay. - *

- * Sample Usage. Here is one way - * to update Swing components acting as progress indicators for - * long-running actions. - *

-   * class X {
-   *   JLabel statusLabel = ...;
-   *
-   *   int percentComplete = 0;
-   *   synchronized int  getPercentComplete() { return percentComplete; }
-   *   synchronized void setPercentComplete(int p) { percentComplete = p; }
-   *
-   *   ClockDaemon cd = ...;
-   * 
-   *   void startWorking() {
-   *     Runnable showPct = new Runnable() {
-   *       public void run() {
-   *          SwingUtilities.invokeLater(new Runnable() {
-   *            public void run() {
-   *              statusLabel.setText(getPercentComplete() + "%");
-   *            } 
-   *          });
-   *       } 
-   *     };
-   *
-   *     final Object updater = cd.executePeriodically(500, showPct, true);
-   *
-   *     Runnable action = new Runnable() {
-   *       public void run() {
-   *         for (int i = 0; i < 100; ++i) {
-   *           work();
-   *           setPercentComplete(i);
-   *         }
-   *         cd.cancel(updater);
-   *       }
-   *     };
-   *
-   *     new Thread(action).start();
-   *   }
-   * }  
-   * 
- * @param period -- the period, in milliseconds. Periods are - * measured from start-of-task to the next start-of-task. It is - * generally a bad idea to use a period that is shorter than - * the expected task duration. - * @param command -- the command to run at each cycle - * @param startNow -- true if the cycle should start with execution - * of the task now. Otherwise, the cycle starts with a delay of - * period milliseconds. - * @exception IllegalArgumentException if period less than or equal to zero. - * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executePeriodically(long period, - Runnable command, - boolean startNow) { - - if (period <= 0) throw new IllegalArgumentException(); - - long firstTime = System.currentTimeMillis(); - if (!startNow) firstTime += period; - - TaskNode task = new TaskNode(firstTime, command, period); - heap_.insert(task); - restart(); - return task; - } - - /** - * Cancel a scheduled task that has not yet been run. - * The task will be cancelled - * upon the next opportunity to run it. This has no effect if - * this is a one-shot task that has already executed. - * Also, if an execution is in progress, it will complete normally. - * (It may however be interrupted via getThread().interrupt()). - * But if it is a periodic task, future iterations are cancelled. - * @param taskID -- a task reference returned by one of - * the execute commands - * @exception ClassCastException if the taskID argument is not - * of the type returned by an execute command. - **/ - public static void cancel(Object taskID) { - ((TaskNode)taskID).setCancelled(); - } - - - /** The thread used to process commands **/ - protected Thread thread_; - - - /** - * Return the thread being used to process commands, or - * null if there is no such thread. You can use this - * to invoke any special methods on the thread, for - * example, to interrupt it. 
- **/ - public synchronized Thread getThread() { - return thread_; - } - - /** set thread_ to null to indicate termination **/ - protected synchronized void clearThread() { - thread_ = null; - } - - /** - * Start (or restart) a thread to process commands, or wake - * up an existing thread if one is already running. This - * method can be invoked if the background thread crashed - * due to an unrecoverable exception in an executed command. - **/ - - public synchronized void restart() { - if (thread_ == null) { - thread_ = threadFactory_.newThread(runLoop_); - thread_.start(); - } - else - notify(); - } - - - /** - * Cancel all tasks and interrupt the background thread executing - * the current task, if any. - * A new background thread will be started if new execution - * requests are encountered. If the currently executing task - * does not repsond to interrupts, the current thread may persist, even - * if a new thread is started via restart(). - **/ - public synchronized void shutDown() { - heap_.clear(); - if (thread_ != null) - thread_.interrupt(); - thread_ = null; - } - - /** Return the next task to execute, or null if thread is interrupted **/ - protected synchronized TaskNode nextTask() { - - // Note: This code assumes that there is only one run loop thread - - try { - while (!Thread.interrupted()) { - - // Using peek simplifies dealing with spurious wakeups - - TaskNode task = (TaskNode)(heap_.peek()); - - if (task == null) { - wait(); - } - else { - long now = System.currentTimeMillis(); - long when = task.getTimeToRun(); - - if (when > now) { // false alarm wakeup - wait(when - now); - } - else { - task = (TaskNode)(heap_.extract()); - - if (!task.getCancelled()) { // Skip if cancelled by - - if (task.period > 0) { // If periodic, requeue - task.setTimeToRun(now + task.period); - heap_.insert(task); - } - - return task; - } - } - } - } - } - catch (InterruptedException ex) { Thread.currentThread().interrupt(); /* GemStoneAddition */ } // fall through - - 
return null; // on interrupt - } - - /** - * The runloop is isolated in its own Runnable class - * just so that the main - * class need not implement Runnable, which would - * allow others to directly invoke run, which is not supported. - **/ - - protected class RunLoop implements Runnable { - public void run() { - try { - for (;;) { - if (Thread.interrupted()) break; // GemStoneAddition - TaskNode task = nextTask(); - if (task != null) - task.command.run(); - else - break; - } - } - finally { - clearThread(); - } - } - } - - protected final RunLoop runLoop_; - - /** - * Create a new ClockDaemon - **/ - - public ClockDaemon() { - runLoop_ = new RunLoop(); - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java deleted file mode 100644 index fb4006d..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java +++ /dev/null @@ -1,277 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: ConditionVariable.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * This class is designed for fans of POSIX pthreads programming. - * If you restrict yourself to Mutexes and CondVars, you can - * use most of your favorite constructions. 
Don't randomly mix them - * with synchronized methods or blocks though. - *

- * Method names and behavior are as close as is reasonable to - * those in POSIX. - *

- * Sample Usage. Here is a full version of a bounded buffer - * that implements the BoundedChannel interface, written in - * a style reminiscent of that in POSIX programming books. - *

- * class CVBuffer implements BoundedChannel {
- *   private final Mutex mutex;
- *   private final CondVar notFull;
- *   private final CondVar notEmpty;
- *   private int count = 0;
- *   private int takePtr = 0;     
- *   private int putPtr = 0;
- *   private final Object[] array;
- * 
- *   public CVBuffer(int capacity) { 
- *     array = new Object[capacity];
- *     mutex = new Mutex();
- *     notFull = new CondVar(mutex);
- *     notEmpty = new CondVar(mutex);
- *   }
- * 
- *   public int capacity() { return array.length; }
- * 
- *   public void put(Object x) throws InterruptedException {
- *     mutex.acquire();
- *     try {
- *       while (count == array.length) {
- *         notFull.await();
- *       }
- *       array[putPtr] = x;
- *       putPtr = (putPtr + 1) % array.length;
- *       ++count;
- *       notEmpty.signal();
- *     }
- *     finally {
- *       mutex.release();
- *     }
- *   }
- * 
- *   public Object take() throws InterruptedException {
- *     Object x = null;
- *     mutex.acquire();
- *     try {
- *       while (count == 0) {
- *         notEmpty.await();
- *       }
- *       x = array[takePtr];
- *       array[takePtr] = null;
- *       takePtr = (takePtr + 1) % array.length;
- *       --count;
- *       notFull.signal();
- *     }
- *     finally {
- *       mutex.release();
- *     }
- *     return x;
- *   }
- * 
- *   public boolean offer(Object x, long msecs) throws InterruptedException {
- *     mutex.acquire();
- *     try {
- *       if (count == array.length) {
- *         notFull.timedwait(msecs);
- *         if (count == array.length)
- *           return false;
- *       }
- *       array[putPtr] = x;
- *       putPtr = (putPtr + 1) % array.length;
- *       ++count;
- *       notEmpty.signal();
- *       return true;
- *     }
- *     finally {
- *       mutex.release();
- *     }
- *   }
- *   
- *   public Object poll(long msecs) throws InterruptedException {
- *     Object x = null;
- *     mutex.acquire();
- *     try {
- *       if (count == 0) {
- *         notEmpty.timedwait(msecs);
- *         if (count == 0)
- *           return null;
- *       }
- *       x = array[takePtr];
- *       array[takePtr] = null;
- *       takePtr = (takePtr + 1) % array.length;
- *       --count;
- *       notFull.signal();
- *     }
- *     finally {
- *       mutex.release();
- *     }
- *     return x;
- *   }
- * }
- *
- * 
- * @see Mutex - *

[ Introduction to this package. ] - - **/ - -public class CondVar { - - /** The mutex **/ - protected final Sync mutex_; - - /** - * Create a new CondVar that relies on the given mutual - * exclusion lock. - * @param mutex A non-reentrant mutual exclusion lock. - * Standard usage is to supply an instance of Mutex, - * but, for example, a Semaphore initialized to 1 also works. - * On the other hand, many other Sync implementations would not - * work here, so some care is required to supply a sensible - * synchronization object. - * In normal use, the mutex should be one that is used for all - * synchronization of the object using the CondVar. Generally, - * to prevent nested monitor lockouts, this - * object should not use any native Java synchronized blocks. - **/ - - public CondVar(Sync mutex) { - mutex_ = mutex; - } - - /** - * Wait for notification. This operation at least momentarily - * releases the mutex. The mutex is always held upon return, - * even if interrupted. - * @exception InterruptedException if the thread was interrupted - * before or during the wait. However, if the thread is interrupted - * after the wait but during mutex re-acquisition, the interruption - * is ignored, while still ensuring - * that the currentThread's interruption state stays true, so can - * be probed by callers. 
- **/ - public void await() throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - try { - synchronized(this) { - mutex_.release(); - try { - wait(); - } - catch (InterruptedException ex) { - notify(); - throw ex; - } - } - } - finally { - // Must ignore interrupt on re-acquire - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - mutex_.acquire(); - break; - } - catch (InterruptedException ex) { - interrupted = true; - } - finally { // GemStoneAddition - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } -// if (interrupted) { -// Thread.currentThread().interrupt(); -// } - } - } - - /** - * Wait for at most msecs for notification. - * This operation at least momentarily - * releases the mutex. The mutex is always held upon return, - * even if interrupted. - * @param msecs The time to wait. A value less than or equal to zero - * causes a momentarily release - * and re-acquire of the mutex, and always returns false. - * @return false if at least msecs have elapsed - * upon resumption; else true. A - * false return does NOT necessarily imply that the thread was - * not notified. For example, it might have been notified - * after the time elapsed but just before resuming. - * @exception InterruptedException if the thread was interrupted - * before or during the wait. 
- **/ - - public boolean timedwait(long msecs) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - boolean success = false; - try { - synchronized(this) { - mutex_.release(); - try { - if (msecs > 0) { - long start = System.currentTimeMillis(); - wait(msecs); - success = System.currentTimeMillis() - start <= msecs; - } - } - catch (InterruptedException ex) { - notify(); - throw ex; - } - } - } - finally { - // Must ignore interrupt on re-acquire -// boolean interrupted = false; GemStoneAddition - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - mutex_.acquire(); - break; - } - catch (InterruptedException ex) { - interrupted = true; - } - finally { // GemStoneAddition - if (interrupted) Thread.currentThread().interrupt(); - } - } -// if (interrupted) { -// Thread.currentThread().interrupt(); -// } - } - return success; - } - - /** - * Notify a waiting thread. - * If one exists, a non-interrupted thread will return - * normally (i.e., not via InterruptedException) from await or timedwait. - **/ - public synchronized void signal() { - notify(); - } - - /** Notify all waiting threads **/ - public synchronized void broadcast() { - notifyAll(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java deleted file mode 100644 index d41baa4..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java +++ /dev/null @@ -1,126 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. 
on - * $Date$ - **/ -/* - File: CountDown.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A CountDown can serve as a simple one-shot barrier. - * A Countdown is initialized - * with a given count value. Each release decrements the count. - * All acquires block until the count reaches zero. Upon reaching - * zero all current acquires are unblocked and all - * subsequent acquires pass without blocking. This is a one-shot - * phenomenon -- the count cannot be reset. - * If you need a version that resets the count, consider - * using a Barrier. - *

- * Sample usage. Here is a set of classes in which - * a group of worker threads use a countdown to - * notify a driver when all threads are complete. - *

- * class Worker implements Runnable { 
- *   private final CountDown done;
- *   Worker(CountDown d) { done = d; }
- *   public void run() {
- *     doWork();
- *     done.release();
- *   }
- * }
- * 
- * class Driver { // ...
- *   void main() {
- *     CountDown done = new CountDown(N);
- *     for (int i = 0; i < N; ++i) 
- *       new Thread(new Worker(done)).start();
- *     doSomethingElse(); 
- *     done.acquire(); // wait for all to finish
- *   } 
- * }
- * 
- * - *

[ Introduction to this package. ] - * -**/ - -public class CountDown implements Sync { - protected final int initialCount_; - protected int count_; - - /** Create a new CountDown with given count value **/ - public CountDown(int count) { count_ = initialCount_ = count; } - - - /* - This could use double-check, but doesn't out of concern - for surprising effects on user programs stemming - from lack of memory barriers with lack of synch. - */ - public void acquire() throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - synchronized(this) { - while (count_ > 0) - wait(); - } - } - - - public boolean attempt(long msecs) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - synchronized(this) { - if (count_ <= 0) - return true; - else if (msecs <= 0) - return false; - else { - long waitTime = msecs; - long start = System.currentTimeMillis(); - for (;;) { - wait(waitTime); - if (count_ <= 0) - return true; - else { - waitTime = msecs - (System.currentTimeMillis() - start); - if (waitTime <= 0) - return false; - } - } - } - } - } - - /** - * Decrement the count. - * After the initialCount'th release, all current and future - * acquires will pass - **/ - public synchronized void release() { - if (--count_ == 0) - notifyAll(); - } - - /** Return the initial count value **/ - public int initialCount() { return initialCount_; } - - - /** - * Return the current count value. - * This is just a snapshot value, that may change immediately - * after returning. 
- **/ - public synchronized int currentCount() { return count_; } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java deleted file mode 100644 index 9b55352..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java +++ /dev/null @@ -1,299 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: CyclicBarrier.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jul1998 dl Create public version - 28Aug1998 dl minor code simplification -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A cyclic barrier is a reasonable choice for a barrier in contexts - * involving a fixed sized group of threads that - * must occasionally wait for each other. - * (A Rendezvous better handles applications in which - * any number of threads meet, n-at-a-time.) - *

- * CyclicBarriers use an all-or-none breakage model - * for failed synchronization attempts: If threads - * leave a barrier point prematurely because of timeout - * or interruption, others will also leave abnormally - * (via BrokenBarrierException), until - * the barrier is restarted. This is usually - * the simplest and best strategy for sharing knowledge - * about failures among cooperating threads in the most - * common usage contexts of Barriers. - * This implementation has the property that interruptions - * among newly arriving threads can cause as-yet-unresumed - * threads from a previous barrier cycle to return out - * as broken. This transmits breakage - * as early as possible, but with the possible byproduct that - * only some threads returning out of a barrier will realize - * that it is newly broken. (Others will not realize this until a - * future cycle.) (The Rendezvous class has a more uniform, but - * sometimes less desirable policy.) - *

- * Barriers support an optional Runnable command - * that is run once per barrier point. - *

- * Sample usage. Here is a code sketch of - * a barrier in a parallel decomposition design. - *

- * class Solver {
- *   final int N;
- *   final float[][] data;
- *   final CyclicBarrier barrier;
- *   
- *   class Worker implements Runnable {
- *      int myRow;
- *      Worker(int row) { myRow = row; }
- *      public void run() {
- *         while (!done()) {
- *            processRow(myRow);
- *
- *            try {
- *              barrier.barrier(); 
- *            }
- *            catch (InterruptedException ex) { return; }
- *            catch (BrokenBarrierException ex) { return; }
- *         }
- *      }
- *   }
- *
- *   public Solver(float[][] matrix) {
- *     data = matrix;
- *     N = matrix.length;
- *     barrier = new CyclicBarrier(N);
- *     barrier.setBarrierCommand(new Runnable() {
- *       public void run() { mergeRows(...); }
- *     });
- *     for (int i = 0; i < N; ++i)
- *       new Thread(new Worker(i)).start();
- *     waitUntilDone();
- *   }
- * }
- * 
- *

[ Introduction to this package. ] - - **/ -public class CyclicBarrier implements Barrier { - - protected final int parties_; - protected boolean broken_ = false; - protected Runnable barrierCommand_ = null; - protected int count_; // number of parties still waiting - protected int resets_ = 0; // incremented on each release - - /** - * Create a CyclicBarrier for the indicated number of parties, - * and no command to run at each barrier. - * @exception IllegalArgumentException if parties less than or equal to zero. - **/ - - public CyclicBarrier(int parties) { this(parties, null); } - - /** - * Create a CyclicBarrier for the indicated number of parties. - * and the given command to run at each barrier point. - * @exception IllegalArgumentException if parties less than or equal to zero. - **/ - - public CyclicBarrier(int parties, Runnable command) { - if (parties <= 0) throw new IllegalArgumentException(); - parties_ = parties; - count_ = parties; - barrierCommand_ = command; - } - - /** - * Set the command to run at the point at which all threads reach the - * barrier. This command is run exactly once, by the thread - * that trips the barrier. The command is not run if the barrier is - * broken. - * @param command the command to run. If null, no command is run. - * @return the previous command - **/ - - public synchronized Runnable setBarrierCommand(Runnable command) { - Runnable old = barrierCommand_; - barrierCommand_ = command; - return old; - } - - public synchronized boolean broken() { return broken_; } - - /** - * Reset to initial state. Clears both the broken status - * and any record of waiting threads, and releases all - * currently waiting threads with indeterminate return status. - * This method is intended only for use in recovery actions - * in which it is somehow known - * that no thread could possibly be relying on the - * the synchronization properties of this barrier. 
- **/ - - public synchronized void restart() { - broken_ = false; - ++resets_; - count_ = parties_; - notifyAll(); - } - - - public int parties() { return parties_; } - - /** - * Enter barrier and wait for the other parties()-1 threads. - * @return the arrival index: the number of other parties - * that were still waiting - * upon entry. This is a unique value from zero to parties()-1. - * If it is zero, then the current - * thread was the last party to hit barrier point - * and so was responsible for releasing the others. - * @exception BrokenBarrierException if any other thread - * in any previous or current barrier - * since either creation or the last restart - * operation left the barrier - * prematurely due to interruption or time-out. (If so, - * the broken status is also set.) - * Threads that are notified to have been - * interrupted after being released are not considered - * to have broken the barrier. - * In all cases, the interruption - * status of the current thread is preserved, so can be tested - * by checking Thread.interrupted. - * @exception InterruptedException if this thread was interrupted - * during the barrier, and was the one causing breakage. - * If so, broken status is also set. - **/ - - public int barrier() throws InterruptedException, BrokenBarrierException { - if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - return doBarrier(false, 0); - } - - /** - * Enter barrier and wait at most msecs for the other parties()-1 threads. - * @return if not timed out, the arrival index: the number of other parties - * that were still waiting - * upon entry. This is a unique value from zero to parties()-1. - * If it is zero, then the current - * thread was the last party to hit barrier point - * and so was responsible for releasing the others. 
- * @exception BrokenBarrierException - * if any other thread - * in any previous or current barrier - * since either creation or the last restart - * operation left the barrier - * prematurely due to interruption or time-out. (If so, - * the broken status is also set.) - * Threads that are noticed to have been - * interrupted after being released are not considered - * to have broken the barrier. - * In all cases, the interruption - * status of the current thread is preserved, so can be tested - * by checking Thread.interrupted. - * @exception InterruptedException if this thread was interrupted - * during the barrier. If so, broken status is also set. - * @exception TimeoutException if this thread timed out waiting for - * the barrier. If the timeout occured while already in the - * barrier, broken status is also set. - **/ - - public int attemptBarrier(long msecs) - throws InterruptedException, TimeoutException, BrokenBarrierException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in doBarrier - return doBarrier(true, msecs); - } - - protected synchronized int doBarrier(boolean timed, long msecs) - throws InterruptedException, TimeoutException, BrokenBarrierException { - - if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - - int index = --count_; - - if (broken_) { - throw new BrokenBarrierException(index); - } - else if (Thread.interrupted()) { - broken_ = true; - notifyAll(); - throw new InterruptedException(); - } - else if (index == 0) { // tripped - count_ = parties_; - ++resets_; - notifyAll(); - try { - if (barrierCommand_ != null) - barrierCommand_.run(); - return 0; - } - catch (RuntimeException ex) { - broken_ = true; - return 0; - } - } - else if (timed && msecs <= 0) { - broken_ = true; - notifyAll(); - throw new TimeoutException(msecs); - } - else { // wait until next reset - int r = resets_; - long startTime = (timed)? 
System.currentTimeMillis() : 0; - long waitTime = msecs; - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - wait(waitTime); - } - catch (InterruptedException ex) { - // Only claim that broken if interrupted before reset - if (resets_ == r) { - broken_ = true; - notifyAll(); - throw ex; - } - else { -// Thread.currentThread().interrupt(); // propagate - interrupted = true; // GemStoneAddition - } - } - finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - - if (broken_) - throw new BrokenBarrierException(index); - - else if (r != resets_) - return index; - - else if (timed) { - waitTime = msecs - (System.currentTimeMillis() - startTime); - if (waitTime <= 0) { - broken_ = true; - notifyAll(); - throw new TimeoutException(msecs); - } - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java deleted file mode 100644 index 3231870..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java +++ /dev/null @@ -1,58 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: DefaultChannelCapacity.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. 
- - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A utility class to set the default capacity of - * BoundedChannel - * implementations that otherwise require a capacity argument - * @see BoundedChannel - * [ Introduction to this package. ]

- **/ - -public class DefaultChannelCapacity { - - /** The initial value of the default capacity is 1024 **/ - public static final int INITIAL_DEFAULT_CAPACITY = 1024; - - /** the current default capacity **/ - private static final SynchronizedInt defaultCapacity_ = - new SynchronizedInt(INITIAL_DEFAULT_CAPACITY); - - /** - * Set the default capacity used in - * default (no-argument) constructor for BoundedChannels - * that otherwise require a capacity argument. - * @exception IllegalArgumentException if capacity less or equal to zero - */ - public static void set(int capacity) { - if (capacity <= 0) throw new IllegalArgumentException(); - defaultCapacity_.set(capacity); - } - - /** - * Get the default capacity used in - * default (no-argument) constructor for BoundedChannels - * that otherwise require a capacity argument. - * Initial value is INITIAL_DEFAULT_CAPACITY - * @see #INITIAL_DEFAULT_CAPACITY - */ - public static int get() { - return defaultCapacity_.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java deleted file mode 100644 index a87b8bd..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java +++ /dev/null @@ -1,36 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: DirectExecutor.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. 
- Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 21Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * - * An implementation of Executor that - * invokes the run method of the supplied command and then returns. - * - *

[ Introduction to this package. ] - **/ -public class DirectExecutor implements Executor { - /** - * Execute the given command directly in the current thread. - **/ - public void execute(Runnable command) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - - command.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java deleted file mode 100644 index 3625c04..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java +++ /dev/null @@ -1,70 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Executor.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 19Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * Interface for objects that execute Runnables, - * as well as various objects that can be wrapped - * as Runnables. - * The main reason to use Executor throughout a program or - * subsystem is to provide flexibility: You can easily - * change from using thread-per-task to using pools or - * queuing, without needing to change most of your code that - * generates tasks. - *

- * The general intent is that execution be asynchronous, - * or at least independent of the caller. For example, - * one of the simplest implementations of execute - * (as performed in ThreadedExecutor) - * is new Thread(command).start();. - * However, this interface allows implementations that instead - * employ queueing or pooling, or perform additional - * bookkeeping. - *

- * - *

[ Introduction to this package. ] - **/ -public interface Executor { - /** - * Execute the given command. This method is guaranteed - * only to arrange for execution, that may actually - * occur sometime later; for example in a new - * thread. However, in fully generic use, callers - * should be prepared for execution to occur in - * any fashion at all, including immediate direct - * execution. - *

- * The method is defined not to throw - * any checked exceptions during execution of the command. Generally, - * any problems encountered will be asynchronous and - * so must be dealt with via callbacks or error handler - * objects. If necessary, any context-dependent - * catastrophic errors encountered during - * actions that arrange for execution could be accompanied - * by throwing context-dependent unchecked exceptions. - *

- * However, the method does throw InterruptedException: - * It will fail to arrange for execution - * if the current thread is currently interrupted. - * Further, the general contract of the method is to avoid, - * suppress, or abort execution if interruption is detected - * in any controllable context surrounding execution. - **/ - public void execute(Runnable command) throws InterruptedException; - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java deleted file mode 100644 index 32081a9..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java +++ /dev/null @@ -1,198 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: FIFOReadWriteLock.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version - 23nov2001 dl Replace main algorithm with fairer - version based on one by Alexander Terekhov -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - - -/** - * This class implements a policy for reader/writer locks in which - * threads contend in a First-in/First-out manner for access (modulo - * the limitations of FIFOSemaphore, which is used for queuing). This - * policy does not particularly favor readers or writers. 
As a - * byproduct of the FIFO policy, the attempt methods may - * return false even when the lock might logically be - * available, but, due to contention, cannot be accessed within the - * given time bound.

- * - * This lock is NOT reentrant. Current readers and - * writers should not try to re-obtain locks while holding them. - *

- * - * [ Introduction to this package. ]

- * - * @see FIFOSemaphore -**/ - -public class FIFOReadWriteLock implements ReadWriteLock { - - /** - * Fair Semaphore serving as a kind of mutual exclusion lock. - * Writers acquire on entry, and hold until rwlock exit. - * Readers acquire and release only during entry (but are - * blocked from doing so if there is an active writer). - **/ - protected final FIFOSemaphore entryLock = new FIFOSemaphore(1); - - /** - * Number of threads that have entered read lock. Note that this is - * never reset to zero. Incremented only during acquisition of read - * lock while the "entryLock" is held, but read elsewhere, so is - * declared volatile. - **/ - protected volatile int readers; - - /** - * Number of threads that have exited read lock. Note that this is - * never reset to zero. Accessed only in code protected by - * synchronized(this). When exreaders != readers, the rwlock is - * being used for reading. Else if the entry lock is held, it is - * being used for writing (or in transition). Else it is free. - * Note: To distinguish these states, we assume that fewer than 2^32 - * reader threads can simultaneously execute. - **/ - protected int exreaders; - - protected void acquireRead() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquire - entryLock.acquire(); - ++readers; - entryLock.release(); - } - - protected synchronized void releaseRead() { - /* - If this is the last reader, notify a possibly waiting writer. - Because waits occur only when entry lock is held, at most one - writer can be waiting for this notification. Because increments - to "readers" aren't protected by "this" lock, the notification - may be spurious (when an incoming reader in in the process of - updating the field), but at the point tested in acquiring write - lock, both locks will be held, thus avoiding false alarms. 
And - we will never miss an opportunity to send a notification when it - is actually needed. - */ - - if (++exreaders == readers) - notify(); - } - - protected void acquireWrite() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquire - // Acquiring entryLock first forces subsequent entering readers - // (as well as writers) to block. - entryLock.acquire(); - - // Only read "readers" once now before loop. We know it won't - // change because we hold the entry lock needed to update it. - int r = readers; - - try { - synchronized(this) { - while (exreaders != r) - wait(); - } - } - catch (InterruptedException ie) { - entryLock.release(); - throw ie; - } - } - - protected void releaseWrite() { - entryLock.release(); - } - - protected boolean attemptRead(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attempt - if (!entryLock.attempt(msecs)) - return false; - - ++readers; - entryLock.release(); - return true; - } - - protected boolean attemptWrite(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attempt - long startTime = (msecs <= 0)? 0 : System.currentTimeMillis(); - - if (!entryLock.attempt(msecs)) - return false; - - int r = readers; - - try { - synchronized(this) { - while (exreaders != r) { - long timeLeft = (msecs <= 0)? 
0: - msecs - (System.currentTimeMillis() - startTime); - - if (timeLeft <= 0) { - entryLock.release(); - return false; - } - - wait(timeLeft); - } - return true; - } - } - catch (InterruptedException ie) { - entryLock.release(); - throw ie; - } - } - - // support for ReadWriteLock interface - - protected class ReaderSync implements Sync { - public void acquire() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquireRead - acquireRead(); - } - public void release() { - releaseRead(); - } - public boolean attempt(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attemptRead - return attemptRead(msecs); - } - } - - protected class WriterSync implements Sync { - public void acquire() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquireWrite - acquireWrite(); - } - public void release() { - releaseWrite(); - } - public boolean attempt(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attemptWrite - return attemptWrite(msecs); - } - } - - protected final Sync readerSync = new ReaderSync(); - protected final Sync writerSync = new WriterSync(); - - public Sync writeLock() { return writerSync; } - public Sync readLock() { return readerSync; } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java deleted file mode 
100644 index a7f8a0f..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java +++ /dev/null @@ -1,84 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: FIFOSemaphore.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A First-in/First-out implementation of a Semaphore. - * Waiting requests will be satisified in - * the order that the processing of those requests got to a certain point. - * If this sounds vague it is meant to be. FIFO implies a - * logical timestamping at some point in the processing of the - * request. To simplify things we don't actually timestamp but - * simply store things in a FIFO queue. Thus the order in which - * requests enter the queue will be the order in which they come - * out. This order need not have any relationship to the order in - * which requests were made, nor the order in which requests - * actually return to the caller. These depend on Java thread - * scheduling which is not guaranteed to be predictable (although - * JVMs tend not to go out of their way to be unfair). - *

[ Introduction to this package. ] -**/ - -public class FIFOSemaphore extends QueuedSemaphore { - - /** - * Create a Semaphore with the given initial number of permits. - * Using a seed of one makes the semaphore act as a mutual exclusion lock. - * Negative seeds are also allowed, in which case no acquires will proceed - * until the number of releases has pushed the number of permits past 0. - **/ - - public FIFOSemaphore(long initialPermits) { - super(new FIFOWaitQueue(), initialPermits); - } - - /** - * Simple linked list queue used in FIFOSemaphore. - * Methods are not synchronized; they depend on synch of callers - **/ - - protected static class FIFOWaitQueue extends WaitQueue { - protected WaitNode head_ = null; - protected WaitNode tail_ = null; - - @Override // GemStoneAddition - protected void insert(WaitNode w) { - if (tail_ == null) - head_ = tail_ = w; - else { - tail_.next = w; - tail_ = w; - } - } - - @Override // GemStoneAddition - protected WaitNode extract() { - if (head_ == null) - return null; - else { - WaitNode w = head_; - head_ = w.next; - if (head_ == null) tail_ = null; - w.next = null; - return w; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java deleted file mode 100644 index c7e9827..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java +++ /dev/null @@ -1,535 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Task.java - - Originally written by Doug Lea and released into the public domain. 
- This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 7Jan1999 dl first release - 14jan1999 dl simplify start() semantics; - improve documentation - 18Jan1999 dl Eliminate useless time-based waits. - 7Mar1999 dl Add reset method, - add array-based composite operations - 27Apr1999 dl Rename -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - - -/** - * Abstract base class for Fork/Join Tasks. - * - *

- * FJTasks are lightweight, stripped-down analogs of Threads. - * Many FJTasks share the same pool of Java threads. This is - * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that - * mainly contain - * methods called only internally by FJTasks. - * FJTasks support versions of the most common methods found in class Thread, - * including start(), yield() and join(). However, they - * don't support priorities, ThreadGroups or other bookkeeping - * or control methods of class Thread. - *

- * FJTasks should normally be defined by subclassing and adding a run() method. - * Alternatively, static inner class Wrap(Runnable r) - * can be used to - * wrap an existing Runnable object in a FJTask. - *

- * FJTaskRunnerGroup.execute(FJTask) can be used to - * initiate a FJTask from a non-FJTask thread. - * And FJTaskRunnerGroup.invoke(FJTask) can be used to initiate - * a FJTask and then wait for it to complete before returning. - * These are the only entry-points from normal threads to FJTasks. - * Most FJTask methods themselves may only be called from within running FJTasks. - * They throw ClassCastExceptions if they are not, - * reflecting the fact that these methods - * can only be executed using FJTaskRunner threads, not generic - * java.lang.Threads. - *

- * There are three different ways to run a FJTask, - * with different scheduling semantics: - *

    - *
  • FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask)) - * behaves pretty much like Thread.start(). It enqueues a task to be - * run the next time any FJTaskRunner thread is otherwise idle. - * It maintains standard FIFO ordering with respect to - * the group of worker threads. - *
  • FJTask.fork() (as well as the two-task spawning method, - * coInvoke(task1, task2), and the array version - * coInvoke(FJTask[] tasks)) starts a task - * that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread as the one that created it, but is FIFO - * with respect to other tasks if it is run by - * other worker threads. That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - * Fork() is noticeably faster than start(), but can only be - * used when these scheduling semantics are acceptable. - *
  • FJTask.invoke(FJTask) just executes the run method - * of one task from within another. It is the analog of a - * direct call. - *
- *

- * The main economies of FJTasks stem from the fact that - * FJTasks do not support blocking operations of any kind. - * FJTasks should just run to completion without - * issuing waits or performing blocking IO. - * There are several styles for creating the run methods that - * execute as tasks, including - * event-style methods, and pure computational methods. - * Generally, the best kinds of FJTasks are those that in turn - * generate other FJTasks. - *

- * There is nothing actually - * preventing you from blocking within a FJTask, and very short waits/blocks are - * completely well behaved. But FJTasks are not designed - * to support arbitrary synchronization - * since there is no way to suspend and resume individual tasks - * once they have begun executing. FJTasks should also be finite - * in duration -- they should not contain infinite loops. - * FJTasks that might need to perform a blocking - * action, or hold locks for extended periods, or - * loop forever can instead create normal - * java Thread objects that will do so. FJTasks are just not - * designed to support these things. - * FJTasks may however yield() control to allow their FJTaskRunner threads - * to run other tasks, - * and may wait for other dependent tasks via join(). These - * are the only coordination mechanisms supported by FJTasks. - *

- * FJTasks, and the FJTaskRunners that execute them are not - * intrinsically robust with respect to exceptions. - * A FJTask that aborts via an exception does not automatically - * have its completion flag (isDone) set. - * As with ordinary Threads, an uncaught exception will normally cause - * its FJTaskRunner thread to die, which in turn may sometimes - * cause other computations being performed to hang or abort. - * You can of course - * do better by trapping exceptions inside the run methods of FJTasks. - *

- * The overhead differences between FJTasks and Threads are substantial, - * especially when using fork() or coInvoke(). - * FJTasks can be two or three orders of magnitude faster than Threads, - * at least when run on JVMs with high-performance garbage collection - * (every FJTask quickly becomes garbage) and good native thread support. - *

- * Given these overhead savings, you might be tempted to use FJTasks for - * everything you would use a normal Thread to do. Don't. Java Threads - * remain better for general purpose thread-based programming. Remember - * that FJTasks cannot be used for designs involving arbitrary blocking - * synchronization or I/O. Extending FJTasks to support such capabilities - * would amount to re-inventing the Thread class, and would make them - * less optimal in the contexts that they were designed for. - *

[ Introduction to this package. ] - *

- * @see FJTaskRunner - * @see FJTaskRunnerGroup - **/ - -public abstract class FJTask implements Runnable { - - /** - * The only status information associated with FJTasks is whether - * the they are considered to have completed. - * It is set true automatically within - * FJTaskRunner methods upon completion - * of the run method, or manually via cancel. - **/ - - private volatile boolean done; // = false; - - /** - * Return the FJTaskRunner thread running the current FJTask. - * Most FJTask methods are just relays to their current - * FJTaskRunners, that perform the indicated actions. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - - public static FJTaskRunner getFJTaskRunner() { - return (FJTaskRunner)(Thread.currentThread()); - } - - /** - * Return the FJTaskRunnerGroup of the thread running the current FJTask. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - public static FJTaskRunnerGroup getFJTaskRunnerGroup() { - return getFJTaskRunner().getGroup(); - } - - - /** - * Return true if current task has terminated or been cancelled. - * The method is a simple analog of the Thread.isAlive() - * method. However, it reports true only when the task has terminated - * or has been cancelled. It does not distinguish these two cases. - * And there is no way to determine whether a FJTask has been started - * or is currently executing. - **/ - - public final boolean isDone() { return done; } - - /** - * Indicate termination. Intended only to be called by FJTaskRunner. - * FJTasks themselves should use (non-final) method - * cancel() to suppress execution. - **/ - - protected final void setDone() { done = true; } - - /** - * Set the termination status of this task. This simple-minded - * analog of Thread.interrupt - * causes the task not to execute if it has not already been started. 
- * Cancelling a running FJTask - * has no effect unless the run method itself uses isDone() - * to probe cancellation and take appropriate action. - * Individual run() methods may sense status and - * act accordingly, normally by returning early. - **/ - - public void cancel() { setDone(); } - - - /** - * Clear the termination status of this task. - * This method is intended to be used - * only as a means to allow task objects to be recycled. It should - * be called only when you are sure that the previous - * execution of this task has terminated and, if applicable, has - * been joined by all other waiting tasks. Usage in any other - * context is a very bad idea. - **/ - - public void reset() { done = false; } - - - /** - * Execute this task. This method merely places the task in a - * group-wide scheduling queue. - * It will be run - * the next time any TaskRunner thread is otherwise idle. - * This scheduling maintains FIFO ordering of started tasks - * with respect to - * the group of worker threads. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void start() { getFJTaskRunnerGroup().executeTask(this); } - - - /** - * Arrange for execution of a strictly dependent task. - * The task that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread, but is FIFO with respect to other tasks - * forked by this thread when taken by other worker threads. - * That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - *

- * Fork() is noticeably - * faster than start(). However, it may only - * be used for strictly dependent tasks -- generally, those that - * could logically be issued as straight method calls without - * changing the logic of the program. - * The method is optimized for use in parallel fork/join designs - * in which the thread that issues one or more forks - * cannot continue until at least some of the forked - * threads terminate and are joined. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void fork() { getFJTaskRunner().push(this); } - - /** - * Allow the current underlying FJTaskRunner thread to process other tasks. - *

- * Spinloops based on yield() are well behaved so long - * as the event or condition being waited for is produced via another - * FJTask. Additionally, you must never hold a lock - * while performing a yield or join. (This is because - * multiple FJTasks can be run by the same Thread during - * a yield. Since java locks are held per-thread, the lock would not - * maintain the conceptual exclusion you have in mind.) - *

- * Otherwise, spinloops using - * yield are the main construction of choice when a task must wait - * for a condition that it is sure will eventually occur because it - * is being produced by some other FJTask. The most common - * such condition is built-in: join() repeatedly yields until a task - * has terminated after producing some needed results. You can also - * use yield to wait for callbacks from other FJTasks, to wait for - * status flags to be set, and so on. However, in all these cases, - * you should be confident that the condition being waited for will - * occur, essentially always because it is produced by - * a FJTask generated by the current task, or one of its subtasks. - * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void yield() { getFJTaskRunner().taskYield(); } - - /** - * Yield until this task isDone. - * Equivalent to while(!isDone()) yield(); - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void join() { getFJTaskRunner().taskJoin(this); } - - /** - * Immediately execute task t by calling its run method. Has no - * effect if t has already been run or has been cancelled. - * It is equivalent to calling t.run except that it - * deals with completion status, so should always be used - * instead of directly calling run. - * The method can be useful - * when a computation has been packaged as a FJTask, but you just need to - * directly execute its body from within some other task. - **/ - - public static void invoke(FJTask t) { - if (!t.isDone()) { - t.run(); - t.setDone(); - } - } - - /** - * Fork both tasks and then wait for their completion. It behaves as: - *

-   * task1.fork(); task2.fork(); task2.join(); task1.join();
-   * 
- * As a simple classic example, here is - * a class that computes the Fibonacci function: - *
-   * public class Fib extends FJTask {
-   * 
-   *  // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2);  for n> 1
-   *  //          fibonacci(0) = 0; 
-   *  //          fibonacci(1) = 1.       
-   *
-   *  // Value to compute fibonacci function for.
-   *  // It is replaced with the answer when computed.
-   *  private volatile int number;
-   *
-   *  public Fib(int n) { number = n; }
-   *
-   *  public int getAnswer() {
-   *    if (!isDone()) throw new Error("Not yet computed");
-   *    return number;
-   *  }
-   *
-   *  public void run() {
-   *    int n = number;
-   *    if (n > 1) {
-   *      Fib f1 = new Fib(n - 1);
-   *      Fib f2 = new Fib(n - 2);
-   *
-   *      coInvoke(f1, f2); // run these in parallel
-   *
-   *      // we know f1 and f2 are computed, so just directly access numbers
-   *      number = f1.number + f2.number;
-   *    }
-   *  }
-   *
-   *  public static void main(String[] args) { // sample driver
-   *    try {
-   *      int groupSize = 2;    // 2 worker threads
-   *      int num = 35;         // compute fib(35)
-   *      FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
-   *      Fib f = new Fib(num);
-   *      group.invoke(f);
-   *      int result = f.getAnswer();
-   *      System.out.println(" Answer: " + result);
-   *    }
-   *    catch (InterruptedException ex) {
-   *      System.out.println("Interrupted");
-   *    }
-   *  }
-   * }
-   * 
- * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void coInvoke(FJTask task1, FJTask task2) { - getFJTaskRunner().coInvoke(task1, task2); - } - - - /** - * Fork all tasks in array, and await their completion. - * Behaviorally equivalent to: - *
-   * for (int i = 0; i < tasks.length; ++i) tasks[i].fork();
-   * for (int i = 0; i < tasks.length; ++i) tasks[i].join();
-   * 
- **/ - - public static void coInvoke(FJTask[] tasks) { - getFJTaskRunner().coInvoke(tasks); - } - - /** - * A FJTask that holds a Runnable r, and calls r.run when executed. - * The class is a simple utilty to allow arbitrary Runnables - * to be used as FJTasks. - **/ - - public static class Wrap extends FJTask { - protected final Runnable runnable; - public Wrap(Runnable r) { runnable = r; } - public void run() { runnable.run(); } - } - - - /** - * A new Seq, when executed, - * invokes each task provided in the constructor, in order. - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Seq extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in order - **/ - public Seq(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. - **/ - public Seq(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - public void run() { - for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in array order - **/ - - public static FJTask seq(FJTask[] tasks) { - return new Seq(tasks); - } - - /** - * A new Par, when executed, - * runs the tasks provided in the constructor in parallel using - * coInvoke(tasks). - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Par extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in parallel - **/ - public Par(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. 
- **/ - public Par(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - - public void run() { - FJTask.coInvoke(tasks); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in parallel using coInvoke - **/ - public static FJTask par(FJTask[] tasks) { - return new Par(tasks); - } - - /** - * A new Seq2(task1, task2), when executed, - * invokes task1 and then task2, in order. - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Seq2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Seq2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.invoke(fst); - FJTask.invoke(snd); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in order - **/ - - public static FJTask seq(FJTask task1, FJTask task2) { - return new Seq2(task1, task2); - } - - /** - * A new Par(task1, task2), when executed, - * runs task1 and task2 in parallel using coInvoke(task1, task2). - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Par2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Par2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.coInvoke(fst, snd); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in parallel - **/ - public static FJTask par(FJTask task1, FJTask task2) { - return new Par2(task1, task2); - } - -}