river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1554723 [4/5] - in /river/jtsk/skunk/qa_refactor/trunk: qa/src/com/sun/jini/qa/harness/ qa/src/com/sun/jini/test/impl/mahalo/ qa/src/com/sun/jini/test/impl/reggie/ qa/src/com/sun/jini/test/resources/ qa/src/com/sun/jini/test/share/ qa/src/...
Date Thu, 02 Jan 2014 02:45:09 GMT
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java?rev=1554723&r1=1554722&r2=1554723&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java Thu Jan  2 02:45:07 2014
@@ -26,19 +26,29 @@ import com.sun.jini.discovery.internal.M
 import com.sun.jini.logging.Levels;
 import com.sun.jini.logging.LogUtil;
 import com.sun.jini.thread.RetryTask;
-import com.sun.jini.thread.TaskManager;
 import com.sun.jini.thread.WakeupManager;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import net.jini.config.Configuration;
@@ -52,6 +62,7 @@ import net.jini.core.discovery.LookupLoc
 import net.jini.core.lookup.ServiceRegistrar;
 import net.jini.security.BasicProxyPreparer;
 import net.jini.security.ProxyPreparer;
+import org.apache.river.impl.thread.NamedThreadFactory;
 
 /**
  * This package private superclass of LookupLocatorDiscovery exists for 
@@ -88,16 +99,17 @@ abstract class AbstractLookupLocatorDisc
 	}
     }
 
-    /** Task manager for the discovery tasks. On the first attempt to
+    /** ExecutorService for the discovery tasks. On the first attempt to
      *  discover each locator, the tasks used to perform those discoveries
-     *  are managed by this <code>TaskManager</code> so that the number of
+     *  are managed by this <code>ExecutorService</code> so that the number of
      *  concurrent threads can be bounded. If one or more of those attempts
      *  fails, a <code>WakeupManager</code> is used (through the use of a
      *  <code>RetryTask</code>) to schedule - at a later time (employing a
      *  "backoff strategy") - the re-execution of each failed task in this
-     *  <code>TaskManager</code>.
+     *  <code>ExecutorService</code>.
      */
-    private final TaskManager discoveryTaskMgr;
+    private final ExecutorService discoveryExecutor;
+    private final Set<RetryTask> discoveryTasks;
     /** Wakeup manager for the discovery tasks. For any locator, after
      *  an initial failure to discover the locator, the task used to
      *  perform all future discovery attempts is managed by this
@@ -111,17 +123,17 @@ abstract class AbstractLookupLocatorDisc
      */
     private final WakeupManager discoveryWakeupMgr;
     /** Stores LookupLocators that have not been discovered yet. */ 
-    private final HashSet undiscoveredLocators = new HashSet(11); // sync(this)
+    private final Set<LocatorReg> undiscoveredLocators = Collections.newSetFromMap(new ConcurrentHashMap<LocatorReg,Boolean>());
     /** Stores LookupLocators that have been discovered */
-    private final ArrayList discoveredLocators = new ArrayList(11); // sync(this)
+    private final List<LocatorReg> discoveredLocators = new CopyOnWriteArrayList<LocatorReg>();
     /** Thread that handles pending notifications. */
-    private Notifier notifierThread; // sync(pendingNotifies)
+    private final Notifier notifierThread = new Notifier();
     /** Notifications to be sent to listeners. */
-    private final LinkedList pendingNotifies = new LinkedList(); // sync(pendingNotifies)
+    private final BlockingDeque<NotifyTask> pendingNotifies = new LinkedBlockingDeque<NotifyTask>();
     /** Stores DiscoveryListeners **/
-    private final ArrayList listeners = new ArrayList(1); // sync(this)
+    private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>(); // sync(this) only for add and remove to avoid duplicates.
     /** Flag indicating whether or not this class is still functional. */
-    private boolean terminated = false; // sync(this)
+    private volatile boolean terminated = false; // sync(this)
     /* Preparer for the proxies to the lookup services that are discovered
      * and used by this utility.
      */
@@ -130,7 +142,7 @@ abstract class AbstractLookupLocatorDisc
      * protocol.
      * TODO: investigate why this field isn't used.
      */
-    private Discovery protocol2 = Discovery.getProtocol2(null);
+    private final Discovery protocol2 = Discovery.getProtocol2(null);
     /* 
      * Controls how long to wait before attempting unicast discovery, on
      * startup.
@@ -140,7 +152,7 @@ abstract class AbstractLookupLocatorDisc
      * Flag which indicates if discoverLocators was called during
      * initialUnicastDelayRange delay.
      */
-    private boolean discoverLocatorsCalled = false; // sync(this)
+    private volatile boolean discoverLocatorsCalled = false;
     /** Wrapper class in which each instance corresponds to a lookup service
      *  to discover via unicast discovery.
      */
@@ -317,16 +329,22 @@ abstract class AbstractLookupLocatorDisc
 			System.currentTimeMillis() + MIN_RETRY,
 		    new Runnable() {
 			public void run() {
-			    discoveryTaskMgr.add
-			     (new DiscoveryTask(LocatorReg.this,
-				discoveryTaskMgr, discoveryWakeupMgr));
+                            DiscoveryTask task = new DiscoveryTask(LocatorReg.this,
+				discoveryExecutor, discoveryWakeupMgr);
+			    discoveryExecutor.submit(task);
+                            synchronized (discoveryTasks){
+                                discoveryTasks.add(task);
+                            }
 			}
 		    }
 		);
 	    } else {
-		discoveryTaskMgr.add
-                 (new DiscoveryTask(this,
-			discoveryTaskMgr, discoveryWakeupMgr));
+                DiscoveryTask task = new DiscoveryTask(this,
+			discoveryExecutor, discoveryWakeupMgr);
+		discoveryExecutor.submit(task);
+                synchronized (discoveryTasks){
+                    discoveryTasks.add(task);
+                }
 	    }
 	}
 	
