incubator-aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mahrw...@apache.org
Subject svn commit: r989057 - in /incubator/aries/trunk/quiesce: quiesce-api/src/main/java/org/apache/aries/quiesce/manager/ quiesce-api/src/main/java/org/apache/aries/quiesce/participant/ quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/it...
Date Wed, 25 Aug 2010 12:25:51 GMT
Author: mahrwald
Date: Wed Aug 25 12:25:51 2010
New Revision: 989057

URL: http://svn.apache.org/viewvc?rev=989057&view=rev
Log:
Minor improvements to quiesce

Modified:
    incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceCallback.java
    incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceManager.java
    incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/participant/QuiesceParticipant.java
    incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/MockQuiesceParticipant.java
    incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/QuiesceManagerTest.java
    incubator/aries/trunk/quiesce/quiesce-manager/src/main/java/org/apache/aries/quiesce/manager/impl/QuiesceManagerImpl.java

Modified: incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceCallback.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceCallback.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceCallback.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceCallback.java Wed Aug 25 12:25:51 2010
@@ -18,9 +18,19 @@
  */
 package org.apache.aries.quiesce.manager;
  
+import org.apache.aries.quiesce.participant.QuiesceParticipant;
 import org.osgi.framework.Bundle;
 
+/**
+ * Callback that allows a {@link QuiesceParticipant} to alert the {@link QuiesceManager} that
+ * bundles are quiesced (from the point of view of the participant)
+ */
 public interface QuiesceCallback
 {
+  /**
+   * Notify the quiesce manager that the given bundles are quiesced 
+   * (from the point of view of the calling participant)
+   * @param bundlesQuiesced
+   */
   public void bundleQuiesced(Bundle ... bundlesQuiesced);
 }
\ No newline at end of file

Modified: incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceManager.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceManager.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceManager.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/manager/QuiesceManager.java Wed Aug 25 12:25:51 2010
@@ -19,10 +19,57 @@
 package org.apache.aries.quiesce.manager;
 
 import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.aries.quiesce.participant.QuiesceParticipant;
 import org.osgi.framework.Bundle;
 
+/**
+ * Interface for the quiesce manager. A quiesce manager provides the functionality to stop
+ * bundles in such a manner that currently running work can be safely finished. To exploit this
+ * above the quiesce manager individual containers / extenders (such as blueprint, jpa etc) need to 
+ * be quiesce aware and register {@link QuiesceParticipant} appropriately.
+ */
 public interface QuiesceManager
 {
-  public void quiesce(long timeout, List<Bundle> bundlesToQuiese);
+  /** 
+   * Request a collection of bundles to be quiesced
+   * 
+   * @param timeout time to wait (in milliseconds) for all the quiesce participants to finish 
+   * before stopping the bundles. If some quiesce participants do not finish within the given timeout the bundles
+   * are stopped regardless at the timeout
+   * @param bundlesToQuiesce
+   */
+  public void quiesce(long timeout, List<Bundle> bundlesToQuiesce);
+  
+  /**
+   * Request a collection of bundles to be quiesced using the default timeout
+   * 
+   * @param bundlesToQuiesce
+   */
   public void quiesce(List<Bundle> bundlesToQuiesce);
+
+  /**
+   * Request a collection of bundles to be quiesced like <code>quiesce(List&lt;Bundle&gt;)</code>, but
+   * return a {@link Future} that the caller can block on instead of void
+   * 
+   * @param bundlesToQuiesce
+   * @return a {@link Future} that captures the execution of quiesce. The returned {@link Future} does
+   * not support the cancel operation.
+   */
+  public Future<?> quiesceWithFuture(List<Bundle> bundlesToQuiesce);
+
+  
+  /**
+   * Request a collection of bundles to be quiesced like <code>quiesce(long, List&lt;Bundle&gt;)</code>, but
+   * return a {@link Future} that the caller can block on instead of void
+   * 
+   * @param timeout time to wait (in milliseconds) for all the quiesce participants to finish 
+   * before stopping the bundles. If some quiesce participants do not finish within the given timeout the bundles
+   * are stopped regardless at the timeout
+   * @param bundlesToQuiesce
+   * @return a {@link Future} that captures the execution of quiesce. The returned {@link Future} does
+   * not support the cancel operation.
+   */
+  public Future<?> quiesceWithFuture(long timeout, List<Bundle> bundlesToQuiesce);
 }
\ No newline at end of file

Modified: incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/participant/QuiesceParticipant.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/participant/QuiesceParticipant.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/participant/QuiesceParticipant.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-api/src/main/java/org/apache/aries/quiesce/participant/QuiesceParticipant.java Wed Aug 25 12:25:51 2010
@@ -22,7 +22,20 @@ import java.util.List;
 import org.apache.aries.quiesce.manager.QuiesceCallback;
 import org.osgi.framework.Bundle;
 
+/**
+ * Interface for OSGi containers / extenders to hook into the quiesce mechanism. An extender such
+ * as Blueprint should implement a {@link QuiesceParticipant} and register it as a service in the service 
+ * registry.
+ */
 public interface QuiesceParticipant
 {
+  /**
+   * Request a number of bundles to be quiesced by this participant
+   * 
+   * This method must be non-blocking.
+   * @param callback The callback with which to alert the manager of successful quiesce completion (from the view of this
+   * participant)
+   * @param bundlesToQuiesce The bundles scheduled to be quiesced
+   */
   public void quiesce(QuiesceCallback callback, List<Bundle> bundlesToQuiesce);
 }

Modified: incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/MockQuiesceParticipant.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/MockQuiesceParticipant.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/MockQuiesceParticipant.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/MockQuiesceParticipant.java Wed Aug 25 12:25:51 2010
@@ -1,27 +1,10 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.aries.quiesce.manager.itest;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.aries.quiesce.manager.QuiesceCallback;
 import org.apache.aries.quiesce.participant.QuiesceParticipant;
@@ -34,7 +17,13 @@ public class MockQuiesceParticipant impl
 	public static final int WAIT = 2;
 	private int behaviour;
 	private List<QuiesceCallback> callbacks = new ArrayList<QuiesceCallback>();
-	private ExecutorService executor = Executors.newCachedThreadPool();
+	private ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "Test");
+            t.setDaemon(true);
+            return t;
+        }
+    });
 	private int started = 0;
 	private int finished = 0;
 	
