Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 19975 invoked from network); 10 Mar 2010 11:49:02 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 10 Mar 2010 11:49:02 -0000 Received: (qmail 88798 invoked by uid 500); 10 Mar 2010 11:48:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 88768 invoked by uid 500); 10 Mar 2010 11:48:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88761 invoked by uid 99); 10 Mar 2010 11:48:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Mar 2010 11:48:32 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Mar 2010 11:48:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 772972388A1C; Wed, 10 Mar 2010 11:48:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r921318 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/thread/ activemq-core/src/main/java/org/apache/activemq/transport/nio/ activemq-core/src/m... Date: Wed, 10 Mar 2010 11:48:09 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100310114809.772972388A1C@eris.apache.org> Author: dejanb Date: Wed Mar 10 11:48:08 2010 New Revision: 921318 URL: http://svn.apache.org/viewvc?rev=921318&view=rev Log: merging 897262,897939,898774,916762,916780,920325,920330,920827,920838,920881 - https://issues.apache.org/activemq/browse/AMQ-2440 - stomp+nio Added: activemq/branches/activemq-5.3/assembly/src/sample-conf/activemq-stomp.xml - copied unchanged from r897262, activemq/trunk/assembly/src/sample-conf/activemq-stomp.xml Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Mar 10 11:48:08 2010 @@ -77,6 +77,7 @@ import org.apache.activemq.state.Consume import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; import org.apache.activemq.state.TransactionState; +import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -91,6 +92,8 @@ import org.apache.activemq.util.ServiceS import org.apache.activemq.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import static org.apache.activemq.thread.DefaultThreadPools.*; /** * @version $Revision: 1.8 $ */ @@ -908,8 +911,7 @@ public class TransportConnection impleme cs.getContext().getStopping().set(true); } try { - new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { - @Override + getDefaultTaskRunnerFactory().execute(new Runnable(){ public void run() { serviceLock.writeLock().lock(); try { @@ -922,7 +924,7 @@ public class TransportConnection impleme serviceLock.writeLock().unlock(); } } - }.start(); + }); } catch (Throwable t) { LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); stopped.countDown(); Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Mar 10 11:48:08 2010 @@ -21,6 +21,7 @@ import org.apache.activemq.broker.jmx.Ma import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.security.MessageAuthorizationPolicy; +import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; @@ -32,6 +33,9 @@ import org.apache.activemq.util.ServiceS import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import static org.apache.activemq.thread.DefaultThreadPools.*; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -202,9 +206,7 @@ public class TransportConnector implemen server.setAcceptListener(new TransportAcceptListener() { public void onAccept(final Transport transport) { try { - // Starting the connection could block due to - // wireformat negotiation, so start it in an async thread. - Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) { + getDefaultTaskRunnerFactory().execute(new Runnable(){ public void run() { try { Connection connection = createConnection(transport); @@ -214,8 +216,7 @@ public class TransportConnector implemen onAcceptError(e); } } - }; - startThread.start(); + }); } catch (Exception e) { String remoteHost = transport.getRemoteAddress(); ServiceSupport.dispose(transport); Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java Wed Mar 10 11:48:08 2010 @@ -26,24 +26,24 @@ import java.util.concurrent.ThreadFactor */ public final class DefaultThreadPools { - private static final Executor DEFAULT_POOL; - static { - DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread"); - thread.setDaemon(true); - return thread; - } - }); - } +// private static final Executor DEFAULT_POOL; +// static { +// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { +// public Thread newThread(Runnable runnable) { +// Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread"); +// thread.setDaemon(true); +// return thread; +// } +// }); +// } private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory(); private DefaultThreadPools() { } - public static Executor getDefaultPool() { - return DEFAULT_POOL; - } +// public static Executor getDefaultPool() { +// return DEFAULT_POOL; +// } public static TaskRunnerFactory getDefaultTaskRunnerFactory() { return DEFAULT_TASK_RUNNER_FACTORY; Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Wed Mar 10 11:48:08 2010 @@ -16,6 +16,7 @@ */ package org.apache.activemq.thread; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit; * * @version $Revision: 1.5 $ */ -public class TaskRunnerFactory { +public class TaskRunnerFactory implements Executor { private ExecutorService executor; private int maxIterationsPerRun; @@ -80,6 +81,18 @@ public class TaskRunnerFactory { } } + public void execute(Runnable runnable) { + execute(runnable, "ActiveMQ Task"); + } + + public void execute(Runnable runnable, String name) { + if (executor != null) { + executor.execute(runnable); + } else { + new Thread(runnable, name).start(); + } + } + protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Wed Mar 10 11:48:08 2010 @@ -149,7 +149,7 @@ public class NIOTransport extends TcpTra } protected void doStop(ServiceStopper stopper) throws Exception { - selection.disable(); + selection.close(); super.doStop(stopper); } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Wed Mar 10 11:48:08 2010 @@ -20,8 +20,11 @@ import java.io.IOException; import java.nio.channels.SocketChannel; import java.util.LinkedList; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * The SelectorManager will manage one Selector and the thread that checks the @@ -36,16 +39,20 @@ public final class SelectorManager { public static final SelectorManager SINGLETON = new SelectorManager(); - private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() { - public Thread newThread(Runnable r) { - Thread rc = new Thread(r); - rc.setName("NIO Transport Thread"); - return rc; - } - }); + private Executor selectorExecutor = createDefaultExecutor(); private Executor channelExecutor = selectorExecutor; private LinkedList freeWorkers = new LinkedList(); - private int maxChannelsPerWorker = 64; + private int maxChannelsPerWorker = 1024; + + protected ExecutorService createDefaultExecutor() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + return new Thread(runnable, "ActiveMQ NIO Worker"); + } + }); + // rc.allowCoreThreadTimeOut(true); + return rc; + } public static SelectorManager getInstance() { return SINGLETON; @@ -61,15 +68,25 @@ public final class SelectorManager { public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) throws IOException { - SelectorWorker worker = null; - if (freeWorkers.size() > 0) { - worker = freeWorkers.getFirst(); - } else { - worker = new SelectorWorker(this); - freeWorkers.addFirst(worker); + SelectorSelection selection = null; + while( selection == null ) { + if (freeWorkers.size() > 0) { + SelectorWorker worker = freeWorkers.getFirst(); + if( worker.isReleased() ) { + freeWorkers.remove(worker); + } else { + worker.retain(); + selection = new SelectorSelection(worker, socketChannel, listener); + } + + } else { + // Worker starts /w retain count of 1 + SelectorWorker worker = new SelectorWorker(this); + freeWorkers.addFirst(worker); + selection = new SelectorSelection(worker, socketChannel, listener); + } } - - SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener); + return selection; } @@ -82,7 +99,7 @@ public final class SelectorManager { } public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { - freeWorkers.add(worker); + freeWorkers.addFirst(worker); } public Executor getChannelExecutor() { Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Wed Mar 10 11:48:08 2010 @@ -16,9 +16,11 @@ */ package org.apache.activemq.transport.nio; +import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.transport.nio.SelectorManager.Listener; @@ -28,23 +30,23 @@ import org.apache.activemq.transport.nio public final class SelectorSelection { private final SelectorWorker worker; - private final SelectionKey key; private final Listener listener; private int interest; + private SelectionKey key; + private AtomicBoolean closed = new AtomicBoolean(); - public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException { + public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException { this.worker = worker; this.listener = listener; - - // Lock when mutating state of the selector - worker.lock(); - - try { - this.key = socketChannel.register(worker.selector, 0, this); - worker.incrementUseCounter(); - } finally { - worker.unlock(); - } + worker.addIoTask(new Runnable() { + public void run() { + try { + SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); } public void setInterestOps(int ops) { @@ -52,25 +54,39 @@ public final class SelectorSelection { } public void enable() { - key.interestOps(interest); - worker.selector.wakeup(); + worker.addIoTask(new Runnable() { + public void run() { + try { + key.interestOps(interest); + } catch (CancelledKeyException e) { + } + } + }); } public void disable() { - if (key.isValid()) { - key.interestOps(0); - } + worker.addIoTask(new Runnable() { + public void run() { + try { + key.interestOps(0); + } catch (CancelledKeyException e) { + } + } + }); } public void close() { - worker.decrementUseCounter(); - - // Lock when mutating state of the selector - worker.lock(); - try { - key.cancel(); - } finally { - worker.unlock(); + // guard against multiple closes. + if( closed.compareAndSet(false, true) ) { + worker.addIoTask(new Runnable() { + public void run() { + try { + key.cancel(); + } catch (CancelledKeyException e) { + } + worker.release(); + } + }); } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Wed Mar 10 11:48:08 2010 @@ -21,10 +21,8 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.Set; - +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; public class SelectorWorker implements Runnable { @@ -33,55 +31,71 @@ public class SelectorWorker implements R final SelectorManager manager; final Selector selector; final int id = NEXT_ID.getAndIncrement(); - final AtomicInteger useCounter = new AtomicInteger(); private final int maxChannelsPerWorker; - private final ReadWriteLock selectorLock = new ReentrantReadWriteLock(); + + final AtomicInteger retainCounter = new AtomicInteger(1); + private final ConcurrentLinkedQueue ioTasks = new ConcurrentLinkedQueue(); public SelectorWorker(SelectorManager manager) throws IOException { this.manager = manager; selector = Selector.open(); maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); + manager.getSelectorExecutor().execute(this); } - void incrementUseCounter() { - int use = useCounter.getAndIncrement(); - if (use == 0) { - manager.getSelectorExecutor().execute(this); - } else if (use + 1 == maxChannelsPerWorker) { + void retain() { + if (retainCounter.incrementAndGet() == maxChannelsPerWorker) { manager.onWorkerFullEvent(this); } } - void decrementUseCounter() { - int use = useCounter.getAndDecrement(); - if (use == 1) { + void release() { + int use = retainCounter.decrementAndGet(); + if (use == 0) { manager.onWorkerEmptyEvent(this); - } else if (use == maxChannelsPerWorker) { + } else if (use == maxChannelsPerWorker - 1) { manager.onWorkerNotFullEvent(this); } } + + boolean isReleased() { + return retainCounter.get()==0; + } + - boolean isRunning() { - return useCounter.get() != 0; + public void addIoTask(Runnable work) { + ioTasks.add(work); + selector.wakeup(); } + + private void processIoTasks() { + Runnable task; + while( (task= ioTasks.poll()) !=null ) { + try { + task.run(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + public void run() { String origName = Thread.currentThread().getName(); try { Thread.currentThread().setName("Selector Worker: " + id); - while (isRunning()) { - - lockBarrier(); - int count = selector.select(10); + while (!isReleased()) { + + processIoTasks(); + + int count = selector.select(10); + if (count == 0) { continue; } - if (!isRunning()) { - return; - } - // Get a java.util.Set containing the SelectionKey objects // for all channels that are ready for I/O. Set keys = selector.selectedKeys(); @@ -92,7 +106,9 @@ public class SelectorWorker implements R final SelectorSelection s = (SelectorSelection)key.attachment(); try { - s.disable(); + if( key.isValid() ) { + key.interestOps(0); + } // Kick off another thread to find newly selected keys // while we process the @@ -115,11 +131,8 @@ public class SelectorWorker implements R } } - } catch (IOException e) { - - // Don't accept any more slections - manager.onWorkerEmptyEvent(this); - + } catch (Throwable e) { + e.printStackTrace(); // Notify all the selections that the error occurred. Set keys = selector.keys(); for (Iterator i = keys.iterator(); i.hasNext();) { @@ -127,24 +140,15 @@ public class SelectorWorker implements R SelectorSelection s = (SelectorSelection)key.attachment(); s.onError(e); } - } finally { + try { + manager.onWorkerEmptyEvent(this); + selector.close(); + } catch (IOException ignore) { + ignore.printStackTrace(); + } Thread.currentThread().setName(origName); } } - private void lockBarrier() { - selectorLock.writeLock().lock(); - selectorLock.writeLock().unlock(); - } - - public void lock() { - selectorLock.readLock().lock(); - selector.wakeup(); - } - - public void unlock() { - selectorLock.readLock().unlock(); - } - } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Wed Mar 10 11:48:08 2010 @@ -25,10 +25,6 @@ import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.HashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe; public class StompConnection { Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=921318&r1=921317&r2=921318&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Wed Mar 10 11:48:08 2010 @@ -16,8 +16,9 @@ */ package org.apache.activemq.transport.stomp; -import java.io.DataInputStream; +import java.io.ByteArrayInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; import java.net.Socket; import java.net.URI; @@ -30,11 +31,12 @@ import javax.net.SocketFactory; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.nio.NIOBufferedInputStream; import org.apache.activemq.transport.nio.NIOOutputStream; import org.apache.activemq.transport.nio.SelectorManager; import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; @@ -48,6 +50,10 @@ public class StompNIOTransport extends T private SocketChannel channel; private SelectorSelection selection; + + private ByteBuffer inputBuffer; + ByteArrayOutputStream currentCommand = new ByteArrayOutputStream(); + int previousByte = -1; public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -76,19 +82,54 @@ public class StompNIOTransport extends T } }); + inputBuffer = ByteBuffer.allocate(8 * 1024); this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024)); } - + private void serviceRead() { try { - DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); - while (true) { - Object command = wireFormat.unmarshal(in); - doConsume((Command)command); - } - + + while (true) { + // read channel + int readSize = channel.read(inputBuffer); + // channel is closed, cleanup + if (readSize == -1) { + onException(new EOFException()); + selection.close(); + break; + } + // nothing more to read, break + if (readSize == 0) { + break; + } + + inputBuffer.flip(); + + int b; + ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array()); + + int i = 0; + while(i++ < readSize) { + b = input.read(); + // skip repeating nulls + if (previousByte == 0 && b == 0) { + continue; + } + currentCommand.write(b); + // end of command reached, unmarshal + if (b == 0) { + Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray())); + doConsume((Command)command); + currentCommand.reset(); + } + previousByte = b; + } + // clear the buffer + inputBuffer.clear(); + + } } catch (IOException e) { - onException(e); + onException(e); } catch (Throwable e) { onException(IOExceptionSupport.create(e)); } @@ -101,7 +142,11 @@ public class StompNIOTransport extends T } protected void doStop(ServiceStopper stopper) throws Exception { - selection.disable(); + try { + selection.close(); + } catch (Exception e) { + e.printStackTrace(); + } super.doStop(stopper); } }