geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [38/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:23:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
deleted file mode 100644
index 6d6555a..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
+++ /dev/null
@@ -1,979 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: FJTaskRunner.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  7Jan1999   dl                 First public release
-  13Jan1999  dl                 correct a stat counter update; 
-                                ensure inactive status on run termination;
-                                misc minor cleaup
-  14Jan1999  dl                 Use random starting point in scan;
-                                variable renamings.
-  18Jan1999  dl                 Runloop allowed to die on task exception;
-                                remove useless timed join
-  22Jan1999  dl                 Rework scan to allow use of priorities.
-  6Feb1999   dl                 Documentation updates.
-  7Mar1999   dl                 Add array-based coInvoke
-  31Mar1999  dl                 Revise scan to remove need for NullTasks
-  27Apr1999  dl                 Renamed
-  23oct1999  dl                 Earlier detect of interrupt in scanWhileIdling
-  24nov1999  dl                 Now works on JVMs that do not properly
-                                implement read-after-write of 2 volatiles.
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-
-import java.util.Random;
-
-/**
- * Specialized Thread subclass for running FJTasks.
- * <p>
- * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
- * Double-ended queues support stack-based operations
- * push and pop, as well as queue-based operations put and take.
- * Normally, threads run their own tasks. But they
- * may also steal tasks from each others DEQs.
- * <p>
- * The algorithms are minor variants of those used
- * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
- * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
- * to a lesser extent 
- * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
- * but are adapted to work in Java.
- * <p>
- * The two most important capabilities are:
- * <ul>
- *  <li> Fork a FJTask: 
- *  <pre>
- *  Push task onto DEQ
- *  </pre>
- *  <li> Get a task to run (for example within taskYield)
- *  <pre>
- *  If DEQ is not empty, 
- *     Pop a task and run it.
- *  Else if any other DEQ is not empty, 
- *     Take ("steal") a task from it and run it.
- *  Else if the entry queue for our group is not empty,
- *     Take a task from it and run it.
- *  Else if current thread is otherwise idling
- *     If all threads are idling
- *        Wait for a task to be put on group entry queue
- *  Else
- *      Yield or Sleep for a while, and then retry
- *  </pre>
- * </ul>
- * The push, pop, and put are designed to only ever called by the
- * current thread, and take (steal) is only ever called by
- * other threads.
- * All other operations are composites and variants of these,
- * plus a few miscellaneous bookkeeping methods.
- * <p>
- * Implementations of the underlying representations and operations
- * are geared for use on JVMs operating on multiple CPUs (although
- * they should of course work fine on single CPUs as well).
- * <p>
- * A possible snapshot of a FJTaskRunner's DEQ is:
- * <pre>
- *     0     1     2     3     4     5     6    ...
- *  +-----+-----+-----+-----+-----+-----+-----+--
- *  |     |  t  |  t  |  t  |  t  |     |     | ...  deq array
- *  +-----+-----+-----+-----+-----+-----+-----+--
- *           ^                       ^
- *          base                    top 
- *   (incremented                     (incremented 
- *       on take,                         on push    
- *    decremented                     decremented
- *       on put)                          on pop)
- * </pre>
- * <p>
- * FJTasks are held in elements of the DEQ. 
- * They are maintained in a bounded array that
- * works similarly to a circular bounded buffer. To ensure
- * visibility of stolen FJTasks across threads, the array elements
- * must be <code>volatile</code>. 
- * Using volatile rather than synchronizing suffices here since
- * each task accessed by a thread is either one that it
- * created or one that has never seen before. Thus we cannot
- * encounter any staleness problems executing run methods,
- * although FJTask programmers must be still sure to either synch or use
- * volatile for shared data within their run methods.
- * <p>
- * However, since there is no way
- * to declare an array of volatiles in Java, the DEQ elements actually
- * hold VolatileTaskRef objects, each of which in turn holds a
- * volatile reference to a FJTask. 
- * Even with the double-indirection overhead of 
- * volatile refs, using an array for the DEQ works out
- * better than linking them since fewer shared
- * memory locations need to be
- * touched or modified by the threads while using the DEQ.
- * Further, the double indirection may alleviate cache-line
- * sharing effects (which cannot otherwise be directly dealt with in Java).
- * <p>
- * The indices for the <code>base</code> and <code>top</code> of the DEQ
- * are declared as volatile. The main contention point with
- * multiple FJTaskRunner threads occurs when one thread is trying
- * to pop its own stack while another is trying to steal from it.
- * This is handled via a specialization of Dekker's algorithm,
- * in which the popping thread pre-decrements <code>top</code>,
- * and then checks it against <code>base</code>. 
- * To be conservative in the face of JVMs that only partially
- * honor the specification for volatile, the pop proceeds
- * without synchronization only if there are apparently enough
- * items for both a simultaneous pop and take to succeed.
- * It otherwise enters a 
- * synchronized lock to check if the DEQ is actually empty,
- * if so failing. The stealing thread
- * does almost the opposite, but is set up to be less likely
- * to win in cases of contention: Steals always run under synchronized
- * locks in order to avoid conflicts with other ongoing steals.
- * They pre-increment <code>base</code>, and then check against
- * <code>top</code>. They back out (resetting the base index 
- * and failing to steal) if the
- * DEQ is empty or is about to become empty by an ongoing pop.
- * <p>
- * A push operation can normally run concurrently with a steal.
- * A push enters a synch lock only if the DEQ appears full so must
- * either be resized or have indices adjusted due to wrap-around
- * of the bounded DEQ. The put operation always requires synchronization.
- * <p>
- * When a FJTaskRunner thread has no tasks of its own to run, 
- * it tries to be a good citizen. 
- * Threads run at lower priority while scanning for work.
- * <p>
- * If the task is currently waiting
- * via yield, the thread alternates scans (starting at a randomly 
- * chosen victim) with Thread.yields. This is
- * well-behaved so long as the JVM handles Thread.yield in a
- * sensible fashion. (It need not. Thread.yield is so underspecified
- * that it is legal for a JVM to treat it as a no-op.) This also
- * keeps things well-behaved even if we are running on a uniprocessor
- * JVM using a simple cooperative threading model.
- * <p>
- * If a thread needing work is
- * is otherwise idle (which occurs only in the main runloop), and
- * there are no available tasks to steal or poll, it
- * instead enters into a sleep-based (actually timed wait(msec))
- * phase in which it progressively sleeps for longer durations
- * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
- * currently 100ms) between scans. 
- * If all threads in the group
- * are idling, they further progress to a hard wait phase, suspending
- * until a new task is entered into the FJTaskRunnerGroup entry queue.
- * A sleeping FJTaskRunner thread may be awakened by a new
- * task being put into the group entry queue or by another FJTaskRunner
- * becoming active, but not merely by some DEQ becoming non-empty.
- * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
- * in cases where all but one worker thread start sleeping
- * even though there will eventually be work produced
- * by a thread that is taking a long time to place tasks in DEQ.
- * These sleep mechanics are handled in the FJTaskRunnerGroup class.
- * <p>
- * Composite operations such as taskJoin include heavy
- * manual inlining of the most time-critical operations
- * (mainly FJTask.invoke). 
- * This opens up a few opportunities for further hand-optimizations. 
- * Until Java compilers get a lot smarter, these tweaks
- * improve performance significantly enough for task-intensive 
- * programs to be worth the poorer maintainability and code duplication.
- * <p>
- * Because they are so fragile and performance-sensitive, nearly
- * all methods are declared as final. However, nearly all fields
- * and methods are also declared as protected, so it is possible,
- * with much care, to extend functionality in subclasses. (Normally
- * you would also need to subclass FJTaskRunnerGroup.)
- * <p>
- * None of the normal java.lang.Thread class methods should ever be called
- * on FJTaskRunners. For this reason, it might have been nicer to
- * declare FJTaskRunner as a Runnable to run within a Thread. However,
- * this would have complicated many minor logistics. And since
- * no FJTaskRunner methods should normally be called from outside the
- * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
- * usage.
- * <p>
- * You might think that layering this kind of framework on top of
- * Java threads, which are already several levels removed from raw CPU
- * scheduling on most systems, would lead to very poor performance. 
- * But on the platforms
- * tested, the performance is quite good.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * @see FJTask
- * @see FJTaskRunnerGroup
- **/
-
-public class FJTaskRunner extends Thread {
-  
-  /** The group of which this FJTaskRunner is a member **/
-  protected final FJTaskRunnerGroup group;
-
-  /**
-   *  Constructor called only during FJTaskRunnerGroup initialization
-   **/
-
-  protected FJTaskRunner(FJTaskRunnerGroup g) { 
-    group = g;
-    victimRNG = new Random(System.identityHashCode(this));
-    runPriority = getPriority();
-    setDaemon(true);
-  }
-
-  /**
-   * Return the FJTaskRunnerGroup of which this thread is a member
-   **/
-  
-  protected final FJTaskRunnerGroup getGroup() { return group; }
-
-
-  /* ------------ DEQ Representation ------------------- */
-
-
-  /**
-   * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
-   * elements. The DEQ is grown if necessary, but default value is
-   * normally much more than sufficient unless  there are
-   * user programming errors or questionable operations generating
-   * large numbers of Tasks without running them.
-   * Capacities must be a power of two. 
-   **/
-
-  protected static final int INITIAL_CAPACITY = 4096; 
-
-  /**
-   * The maximum supported DEQ capacity.
-   * When exceeded, FJTaskRunner operations throw Errors
-   **/
-
-  protected static final int MAX_CAPACITY = 1 << 30;
-
-  /**
-   * An object holding a single volatile reference to a FJTask.
-   **/
-  
-  protected final static class VolatileTaskRef {
-    /** The reference **/
-    protected volatile FJTask ref;
-
-    /** Set the reference **/
-    protected final void put(FJTask r) { ref = r; }
-    /** Return the reference **/
-    protected final FJTask get()     { return ref; }
-    /** Return the reference and clear it **/
-    protected final FJTask take()    { FJTask r = ref; ref = null; return r;  }
-
-    /**
-     * Initialization utility for constructing arrays. 
-     * Make an array of given capacity and fill it with
-     * VolatileTaskRefs.
-     **/
-    protected static VolatileTaskRef[] newArray(int cap) {
-      VolatileTaskRef[] a = new VolatileTaskRef[cap];
-      for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
-      return a;
-    }
-
-  }
-
-  /**
-   * The DEQ array.
-   **/
-    
-  protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
-
-  /** Current size of the task DEQ **/
-  protected int deqSize() { return deq.length; }
-
-  /** 
-   * Current top of DEQ. Generally acts just like a stack pointer in an 
-   * array-based stack, except that it circularly wraps around the
-   * array, as in an array-based queue. The value is NOT
-   * always kept within <code>0 ... deq.length</code> though. 
-   * The current top element is always at <code>top & (deq.length-1)</code>.
-   * To avoid integer overflow, top is reset down 
-   * within bounds whenever it is noticed to be out out bounds;
-   * at worst when it is at <code>2 * deq.length</code>.
-   **/
-  protected volatile int top = 0;
-
-
-  /** 
-   * Current base of DEQ. Acts like a take-pointer in an
-   * array-based bounded queue. Same bounds and usage as top.
-   **/
-
-  protected volatile int base = 0;
-
-
-  /**
-   * An extra object to synchronize on in order to
-   * achieve a memory barrier.
-   **/
-
-  protected final Object barrier = new Object();
-
-  /* ------------ Other BookKeeping ------------------- */
-
-  /**
-   * Record whether current thread may be processing a task
-   * (i.e., has been started and is not in an idle wait).
-   * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
-   * stored here for simplicity.
-   **/
-
-  protected boolean active = false;
-
-  /** Random starting point generator for scan() **/
-  protected final Random victimRNG;
-
-
-  /** Priority to use while scanning for work **/
-  protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;
-
-  /** Priority to use while running tasks **/
-  protected int runPriority;
-
-  /**
-   * Set the priority to use while scanning.
-   * We do not bother synchronizing access, since
-   * by the time the value is needed, both this FJTaskRunner 
-   * and its FJTaskRunnerGroup will
-   * necessarily have performed enough synchronization
-   * to avoid staleness problems of any consequence.
-   **/
-  protected void setScanPriority(int pri) { scanPriority = pri; }
-
-
-  /**
-   * Set the priority to use while running tasks.
-   * Same usage and rationale as setScanPriority.
-   **/
-  protected void setRunPriority(int pri) {  runPriority = pri; }
-
-  /**
-   * Compile-time constant for statistics gathering.
-   * Even when set, reported values may not be accurate
-   * since all are read and written without synchronization.
-   **/
-
-
-
-  static final boolean COLLECT_STATS = true;
-  // static final boolean COLLECT_STATS = false;
-
-
-  // for stat collection
-
-  /** Total number of tasks run **/
-  protected int runs = 0;
-
-  /** Total number of queues scanned for work **/
-  protected int scans = 0;
-
-  /** Total number of tasks obtained via scan **/
-  protected int steals = 0;
-
-
-
-
-  /* ------------ DEQ operations ------------------- */
-
-
-  /**
-   * Push a task onto DEQ.
-   * Called ONLY by current thread.
-   **/
-
-  protected final void push(final FJTask r) {
-    int t = top;
-
-    /*
-      This test catches both overflows and index wraps.  It doesn't
-      really matter if base value is in the midst of changing in take. 
-      As long as deq length is < 2^30, we are guaranteed to catch wrap in
-      time since base can only be incremented at most length times
-      between pushes (or puts). 
-    */
-
-    if (t < (base & (deq.length-1)) + deq.length) {
-
-      deq[t & (deq.length-1)].put(r);
-      top = t + 1;
-    }
-
-    else  // isolate slow case to increase chances push is inlined
-      slowPush(r); // check overflow and retry
-  }
-
-
-  /**
-   * Handle slow case for push
-   **/
-
-  protected synchronized void slowPush(final FJTask r) {
-    checkOverflow();
-    push(r); // just recurse -- this one is sure to succeed.
-  }
-
-
-  /**
-   * Enqueue task at base of DEQ.
-   * Called ONLY by current thread.
-   * This method is currently not called from class FJTask. It could be used
-   * as a faster way to do FJTask.start, but most users would
-   * find the semantics too confusing and unpredictable.
-   **/
-
-  protected final synchronized void put(final FJTask r) {
-    for (;;) {
-      int b = base - 1;
-      if (top < b + deq.length) {
-        
-        int newBase = b & (deq.length-1);
-        deq[newBase].put(r);
-        base = newBase;
-        
-        if (b != newBase) { // Adjust for index underflow
-          int newTop = top & (deq.length-1);
-          if (newTop < newBase) newTop += deq.length;
-          top = newTop;
-        }
-        return;
-      }
-      else {
-        checkOverflow();
-        // ... and retry
-      }
-    }
-  }
-
-  /**
-   * Return a popped task, or null if DEQ is empty.
-   * Called ONLY by current thread.
-   * <p>
-   * This is not usually called directly but is
-   * instead inlined in callers. This version differs from the
-   * cilk algorithm in that pop does not fully back down and
-   * retry in the case of potential conflict with take. It simply
-   * rechecks under synch lock. This gives a preference
-   * for threads to run their own tasks, which seems to
-   * reduce flailing a bit when there are few tasks to run.
-   **/
-
-  protected final FJTask pop() {
-    /* 
-       Decrement top, to force a contending take to back down.
-    */
-
-    int t = --top;      
-
-    /*
-      To avoid problems with JVMs that do not properly implement
-      read-after-write of a pair of volatiles, we conservatively
-      grab without lock only if the DEQ appears to have at least two
-      elements, thus guaranteeing that both a pop and take will succeed,
-      even if the pre-increment in take is not seen by current thread.
-      Otherwise we recheck under synch.
-    */
-
-    if (base + 1 < t) 
-      return deq[t & (deq.length-1)].take();
-    else
-      return confirmPop(t);
-
-  }
-
-
-  /**
-   * Check under synch lock if DEQ is really empty when doing pop. 
-   * Return task if not empty, else null.
-   **/
-
-  protected final synchronized FJTask confirmPop(int provisionalTop) {
-    if (base <= provisionalTop) 
-      return deq[provisionalTop & (deq.length-1)].take();
-    else {    // was empty
-      /*
-        Reset DEQ indices to zero whenever it is empty.
-        This both avoids unnecessary calls to checkOverflow
-        in push, and helps keep the DEQ from accumulating garbage
-      */
-
-      top = base = 0;
-      return null;
-    }
-  }
-
-
-  /** 
-   * Take a task from the base of the DEQ.
-   * Always called by other threads via scan()
-   **/
-
-  
-  protected final synchronized FJTask take() {
-
-    /*
-      Increment base in order to suppress a contending pop
-    */
-    
-    int b = base++;     
-    
-    if (b < top) 
-      return confirmTake(b);
-    else {
-      // back out
-      base = b; 
-      return null;
-    }
-  }
-
-
-  /**
-   * double-check a potential take
-   **/
-  
-  protected FJTask confirmTake(int oldBase) {
-
-    /*
-      Use a second (guaranteed uncontended) synch
-      to serve as a barrier in case JVM does not
-      properly process read-after-write of 2 volatiles
-    */
-
-    synchronized(barrier) {
-      if (oldBase < top) {
-        /*
-          We cannot call deq[oldBase].take here because of possible races when
-          nulling out versus concurrent push operations.  Resulting
-          accumulated garbage is swept out periodically in
-          checkOverflow, or more typically, just by keeping indices
-          zero-based when found to be empty in pop, which keeps active
-          region small and constantly overwritten. 
-        */
-        
-        return deq[oldBase & (deq.length-1)].get();
-      }
-      else {
-        base = oldBase;
-        return null;
-      }
-    }
-  }
-
-
-  /**
-   * Adjust top and base, and grow DEQ if necessary.
-   * Called only while DEQ synch lock being held.
-   * We don't expect this to be called very often. In most
-   * programs using FJTasks, it is never called.
-   **/
-
-  protected void checkOverflow() { 
-    int t = top;
-    int b = base;
-    
-    if (t - b < deq.length-1) { // check if just need an index reset
-      
-      int newBase = b & (deq.length-1);
-      int newTop  = top & (deq.length-1);
-      if (newTop < newBase) newTop += deq.length;
-      top = newTop;
-      base = newBase;
-      
-      /* 
-         Null out refs to stolen tasks. 
-         This is the only time we can safely do it.
-      */
-      
-      int i = newBase;
-      while (i != newTop && deq[i].ref != null) {
-        deq[i].ref = null;
-        i = (i - 1) & (deq.length-1);
-      }
-      
-    }
-    else { // grow by doubling array
-      
-      int newTop = t - b;
-      int oldcap = deq.length;
-      int newcap = oldcap * 2;
-      
-      if (newcap >= MAX_CAPACITY)
-        throw new Error("FJTask queue maximum capacity exceeded");
-      
-      VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
-      
-      // copy in bottom half of new deq with refs from old deq
-      for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
-      
-      // fill top half of new deq with new refs
-      for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
-      
-      deq = newdeq;
-      base = 0;
-      top = newTop;
-    }
-  }
-
-
-  /* ------------ Scheduling  ------------------- */
-
-
-  /**
-   * Do all but the pop() part of yield or join, by
-   * traversing all DEQs in our group looking for a task to
-   * steal. If none, it checks the entry queue. 
-   * <p>
-   * Since there are no good, portable alternatives,
-   * we rely here on a mixture of Thread.yield and priorities
-   * to reduce wasted spinning, even though these are
-   * not well defined. We are hoping here that the JVM
-   * does something sensible.
-   * @param waitingFor if non-null, the current task being joined
-   **/
-
-  protected void scan(final FJTask waitingFor) {
-
-    FJTask task = null;
-
-    // to delay lowering priority until first failure to steal
-    boolean lowered = false;
-    
-    /*
-      Circularly traverse from a random start index. 
-      
-      This differs slightly from cilk version that uses a random index
-      for each attempted steal.
-      Exhaustive scanning might impede analytic tractablity of 
-      the scheduling policy, but makes it much easier to deal with
-      startup and shutdown.
-    */
-    
-    FJTaskRunner[] ts = group.getArray();
-    int idx = victimRNG.nextInt(ts.length);
-    
-    for (int i = 0; i < ts.length; ++i) {
-      
-      FJTaskRunner t = ts[idx];
-      if (++idx >= ts.length) idx = 0; // circularly traverse
-      
-      if (t != null && t != this) {
-        
-        if (waitingFor != null && waitingFor.isDone()) {
-          break;
-        }
-        else {
-          if (COLLECT_STATS) ++scans;
-          task = t.take();
-          if (task != null) {
-            if (COLLECT_STATS) ++steals;
-            break;
-          }
-          else if (isInterrupted()) {
-            break;
-          }
-          else if (!lowered) { // if this is first fail, lower priority
-            lowered = true;
-            setPriority(scanPriority);
-          }
-          else {           // otherwise we are at low priority; just yield
-            yield();
-          }
-        }
-      }
-      
-    } 
-
-    if (task == null) {
-      if (COLLECT_STATS) ++scans;
-      task = group.pollEntryQueue();
-      if (COLLECT_STATS) if (task != null) ++steals;
-    }
-    
-    if (lowered) setPriority(runPriority);
-    
-    if (task != null && !task.isDone()) {
-      if (COLLECT_STATS) ++runs;
-      task.run(); 
-      task.setDone(); 
-    }
-
-  }
-
-  /**
-   * Same as scan, but called when current thread is idling.
-   * It repeatedly scans other threads for tasks,
-   * sleeping while none are available. 
-   * <p>
-   * This differs from scan mainly in that
-   * since there is no reason to return to recheck any
-   * condition, we iterate until a task is found, backing
-   * off via sleeps if necessary.
-   **/
-
-  protected void scanWhileIdling() {
-    FJTask task = null;
-    
-    boolean lowered = false;
-    long iters = 0;
-    
-    FJTaskRunner[] ts = group.getArray();
-    int idx = victimRNG.nextInt(ts.length);
-    
-    do {
-      for (int i = 0; i < ts.length; ++i) {
-        
-        FJTaskRunner t = ts[idx];
-        if (++idx >= ts.length) idx = 0; // circularly traverse
-        
-        if (t != null && t != this) {
-          if (COLLECT_STATS) ++scans;
-          
-          task = t.take();
-          if (task != null) {
-            if (COLLECT_STATS) ++steals;
-            if (lowered) setPriority(runPriority);
-            group.setActive(this);
-            break;
-          }
-        }
-      } 
-      
-      if (task == null) {
-        if (isInterrupted()) 
-          return;
-        
-        if (COLLECT_STATS) ++scans;
-        task = group.pollEntryQueue();
-        
-        if (task != null) {
-          if (COLLECT_STATS) ++steals;
-          if (lowered) setPriority(runPriority);
-          group.setActive(this);
-        }
-        else {
-          ++iters;
-          //  Check here for yield vs sleep to avoid entering group synch lock
-          if (iters >= FJTaskRunnerGroup/*group GemStoneModification*/.SCANS_PER_SLEEP) {
-            group.checkActive(this, iters);
-            if (isInterrupted())
-              return;
-          }
-          else if (!lowered) {
-            lowered = true;
-            setPriority(scanPriority);
-          }
-          else {
-            yield();
-          }
-        }
-      }
-    } while (task == null);
-
-
-    if (!task.isDone()) {
-      if (COLLECT_STATS) ++runs;
-      task.run(); 
-      task.setDone(); 
-    }
-    
-  }
-
-  /* ------------  composite operations ------------------- */
-
-    
-  /**
-   * Main runloop
-   **/
-
-  @Override // GemStoneAddition
-  public void run() {
-    try{ 
-      while (!interrupted()) {
-        FJTask task = pop();
-        if (task != null) {
-          if (!task.isDone()) {
-            // inline FJTask.invoke
-            if (COLLECT_STATS) ++runs;
-            task.run(); 
-            task.setDone(); 
-          }
-        }
-        else
-          scanWhileIdling();
-      }
-    }
-    finally {
-      group.setInactive(this);
-    }
-  }
-
-  /**
-   * Execute a task in this thread. Generally called when current task
-   * cannot otherwise continue.
-   **/
-
-    
-  protected final void taskYield() {
-    FJTask task = pop();
-    if (task != null) {
-      if (!task.isDone()) {
-        if (COLLECT_STATS) ++runs;
-        task.run(); 
-        task.setDone(); 
-      }
-    }
-    else
-      scan(null);
-  }
-
-
-  /**
-   * Process tasks until w is done.
-   * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
-   **/
-
-  protected final void taskJoin(final FJTask w) {
-
-    while (!w.isDone()) { 
-
-      FJTask task = pop();
-      if (task != null) {
-        if (!task.isDone()) {
-          if (COLLECT_STATS) ++runs;
-          task.run(); 
-          task.setDone(); 
-          if (task == w) return; // fast exit if we just ran w
-        }
-      }
-      else
-        scan(w);
-    }
-  }
-
-  /**
-   * A specialized expansion of
-   * <code> w.fork(); invoke(v); w.join(); </code>
-   **/
-
-
-  protected final void coInvoke(final FJTask w, final FJTask v) {
-
-    // inline  push
-
-    int t = top;
-    if (t < (base & (deq.length-1)) + deq.length) {
-
-      deq[t & (deq.length-1)].put(w);
-      top = t + 1;
-
-      // inline  invoke
-
-      if (!v.isDone()) { 
-        if (COLLECT_STATS) ++runs; 
-        v.run(); 
-        v.setDone(); 
-      }
-      
-      // inline  taskJoin
-      
-      while (!w.isDone()) {
-        FJTask task  = pop();
-        if (task != null) {
-          if (!task.isDone()) {
-            if (COLLECT_STATS) ++runs;
-            task.run(); 
-            task.setDone(); 
-            if (task == w) return; // fast exit if we just ran w
-          }
-        }
-        else
-          scan(w);
-      }
-    }
-
-    else      // handle non-inlinable cases
-      slowCoInvoke(w, v);
-  }
-
-
-  /**
-   * Backup to handle noninlinable cases of coInvoke
-   **/
-
-  protected void slowCoInvoke(final FJTask w, final FJTask v) {
-    push(w); // let push deal with overflow
-    FJTask.invoke(v);
-    taskJoin(w);
-  }
-
-
-  /**
-   * Array-based version of coInvoke
-   **/
-
-  protected final void coInvoke(FJTask[] tasks) {
-    int nforks = tasks.length - 1;
-
-    // inline bulk push of all but one task
-
-    int t = top;
-
-    if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
-      for (int i = 0; i < nforks; ++i) {
-        deq[t++ & (deq.length-1)].put(tasks[i]);
-        top = t;
-      }
-
-      // inline invoke of one task
-      FJTask v = tasks[nforks];
-      if (!v.isDone()) { 
-        if (COLLECT_STATS) ++runs; 
-        v.run(); 
-        v.setDone(); 
-      }
-      
-      // inline  taskJoins
-      
-      for (int i = 0; i < nforks; ++i) { 
-        FJTask w = tasks[i];
-        while (!w.isDone()) {
-
-          FJTask task = pop();
-          if (task != null) {
-            if (!task.isDone()) {
-              if (COLLECT_STATS) ++runs;
-              task.run(); 
-              task.setDone(); 
-            }
-          }
-          else
-            scan(w);
-        }
-      }
-    }
-
-    else  // handle non-inlinable cases
-      slowCoInvoke(tasks);
-  }
-
-  /**
-   * Backup to handle atypical or noninlinable cases of coInvoke
-   **/
-
-  protected void slowCoInvoke(FJTask[] tasks) {
-    for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
-    for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
deleted file mode 100644
index 3ac2872..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: FJTaskRunnerGroup.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  7Jan1999   dl                 First public release
-  12Jan1999  dl                 made getActiveCount public; misc minor cleanup.
-  14Jan1999  dl                 Added executeTask
-  20Jan1999  dl                 Allow use of priorities; reformat stats
-  6Feb1999   dl                 Lazy thread starts
-  27Apr1999  dl                 Renamed
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A stripped down analog of a ThreadGroup used for
- * establishing and managing FJTaskRunner threads.
- * ThreadRunnerGroups serve as the control boundary separating
- * the general world of normal threads from the specialized world
- * of FJTasks. 
- * <p>
- * By intent, this class does not subclass java.lang.ThreadGroup, and
- * does not support most methods found in ThreadGroups, since they
- * would make no sense for FJTaskRunner threads. In fact, the class
- * does not deal with ThreadGroups at all. If you want to restrict
- * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
- * it from within that ThreadGroup.
- * <p>
- * The main contextual parameter for a FJTaskRunnerGroup is
- * the group size, established in the constructor. 
- * Groups must be of a fixed size.
- * There is no way to dynamically increase or decrease the number
- * of threads in an existing group.
- * <p>
- * In general, the group size should be equal to the number
- * of CPUs on the system. (Unfortunately, there is no portable
- * means of automatically detecting the number of CPUs on a JVM, so there is
- * no good way to automate defaults.)  In principle, when
- * FJTasks are used for computation-intensive tasks, having only 
- * as many threads as CPUs should minimize bookkeeping overhead
- * and contention, and so maximize throughput. However, because
- * FJTaskRunners lie atop Java threads, and in turn operating system
- * thread support and scheduling policies, 
- * it is very possible that using more threads
- * than CPUs will improve overall throughput even though it adds
- * to overhead. This will always be so if FJTasks are I/O bound.
- * So it may pay to experiment a bit when tuning on particular platforms.
- * You can also use <code>setRunPriorities</code> to either
- * increase or decrease the priorities of active threads, which
- * may interact with group size choice.
- * <p>
- * In any case, overestimating group sizes never
- * seriously degrades performance (at least within reasonable bounds). 
- * You can also use a value
- * less than the number of CPUs in order to reserve processing
- * for unrelated threads. 
- * <p>
- * There are two general styles for using a FJTaskRunnerGroup.
- * You can create one group per entire program execution, for example 
- * as a static singleton, and use it for all parallel tasks:
- * <pre>
- * class Tasks {
- *   static FJTaskRunnerGroup group;
- *   public void initialize(int groupsize) {
- *      group = new FJTaskRunnerGroup(groupSize);
- *   }
- *   // ...
- * }
- * </pre>
- * Alternatively, you can make new groups on the fly and use them only for
- * particular task sets. This is more flexible,,
- * and leads to more controllable and deterministic execution patterns,
- * but it encounters greater overhead on startup. Also, to reclaim
- * system resources, you should
- * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
- * using one-shot groups. Otherwise, because FJTaskRunners set 
- * <code>Thread.isDaemon</code>
- * status, they will not normally be reclaimed until program termination.
- * <p>
- * The main supported methods are <code>execute</code>,
- * which starts a task processed by FJTaskRunner threads,
- * and <code>invoke</code>, which starts one and waits for completion.
- * For example, you might extend the above <code>FJTasks</code>
- * class to support a task-based computation, say, the
- * <code>Fib</code> class from the <code>FJTask</code> documentation:
- * <pre>
- * class Tasks { // continued
- *   // ...
- *   static int fib(int n) {
- *     try {
- *       Fib f = new Fib(n);
- *       group.invoke(f);
- *       return f.getAnswer();
- *     }
- *     catch (InterruptedException ex) {
- *       throw new Error("Interrupted during computation");
- *     }
- *   }
- * }
- * </pre>
- * <p>
- * Method <code>stats()</code> can be used to monitor performance.
- * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
- * the compile-time constant COLLECT_STATS set to false. In this
- * case, various simple counts reported in stats() are not collected.
- * On platforms tested,
- * this leads to such a tiny performance improvement that there is 
- * very little motivation to bother.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * <p>
- * @see FJTask
- * @see FJTaskRunner
- **/
-
-public class FJTaskRunnerGroup implements Executor {
-
-  /** The threads in this group **/
-  protected final FJTaskRunner[] threads;
-
-  /** Group-wide queue for tasks entered via execute() **/
-  protected final LinkedQueue entryQueue = new LinkedQueue();
-
-  /** Number of threads that are not waiting for work **/
-  protected int activeCount = 0;
-
-  /** Number of threads that have been started. Used to avoid
-      unecessary contention during startup of task sets.
-  **/
-  protected int nstarted = 0;
-
-  /**
-   * Compile-time constant. If true, various counts of
-   * runs, waits, etc., are maintained. These are NOT
-   * updated with synchronization, so statistics reports
-   * might not be accurate.
-   **/
-  
-  static final boolean COLLECT_STATS = true;
-  //  static final boolean COLLECT_STATS = false;
-
-  // for stats
-
-  /** The time at which this ThreadRunnerGroup was constructed **/
-  long initTime = 0;
-
-  /** Total number of executes or invokes **/
-  int entries = 0;
-
-  static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
-
-  /** 
-   * Create a FJTaskRunnerGroup with the indicated number
-   * of FJTaskRunner threads. Normally, the best size to use is
-   * the number of CPUs on the system. 
-   * <p>
-   * The threads in a FJTaskRunnerGroup are created with their
-   * isDaemon status set, so do not normally need to be
-   * shut down manually upon program termination.
-   **/
-
-  public FJTaskRunnerGroup(int groupSize) { 
-    threads = new FJTaskRunner[groupSize];
-    initializeThreads();
-    initTime = System.currentTimeMillis();
-  }
-
-  /**
-   * Arrange for execution of the given task
-   * by placing it in a work queue. If the argument
-   * is not of type FJTask, it is embedded in a FJTask via 
-   * <code>FJTask.Wrap</code>.
-   * @exception InterruptedException if current Thread is
-   * currently interrupted 
-   **/
-
-  public void execute(Runnable r) throws InterruptedException {
-//    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in put
-    if (r instanceof FJTask) {
-      entryQueue.put(/*(FJTask) GemStoneAddition*/r);
-    }
-    else {
-      entryQueue.put(new FJTask.Wrap(r));
-    }
-    signalNewTask();
-  }
-
-
-  /**
-   * Specialized form of execute called only from within FJTasks
-   **/
-  public void executeTask(FJTask t) {
-    try {
-      entryQueue.put(t);
-      signalNewTask();
-    }
-    catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-
-  /**
-   * Start a task and wait it out. Returns when the task completes.
-   * @exception InterruptedException if current Thread is
-   * interrupted before completion of the task.
-   **/
-
-  public void invoke(Runnable r) throws InterruptedException {
-//    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in put
-    InvokableFJTask w = new InvokableFJTask(r);
-    entryQueue.put(w);
-    signalNewTask();
-    w.awaitTermination();
-  }
-
-
-  /**
-   * Try to shut down all FJTaskRunner threads in this group
-   * by interrupting them all. This method is designed
-   * to be used during cleanup when it is somehow known
-   * that all threads are idle.
-   * FJTaskRunners only
-   * check for interruption when they are not otherwise
-   * processing a task (and its generated subtasks,
-   * if any), so if any threads are active, shutdown may
-   * take a while, and may lead to unpredictable
-   * task processing.
-   **/
-
-  public void interruptAll() {
-    // paranoically interrupt current thread last if in group.
-    Thread current = Thread.currentThread();
-    boolean stopCurrent = false;
-
-    for (int i = 0; i < threads.length; ++i) {
-      Thread t = threads[i];
-      if (t == current) 
-        stopCurrent = true;
-      else
-        t.interrupt();
-    }
-    if (stopCurrent)
-      current.interrupt();
-  }
-
-
-  /**
-   * Set the priority to use while a FJTaskRunner is
-   * polling for new tasks to perform. Default
-   * is currently Thread.MIN_PRIORITY+1. The value
-   * set may not go into effect immediately, but
-   * will be used at least the next time a thread scans for work.
-   **/
-  public synchronized void setScanPriorities(int pri) {
-    for (int i = 0; i < threads.length; ++i) {
-      FJTaskRunner t = threads[i];
-      t.setScanPriority(pri);
-      if (!t.active) t.setPriority(pri);
-    }
-  }
-
-
-  /**
-   * Set the priority to use while a FJTaskRunner is
-   * actively running tasks. Default
-   * is the priority that was in effect by the thread that
-   * constructed this FJTaskRunnerGroup. Setting this value
-   * while threads are running may momentarily result in
-   * them running at this priority even when idly waiting for work.
-   **/
-  public synchronized void setRunPriorities(int pri) {
-    for (int i = 0; i < threads.length; ++i) {
-      FJTaskRunner t = threads[i];
-      t.setRunPriority(pri);
-      if (t.active) t.setPriority(pri);
-    }
-  }
-
-    
-
-  /** Return the number of FJTaskRunner threads in this group **/
-
-  public int size() { return threads.length; }
-
-
-  /** 
-   * Return the number of threads that are not idly waiting for work.
-   * Beware that even active threads might not be doing any useful
-   * work, but just spinning waiting for other dependent tasks.
-   * Also, since this is just a snapshot value, some tasks
-   * may be in the process of becoming idle.
-   **/
-  public synchronized int getActiveCount() { return activeCount; }
-
-  /**
-   * Prints various snapshot statistics to System.out.
-   * <ul>
-   *   <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
-   *         <em>n</em> from zero to group size - 1):
-   *     <ul>
-   *       <li> A star "*" is printed if the thread is currently active;
-   *            that is, not sleeping while waiting for work. Because
-   *            threads gradually enter sleep modes, an active thread
-   *            may in fact be about to sleep (or wake up).
-   *       <li> <em>Q Cap</em> The current capacity of its task queue.
-   *       <li> <em>Run</em> The total number of tasks that have been run.
-   *       <li> <em>New</em> The number of these tasks that were
-   *               taken from either the entry queue or from other 
-   *               thread queues; that is, the number of tasks run
-   *               that were <em>not</em> forked by the thread itself.
-   *       <li> <em>Scan</em> The number of times other task
-   *               queues or the entry queue were polled for tasks.
-   *     </ul>
-   *   <li> <em>Execute</em> The total number of tasks entered
-   *        (but not necessarily yet run) via execute or invoke.
-   *   <li> <em>Time</em> Time in seconds since construction of this
-   *         FJTaskRunnerGroup.
-   *   <li> <em>Rate</em> The total number of tasks processed
-   *          per second across all threads. This
-   *          may be useful as a simple throughput indicator
-   *          if all processed tasks take approximately the
-   *          same time to run.
-   * </ul>
-   * <p>
-   * Cautions: Some statistics are updated and gathered 
-   * without synchronization,
-   * so may not be accurate. However, reported counts may be considered
-   * as lower bounds of actual values. 
-   * Some values may be zero if classes are compiled
-   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
-   * classes can be independently compiled with different values of
-   * COLLECT_STATS.) Also, the counts are maintained as ints so could
-   * overflow in exceptionally long-lived applications.
-   * <p>
-   * These statistics can be useful when tuning algorithms or diagnosing
-   * problems. For example:
-   * <ul>
-   *  <li> High numbers of scans may mean that there is insufficient
-   *      parallelism to keep threads busy. However, high scan rates
-   *      are expected if the number
-   *      of Executes is also high or there is a lot of global
-   *      synchronization in the application, and the system is not otherwise
-   *      busy. Threads may scan
-   *      for work hundreds of times upon startup, shutdown, and
-   *      global synch points of task sets.
-   *  <li> Large imbalances in tasks run across different threads might
-   *      just reflect contention with unrelated threads on a system
-   *      (possibly including JVM threads such as GC), but may also
-   *      indicate some systematic bias in how you generate tasks.
-   *  <li> Large task queue capacities may mean that too many tasks are being
-   *     generated before they can be run. 
-   *     Capacities are reported rather than current numbers of tasks
-   *     in queues because they are better indicators of the existence
-   *     of these kinds of possibly-transient problems.
-   *     Queue capacities are
-   *     resized on demand from their initial value of 4096 elements,
-   *     which is much more than sufficient for the kinds of 
-   *     applications that this framework is intended to best support.
-   * </ul>
-   **/
-
-  public void stats() {
-    long time = System.currentTimeMillis() - initTime;
-    double secs = (/*(double) GemStoneAddition */time) / 1000.0;
-    long totalRuns = 0;
-    long totalScans = 0;
-    long totalSteals = 0;
-
-    System.out.print("Thread" +
-                     "\tQ Cap" +
-                       "\tScans" +
-                       "\tNew" +
-                       "\tRuns" +
-                       "\n");
-
-    for (int i = 0; i < threads.length; ++i) {
-      FJTaskRunner t = threads[i];
-      int truns = t.runs;
-      totalRuns += truns;
-
-      int tscans = t.scans;
-      totalScans += tscans;
-
-      int tsteals = t.steals;
-      totalSteals += tsteals;
-
-      String star = (getActive(t))? "*" : " ";
-
-
-      System.out.print("T" + i + star +
-                       "\t" + t.deqSize() +
-                       "\t" + tscans +
-                       "\t" + tsteals +
-                       "\t" + truns +
-                       "\n");
-    }
-
-    System.out.print("Total" +
-                     "\t    " +
-                     "\t" + totalScans +
-                     "\t" + totalSteals +
-                     "\t" + totalRuns +
-                     "\n");
-
-    System.out.print("Execute: " + entries); 
-    
-    System.out.print("\tTime: " + secs);
-
-    long rps = 0;
-    if (secs != 0) rps = Math.round(/*(double) GemStoneAddition */(totalRuns) / secs);
-
-    System.out.println("\tRate: " + rps);
-  }
-
-
-  /* ------------ Methods called only by FJTaskRunners ------------- */
-
-
-  /**
-   * Return the array of threads in this group. 
-   * Called only by FJTaskRunner.scan().
-   **/
-
-  protected FJTaskRunner[] getArray() { return threads; }
-
-
-  /**
-   * Return a task from entry queue, or null if empty.
-   * Called only by FJTaskRunner.scan().
-   **/
-
-  protected FJTask pollEntryQueue() {
-    try {
-      FJTask t = (FJTask)(entryQueue.poll(0));
-      return t;
-    }
-    catch(InterruptedException ex) { // ignore interrupts
-      Thread.currentThread().interrupt();
-      return null;
-    }
-  }
-
-
-  /**
-   * Return active status of t.
-   * Per-thread active status can only be accessed and
-   * modified via synchronized method here in the group class.
-   **/
-
-  protected synchronized boolean getActive(FJTaskRunner t) {
-    return t.active;
-  }
-
-
-  /**
-   * Set active status of thread t to true, and notify others
-   * that might be waiting for work. 
-   **/
-
-  protected synchronized void setActive(FJTaskRunner t) {
-    if (!t.active) { 
-      t.active = true;
-      ++activeCount;
-      if (nstarted < threads.length) 
-        threads[nstarted++].start();
-      else
-        notifyAll();
-    }
-  }
-
-  /**
-   * Set active status of thread t to false.
-   **/
-
-  protected synchronized void setInactive(FJTaskRunner t) {
-    if (t.active) { 
-      t.active = false;
-      --activeCount;
-    }
-  }
-
-  /**
-   * The number of times to scan other threads for tasks 
-   * before transitioning to a mode where scans are
-   * interleaved with sleeps (actually timed waits).
-   * Upon transition, sleeps are for duration of
-   * scans / SCANS_PER_SLEEP milliseconds.
-   * <p>
-   * This is not treated as a user-tunable parameter because
-   * good values do not appear to vary much across JVMs or
-   * applications. Its main role is to help avoid some
-   * useless spinning and contention during task startup.
-   **/
-  static final long SCANS_PER_SLEEP = 15;
-
-  /**
-   * The maximum time (in msecs) to sleep when a thread is idle,
-   * yet others are not, so may eventually generate work that
-   * the current thread can steal. This value reflects the maximum time
-   * that a thread may sleep when it possibly should not, because there
-   * are other active threads that might generate work. In practice,
-   * designs in which some threads become stalled because others
-   * are running yet not generating tasks are not likely to work
-   * well in this framework anyway, so the exact value does not matter
-   * too much. However, keeping it in the sub-second range does
-   * help smooth out startup and shutdown effects.
-   **/
-
-  static final long MAX_SLEEP_TIME = 100;
-
-  /**
-   * Set active status of thread t to false, and
-   * then wait until: (a) there is a task in the entry 
-   * queue, or (b) other threads are active, or (c) the current
-   * thread is interrupted. Upon return, it
-   * is not certain that there will be work available.
-   * The thread must itself check. 
-   * <p>
-   * The main underlying reason
-   * for these mechanics is that threads do not
-   * signal each other when they add elements to their queues.
-   * (This would add to task overhead, reduce locality.
-   * and increase contention.)
-   * So we must rely on a tamed form of polling. However, tasks
-   * inserted into the entry queue do result in signals, so
-   * tasks can wait on these if all of them are otherwise idle.
-   **/
-
-  protected synchronized void checkActive(FJTaskRunner t, long scans) {
-
-    setInactive(t);
-
-    try {
-      // if nothing available, do a hard wait
-      if (activeCount == 0 && entryQueue.peek() == null) { 
-        wait();
-      }
-      else { 
-        // If there is possibly some work,
-        // sleep for a while before rechecking 
-
-        long msecs = scans / SCANS_PER_SLEEP;
-        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
-        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
-        wait(msecs, nsecs);
-      }
-    }
-    catch (InterruptedException ex) {
-      notify(); // avoid lost notifies on interrupts
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /* ------------ Utility methods  ------------- */
-
-  /**
-   * Start or wake up any threads waiting for work
-   **/
-
-  protected synchronized void signalNewTask() {
-    if (COLLECT_STATS) ++entries;
-    if (nstarted < threads.length) 
-       threads[nstarted++].start();
-    else
-      notify();
-  }
-
-  /**
-   * Create all FJTaskRunner threads in this group.
-   **/
-
-  protected void initializeThreads() {
-    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
-  }
-
-
-
-
-  /**
-   * Wrap wait/notify mechanics around a task so that
-   * invoke() can wait it out 
-   **/
-  protected static final class InvokableFJTask extends FJTask {
-    protected final Runnable wrapped;
-    protected boolean terminated = false;
-
-    protected InvokableFJTask(Runnable r) { wrapped = r; }
-
-    public void run() {
-      try {
-        if (wrapped instanceof FJTask)
-          FJTask.invoke((FJTask)(wrapped));
-        else
-          wrapped.run();
-      }
-      finally {
-        setTerminated();
-      }
-    }
-
-    protected synchronized void setTerminated() {
-      terminated = true;
-      notifyAll(); 
-    }
-
-    protected synchronized void awaitTermination() throws InterruptedException {
-      if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition in case terminated is true
-      while (!terminated) wait();
-    }
-  }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
deleted file mode 100644
index 040fc91..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: FutureResult.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  30Jun1998  dl               Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-import java.lang.reflect.*;
-
-/**
- * A  class maintaining a single reference variable serving as the result
- * of an operation. The result cannot be accessed until it has been set.
- * <p>
- * <b>Sample Usage</b> <p>
- * <pre>
- * class ImageRenderer { Image render(byte[] raw); }
- * class App {
- *   Executor executor = ...
- *   ImageRenderer renderer = ...
- *   void display(byte[] rawimage) {
- *     try {
- *       FutureResult futureImage = new FutureResult();
- *       Runnable command = futureImage.setter(new Callable() {
- *          public Object call() { return renderer.render(rawImage); }
- *       });
- *       executor.execute(command);
- *       drawBorders();             // do other things while executing
- *       drawCaption();
- *       drawImage((Image)(futureImage.get())); // use future
- *     }
- *     catch (InterruptedException ex) { return; }
- *     catch (InvocationTargetException ex) { cleanup(); return; }
- *   }
- * }
- * </pre>
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * @see Executor
- **/
-
-public class FutureResult {
-  /** The result of the operation **/
-  protected Object value_ = null;
-  
-  /** Status -- true after first set **/
-  protected boolean ready_ = false;
-
-  /** the exception encountered by operation producing result **/
-  protected InvocationTargetException exception_ = null;
-
-  /** 
-   * Create an initially unset FutureResult
-   **/
-  public FutureResult() { }
-
-
-  /** 
-   * Return a Runnable object that, when run, will set the result value.
-   * @param function - a Callable object whose result will be
-   * held by this FutureResult.
-   * @return A Runnable object that, when run, will call the
-   * function and (eventually) set the result.
-   **/
-
-  public Runnable setter(final Callable function) {
-    return new Runnable() {
-      public void run() {
-        try {
-          set(function.call());
-        }
-        catch(Exception ex) {
-          setException(ex);
-        }
-      }
-    };
-  }
-
-  /** internal utility: either get the value or throw the exception **/
-  protected Object doGet() throws InvocationTargetException {
-    if (exception_ != null) 
-      throw exception_;
-    else
-      return value_; 
-  }
-
-  /**
-   * Access the reference, waiting if necessary until it is ready.
-   * @return current value
-   * @exception InterruptedException if current thread has been interrupted
-   * @exception InvocationTargetException if the operation
-   * producing the value encountered an exception.
-   **/
-  public synchronized Object get() 
-    throws InterruptedException, InvocationTargetException {
-    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition in case ready is true
-    while (!ready_) wait();
-    return doGet();
-  }
-
-
-
-  /**
-   * Wait at most msecs to access the reference.
-   * @return current value
-   * @exception TimeoutException if not ready after msecs
-   * @exception InterruptedException if current thread has been interrupted
-   * @exception InvocationTargetException if the operation
-   * producing the value encountered an exception.
-   **/
-  public synchronized Object timedGet(long msecs) 
-    throws TimeoutException, InterruptedException, InvocationTargetException {
-    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition
-    long startTime = (msecs <= 0)? 0 : System.currentTimeMillis();
-    long waitTime = msecs;
-    if (ready_) return doGet();
-    else if (waitTime <= 0) throw new TimeoutException(msecs);
-    else {
-      for (;;) {
-        wait(waitTime);
-        if (ready_) return doGet();
-        else {
-          waitTime = msecs - (System.currentTimeMillis() - startTime);
-          if (waitTime <= 0)
-            throw new TimeoutException(msecs);
-        }
-      }
-    }
-  }
-
-  /**
-   * Set the reference, and signal that it is ready. It is not
-   * considered an error to set the value more than once,
-   * but it is not something you would normally want to do.
-   * @param newValue The value that will be returned by a subsequent get();
-   **/
-  public synchronized void set(Object newValue) {
-    value_ = newValue;
-    ready_ = true;
-    notifyAll();
-  }
-
-  /**
-   * Set the exception field, also setting ready status.
-   * @param ex The exception. It will be reported out wrapped
-   * within an InvocationTargetException 
-   **/
-  public synchronized void setException(Throwable ex) {
-    exception_ = new InvocationTargetException(ex);
-    ready_ = true;
-    notifyAll();
-  }
-
-
-  /**
-   * Get the exception, or null if there isn't one (yet).
-   * This does not wait until the future is ready, so should
-   * ordinarily only be called if you know it is.
-   * @return the exception encountered by the operation
-   * setting the future, wrapped in an InvocationTargetException
-   **/
-  public synchronized InvocationTargetException getException() {
-    return exception_;
-  }
-
-  /**
-   * Return whether the reference or exception have been set.
-   * @return true if has been set. else false
-   **/
-  public synchronized boolean isReady() {
-    return ready_; 
-  }
-
-  /**
-   * Access the reference, even if not ready
-   * @return current value
-   **/
-  public synchronized Object peek() {
-    return value_; 
-  }
-
-
-  /**
-   * Clear the value and exception and set to not-ready,
-   * allowing this FutureResult to be reused. This is not
-   * particularly recommended and must be done only
-   * when you know that no other object is depending on the
-   * properties of this FutureResult.
-   **/
-  public synchronized void clear() {
-    value_ = null;
-    exception_ = null;
-    ready_ = false;
-  }
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
deleted file mode 100644
index bf8afac..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: Heap.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  29Aug1998  dl               Refactored from BoundedPriorityQueue
-  08dec2001  dl               Null out slots of removed items
-  03feb2002  dl               Also null out in clear
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-import java.util.Comparator;
-
-/**
- * A heap-based priority queue, without any concurrency control
- * (i.e., no blocking on empty/full states).
- * This class provides the data structure mechanics for BoundedPriorityQueue.
- * <p>
- * The class currently uses a standard array-based heap, as described
- * in, for example, Sedgewick's Algorithms text. All methods
- * are fully synchronized. In the future,
- * it may instead use structures permitting finer-grained locking.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-
-public class Heap  {
-  protected Object[] nodes_;  // the tree nodes, packed into an array
-  protected int count_ = 0;   // number of used slots
-  protected final Comparator cmp_;  // for ordering
-
-  /**
-   * Create a Heap with the given initial capacity and comparator
-   * @exception IllegalArgumentException if capacity less or equal to zero
-   **/
-
-  public Heap(int capacity, Comparator cmp) 
-   throws IllegalArgumentException {
-    if (capacity <= 0) throw new IllegalArgumentException();
-    nodes_ = new Object[capacity];
-    cmp_ = cmp;
-  }
-
-  /**
-   * Create a Heap with the given capacity,
-   * and relying on natural ordering.
-   **/
-
-  public Heap(int capacity) { 
-    this(capacity, null); 
-  }
-
-
-  /** perform element comaprisons using comparator or natural ordering **/
-  protected int compare(Object a, Object b) {
-    if (cmp_ == null) 
-      return ((Comparable)a).compareTo(b);
-    else
-      return cmp_.compare(a, b);
-  }
-
-
-  // indexes of heap parents and children
-  protected final int parent(int k) { return (k - 1) / 2;  }
-  protected final int left(int k)   { return 2 * k + 1; }
-  protected final int right(int k)  { return 2 * (k + 1); }
-
-  /**
-   * insert an element, resize if necessary
-   **/
-  public synchronized void insert(Object x) {
-    if (count_ >= nodes_.length) {
-      int newcap =  3 * nodes_.length / 2 + 1;
-      Object[] newnodes = new Object[newcap];
-      System.arraycopy(nodes_, 0, newnodes, 0, nodes_.length);
-      nodes_ = newnodes;
-    }
-
-    int k = count_;
-    ++count_;
-    while (k > 0) {
-      int par = parent(k);
-      if (compare(x, nodes_[par]) < 0) {
-        nodes_[k] = nodes_[par];
-        k = par;
-      }
-      else break;
-    }
-    nodes_[k] = x;
-  }
-    
-
-  /**
-   * Return and remove least element, or null if empty
-   **/
-
-  public synchronized Object extract() {
-    if (count_ < 1) return null;
-
-    int k = 0; // take element at root;
-    Object least = nodes_[k];
-    --count_;
-    Object x = nodes_[count_];
-    nodes_[count_] = null;
-    for (;;) {
-      int l = left(k);
-      if (l >= count_)
-        break;
-      else {
-        int r = right(k);
-        int child = (r >= count_ || compare(nodes_[l], nodes_[r]) < 0)? l : r; 
-        if (compare(x, nodes_[child]) > 0) {
-          nodes_[k] = nodes_[child];
-          k = child;
-        }
-        else break;
-      }
-    }
-    nodes_[k] = x;
-    return least;
-  }
-
-  /** Return least element without removing it, or null if empty **/
-  public synchronized Object peek() {
-    if (count_ > 0) 
-      return nodes_[0];
-    else
-      return null;
-  }
-
-  /** Return number of elements **/
-  public synchronized int size() {
-    return count_;
-  }
-  
-  /** remove all elements **/
-  public synchronized void clear() {
-    for (int i = 0; i < count_; ++i)
-      nodes_[i] = null;
-    count_ = 0;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
deleted file mode 100644
index 6e4b624..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: Latch.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  11Jun1998  dl               Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A latch is a boolean condition that is set at most once, ever.
- * Once a single release is issued, all acquires will pass.
- * <p>
- * <b>Sample usage.</b> Here are a set of classes that use
- * a latch as a start signal for a group of worker threads that
- * are created and started beforehand, and then later enabled.
- * <pre>
- * class Worker implements Runnable {
- *   private final Latch startSignal;
- *   Worker(Latch l) { startSignal = l; }
- *    public void run() {
- *      startSignal.acquire();
- *      doWork();
- *   }
- *   void doWork() { ... }
- * }
- *
- * class Driver { // ...
- *   void main() {
- *     Latch go = new Latch();
- *     for (int i = 0; i < N; ++i) // make threads
- *       new Thread(new Worker(go)).start();
- *     doSomethingElse();         // don't let run yet 
- *     go.release();              // let all threads proceed
- *   } 
- * }
- *</pre>
- * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
-**/  
-
-public class Latch implements Sync {
-  protected boolean latched_ = false;
-
-  /*
-    This could use double-check, but doesn't.
-    If the latch is being used as an indicator of
-    the presence or state of an object, the user would
-    not necessarily get the memory barrier that comes with synch
-    that would be needed to correctly use that object. This
-    would lead to errors that users would be very hard to track down. So, to
-    be conservative, we always use synch.
-  */
-
-  public void acquire() throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    synchronized(this) {
-      while (!latched_) 
-        wait(); 
-    }
-  }
-
-  public boolean attempt(long msecs) throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    synchronized(this) {
-      if (latched_) 
-        return true;
-      else if (msecs <= 0) 
-        return false;
-      else {
-        long waitTime = msecs;
-        long start = System.currentTimeMillis();
-        for (;;) {
-          wait(waitTime);
-          if (latched_) 
-            return true;
-          else {
-            waitTime = msecs - (System.currentTimeMillis() - start);
-            if (waitTime <= 0) 
-              return false;
-          }
-        }
-      }
-    }
-  }
-
-  /** Enable all current and future acquires to pass **/
-  public synchronized void release() {
-    latched_ = true;
-    notifyAll();
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
deleted file mode 100644
index 057e322..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: LayeredSync.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  1Aug1998  dl               Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A class that can be used to compose Syncs.
- * A LayeredSync object manages two other Sync objects,
- * <em>outer</em> and <em>inner</em>. The acquire operation
- * invokes <em>outer</em>.acquire() followed by <em>inner</em>.acquire(),
- * but backing out of outer (via release) upon an exception in inner.
- * The other methods work similarly.
- * <p>
- * LayeredSyncs can be used to compose arbitrary chains
- * by arranging that either of the managed Syncs be another
- * LayeredSync.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
-**/
-
-
-public class LayeredSync implements Sync {
-
-  protected final Sync outer_;
-  protected final Sync inner_;
-
-  /** 
-   * Create a LayeredSync managing the given outer and inner Sync
-   * objects
-   **/
-
-  public LayeredSync(Sync outer, Sync inner) {
-    outer_ = outer;
-    inner_ = inner;
-  }
-
-  public void acquire() throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition
-    outer_.acquire();
-    try {
-      inner_.acquire();
-    }
-    catch (InterruptedException ex) {
-      outer_.release();
-      throw ex;
-    }
-  }
-
-  public boolean attempt(long msecs) throws InterruptedException {
-
-    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - for safety
-    long start = (msecs <= 0)? 0 : System.currentTimeMillis();
-    long waitTime = msecs;
-
-    if (outer_.attempt(waitTime)) {
-      try {
-        if (msecs > 0)
-          waitTime = msecs - (System.currentTimeMillis() - start);
-        if (inner_.attempt(waitTime))
-          return true;
-        else {
-          outer_.release();
-          return false;
-        }
-      }
-      catch (InterruptedException ex) {
-        outer_.release();
-        throw ex;
-      }
-    }
-    else
-      return false;
-  }
-
-  public void release() {
-    inner_.release();
-    outer_.release();
-  }
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
deleted file mode 100644
index 76cdf0d..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: LinkedNode.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  11Jun1998  dl               Create public version
-  25may2000  dl               Change class access to public
-  26nov2001  dl               Added no-arg constructor, all public access.
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/** A standard linked list node used in various queue classes **/
-public class LinkedNode { 
-  public Object value;
-  public LinkedNode next;
-  public LinkedNode() {}
-  public LinkedNode(Object x) { value = x; }
-  public LinkedNode(Object x, LinkedNode n) { value = x; next = n; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
deleted file mode 100644
index 17bec28..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: LinkedQueue.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  11Jun1998  dl               Create public version
-  25aug1998  dl               added peek
-  10dec1998  dl               added isEmpty
-  10oct1999  dl               lock on node object to ensure visibility
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-/**
- * A linked list based channel implementation.
- * The algorithm avoids contention between puts
- * and takes when the queue is not empty. 
- * Normally a put and a take can proceed simultaneously. 
- * (Although it does not allow multiple concurrent puts or takes.)
- * This class tends to perform more efficently than
- * other Channel implementations in producer/consumer
- * applications.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-@SuppressFBWarnings(value="ML_SYNC_ON_FIELD_TO_GUARD_CHANGING_THAT_FIELD",justification="GemFire doesn't use this class")
-public class LinkedQueue implements Channel {
-
-
-  /** 
-   * Dummy header node of list. The first actual node, if it exists, is always 
-   * at head_.next. After each take, the old first node becomes the head.
-   **/
-  protected LinkedNode head_;         
-
-  /**
-   * Helper monitor for managing access to last node.
-   **/
-  protected final Object putLock_ = new Object(); 
-
-  /** 
-   * The last node of list. Put() appends to list, so modifies last_
-   **/
-  protected LinkedNode last_;         
-
-  /**
-   * The number of threads waiting for a take.
-   * Notifications are provided in put only if greater than zero.
-   * The bookkeeping is worth it here since in reasonably balanced
-   * usages, the notifications will hardly ever be necessary, so
-   * the call overhead to notify can be eliminated.
-   **/
-  protected int waitingForTake_ = 0;  
-
-  public LinkedQueue() {
-    head_ = new LinkedNode(null); 
-    last_ = head_;
-  }
-
-  /** Main mechanics for put/offer **/
-  protected void insert(Object x) { 
-    synchronized(putLock_) {
-      LinkedNode p = new LinkedNode(x);
-      synchronized(last_) {
-        last_.next = p;
-//        last_ = p; GemStoneAddition synchronize on last_ in futile attempt to guard it
-      }
-      last_ = p; // GemStoneAddition
-      if (waitingForTake_ > 0)
-        putLock_.notify();
-    }
-  }
-
-  /** Main mechanics for take/poll **/
-  protected synchronized Object extract() {
-    synchronized(head_) {
-      Object x = null;
-      LinkedNode first = head_.next;
-      if (first != null) {
-        x = first.value;
-        first.value = null;
-        head_ = first; 
-      }
-      return x;
-    }
-  }
-
-
-  public void put(Object x) throws InterruptedException {
-    if (x == null) throw new IllegalArgumentException();
-    if (Thread.interrupted()) throw new InterruptedException();
-    insert(x); 
-  }
-
-  public boolean offer(Object x, long msecs) throws InterruptedException { 
-    if (x == null) throw new IllegalArgumentException();
-    if (Thread.interrupted()) throw new InterruptedException();
-    insert(x); 
-    return true;
-  }
-
-  public Object take() throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    // try to extract. If fail, then enter wait-based retry loop
-    Object x = extract();
-    if (x != null)
-      return x;
-    else { 
-      synchronized(putLock_) {
-        try {
-          ++waitingForTake_;
-          for (;;) {
-            x = extract();
-            if (x != null) {
-              --waitingForTake_;
-              return x;
-            }
-            else {
-              putLock_.wait(); 
-            }
-          }
-        }
-        catch(InterruptedException ex) { 
-          --waitingForTake_; 
-          putLock_.notify();
-          throw ex; 
-        }
-      }
-    }
-  }
-
-  public Object peek() {
-    synchronized(head_) {
-      LinkedNode first = head_.next;
-      if (first != null) 
-        return first.value;
-      else 
-        return null;
-    }
-  }    
-
-
-  public boolean isEmpty() {
-    synchronized(head_) {
-      return head_.next == null;
-    }
-  }    
-
-  public Object poll(long msecs) throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    Object x = extract();
-    if (x != null) 
-      return x;
-    else {
-      synchronized(putLock_) {
-        try {
-          long waitTime = msecs;
-          long start = (msecs <= 0)? 0 : System.currentTimeMillis();
-          ++waitingForTake_;
-          for (;;) {
-            x = extract();
-            if (x != null || waitTime <= 0) {
-              --waitingForTake_;
-              return x;
-            }
-            else {
-              putLock_.wait(waitTime); 
-              waitTime = msecs - (System.currentTimeMillis() - start);
-            }
-          }
-        }
-        catch(InterruptedException ex) { 
-          --waitingForTake_; 
-          putLock_.notify();
-          throw ex; 
-        }
-      }
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
deleted file mode 100644
index fd54006..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: LockedExecutor.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  21Jun1998  dl               Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * An implementation of Executor that 
- * invokes the run method of the supplied command within
- * a synchronization lock and then returns.
- * 
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-public class LockedExecutor implements Executor {
-  
-  /** The mutex **/
-  protected final Sync mutex_;
-
-  /** 
-   * Create a new LockedExecutor that relies on the given mutual
-   * exclusion lock. 
-   * @param mutex Any mutual exclusion lock.
-   * Standard usage is to supply an instance of <code>Mutex</code>,
-   * but, for example, a Semaphore initialized to 1 also works.
-   * On the other hand, many other Sync implementations would not
-   * work here, so some care is required to supply a sensible 
-   * synchronization object.
-   **/
- 
-  public LockedExecutor(Sync mutex) {
-    mutex_ = mutex;
-  }
-
-  /** 
-   * Execute the given command directly in the current thread,
-   * within the supplied lock.
-   **/
-  public void execute(Runnable command) throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition for safety
-    mutex_.acquire();
-    try {
-      command.run();
-    }
-    finally {
-      mutex_.release();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
deleted file mode 100644
index 7ff7208..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-/*
-  File: Mutex.java
-
-  Originally written by Doug Lea and released into the public domain.
-  This may be used for any purposes whatsoever without acknowledgment.
-  Thanks for the assistance and support of Sun Microsystems Labs,
-  and everyone contributing, testing, and using this code.
-
-  History:
-  Date       Who                What
-  11Jun1998  dl               Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A simple non-reentrant mutual exclusion lock.
- * The lock is free upon construction. Each acquire gets the
- * lock, and each release frees it. Releasing a lock that
- * is already free has no effect. 
- * <p>
- * This implementation makes no attempt to provide any fairness
- * or ordering guarantees. If you need them, consider using one of
- * the Semaphore implementations as a locking mechanism.
- * <p>
- * <b>Sample usage</b><br>
- * <p>
- * Mutex can be useful in constructions that cannot be
- * expressed using java synchronized blocks because the
- * acquire/release pairs do not occur in the same method or
- * code block. For example, you can use them for hand-over-hand
- * locking across the nodes of a linked list. This allows
- * extremely fine-grained locking,  and so increases 
- * potential concurrency, at the cost of additional complexity and
- * overhead that would normally make this worthwhile only in cases of
- * extreme contention.
- * <pre>
- * class Node { 
- *   Object item; 
- *   Node next; 
- *   Mutex lock = new Mutex(); // each node keeps its own lock
- *
- *   Node(Object x, Node n) { item = x; next = n; }
- * }
- *
- * class List {
- *    protected Node head; // pointer to first node of list
- *
- *    // Use plain java synchronization to protect head field.
- *    //  (We could instead use a Mutex here too but there is no
- *    //  reason to do so.)
- *    protected synchronized Node getHead() { return head; }
- *
- *    boolean search(Object x) throws InterruptedException {
- *      Node p = getHead();
- *      if (p == null) return false;
- *
- *      //  (This could be made more compact, but for clarity of illustration,
- *      //  all of the cases that can arise are handled separately.)
- *
- *      p.lock.acquire();              // Prime loop by acquiring first lock.
- *                                     //    (If the acquire fails due to
- *                                     //    interrupt, the method will throw
- *                                     //    InterruptedException now,
- *                                     //    so there is no need for any
- *                                     //    further cleanup.)
- *      for (;;) {
- *        if (x.equals(p.item)) {
- *          p.lock.release();          // release current before return
- *          return true;
- *        }
- *        else {
- *          Node nextp = p.next;
- *          if (nextp == null) {
- *            p.lock.release();       // release final lock that was held
- *            return false;
- *          }
- *          else {
- *            try {
- *              nextp.lock.acquire(); // get next lock before releasing current
- *            }
- *            catch (InterruptedException ex) {
- *              p.lock.release();    // also release current if acquire fails
- *              throw ex;
- *            }
- *            p.lock.release();      // release old lock now that new one held
- *            p = nextp;
- *          }
- *        }
- *      }
- *    }
- *
- *    synchronized void add(Object x) { // simple prepend
- *      // The use of `synchronized'  here protects only head field.
- *      // The method does not need to wait out other traversers 
- *      // who have already made it past head.
- *
- *      head = new Node(x, head);
- *    }
- *
- *    // ...  other similar traversal and update methods ...
- * }
- * </pre>
- * <p>
- * @see Semaphore
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
-**/
-
-public class Mutex implements Sync  {
-
-  /** The lock status **/
-  protected boolean inuse_ = false;
-
-  public void acquire() throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    synchronized(this) {
-      try {
-        while (inuse_) wait();
-        inuse_ = true;
-      }
-      catch (InterruptedException ex) {
-        notify();
-        throw ex;
-      }
-    }
-  }
-
-  public synchronized void release()  {
-    inuse_ = false;
-    notify(); 
-  }
-
-
-  public boolean attempt(long msecs) throws InterruptedException {
-    if (Thread.interrupted()) throw new InterruptedException();
-    synchronized(this) {
-      if (!inuse_) {
-        inuse_ = true;
-        return true;
-      }
-      else if (msecs <= 0)
-        return false;
-      else {
-        long waitTime = msecs;
-        long start = System.currentTimeMillis();
-        try {
-          for (;;) {
-            wait(waitTime);
-            if (!inuse_) {
-              inuse_ = true;
-              return true;
-            }
-            else {
-              waitTime = msecs - (System.currentTimeMillis() - start);
-              if (waitTime <= 0) 
-                return false;
-            }
-          }
-        }
-        catch (InterruptedException ex) {
-          notify();
-          throw ex;
-        }
-      }
-    }  
-  }
-
-}
-



Mime
View raw message