@@ -51,24 +40,24 @@ public class MockQuiesceParticipant impl
 				case 0:
 					//return immediately
 					System.out.println("MockParticipant: return immediately");
+					finished += 1;
 					callback.bundleQuiesced(bundlesToQuiesce.toArray(new Bundle[bundlesToQuiesce.size()]));
 					callbacks.remove(callback);
-					finished += 1;
 					break;
 				case 1:
 					//just don't do anything
 					System.out.println("MockParticipant: just don't do anything");
 					break;
 				case 2:
-					//Wait for 5s then quiesce
-					System.out.println("MockParticipant: Wait for 5s then quiesce");
+					//Wait for 1s then quiesce
+					System.out.println("MockParticipant: Wait for 1s then quiesce");
 					try {
-						Thread.sleep(5000);
+						Thread.sleep(1000);
 					} catch (InterruptedException e) {
 					}
+					finished += 1;
 					callback.bundleQuiesced(bundlesToQuiesce.toArray(new Bundle[bundlesToQuiesce.size()]));
 					callbacks.remove(callback);
-					finished += 1;
 					break;
 				default: 
 					//Unknown behaviour, don't do anything

Modified: incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/QuiesceManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/QuiesceManagerTest.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/QuiesceManagerTest.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-manager-itest/src/test/java/org/apache/aries/quiesce/manager/itest/QuiesceManagerTest.java Wed Aug 25 12:25:51 2010
@@ -15,7 +15,7 @@
  */
 package org.apache.aries.quiesce.manager.itest;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.ops4j.pax.exam.CoreOptions.equinox;
 import static org.ops4j.pax.exam.CoreOptions.options;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
