felix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pde...@apache.org
Subject svn commit: r1605950 - in /felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm: DependencyManager.java context/ComponentContext.java impl/ComponentImpl.java impl/DispatchExecutor.java impl/FilterComponent.java
Date Fri, 27 Jun 2014 05:25:45 GMT
Author: pderop
Date: Fri Jun 27 05:25:44 2014
New Revision: 1605950

URL: http://svn.apache.org/r1605950
Log:
Added support for parallelism in dependency manager: We can now optionally use a DispatchExecutor
instead of the default
SerialExecutor.  The key difference with the SerialExecutor is that several DispatchQueues
can be executed in parallel 
inside a shared thread pool. When one thread schedules some tasks in DispatchQueue Q1, Q2,
then Q1/Q2 queues are dispatched 
and executed concurrently in a shared thread pool. But each queue will executing tasks scheduled
in it serially, in FIFO order.

Added:
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/DispatchExecutor.java
Modified:
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/DependencyManager.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/context/ComponentContext.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/ComponentImpl.java
    felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/FilterComponent.java

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/DependencyManager.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/DependencyManager.java?rev=1605950&r1=1605949&r2=1605950&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/DependencyManager.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/DependencyManager.java
Fri Jun 27 05:25:44 2014
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 
 import org.apache.felix.dm.context.ComponentContext;
 import org.apache.felix.dm.impl.AdapterServiceImpl;
@@ -66,6 +67,7 @@ public class DependencyManager {
     private final BundleContext m_context;
     private final Logger m_logger;
     private final List<Component> m_components = new CopyOnWriteArrayList<>();
+    private volatile Executor m_threadPool;
 
     // service registry cache
     private static ServiceRegistryCache m_serviceRegistryCache;
@@ -121,6 +123,15 @@ public class DependencyManager {
             m_dependencyManagers.add(new WeakReference(this));
         }
     }
+    
+    /**
+     * Sets a threadpool to this dependency manager. All added/removed components will then
be handled
+     * in parallel, using the provided threadpool.
+     */
+    public DependencyManager setThreadPool(Executor threadPool) {
+        m_threadPool = threadPool;
+        return this;
+    }
 
     /**
      * Returns the list of currently created dependency managers.
@@ -160,11 +171,14 @@ public class DependencyManager {
      * Adds a new component to the dependency manager. After the service is added
      * it will be started immediately.
      * 
-     * @param service the service to add
+     * @param c the service to add
      */
