activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887564 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch: ./ internal/ internal/advanced/
Date Sat, 05 Dec 2009 16:43:51 GMT
Author: chirino
Date: Sat Dec  5 16:43:47 2009
New Revision: 887564

URL: http://svn.apache.org/viewvc?rev=887564&view=rev
Log:
simplifying the advanced package a bit

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ForeignEvent.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/TimerEvent.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/UpdateEvent.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Sat Dec  5 16:43:47 2009
@@ -43,7 +43,7 @@
         return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
     }
     
-    synchronized private static DispatchSPI spi() {
+    synchronized public static DispatchSPI spi() {
         if(spi==null) {
             spi = cretateDispatchSystemSPI();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Sat Dec  5 16:43:47 2009
@@ -48,7 +48,7 @@
     public void resume() {
         if( suspendCounter.decrementAndGet() == 0 ) {
             if( size.get() != 0 ) {
-                targetQueue.dispatchAsync(this);
+                dispatchSelfAsync();
             }
         }
     }
@@ -71,10 +71,14 @@
         }
         runnables.add(runnable);
         if( lastSize == 0 && suspendCounter.get()<=0 ) {
-            targetQueue.dispatchAsync(this);
+            dispatchSelfAsync();
         }
     }
 
+    protected void dispatchSelfAsync() {
+        targetQueue.dispatchAsync(this);
+    }
+
     public void run() {
         DispatchQueue original = DispatchSystem.CURRENT_QUEUE.get();
         DispatchSystem.CURRENT_QUEUE.set(this);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Sat Dec  5 16:43:47 2009
@@ -38,8 +38,6 @@
     final GlobalDispatchQueue globalQueues[];
     final AtomicLong globalQueuedRunnables = new AtomicLong();
 
-    private final ThreadLocal<DispatcherThread> dispatcher = new ThreadLocal<DispatcherThread>();
-    private final ThreadLocal<DispatchContext> dispatcherContext = new ThreadLocal<DispatchContext>();
     private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
 
     final AtomicInteger startCounter = new AtomicInteger();
@@ -115,30 +113,12 @@
         }
     }
 
-    public void setCurrentDispatchContext(DispatchContext context) {
-        dispatcherContext.set(context);
-    }
-
-    public DispatchContext getCurrentDispatchContext() {
-        return dispatcherContext.get();
-    }
-
-    /**
-     * Returns the currently executing dispatcher, or null if the current thread
-     * is not a dispatcher:
-     * 
-     * @return The currently executing dispatcher
-     */
-    public DispatcherThread getCurrentDispatcher() {
-        return dispatcher.get();
-    }
-
     /**
      * A Dispatcher must call this to indicate that is has started it's dispatch
      * loop.
      */
     public void onDispatcherStarted(DispatcherThread d) {
-        dispatcher.set(d);
+        DispatcherThread.CURRENT.set(d);
         loadBalancer.onDispatcherStarted(d);
     }
 
