activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r916762 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: nio/SelectorManager.java nio/SelectorSelection.java nio/SelectorWorker.java stomp/StompConnection.java
Date Fri, 26 Feb 2010 17:14:40 GMT
Author: chirino
Date: Fri Feb 26 17:14:40 2010
New Revision: 916762

URL: http://svn.apache.org/viewvc?rev=916762&view=rev
Log:
Better selector synchronization to help resolve AMQ-2440

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Fri Feb 26 17:14:40 2010
@@ -61,15 +61,25 @@
     public synchronized SelectorSelection register(SocketChannel socketChannel, Listener
listener)
         throws IOException {
 
-        SelectorWorker worker = null;
-        if (freeWorkers.size() > 0) {
-            worker = freeWorkers.getFirst();
-        } else {
-            worker = new SelectorWorker(this);
-            freeWorkers.addFirst(worker);
+        SelectorSelection selection = null;
+        while( selection == null ) {
+            if (freeWorkers.size() > 0) {
+                SelectorWorker worker = freeWorkers.getFirst();
+                if( worker.isReleased() ) {
+                    freeWorkers.remove(worker);
+                } else {
+                    worker.retain();
+                    selection = new SelectorSelection(worker, socketChannel, listener);
+                }
+                
+            } else {
+                // Worker starts /w retain count of 1
+                SelectorWorker worker = new SelectorWorker(this);
+                freeWorkers.addFirst(worker);
+                selection = new SelectorSelection(worker, socketChannel, listener);
+            }
         }
-
-        SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
+        
         return selection;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
Fri Feb 26 17:14:40 2010
@@ -16,10 +16,11 @@
  */
 package org.apache.activemq.transport.nio;
 
-import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.transport.nio.SelectorManager.Listener;
 
@@ -29,23 +30,23 @@
 public final class SelectorSelection {
 
     private final SelectorWorker worker;
-    private final SelectionKey key;
     private final Listener listener;
     private int interest;
+    private SelectionKey key;
+    private AtomicBoolean closed = new AtomicBoolean();
 
-    public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener
listener) throws ClosedChannelException {
+    public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel,
Listener listener) throws ClosedChannelException {
         this.worker = worker;
         this.listener = listener;
-        
-        // Lock when mutating state of the selector
-        worker.lock();
-        
-        try {
-            this.key = socketChannel.register(worker.selector, 0, this);
-            worker.incrementUseCounter();
-        } finally {
-            worker.unlock();
-        }
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    SelectorSelection.this.key = socketChannel.register(worker.selector,
0, SelectorSelection.this);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
     }
 
     public void setInterestOps(int ops) {
@@ -53,29 +54,39 @@
     }
 
     public void enable() {
-        key.interestOps(interest);
-        worker.selector.wakeup();
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    key.interestOps(interest);
+                } catch (CancelledKeyException e) {
+                }
+            }
+        });        
     }
 
     public void disable() {
-        if (key.isValid()) {
-            key.interestOps(0);
-        }
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    key.interestOps(0);
+                } catch (CancelledKeyException e) {
+                }
+            }
+        });        
     }
 
     public void close() {
-        worker.decrementUseCounter();
-        
-        // Lock when mutating state of the selector
-        worker.lock();
-        try {
-            key.cancel();
-            if (!worker.isRunning()) {
-                worker.close();
-            }
-        } catch (IOException e) {
-        } finally {
-            worker.unlock();
+        // guard against multiple closes.
+        if( closed.compareAndSet(false, true) ) {
+            worker.addIoTask(new Runnable() {
+                public void run() {
+                    try {
+                        key.cancel();
+                    } catch (CancelledKeyException e) {
+                    }
+                    worker.release();
+                }
+            });        
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
Fri Feb 26 17:14:40 2010
@@ -17,14 +17,12 @@
 package org.apache.activemq.transport.nio;
 
 import java.io.IOException;
-import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SelectorWorker implements Runnable {
 
@@ -33,56 +31,69 @@
     final SelectorManager manager;
     final Selector selector;
     final int id = NEXT_ID.getAndIncrement();
-    final AtomicInteger useCounter = new AtomicInteger();
     private final int maxChannelsPerWorker;
-    private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+
+    final AtomicInteger retainCounter = new AtomicInteger(1);
+    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
        
     public SelectorWorker(SelectorManager manager) throws IOException {
         this.manager = manager;
         selector = Selector.open();
         maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+        manager.getSelectorExecutor().execute(this);
     }
 
-    void incrementUseCounter() {
-        int use = useCounter.getAndIncrement();
-        if (use == 0) {
-            manager.getSelectorExecutor().execute(this);
-        } else if (use + 1 == maxChannelsPerWorker) {
+    void retain() {
+        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
             manager.onWorkerFullEvent(this);
         }
     }
 
-    void decrementUseCounter() {
-        int use = useCounter.getAndDecrement();
-        if (use == 1) {
+    void release() {
+        int use = retainCounter.decrementAndGet();
+        if (use == 0) {
             manager.onWorkerEmptyEvent(this);
-        } else if (use == maxChannelsPerWorker) {
+        } else if (use < maxChannelsPerWorker) {
             manager.onWorkerNotFullEvent(this);
         }
     }
+    
+    boolean isReleased() {
+        return retainCounter.get()==0;
+    }
+
 
-    boolean isRunning() {
-        return useCounter.get() != 0;
+    public void addIoTask(Runnable work) {
+        ioTasks.add(work);
+        selector.wakeup();
+    }
+    
+    private void processIoTasks() {
+        Runnable task; 
+        while( (task= ioTasks.poll()) !=null ) {
+            try {
+                task.run();
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
     }
 
+    
+
     public void run() {
 
         String origName = Thread.currentThread().getName();
         try {
             Thread.currentThread().setName("Selector Worker: " + id);
-            while (isRunning()) {
-                
-                lockBarrier();
+            while (!isReleased()) {
                 
+                processIoTasks();
                 int count = selector.select(10);
                 if (count == 0) {
                     continue;
                 }
 
-                if (!isRunning()) {
-                    return;
-                }
-
                 // Get a java.util.Set containing the SelectionKey objects
                 // for all channels that are ready for I/O.
                 Set keys = selector.selectedKeys();
@@ -93,7 +104,9 @@
 
                     final SelectorSelection s = (SelectorSelection)key.attachment();
                     try {
-                        s.disable();
+                        if( key.isValid() ) {
+                            key.interestOps(0);
+                        }
 
                         // Kick off another thread to find newly selected keys
                         // while we process the
@@ -116,13 +129,8 @@
                 }
 
             }
-        } catch (ClosedSelectorException cse) {
-            // Don't accept any more selections
-            manager.onWorkerEmptyEvent(this);
-        } catch (IOException e) {
-            // Don't accept any more selections
-            manager.onWorkerEmptyEvent(this);
-
+            
+        } catch (Throwable e) {
             // Notify all the selections that the error occurred.
             Set keys = selector.keys();
             for (Iterator i = keys.iterator(); i.hasNext();) {
@@ -132,25 +140,13 @@
             }
 
         } finally {
+            try {
+                manager.onWorkerEmptyEvent(this);
+                selector.close();
+            } catch (IOException ignore) {
+            }
             Thread.currentThread().setName(origName);
         }
     }
 
-    private void lockBarrier() {
-        selectorLock.writeLock().lock();
-        selectorLock.writeLock().unlock();
-	}
-
-    public void lock() {
-        selectorLock.readLock().lock();
-        selector.wakeup();
-    }
-
-	public void unlock() {
-	    selectorLock.readLock().unlock();
-	}
-	
-    public void close() throws IOException {
-        selector.close();
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Fri Feb 26 17:14:40 2010
@@ -25,10 +25,6 @@
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 
 public class StompConnection {
 



Mime
View raw message