-    public void add(Component service) {
-        m_components.add(service);
-        ((ComponentContext) service).start();
+    public void add(Component c) {
+        m_components.add(c);
+        if (m_threadPool != null) {
+            ((ComponentContext) c).setThreadPool(m_threadPool);
+        }
+        ((ComponentContext) c).start();
     }
 
     /**

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/context/ComponentContext.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/context/ComponentContext.java?rev=1605950&r1=1605949&r2=1605950&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/context/ComponentContext.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/context/ComponentContext.java
Fri Jun 27 05:25:44 2014
@@ -21,9 +21,12 @@ package org.apache.felix.dm.context;
 import java.util.List;
 import java.util.concurrent.Executor;
 
+import org.apache.felix.dm.Component;
+
 //TODO should this interface extend Component ?
 public interface ComponentContext {
     public Executor getExecutor(); // shared between a component and its dependencies
+    public Component setThreadPool(Executor threadPool);
     public void start();
     public void stop();
     public boolean isAvailable();

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/ComponentImpl.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/ComponentImpl.java?rev=1605950&r1=1605949&r2=1605950&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/ComponentImpl.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/ComponentImpl.java
Fri Jun 27 05:25:44 2014
@@ -1188,4 +1188,11 @@ public class ComponentImpl implements Co
     	}
     	return super.toString();
     }
+    
+    @Override
+    public Component setThreadPool(Executor threadPool) {
+        ensureNotActive();
+        m_executor = new DispatchExecutor(threadPool, m_logger);
+        return this;
+    }
 }

Added: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/DispatchExecutor.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/DispatchExecutor.java?rev=1605950&view=auto
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/DispatchExecutor.java
(added)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/DispatchExecutor.java
Fri Jun 27 05:25:44 2014
@@ -0,0 +1,120 @@
+package org.apache.felix.dm.impl;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.service.log.LogService;
+
+/**   
+ * A parallel DispatchExecutor, similar to the SerialExecutor, except that several DispatchQueues
can be executed 
+ * in parallel inside a shared thread pool. When one thread schedules tasks in DispatchQueue
Q1, Q2, then Q1/Q2 
+ * queues are dispatched and executed concurrently in a shared thread pool. However, each
queue will execute tasks 
+ * scheduled in it serially, in FIFO order.
+ */
+public class DispatchExecutor implements Executor, Runnable {
+	/**
+	 * The threadpool used for the execution of this queue. When the queue run method is executed,
all
+	 * scheduled tasks are executed in FIFO order, but in the threadpool.
+	 */
+	private final Executor m_threadPool;
+
+	/** 
+	 * List of tasks scheduled in our queue. 
+	 */
+	protected final ConcurrentLinkedQueue<Runnable> m_tasks = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Marker used to remember the id of the thread currently executing this dispatch queue.
+     */
+    private volatile Thread m_executingThread;
+
+    /** 
+     * Flag telling if this dispatch queue is already scheduled for execution in the threadpool.
+     */
+    private final AtomicBoolean m_scheduled = new AtomicBoolean();
+
+    /** 
+	 * Logger used to log exceptions thrown by scheduled tasks. 
+	 */
+	private final Logger m_logger;
+
+	/**
+	 * Creates a new DispatchQueue, which can be executed within a fixed thread pool. Multiple
queue
+	 * can be executed concurrently, but all runnables scheduled in a given queue will be executed
serially, 
+	 * in FIFO order. 
+	 */
+	public DispatchExecutor(Executor threadPool, Logger logger) {
+		m_logger = logger;
+		m_threadPool = threadPool;
+	}
+	
+    /**
+     * Enqueues a task for later execution. You must call {@link #execute()} in order
+     * to trigger the actual task submission.
+     */
+    public void schedule(Runnable task) {
+        m_tasks.add(task);
+    }
+
+	/**
+	 * Submits a task in this queue, and schedule this dispatch queue execution in the threadpool.

+	 * If the queue is already executing, then the tasks is enqueued and will be executed later.
+	 * The task is immediately executed (inline execution) if the queue is currently being executed
by 
+	 * the current thread (reentrency feature, similar to SerialExecutor behavior).
+	 */
+	public void execute(Runnable task) {
+        Thread currThread = Thread.currentThread();
+        if (m_executingThread == currThread) {
+            runTask(task);
+        } else {
+            schedule(task);
+            execute();
+        }
+	}
+	
+    /**
+     * Schedules a task for execution, and then attempts to execute it. This method is thread
safe, so 
+     * multiple threads can try to execute a task but only the first will be executed, other
threads will 
+     * return immediately, and the first thread will execute the tasks scheduled by the other
threads.<p>
+     * <p>
+     * This method is reentrant: if the current thread is currently being executed by this
executor, then 
+     * the task passed to this method will be executed immediately, from the current invoking
thread
+     * (inline execution).
+     */
+	public void execute() {
+        if (m_scheduled.compareAndSet(false, true)) { // schedules our run method in the
tpool.
+            m_threadPool.execute(this);
+        }
+	}
+
+	/**
+	 * Executes from the threadpool all currently enqueued tasks
+	 */
+	@Override
+	public void run() {
+        try {
+            // We do a memory barrier in order to ensure consistent per-thread
+            // memory visibility
+            m_executingThread = Thread.currentThread();
+            Runnable task;
+            while ((task = m_tasks.poll()) != null) {
+                runTask(task);
+            }
+        } finally {
+            m_scheduled.set(false);
+            m_executingThread = null;
+            if (m_tasks.peek() != null) {
+                execute();
+            }
+        }
+	}
+
+    private void runTask(Runnable command) {
+		try {
+			command.run();
+		} catch (Throwable t) {
+			m_logger.log(LogService.LOG_ERROR, "Error processing tasks", t);
+		}
+	}
+}
\ No newline at end of file

Modified: felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/FilterComponent.java
URL: http://svn.apache.org/viewvc/felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/FilterComponent.java?rev=1605950&r1=1605949&r2=1605950&view=diff
==============================================================================
--- felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/FilterComponent.java
(original)
+++ felix/sandbox/pderop/dependencymanager-prototype/org.apache.felix.dm/src/org/apache/felix/dm/impl/FilterComponent.java
Fri Jun 27 05:25:44 2014
@@ -324,4 +324,10 @@ public class FilterComponent implements 
 		m_component.setDebug(label);
 		return this;
 	}
+	
+    @Override
+    public Component setThreadPool(Executor threadPool) {
+        m_component.setThreadPool(threadPool);
+        return this;
+    }
 }
\ No newline at end of file



Mime
View raw message