activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r921318 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/thread/ activemq-core/src/main/java/org/apache/activemq/transport/nio/ activemq-core/src/m...
Date Wed, 10 Mar 2010 11:48:09 GMT
Author: dejanb
Date: Wed Mar 10 11:48:08 2010
New Revision: 921318

URL: http://svn.apache.org/viewvc?rev=921318&view=rev
Log:
merging 897262,897939,898774,916762,916780,920325,920330,920827,920838,920881 - https://issues.apache.org/activemq/browse/AMQ-2440
- stomp+nio

Added:
    activemq/branches/activemq-5.3/assembly/src/sample-conf/activemq-stomp.xml
      - copied unchanged from r897262, activemq/trunk/assembly/src/sample-conf/activemq-stomp.xml
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar 10 11:48:08 2010
@@ -77,6 +77,7 @@ import org.apache.activemq.state.Consume
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.state.TransactionState;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -91,6 +92,8 @@ import org.apache.activemq.util.ServiceS
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
 /**
  * @version $Revision: 1.8 $
  */
@@ -908,8 +911,7 @@ public class TransportConnection impleme
                 cs.getContext().getStopping().set(true);
             }
             try {
-                new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
-                    @Override
+                getDefaultTaskRunnerFactory().execute(new Runnable(){
                     public void run() {
                         serviceLock.writeLock().lock();
                         try {
@@ -922,7 +924,7 @@ public class TransportConnection impleme
                             serviceLock.writeLock().unlock();
                         }
                     }
-                }.start();
+                });
             } catch (Throwable t) {
                 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
                 stopped.countDown();

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Mar 10 11:48:08 2010
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.jmx.Ma
 import org.apache.activemq.broker.region.ConnectorStatistics;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -32,6 +33,9 @@ import org.apache.activemq.util.ServiceS
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -202,9 +206,7 @@ public class TransportConnector implemen
         server.setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
-                    // Starting the connection could block due to
-                    // wireformat negotiation, so start it in an async thread.
-                    Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
+                    getDefaultTaskRunnerFactory().execute(new Runnable(){
                         public void run() {
                             try {
                                 Connection connection = createConnection(transport);
@@ -214,8 +216,7 @@ public class TransportConnector implemen
                                 onAcceptError(e);
                             }
                         }
-                    };
-                    startThread.start();
+                    });
                 } catch (Exception e) {
                     String remoteHost = transport.getRemoteAddress();
                     ServiceSupport.dispose(transport);

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java Wed Mar 10 11:48:08 2010
@@ -26,24 +26,24 @@ import java.util.concurrent.ThreadFactor
  */
 public final class DefaultThreadPools {
 
-    private static final Executor DEFAULT_POOL;
-    static {
-        DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-    }    
+//    private static final Executor DEFAULT_POOL;
+//    static {
+//        DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+//            public Thread newThread(Runnable runnable) {
+//                Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
+//                thread.setDaemon(true);
+//                return thread;
+//            }
+//        });
+//    }    
     private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
     
     private DefaultThreadPools() {        
     }
     
-    public static Executor getDefaultPool() {
-        return DEFAULT_POOL;
-    }
+//    public static Executor getDefaultPool() {
+//        return DEFAULT_POOL;
+//    }
     
     public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
         return DEFAULT_TASK_RUNNER_FACTORY;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Wed Mar 10 11:48:08 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.thread;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit;
  * 
  * @version $Revision: 1.5 $
  */
-public class TaskRunnerFactory {
+public class TaskRunnerFactory implements Executor {
 
     private ExecutorService executor;
     private int maxIterationsPerRun;
@@ -80,6 +81,18 @@ public class TaskRunnerFactory {
         }
     }
 
+    public void execute(Runnable runnable) {
+        execute(runnable, "ActiveMQ Task");
+    }
+    
+    public void execute(Runnable runnable, String name) {
+        if (executor != null) {
+            executor.execute(runnable);
+        } else {
+            new Thread(runnable, name).start();
+        }
+    }
+
     protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Wed Mar 10 11:48:08 2010
@@ -149,7 +149,7 @@ public class NIOTransport extends TcpTra
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
-        selection.disable();
+        selection.close();
         super.doStop(stopper);
     }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Wed Mar 10 11:48:08 2010
@@ -20,8 +20,11 @@ import java.io.IOException;
 import java.nio.channels.SocketChannel;
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The SelectorManager will manage one Selector and the thread that checks the
@@ -36,16 +39,20 @@ public final class SelectorManager {
 
     public static final SelectorManager SINGLETON = new SelectorManager();
 
-    private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
-        public Thread newThread(Runnable r) {
-            Thread rc = new Thread(r);
-            rc.setName("NIO Transport Thread");
-            return rc;
-        }
-    });
+    private Executor selectorExecutor = createDefaultExecutor();
     private Executor channelExecutor = selectorExecutor;
     private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
-    private int maxChannelsPerWorker = 64;
+    private int maxChannelsPerWorker = 1024;
+    
+    protected ExecutorService createDefaultExecutor() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                return new Thread(runnable, "ActiveMQ NIO Worker");
+            }
+        });
+        // rc.allowCoreThreadTimeOut(true);
+        return rc;
+    }
     
     public static SelectorManager getInstance() {
         return SINGLETON;
@@ -61,15 +68,25 @@ public final class SelectorManager {
     public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
         throws IOException {
 
-        SelectorWorker worker = null;
-        if (freeWorkers.size() > 0) {
-            worker = freeWorkers.getFirst();
-        } else {
-            worker = new SelectorWorker(this);
-            freeWorkers.addFirst(worker);
+        SelectorSelection selection = null;
+        while( selection == null ) {
+            if (freeWorkers.size() > 0) {
+                SelectorWorker worker = freeWorkers.getFirst();
+                if( worker.isReleased() ) {
+                    freeWorkers.remove(worker);
+                } else {
+                    worker.retain();
+                    selection = new SelectorSelection(worker, socketChannel, listener);
+                }
+                
+            } else {
+                // Worker starts /w retain count of 1
+                SelectorWorker worker = new SelectorWorker(this);
+                freeWorkers.addFirst(worker);
+                selection = new SelectorSelection(worker, socketChannel, listener);
+            }
         }
-
-        SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
+        
         return selection;
     }
 
@@ -82,7 +99,7 @@ public final class SelectorManager {
     }
 
     public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
-        freeWorkers.add(worker);
+        freeWorkers.addFirst(worker);
     }
 
     public Executor getChannelExecutor() {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Wed Mar 10 11:48:08 2010
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.transport.nio;
 
+import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.transport.nio.SelectorManager.Listener;
 
@@ -28,23 +30,23 @@ import org.apache.activemq.transport.nio
 public final class SelectorSelection {
 
     private final SelectorWorker worker;
-    private final SelectionKey key;
     private final Listener listener;
     private int interest;
+    private SelectionKey key;
+    private AtomicBoolean closed = new AtomicBoolean();
 
-    public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
+    public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
         this.worker = worker;
         this.listener = listener;
-        
-        // Lock when mutating state of the selector
-        worker.lock();
-        
-        try {
-            this.key = socketChannel.register(worker.selector, 0, this);
-            worker.incrementUseCounter();
-        } finally {
-            worker.unlock();
-        }
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
     }
 
     public void setInterestOps(int ops) {
@@ -52,25 +54,39 @@ public final class SelectorSelection {
     }
 
     public void enable() {
-        key.interestOps(interest);
-        worker.selector.wakeup();
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    key.interestOps(interest);
+                } catch (CancelledKeyException e) {
+                }
+            }
+        });        
     }
 
     public void disable() {
-        if (key.isValid()) {
-            key.interestOps(0);
-        }
+        worker.addIoTask(new Runnable() {
+            public void run() {
+                try {
+                    key.interestOps(0);
+                } catch (CancelledKeyException e) {
+                }
+            }
+        });        
     }
 
     public void close() {
-        worker.decrementUseCounter();
-    	
-        // Lock when mutating state of the selector
-        worker.lock();
-        try {
-            key.cancel();
-        } finally {
-            worker.unlock();
+        // guard against multiple closes.
+        if( closed.compareAndSet(false, true) ) {
+            worker.addIoTask(new Runnable() {
+                public void run() {
+                    try {
+                        key.cancel();
+                    } catch (CancelledKeyException e) {
+                    }
+                    worker.release();
+                }
+            });        
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Wed Mar 10 11:48:08 2010
@@ -21,10 +21,8 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
 import java.util.Set;
-
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SelectorWorker implements Runnable {
 
@@ -33,55 +31,71 @@ public class SelectorWorker implements R
     final SelectorManager manager;
     final Selector selector;
     final int id = NEXT_ID.getAndIncrement();
-    final AtomicInteger useCounter = new AtomicInteger();
     private final int maxChannelsPerWorker;
-    private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+
+    final AtomicInteger retainCounter = new AtomicInteger(1);
+    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
        
     public SelectorWorker(SelectorManager manager) throws IOException {
         this.manager = manager;
         selector = Selector.open();
         maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+        manager.getSelectorExecutor().execute(this);
     }
 
-    void incrementUseCounter() {
-        int use = useCounter.getAndIncrement();
-        if (use == 0) {
-            manager.getSelectorExecutor().execute(this);
-        } else if (use + 1 == maxChannelsPerWorker) {
+    void retain() {
+        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
             manager.onWorkerFullEvent(this);
         }
     }
 
-    void decrementUseCounter() {
-        int use = useCounter.getAndDecrement();
-        if (use == 1) {
+    void release() {
+        int use = retainCounter.decrementAndGet();
+        if (use == 0) {
             manager.onWorkerEmptyEvent(this);
-        } else if (use == maxChannelsPerWorker) {
+        } else if (use == maxChannelsPerWorker - 1) {
             manager.onWorkerNotFullEvent(this);
         }
     }
+    
+    boolean isReleased() {
+        return retainCounter.get()==0;
+    }
+
 
-    boolean isRunning() {
-        return useCounter.get() != 0;
+    public void addIoTask(Runnable work) {
+        ioTasks.add(work);
+        selector.wakeup();
     }
+    
+    private void processIoTasks() {
+        Runnable task; 
+        while( (task= ioTasks.poll()) !=null ) {
+            try {
+                task.run();
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    
 
     public void run() {
 
         String origName = Thread.currentThread().getName();
         try {
             Thread.currentThread().setName("Selector Worker: " + id);
-            while (isRunning()) {
-                
-                lockBarrier();       	
-                int count = selector.select(10);
+            while (!isReleased()) {
+            	
+            	processIoTasks();
+            	
+            	int count = selector.select(10);
+            	
                 if (count == 0) {
                     continue;
                 }
 
-                if (!isRunning()) {
-                    return;
-                }
-
                 // Get a java.util.Set containing the SelectionKey objects
                 // for all channels that are ready for I/O.
                 Set keys = selector.selectedKeys();
@@ -92,7 +106,9 @@ public class SelectorWorker implements R
 
                     final SelectorSelection s = (SelectorSelection)key.attachment();
                     try {
-                        s.disable();
+                        if( key.isValid() ) {
+                            key.interestOps(0);
+                        }
 
                         // Kick off another thread to find newly selected keys
                         // while we process the
@@ -115,11 +131,8 @@ public class SelectorWorker implements R
                 }
 
             }
-        } catch (IOException e) {
-
-            // Don't accept any more slections
-            manager.onWorkerEmptyEvent(this);
-
+        } catch (Throwable e) {         	
+            e.printStackTrace();
             // Notify all the selections that the error occurred.
             Set keys = selector.keys();
             for (Iterator i = keys.iterator(); i.hasNext();) {
@@ -127,24 +140,15 @@ public class SelectorWorker implements R
                 SelectorSelection s = (SelectorSelection)key.attachment();
                 s.onError(e);
             }
-
         } finally {
+            try {
+                manager.onWorkerEmptyEvent(this);
+                selector.close();
+            } catch (IOException ignore) {
+            	ignore.printStackTrace();
+            }
             Thread.currentThread().setName(origName);
         }
     }
 
-    private void lockBarrier() {
-        selectorLock.writeLock().lock();
-        selectorLock.writeLock().unlock();
-	}
-
-    public void lock() {
-        selectorLock.readLock().lock();
-        selector.wakeup();
-    }
-
-	public void unlock() {
-	    selectorLock.readLock().unlock();
-	}
-	
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Wed Mar 10 11:48:08 2010
@@ -25,10 +25,6 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 
 public class StompConnection {
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Wed Mar 10 11:48:08 2010
@@ -16,8 +16,9 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
@@ -30,11 +31,12 @@ import javax.net.SocketFactory;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.nio.NIOBufferedInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
@@ -48,6 +50,10 @@ public class StompNIOTransport extends T
 
     private SocketChannel channel;
     private SelectorSelection selection;
+    
+    private ByteBuffer inputBuffer;
+    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+    int previousByte = -1;
 
     public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -76,19 +82,54 @@ public class StompNIOTransport extends T
             }
         });
 
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
         this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
     }
-
+    
     private void serviceRead() {
         try {
-            DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
-            while (true) {
-                Object command = wireFormat.unmarshal(in);
-                doConsume((Command)command);
-            }
-
+            
+           while (true) {
+               // read channel
+               int readSize = channel.read(inputBuffer);
+               // channel is closed, cleanup
+               if (readSize == -1) {
+                   onException(new EOFException());
+                   selection.close();
+                   break;
+               }
+               // nothing more to read, break
+               if (readSize == 0) {
+                   break;
+               }
+               
+               inputBuffer.flip();
+               
+               int b;
+               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
+               
+               int i = 0;
+               while(i++ < readSize) {
+                   b = input.read();
+                   // skip repeating nulls
+                   if (previousByte == 0 && b == 0) {
+                       continue;
+                   }
+                   currentCommand.write(b);
+                   // end of command reached, unmarshal
+                   if (b == 0) {
+                       Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
+                       doConsume((Command)command);
+                       currentCommand.reset();
+                   }
+                   previousByte = b;
+               }
+               // clear the buffer
+               inputBuffer.clear();
+               
+           }
         } catch (IOException e) {
-            onException(e);
+            onException(e);  
         } catch (Throwable e) {
             onException(IOExceptionSupport.create(e));
         }
@@ -101,7 +142,11 @@ public class StompNIOTransport extends T
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
-        selection.disable();
+        try {
+            selection.close();
+        } catch (Exception e) {
+        	e.printStackTrace();
+        }
         super.doStop(stopper);
     }
 }



Mime
View raw message