@@ -23,10 +23,15 @@ import static org.ops4j.pax.exam.CoreOpt
 import static org.ops4j.pax.exam.OptionUtils.combine;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.aries.quiesce.manager.QuiesceManager;
 import org.apache.aries.quiesce.participant.QuiesceParticipant;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,337 +53,354 @@ import org.osgi.util.tracker.ServiceTrac
 
 @RunWith(JUnit4TestRunner.class)
 public class QuiesceManagerTest {
-  public static final long DEFAULT_TIMEOUT = 30000;
-  private QuiesceManager manager;
-  private Bundle b1;
-  private Bundle b2;
-  private Bundle b3;
-  private long timeoutTime;
-  private List<Bundle> bundleList;
-  private MockQuiesceParticipant participant1;
-  private MockQuiesceParticipant participant2;
-  private MockQuiesceParticipant participant3;
-
-
-  @Inject
-  protected BundleContext bundleContext;
-  
-  @Before
-  public void setup() {
-	  manager = getOsgiService(QuiesceManager.class);
-	  b1 = bundleContext.getBundle(5);
-	  b2 = bundleContext.getBundle(6);
-	  b3 = bundleContext.getBundle(10);
-	  participant1 = new MockQuiesceParticipant(MockQuiesceParticipant.RETURNIMMEDIATELY);
-	  participant2 = new MockQuiesceParticipant(MockQuiesceParticipant.NEVERRETURN);
-	  participant3 = new MockQuiesceParticipant(MockQuiesceParticipant.WAIT);
-
-  }
-  
-  @Test
-  public void testNullSafe() throws Exception {
-	  //Check we're null safe
-	  manager.quiesce(null);  
-  }
-  
-  @Test
-  public void testNoParticipants() throws Exception {
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  //Try quiescing one bundle with no participants
-	  manager.quiesce(bundleList);
-	  //quiesce is non-blocking so what do we do? 
-	  //verify bundle is no longer active
-	  timeoutTime = System.currentTimeMillis()+5000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  b1.start();
-  }
-  
-  @Test
-  public void testImmediateReturn() throws Exception {
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  //Register a mock participant which will report back quiesced immediately
-	  ServiceRegistration sr = bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
-	  //Try quiescing the bundle with immediate return
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  manager.quiesce(1000,bundleList);
-	  timeoutTime = System.currentTimeMillis()+5000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant should have finished once", participant1.getFinishedCount() == 1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  b1.start();
-	  sr.unregister();
-	  participant1.reset();
-  }
-    
-  @Test
-  public void testNoReturn() throws Exception {
-	  //Register a mock participant which won't respond
-	  ServiceRegistration sr = bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
-	  //recreate the list as it may have been emptied?
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  
-	  //Try quiescing the bundle with no return
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  manager.quiesce(1000,bundleList);
-	  timeoutTime = System.currentTimeMillis()+5000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant should have started once", participant2.getStartedCount() == 1);
-	  assertTrue("Participant should not have finished", participant2.getFinishedCount() == 0);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  b1.start();
-	  sr.unregister();
-	  participant2.reset();
-  }
-	  
-	@Test
-	public void testWaitAShortTime() throws Exception {
-	  //Try quiescing where participant takes 5s to do the work. We should get InterruptedException
-	  ServiceRegistration sr = bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
-	  //recreate the list as it may have been emptied?
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  manager.quiesce(10000,bundleList);
-	  //timeout is > how long participant takes, and < the quiesce timeout
-	  timeoutTime = System.currentTimeMillis()+7000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant should have started once", participant3.getStartedCount() == 1);
-	  assertTrue("Participant should finished once", participant3.getFinishedCount() == 1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  b1.start();
-	  participant3.reset();
-	}
-
-	@Test
-	public void testThreeParticipants() throws Exception {
-	  //Register three participants. One returns immediately, one waits 5s then returns, one never returns
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
-	  //recreate the list as it may have been emptied
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  manager.quiesce(10000,bundleList);
-	  timeoutTime = System.currentTimeMillis()+15000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant 1 should have started once", participant1.getStartedCount() == 1);
-	  assertTrue("Participant 1 should finished once", participant1.getFinishedCount() == 1);
-	  assertTrue("Participant 2 should have started once", participant2.getStartedCount() == 1);
-	  assertTrue("Participant 2 should not have finished", participant2.getFinishedCount() == 0);
-	  assertTrue("Participant 3 should have started once", participant3.getStartedCount() == 1);
-	  assertTrue("Participant 3 should finished once", participant3.getFinishedCount() == 1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  
-	  b1.start();
-	  participant1.reset();
-	  participant2.reset();
-	  participant3.reset();
-	}
-	
-	@Test
-	public void testTwoBundles() throws Exception {
-		//Register three participants. One returns immediately, one waits 5s then returns, one never returns
-		  bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
-		  bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
-		  bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
-	  //recreate the list as it may have been emptied
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  bundleList.add(b2);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  assertTrue("Bundle "+b2.getSymbolicName()+" should be in active state", b2.getState() == Bundle.ACTIVE);
-	  manager.quiesce(10000,bundleList);
-	  timeoutTime = System.currentTimeMillis()+15000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant 1 should have started once", participant1.getStartedCount() == 1);
-	  assertTrue("Participant 1 should finished once", participant1.getFinishedCount() == 1);
-	  assertTrue("Participant 2 should have started once", participant2.getStartedCount() == 1);
-	  assertTrue("Participant 2 should not have finished", participant2.getFinishedCount() == 0);
-	  assertTrue("Participant 3 should have started once", participant3.getStartedCount() == 1);
-	  assertTrue("Participant 3 should finished once", participant3.getFinishedCount() == 1);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  assertTrue("Bundle "+b2.getSymbolicName()+" should not be in active state", b2.getState() != Bundle.ACTIVE);
-	  b1.start();
-	  b2.start();	
-	  participant1.reset();
-	  participant2.reset();
-	  participant3.reset();
-	}
-	
-	@Test
-	public void testOverlappedQuiesces() throws Exception {
-	  
-      //Register three participants. One returns immediately, one waits 5s then returns, one never returns
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
-	  bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
-	  //recreate the list as it may have been emptied
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b1);
-	  bundleList.add(b2);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
-	  assertTrue("Bundle "+b2.getSymbolicName()+" should be in active state", b2.getState() == Bundle.ACTIVE);
-	  assertTrue("Bundle "+b3.getSymbolicName()+" should be in active state", b3.getState() == Bundle.ACTIVE);
-	  manager.quiesce(10000,bundleList);
-	  bundleList = new ArrayList<Bundle>();
-	  bundleList.add(b2);
-	  bundleList.add(b3);
-	  manager.quiesce(bundleList);
-	  timeoutTime = System.currentTimeMillis()+15000;
-	  while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
-		  Thread.sleep(500);
-	  }
-	  assertTrue("Participant 1 should have started twice", participant1.getStartedCount() == 2);
-	  assertTrue("Participant 1 should finished twice", participant1.getFinishedCount() == 2);
-	  assertTrue("Participant 2 should have started twice", participant2.getStartedCount() == 2);
-	  assertTrue("Participant 2 should not have finished", participant2.getFinishedCount() == 0);
-	  assertTrue("Participant 3 should have started twice", participant3.getStartedCount() == 2);
-	  assertTrue("Participant 3 should finished twice", participant3.getFinishedCount() == 2);
-	  assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
-	  assertTrue("Bundle "+b2.getSymbolicName()+" should not be in active state", b2.getState() != Bundle.ACTIVE);
-	  assertTrue("Bundle "+b3.getSymbolicName()+" should not be in active state", b3.getState() != Bundle.ACTIVE);
-	  participant1.reset();
-	  participant2.reset();
-	  participant3.reset();
-	  
-	}
- 
-  @org.ops4j.pax.exam.junit.Configuration
-  public static Option[] configuration() {
-    Option[] options = options(
-        bootDelegation(),
+    public static final long DEFAULT_TIMEOUT = 30000;
+    private QuiesceManager manager;
+    private Bundle b1;
+    private Bundle b2;
+    private Bundle b3;
+    private long timeoutTime;
+    private List<Bundle> bundleList;
+    private MockQuiesceParticipant participant1;
+    private MockQuiesceParticipant participant2;
+    private MockQuiesceParticipant participant3;
+
+
+    @Inject
+    protected BundleContext bundleContext;
+
+    @Before
+    public void setup() {
+        manager = getOsgiService(QuiesceManager.class);
+        b1 = bundleContext.getBundle(5);
+        b2 = bundleContext.getBundle(6);
+        b3 = bundleContext.getBundle(10);
+        participant1 = new MockQuiesceParticipant(MockQuiesceParticipant.RETURNIMMEDIATELY);
+        participant2 = new MockQuiesceParticipant(MockQuiesceParticipant.NEVERRETURN);
+        participant3 = new MockQuiesceParticipant(MockQuiesceParticipant.WAIT);
+
+    }
+
+    @After
+    public void after() {
+        participant1.reset();
+        participant2.reset();
+        participant3.reset();
+    }
+
+    @Test
+    public void testNullSafe() throws Exception {
+        //Check we're null safe
+        manager.quiesce(null);  
+    }
+
+    @Test
+    public void testNoParticipants() throws Exception {
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
+
+        //Try quiescing one bundle with no participants
+        manager.quiesceWithFuture(2000, bundleList).get(5000, TimeUnit.MILLISECONDS);
+
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+    }
+
+    @Test
+    public void testImmediateReturn() throws Exception {
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        //Register a mock participant which will report back quiesced immediately
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
+        //Try quiescing the bundle with immediate return
+        assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
         
-        // Log
-        mavenBundle("org.ops4j.pax.logging", "pax-logging-api"),
-        mavenBundle("org.ops4j.pax.logging", "pax-logging-service"),
-        // Felix Config Admin
-        mavenBundle("org.apache.felix", "org.apache.felix.configadmin"),
-        // Felix mvn url handler
-        mavenBundle("org.ops4j.pax.url", "pax-url-mvn"),
-
-        // this is how you set the default log level when using pax
-        // logging (logProfile)
-        systemProperty("org.ops4j.pax.logging.DefaultServiceLog.level").value("DEBUG"),
-
-        // Bundles
-        mavenBundle("org.osgi", "org.osgi.compendium"),
-        mavenBundle("org.apache.aries", "org.apache.aries.util"),
-        mavenBundle("commons-lang", "commons-lang"),
-        mavenBundle("commons-collections", "commons-collections"),
-        mavenBundle("commons-pool", "commons-pool"),
-        mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.serp"),
-        mavenBundle("org.apache.aries.quiesce", "org.apache.aries.quiesce.api"),
-        mavenBundle("org.apache.aries.quiesce", "org.apache.aries.quiesce.manager"),
+        manager.quiesceWithFuture(1000,bundleList).get(5000, TimeUnit.MILLISECONDS);
         
-        equinox().version("3.5.0"));
-    options = updateOptions(options);
-    return options;
-  }
-  
-  
-  protected Bundle getBundle(String symbolicName) {
-    return getBundle(symbolicName, null);
-  }
-
-  protected Bundle getBundle(String bundleSymbolicName, String version) {
-    Bundle result = null;
-    for (Bundle b : bundleContext.getBundles()) {
-      if (b.getSymbolicName().equals(bundleSymbolicName)) {
-        if (version == null
-            || b.getVersion().equals(Version.parseVersion(version))) {
-          result = b;
-          break;
+        assertTrue("Participant should have finished once", participant1.getFinishedCount() == 1);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+    }
+
+    @Test
+    public void testNoReturn() throws Exception {
+        //Register a mock participant which won't respond
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        //recreate the list as it may have been emptied?
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+
+        //Try quiescing the bundle with no return
+        assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
+        manager.quiesce(1000,bundleList);
+        timeoutTime = System.currentTimeMillis()+5000;
+        while (System.currentTimeMillis() < timeoutTime && b1.getState() == Bundle.ACTIVE){
+            Thread.sleep(500);
         }
-      }
+        
+        assertTrue("Participant should have started once", participant2.getStartedCount() == 1);
+        assertTrue("Participant should not have finished", participant2.getFinishedCount() == 0);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
     }
-    return result;
-  }
 
-  public static BootDelegationOption bootDelegation() {
-    return new BootDelegationOption("org.apache.aries.unittest.fixture");
-  }
-  
-  public static MavenArtifactProvisionOption mavenBundle(String groupId,
-      String artifactId) {
-    return CoreOptions.mavenBundle().groupId(groupId).artifactId(artifactId)
+    @Test
+    public void testWaitAShortTime() throws Exception {
+        //Try quiescing where participant takes 1s to do the work. The quiesce should complete well before the 10s timeout
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        //recreate the list as it may have been emptied?
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+
+        assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
+        
+        // we should be finishing in about 1000 millis not 10000
+        manager.quiesceWithFuture(10000,bundleList).get(7000, TimeUnit.MILLISECONDS);
+
+        assertTrue("Participant should have started once", participant3.getStartedCount() == 1);
+        assertTrue("Participant should finished once", participant3.getFinishedCount() == 1);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+    }
+
+    @Test
+    public void testThreeParticipants() throws Exception {
+        //Register three participants. One returns immediately, one waits 1s then returns, one never returns
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        //recreate the list as it may have been emptied
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should be in active state", b1.getState() == Bundle.ACTIVE);
+        
+        manager.quiesceWithFuture(10000,bundleList).get(15000, TimeUnit.MILLISECONDS);
+        
+        assertTrue("Participant 1 should have started once", participant1.getStartedCount() == 1);
+        assertTrue("Participant 1 should finished once", participant1.getFinishedCount() == 1);
+        assertTrue("Participant 2 should have started once", participant2.getStartedCount() == 1);
+        assertTrue("Participant 2 should not have finished", participant2.getFinishedCount() == 0);
+        assertTrue("Participant 3 should have started once", participant3.getStartedCount() == 1);
+        assertTrue("Participant 3 should finished once", participant3.getFinishedCount() == 1);
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+    }
+
+    @Test
+    public void testFuture() throws Exception {
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        assertEquals("Bundle "+b1.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b1.getState());
+
+        Future<?> future = manager.quiesceWithFuture(2000, bundleList);
+
+        // blocks until the quiesce is over; participant2 never finishes, so that is the 2s timeout
+        future.get();
+
+        assertEquals("Participant 2 has started", 1, participant2.getStartedCount());
+        assertEquals("Participant 2 has finished", 0, participant2.getFinishedCount());
+        assertEquals("Participant 3 has started", 1, participant3.getStartedCount());
+        assertEquals("Participant 3 has finished", 1, participant3.getFinishedCount());
+    }
+    
+    @Test
+    public void testFutureWithWait() throws Exception {
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        assertEquals("Bundle "+b1.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b1.getState());
+
+        Future<?> future = manager.quiesceWithFuture(2000, bundleList);
+
+        try {
+            // wait only 500ms, well short of the 2s quiesce timeout
+            future.get(500, TimeUnit.MILLISECONDS);
+            fail("Too short wait, should have thrown TimeoutException");
+        } catch (TimeoutException te) {
+            // expected
+        }
+
+        assertEquals("Participant 2 has started", 1, participant2.getStartedCount());
+        assertEquals("Participant 2 has finished", 0, participant2.getFinishedCount());
+        assertEquals("Participant 3 has started", 1, participant3.getStartedCount());
+        assertEquals("Participant 3 has finished", 0, participant3.getFinishedCount());
+        assertEquals("Bundle "+b1.getSymbolicName()+" should still be active, because we did not wait long enough", Bundle.ACTIVE, b1.getState());
+    }
+
+    @Test
+    public void testTwoBundles() throws Exception {
+        //Register three participants: participant1 returns immediately, participant2 never returns, participant3 returns after a short delay
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        //recreate the list as it may have been emptied
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        bundleList.add(b2);
+        assertEquals("Bundle "+b1.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b1.getState());
+        assertEquals("Bundle "+b2.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b2.getState());
+        //participant2 never calls back, so the quiesce ends at its 10s timeout; allow 15s for the future
+        manager.quiesceWithFuture(10000,bundleList).get(15000, TimeUnit.MILLISECONDS);
+
+        assertEquals("Participant 1 should have started once", 1, participant1.getStartedCount());
+        assertEquals("Participant 1 should finished once", 1, participant1.getFinishedCount());
+        assertEquals("Participant 2 should have started once", 1, participant2.getStartedCount());
+        assertEquals("Participant 2 should not have finished", 0, participant2.getFinishedCount());
+        assertEquals("Participant 3 should have started once", 1, participant3.getStartedCount());
+        assertEquals("Participant 3 should finished once", 1, participant3.getFinishedCount());
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+        assertTrue("Bundle "+b2.getSymbolicName()+" should not be in active state", b2.getState() != Bundle.ACTIVE);
+    }
+
+    @Test
+    public void testOverlappedQuiesces() throws Exception {
+
+        //Register three participants: participant1 returns immediately, participant2 never returns, participant3 returns after a short delay
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant1, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant2, null);
+        bundleContext.registerService(QuiesceParticipant.class.getName(), participant3, null);
+        //recreate the list as it may have been emptied
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b1);
+        bundleList.add(b2);
+        assertEquals("Bundle "+b1.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b1.getState());
+        assertEquals("Bundle "+b2.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b2.getState());
+        assertEquals("Bundle "+b3.getSymbolicName()+" should be in active state", Bundle.ACTIVE, b3.getState());
+        manager.quiesce(2000,bundleList);
+        bundleList = new ArrayList<Bundle>();
+        bundleList.add(b2);
+        bundleList.add(b3);
+        manager.quiesce(2000,bundleList);
+        timeoutTime = System.currentTimeMillis()+10000;
+        while (System.currentTimeMillis() < timeoutTime && (b1.getState() == Bundle.ACTIVE || b2.getState() == Bundle.ACTIVE || b3.getState() == Bundle.ACTIVE)) {
+            Thread.sleep(500);
+        }
+        assertEquals("Participant 1 should have started twice as it has been asked to quiesce twice", 2, participant1.getStartedCount());
+        assertEquals("Participant 1 should finished twice as it should have returned from two quiesce requests immediately", 2, participant1.getFinishedCount());
+        assertEquals("Participant 2 should have started twice as it has been asked to quiesce twice", 2, participant2.getStartedCount());
+        assertEquals("Participant 2 should not have finished as it should never return from it's two quiesce requests", 0, participant2.getFinishedCount());
+        assertEquals("Participant 3 should have started twice as it has been asked to quiesce twice", 2, participant3.getStartedCount());
+        assertEquals("Participant 3 should finished twice as it should have waited a short time before returning from it's two quiesce requests", 2, participant3.getFinishedCount());
+        assertTrue("Bundle "+b1.getSymbolicName()+" should not be in active state", b1.getState() != Bundle.ACTIVE);
+        assertTrue("Bundle "+b2.getSymbolicName()+" should not be in active state", b2.getState() != Bundle.ACTIVE);
+        assertTrue("Bundle "+b3.getSymbolicName()+" should not be in active state", b3.getState() != Bundle.ACTIVE);
+    }
+
+    @org.ops4j.pax.exam.junit.Configuration
+    public static Option[] configuration() {
+        Option[] baseOptions = options(
+                bootDelegation(),
+
+                // Log
+                mavenBundle("org.ops4j.pax.logging", "pax-logging-api"),
+                mavenBundle("org.ops4j.pax.logging", "pax-logging-service"),
+                // Felix Config Admin
+                mavenBundle("org.apache.felix", "org.apache.felix.configadmin"),
+                // Felix mvn url handler
+                mavenBundle("org.ops4j.pax.url", "pax-url-mvn"),
+
+                // default log level for pax logging (logProfile) is set
+                // through this system property
+                systemProperty("org.ops4j.pax.logging.DefaultServiceLog.level").value("DEBUG"),
+
+                // Bundles
+                mavenBundle("org.osgi", "org.osgi.compendium"),
+                mavenBundle("org.apache.aries", "org.apache.aries.util"),
+                mavenBundle("commons-lang", "commons-lang"),
+                mavenBundle("commons-collections", "commons-collections"),
+                mavenBundle("commons-pool", "commons-pool"),
+                mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.serp"),
+                mavenBundle("org.apache.aries.quiesce", "org.apache.aries.quiesce.api"),
+                mavenBundle("org.apache.aries.quiesce", "org.apache.aries.quiesce.manager"),
+
+                equinox().version("3.5.0"));
+        // updateOptions adds the wrapped pax-exam-junit bundle when running on an IBM JDK
+        return updateOptions(baseOptions);
+    }
+
+
+    protected Bundle getBundle(String symbolicName) {
+        return getBundle(symbolicName, null); // null version: any version of the bundle is accepted
+    }
+
+    protected Bundle getBundle(String bundleSymbolicName, String version) {
+        // First installed bundle matching the name (and the version, when non-null); null if none does.
+        for (Bundle b : bundleContext.getBundles()) {
+            // getSymbolicName() is null for bundles without a symbolic name, so guard before comparing
+            String name = b.getSymbolicName();
+            if (name != null && name.equals(bundleSymbolicName)) {
+                if (version == null || b.getVersion().equals(Version.parseVersion(version))) {
+                    return b;
+                }
+            }
+        }
+        return null;
+    }
+
+    public static BootDelegationOption bootDelegation() {
+        return new BootDelegationOption("org.apache.aries.unittest.fixture"); // package handed to the boot delegation option
+    }
+
+    public static MavenArtifactProvisionOption mavenBundle(String groupId,
+            String artifactId) {
+        return CoreOptions.mavenBundle().groupId(groupId).artifactId(artifactId)
         .versionAsInProject();
-  }
+    }
+
+    protected static Option[] updateOptions(Option[] options) {
+        // pax-exam-junit must be added when the tests run on the IBM JDK,
+        // otherwise the run fails with
+        // ClassNotFoundException: org.ops4j.pax.exam.junit.Configuration
+        if (!"IBM Corporation".equals(System.getProperty("java.vendor"))) {
+            return options;
+        }
+
+        Option[] ibmOptions = options(wrappedBundle(mavenBundle(
+                "org.ops4j.pax.exam", "pax-exam-junit")));
+        return combine(ibmOptions, options);
+    }
+
+    protected <T> T getOsgiService(Class<T> type, long timeout) {
+        return getOsgiService(type, null, timeout); // null filter: match on the service class only
+    }
+
+    protected <T> T getOsgiService(Class<T> type) {
+        return getOsgiService(type, null, DEFAULT_TIMEOUT); // no filter, wait up to DEFAULT_TIMEOUT
+    }
+
+    protected <T> T getOsgiService(Class<T> type, String filter, long timeout) {
+        return getOsgiService(null, type, filter, timeout); // null context: the lookup falls back to this test's bundleContext
+    }
 
-  protected static Option[] updateOptions(Option[] options) {
-    // We need to add pax-exam-junit here when running with the ibm
-    // jdk to avoid the following exception during the test run:
-    // ClassNotFoundException: org.ops4j.pax.exam.junit.Configuration
-    if ("IBM Corporation".equals(System.getProperty("java.vendor"))) {
-      Option[] ibmOptions = options(wrappedBundle(mavenBundle(
-          "org.ops4j.pax.exam", "pax-exam-junit")));
-      options = combine(ibmOptions, options);
-    }
-
-    return options;
-  }
-
-  protected <T> T getOsgiService(Class<T> type, long timeout) {
-    return getOsgiService(type, null, timeout);
-  }
-
-  protected <T> T getOsgiService(Class<T> type) {
-    return getOsgiService(type, null, DEFAULT_TIMEOUT);
-  }
-  
-  protected <T> T getOsgiService(Class<T> type, String filter, long timeout) {
-    return getOsgiService(null, type, filter, timeout);
-  }
-
-  protected <T> T getOsgiService(BundleContext bc, Class<T> type,
-      String filter, long timeout) {
-    ServiceTracker tracker = null;
-    try {
-      String flt;
-      if (filter != null) {
-        if (filter.startsWith("(")) {
-          flt = "(&(" + Constants.OBJECTCLASS + "=" + type.getName() + ")"
-              + filter + ")";
-        } else {
-          flt = "(&(" + Constants.OBJECTCLASS + "=" + type.getName() + ")("
-              + filter + "))";
+    protected <T> T getOsgiService(BundleContext bc, Class<T> type,
+            String filter, long timeout) {
+        ServiceTracker tracker = null;
+        try {
+            String flt;
+            if (filter != null) {
+                if (filter.startsWith("(")) {
+                    flt = "(&(" + Constants.OBJECTCLASS + "=" + type.getName() + ")"
+                    + filter + ")";
+                } else {
+                    flt = "(&(" + Constants.OBJECTCLASS + "=" + type.getName() + ")("
+                    + filter + "))";
+                }
+            } else {
+                flt = "(" + Constants.OBJECTCLASS + "=" + type.getName() + ")";
+            }
+            Filter osgiFilter = FrameworkUtil.createFilter(flt);
+            tracker = new ServiceTracker(bc == null ? bundleContext : bc, osgiFilter,
+                    null);
+            tracker.open();
+            // Note that the tracker is not closed to keep the reference
+            // This is buggy, has the service reference may change i think
+            Object svc = type.cast(tracker.waitForService(timeout));
+            if (svc == null) {
+                throw new RuntimeException("Gave up waiting for service " + flt);
+            }
+            return type.cast(svc);
+        } catch (InvalidSyntaxException e) {
+            throw new IllegalArgumentException("Invalid filter", e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         }
-      } else {
-        flt = "(" + Constants.OBJECTCLASS + "=" + type.getName() + ")";
-      }
-      Filter osgiFilter = FrameworkUtil.createFilter(flt);
-      tracker = new ServiceTracker(bc == null ? bundleContext : bc, osgiFilter,
-          null);
-      tracker.open();
-      // Note that the tracker is not closed to keep the reference
-      // This is buggy, has the service reference may change i think
-      Object svc = type.cast(tracker.waitForService(timeout));
-      if (svc == null) {
-        throw new RuntimeException("Gave up waiting for service " + flt);
-      }
-      return type.cast(svc);
-    } catch (InvalidSyntaxException e) {
-      throw new IllegalArgumentException("Invalid filter", e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
     }
-  }
 }

Modified: incubator/aries/trunk/quiesce/quiesce-manager/src/main/java/org/apache/aries/quiesce/manager/impl/QuiesceManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/quiesce/quiesce-manager/src/main/java/org/apache/aries/quiesce/manager/impl/QuiesceManagerImpl.java?rev=989057&r1=989056&r2=989057&view=diff
==============================================================================
--- incubator/aries/trunk/quiesce/quiesce-manager/src/main/java/org/apache/aries/quiesce/manager/impl/QuiesceManagerImpl.java (original)
+++ incubator/aries/trunk/quiesce/quiesce-manager/src/main/java/org/apache/aries/quiesce/manager/impl/QuiesceManagerImpl.java Wed Aug 25 12:25:51 2010
@@ -14,16 +14,13 @@
 package org.apache.aries.quiesce.manager.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -33,6 +30,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.aries.quiesce.manager.QuiesceCallback;
 import org.apache.aries.quiesce.manager.QuiesceManager;
@@ -54,7 +52,14 @@ public class QuiesceManagerImpl implemen
     /** The container's {@link BundleContext} */
     private BundleContext bundleContext = null;
     /** The thread pool to execute timeout commands */
-    private ScheduledExecutorService timeoutExecutor = Executors.newScheduledThreadPool(10);
+    private ScheduledExecutorService timeoutExecutor = Executors.newScheduledThreadPool(10, new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "Quiesce Manager Timeout Thread"); // named so the thread is recognisable in dumps
+            t.setDaemon(true); // daemon threads do not keep the JVM alive
+            return t;
+        }
+    });
+    
     /** The thread pool to execute quiesce commands */
     private ExecutorService executor = new ThreadPoolExecutor(0, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactory() {
 		
@@ -64,8 +69,9 @@ public class QuiesceManagerImpl implemen
 			return t;
 		}
 	});
