felix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pde...@apache.org
Subject svn commit: r1641201 - in /felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl: ComponentImpl.java ComponentScheduler.java DispatchExecutor.java SerialExecutor.java
Date Sun, 23 Nov 2014 15:31:45 GMT
Author: pderop
Date: Sun Nov 23 15:31:44 2014
New Revision: 1641201

URL: http://svn.apache.org/r1641201
Log:
Fixed a problem that occurs when the bundle is being stopped: when the bundle is stopped,
the components may be removed asynchronously from the Activator.destroy() method
(even when not using a threadpool). This is a problem: the components should be removed
synchronously from the Activator.destroy() method, in order to make sure that
the bundle context is still valid at the time we deactivate the components, and also to
make sure that any threads started by the components are stopped before returning from
the Activator.destroy() method.

Modified:
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentImpl.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentScheduler.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/DispatchExecutor.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/SerialExecutor.java

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentImpl.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentImpl.java?rev=1641201&r1=1641200&r2=1641201&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentImpl.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentImpl.java
Sun Nov 23 15:31:44 2014
@@ -39,7 +39,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -219,15 +221,60 @@ public class ComponentImpl implements Co
 	    }
 	}
 	
-	public void stop() {
+	public void stop() {           
 	    if (m_active.compareAndSet(true, false)) {
-            getExecutor().execute(new Runnable() {
+	        Executor executor = getExecutor();
+
+	        // First, declare the task that will stop our component in our executor. We are
also using a latch
+	        // because if the component bundle is currently being stopped, then we have to deactivate
the component 
+	        // synchronously (if not, the component could be deactivated in another thread after
the bundle has been 
+	        // stopped; and in this case, the bundle would then be invalid at the point we deactivate
the component.	        	        
+	        // Notice that even if we are not using a threadpool (only a SerialExecutor, not
a DispatchExecutor), it is
+	        // possible that the SerialExecutor is being currently run by another thread, so
we really have to use a latch,
+	        // even if we are only using a SerialExecutor.
+	        
+	        final CountDownLatch stopLatch = new CountDownLatch(1);
+	        final Runnable stopTask = new Runnable() {
                 @Override
                 public void run() {
-                    m_isStarted = false;
-                    handleChange();
+                    try {
+                        m_isStarted = false;
+                        handleChange();
+                    } finally {
+                        stopLatch.countDown();
+                    }
                 }
-            });
+            };
+            
+            if (m_bundle == null /* only in tests env */ || m_bundle.getState() == Bundle.ACTIVE)
{
+                executor.execute(stopTask); // asynchronous if we are using a DispatchExecutor
and a threadpool.
+            } else {
+                // If the component bundle is stopping, not active, we want to deactive the
component synchronously
+                // because if not, we would end up in a situation with all sort of problems.
+                
+                if (executor instanceof SerialExecutor) {
+                    // Most of the time, the stopTask will be called synchronously. But in
rare occasions, if the 
+                    // SerialExecutor is busy and being handling an event in another thread,
then in this case
+                    // the stopTask will be executed asynchronously ... but our latch will
make sure we wait for the 
+                    // component deactivation.
+                    executor.execute(stopTask);
+                } else if (executor instanceof DispatchExecutor) {
+                    // If using a threadpool (a DispatchExecutor), then we have to invoke
turn off
+                    // parallelism to avoid possible deadlocks; so we'll schedule our stopTask
in the DispatchExecutor
+                    // with a flag=false in order to try to execute the task from the current
thread, not from the threadpool.
+                    ((DispatchExecutor) executor).execute(stopTask, false);
+                } else {
+                    throw new IllegalStateException("no executor found form component " +
this);
+                }
+
+                try {
+                    if (!stopLatch.await(15000, TimeUnit.MILLISECONDS)) { // todo make the
delay configurable
+                        m_logger.warn("Could not stop component %s timely.", this);
+                    }
+                } catch (InterruptedException e) {
+                    m_logger.info("Thread interrupted while stopping component %s.", this);
+                }
+            }
 	    }
 	}
 