@@ -359,12 +377,12 @@ abstract class AbstractLookupLocatorDisc
     /** Data structure containing task data processed by the Notifier Thread */
     private static class NotifyTask {
 	/** The listeners to notify */
-	public final ArrayList listeners;
+	public final List listeners;
 	/** Map of discovered registrars to groups in which each is a member */
 	public final Map groupsMap;
 	/** True if discarded, else discovered */
 	public final boolean discard;
-	public NotifyTask(ArrayList listeners,
+	public NotifyTask(List listeners,
                           Map groupsMap,
 			  boolean discard)
 	{
@@ -381,23 +399,28 @@ abstract class AbstractLookupLocatorDisc
      *  Only 1 instance of this thread is run.
      */
     private class Notifier extends Thread {
+        // In case client code catches and resets interrupt.
+        private volatile boolean interrupted = false;
 	/** Construct a daemon thread */
 	public Notifier() {
 	    super("event notifier");
 	    setDaemon(true);
 	}//end constructor
+        
+        public void interrupt(){
+            super.interrupt();
+            interrupted = true;
+        }
 
 	public void run() {
             logger.finest("LookupLocatorDiscovery - Notifier thread started");
-	    while (true) {
+	    while (!interrupted) {
 		NotifyTask task;
-		synchronized (pendingNotifies) {
-		    if (pendingNotifies.isEmpty()) {
-			notifierThread = null;
-			return;
-		    }//endif
-		    task = (NotifyTask)pendingNotifies.removeFirst();
-		}//end sync(pendingNotifies)
+		try {
+                    task = pendingNotifies.takeFirst(); // Blocks waiting for food.
+                } catch (InterruptedException ex) {
+                    return;
+                }
                 boolean firstListener = true;
 		for(Iterator iter = task.listeners.iterator();iter.hasNext();){
 		    DiscoveryListener l = (DiscoveryListener)iter.next();
@@ -409,23 +432,26 @@ abstract class AbstractLookupLocatorDisc
                         String eType = (task.discard ? 
                                                     "discarded":"discovered");
                         ServiceRegistrar[] regs = e.getRegistrars();
-                        logger.finest(eType+" event  -- "+regs.length
-                                                         +" lookup(s)");
+                        int len = regs.length;
+                        logger.log(Level.FINEST, "{0} event  -- {1} lookup(s)",
+                                new Object[]{eType, len});
                         Map groupsMap = e.getGroups();
-                        for(int i=0;i<regs.length;i++) {
+                        for(int i=0;i<len;i++) {
                             LookupLocator loc = null;
                             try {
                                 loc = regs[i].getLocator();
                             } catch (Throwable ex) { /* ignore */ }
                             String[] groups = (String[])groupsMap.get(regs[i]);
-                            logger.finest("    "+eType+" locator  = "+loc);
+                            logger.log(Level.FINEST, "    {0} locator  = {1}", 
+                                    new Object[]{eType, loc});
                             if(groups.length == 0) {
-                                logger.finest("    "+eType
-                                              +" group    = NO_GROUPS");
+                                logger.log(Level.FINEST,
+                                        "    {0} group    = NO_GROUPS", eType);
                             } else {
                                 for(int j=0;j<groups.length;j++) {
-                                    logger.finest("    "+eType+" group["+j+"] "
-                                                  +"= "+groups[j]);
+                                    logger.log(Level.FINEST,
+                                            "    {0} group[{1}] = {2}",
+                                            new Object[]{eType, j, groups[j]});
                                 }//end loop
                             }//endif(groups.length)
                         }//end loop
@@ -459,10 +485,10 @@ abstract class AbstractLookupLocatorDisc
     private class DiscoveryTask extends RetryTask {
         private final LocatorReg reg;
         public DiscoveryTask(LocatorReg reg,
-                             TaskManager taskMgr,
+                             ExecutorService executor,
                              WakeupManager wakeupMgr)
         {
-            super(taskMgr,wakeupMgr);
+            super(executor,wakeupMgr);
             this.reg = reg;
 	}//end constructor
 
@@ -479,45 +505,38 @@ abstract class AbstractLookupLocatorDisc
          */
         public boolean tryOnce() {
             logger.finest("LookupLocatorDiscovery - DiscoveryTask started");
-            synchronized(AbstractLookupLocatorDiscovery.this) {
-		if (terminated) {
-		    return true;
-		}
-                /* Locators may have been removed (ex. removeLocators or
-                 * setLocators) between the time they were added to the map,
-                 * and the time this task is finally executed. Determine if
-                 * this task should continue.
-                 */
-                if( undiscoveredLocators.isEmpty() ) {
-                    logger.finest("LookupLocatorDiscovery - DiscoveryTask "
-                                  +"completed");
-                    return true;//true ==> done. Don't queue retry.
-                }//endif
-                if(!undiscoveredLocators.contains(reg)) {
-                    logger.finest("LookupLocatorDiscovery - DiscoveryTask "
-                                  +"completed");
-                    return true;//already removed, true ==> don't queue retry
-                }
-            }//end sync(LookupLocatorDiscovery.this)
+            if (terminated) return true;
+            /* Locators may have been removed (ex. removeLocators or
+             * setLocators) between the time they were added to the map,
+             * and the time this task is finally executed. Determine if
+             * this task should continue.
+             */
+            if( undiscoveredLocators.isEmpty() ) {
+                logger.finest("LookupLocatorDiscovery - DiscoveryTask "
+                              +"completed");
+                return true;//true ==> done. Don't queue retry.
+            }//endif
+            if(!undiscoveredLocators.contains(reg)) {
+                logger.finest("LookupLocatorDiscovery - DiscoveryTask "
+                              +"completed");
+                return true;//already removed, true ==> don't queue retry
+            }
             /* Use the unicast discovery protocol to perform the actual
              * discovery. Note that since this process involves remote,
              * interprocess (socket) communication, it is important that
              * this processing be performed outside of the sync block.
              */
             boolean noRetry = regTryGetProxy(reg);//t -> done, f -> queue retry
-	    synchronized (AbstractLookupLocatorDiscovery.this) {
-		if (terminated) {
-		    return true;
-		}
-		if(noRetry) {
-		    logger.finest("LookupLocatorDiscovery - DiscoveryTask "
-				  +"completed");
-		} else {
-		    logger.finest("LookupLocatorDiscovery - DiscoveryTask "
-				  +"failed, will retry later");
-		}//endif
-		return noRetry;
-	    }
+            if (terminated) return true;
+            
+            if(noRetry) {
+                logger.finest("LookupLocatorDiscovery - DiscoveryTask "
+                              +"completed");
+            } else {
+                logger.finest("LookupLocatorDiscovery - DiscoveryTask "
+                              +"failed, will retry later");
+            }//endif
+            return noRetry;
 
 	}//end tryOnce
 
@@ -531,14 +550,6 @@ abstract class AbstractLookupLocatorDisc
             return reg.getNextTryTime();
         }//end retryTime
 
-        /** Returns true if current instance must be run after task(s) in
-         *  task manager queue.
-         *  @param tasks the tasks to consider.
-         *  @param size  elements with index less than size are considered.
-         */
-        public boolean runAfter(java.util.List tasks, int size) {
-            return false;
-        }//end runAfter
     }//end class DiscoveryTask
 
     /**
@@ -575,7 +586,8 @@ abstract class AbstractLookupLocatorDisc
     
     private AbstractLookupLocatorDiscovery(Initializer init){
         registrarPreparer = init.registrarPreparer;
-        discoveryTaskMgr = init.discoveryTaskMgr;
+        discoveryExecutor = init.discoveryTaskMgr;
+        discoveryTasks = Collections.newSetFromMap(new WeakHashMap<RetryTask,Boolean>());
         discoveryWakeupMgr = init.discoveryWakeupMgr;
         initialUnicastDelayRange = init.initialUnicastDelayRange.longValue();
     }
@@ -603,24 +615,21 @@ abstract class AbstractLookupLocatorDisc
         if(l == null) {
             throw new NullPointerException("can't add null listener");
         }
-	synchronized(this) {
-            if (terminated) {
-                throw new IllegalStateException("discovery terminated");
-            }
-	    if(listeners.contains(l)) return; //already have this listener
-	    listeners.add(l);
-	    if(!discoveredLocators.isEmpty()) {
-                HashMap groupsMap = new HashMap(discoveredLocators.size());
-                Iterator iter = discoveredLocators.iterator();
-                for (int i = 0; iter.hasNext(); i++) {
-                    LocatorReg reg = (LocatorReg)iter.next();
-                    groupsMap.put(reg.getProxy(), reg.getMemberGroups());
-                }//end loop
-		ArrayList list = new ArrayList(1);
-		list.add(l);
-                addNotify(list, groupsMap, false);
-	    }//endif
-	}//end sync
+	
+        if (terminated) throw new IllegalStateException("discovery terminated");
+        synchronized (this){ // sync to make atomic with remove
+            if(listeners.contains(l)) return; //already have this listener
+            listeners.add(l); // Small window where it's possible to add duplicates.
+        }
+        Map groupsMap = new HashMap(discoveredLocators.size());
+        Iterator<LocatorReg> iter = discoveredLocators.iterator();
+        while (iter.hasNext()) {
+            LocatorReg reg = iter.next();
+            groupsMap.put(reg.getProxy(), reg.getMemberGroups());
+        }//end loop
+        List list = new ArrayList(1);
+        list.add(l);
+        if (!groupsMap.isEmpty()) addNotify(list, groupsMap, false);
     }//end addDiscoveryListener
 
     /**
@@ -636,12 +645,11 @@ abstract class AbstractLookupLocatorDisc
      * 
      * @see #addDiscoveryListener
      */
-    public synchronized void removeDiscoveryListener(DiscoveryListener l) {
-        if (terminated) {
-            throw new IllegalStateException("discovery terminated");
+    public void removeDiscoveryListener(DiscoveryListener l) {
+        if (terminated) throw new IllegalStateException("discovery terminated");
+        synchronized (this){ // sync to avoid duplicates, makes add atomic
+            listeners.remove(l);
         }
-	int index = listeners.indexOf(l);
-	if(index != -1)  listeners.remove(index);
     }//end removeDiscoveryListener
 
     /**
@@ -661,15 +669,9 @@ abstract class AbstractLookupLocatorDisc
      * @see net.jini.discovery.DiscoveryManagement#removeDiscoveryListener
      */
     public ServiceRegistrar[] getRegistrars() {
-        synchronized(this) {
-            if (terminated) {
-                throw new IllegalStateException("discovery terminated");
-            }
-            if((discoveredLocators == null) || (discoveredLocators.isEmpty())){
-                return new ServiceRegistrar[0];
-            }
-            return buildServiceRegistrar();
-        }//end sync(this)
+        if (terminated) throw new IllegalStateException("discovery terminated");
+        if(discoveredLocators.isEmpty()) return new ServiceRegistrar[0];
+        return buildServiceRegistrar();
     }//end getRegistrars
 
     /**
@@ -691,17 +693,17 @@ abstract class AbstractLookupLocatorDisc
      * @see net.jini.discovery.DiscoveryManagement#discard
      */
     public void discard(ServiceRegistrar proxy) {
-	synchronized(this) {
-            if (terminated) {
-                throw new IllegalStateException("discovery terminated");
-            }
-            if(proxy == null) return;
-	    LookupLocator lct = findRegFromProxy(proxy);
-	    if(lct == null) return;
-            /* Remove locator from the set of already-discovered locators */
-	    LocatorReg reg = removeDiscoveredLocator(lct);
+        if (terminated) {
+            throw new IllegalStateException("discovery terminated");
+        }
+        if(proxy == null) return;
+        LookupLocator lct = findRegFromProxy(proxy);
+        if(lct == null) return;
+        /* Remove locator from the set of already-discovered locators */
+        LocatorReg reg = removeDiscoveredLocator(lct);
+        if (reg != null){
             /* Prepare the information for the discarded event */
-            HashMap groupsMap = new HashMap(1);
+            Map groupsMap = new HashMap(1);
             groupsMap.put(reg.getProxy(), reg.getMemberGroups());
             /* Prepare the discarded locatorReg for re-discovery */
             synchronized (reg){
@@ -709,12 +711,12 @@ abstract class AbstractLookupLocatorDisc
                 reg.memberGroups = null;
                 reg.delayNextTryTime();
             }
-	    addToMap(reg);//put discarded reg back in the not-discovered map
+            addAndQueueDiscoveryTaskIfAbsent(reg);//put discarded reg back in the not-discovered map
             /* Send a discarded event to all registered listeners */
-	    if(!listeners.isEmpty()) {
-                addNotify((ArrayList)listeners.clone(), groupsMap, true);
-	    }//endif
-	}//end sync(this)
+            if(!listeners.isEmpty()) {
+                addNotify(listeners, groupsMap, true);
+            }//endif
+        }
     }//end discard
 
     /**
@@ -727,13 +729,23 @@ abstract class AbstractLookupLocatorDisc
      *
      * @see net.jini.discovery.DiscoveryManagement#terminate
      */
-    public synchronized void terminate() {
-        if(terminated) return;
-        terminated = true;
+    public void terminate() {
+        synchronized (this){
+            if(terminated) return;
+            terminated = true;
+        }
         terminateTaskMgr();
-        synchronized(pendingNotifies) {
-            pendingNotifies.clear();
-        }//end sync
+        boolean interrupted = false;
+        // Don't leave DiscoveryListener's hanging.
+        while (!pendingNotifies.isEmpty()) {
+            try {
+                Thread.sleep(1000L);
+            } catch (InterruptedException ex) {
+                interrupted = true;
+            }
+        }
+        notifierThread.interrupt();
+        if (interrupted) Thread.currentThread().interrupt();
     }//end terminate
 
     /**
@@ -757,27 +769,23 @@ abstract class AbstractLookupLocatorDisc
      * @see net.jini.discovery.DiscoveryLocatorManagement#getLocators
      * @see #setLocators
      */
-    public synchronized LookupLocator[] getLocators() {
-        if (terminated) {
-            throw new IllegalStateException("discovery terminated");
-        }
+    public LookupLocator[] getLocators() {
+        if (terminated) throw new IllegalStateException("discovery terminated");
         /* Includes the set of already-discovered lookup services and
          * the set of not-yet-discovered lookup services.
          */
-        int size = discoveredLocators.size() + undiscoveredLocators.size();
-	LookupLocator[] ret = new LookupLocator[size];
+        List<LookupLocator> locators = new LinkedList<LookupLocator>();
         /* Retrieve the locators of the already-discovered lookup services */
-	int k = 0;
 	Iterator iter = discoveredLocators.iterator();
 	while(iter.hasNext()) {
-	    ret[k++] = ((LocatorReg)iter.next()).l;
+	    locators.add(((LocatorReg)iter.next()).l);
         }//end loop
         /* Append the locators of the not-yet-discovered lookup services */
         iter = undiscoveredLocators.iterator();
 	while(iter.hasNext()) {
-	    ret[k++] = ((LocatorReg)iter.next()).l;
+	    locators.add(((LocatorReg)iter.next()).l);
         }//end loop
-	return ret;
+	return locators.toArray(new LookupLocator[locators.size()]);
     }//end getLocators
 
     /**
@@ -801,7 +809,7 @@ abstract class AbstractLookupLocatorDisc
      * @see net.jini.discovery.DiscoveryLocatorManagement#addLocators
      * @see #removeLocators
      */
-    public synchronized void addLocators(LookupLocator[] locators) {
+    public void addLocators(LookupLocator[] locators) {
         testSetForNull(locators);
         if (terminated) {
             throw new IllegalStateException("discovery terminated");
@@ -841,61 +849,60 @@ abstract class AbstractLookupLocatorDisc
      */
     public void setLocators(LookupLocator[] locators) {
         testSetForNull(locators);
-	synchronized(this) {
-            if (terminated) {
-                throw new IllegalStateException("discovery terminated");
-            }
-            HashMap groupsMap = new HashMap(1);
-            /* From the set of already-discovered locators, remove each 
-             * element that is NOT in the input set of locators.
-             */
-	    Iterator iter = discoveredLocators.iterator();
-	    while(iter.hasNext()) {
-		LocatorReg reg = (LocatorReg)iter.next();
-		if(!isArrayContains(locators, reg.l)) {
-		    iter.remove();
-                    groupsMap.put(reg.getProxy(), reg.getMemberGroups());
-		}//endif
-	    }//end loop
-            /* From the set of yet-to-be-discovered locators, remove each 
-             * element that is NOT in the input set of replacement locators.
-             * 
-             * Note that if the discovery task is currently attempting to
-             * discover a locator from this set, and if that locator is not
-             * contained in the given input set of replacement locators (that
-             * is, it is no longer desired that that locator be discovered),
-             * then the discovery task, when it completes (either successfully
-             * or un-successfully) the attempt to discover that locator, will
-             * end all discovery processing with respect to the affected
-             * locator.
-             *
-             * To inform the discovery task -- upon its return from the
-             * unicast discovery process -- of the desire to terminate all
-             * discovery processing for that particular locator, the element
-             * in the set of undiscoveredLocators that corresponds to that
-             * locator is removed. This means that if the discovery attempt
-             * failed, the locator will no longer be considered one of the
-             * yet-to-be-discovered locators; and if the attempt succeeded,
-             * prevents the locator from being placed in the set of 
-             * already-discovered locators. It also prevents any discarded
-             * or discovered events from being sent.
-             */
-            iter = undiscoveredLocators.iterator();
-	    while(iter.hasNext()) {
-		LocatorReg reg = (LocatorReg)iter.next();
-		if(!isArrayContains(locators, reg.l))  {
-                    iter.remove();
-                }//endif
-	    }//end loop
-            /* Initiate discovery process for any new, un-discovered locators*/
-	    discoverLocators(locators);
-            /* Send a discarded event to all registered listeners for any
-             * locators that were removed by this method.
-             */
-	    if(!groupsMap.isEmpty() && !listeners.isEmpty()) {
-                addNotify((ArrayList)listeners.clone(), groupsMap, true);
+        if (terminated) throw new IllegalStateException("discovery terminated");
+
+        Map groupsMap = new HashMap(1);
+        /* From the set of already-discovered locators, remove each 
+         * element that is NOT in the input set of locators.
+         */
+        Iterator<LocatorReg> iter = discoveredLocators.iterator();
+        List<LocatorReg> remove = new LinkedList<LocatorReg>();
+        while(iter.hasNext()) {
+            LocatorReg reg = iter.next();
+            if(!isArrayContains(locators, reg.l)) {
+                remove.add(reg);
+                groupsMap.put(reg.getProxy(), reg.getMemberGroups());
+            }//endif
+        }//end loop
+        discoveredLocators.removeAll(remove);
+        /* From the set of yet-to-be-discovered locators, remove each 
+         * element that is NOT in the input set of replacement locators.
+         * 
+         * Note that if the discovery task is currently attempting to
+         * discover a locator from this set, and if that locator is not
+         * contained in the given input set of replacement locators (that
+         * is, it is no longer desired that that locator be discovered),
+         * then the discovery task, when it completes (either successfully
+         * or un-successfully) the attempt to discover that locator, will
+         * end all discovery processing with respect to the affected
+         * locator.
+         *
+         * To inform the discovery task -- upon its return from the
+         * unicast discovery process -- of the desire to terminate all
+         * discovery processing for that particular locator, the element
+         * in the set of undiscoveredLocators that corresponds to that
+         * locator is removed. This means that if the discovery attempt
+         * failed, the locator will no longer be considered one of the
+         * yet-to-be-discovered locators; and if the attempt succeeded,
+         * prevents the locator from being placed in the set of 
+         * already-discovered locators. It also prevents any discarded
+         * or discovered events from being sent.
+         */
+        iter = undiscoveredLocators.iterator();
+        while(iter.hasNext()) {
+            LocatorReg reg = (LocatorReg)iter.next();
+            if(!isArrayContains(locators, reg.l))  {
+                iter.remove();
             }//endif
-	}//end sync(this)
+        }//end loop
+        /* Initiate discovery process for any new, un-discovered locators*/
+        discoverLocators(locators);
+        /* Send a discarded event to all registered listeners for any
+         * locators that were removed by this method.
+         */
+        if(!groupsMap.isEmpty() && !listeners.isEmpty()) {
+            addNotify( listeners, groupsMap, true);
+        }//endif
     }//end setLocators
 
     /**
@@ -926,29 +933,25 @@ abstract class AbstractLookupLocatorDisc
      */
     public void removeLocators(LookupLocator[] locators) {
         testSetForNull(locators);
-	synchronized(this) {
-            if (terminated) {
-                throw new IllegalStateException("discovery terminated");
-            }
-            HashMap groupsMap = new HashMap(1);
-	    for(int i=0; i<locators.length; i++) {
-		LocatorReg reg = removeDiscoveredLocator(locators[i]);
-		if(reg != null) {//removing an already-discovered reg
-                    groupsMap.put(reg.getProxy(), reg.getMemberGroups());
-		    continue;
-		}//endif
-		reg = findReg(locators[i]);
-		if(reg != null) {//reg not yet discovered, stop discovery of it
-                    undiscoveredLocators.remove(reg);
-		}//endif
-	    }//end loop
-            /* Send a discarded event to all registered listeners for any
-             * locators that were removed by this method.
-             */
-	    if(!groupsMap.isEmpty() && !listeners.isEmpty()) {
-                addNotify((ArrayList)listeners.clone(), groupsMap, true);
+        if (terminated)  throw new IllegalStateException("discovery terminated");
+        Map groupsMap = new HashMap(1);
+        for(int i=0; i<locators.length; i++) {
+            LocatorReg reg = removeDiscoveredLocator(locators[i]);
+            if(reg != null) {//removing an already-discovered reg
+                groupsMap.put(reg.getProxy(), reg.getMemberGroups());
+                continue;
+            }//endif
+            reg = findReg(locators[i]);
+            if(reg != null) {//reg not yet discovered, stop discovery of it
+                undiscoveredLocators.remove(reg);
             }//endif
-	}//end sync
+        }//end loop
+        /* Send a discarded event to all registered listeners for any
+         * locators that were removed by this method.
+         */
+        if(!groupsMap.isEmpty() && !listeners.isEmpty()) {
+            addNotify(listeners, groupsMap, true);
+        }//endif
     }//end removeLocators
 
     /**
@@ -965,19 +968,15 @@ abstract class AbstractLookupLocatorDisc
      *         this method is called after the <code>terminate</code>
      *         method has been called.
      */
-    public synchronized LookupLocator[] getDiscoveredLocators() {
-        if (terminated) {
-            throw new IllegalStateException("discovery terminated");
-        }
-	int size = discoveredLocators.size();
-	LookupLocator[] ret = new LookupLocator[size];
+    public LookupLocator[] getDiscoveredLocators() {
+        if (terminated) throw new IllegalStateException("discovery terminated");
+	List<LookupLocator> locators = new LinkedList<LookupLocator>();
         /* Retrieve the locators of the already-discovered lookup services */
-	int k = 0;
-	Iterator iter = discoveredLocators.iterator();
+	Iterator<LocatorReg> iter = discoveredLocators.iterator();
 	while(iter.hasNext()) { 
-            ret[k++] = ((LocatorReg)iter.next()).l;
+            locators.add(iter.next().l);
         }//end loop
-	return ret;
+	return locators.toArray(new LookupLocator[locators.size()]);
     }//end getDiscoveredLocators
 
     /**
@@ -995,23 +994,20 @@ abstract class AbstractLookupLocatorDisc
      *         this method is called after the <code>terminate</code>
      *         method has been called.
      */
-    public synchronized LookupLocator[] getUndiscoveredLocators() {
-        if (terminated) {
-            throw new IllegalStateException("discovery terminated");
-        }
-        LookupLocator[] locs = new LookupLocator[undiscoveredLocators.size()];
-        Iterator iter = undiscoveredLocators.iterator();
+    public LookupLocator[] getUndiscoveredLocators() {
+        if (terminated) throw new IllegalStateException("discovery terminated");
+        List<LookupLocator> locators = new LinkedList<LookupLocator>();
+        Iterator<LocatorReg> iter = undiscoveredLocators.iterator();
         for(int i=0;iter.hasNext();i++) {
-            locs[i] = ((LocatorReg)iter.next()).l;
+            locators.add(iter.next().l);
         }//end loop
-        return locs;
+        return locators.toArray(new LookupLocator[locators.size()]);
     }//end getUndiscoveredLocators
 
     /** Initiates the discovery process for the lookup services having the
      *  given locators.
      */
     private void discoverLocators(LookupLocator[] lcts) {
-	assert Thread.holdsLock(this);
 	discoverLocatorsCalled = true;
 	if (lcts == null)  return;
 	LookupLocator lct;
@@ -1020,7 +1016,7 @@ abstract class AbstractLookupLocatorDisc
 	    LocatorReg reg = findReg(lcts[i]);//in not-yet-discovered map?
 	    if(reg == null) {
 		reg = new LocatorReg(lcts[i]);
-		addToMap(reg);
+		addAndQueueDiscoveryTaskIfAbsent(reg);
 	    }//endif
 	}//end loop
     }//end discoverLocators
@@ -1088,26 +1084,22 @@ abstract class AbstractLookupLocatorDisc
          * listeners that the locator has been discovered, and return true
          * to prevent retries from being queued.
          */
-	synchronized (this) {
-            if(!undiscoveredLocators.contains(reg)) {
-                return true;//already removed, true ==> don't queue retry
-            }//endif
-            /* Discovery un-successful, leave in set, try new wakeup task */
-	    if(!b) {
-                return false;//this causes a retry to be queued
-	    }//endif
-            /* Discovery was successful, move reg from undiscoveredLocators
-             * to discoveredLocators, and notify listeners
-             */
-            undiscoveredLocators.remove(reg);
-	    discoveredLocators.add(reg);
-	    if(!listeners.isEmpty()) {
-		addNotify((ArrayList)listeners.clone(),
+        if(!undiscoveredLocators.contains(reg))  return true;//already removed, true ==> don't queue retry
+        /* Discovery un-successful, leave in set, try new wakeup task */
+        if(!b) return false;//this causes a retry to be queued
+        /* Discovery was successful, move reg from undiscoveredLocators
+         * to discoveredLocators, and notify listeners
+         */
+        if (undiscoveredLocators.remove(reg)){ //Atomic
+            discoveredLocators.add(reg);
+            if(!listeners.isEmpty()) {
+                addNotify(listeners,
                           mapRegToGroups(reg.getProxy(), reg.getMemberGroups()),
                           false);
             }//endif
-            return true;//done; don't queue any retries
-	}//end sync(this)
+            //done; don't queue any retries return true
+        } //else already removed, true ==> don't queue retry
+        return true; 
     }//end regTryGetProxy
 
     /** From each element of the set of LocatorReg objects that correspond
@@ -1116,15 +1108,13 @@ abstract class AbstractLookupLocatorDisc
      *  in an array of ServiceRegistrar.
      */
     private ServiceRegistrar[] buildServiceRegistrar() {
-	int k = 0;
-	ServiceRegistrar[] proxys =
-                              new ServiceRegistrar[discoveredLocators.size()];
-	Iterator iter = discoveredLocators.iterator();
+	List<ServiceRegistrar> regs = new LinkedList<ServiceRegistrar>();
+	Iterator<LocatorReg> iter = discoveredLocators.iterator();
 	while(iter.hasNext()) {
-	    LocatorReg reg = (LocatorReg)iter.next();
-	    proxys[k++] = reg.getProxy();
+	    LocatorReg reg = iter.next();
+	    regs.add(reg.getProxy());
 	}//end loop
-	return proxys;
+	return regs.toArray(new ServiceRegistrar[regs.size()]);
     }//end buildServiceRegistrar
 
     /** 
@@ -1133,9 +1123,8 @@ abstract class AbstractLookupLocatorDisc
      *  not yet been discovered, and queues a DiscoveryTask to attempt,
      *  through unicast discovery, to discover the associated lookup service.
      */
-    private void addToMap(LocatorReg reg) {
-        undiscoveredLocators.add(reg);//add to set of not-yet-discovered locs
-	reg.queueDiscoveryTask();
+    private void addAndQueueDiscoveryTaskIfAbsent(LocatorReg reg) {
+        if (undiscoveredLocators.add(reg)) reg.queueDiscoveryTask();
     }//end addToMap
 
     /** Determines whether or not the lookup service associated with the
@@ -1153,19 +1142,13 @@ abstract class AbstractLookupLocatorDisc
     /** Add a notification task to the pending queue, and start an instance of
      *  the Notifier thread if one isn't already running.
      */
-    private void addNotify(ArrayList notifies,
+    private void addNotify(List notifies,
                            Map groupsMap,
 			   boolean discard)
     {
-	synchronized (pendingNotifies) {
-	    pendingNotifies.addLast(new NotifyTask(notifies,
-                                                   groupsMap,
-                                                   discard));
-	    if (notifierThread == null) {
-		notifierThread = new Notifier();
-		notifierThread.start();
-	    }//endif
-	}//end sync
+        pendingNotifies.addLast(new NotifyTask(notifies,
+                                               groupsMap,
+                                               discard));
     }//end addNotify
 
     /** Convenience method used to remove the LocatorReg - corresponding to
@@ -1177,7 +1160,7 @@ abstract class AbstractLookupLocatorDisc
 	while(iter.hasNext()) {
 	    LocatorReg reg = (LocatorReg)iter.next();
 	    if(reg.l.equals(lct)) {
-		iter.remove();
+		discoveredLocators.remove(reg);
 		return reg;
 	    }//endif
 	}//end loop
@@ -1194,14 +1177,25 @@ abstract class AbstractLookupLocatorDisc
             discoveryWakeupMgr.cancelAll();//cancel all tickets
         }//endif
         /* Cancel/remove pending tasks from the task manager and terminate */
-        if(discoveryTaskMgr != null) {
-            ArrayList pendingTasks = discoveryTaskMgr.getPending();
-            for(int i=0;i<pendingTasks.size();i++) {
-                RetryTask pendingTask = (RetryTask)pendingTasks.get(i);
-                pendingTask.cancel();//cancel wakeup ticket
-                discoveryTaskMgr.remove(pendingTask);//remove from task mgr
-            }//end loop
-            discoveryTaskMgr.terminate();//interrupt all active tasks
+        if(discoveryExecutor != null) {
+            synchronized (discoveryTasks){
+                Iterator<RetryTask> it = discoveryTasks.iterator();
+                while (it.hasNext()){
+                    it.next().cancel();//cancel wakeup ticket
+                }
+            }
+            List<Runnable> pendingTasks = discoveryExecutor.shutdownNow();
+            Iterator<Runnable> ir = pendingTasks.iterator();
+            while (ir.hasNext()){
+                Runnable pendingTask = ir.next();
+                if (pendingTask instanceof RetryTask){
+                    ((RetryTask) pendingTask).cancel();//cancel wakeup ticket
+                    System.err.println("Cancelled RetryTask");
+                } else if (pendingTask instanceof Future) {
+                    ((Future) pendingTask).cancel(true);
+                    System.err.println("Task not instanceof RetryTask: " + pendingTask);
+                }
+            }
         }//endif
     }//end terminateTaskMgr
 
@@ -1214,12 +1208,18 @@ abstract class AbstractLookupLocatorDisc
     }//end isArrayContains
     
     /* Convenience method useful for debugging. */
-    private synchronized void printMap () {
+    public String toString() {
+        StringBuilder sb = new StringBuilder(300);
+        String message = "Undiscovered Locators reg:";
+        char lineReturn = '\n';
         Iterator iter = undiscoveredLocators.iterator();
         while(iter.hasNext()) {
             LocatorReg reg = (LocatorReg)iter.next();	    
-            System.out.println("printMap reg:" + reg.l);
+            sb.append(message);
+            sb.append(reg.l);
+            sb.append(lineReturn);
         }//end loop
+        return sb.toString();
     }//end printMap
 
     /**
@@ -1311,40 +1311,37 @@ abstract class AbstractLookupLocatorDisc
      */
     void beginDiscovery(final LookupLocator[] locators)
     {
-	synchronized(this) {
-	    if (locators == null) {
-		return;
-	    }
-	    testSetForNull(locators);
-	    if (initialUnicastDelayRange > 0) {
-		discoveryWakeupMgr.schedule(
-		    System.currentTimeMillis() +
-			(long) (Math.random() * initialUnicastDelayRange),
-		    new Runnable() {
-			public void run() {
-			    synchronized (AbstractLookupLocatorDiscovery.this) {
-				if (terminated || discoverLocatorsCalled) {
-				    // discoverLocatorsCalled will be true
-				    // if there has been an intervening
-				    // addLocators or setLocators call.
-				    return;
-				}
-				discoverLocators(locators);
-			    }
-			}
-		    }
-		);
-	    } else {
-                synchronized (AbstractLookupLocatorDiscovery.this){
-                    discoverLocators(locators);
+        notifierThread.start();
+        if (locators == null) {
+            return;
+        }
+        testSetForNull(locators);
+        if (initialUnicastDelayRange > 0) {
+            discoveryWakeupMgr.schedule(
+                System.currentTimeMillis() +
+                    (long) (Math.random() * initialUnicastDelayRange),
+                new Runnable() {
+                    public void run() {
+                        synchronized (AbstractLookupLocatorDiscovery.this) {
+                            if (terminated || discoverLocatorsCalled) {
+                                // discoverLocatorsCalled will be true
+                                // if there has been an intervening
+                                // addLocators or setLocators call.
+                                return;
+                            }
+                        }
+                        discoverLocators(locators);
+                    }
                 }
-	    }
-	}
+            );
+        } else {
+            discoverLocators(locators);
+        }
     }
     
     private static class Initializer{
         ProxyPreparer registrarPreparer;
-        TaskManager discoveryTaskMgr;
+        ExecutorService discoveryTaskMgr;
         WakeupManager discoveryWakeupMgr;
         Long initialUnicastDelayRange;
     }
@@ -1371,11 +1368,15 @@ abstract class AbstractLookupLocatorDisc
                                                     new BasicProxyPreparer());
         /* Task manager */
         try {
-            i.discoveryTaskMgr = (TaskManager)config.getEntry(COMPONENT_NAME,
-                                                            "taskManager",
-                                                            TaskManager.class);
+            i.discoveryTaskMgr = (ExecutorService)config.getEntry(COMPONENT_NAME,
+                                                            "executorService",
+                                                            ExecutorService.class);
         } catch(NoSuchEntryException e) { /* use default */
-            i.discoveryTaskMgr = new TaskManager(MAX_N_TASKS,(15*1000),1.0f);
+            i.discoveryTaskMgr = 
+            new ThreadPoolExecutor(1, MAX_N_TASKS ,
+                              15L, TimeUnit.SECONDS,
+                              new LinkedBlockingQueue<Runnable>(),
+                              new NamedThreadFactory("LookupLocatorDiscovery", false));
         }
         /* Wakeup manager */
         try {

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java?rev=1554723&r1=1554722&r2=1554723&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java Thu Jan  2 02:45:07 2014
@@ -17,40 +17,10 @@
  */
 package net.jini.discovery;
 
-import com.sun.jini.config.Config;
-import com.sun.jini.discovery.Discovery;
-import com.sun.jini.discovery.DiscoveryConstraints;
-import com.sun.jini.discovery.UnicastResponse;
-import com.sun.jini.discovery.internal.MultiIPDiscovery;
-import com.sun.jini.logging.Levels;
-import com.sun.jini.logging.LogUtil;
-import com.sun.jini.thread.RetryTask;
-import com.sun.jini.thread.TaskManager;
-import com.sun.jini.thread.WakeupManager;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;
-import net.jini.config.EmptyConfiguration;
-import net.jini.config.NoSuchEntryException;
-import net.jini.core.constraint.InvocationConstraints;
-import net.jini.core.constraint.MethodConstraints;
-import net.jini.core.constraint.RemoteMethodControl;
 import net.jini.core.discovery.LookupLocator;
-import net.jini.core.lookup.ServiceRegistrar;
-import net.jini.security.BasicProxyPreparer;
-import net.jini.security.ProxyPreparer;
 
 /**
  * This class encapsulates the functionality required of an entity that

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java?rev=1554723&r1=1554722&r2=1554723&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java Thu Jan  2 02:45:07 2014
@@ -23,7 +23,6 @@ import com.sun.jini.constants.ThrowableC
 import com.sun.jini.logging.Levels;
 import com.sun.jini.logging.LogManager;
 import com.sun.jini.proxy.ConstrainableProxyUtil;
-import com.sun.jini.thread.TaskManager;
 import java.lang.reflect.Method;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
@@ -32,6 +31,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
@@ -43,6 +48,7 @@ import net.jini.core.lease.LeaseExceptio
 import net.jini.core.lease.LeaseMap;
 import net.jini.core.lease.LeaseMapException;
 import net.jini.core.lease.UnknownLeaseException;
+import org.apache.river.impl.thread.NamedThreadFactory;
 
 /**
  * Provides for the systematic renewal and overall management of a set
@@ -175,16 +181,17 @@ import net.jini.core.lease.UnknownLeaseE
  *     should have durations exceeding the <code>roundTripTime</code>.
  *     This entry is obtained in the constructor.
  * </table>
- * <table summary="Describes the taskManager configuration entry"
+ * <table summary="Describes the executorService configuration entry"
  *	  border="0" cellpadding="2">
  *   <tr valign="top">
  *     <th scope="col" summary="layout"> <font size="+1">&#X2022;</font>
  *     <th scope="col" align="left" colspan="2"> <font size="+1"><code>
- *	 taskManager</code></font>
+ *	 executorService</code></font>
  *   <tr valign="top"> <td> &nbsp <th scope="row" align="right">
- *     Type: <td> {@link TaskManager}
+ *     Type: <td> {@link ExecutorService}
  *   <tr valign="top"> <td> &nbsp <th scope="row" align="right">
- *     Default: <td> <code>new TaskManager(11, 15000, 1.0f)</code>
+ *     Default: <td> <code>new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS,
+ *     new LinkedBlockingQueue())</code>
  *   <tr valign="top"> <td> &nbsp <th scope="row" align="right">
  *     Description: <td> The object used to manage queuing tasks
  *     involved with renewing leases and sending notifications. The
@@ -193,7 +200,7 @@ import net.jini.core.lease.UnknownLeaseE
  *     seconds before removing idle threads, and uses a load factor of
  *     1.0 when determining whether to create a new thread. Note that
  *     the implementation of the renewal algorithm includes an assumption
- *     that the <code>TaskManager</code> uses a load factor of 1.0.
+ *     that the <code>ExecutorService</code> uses a load factor of 1.0.
  * </table>
  * 
  * <a name="logging">
@@ -300,7 +307,7 @@ import net.jini.core.lease.UnknownLeaseE
  * are added to it and the {@link LeaseMap#renewAll} method is called. Otherwise, the
  * last lease is renewed directly.
  * <p> 
- * The <code>TaskManager</code> that manages the renewal threads has a bound on
+ * The <code>ExecutorService</code> that manages the renewal threads has a bound on
  * the number of simultaneous threads it will support. The renewal time of
  * leases may be adjusted earlier in time to reduce the likelihood that the
  * renewal of a lease will be delayed due to exhaustion of the thread pool.
@@ -355,8 +362,15 @@ public class LeaseRenewalManager {
     /** Time window in which to look for batchable leases */
     private long renewBatchTimeWindow = 1000 * 60 * 5;
 
-    /** Task manager for queuing and renewing leases */
-    TaskManager taskManager = new TaskManager(11, 1000 * 15, 1.0f);
+    /** Task manager for queuing and renewing leases 
+     *  NOTE: test failures occur with queue's that have capacity, 
+     *  no test failures occur with SynchronousQueue, for the time
+     *  being, until the cause is sorted out we may need to rely on 
+     *  a larger pool, if necessary.  TaskManager is likely to have 
+     *  lower throughput capacity that ExecutorService with a
+     *  SynchronousQueue although this hasn't been confirmed yet.
+     */
+    final ExecutorService leaseRenewalExecutor;
 
     /**
      * The worst-case renewal round-trip-time
@@ -380,7 +394,7 @@ public class LeaseRenewalManager {
      */
     private List calcList;
 
-    private final class RenewTask implements TaskManager.Task {
+    private final class RenewTask implements Runnable {
 	/** Entries of leases to renew (if multiple, all can be batched) */
 	private final List bList;
 
@@ -454,11 +468,6 @@ public class LeaseRenewalManager {
 	    }
 	}
 
-	/** No ordering. */
-	public boolean runAfter(List tasks, int size) {
-	    return false;
-	}
-
 	/**
 	 * Find any expired leases, remove them from bList and
 	 * leaseInRenew, and return any with listeners.
@@ -706,6 +715,11 @@ public class LeaseRenewalManager {
      * that initially manages no leases.
      */
     public LeaseRenewalManager() {
+        leaseRenewalExecutor = 
+            new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
+                    new SynchronousQueue<Runnable>(), 
+                    new NamedThreadFactory("LeaseRenewalManager",true),
+                    new CallerRunsPolicy());
     }
 
     /**
@@ -731,8 +745,12 @@ public class LeaseRenewalManager {
 	renewalRTT = Config.getLongEntry(
 	    config, LRM, "roundTripTime",
 	    renewalRTT, 1, Long.MAX_VALUE);
-	taskManager = (TaskManager) Config.getNonNullEntry(
-	    config, LRM, "taskManager", TaskManager.class, taskManager);
+	leaseRenewalExecutor = Config.getNonNullEntry(
+	    config, LRM, "executorService", ExecutorService.class,
+                new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
+                    new SynchronousQueue<Runnable>(), 
+                    new NamedThreadFactory("LeaseRenewalManager",false),
+                    new CallerRunsPolicy()) );
     }
 
     /**
@@ -760,6 +778,10 @@ public class LeaseRenewalManager {
 			       long desiredExpiration,
 			       LeaseListener listener)
     {
+        leaseRenewalExecutor = new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
+                    new SynchronousQueue<Runnable>(), 
+                    new NamedThreadFactory("LeaseRenewalManager",true),
+                    new CallerRunsPolicy());
 	renewUntil(lease, desiredExpiration, listener);
     }
 
@@ -787,7 +809,7 @@ public class LeaseRenewalManager {
      *	       <code>null</code>
      * @see #renewUntil
      */
-    public void renewUntil(Lease lease,
+    public final void renewUntil(Lease lease,
 			   long desiredExpiration,
 			   LeaseListener listener)
     {
@@ -1152,6 +1174,10 @@ public class LeaseRenewalManager {
 	remove(lease);
 	lease.cancel();
     }
+    
+    public void close(){
+        leaseRenewalExecutor.shutdown();
+    }
 
     /**
      * Removes a given lease from the managed set of leases; but does
@@ -1192,7 +1218,9 @@ public class LeaseRenewalManager {
 	 * Subtract one to account for the queuer thread, which should not be
 	 * counted.
 	 */
-	int maxThreads = taskManager.getMaxThreads() - 1;
+	int maxThreads = leaseRenewalExecutor instanceof ThreadPoolExecutor ? 
+            ((ThreadPoolExecutor)leaseRenewalExecutor).getMaximumPoolSize() - 1 
+                : 10;
 	if (calcList == null) {
 	    calcList = new ArrayList(maxThreads);
 	}
@@ -1271,7 +1299,7 @@ public class LeaseRenewalManager {
 	if (queuer == null) {
 	    if (newWakeup < Long.MAX_VALUE) {
 		queuer = new QueuerTask(newWakeup);
-		taskManager.add(queuer);
+		leaseRenewalExecutor.execute(queuer);
 	    }
 	} else if (newWakeup < queuer.wakeup ||
 		   (newWakeup == Long.MAX_VALUE && leaseInRenew.isEmpty()))
@@ -1522,19 +1550,15 @@ public class LeaseRenewalManager {
 	return ((Entry) leases.lastKey()).actualRenew;
     }
 
-    private class QueuerTask implements TaskManager.Task {
+    private class QueuerTask implements Runnable {
 
 	/** When to next wake up and queue a new renew task */
-	long wakeup;
+	private long wakeup;
 
 	QueuerTask(long wakeup) {
 	    this.wakeup = wakeup;
 	}
 
-	/** No ordering */
-	public boolean runAfter(List tasks, int size) {
-	    return false;
-	}
 
         public void run() {
 	    synchronized (LeaseRenewalManager.this) {
@@ -1546,7 +1570,7 @@ public class LeaseRenewalManager {
 			final long now = System.currentTimeMillis();
 			long delta = wakeup - now;
 			if (delta <= 0) {
-			    taskManager.add(new RenewTask(now));
+			    leaseRenewalExecutor.execute(new RenewTask(now));
 			} else {
 			    LeaseRenewalManager.this.wait(delta);
 			}



Mime
View raw message