+    
     /** The map of bundles that are currently being quiesced */
-    private static ConcurrentHashMap<Long, Bundle> bundleMap = new ConcurrentHashMap<Long, Bundle>();
+    private static ConcurrentHashMap<Bundle, Bundle> bundleMap = new ConcurrentHashMap<Bundle, Bundle>();
 
 
     public QuiesceManagerImpl(BundleContext bc) {
@@ -79,22 +85,72 @@ public class QuiesceManagerImpl implemen
      * need to listen for the resulting stop events. 
      */
     public void quiesce(long timeout, List<Bundle> bundles) {
-    	if (bundles != null && !!!bundles.isEmpty()) {
-			//check that bundle b is not already quiescing
-			Iterator<Bundle> it = bundles.iterator();
-			Set<Bundle> bundlesToQuiesce = new HashSet<Bundle>();
-			while(it.hasNext()) {
-				Bundle b = it.next();
-				Bundle priorBundle = bundleMap.putIfAbsent(b.getBundleId(), b);
-				if (priorBundle == null) {
-					bundlesToQuiesce.add(b);
-				}else{
-					LOGGER.warn("Already quiescing bundle "+ b.getSymbolicName());
-				}
-	  	  	}
-			Runnable command = new BundleQuiescer(bundlesToQuiesce, timeout, bundleMap);
-			executor.execute(command);
-    	}
+        quiesceWithFuture(timeout, bundles);
+    }
+    
+    public Future<?> quiesceWithFuture(List<Bundle> bundlesToQuiesce) {
+        return quiesceWithFuture(defaultTimeout, bundlesToQuiesce); // same as the two-argument form, with defaultTimeout
+    }
+    
+    public Future<?> quiesceWithFuture(long timeout, List<Bundle> bundles) {
+        QuiesceFuture result = new QuiesceFuture();
+        // Nothing to quiesce: hand back a future that is already complete.
+        if (bundles == null || bundles.isEmpty()) {
+            result.registerDone();
+            return result;
+        }
+
+        // Only take on bundles that no other quiesce request has claimed yet;
+        // putIfAbsent makes claiming a bundle atomic.
+        Set<Bundle> bundlesToQuiesce = new HashSet<Bundle>();
+        for (Bundle candidate : bundles) {
+            if (bundleMap.putIfAbsent(candidate, candidate) == null) {
+                bundlesToQuiesce.add(candidate);
+            } else {
+                LOGGER.warn("Already quiescing bundle "+ candidate.getSymbolicName());
+            }
+        }
+
+        // The quiescer runs asynchronously and marks the future as done once
+        // the bundles have been stopped or the timeout has fired.
+        Runnable command = new BundleQuiescer(bundlesToQuiesce, timeout, result, bundleMap);
+        executor.execute(command);
+        return result;
+    }
+    
+    /** Future signalling the end of one quiesce operation; it carries no result and cannot be cancelled. */
+    private static class QuiesceFuture implements Future<Object> {
+        // Opened by registerDone(); both get() variants block on it
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Quiesce operations cannot be cancelled");
+        }
+
+        public Object get() throws InterruptedException, ExecutionException {
+            latch.await();
+            return null;
+        }
+
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (!latch.await(timeout, unit)) {
+                throw new TimeoutException();
+            }
+            return null;
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return latch.getCount() == 0;
+        }
+
+        // Marks the operation as finished; countDown() is a no-op once the count is zero
+        public void registerDone() {
+            latch.countDown();
+        }
     }
 
     /**
@@ -108,15 +164,33 @@ public class QuiesceManagerImpl implemen
     	quiesce(defaultTimeout, bundlesToQuiesce);
     }
   
-    private static boolean stopBundle(Bundle bundleToStop) {
+    /**
+     * Stop a bundle that was to be quiesced. This happens either when all the participants
+     * are finished or when the timeout has occurred.
+     * 
+     * The set of all bundles to quiesce is used to track stops, so that they do not occur twice.
+     * @param bundleToStop the bundle to stop
+     * @param bundlesToStop shared set of bundles not stopped yet; it is also the lock guarding the stop
+     * @return false if stopping threw a BundleException, true otherwise (also when already stopped)
+     */
+    private static boolean stopBundle(Bundle bundleToStop, Set<Bundle> bundlesToStop) {
     	try {
-    		bundleToStop.stop();
-    		bundleMap.remove(bundleToStop.getBundleId());
-    	}catch (BundleException be) {
+    	    synchronized (bundlesToStop) {
+    	        if (bundlesToStop.remove(bundleToStop)) {
+    	            // Always clear the quiescing marker, even if stop() throws, so the bundle can be quiesced again
+    	            try { bundleToStop.stop(); } finally { bundleMap.remove(bundleToStop); }
+    	        }
+    	    }
+    	} catch (BundleException be) {
     		return false;
     	}
     	return true;
     }