@@ -1259,8 +1306,7 @@ public class ComponentImpl implements Co
         if (instanceFactory != null) {
             return instanceFactory.getClass().getName();
         } else {
-            // Unexpected ...
-            return getClass().getName();
+            throw new IllegalStateException("can't find the component class name");
         }
     }
     

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentScheduler.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentScheduler.java?rev=1641201&r1=1641200&r2=1641201&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentScheduler.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/ComponentScheduler.java
Sun Nov 23 15:31:44 2014
@@ -18,8 +18,8 @@
  */
 package org.apache.felix.dm.impl;
 
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import org.apache.felix.dm.Component;
@@ -42,7 +42,7 @@ public class ComponentScheduler {
     private final static String PARALLEL = "org.apache.felix.dependencymanager.parallel";
     private volatile ComponentExecutorFactory m_componentExecutorFactory;
     private final Executor m_serial = new SerialExecutor(null);
-    private Set<Component> m_pending = new LinkedHashSet<>();
+    private ConcurrentMap<Component, Component> m_pending = new ConcurrentHashMap<>();
 
     public static ComponentScheduler instance() {
         return m_instance;
@@ -53,7 +53,7 @@ public class ComponentScheduler {
         m_serial.execute(new Runnable() {
             @Override
             public void run() {
-                for (Component c : m_pending) {
+                for (Component c : m_pending.keySet()) {
                     createComponentExecutor(m_componentExecutorFactory, c);
                     ((ComponentContext) c).start();
                 }
@@ -77,7 +77,7 @@ public class ComponentScheduler {
                 public void run() {
                     ComponentExecutorFactory execFactory = m_componentExecutorFactory;
                     if (execFactory == null) {
-                        m_pending.add(c);
+                        m_pending.put(c, c);
                     }
                     else {
                         createComponentExecutor(execFactory, c);
@@ -89,14 +89,8 @@ public class ComponentScheduler {
     }
 
     public void remove(final Component c) {
-        m_serial.execute(new Runnable() {
-            @Override
-            public void run() {
-                if (!m_pending.remove(c)) {
-                    ((ComponentContext) c).stop();
-                }
-            }
-        });
+        m_pending.remove(c);
+        ((ComponentContext) c).stop();
     }
 
     private boolean mayStartNow(Component c) {

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/DispatchExecutor.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/DispatchExecutor.java?rev=1641201&r1=1641200&r2=1641201&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/DispatchExecutor.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/DispatchExecutor.java
Sun Nov 23 15:31:44 2014
@@ -1,6 +1,9 @@
 package org.apache.felix.dm.impl;
 
+import java.lang.ref.WeakReference;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -8,20 +11,35 @@ import java.util.concurrent.atomic.Atomi
 import org.osgi.service.log.LogService;
 
 /**   
- * A parallel DispatchExecutor, similar to the SerialExecutor, except that several DispatchExecutor
can be executed 
- * in parallel inside a shared thread pool. When one thread schedules tasks in DispatchQueue
Q1, Q2, then Q1/Q2 
- * queues are dispatched and executed concurrently in a shared thread pool. However, each
queue will execute tasks 
- * scheduled in it serially, in FIFO order.
+ * A DispatchExecutor is a queue that can execute FIFO tasks in a shared threadpool configured
for the dispatcher.
+ * Each task scheduled in a given DispatchExecutor will be executed serially in FIFO order;
and multiple 
+ * DispatchExecutor instances may each run concurrently with respect to each other.
+ * <p>
+ * 
+ * This class also supports synchronous scheduling, like the @link {@link SerialExecutor}
class; and in this case,
+ * only one caller threads will execute the tasks scheduled in the DispatchQueue and in the
case the internal 
+ * threadpool won't be used).
+ * 
+ * <p> 
+ * 
+ * This class is <b>lock free</b> by design and ensures <b>"safe object
publication"</b> between scheduling threads and
+ * actual executing thread: if one thread T1 schedules a task, but another thread T2 actually

+ * executes it, then all the objects from the T1 thread will be "safely published" to the
executing T2 thread.
+ * Safe publication is ensured  because we are using a ConcurrentLinkedQueue.
+ * (see [1], chapter 3.5.3 (Safe publication idioms). 
+ * 
+ * [1] Java Concurrency In Practice, Addison Wesley
+ * 
+ * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
  */
 public class DispatchExecutor implements Executor, Runnable {
 	/**
-	 * The threadpool used for the execution of this queue. When the queue run method is executed,
all
-	 * scheduled tasks are executed in FIFO order, but in the threadpool.
+	 * The threadpool used for the execution of the tasks that are scheduled in this queue.
 	 */
 	private final Executor m_threadPool;
 
 	/** 
-	 * List of tasks scheduled in our queue. 
+	 * List of tasks scheduled in our queue.
 	 */
 	protected final ConcurrentLinkedQueue<Runnable> m_tasks = new ConcurrentLinkedQueue<>();
 
@@ -39,64 +57,86 @@ public class DispatchExecutor implements
 	 * Logger used to log exceptions thrown by scheduled tasks. 
 	 */
 	private final Logger m_logger;
-
+	
 	/**
 	 * Creates a new DispatchQueue, which can be executed within a fixed thread pool. Multiple
queue
 	 * can be executed concurrently, but all runnables scheduled in a given queue will be executed
serially, 
 	 * in FIFO order. 
+	 * 
+	 * @param threadPool the executor (typically a threadpool) used to execute this DispatchExecutor.
+	 * @param logger the Logger used when errors are taking place
 	 */
-	public DispatchExecutor(Executor threadPool, Logger logger) {
+    public DispatchExecutor(Executor threadPool, Logger logger) {
 		m_logger = logger;
 		m_threadPool = threadPool;
 	}
 	
     /**
      * Enqueues a task for later execution. You must call {@link #execute()} in order
-     * to trigger the actual task submission.
+     * to trigger the actual execution of all scheduled tasks (in FIFO order).
      */
     public void schedule(Runnable task) {
         m_tasks.add(task);
     }
 
 	/**
-	 * Submits a task in this queue, and schedule this dispatch queue execution in the threadpool.

-	 * If the queue is already executing, then the tasks is enqueued and will be executed later.
+	 * Submits a task in this queue, and schedule the execution of this DispatchQueue in the
threadpool. 	 
 	 * The task is immediately executed (inline execution) if the queue is currently being executed
by 
-	 * the current thread (reentrency feature, similar to SerialExecutor behavior).
+	 * the current thread.
 	 */
 	public void execute(Runnable task) {
+	    execute(task, true);
+	}
+	
+	/**
+     * Schedules a task in this queue.
+     * If the queue is currently being executed by the current thread, then the task is executed
immediately.
+     * @tasks the task to schedule
+     * @threadpool true if the queue should be executed in the threadpool, false if the queue
must be executed by
+     * only one caller thread.
+     */
+    public void execute(Runnable task, boolean threadpool) {
         Thread currThread = Thread.currentThread();
         if (m_executingThread == currThread) {
             runTask(task);
         } else {
             schedule(task);
-            execute();
+            execute(threadpool);
         }
-	}
+    }
 	
     /**
-     * Schedules a task for execution, and then attempts to execute it. This method is thread
safe, so 
-     * multiple threads can try to execute a task but only the first will be executed, other
threads will 
-     * return immediately, and the first thread will execute the tasks scheduled by the other
threads.<p>
-     * <p>
-     * This method is reentrant: if the current thread is currently being executed by this
executor, then 
-     * the task passed to this method will be executed immediately, from the current invoking
thread
-     * (inline execution).
+     * Schedules the execution of this DispatchQueue in the threadpool.
      */
 	public void execute() {
+	    execute(true);
+	}
+	
+    /**
+     * Schedules the execution of this DispatchQueue in the threadpool.
+     * 
+     * @param threadpool true means the DispatchQueue is executed in the threadpool, false
means the queue is executed from the
+     * caller thread.
+     */
+    public void execute(boolean threadpool) {
         if (m_scheduled.compareAndSet(false, true)) { // schedules our run method in the
tpool.
             try {
-                m_threadPool.execute(this);
+                if (threadpool) {
+                    m_threadPool.execute(this);
+                } else {
+                    run(); // run all queue tasks from the caller thread
+                }
             } catch (RejectedExecutionException e) {
                 // The threadpool seems stopped (maybe the framework is being stopped). Anyway,
just execute our tasks
                 // from the current thread.
                 run();
             }
         }
-	}
+    }
 
 	/**
-	 * Executes from the threadpool all currently enqueued tasks
+	 * Run all tasks scheduled in this queue, in FIFO order. This method may be executed either
in the threadpool, or from
+	 * the caller thread.
 	 */
 	@Override
 	public void run() {
@@ -117,9 +157,13 @@ public class DispatchExecutor implements
         }
 	}
 
-    private void runTask(Runnable command) {
+	/**
+	 * Runs a given task
+	 * @param task the task to execute
+	 */
+    private void runTask(Runnable task) {
 		try {
-			command.run();
+		    task.run();
 		} catch (Throwable t) {
 			m_logger.log(LogService.LOG_ERROR, "Error processing tasks", t);
 		}

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/SerialExecutor.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/SerialExecutor.java?rev=1641201&r1=1641200&r2=1641201&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/SerialExecutor.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dependencymanager/src/org/apache/felix/dm/impl/SerialExecutor.java
Sun Nov 23 15:31:44 2014
@@ -26,12 +26,12 @@ import org.osgi.service.log.LogService;
 
 /**
  * Allows you to enqueue tasks from multiple threads and then execute
- * them on one thread sequentially. It assumes more than one thread will
+ * them on one thread sequentially. It assumes no more than one thread will
  * try to execute the tasks and it will make an effort to pick the first
  * task that comes along whilst making sure subsequent tasks return
  * without waiting. <p>
  * 
- * This class is lock free and ensures "safe object publication" between scheduling threads
and
+ * This class is <b>lock free</b> by design and ensures <b>"safe object
publication"</b> between scheduling threads and
  * actual executing thread: if one thread T1 schedules a task, but another thread T2 actually

  * executes it, then all the objects from the T1 thread will be "safely published" to the
executing T2 thread.
  * Safe publication is ensured  because we are using a ConcurrentLinkedQueue.



Mime
View raw message