@@ -159,7 +139,7 @@
     }
 
     protected DispatcherThread chooseDispatcher() {
-        DispatcherThread d = dispatcher.get();
+        DispatcherThread d = DispatcherThread.CURRENT.get();
         if (d == null) {
             synchronized (dispatchers) {
                 if(dispatchers.isEmpty())
@@ -223,7 +203,7 @@
     }
     
     public DispatchQueue createQueue(String label) {
-        SerialDispatchQueue rc = new SerialDispatchQueue(label);
+        AdvancedSerialDispatchQueue rc = new AdvancedSerialDispatchQueue(label);
         rc.setTargetQueue(getGlobalQueue(DEFAULT));
         return rc;
     }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java?rev=887564&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedQueue.java Sat Dec  5 16:43:47 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+public interface AdvancedQueue {
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java?rev=887564&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java Sat Dec  5 16:43:47 2009
@@ -0,0 +1,19 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
+
+public class AdvancedSerialDispatchQueue extends SerialDispatchQueue {
+
+    
+    public AdvancedSerialDispatchQueue(String label) {
+        super(label);
+//        context = new DispatchContext(this, true, label);
+}
+    
+//    @Override
+//    protected void dispatchSelfAsync() {
+//        AdvancedQueue aq = ((AdvancedQueue)targetQueue);
+//        super.dispatchSelfAsync();
+//    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java Sat Dec  5 16:43:47 2009
@@ -3,29 +3,30 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread.UpdateEvent;
 import org.apache.activemq.util.list.LinkedNode;
 
 /**
  * 
  */
-class DispatchContext extends LinkedNode<DispatchContext> {
+class DispatchContext extends LinkedNode<DispatchContext> implements Runnable {
+    
+    public static final ThreadLocal<DispatchContext> CURRENT = new ThreadLocal<DispatchContext>();
 
-    private final DispatcherThread dispacher;
     // The target:
     private final Runnable runnable;
     // The name of this context:
-    final String name;
+    final String label;
+
     // list prio can only be updated in the thread of of the owning
     // dispatcher
     protected int listPrio;
 
     // The update events are used to update fields in the dispatch context
     // from foreign threads:
-    final UpdateEvent updateEvent[];
+    final UpdateEvent updateEvent[] = new UpdateEvent[2];
 
     final DispatchObserver tracker;
-    protected DispatcherThread currentOwner;
+    protected DispatcherThread target;
     private DispatcherThread updateDispatcher = null;
 
     int priority;
@@ -33,36 +34,22 @@
     private boolean closed = false;
     final CountDownLatch closeLatch = new CountDownLatch(1);
 
-    protected DispatchContext(DispatcherThread dispatcherThread, Runnable runnable, boolean persistent, String name) {
-        dispacher = dispatcherThread;
+    protected DispatchContext(DispatcherThread thread, Runnable runnable, boolean persistent, String label) {
         this.runnable = runnable;
-        this.name = name;
-        this.currentOwner = (DispatcherThread) dispacher;
-        if (persistent && dispacher.spi != null) {
-            this.tracker = dispacher.spi.getLoadBalancer().createExecutionTracker((DispatchContext) this);
+        this.label = label;
+        this.target = thread;
+        if (persistent && target.spi != null) {
+            this.tracker = target.spi.getLoadBalancer().createExecutionTracker((DispatchContext) this);
         } else {
             this.tracker = null;
         }
-        updateEvent = createUpdateEvent();
-        updateEvent[0] = dispacher.new UpdateEvent(this);
-        updateEvent[1] = dispacher.new UpdateEvent(this);
+        updateEvent[0] = new UpdateEvent(this);
+        updateEvent[1] = new UpdateEvent(this);
         if (persistent) {
-            currentOwner.takeOwnership(this);
+            target.takeOwnership(this);
         }
     }
 
-    private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
-        return new DispatcherThread.UpdateEvent[2];
-    }
-
-    /**
-     * Gets the execution tracker for the context.
-     * 
-     * @return the execution tracker for the context:
-     */
-    public DispatchObserver getExecutionTracker() {
-        return tracker;
-    }
 
     /**
      * This can only be called by the owning dispatch thread:
@@ -77,7 +64,7 @@
         synchronized (this) {
 
             // If we're already set to this dispatcher
-            if (newDispatcher == currentOwner) {
+            if (newDispatcher == target) {
                 if (updateDispatcher == null || updateDispatcher == newDispatcher) {
                     return;
                 }
@@ -85,18 +72,18 @@
 
             updateDispatcher = (DispatcherThread) newDispatcher;
             if (DispatcherThread.DEBUG)
-                System.out.println(getName() + " updating to " + updateDispatcher);
+                System.out.println(getLabel() + " updating to " + updateDispatcher);
 
-            currentOwner.onForeignUpdate(this);
+            target.onForeignUpdate(this);
         }
 
     }
 
     public void requestDispatch() {
 
-        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+        DispatcherThread callingDispatcher = DispatcherThread.CURRENT.get();
         if (tracker != null)
-            tracker.onDispatch(callingDispatcher, dispacher.getCurrentDispatchContext());
+            tracker.onDispatch(callingDispatcher, DispatchContext.CURRENT.get());
 
         // Otherwise this is coming off another thread, so we need to
         // synchronize
@@ -104,22 +91,22 @@
         synchronized (this) {
             // If the owner of this context is the calling thread, then
             // delegate to the dispatcher.
-            if (currentOwner == callingDispatcher) {
+            if (target == callingDispatcher) {
 
-                if (!currentOwner.running) {
+                if (!target.running) {
                     // TODO In the event that the current dispatcher
                     // failed due to a runtime exception, we could
                     // try to switch to a new dispatcher.
                     throw new RejectedExecutionException();
                 }
                 if (!isLinked()) {
-                    currentOwner.priorityQueue.add(this, listPrio);
+                    target.priorityQueue.add(this, listPrio);
                 }
                 return;
             }
 
             dispatchRequested = true;
-            currentOwner.onForeignUpdate(this);
+            target.onForeignUpdate(this);
         }
     }
 
@@ -129,12 +116,12 @@
             return;
         }
 
-        priority = Math.min(priority, dispacher.MAX_USER_PRIORITY);
+        priority = Math.min(priority, target.MAX_USER_PRIORITY);
 
         if (this.priority == priority) {
             return;
         }
-        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+        DispatcherThread callingDispatcher = DispatcherThread.CURRENT.get();
 
         // Otherwise this is coming off another thread, so we need to
         // synchronize to protect against ownership changes:
@@ -146,7 +133,7 @@
 
             // If this is called by the owning dispatcher, then we go ahead
             // and update:
-            if (currentOwner == callingDispatcher) {
+            if (target == callingDispatcher) {
 
                 if (priority != listPrio) {
 
@@ -155,13 +142,13 @@
                     // at the new priority:
                     if (isLinked()) {
                         unlink();
-                        currentOwner.priorityQueue.add(this, listPrio);
+                        target.priorityQueue.add(this, listPrio);
                     }
                 }
                 return;
             }
 
-            currentOwner.onForeignUpdate(this);
+            target.onForeignUpdate(this);
         }
 
     }
@@ -176,16 +163,16 @@
 
             if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
                 if (DispatcherThread.DEBUG) {
-                    System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                    System.out.println("Assigning " + getLabel() + " to " + updateDispatcher);
                 }
 
-                if (currentOwner.removeDispatchContext(this)) {
+                if (target.removeDispatchContext(this)) {
                     dispatchRequested = true;
                 }
 
                 updateDispatcher.onForeignUpdate(this);
-                switchedDispatcher(currentOwner, updateDispatcher);
-                currentOwner = updateDispatcher;
+                switchedDispatcher(target, updateDispatcher);
+                target = updateDispatcher;
                 updateDispatcher = null;
 
             } else {
@@ -199,14 +186,7 @@
         }
     }
 
-    /**
-     * May be overriden by subclass to additional work on dispatcher switch
-     * 
-     * @param oldDispatcher The old dispatcher
-     * @param newDispatcher The new Dispatcher
-     */
     protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
-
     }
 
     public boolean isClosed() {
@@ -214,20 +194,20 @@
     }
 
     public void close(boolean sync) {
-        DispatcherThread callingDispatcher = dispacher.getCurrentDispatcher();
+        DispatcherThread callingDispatcher = DispatcherThread.CURRENT.get();
         // System.out.println(this + "Closing");
         synchronized (this) {
             closed = true;
             // If the owner of this context is the calling thread, then
             // delegate to the dispatcher.
-            if (currentOwner == callingDispatcher) {
-                dispacher.removeDispatchContext(this);
+            if (target == callingDispatcher) {
+                target.removeDispatchContext(this);
                 closeLatch.countDown();
                 return;
             }
         }
 
-        currentOwner.onForeignUpdate(this);
+        target.onForeignUpdate(this);
         if (sync) {
             boolean interrupted = false;
             while (true) {
@@ -245,16 +225,12 @@
         }
     }
 
-    public final String toString() {
-        return getName();
-    }
-
     public DispatcherThread getTargetQueue() {
-        return currentOwner;
+        return target;
     }
 
-    public String getName() {
-        return name;
+    public String getLabel() {
+        return label;
     }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Sat Dec  5 16:43:47 2009
@@ -29,11 +29,12 @@
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
-import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
 
 public class DispatcherThread implements Runnable {
 
+    static public final ThreadLocal<DispatcherThread> CURRENT = new ThreadLocal<DispatcherThread>();
+
     private final ThreadDispatchQueue dispatchQueues[];
     
     static final boolean DEBUG = false;
@@ -50,9 +51,10 @@
     protected final PriorityLinkedList<DispatchContext> priorityQueue;
 
     // Dispatch queue for requests from other threads:
-    private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+    final LinkedNodeList<ForeignEvent>[] foreignQueue = createForeignQueue();
+
     private static final int[] TOGGLE = new int[] { 1, 0 };
-    private int foreignToggle = 0;
+    int foreignToggle = 0;
 
     // Timed Execution List
     protected final TimerHeap<Runnable> timerHeap = new TimerHeap<Runnable>() {
@@ -63,7 +65,7 @@
     };
 
     protected final String name;
-    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+    final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
     private final Semaphore foreignPermits = new Semaphore(0);
 
     private final Mapper<Integer, DispatchContext> PRIORITY_MAPPER = new Mapper<Integer, DispatchContext>() {
@@ -82,33 +84,17 @@
 
         MAX_USER_PRIORITY = priorities - 1;
         priorityQueue = new PriorityLinkedList<DispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
-        foreignQueue = createForeignEventQueue();
         for (int i = 0; i < 2; i++) {
             foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
         }
         this.spi = spi;
     }
-
+    
     @SuppressWarnings("unchecked")
-    private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+    private LinkedNodeList<ForeignEvent>[] createForeignQueue() {
         return new LinkedNodeList[2];
     }
 
-    protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
-        public abstract void execute();
-
-        final void addToList() {
-            synchronized (foreignQueue) {
-                if (!this.isLinked()) {
-                    foreignQueue[foreignToggle].addLast(this);
-                    if (!foreignAvailable.getAndSet(true)) {
-                        wakeup();
-                    }
-                }
-            }
-        }
-    }
-
     public boolean isThreaded() {
         return threaded;
     }
@@ -121,19 +107,6 @@
         return MAX_USER_PRIORITY;
     }
 
-    class UpdateEvent extends ForeignEvent {
-        private final DispatchContext pdc;
-
-        UpdateEvent(DispatchContext pdc) {
-            this.pdc = pdc;
-        }
-
-        // Can only be called by the owner of this dispatch context:
-        public void execute() {
-            pdc.processForeignUpdates();
-        }
-    }
-
     public DispatchContext register(Runnable runnable, String name) {
         return new DispatchContext(this, runnable, true, name);
     }
@@ -202,11 +175,7 @@
 
     public void run() {
 
-        if (spi != null) {
-            // Inform the dispatcher that we have started:
-            spi.onDispatcherStarted((DispatcherThread) this);
-        }
-
+        spi.onDispatcherStarted((DispatcherThread) this);
         DispatchContext pdc;
         try {
             while (running) {
@@ -218,14 +187,14 @@
                     }
                     
                     if (pdc.tracker != null) {
-                        spi.setCurrentDispatchContext(pdc);
+                        DispatchContext.CURRENT.set(pdc);
                     }
 
                     counter++;
                     pdc.run();
 
                     if (pdc.tracker != null) {
-                        spi.setCurrentDispatchContext(null);
+                        DispatchContext.CURRENT.set(null);
                     }
                 }
 
@@ -256,7 +225,7 @@
                         }
 
                         fe.unlink();
-                        fe.execute();
+                        fe.run();
                     }
                 }
             }
@@ -265,9 +234,7 @@
         } catch (Throwable thrown) {
             thrown.printStackTrace();
         } finally {
-            if (spi != null) {
-                spi.onDispatcherStopped((DispatcherThread) this);
-            }
+            spi.onDispatcherStopped((DispatcherThread) this);
             cleanup();
         }
     }
@@ -354,13 +321,6 @@
         context.requestDispatch();
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
-     * .dispatch.Dispatcher.Dispatchable)
-     */
     public final void dispatch(Runnable runnable, int priority) {
         DispatchContext context = new DispatchContext(this, runnable, false, name);
         context.updatePriority(priority);
@@ -390,13 +350,6 @@
         dispatch(runnable, prio);
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
-     * long, java.util.concurrent.TimeUnit)
-     */
     public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
         schedule(runnable, 0, delay, timeUnit);
     }
@@ -407,36 +360,29 @@
                 execute(runnable, prio);
             }
         };
