river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1557283 - in /river/jtsk/skunk/qa_refactor/trunk: src/com/sun/jini/config/ src/com/sun/jini/thread/ src/net/jini/config/ src/net/jini/discovery/ src/net/jini/lookup/ src/org/apache/river/api/util/ src/org/apache/river/impl/thread/ test/src...
Date Fri, 10 Jan 2014 23:33:02 GMT
Author: peter_firmstone
Date: Fri Jan 10 23:33:01 2014
New Revision: 1557283

URL: http://svn.apache.org/r1557283
Log:
Remove TaskManager from ServiceDiscoveryManager

Fix some minor issues found with FindBugs

Added generics to Config

Added FutureObserver to support dependencies with RetryTask.

Removed SynchronousExecutors and test case.

Added:
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java
Removed:
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
    river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java
Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/config/Config.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/config/Configuration.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/config/Config.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/config/Config.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/config/Config.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/config/Config.java Fri Jan 10 23:33:01 2014
@@ -186,7 +186,7 @@ public class Config {
 	    throw new NullPointerException("defaultValue cannot be null");
 
 	final T result = (T) config.getEntry(component, name, type,
-					      defaultValue, data);
+					      (T) defaultValue, data);
 
 	if (result == null) {
 	    if (logger.isLoggable(Level.FINE)) {

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java Fri Jan 10 23:33:01 2014
@@ -62,28 +62,34 @@ import com.sun.jini.constants.TimeConsta
  * @see WakeupManager
  */
 import com.sun.jini.thread.WakeupManager.Ticket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.ObservableFuture;
 
 /**
  *
  * @param <V>
  */
-public abstract class RetryTask<V> implements RunnableFuture<V>, TimeConstants {
+public abstract class RetryTask<V> implements RunnableFuture<V>, ObservableFuture<V>, TimeConstants {
     private final TaskManager	  manager;	// the TaskManager for this task
     private final ExecutorService executor;
-    private volatile RetryTime	  retry;	// the retry object for this task
+    private RetryTime	  retry;	// the retry object for this task
     private volatile boolean	  cancelled;	// have we been cancelled?
-    private volatile boolean	  complete;	// have we completed successfully?
+    private boolean	  complete;	// have we completed successfully?
     private volatile Ticket	  ticket;	// the WakeupManager ticket
-    private volatile long	  startTime;	// the time when we were created or 
-                                        //   last reset
+    private final long	  startTime;	// the time when we were created 
     private final AtomicInteger attempt;	// the current attempt number
     private final WakeupManager wakeup;       // WakeupManager for retry scheduling
+    private final List<FutureObserver<V>> listeners;
 
     /**
      * Default delay backoff times.  These are converted from
@@ -115,7 +121,8 @@ public abstract class RetryTask<V> imple
         this.executor = null;
         this.wakeup = wakeupManager;
         attempt = new AtomicInteger();
-	reset();
+        listeners = new ArrayList<FutureObserver<V>>();
+        startTime = System.currentTimeMillis();
     }
     
     /**
@@ -134,7 +141,19 @@ public abstract class RetryTask<V> imple
         this.executor = executor;
         this.wakeup = wakeupManager;
         attempt = new AtomicInteger();
-        reset();
+        listeners = new ArrayList<FutureObserver<V>>();
+        startTime = System.currentTimeMillis();
+    }
+    
+    public boolean addObserver(FutureObserver<V> listener){
+        synchronized (this){
+            if (cancelled) return false;
+            if (complete) {
+                listener.futureCompleted(this);
+                return false;
+            }
+            return listeners.add(listener);
+        }
     }
 
     /**
@@ -156,8 +175,6 @@ public abstract class RetryTask<V> imple
      */
     public void run() {		// avoid retry if cancelled
         if (cancelled) return;			// do nothing
-	
-
 	boolean success = false;
         try {
             success = tryOnce();
@@ -175,20 +192,25 @@ public abstract class RetryTask<V> imple
                     new Object[]{this, 
                         Long.valueOf(at - System.currentTimeMillis())});
             }
-
-            if (retry == null)	// only create it if we need to
-                retry = new RetryTime();
-            ticket = wakeup.schedule(at, retry);
+            RetryTime time = null;
+            synchronized (this){
+                // only create it if we need to
+                if (retry == null) retry = new RetryTime();	            
+                time = retry;
+            }
+            ticket = wakeup.schedule(at, time);
         } else {
-            complete = true;
             // Notify was here, however I noticed that during some tests,
             // the wakeup manager task was scheduled after cancelling.
-             synchronized (this){
+            synchronized (this){
+                complete = true;
                 notifyAll();	
+                Iterator<FutureObserver<V>> it = listeners.iterator();
+                while (it.hasNext()){
+                it.next().futureCompleted(this);
+                }
             } // see waitFor()
         }
-       
-	
     }
 
     /**
@@ -236,11 +258,16 @@ public abstract class RetryTask<V> imple
      */
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
-	cancelled = true;
+        
         Ticket ticket = this.ticket;
-	if (ticket != null) wakeup.cancel(ticket);
+        if (ticket != null) wakeup.cancel(ticket);
         synchronized (this) {
+            cancelled = true;
             notifyAll();		// see waitFor()
+            Iterator<FutureObserver<V>> it = listeners.iterator();
+            while (it.hasNext()){
+                it.next().futureCompleted(this);
+            }
         }
         return true;
     }
@@ -259,17 +286,22 @@ public abstract class RetryTask<V> imple
      */
     @Override
     public boolean isDone() {
-	return complete;
+        synchronized (this){
+            return complete;
+        }
     }
 
     public boolean waitFor(long duration) throws InterruptedException {
-        
-            while (!cancelled && !complete)
-                synchronized (this){
-                    if (duration == 0 )wait();
-                    else wait(duration);
-                }
+        if (cancelled) throw new CancellationException("RetryTask was cancelled");
+        synchronized (this){
+            // Moved inside sync block to make check atomic
+            while (!cancelled && !complete) {
+                if (duration == 0 )wait();
+                else wait(duration);
+            }
             return complete;
+        }
+            
         
     }
     
@@ -288,16 +320,10 @@ public abstract class RetryTask<V> imple
     }
 
     /**
-     * Reset values for a new use of this task.
+     * Reset values for a new use of this task, smells of object pooling
+     * reset method removed.
      */
-    public final void reset() {
-	cancel(false);		// remove from the wakeup queue
-	startTime = System.currentTimeMillis();
-	cancelled = false;
-	complete = false;
-	ticket = null;
-	attempt.set(0);
-    }
+    
 
     /**
      * This is the runnable class for the <code>WakeupManager</code>,

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java Fri Jan 10 23:33:01 2014
@@ -190,14 +190,14 @@ final class ThreadPool implements Execut
                 if (t instanceof RuntimeException){
                     if (t instanceof SecurityException){
                         // ignore it will be logged.
-                    } else if (t instanceof InterruptedException) {
-                        // If we've caught an interrupt, we need to make sure it's
-                        // set so the while loop stops.
-                        Thread.currentThread().interrupt();
                     } else {
                         // Ignorance of RuntimeException is generally bad, bail out.
                         throw (RuntimeException) t;
                     }
+                } else if (t instanceof InterruptedException) {
+                    // If we've caught an interrupt, we need to make sure it's
+                    // set so the while loop stops.
+                    Thread.currentThread().interrupt();
                 }
             }
         }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/config/Configuration.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/config/Configuration.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/config/Configuration.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/config/Configuration.java Fri Jan 10 23:33:01 2014
@@ -51,6 +51,7 @@ public interface Configuration {
      * @see #getEntry(String, String, Class, Object, Object)
      */
     Object NO_DEFAULT = new Object() {
+        @Override
 	public String toString() { return "Configuration.NO_DEFAULT"; }
     };
 
@@ -61,6 +62,7 @@ public interface Configuration {
      * @see #getEntry(String, String, Class, Object, Object)
      */
     Object NO_DATA = new Object() {
+        @Override
 	public String toString() { return "Configuration.NO_DATA"; }
     };
 
@@ -85,6 +87,7 @@ public interface Configuration {
      * #getEntry(String, String, Class, Object, Object) getEntry}(component,
      * name, type, {@link #NO_DEFAULT}, {@link #NO_DATA})</code>.
      *
+     * @param <T> Object returned.
      * @param component the component being configured
      * @param name the name of the entry for the component
      * @param type the type of the object to be returned
@@ -106,7 +109,7 @@ public interface Configuration {
      * @throws NullPointerException if any argument is <code>null</code>
      * @see #getEntry(String, String, Class, Object)
      */
-    Object getEntry(String component, String name, Class type)
+    <T> T getEntry(String component, String name, Class<T> type)
 	throws ConfigurationException;
 
     /**
@@ -131,6 +134,7 @@ public interface Configuration {
      * #getEntry(String, String, Class, Object, Object) getEntry}(component,
      * name, type, defaultValue, {@link #NO_DATA})</code>.
      *
+     * @param <T>
      * @param component the component being configured
      * @param name the name of the entry for the component
      * @param type the type of the object to be returned
@@ -163,10 +167,10 @@ public interface Configuration {
      * <code>name</code>, or <code>type</code> is <code>null</code>
      * @see #getEntry(String, String, Class, Object, Object)
      */
-    Object getEntry(String component,
+    <T> T getEntry(String component,
 		    String name,
-		    Class type,
-		    Object defaultValue)
+		    Class<T> type,
+		    T defaultValue)
 	throws ConfigurationException;
 
     /**
@@ -188,6 +192,7 @@ public interface Configuration {
      * available for the given component. The value of <code>name</code> must
      * be an <i>Identifier</i>, as defined in the JLS.
      *
+     * @param <T>
      * @param component the component being configured
      * @param name the name of the entry for the component
      * @param type the type of the object to be returned
@@ -222,10 +227,10 @@ public interface Configuration {
      * @throws NullPointerException if <code>component</code>,
      * <code>name</code>, or <code>type</code> is <code>null</code>
      */
-    Object getEntry(String component,
+    <T> T getEntry(String component,
 		    String name,
-		    Class type,
-		    Object defaultValue,
+		    Class<T> type,
+		    T defaultValue,
 		    Object data)
 	throws ConfigurationException;
 }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java Fri Jan 10 23:33:01 2014
@@ -1372,19 +1372,19 @@ abstract class AbstractLookupDiscovery i
         DiscoveryConstraints multicastAnnouncementConstraints;
         InvocationConstraints rawUnicastDiscoveryConstraints;
         ExecutorService executor;
-        Integer multicastRequestMax;
-        Long multicastRequestInterval;
-        Long finalMulticastRequestInterval;
+        int multicastRequestMax;
+        long multicastRequestInterval;
+        long finalMulticastRequestInterval;
         String multicastRequestHost;
         NetworkInterface [] nics;
         int nicsToUse;
-        Integer nicRetryInterval;
-        Long multicastAnnouncementInterval;
-        Long unicastDelayRange;
+        int nicRetryInterval;
+        long multicastAnnouncementInterval;
+        long unicastDelayRange;
         List<Ticket> tickets;
         WakeupManager discoveryWakeupMgr;
         boolean isDefaultWakeupMgr;
-        Long initialMulticastRequestDelayRange;
+        long initialMulticastRequestDelayRange;
         
         private Initializer(Configuration config) throws ConfigurationException,
                 UnsupportedConstraintException, UnknownHostException, SocketException
@@ -1434,19 +1434,19 @@ abstract class AbstractLookupDiscovery i
 
             /* Multicast request-related configuration items */
             multicastRequestMax
-             = ( (Integer)config.getEntry
+             = ( config.getEntry
                                  (COMPONENT_NAME,
                                   "multicastRequestMax",
                                   int.class,
                                   Integer.valueOf(7) ) ).intValue();
             multicastRequestInterval
-             = ( (Long)config.getEntry
+             = ( config.getEntry
                                 (COMPONENT_NAME,
                                 "multicastRequestInterval",
                                 long.class,
                                 Long.valueOf(5000L) ) ).longValue();
             finalMulticastRequestInterval
-             = ( (Long)config.getEntry
+             = ( config.getEntry
                           (COMPONENT_NAME,
                            "finalMulticastRequestInterval",
                            long.class,
@@ -1454,7 +1454,7 @@ abstract class AbstractLookupDiscovery i
             String multicastRequestHost;
             try {
                 multicastRequestHost
-                 = (String) Config.getNonNullEntry(config,
+                 = Config.getNonNullEntry(config,
                                                    COMPONENT_NAME,
                                                    "multicastRequestHost",
                                                    String.class);
@@ -1503,14 +1503,14 @@ abstract class AbstractLookupDiscovery i
             this.nicsToUse = nicsToUse;
 
             nicRetryInterval
-             = ( (Integer)config.getEntry
+             = ( config.getEntry
                                     (COMPONENT_NAME,
                                      "multicastInterfaceRetryInterval",
                                      int.class,
                                      Integer.valueOf(5*60*1000) ) ).intValue();
             /* Multicast announcement-related configuration items */
             multicastAnnouncementInterval
-             = ( (Long)config.getEntry
+             = ( config.getEntry
                           (COMPONENT_NAME,
                            "multicastAnnouncementInterval",
                            long.class,

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Fri Jan 10 23:33:01 2014
@@ -54,6 +54,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -2529,7 +2530,7 @@ public class JoinManager {
         /* Task manager */
         TaskManager taskMgr;
         try {
-            taskMgr = (TaskManager)config.getEntry(COMPONENT_NAME,
+            taskMgr = config.getEntry(COMPONENT_NAME,
                                                    "taskManager",
                                                    TaskManager.class);
         } catch(NoSuchEntryException e) { /* use default */
@@ -2538,7 +2539,7 @@ public class JoinManager {
         /* Wakeup manager */
         WakeupManager wakeupMgr;
         try {
-            wakeupMgr = (WakeupManager)config.getEntry(COMPONENT_NAME,
+            wakeupMgr = config.getEntry(COMPONENT_NAME,
                                                        "wakeupManager",
                                                        WakeupManager.class);
         } catch(NoSuchEntryException e) { /* use default */
@@ -2546,7 +2547,7 @@ public class JoinManager {
                                     (new WakeupManager.ThreadDesc(null,true));
         }
         /* Max number of times to re-schedule tasks in thru wakeup manager */
-        Integer maxNRetries = ((Integer)config.getEntry
+        Integer maxNRetries = (config.getEntry
                                         (COMPONENT_NAME,
                                          "wakeupRetries",
                                          int.class,
@@ -2562,7 +2563,7 @@ public class JoinManager {
                 leaseMgr = new LeaseRenewalManager(config);
             }
         }//endif
-        Long renewalDuration = ((Long)config.getEntry
+        Long renewalDuration = (config.getEntry
                                       (COMPONENT_NAME,
                                        "maxLeaseDuration",
                                        long.class,
@@ -2578,7 +2579,7 @@ public class JoinManager {
 	if(discoveryMgr == null) {
 	    bCreateDiscMgr = true;
             try {
-                discoveryMgr = (DiscoveryManagement)config.getEntry
+                discoveryMgr = config.getEntry
                                                  (COMPONENT_NAME,
                                                   "discoveryManager",
                                                   DiscoveryManagement.class);

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1557283&r1=1557282&r2=1557283&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Fri Jan 10 23:33:01 2014
@@ -17,16 +17,18 @@
  */
 package net.jini.lookup;
 
+import au.net.zeus.collection.RC;
+import au.net.zeus.collection.Ref;
+import au.net.zeus.collection.Referrer;
 import com.sun.jini.logging.Levels;
 import com.sun.jini.lookup.entry.LookupAttributes;
 import com.sun.jini.proxy.BasicProxyTrustVerifier;
-import com.sun.jini.thread.TaskManager;
-import com.sun.jini.thread.TaskManager.Task;
 import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.ExportException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -35,10 +37,17 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -73,6 +82,9 @@ import net.jini.security.BasicProxyPrepa
 import net.jini.security.ProxyPreparer;
 import net.jini.security.TrustVerifier;
 import net.jini.security.proxytrust.ServerProxyTrust;
+import org.apache.river.impl.thread.ExtensibleExecutorService;
+import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
+import org.apache.river.impl.thread.NamedThreadFactory;
 
 /**
  * The <code>ServiceDiscoveryManager</code> class is a helper utility class
@@ -604,7 +616,7 @@ import net.jini.security.proxytrust.Serv
  * @see net.jini.core.lookup.ServiceRegistrar
  */
 public class ServiceDiscoveryManager {
-
+    
     /** Class for implementing register/lookup/notify/dropProxy/discard tasks*/
     private static abstract class CacheTask implements Runnable {
         protected final ProxyReg reg;
@@ -628,7 +640,7 @@ public class ServiceDiscoveryManager {
         public long getSeqN() {
             return thisTaskSeqN;
         }//end getSeqN
-
+        
 	public abstract void run();
     }//end class ServiceDiscoveryManager.CacheTask
 
@@ -636,13 +648,13 @@ public class ServiceDiscoveryManager {
      *  corresponding to a particular serviceID associated with a particular
      *  lookup service.
      */
-    private static abstract class ServiceIdTask extends CacheTask implements Task {
+    private static abstract class ServiceIdTask extends CacheTask {
         protected final ServiceID thisTaskSid;
         ServiceIdTask(ServiceID srvcId, ProxyReg reg, long seqN) {
             super(reg, seqN);
             this.thisTaskSid = srvcId;
         }//end constructor
-
+        
         /** Returns true if the current instance of this task must be run
          *  after at least one task in task manager queue.
          *
@@ -657,20 +669,20 @@ public class ServiceDiscoveryManager {
          *  @param tasks the tasks to consider.
          *  @param size elements with index less than size are considered.
          */
-        public boolean runAfter(List tasks, int size) {
-            for(int i=0; i<size; i++) {
-                Runnable t = (Runnable) tasks.get(i);
-                //Compare only instances of this task class
-                if( !(t instanceof ServiceIdTask) )  continue;
-                ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
-                if( thisTaskSid.equals(otherTaskSid) ) {
-                    if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
-                        return true;//run this task after the other task
-                    }//endif
-                }//endif
-            }//end loop
-            return false;//run this task now
-        }//end runAfter
+//        public boolean runAfter(List tasks, int size) {
+//            for(int i=0; i<size; i++) {
+//                Runnable t = (Runnable) tasks.get(i);
+//                //Compare only instances of this task class
+//                if( !(t instanceof ServiceIdTask) )  continue;
+//                ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
+//                if( thisTaskSid.equals(otherTaskSid) ) {
+//                    if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
+//                        return true;//run this task after the other task
+//                    }//endif
+//                }//endif
+//            }//end loop
+//            return false;//run this task now
+//        }//end runAfter
 
         /** Returns the ServiceID associated with this task. */
         public ServiceID getServiceID() {
@@ -678,7 +690,7 @@ public class ServiceDiscoveryManager {
         }//end getServiceID
 
     }//end class ServiceIdTask
-
+       
     /** Class that defines the listener that will receive local events from
      *  the internal LookupCache used in the blocking versions of lookup().
      */
@@ -750,7 +762,7 @@ public class ServiceDiscoveryManager {
          */
 	public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) {
 	    this.proxy = proxy;
-	    addProxy(proxy, item);
+	    items.put(proxy, item);
 	    this.item = item;
 	}
 	/* Adds the given proxy to the 'proxy-to-item' map. This method is
@@ -881,13 +893,86 @@ public class ServiceDiscoveryManager {
         }
         
     }
+    
+    private static class DependencyLinker {
+
+        private final ExecutorService executor;
+
+        DependencyLinker(ExecutorService ex){
+            executor = new ExtensibleExecutorService
+            (
+                ex, 
+                new RunnableFutureFactory(){
+
+                    @Override
+                    public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
+                        return new FutureTaskSeqNo<T>(r, value);
+                    }
+
+                    @Override
+                    public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
+                        return new FutureTaskSeqNo<T>(c);
+                    }
+                }
+            );
+        }
+
+        void execute(CacheTask task)
+        {
+            executor.submit(task);
+        }
+    }
+    
+    /**
+     * 
+     * @param <T> 
+     */
+    public static final class FutureTaskSeqNo<T> extends FutureTask<T> 
+    {
+        private final long seqNo;
+
+        private FutureTaskSeqNo(Runnable r, T result) {
+            super(r, result);
+            if (r instanceof CacheTask) seqNo = ((CacheTask)r).getSeqN();
+            else seqNo = -1;
+        }
+
+        private FutureTaskSeqNo(Callable<T> c)
+        {
+            super(c);
+            if (c instanceof CacheTask) seqNo = ((CacheTask)c).getSeqN();
+            else seqNo = -1;
+        }
+
+        private long getSeqNo()
+        {
+            return seqNo;
+        }
+    }
 
+    /**
+     * Comparator for PriorityBlockingQueue used in cacheExecutorService
+     * provided by configuration.
+     */
+    public static final class FutureComparator implements Comparator<FutureTaskSeqNo> {
+
+        @Override
+        public int compare(FutureTaskSeqNo o1, FutureTaskSeqNo o2) {
+            long one = o1.getSeqNo();
+            long two = o2.getSeqNo();
+            if (one < two) return -1;
+            if (one > two) return 1;
+            return 0;
+        }
+
+    }
+    
     /** Internal implementation of the LookupCache interface. Instances of
      *  this class are used in the blocking versions of lookup() and are
      *  returned by createLookupCache.
      */
     private final class LookupCacheImpl implements LookupCache {
-
+        
 	/* RemoteEventListener class that is registered with the proxy to
          * receive notifications from lookup services when any ServiceItem
          * changes (NOMATCH_MATCH, MATCH_NOMATCH, MATCH_MATCH)
@@ -994,7 +1079,7 @@ public class ServiceDiscoveryManager {
 	}//end class LookupCacheImpl.RegisterListenerTask
 
 	/** This class requests a "snapshot" of the given registrar's state.*/
-        private final class LookupTask extends CacheTask implements Task {
+        private final class LookupTask extends CacheTask {
 	    private final EventReg eReg;
             public LookupTask(ProxyReg reg, long seqN, EventReg eReg) {
                 super(reg, seqN);
@@ -1038,7 +1123,7 @@ public class ServiceDiscoveryManager {
                                                           itemReg,
                                                           srvcID,
                                                           taskSeqN.getAndIncrement());
-                    cacheTaskMgr.add(t);
+                    cacheDependencyLinker.execute(t);
                 }//end loop
                 /* 2. Handle "new" and "old" items from the given lookup */
                 for(int i=0; i<(matches.items).length; i++) {
@@ -1049,7 +1134,7 @@ public class ServiceDiscoveryManager {
                                                           matches.items[i],
                                                           false,
                                                           taskSeqN.getAndIncrement());
-                    cacheTaskMgr.add(t);
+                    cacheDependencyLinker.execute(t);
                 }//end loop
                 /* 3. Handle events that came in prior to lookup */
                 synchronized (eReg){
@@ -1058,7 +1143,7 @@ public class ServiceDiscoveryManager {
                     while (it.hasNext()) {
                         NotifyEventTask t = (NotifyEventTask) it.next();
                         t.thisTaskSeqN = taskSeqN.getAndIncrement(); // assign new seqN
-                        cacheTaskMgr.add(t);
+                        cacheDependencyLinker.execute(t);
                     }
                     eReg.pending.clear();
                 }//end sync(eReg)
@@ -1084,21 +1169,21 @@ public class ServiceDiscoveryManager {
              *  @param tasks the tasks to consider.
              *  @param size elements with index less than size are considered.
              */
-            public boolean runAfter(List tasks, int size) {
-                for(int i=0; i<size; i++) {
-                    CacheTask t = (CacheTask)tasks.get(i);
-                    if(   t instanceof RegisterListenerTask
-                       || t instanceof LookupTask
-                       || t instanceof NotifyEventTask )
-                    {
-                        ProxyReg otherReg = t.getProxyReg();
-                        if( reg.equals(otherReg) ) {
-                            if(thisTaskSeqN > t.getSeqN()) return true;
-                        }//endif
-                    }//endif
-                }//end loop
-                return false;
-            }//end runAfter
+//            public boolean runAfter(List tasks, int size) {
+//                for(int i=0; i<size; i++) {
+//                    CacheTask t = (CacheTask)tasks.get(i);
+//                    if(   t instanceof RegisterListenerTask
+//                       || t instanceof LookupTask
+//                       || t instanceof NotifyEventTask )
+//                    {
+//                        ProxyReg otherReg = t.getProxyReg();
+//                        if( reg.equals(otherReg) ) {
+//                            if(thisTaskSeqN > t.getSeqN()) return true;
+//                        }//endif
+//                    }//endif
+//                }//end loop
+//                return false;
+//            }//end runAfter
 
 	}//end class LookupCacheImpl.LookupTask
 
@@ -1130,7 +1215,7 @@ public class ServiceDiscoveryManager {
                                                           itemReg,
                                                           srvcID,
                                                           taskSeqN.getAndIncrement());
-                    cacheTaskMgr.add(t);
+                    cacheDependencyLinker.execute(t);
                 }//end loop
                 logger.finest("ServiceDiscoveryManager - ProxyRegDropTask "
                               +"completed");
@@ -1230,20 +1315,20 @@ public class ServiceDiscoveryManager {
              *  @param tasks the tasks to consider.
              *  @param size elements with index less than size are considered.
              */
-            public boolean runAfter(List tasks, int size) {
-                for(int i=0; i<size; i++) {
-                    Runnable t = (Runnable)tasks.get(i);
-                    if(   t instanceof RegisterListenerTask
-                       || t instanceof LookupTask )
-                    {
-                        ProxyReg otherReg = ((CacheTask)t).getProxyReg();
-                        if( reg.equals(otherReg) ) {
-                            if(thisTaskSeqN > ((CacheTask)t).getSeqN()) return true;
-                        }//endif
-                    }//endif
-                }//end loop
-                return super.runAfter(tasks, size);
-            }//end runAfter
+//            public boolean runAfter(List tasks, int size) {
+//                for(int i=0; i<size; i++) {
+//                    Runnable t = (Runnable)tasks.get(i);
+//                    if(   t instanceof RegisterListenerTask
+//                       || t instanceof LookupTask )
+//                    {
+//                        ProxyReg otherReg = ((CacheTask)t).getProxyReg();
+//                        if( reg.equals(otherReg) ) {
+//                            if(thisTaskSeqN > ((CacheTask)t).getSeqN()) return true;
+//                        }//endif
+//                    }//endif
+//                }//end loop
+//                return super.runAfter(tasks, size);
+//            }//end runAfter
 
 	}//end class LookupCacheImpl.NotifyEventTask
 
@@ -1421,9 +1506,8 @@ public class ServiceDiscoveryManager {
             public void run() {
                 logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
                               +"started");
-		boolean changed = false;
-                ServiceItemReg itemReg;
-                itemReg = (ServiceItemReg)serviceIdMap.get(thisTaskSid);
+		boolean previouslyDiscovered = false;
+                ServiceItemReg itemReg = serviceIdMap.get(thisTaskSid);
                 if (itemReg == null) {
                     if( !eventRegMap.containsKey(reg) ) {
                         /* reg must have been discarded, simply return */
@@ -1431,17 +1515,18 @@ public class ServiceDiscoveryManager {
                                       +"NewOldServiceTask completed");
                         return;
                     }//endif
+                    // else
                     itemReg = new ServiceItemReg( reg.getProxy(), srvcItem );
                     ServiceItemReg existed = serviceIdMap.putIfAbsent( thisTaskSid, itemReg );
                     if (existed != null) {
                         itemReg = existed;
                         // Probably changed while we were stuffing around.
-                        changed = true;
+                        previouslyDiscovered = true;
                     }
                 } else {
-                    changed = true;
+                    previouslyDiscovered = true;
                 }
-                if(changed) {//a. old, previously discovered item
+                if(previouslyDiscovered) {//a. old, previously discovered item
                     itemMatchMatchChange(reg.getProxy(), srvcItem,
                                          itemReg, matchMatchEvent);
                 } else {//b. newly discovered item
@@ -1532,7 +1617,8 @@ public class ServiceDiscoveryManager {
 	/* Proxy to the listener that receives remote events from lookups */
 	private volatile RemoteEventListener lookupListenerProxy;
         /** Task manager for the various tasks executed by this LookupCache */
-        private volatile TaskManager cacheTaskMgr;
+        private volatile ExecutorService cacheTaskMgr;
+        private volatile DependencyLinker cacheDependencyLinker;
 	/* Flag that indicates if the LookupCache has been terminated. */
 	private volatile boolean bCacheTerminated = false;
 	/* Contains the ServiceDiscoveryListener's that receive local events */
@@ -1552,7 +1638,8 @@ public class ServiceDiscoveryManager {
 	 */
 	private final long startTime = System.currentTimeMillis();
         /** For tasks waiting on verification events after service discard */
-        private volatile TaskManager serviceDiscardTimerTaskMgr;
+        private volatile ExecutorService serviceDiscardTimerTaskMgr;
+        private final ConcurrentMap<ServiceID,Future> serviceDiscardFutures;
         /* Thread mutex used to interrupt all ServiceDiscardTimerTasks */
         private final Object serviceDiscardMutex = new Object();
         /** Whenever a ServiceIdTask is created in this cache, it is assigned
@@ -1569,6 +1656,7 @@ public class ServiceDiscoveryManager {
 			       ServiceDiscoveryListener sListener,
 			       long leaseDuration)     throws RemoteException
         {
+            this.serviceDiscardFutures = RC.concurrentMap(new ConcurrentHashMap<Referrer<ServiceID>,Referrer<Future>>(), Ref.WEAK_IDENTITY, Ref.STRONG, 60000, 60000);
 	    this.tmpl = copyServiceTemplate(tmpl);
 	    this.leaseDuration = leaseDuration;
 	    this.filter = filter;
@@ -1582,12 +1670,15 @@ public class ServiceDiscoveryManager {
                 if(bCacheTerminated) return;//allow for multiple terminations
                 bCacheTerminated = true;
             }//end sync
-            caches.remove(this);
+            synchronized (caches){
+                caches.remove(this);
+            }
             /* Terminate all tasks: first, terminate this cache's TaskManager*/
-            terminateTaskMgr(cacheTaskMgr);
+            cacheTaskMgr.shutdownNow();
+            cacheTaskMgr = null;
             /* Terminate ServiceDiscardTimerTasks running for this cache */
             synchronized(serviceDiscardMutex) {
-                terminateTaskMgr(serviceDiscardTimerTaskMgr);
+                serviceDiscardTimerTaskMgr.shutdownNow();
             }//end sync(serviceDiscardMutex)
             /* Cancel all event registration leases held by this cache. */
             Set set = eventRegMap.entrySet();
@@ -1665,9 +1756,10 @@ public class ServiceDiscoveryManager {
 		}//end sync(itemReg)
 		if(discardIt) {
 		    ServiceID sid = (ServiceID)e.getKey();
-		    serviceDiscardTimerTaskMgr.add
+		    Future f = serviceDiscardTimerTaskMgr.submit
                                      ( new ServiceDiscardTimerTask(sid) );
-		    cacheTaskMgr.add(new DiscardServiceTask(filteredItem));
+                    serviceDiscardFutures.put(sid, f);
+                    cacheDependencyLinker.execute(new DiscardServiceTask(filteredItem));
 		    return;
 		}//endif
 	    }//end loop
@@ -1771,7 +1863,7 @@ public class ServiceDiscoveryManager {
 	public void addProxyReg(ProxyReg reg) {
             RegisterListenerTask treg = 
                     new RegisterListenerTask(reg, taskSeqN.getAndIncrement());
-            cacheTaskMgr.add(treg);
+            cacheDependencyLinker.execute(treg);
 	}//end LookupCacheImpl.addProxyReg
 
 	/** Remove a ProxyReg from the lookupCache. Called by DiscMgrListener's
@@ -1792,8 +1884,7 @@ public class ServiceDiscoveryManager {
                 }
             }//endif
             t = new ProxyRegDropTask(reg, taskSeqN.getAndIncrement());
-	    removeUselessTask(reg);
-            cacheTaskMgr.add(t);
+            cacheDependencyLinker.execute(t);
 	}//end LookupCacheImpl.removeProxyReg
 
 	/* Throws IllegalStateException if this lookup cache has been
@@ -1908,38 +1999,10 @@ public class ServiceDiscoveryManager {
                         logger.log(Levels.HANDLED, msg, params);
                     }//endif
                 }//endif
-                cacheTaskMgr.add(t);
+                cacheDependencyLinker.execute(t);
             } //end sync(eReg)
 	}//end LookupCacheImpl.notifyServiceMap
 
-	/** Removes from the cache's task manager, all pending tasks
-         *  associated with the given ProxyReg. This method is called
-         *  when the given ProxyReg has been discarded.
-	 */
-	private void removeUselessTask(ProxyReg reg) {
-            List pendingTasks = cacheTaskMgr.getPending();
-            for(int i=0;i<pendingTasks.size();i++) {
-                CacheTask t = (CacheTask)pendingTasks.get(i);
-                if(t.isFromProxy(reg)) cacheTaskMgr.remove(t);
-            }//end loop
-	}//end LookupCacheImpl.removeUselessTask
-
-        /** For the given TaskManager, this method removes all pending and
-         *  active tasks.
-         */
-        private void terminateTaskMgr(TaskManager taskMgr) {
-            synchronized(taskMgr) {
-                /* Remove all pending tasks */
-                List pendingTasks = taskMgr.getPending();
-                for(int i=0;i<pendingTasks.size();i++) {
-                    taskMgr.remove((TaskManager.Task)pendingTasks.get(i));
-                }//end loop
-                /* Interrupt all active tasks, prepare the taskMgr for GC. */
-                taskMgr.terminate();
-                taskMgr = null;
-            }//end sync(taskMgr)
-        }//end LookupCacheImpl.terminateTaskMgr
-
 	/** Removes an entry from the serviceIdMap, and sends a notification.*/
 	private void removeServiceIdMap(ServiceID sid, ServiceItem item) {
             removeServiceIdMapSendNoEvent(sid);
@@ -2081,10 +2144,14 @@ public class ServiceDiscoveryManager {
                  * service proxy so the client always uses the old proxy
                  * (at least, until the version is changed).
                  */
-                newItem.service = oldItem.service;
-                /* Now compare attributes */
-                attrsChanged = !LookupAttributes.equal(newItem.attributeSets,
-                                                       oldItem.attributeSets);
+                synchronized(itemReg){
+                    newItem.service = oldItem.service;
+                    /* Now compare attributes */
+                    attrsChanged = 
+                            !LookupAttributes.equal(newItem.attributeSets,
+                                                    oldItem.attributeSets);
+                }
+                
                 if(!attrsChanged) return;//no change, no need to filter
             } else {//(!matchMatchEvent && !same version) ==> re-registration
                 versionChanged = true;
@@ -2220,13 +2287,25 @@ public class ServiceDiscoveryManager {
              * various tasks executed by this instance of the lookup cache.
              */
             try {
-                cacheTaskMgr = (TaskManager)thisConfig.getEntry
-                                                           (COMPONENT_NAME,
-                                                            "cacheTaskManager",
-                                                            TaskManager.class);
+                cacheTaskMgr = thisConfig.getEntry(COMPONENT_NAME,
+                                                        "cacheExecutorService",
+                                                        ExecutorService.class);
             } catch(ConfigurationException e) { /* use default */
-                cacheTaskMgr = new TaskManager(10,(15*1000),1.0f);
+                cacheTaskMgr =
+                        new ThreadPoolExecutor(
+                                1, 
+                                10, 
+                                15, 
+                                TimeUnit.SECONDS,
+                                new PriorityBlockingQueue(100, new FutureComparator()),
+                                new NamedThreadFactory(
+                                        "SDM lookup cache",
+                                        false
+                                )
+                        );
+                
             }
+            cacheDependencyLinker = new DependencyLinker(cacheTaskMgr);
             /* Get a special-purpose task manager for this cache from the
              * configuration. That task manager will be used to manage the
              * various instances of the special-purpose task, executed by
@@ -2235,13 +2314,23 @@ public class ServiceDiscoveryManager {
              */
             try {
                 serviceDiscardTimerTaskMgr
-                    = (TaskManager)thisConfig.getEntry
-                                                  (COMPONENT_NAME,
-                                                   "discardTaskManager",
-                                                   TaskManager.class);
+                    = thisConfig.getEntry(COMPONENT_NAME,
+                                           "discardExecutorService",
+                                           ExecutorService.class);
             } catch(ConfigurationException e) { /* use default */
-                serviceDiscardTimerTaskMgr = new TaskManager
-                                                         (10,(15*1000),1.0f);
+                serviceDiscardTimerTaskMgr = 
+//                        new TaskManager(10,(15*1000),1.0f);
+                        new ThreadPoolExecutor(
+                                1, 
+                                10,
+                                15,
+                                TimeUnit.SECONDS,
+                                new LinkedBlockingQueue<Runnable>(),
+                                new NamedThreadFactory(
+                                        "SDM discard timer",
+                                        false
+                                )
+                        );
             }
             // Moved here from constructor to avoid publishing this reference
             lookupListenerProxy = lookupListener.export();
@@ -2378,8 +2467,9 @@ public class ServiceDiscoveryManager {
                 }//endif
                 itemReg.setDiscarded(true);
             }//end sync(itemReg)
-            serviceDiscardTimerTaskMgr.add
+            Future f = serviceDiscardTimerTaskMgr.submit
                               ( new ServiceDiscardTimerTask(item.serviceID) );
+            serviceDiscardFutures.put(item.serviceID, f);
             if(sendEvent)  removeServiceNotify(oldFilteredItem);
         }//end LookupCacheImpl.discardRetryLater
 
@@ -2422,15 +2512,9 @@ public class ServiceDiscoveryManager {
 
 	/** Wake up service discard task if running, else remove from mgr. */
 	private void cancelDiscardTask(ServiceID sid) {
-	    Iterator iter = serviceDiscardTimerTaskMgr.getPending().iterator();
-	    while (iter.hasNext()) {
-		ServiceDiscardTimerTask t =
-		    (ServiceDiscardTimerTask)iter.next();
-		if (sid.equals(t.serviceID)) {
-		    if(serviceDiscardTimerTaskMgr.removeIfPending(t)) return;
-		    break;
-		}//endif
-	    }//end loop
+            // Might need to record future's and cancel from there.
+            Future task = serviceDiscardFutures.get(sid);
+            if (task != null) task.cancel(false);
 	    synchronized(serviceDiscardMutex) {
 		serviceDiscardMutex.notifyAll();
 	    }//end sync
@@ -2452,13 +2536,13 @@ public class ServiceDiscoveryManager {
     /* The LeaseRenewalManager to use (passed in, or create one). */
     private final LeaseRenewalManager leaseRenewalMgr;
     /* Contains all of the discovered lookup services (ServiceRegistrar). */
-    private final Set<ProxyReg> proxyRegSet = Collections.newSetFromMap(new ConcurrentHashMap<ProxyReg,Boolean>());
+    private final Set<ProxyReg> proxyRegSet;
     /* Contains all of the DiscoveryListener's employed in lookup discovery. */
-    private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>();
+    private final List<DiscoveryListener> listeners;
     /* Random number generator for use in lookup. */
     private final Random random = new Random();
     /* Contains all of the instances of LookupCache that are requested. */
-    private final List<LookupCache> caches = new CopyOnWriteArrayList<LookupCache>();
+    private final List<LookupCache> caches;
 
     /* Flag to indicate if the ServiceDiscoveryManager has been terminated. */
     private volatile boolean bTerminated = false;
@@ -2486,7 +2570,6 @@ public class ServiceDiscoveryManager {
 	public void discovered(DiscoveryEvent e) {
 	    ServiceRegistrar[] proxys = e.getRegistrars();
 	    ArrayList<ProxyReg> newProxys = new ArrayList<ProxyReg>(1);
-	    ArrayList<DiscoveryListener> notifies  = null;
 	    for(int i=0; i<proxys.length; i++) {
                 /* Prepare each lookup service proxy before using it. */
                 try {
@@ -2505,14 +2588,16 @@ public class ServiceDiscoveryManager {
                     continue;
                 }
 		ProxyReg reg = new ProxyReg(proxys[i]);
-                proxyRegSet.add(reg);
-                newProxys.add(reg);
+                // Changed to only add to newProxys if actually new 7th Jan 2014
+                if (proxyRegSet.add(reg)) newProxys.add(reg);
 	    }//end loop
 	    Iterator<ProxyReg> iter = newProxys.iterator();
 	    while(iter.hasNext()) {
 		ProxyReg reg = iter.next();
 		cacheAddProxy(reg);
-		if(!listeners.isEmpty()) listenerDiscovered(reg.getProxy(), listeners);
+                synchronized (listeners){
+                    if(!listeners.isEmpty()) listenerDiscovered(reg.getProxy(), listeners);
+                }
 	    }//end loop
 	}//end DiscMgrListener.discovered
 
@@ -2536,7 +2621,9 @@ public class ServiceDiscoveryManager {
 		dropProxy(iter.next());
             }//end loop
             if (!drops.isEmpty()){
-                listenerDropped(drops, listeners);
+                synchronized (listeners){
+                    listenerDropped(drops, listeners);
+                }
             }
 	}//end DiscMgrListener.discarded
         
@@ -2549,20 +2636,24 @@ public class ServiceDiscoveryManager {
 
     /** Adds the given proxy to all the caches maintained by the SDM. */
     private void cacheAddProxy(ProxyReg reg) {
-        Iterator iter = caches.iterator();
-        while (iter.hasNext()) {
-            LookupCacheImpl cache = (LookupCacheImpl)iter.next();
-            cache.addProxyReg(reg);
-        }//end loop
+        synchronized (caches){
+            Iterator iter = caches.iterator();
+            while (iter.hasNext()) {
+                LookupCacheImpl cache = (LookupCacheImpl)iter.next();
+                cache.addProxyReg(reg);
+            }//end loop
+        }
     }//end cacheAddProxy
 
     /** Removes the given proxy from all the caches maintained by the SDM. */
     private void dropProxy(ProxyReg reg ) {
-        Iterator iter = caches.iterator();
-        while (iter.hasNext()) {
-            LookupCacheImpl cache= (LookupCacheImpl)iter.next();
-            cache.removeProxyReg(reg);
-        }//end loop
+        synchronized (caches){
+            Iterator iter = caches.iterator();
+            while (iter.hasNext()) {
+                LookupCacheImpl cache= (LookupCacheImpl)iter.next();
+                cache.removeProxyReg(reg);
+            }//end loop
+        }
     }//end dropProxy
 
     /**
@@ -2810,6 +2901,9 @@ public class ServiceDiscoveryManager {
     }//end constructor
     
     private ServiceDiscoveryManager(Initializer init){
+        this.proxyRegSet = Collections.newSetFromMap(new ConcurrentHashMap<ProxyReg,Boolean>());
+        this.caches = new ArrayList<LookupCache>(32);
+        this.listeners = new ArrayList<DiscoveryListener>(32);
         thisConfig = init.thisConfig;
         registrarPreparer = init.registrarPreparer;
         eventLeasePreparer = init.eventLeasePreparer;
@@ -3230,7 +3324,11 @@ public class ServiceDiscoveryManager {
 	}//end sync
         terminator.interrupt();
         /* Terminate all caches: cancel event leases, un-export listeners */
-        Iterator iter = caches.iterator();
+        List<LookupCache> terminate;
+        synchronized (caches){
+            terminate = new ArrayList(caches);
+        }
+        Iterator iter = terminate.iterator();
         while (iter.hasNext()) {
             LookupCacheImpl cache = (LookupCacheImpl)iter.next();
             cache.terminate();
@@ -3616,7 +3714,9 @@ public class ServiceDiscoveryManager {
 	if(tmpl == null) tmpl = new ServiceTemplate(null, null, null);
 	LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration);
         cache.initCache();
-        caches.add(cache);
+        synchronized (caches){
+            caches.add(cache);
+        }
         logger.finest("ServiceDiscoveryManager - LookupCache created");
 	return cache;
     }//end createLookupCache
@@ -3810,12 +3910,16 @@ public class ServiceDiscoveryManager {
         Long discardWait = Long.valueOf( 2*(5*60*1000));
         DiscoveryManagement discMgr;
         boolean discMgrInternal;
-        DiscMgrListener discMgrListener;
     }
     
-    private static Initializer initial(DiscoveryManagement discoveryMgr, LeaseRenewalManager leaseMgr, Configuration config) throws IOException {
+    private static Initializer initial(
+            DiscoveryManagement discoveryMgr,
+            LeaseRenewalManager leaseMgr,
+            Configuration config)
+            throws IOException 
+    {
         try {
-            return init(discoveryMgr, leaseMgr, EmptyConfiguration.INSTANCE);
+            return init(discoveryMgr, leaseMgr, config);
         } catch(ConfigurationException e) { 
             /* This should never happen */
             throw new IOException(e);

Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java?rev=1557283&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java Fri Jan 10 23:33:01 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.api.util;
+
+import java.util.EventListener;
+import java.util.concurrent.Future;
+
+/**
+ * Listener that is told when a {@link Future} it observes has completed.
+ *
+ * @param <T> the result type of the observed future.
+ * @author peter
+ */
+public interface FutureObserver<T> extends EventListener {
+
+    /**
+     * Notification that the given future has completed.
+     *
+     * @param e the completed future.
+     */
+    void futureCompleted(Future<T> e);
+
+    /**
+     * A {@link Future} that accepts {@link FutureObserver}s.
+     *
+     * @param <T> the result type of this future.
+     */
+    interface ObservableFuture<T> extends Future<T> {
+        /**
+         * Adds a FutureObserver to this ObservableFuture.
+         *
+         * @param observer to observe this.
+         * @return true if observer was added, false otherwise.
+         */
+        boolean addObserver(FutureObserver<T> observer);
+    }
+
+}

Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java?rev=1557283&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java Fri Jan 10 23:33:01 2014
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.ObservableFuture;
+
+/**
+ * A {@link FutureTask} that notifies registered {@link FutureObserver}s when
+ * it completes.
+ *
+ * The {@code done} flag is only written while holding the {@code listeners}
+ * lock, so an observer is either queued before completion (and notified by
+ * {@link #done()}) or notified directly by {@link #addObserver}; it can
+ * never be queued after the completion notification has already gone out.
+ *
+ * @param <T> the result type of this task.
+ * @author peter
+ */
+public class ObservableFutureTask<T> extends FutureTask<T> implements ObservableFuture<T>{
+    /** Observers waiting for completion; also the lock that guards writes to done. */
+    public final List<FutureObserver<T>> listeners;
+    /** Becomes true once {@link #done()} has been invoked. */
+    public volatile boolean done;
+
+    /**
+     * @param callable the computation to run.
+     */
+    public ObservableFutureTask(Callable<T> callable) {
+        super(callable);
+        listeners = new LinkedList<FutureObserver<T>>();
+    }
+
+    /**
+     * @param r the task to run.
+     * @param result the value returned by get() on successful completion.
+     */
+    public ObservableFutureTask(Runnable r, T result){
+        super(r,result);
+        listeners = new LinkedList<FutureObserver<T>>();
+    }
+
+    /**
+     * Marks this task as done and notifies every observer registered so far.
+     */
+    @Override
+    protected void done() {
+        List<FutureObserver<T>> snapshot;
+        synchronized (listeners){
+            // Set the flag under the lock: addObserver tests it under the
+            // same lock, so nothing can be queued after this copy is taken.
+            done = true;
+            snapshot = new LinkedList<FutureObserver<T>>(listeners);
+        }
+        // Call out to observers without holding the lock.
+        Iterator<FutureObserver<T>> it = snapshot.iterator();
+        while (it.hasNext()){
+            it.next().futureCompleted(this);
+        }
+    }
+
+    /**
+     * Registers an observer, or notifies it immediately if this task is
+     * already done.
+     *
+     * @param l the observer to add.
+     * @return true if the observer was queued; false if this task was already
+     * done, in which case {@code l} has been notified before this returns.
+     * @throws NullPointerException if {@code l} is null.
+     */
+    @Override
+    public boolean addObserver(FutureObserver<T> l) {
+        if (l == null) throw new NullPointerException("Null Listener");
+        synchronized (listeners){
+            if (!done) return listeners.add(l);
+        }
+        l.futureCompleted(this);
+        return false;
+    }
+}



Mime
View raw message