+    
+    private static boolean stillQuiescing(Bundle bundleToStop) {
+        return bundleMap.containsKey(bundleToStop); // present from the quiesce request until stopBundle removes it
+    }
+    
 
     /**
      * BundleQuiescer is used for each bundle to quiesce. It creates a callback object for each 
@@ -127,12 +201,14 @@ public class QuiesceManagerImpl implemen
      */
     private class BundleQuiescer implements Runnable {
 	  
-    	private Set<Bundle> bundlesToQuiesce;
-    	private long timeout;
+    	private final Set<Bundle> bundlesToQuiesce;
+    	private final long timeout;
+    	private final QuiesceFuture future;
     	
-    	public BundleQuiescer(Set<Bundle> bundlesToQuiesce, long timeout, ConcurrentHashMap<Long, Bundle> bundleMap) {
+    	public BundleQuiescer(Set<Bundle> bundlesToQuiesce, long timeout, QuiesceFuture future, ConcurrentHashMap<Bundle, Bundle> bundleMap) {
     		this.bundlesToQuiesce = new HashSet<Bundle>(bundlesToQuiesce);
     		this.timeout = timeout;
+    		this.future = future;
     	}
 
     	public void run() {
@@ -142,14 +218,28 @@ public class QuiesceManagerImpl implemen
 					if (serviceRefs != null) {
 						List<QuiesceParticipant> participants = new ArrayList<QuiesceParticipant>();
 						final List<QuiesceCallbackImpl> callbacks = new ArrayList<QuiesceCallbackImpl>();
-						Set<Bundle> copyOfBundles = new HashSet<Bundle>(bundlesToQuiesce);
-						Timer timer = new Timer();
+						List<Bundle> copyOfBundles = new ArrayList<Bundle>(bundlesToQuiesce);
+						
+						ScheduledFuture<?> timeoutFuture = timeoutExecutor.schedule(new Runnable() {
+						    public void run() {
+						        LOGGER.warn("Quiesce timed out");
+						        synchronized (bundlesToQuiesce) {
+						            for (Bundle b : new ArrayList<Bundle>(bundlesToQuiesce)) {
+    						            LOGGER.warn("Could not quiesce within timeout, so stopping bundle "+ b.getSymbolicName());
+    						            stopBundle(b, bundlesToQuiesce);
+						            }
+						        }
+						        future.registerDone();
+						        LOGGER.debug("Quiesce complete");
+						    }
+						}, timeout, TimeUnit.MILLISECONDS);
+
 						
 						//Create callback objects for all participants
 						for( ServiceReference sr : serviceRefs ) {
 							QuiesceParticipant participant = (QuiesceParticipant) bundleContext.getService(sr);
 							participants.add(participant);
-							callbacks.add(new QuiesceCallbackImpl(copyOfBundles, callbacks, timer));
+							callbacks.add(new QuiesceCallbackImpl(bundlesToQuiesce, callbacks, future, timeoutFuture));
 						}
 						
 						//Quiesce each participant and wait for an interrupt from a callback 
@@ -157,45 +247,24 @@ public class QuiesceManagerImpl implemen
 						for( int i=0; i<participants.size(); i++ ) {
 							QuiesceParticipant participant = participants.get(i);
 							QuiesceCallbackImpl callback = callbacks.get(i);
-							List<Bundle> participantBundles = new ArrayList<Bundle>();
-							//deep copy
-							for (Bundle b : copyOfBundles) {
-								participantBundles.add(b);
-							}
-							participant.quiesce(callback, participantBundles);
-						}
-						timer.schedule(new TimerTask() {
-
-							@Override
-							public void run() {
-								//stop bundles
-								//go through callbacks and cancel all bundles
-								for ( Enumeration<Bundle> remainingBundles = bundleMap.elements(); remainingBundles.hasMoreElements(); ) {
-									Bundle b = remainingBundles.nextElement();
-									LOGGER.warn("Could not quiesce, so stopping bundle "+ b.getSymbolicName());
-									stopBundle(b);
-								}
-								/*
-								for ( QuiesceCallbackImpl cb : callbacks ) {
-									System.out.println("Clearing callback");
-									cb.clear();
-									}
-									*/
-							}
-							
-						}, timeout);
-					}else{
-						LOGGER.warn("No participants, so stopping bundles");
-						for ( Enumeration<Bundle> remainingBundles = bundleMap.elements(); remainingBundles.hasMoreElements(); ) {
-							Bundle b = remainingBundles.nextElement();
-							stopBundle(b);
-						}
-					}
-				}
-			} catch (InvalidSyntaxException e) {
-				LOGGER.warn("Exception trying to get service references for quiesce participants "+ e.getMessage());
-			}
-		}
+							participant.quiesce(callback, copyOfBundles);
+						}                        
+                    }else{
+                        LOGGER.warn("No quiesce participants, so stopping bundles");
+                        for (Bundle b : bundlesToQuiesce) {
+                            stopBundle(b, bundlesToQuiesce);
+                        }
+                        future.registerDone();
+                    }
+                }
+            } catch (InvalidSyntaxException e) {
+                LOGGER.warn("Exception trying to get service references for quiesce participants, so stopping bundles."+ e.getMessage());
+                for (Bundle b : bundlesToQuiesce) {
+                    stopBundle(b, bundlesToQuiesce);
+                }
+                future.registerDone();
+            }
+        }
 	}
  
     /**
@@ -205,23 +274,24 @@ public class QuiesceManagerImpl implemen
     private static class QuiesceCallbackImpl implements QuiesceCallback {
     	//Must be a copy
     	private final Set<Bundle> toQuiesce;
+    	// Must not be a copy
+    	private final Set<Bundle> toQuiesceShared;    	
     	//Must not be a copy
     	private final List<QuiesceCallbackImpl> allCallbacks;
     	//Timer so we can cancel the alarm if all done
-    	private final Timer timer;
+    	private final QuiesceFuture future;
+    	//The cleanup action that runs at timeout
+    	private final ScheduledFuture<?> timeoutFuture;
     	
-    	public QuiesceCallbackImpl(Collection<Bundle> toQuiesce, List<QuiesceCallbackImpl> allCallbacks, Timer timer) 
+    	public QuiesceCallbackImpl(Set<Bundle> toQuiesce, List<QuiesceCallbackImpl> allCallbacks, QuiesceFuture future, ScheduledFuture<?> timeoutFuture) 
     	{
     		this.toQuiesce = new HashSet<Bundle>(toQuiesce);
+    		this.toQuiesceShared = toQuiesce;
     		this.allCallbacks = allCallbacks;
-    		this.timer = timer;
+    		this.future = future;
+    		this.timeoutFuture = timeoutFuture;
     	}
 
-    	public void clear() {
-			// TODO Auto-generated method stub
-			
-		}
-
 		/** 
     	 * Removes the bundles from the list of those to quiesce. 
     	 * If the list is now empty, this callback object is finished (i.e. 
@@ -233,17 +303,32 @@ public class QuiesceManagerImpl implemen
     	 */
     	public void bundleQuiesced(Bundle... bundlesQuiesced) {
     		
-    		synchronized (allCallbacks) {
-			  for(Bundle b : bundlesQuiesced) {
-				  if(toQuiesce.remove(b)) {
-					  if(checkOthers(b)){
-						 QuiesceManagerImpl.stopBundle(b);
-						 if(allCallbacksComplete()){
-							 timer.cancel();
-						 }
-					  }
-				  }
-			  }
+            boolean timeoutOccurred = false; 
+            
+            synchronized (allCallbacks) {
+                for(Bundle b : bundlesQuiesced) {
+                    if(QuiesceManagerImpl.stillQuiescing(b)) {
+                        if(toQuiesce.remove(b)) {
+                            if(checkOthers(b)){
+                                QuiesceManagerImpl.stopBundle(b, toQuiesceShared);
+                                if(allCallbacksComplete()){
+                                    future.registerDone();
+                                    timeoutFuture.cancel(false);
+                                    LOGGER.debug("Quiesce complete");
+                                }
+                            }
+                        }
+                    } else {
+                        timeoutOccurred = true;
+                        break;
+                    }
+                }
+                if (timeoutOccurred) {
+                        Iterator<QuiesceCallbackImpl> it = allCallbacks.iterator();
+                        while (it.hasNext()) {
+                            it.next().toQuiesce.clear();
+                        }
+                }
 			}
     	}
 
@@ -260,7 +345,8 @@ public class QuiesceManagerImpl implemen
 			boolean allDone = true;
 			Iterator<QuiesceCallbackImpl> it = allCallbacks.iterator();
 			while (allDone && it.hasNext()) {
-				allDone = !!!it.next().toQuiesce.isEmpty();
+                QuiesceCallbackImpl next = it.next();
+                if (!!!next.toQuiesce.isEmpty()) allDone = false;
 			}
 			return allDone;
 		}		



Mime
View raw message