-        if (getCurrentDispatcher() == this) {
+        if (DispatcherThread.CURRENT.get() == this) {
             timerHeap.addRelative(wrapper, delay, timeUnit);
         } else {
-            new ForeignEvent() {
-                public void execute() {
-                    timerHeap.addRelative(wrapper, delay, timeUnit);
+            add(new TimerEvent(this, wrapper, delay, timeUnit));
+        }
+    }
+    
+    final void add(ForeignEvent event) {
+        synchronized (foreignQueue) {
+            if (!event.isLinked()) {
+                foreignQueue[foreignToggle].addLast(event);
+                if (!foreignAvailable.getAndSet(true)) {
+                    wakeup();
                 }
-            }.addToList();
+            }
         }
     }
 
+
     public String toString() {
         return name;
     }
 
-    final DispatcherThread getCurrentDispatcher() {
-        if (spi != null) {
-            return (DispatcherThread) spi.getCurrentDispatcher();
-        } else if (Thread.currentThread() == thread) {
-            return (DispatcherThread) this;
-        } else {
-            return null;
-        }
-
-    }
-
-    final DispatchContext getCurrentDispatchContext() {
-        return spi.getCurrentDispatchContext();
-    }
-
     public String getName() {
         return name;
     }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ForeignEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ForeignEvent.java?rev=887564&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ForeignEvent.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ForeignEvent.java Sat Dec  5 16:43:47 2009
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.util.list.LinkedNode;
+
+abstract class ForeignEvent extends LinkedNode<ForeignEvent> implements Runnable {
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887564&r1=887563&r2=887564&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Sat Dec  5 16:43:47 2009
@@ -28,7 +28,7 @@
 
     private final boolean DEBUG = false;
 
-    //TODO: Added plumbing for periodic rebalancing which we should
+    //TODO: Added plumbing for periodic re-balancing which we should
     //consider implementing
     private static final boolean ENABLE_UPDATES = false;
     private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/TimerEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/TimerEvent.java?rev=887564&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/TimerEvent.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/TimerEvent.java Sat Dec  5 16:43:47 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.TimeUnit;
+
+final class TimerEvent extends ForeignEvent {
+
+    private final long delay;
+    private final Runnable wrapper;
+    private final TimeUnit timeUnit;
+    private final DispatcherThread thread;
+
+    public TimerEvent(DispatcherThread thread, Runnable wrapper, long delay, TimeUnit timeUnit) {
+        this.thread = thread;
+        this.delay = delay;
+        this.wrapper = wrapper;
+        this.timeUnit = timeUnit;
+    }
+
+    public void run() {
+        thread.timerHeap.addRelative(wrapper, delay, timeUnit);
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/UpdateEvent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/UpdateEvent.java?rev=887564&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/UpdateEvent.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/UpdateEvent.java Sat Dec  5 16:43:47 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+
+class UpdateEvent extends ForeignEvent {
+    protected final DispatchContext dispatchContext;
+
+    public UpdateEvent(DispatchContext dispatchContext) {
+        this.dispatchContext = dispatchContext;
+    }
+
+    // Can only be called by the owner of this dispatch context:
+    public void run() {
+        dispatchContext.processForeignUpdates();
+    }
+}
\ No newline at end of file



Mime
View raw message