activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r892951 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/actor/ main/java/org/apache/activemq/dispatch/internal/nio/ main/java/org/apache/activemq/dispatch/internal/simple/ test/java/org/ap...
Date Mon, 21 Dec 2009 20:00:27 GMT
Author: chirino
Date: Mon Dec 21 20:00:26 2009
New Revision: 892951

URL: http://svn.apache.org/viewvc?rev=892951&view=rev
Log:
better nio event loop.  More nio tests.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
Mon Dec 21 20:00:26 2009
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.actor;
 
 import java.lang.reflect.Constructor;
@@ -16,6 +32,12 @@
 
 import static org.objectweb.asm.ClassWriter.*;
 
+/**
+ * This class creates proxy objects that allow you to easily implement the 
+ * actor pattern in java using a {@link DispatchQueue}.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class ActorProxy {
 
     public static <T> T create(Class<T> interfaceClass, T target, DispatchQueue
queue) throws IllegalArgumentException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioDispatchSource.java
Mon Dec 21 20:00:26 2009
@@ -33,7 +33,7 @@
  */
 final public class NioDispatchSource extends BaseSuspendable implements DispatchSource {
 
-    public static final boolean DEBUG = true;
+    public static final boolean DEBUG = false;
 
     interface KeyActor {
         public void register();
@@ -88,7 +88,7 @@
                 key.attach(new Runnable() {
                     public void run() {
                         int ops = key.readyOps();
-                        debug("%s: selector found ready ops: %d", this, ops);
+                        debug("selector found ready ops: %d", ops);
                         readyOps |= ops;
                         resume();
                     }
@@ -102,7 +102,7 @@
             if( readyOps!=0 && suspended.get() <= 0) {
                 final int dispatchedOps = readyOps;
                 readyOps = 0;
-                debug("%s: dispatching for ops: %d", this, dispatchedOps);
+                debug("dispatching for ops: %d", dispatchedOps);
                 targetQueue.dispatchAsync(new Runnable() {
                     public void run() {
                         eventHandler.run();
@@ -113,13 +113,13 @@
         }
         
         public void addInterest(int ops) {
-            debug("%s: adding interest: %d", this, ops);
+            debug("adding interest: %d", ops);
             key.interestOps(key.interestOps()|ops);
         }
         
         public void cancel() {
             if (key != null && key.isValid()) {
-                debug("%s: canceling key: %s", this, key);
+                debug("canceling key.");
                 // This will make sure that the key is removed
                 // from the selector.
                 key.cancel();
@@ -186,7 +186,7 @@
 
     protected void debug(String str, Object... args) {
         if (DEBUG) {
-            System.out.println(format(str, args));
+            System.out.println(format("[DEBUG] NioDispatchSource %0#10x: ", System.identityHashCode(this))+format(str,
args));
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NioSelector.java
Mon Dec 21 20:00:26 2009
@@ -23,6 +23,12 @@
 import java.util.Iterator;
 import java.util.Set;
 
+import static java.lang.String.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class NioSelector {
     
     public final static ThreadLocal<NioSelector> CURRENT_SELECTOR = new ThreadLocal<NioSelector>();
@@ -38,20 +44,11 @@
         return selector;
     }
 
-    protected void cleanup() {
-        try {
-            selector.close();
-        } catch (IOException e) {
-            if (DEBUG) {
-                debug("Error closing selector", e);
-            }
-        }
-    }
-
     /**
      * Subclasses may override this to provide an alternative wakeup mechanism.
      */
-    protected void wakeup() {
+    public void wakeup() {
+        debug("waking selector");
         selector.wakeup();
     }
 
@@ -66,8 +63,7 @@
      *            to become ready.
      * @throws IOException
      */
-    public void doSelect(long timeout) throws IOException {
-
+    public int select(long timeout) throws IOException {
         try {
             if (timeout == -1) {
                 selector.select();
@@ -76,19 +72,20 @@
             } else {
                 selector.selectNow();
             }
-            processSelected();
+            return processSelected();
 
         } catch (CancelledKeyException ignore) {
-            // A key may have been canceled.
+            return 0;
         }
-
     }
 
-    private void processSelected() {
+    private int processSelected() {
 
         // Walk the set of ready keys servicing each ready context:
         Set<SelectionKey> selectedKeys = selector.selectedKeys();
-        if (!selectedKeys.isEmpty()) {
+        int size = selectedKeys.size();
+        if (size!=0) {
+            debug("selected: %d",size);
             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();)
{
                 SelectionKey key = i.next();
                 boolean valid = key.isValid();
@@ -99,6 +96,7 @@
                 }
             }
         }
+        return size;
     }
 
     public void shutdown() throws IOException {
@@ -109,13 +107,21 @@
         selector.close();
     }
 
-    private final void debug(String str) {
-        System.out.println(this + ": " + str);
+    protected void debug(String str, Object... args) {
+        if (DEBUG) {
+            System.out.println(format("[DEBUG] NioSelector %0#10x: ", System.identityHashCode(this))+format(str,
args));
+        }
     }
 
-    private final void debug(String str, Throwable e) {
-        System.out.println(this + ": " + str);
-        e.printStackTrace();
+    protected void debug(Throwable thrown, String str, Object... args) {
+        if (DEBUG) {
+            if (str != null) {
+                debug(str, args);
+            }
+            if (thrown != null) {
+                thrown.printStackTrace();
+            }
+        }
     }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
Mon Dec 21 20:00:26 2009
@@ -18,7 +18,6 @@
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.io.IOException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -67,7 +66,7 @@
             start: for (;;) {
 
                 try {
-                    this.selector.doSelect(0);
+                    this.selector.select(0);
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
@@ -196,35 +195,31 @@
         return null;
     }
 
-    private final Semaphore wakeups = new Semaphore(0);
     private final AtomicBoolean inWaitingList = new AtomicBoolean(false);
 
     private void waitForWakeup() throws InterruptedException {
         try {
-            this.selector.doSelect(0);
+            this.selector.select(0);
         } catch (IOException e) {
             e.printStackTrace();
         }
         while (threadQueuedRunnables.get() == 0 && dispatcher.globalQueuedRunnables.get()
== 0) {
-            if (!wakeups.tryAcquire()) {
-                if (inWaitingList.compareAndSet(false, true)) {
-                    if (!dispatcher.addWaitingDispatcher(this)) {
-                        inWaitingList.set(false);
-                    }
+            if (inWaitingList.compareAndSet(false, true)) {
+                dispatcher.addWaitingDispatcher(this);
+            }
+            try {
+                // If the selector found some work...
+                if( this.selector.select(100) != 0 ) {
+                    return;
                 }
+            } catch (IOException e) {
+                e.printStackTrace();
             }
-            wakeups.acquire();
         }
-        wakeups.drainPermits();
-    }
-
-    public void globalWakeup() {
-        wakeups.release();
-        inWaitingList.set(false);
     }
 
     public void wakeup() {
-        wakeups.release();
+       this.selector.wakeup();
     }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
Mon Dec 21 20:00:26 2009
@@ -90,15 +90,9 @@
         return source;
     }
 
-    public boolean addWaitingDispatcher(DispatcherThread dispatcher) {
-        if (globalQueuedRunnables.get() <= 0) {
-            waitingDispatcherCount.incrementAndGet();
-            waitingDispatchers.add(dispatcher);
-            return true;
-        } else {
-            dispatcher.globalWakeup();
-            return false;
-        }
+    public void addWaitingDispatcher(DispatcherThread dispatcher) {
+        waitingDispatcherCount.incrementAndGet();
+        waitingDispatchers.add(dispatcher);
     }
 
     public void wakeup() {
@@ -107,7 +101,7 @@
             DispatcherThread dispatcher = waitingDispatchers.poll();
             if (dispatcher != null) {
                 waitingDispatcherCount.decrementAndGet();
-                dispatcher.globalWakeup();
+                dispatcher.wakeup();
             }
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java?rev=892951&r1=892950&r2=892951&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/NioDispatchSoruceTest.java
Mon Dec 21 20:00:26 2009
@@ -32,10 +32,14 @@
 import static java.util.concurrent.TimeUnit.*;
 import static junit.framework.Assert.*;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class NioDispatchSoruceTest {
 
     @Test
-    public void test() throws IOException, InterruptedException {
+    public void connect() throws IOException, InterruptedException {
 
         // Create the nio server socket...
         final ServerSocketChannel channel = ServerSocketChannel.open();
@@ -46,26 +50,42 @@
         // Get a dispatcher and queue..
         SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherConfig());
         dispatcher.resume();
+        
+        Thread.sleep(1000);
         DispatchQueue accepts = dispatcher.createSerialQueue("test");
         
         // Create a source attached to the server socket to deal with new connectins..
         DispatchSource source = dispatcher.createSource(channel, SelectionKey.OP_ACCEPT,
accepts);
-        // All we do is just release a countdown latch...
-        RunnableCountDownLatch accepted = new RunnableCountDownLatch(1) {
-            @Override
-            public void run() {
-                try {
-                    SocketChannel socket = channel.accept();
-                    socket.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-                super.run();
-            }
-        };
+        RunnableCountDownLatch accepted;
+        
+
+        // Events should not be seen until the source is resumed.
+        accepted = acceptor(channel);
         source.setEventHandler(accepted);
+        connect(channel);
+        assertFalse(accepted.await(1, SECONDS));
+        source.resume();
+        assertTrue(accepted.await(1, SECONDS));
 
-        // Connect to the server in a new thread.
+        // Since we are resumed we should get the next connect quickly..
+        System.out.println("start....");
+        accepted = acceptor(channel);
+        source.setEventHandler(accepted);
+        connect(channel);
+        System.out.println("waiting....");
+        assertTrue(accepted.await(2, SECONDS));
+
+        // Now test that events don't get fired
+        // once the source is canceled. 
+        accepted = acceptor(channel);
+        source.setEventHandler(accepted);
+        source.cancel();
+        connect(channel);        
+        assertFalse(accepted.await(2, SECONDS));
+        
+    }
+
+    private void connect(final ServerSocketChannel channel) {
         new Thread("connect") {
             public void run() {
                 try {
@@ -77,14 +97,21 @@
                 }
             }
         }.start();
-        
-        // Events should not get delivered until the source is resumed.
-        assertFalse(accepted.await(1, SECONDS));
-        source.resume();
-        
-        // Count down latch should get released now.
-        assertTrue(accepted.await(1, SECONDS));
-        
+    }
+
+    private RunnableCountDownLatch acceptor(final ServerSocketChannel channel) {
+        return new RunnableCountDownLatch(1) {
+            @Override
+            public void run() {
+                try {
+                    SocketChannel socket = channel.accept();
+                    socket.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                super.run();
+            }
+        };
     }
 
     static public InetSocketAddress address(String host, int port) throws UnknownHostException
{



Mime
View raw message