From commits-return-13958-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Jul 07 03:26:04 2010 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 32790 invoked from network); 7 Jul 2010 03:26:04 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:26:04 -0000 Received: (qmail 23693 invoked by uid 500); 7 Jul 2010 03:26:04 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 23643 invoked by uid 500); 7 Jul 2010 03:26:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 23628 invoked by uid 99); 7 Jul 2010 03:26:03 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:26:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:25:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0E72623889ED; Wed, 7 Jul 2010 03:25:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961062 [3/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/... Date: Wed, 07 Jul 2010 03:24:44 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707032500.0E72623889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:24:02 2010 @@ -16,309 +16,135 @@ */ package org.apache.activemq.transport.tcp; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import javax.net.ServerSocketFactory; - -import org.apache.activemq.Service; -//import org.apache.activemq.ThreadPriorities; import org.apache.activemq.transport.Transport; -//import org.apache.activemq.transport.TransportLoggerFactory; +import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.ServiceListener; -import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.DispatchSource; + +import java.io.IOException; +import java.net.*; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; /** * A TCP based implementation of {@link TransportServer} - * - * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) - * @version $Revision: 1.1 $ + * + * @author Hiram Chirino */ -public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{ +public class TcpTransportServer implements TransportServer { - private static final Log LOG = LogFactory.getLog(TcpTransportServer.class); - protected ServerSocket serverSocket; - protected int backlog = 5000; protected WireFormatFactory wireFormatFactory; - protected final TcpTransportFactory transportFactory; - protected long maxInactivityDuration = 30000; - protected long maxInactivityDurationInitalDelay = 10000; - protected int minmumWireFormatVersion; - protected boolean useQueueForAccept=true; - - /** - * trace=true -> the Transport stack where this TcpTransport - * object will be, will have a TransportLogger layer - * trace=false -> the Transport stack where this TcpTransport - * object will be, will NOT have a TransportLogger layer, and therefore - * will never be able to print logging messages. - * This parameter is most probably set in Connection or TransportConnector URIs. - */ - protected boolean trace = false; - - protected int soTimeout = 0; - protected int socketBufferSize = 64 * 1024; - protected int connectionTimeout = 30000; + private ServerSocketChannel channel; + private TransportAcceptListener listener; + private URI bindURI; + private URI connectURI; + private DispatchQueue dispatchQueue; + private DispatchSource acceptSource; + private int backlog = 500; + private Map transportOptions; - /** - * Name of the LogWriter implementation to use. - * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. - * This parameter is most probably set in Connection or TransportConnector URIs. - - protected String logWriterName = TransportLoggerFactory.defaultLogWriterName; - */ - - /** - * Specifies if the TransportLogger will be manageable by JMX or not. - * Also, as long as there is at least 1 TransportLogger which is manageable, - * a TransportLoggerControl MBean will me created. - */ - protected boolean dynamicManagement = false; - /** - * startLogging=true -> the TransportLogger object of the Transport stack - * will initially write messages to the log. - * startLogging=false -> the TransportLogger object of the Transport stack - * will initially NOT write messages to the log. - * This parameter only has an effect if trace == true. - * This parameter is most probably set in Connection or TransportConnector URIs. - */ - protected boolean startLogging = true; - protected Map transportOptions; - protected final ServerSocketFactory serverSocketFactory; - protected BlockingQueue socketQueue = new LinkedBlockingQueue(); - protected Thread socketHandlerThread; - /** - * The maximum number of sockets allowed for this server - */ - protected int maximumConnections = Integer.MAX_VALUE; - protected int currentTransportCount=0; - - public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - super(location); - this.transportFactory = transportFactory; - this.serverSocketFactory = serverSocketFactory; - + public TcpTransportServer(URI location) { + this.bindURI = location; } - public void bind() throws IOException { - URI bind = getBindLocation(); - - String host = bind.getHost(); - host = (host == null || host.length() == 0) ? "localhost" : host; - InetAddress addr = InetAddress.getByName(host); - - try { - - this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); - configureServerSocket(this.serverSocket); - - } catch (IOException e) { - throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); - } - try { - setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind - .getFragment())); - } catch (URISyntaxException e) { - - // it could be that the host name contains invalid characters such - // as _ on unix platforms - // so lets try use the IP address instead - try { - setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); - } catch (URISyntaxException e2) { - throw IOExceptionSupport.create(e2); - } - } + public void setAcceptListener(TransportAcceptListener listener) { + this.listener = listener; } - private void configureServerSocket(ServerSocket socket) throws SocketException { - socket.setSoTimeout(2000); - if (transportOptions != null) { - IntrospectionSupport.setProperties(socket, transportOptions); - } + public URI getConnectURI() { + return connectURI; } - /** - * @return Returns the wireFormatFactory. - */ - public WireFormatFactory getWireFormatFactory() { - return wireFormatFactory; - } - - /** - * @param wireFormatFactory The wireFormatFactory to set. - */ - public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { - this.wireFormatFactory = wireFormatFactory; - } - - public long getMaxInactivityDuration() { - return maxInactivityDuration; - } - - public void setMaxInactivityDuration(long maxInactivityDuration) { - this.maxInactivityDuration = maxInactivityDuration; - } - - public long getMaxInactivityDurationInitalDelay() { - return this.maxInactivityDurationInitalDelay; - } - - public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { - this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; - } - - public int getMinmumWireFormatVersion() { - return minmumWireFormatVersion; - } - - public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { - this.minmumWireFormatVersion = minmumWireFormatVersion; - } - - public boolean isTrace() { - return trace; + public InetSocketAddress getSocketAddress() { + return (InetSocketAddress) channel.socket().getLocalSocketAddress(); } - public void setTrace(boolean trace) { - this.trace = trace; + public DispatchQueue getDispatchQueue() { + return dispatchQueue; } -// -// public String getLogWriterName() { -// return logWriterName; -// } -// -// public void setLogWriterName(String logFormat) { -// this.logWriterName = logFormat; -// } - public boolean isDynamicManagement() { - return dynamicManagement; + public void setDispatchQueue(DispatchQueue dispatchQueue) { + this.dispatchQueue = dispatchQueue; } - public void setDynamicManagement(boolean useJmx) { - this.dynamicManagement = useJmx; + public void suspend() { + acceptSource.resume(); } - public boolean isStartLogging() { - return startLogging; + public void resume() { + acceptSource.resume(); } - - public void setStartLogging(boolean startLogging) { - this.startLogging = startLogging; - } - - /** - * @return the backlog - */ - public int getBacklog() { - return backlog; + public void start() throws IOException { + bind(); + acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue); + acceptSource.setEventHandler(new Runnable() { + public void run() { + try { + SocketChannel client = channel.accept(); + handleSocket(client); + } catch (IOException e) { + listener.onAcceptError(e); + } + } + }); + acceptSource.setCancelHandler(new Runnable() { + public void run() { + try { + channel.close(); + } catch (IOException e) { + } + } + }); + acceptSource.resume(); } - /** - * @param backlog the backlog to set - */ - public void setBacklog(int backlog) { - this.backlog = backlog; - } + public void bind() throws IOException { + URI bind = bindURI; - /** - * @return the useQueueForAccept - */ - public boolean isUseQueueForAccept() { - return useQueueForAccept; - } + String host = bind.getHost(); + host = (host == null || host.length() == 0) ? "localhost" : host; + if (host.equals("localhost")) { + host = "0.0.0.0"; + } - /** - * @param useQueueForAccept the useQueueForAccept to set - */ - public void setUseQueueForAccept(boolean useQueueForAccept) { - this.useQueueForAccept = useQueueForAccept; - } - + InetAddress addr = InetAddress.getByName(host); + try { + channel = ServerSocketChannel.open(); + channel.socket().bind(new InetSocketAddress(addr, bind.getPort()), backlog); + } catch (IOException e) { + throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); + } - /** - * pull Sockets from the ServerSocket - */ - public void run() { - while (!isStopped()) { - Socket socket = null; + try { + connectURI = connectURI(resolveHostName(channel.socket(), addr)); + } catch (URISyntaxException e) { + // it could be that the host name contains invalid characters such + // as _ on unix platforms + // so lets try use the IP address instead try { - socket = serverSocket.accept(); - if (socket != null) { - if (isStopped() || getAcceptListener() == null) { - socket.close(); - } else { - if (useQueueForAccept) { - socketQueue.put(socket); - }else { - handleSocket(socket); - } - } - } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); - } + connectURI = connectURI(addr.getHostAddress()); + } catch (URISyntaxException e2) { + throw IOExceptionSupport.create(e2); } } } - /** - * Allow derived classes to override the Transport implementation that this - * transport server creates. - * - * @param socket - * @param format - * @return - * @throws IOException - */ - protected Transport createTransport(Socket socket, WireFormat format) throws IOException { - return new TcpTransport(format, socket); - } - - /** - * @return pretty print of this - */ - public String toString() { - return "" + getBindLocation(); + private URI connectURI(String hostname) throws URISyntaxException { + return new URI(bindURI.getScheme(), bindURI.getUserInfo(), hostname, channel.socket().getLocalPort(), bindURI.getPath(), bindURI.getQuery(), bindURI.getFragment()); } - /** - * @param socket - * @param inetAddress - * @return real hostName - * @throws UnknownHostException - */ protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { String result = null; if (socket.isBound()) { @@ -333,144 +159,426 @@ public class TcpTransportServer extends } return result; } - - protected void doStart() throws Exception { - if(useQueueForAccept) { - Runnable run = new Runnable() { - public void run() { - try { - while (!isStopped() && !isStopping()) { - Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); - if (sock != null) { - handleSocket(sock); - } - } - - } catch (InterruptedException e) { - LOG.info("socketQueue interuppted - stopping"); - if (!isStopping()) { - onAcceptError(e); - } - } - - } - - }; - socketHandlerThread = new Thread(null, run, - "ActiveMQ Transport Server Thread Handler: " + toString(), - getStackSize()); - socketHandlerThread.setDaemon(true); - //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1); - socketHandlerThread.start(); - } - super.doStart(); - - } - protected void doStop(ServiceStopper stopper) throws Exception { - super.doStop(stopper); - if (serverSocket != null) { - serverSocket.close(); - } + public void stop() throws Exception { + acceptSource.release(); } - public InetSocketAddress getSocketAddress() { - return (InetSocketAddress)serverSocket.getLocalSocketAddress(); + public WireFormatFactory getWireFormatFactory() { + return wireFormatFactory; } - public void setTransportOption(Map transportOptions) { - this.transportOptions = transportOptions; + public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { + this.wireFormatFactory = wireFormatFactory; } - - protected final void handleSocket(Socket socket) { - try { - if (this.currentTransportCount >= this.maximumConnections) { - - }else { - HashMap options = new HashMap(); - options.put("maxInactivityDuration", Long - .valueOf(maxInactivityDuration)); - options.put("maxInactivityDurationInitalDelay", Long - .valueOf(maxInactivityDurationInitalDelay)); - options.put("minmumWireFormatVersion", Integer - .valueOf(minmumWireFormatVersion)); - options.put("trace", Boolean.valueOf(trace)); - options.put("soTimeout", Integer.valueOf(soTimeout)); - options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); - options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); -// options.put("logWriterName", logWriterName); - options - .put("dynamicManagement", Boolean - .valueOf(dynamicManagement)); - options.put("startLogging", Boolean.valueOf(startLogging)); - - options.putAll(transportOptions); - WireFormat format = wireFormatFactory.createWireFormat(); - Transport transport = createTransport(socket, format); - if (transport instanceof ServiceSupport) { - ((ServiceSupport) transport).addServiceListener(this); - } - Transport configuredTransport = transportFactory.serverConfigure( - transport, format, options); - getAcceptListener().onAccept(configuredTransport); - } - } catch (SocketTimeoutException ste) { - // expect this to happen - } catch (Exception e) { - if (!isStopping()) { - onAcceptError(e); - } else if (!isStopped()) { - LOG.warn("run()", e); - onAcceptError(e); - } - } - - } - public int getSoTimeout() { - return soTimeout; + public URI getBindURI() { + return bindURI; } - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; + public void setBindURI(URI bindURI) { + this.bindURI = bindURI; } - public int getSocketBufferSize() { - return socketBufferSize; + public int getBacklog() { + return backlog; } - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; + public void setBacklog(int backlog) { + this.backlog = backlog; } - public int getConnectionTimeout() { - return connectionTimeout; + protected final void handleSocket(SocketChannel socket) throws IOException { + HashMap options = new HashMap(); +// options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); +// options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); +// options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); +// options.put("trace", Boolean.valueOf(trace)); +// options.put("soTimeout", Integer.valueOf(soTimeout)); +// options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); +// options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); +// options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); +// options.put("startLogging", Boolean.valueOf(startLogging)); + + Transport transport = createTransport(socket, options); + transport.setWireformat(wireFormatFactory.createWireFormat()); + listener.onAccept(transport); + } + + private Transport createTransport(SocketChannel socketChannel, HashMap options) throws IOException { + TcpTransport transport = new TcpTransport(); + transport.connected(socketChannel); + if( options!=null ) { + IntrospectionSupport.setProperties(transport, options); + } + if (transportOptions != null) { + IntrospectionSupport.setProperties(transport, transportOptions); + } + return transport; } - public void setConnectionTimeout(int connectionTimeout) { - this.connectionTimeout = connectionTimeout; + public void setTransportOption(Map transportOptions) { + this.transportOptions = transportOptions; } - /** - * @return the maximumConnections - */ - public int getMaximumConnections() { - return maximumConnections; - } + + +// private static final Log LOG = LogFactory.getLog(TcpTransportServer.class); +// protected ServerSocket serverSocket; +// protected int backlog = 5000; +// protected WireFormatFactory wireFormatFactory; +// protected final TcpTransportFactory transportFactory; +// protected long maxInactivityDuration = 30000; +// protected long maxInactivityDurationInitalDelay = 10000; +// protected int minmumWireFormatVersion; +// protected boolean useQueueForAccept=true; +// +// /** +// * trace=true -> the Transport stack where this TcpTransport +// * object will be, will have a TransportLogger layer +// * trace=false -> the Transport stack where this TcpTransport +// * object will be, will NOT have a TransportLogger layer, and therefore +// * will never be able to print logging messages. +// * This parameter is most probably set in Connection or TransportConnector URIs. +// */ +// protected boolean trace = false; +// +// protected int soTimeout = 0; +// protected int socketBufferSize = 64 * 1024; +// protected int connectionTimeout = 30000; +// +// /** +// * Name of the LogWriter implementation to use. +// * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. +// * This parameter is most probably set in Connection or TransportConnector URIs. +// +// protected String logWriterName = TransportLoggerFactory.defaultLogWriterName; +// */ +// +// /** +// * Specifies if the TransportLogger will be manageable by JMX or not. +// * Also, as long as there is at least 1 TransportLogger which is manageable, +// * a TransportLoggerControl MBean will me created. +// */ +// protected boolean dynamicManagement = false; +// /** +// * startLogging=true -> the TransportLogger object of the Transport stack +// * will initially write messages to the log. +// * startLogging=false -> the TransportLogger object of the Transport stack +// * will initially NOT write messages to the log. +// * This parameter only has an effect if trace == true. +// * This parameter is most probably set in Connection or TransportConnector URIs. +// */ +// protected boolean startLogging = true; +// protected Map transportOptions; +// protected final ServerSocketFactory serverSocketFactory; +// protected BlockingQueue socketQueue = new LinkedBlockingQueue(); +// protected Thread socketHandlerThread; +// /** +// * The maximum number of sockets allowed for this server +// */ +// protected int maximumConnections = Integer.MAX_VALUE; +// protected int currentTransportCount=0; +// +// public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { +// super(location); +// this.transportFactory = transportFactory; +// this.serverSocketFactory = serverSocketFactory; +// +// } +// +// public void bind() throws IOException { +// URI bind = getBindLocation(); +// +// String host = bind.getHost(); +// host = (host == null || host.length() == 0) ? "localhost" : host; +// InetAddress addr = InetAddress.getByName(host); +// +// try { +// +// this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); +// configureServerSocket(this.serverSocket); +// +// } catch (IOException e) { +// throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); +// } +// try { +// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind +// .getFragment())); +// } catch (URISyntaxException e) { +// +// // it could be that the host name contains invalid characters such +// // as _ on unix platforms +// // so lets try use the IP address instead +// try { +// setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); +// } catch (URISyntaxException e2) { +// throw IOExceptionSupport.create(e2); +// } +// } +// } +// +// private void configureServerSocket(ServerSocket socket) throws SocketException { +// socket.setSoTimeout(2000); +// if (transportOptions != null) { +// IntrospectionSupport.setProperties(socket, transportOptions); +// } +// } +// +// /** +// * @return Returns the wireFormatFactory. +// */ +// public WireFormatFactory getWireFormatFactory() { +// return wireFormatFactory; +// } +// +// /** +// * @param wireFormatFactory The wireFormatFactory to set. +// */ +// public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { +// this.wireFormatFactory = wireFormatFactory; +// } +// +// public long getMaxInactivityDuration() { +// return maxInactivityDuration; +// } +// +// public void setMaxInactivityDuration(long maxInactivityDuration) { +// this.maxInactivityDuration = maxInactivityDuration; +// } +// +// public long getMaxInactivityDurationInitalDelay() { +// return this.maxInactivityDurationInitalDelay; +// } +// +// public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { +// this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; +// } +// +// public int getMinmumWireFormatVersion() { +// return minmumWireFormatVersion; +// } +// +// public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { +// this.minmumWireFormatVersion = minmumWireFormatVersion; +// } +// +// public boolean isTrace() { +// return trace; +// } +// +// public void setTrace(boolean trace) { +// this.trace = trace; +// } +//// +//// public String getLogWriterName() { +//// return logWriterName; +//// } +//// +//// public void setLogWriterName(String logFormat) { +//// this.logWriterName = logFormat; +//// } +// +// public boolean isDynamicManagement() { +// return dynamicManagement; +// } +// +// public void setDynamicManagement(boolean useJmx) { +// this.dynamicManagement = useJmx; +// } +// +// public boolean isStartLogging() { +// return startLogging; +// } +// +// +// public void setStartLogging(boolean startLogging) { +// this.startLogging = startLogging; +// } +// +// /** +// * @return the backlog +// */ +// public int getBacklog() { +// return backlog; +// } +// +// /** +// * @param backlog the backlog to set +// */ +// public void setBacklog(int backlog) { +// this.backlog = backlog; +// } +// +// /** +// * @return the useQueueForAccept +// */ +// public boolean isUseQueueForAccept() { +// return useQueueForAccept; +// } +// +// /** +// * @param useQueueForAccept the useQueueForAccept to set +// */ +// public void setUseQueueForAccept(boolean useQueueForAccept) { +// this.useQueueForAccept = useQueueForAccept; +// } +// +// +// /** +// * pull Sockets from the ServerSocket +// */ +// public void run() { +// while (!isStopped()) { +// Socket socket = null; +// try { +// socket = serverSocket.accept(); +// if (socket != null) { +// if (isStopped() || getAcceptListener() == null) { +// socket.close(); +// } else { +// if (useQueueForAccept) { +// socketQueue.put(socket); +// }else { +// handleSocket(socket); +// } +// } +// } +// } catch (SocketTimeoutException ste) { +// // expect this to happen +// } catch (Exception e) { +// if (!isStopping()) { +// onAcceptError(e); +// } else if (!isStopped()) { +// LOG.warn("run()", e); +// onAcceptError(e); +// } +// } +// } +// } +// +// /** +// * Allow derived classes to override the Transport implementation that this +// * transport server creates. +// * +// * @param socket +// * @param format +// * @return +// * @throws IOException +// */ +// protected Transport createTransport(Socket socket, WireFormat format) throws IOException { +// return new TcpTransport(format, socket); +// } +// /** - * @param maximumConnections the maximumConnections to set + * @return pretty print of this */ - public void setMaximumConnections(int maximumConnections) { - this.maximumConnections = maximumConnections; - } - - - public void started(Service service) { - this.currentTransportCount++; + public String toString() { + return "" + bindURI; } +// +// /** +// * @param socket +// * @param inetAddress +// * @return real hostName +// * @throws UnknownHostException +// */ +// +// protected void doStart() throws Exception { +// if(useQueueForAccept) { +// Runnable run = new Runnable() { +// public void run() { +// try { +// while (!isStopped() && !isStopping()) { +// Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); +// if (sock != null) { +// handleSocket(sock); +// } +// } +// +// } catch (InterruptedException e) { +// LOG.info("socketQueue interuppted - stopping"); +// if (!isStopping()) { +// onAcceptError(e); +// } +// } +// +// } +// +// }; +// socketHandlerThread = new Thread(null, run, +// "ActiveMQ Transport Server Thread Handler: " + toString(), +// getStackSize()); +// socketHandlerThread.setDaemon(true); +// //socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1); +// socketHandlerThread.start(); +// } +// super.doStart(); +// +// } +// +// protected void doStop(ServiceStopper stopper) throws Exception { +// super.doStop(stopper); +// if (serverSocket != null) { +// serverSocket.close(); +// } +// } +// +// public InetSocketAddress getSocketAddress() { +// return (InetSocketAddress)serverSocket.getLocalSocketAddress(); +// } +// +// public void setTransportOption(Map transportOptions) { +// this.transportOptions = transportOptions; +// } +// +// +// public int getSoTimeout() { +// return soTimeout; +// } +// +// public void setSoTimeout(int soTimeout) { +// this.soTimeout = soTimeout; +// } +// +// public int getSocketBufferSize() { +// return socketBufferSize; +// } +// +// public void setSocketBufferSize(int socketBufferSize) { +// this.socketBufferSize = socketBufferSize; +// } +// +// public int getConnectionTimeout() { +// return connectionTimeout; +// } +// +// public void setConnectionTimeout(int connectionTimeout) { +// this.connectionTimeout = connectionTimeout; +// } +// +// /** +// * @return the maximumConnections +// */ +// public int getMaximumConnections() { +// return maximumConnections; +// } +// +// /** +// * @param maximumConnections the maximumConnections to set +// */ +// public void setMaximumConnections(int maximumConnections) { +// this.maximumConnections = maximumConnections; +// } +// +// +// public void started(Service service) { +// this.currentTransportCount++; +// } +// +// public void stopped(Service service) { +// this.currentTransportCount--; +// } - public void stopped(Service service) { - this.currentTransportCount--; - } } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul 7 03:24:02 2010 @@ -34,6 +34,12 @@ + org.fusesource.hawtdispatch + hawtdispatch + ${hawtdispatch-version} + + + org.apache.activemq activemq-transport Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Wed Jul 7 03:24:02 2010 @@ -19,33 +19,25 @@ package org.apache.activemq.apollo; import java.beans.ExceptionListener; import java.io.IOException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Service; import org.apache.activemq.dispatch.Dispatcher; -import org.apache.activemq.transport.DispatchableTransport; +import org.apache.activemq.transport.CompletionCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; abstract public class Connection implements TransportListener, Service { protected Transport transport; protected String name; - private int priorityLevels; - protected int outputWindowSize = 1024 * 1024; - protected int outputResumeThreshold = 900 * 1024; - protected int inputWindowSize = 1024 * 1024; - protected int inputResumeThreshold = 512 * 1024; - protected boolean useAsyncWriteThread = true; - - private Dispatcher dispatcher; - private final AtomicBoolean stopping = new AtomicBoolean(); - private ExecutorService blockingWriter; - private ExceptionListener exceptionListener; + protected DispatchQueue dispatchQueue = Dispatch.createQueue(); + protected boolean stopping; + protected ExceptionListener exceptionListener; public void setTransport(Transport transport) { this.transport = transport; @@ -53,44 +45,20 @@ abstract public class Connection impleme public void start() throws Exception { transport.setTransportListener(this); - - if (transport instanceof DispatchableTransport) { - DispatchableTransport dt = ((DispatchableTransport) transport); - if (name != null) { - dt.setName(name); - } - dt.setDispatcher(getDispatcher()); - } else { - if (useAsyncWriteThread) { - blockingWriter = Executors.newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable r) { - return new Thread(r, "Writer-" + name); - } - }); - } - } + transport.setDispatchQueue(dispatchQueue); transport.start(); } public void stop() throws Exception { - stopping.set(true); + stopping = true; if (transport != null) { transport.stop(); } - if (blockingWriter != null) { - blockingWriter.shutdown(); - } + dispatchQueue.release(); } public void setName(String name) { this.name = name; - if (blockingWriter != null) { - blockingWriter.execute(new Runnable() { - public void run() { - Thread.currentThread().setName("Writer-" + Connection.this.name); - } - }); - } } public String getName() { @@ -104,36 +72,8 @@ abstract public class Connection impleme write(o, null); } - public final void write(final Object o, final Runnable onCompleted) { - if (blockingWriter == null) { - try { - transport.oneway(o); - if (onCompleted != null) { - onCompleted.run(); - } - } catch (IOException e) { - onException(e); - } - } else { - try { - blockingWriter.execute(new Runnable() { - public void run() { - if (!stopping.get()) { - try { - transport.oneway(o); - if (onCompleted != null) { - onCompleted.run(); - } - } catch (IOException e) { - onException(e); - } - } - } - }); - } catch (RejectedExecutionException re) { - // Must be shutting down. - } - } + public final void write(final Object o, final CompletionCallback callback) { + transport.oneway(o, callback); } final public void onException(IOException error) { @@ -149,49 +89,21 @@ abstract public class Connection impleme } public void setStopping() { - stopping.set(true); + stopping = true; } public boolean isStopping() { - return stopping.get(); - } - - public void transportInterupted() { - } - - public void transportResumed() { - } - - public int getPriorityLevels() { - return priorityLevels; - } - - public void setPriorityLevels(int priorityLevels) { - this.priorityLevels = priorityLevels; - } - - public Dispatcher getDispatcher() { - return dispatcher; + return stopping; } - public void setDispatcher(Dispatcher dispatcher) { - this.dispatcher = dispatcher; + public void onDisconnected() { } - public int getOutputWindowSize() { - return outputWindowSize; + public void onConnected() { } - public int getOutputResumeThreshold() { - return outputResumeThreshold; - } - - public int getInputWindowSize() { - return inputWindowSize; - } - - public int getInputResumeThreshold() { - return inputResumeThreshold; + public DispatchQueue getDispatchQueue() { + return dispatchQueue; } public Transport getTransport() { @@ -206,12 +118,4 @@ abstract public class Connection impleme this.exceptionListener = exceptionListener; } - public boolean isUseAsyncWriteThread() { - return useAsyncWriteThread; - } - - public void setUseAsyncWriteThread(boolean useAsyncWriteThread) { - this.useAsyncWriteThread = useAsyncWriteThread; - } - } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Wed Jul 7 03:24:02 2010 @@ -25,9 +25,6 @@ import java.util.concurrent.atomic.Atomi import org.apache.activemq.Service; import org.apache.activemq.apollo.Connection; -import org.apache.activemq.dispatch.Dispatcher; -import org.apache.activemq.dispatch.DispatcherConfig; -import org.apache.activemq.dispatch.DispatcherAware; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportServer; @@ -35,6 +32,8 @@ import org.apache.activemq.util.IOHelper import org.apache.activemq.util.buffer.AsciiBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; public class Broker implements Service { @@ -42,16 +41,14 @@ public class Broker implements Service { static final private Log LOG = LogFactory.getLog(Broker.class); - public static final int MAX_USER_PRIORITY = 10; - public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1; - private final ArrayList clientConnections = new ArrayList(); private final ArrayList transportServers = new ArrayList(); private final ArrayList connectUris = new ArrayList(); private final LinkedHashMap virtualHosts = new LinkedHashMap(); private VirtualHost defaultVirtualHost; - private Dispatcher dispatcher; + + private DispatchQueue dispatchQueue = Dispatch.createQueue("broker"); private File dataDirectory; private final class BrokerAcceptListener implements TransportAcceptListener { @@ -59,8 +56,6 @@ public class Broker implements Service { BrokerConnection connection = new BrokerConnection(); connection.setBroker(Broker.this); connection.setTransport(transport); - connection.setPriorityLevels(MAX_PRIORITY); - connection.setDispatcher(dispatcher); clientConnections.add(connection); try { connection.start(); @@ -125,21 +120,12 @@ public class Broker implements Service { // Create the default virtual host if not explicitly defined. getDefaultVirtualHost(); - // Don't change the state to STARTING yet as we may need to - // apply some default configuration to this broker instance before it's started. - if( dispatcher == null ) { - int threads = Runtime.getRuntime().availableProcessors(); - dispatcher = DispatcherConfig.create(getName(), threads); - } - // Ok now we are ready to start the broker up.... if ( !state.compareAndSet(State.CONFIGURATION, State.STARTING) ) { throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state. Broker was "+state.get()); } try { - dispatcher.resume(); - synchronized(virtualHosts) { for (VirtualHost virtualHost : virtualHosts.values()) { virtualHost.setBroker(this); @@ -184,8 +170,6 @@ public class Broker implements Service { for (VirtualHost virtualHost : virtualHosts.values()) { stop(virtualHost); } - - dispatcher.release(); state.set(State.STOPPED); } @@ -374,17 +358,6 @@ public class Broker implements Service { } // ///////////////////////////////////////////////////////////////// - // Property Accessors - // ///////////////////////////////////////////////////////////////// - public Dispatcher getDispatcher() { - return dispatcher; - } - public void setDispatcher(Dispatcher dispatcher) { - assertInConfigurationState(); - this.dispatcher = dispatcher; - } - - // ///////////////////////////////////////////////////////////////// // Helper Methods // ///////////////////////////////////////////////////////////////// @@ -416,10 +389,8 @@ public class Broker implements Service { } private void startTransportServer(TransportServer server) throws Exception { + server.setDispatchQueue(dispatchQueue); server.setAcceptListener(new BrokerAcceptListener()); - if (server instanceof DispatcherAware ) { - ((DispatcherAware) server).setDispatcher(dispatcher); - } server.start(); } @@ -449,6 +420,9 @@ public class Broker implements Service { } } + public DispatchQueue getDispatchQueue() { + return dispatchQueue; + } } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Wed Jul 7 03:24:02 2010 @@ -16,40 +16,12 @@ */ package org.apache.activemq.apollo.broker; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.activemq.Service; +import org.apache.activemq.broker.store.QueueDescriptor; import org.apache.activemq.broker.store.Store; -import org.apache.activemq.broker.store.Store.Callback; -import org.apache.activemq.broker.store.Store.FatalStoreException; -import org.apache.activemq.broker.store.Store.KeyNotFoundException; -import org.apache.activemq.broker.store.Store.MessageRecord; -import org.apache.activemq.broker.store.Store.QueueQueryResult; -import org.apache.activemq.broker.store.Store.QueueRecord; -import org.apache.activemq.broker.store.Store.Session; -import org.apache.activemq.dispatch.Dispatcher; -import org.apache.activemq.dispatch.DispatcherAware; -import org.apache.activemq.flow.AbstractLimitedFlowResource; -import org.apache.activemq.flow.Flow; -import org.apache.activemq.flow.FlowController; -import org.apache.activemq.flow.IFlowResource; -import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.broker.store.Store.*; +import org.apache.activemq.flow.*; import org.apache.activemq.flow.ISinkController.FlowControllable; -import org.apache.activemq.queue.QueueDescriptor; import org.apache.activemq.queue.RestoreListener; import org.apache.activemq.queue.RestoredElement; import org.apache.activemq.queue.SaveableQueueElement; @@ -59,8 +31,16 @@ import org.apache.activemq.util.buffer.A import org.apache.activemq.util.buffer.Buffer; import org.apache.activemq.util.list.LinkedNode; import org.apache.activemq.util.list.LinkedNodeList; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; -public class BrokerDatabase extends AbstractLimitedFlowResource> implements Service, DispatcherAware { +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class BrokerDatabase extends AbstractLimitedFlowResource> implements Service { private static final boolean DEBUG = false; @@ -71,7 +51,7 @@ public class BrokerDatabase extends Abst private final FlowController> storeController; private final int FLUSH_QUEUE_SIZE = 10000 * 1024; - private Dispatcher dispatcher; + private DispatchQueue dispatcher; private Thread flushThread; private AtomicBoolean running = new AtomicBoolean(false); private DatabaseListener listener; @@ -328,7 +308,7 @@ public class BrokerDatabase extends Abst if (requestedDelayedFlushPointer == -1) { requestedDelayedFlushPointer = delayedFlushPointer; - dispatcher.getGlobalQueue().dispatchAfter(flushDelayCallback, flushDelay, TimeUnit.MILLISECONDS); + Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback); } } @@ -566,10 +546,7 @@ public class BrokerDatabase extends Abst /** * Deletes the given message from the store for the given queue. * - * @param storeTracking - * The tracking number of the element being deleted - * @param queue - * The queue. + * @param queueElement * @return The {@link OperationContext} associated with the operation */ public OperationContext deleteQueueElement(SaveableQueueElement queueElement) { @@ -578,7 +555,7 @@ public class BrokerDatabase extends Abst /** * Loads a batch of messages for the specified queue. The loaded messages - * are given the provided {@link MessageRestoreListener}. + * are given the provided {@link RestoreListener}. *

* NOTE: This method uses the queue sequence number for the * message not the store tracking number. @@ -592,7 +569,7 @@ public class BrokerDatabase extends Abst * begining) * @param maxSequence * The maximum sequence number to load (-1 if no limit) - * @param max + * @param maxCount * The maximum number of messages to load (-1 if no limit) * @param listener * The listener to which messags should be passed. @@ -632,7 +609,7 @@ public class BrokerDatabase extends Abst /** * This interface is used to execute transacted code. * - * It is used by the {@link Store#execute(Callback)} method, often as + * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as * anonymous class. */ public interface Operation extends OperationContext { @@ -646,8 +623,7 @@ public class BrokerDatabase extends Abst public boolean beginExecute(); /** - * Gets called by the - * {@link Store#add(Operation, ISourceController, boolean)} method + * Gets called by the {@link Store} * within a transactional context. If any exception is thrown including * Runtime exception, the transaction is rolled back. * @@ -760,7 +736,7 @@ public class BrokerDatabase extends Abst /** * Gets called by the - * {@link Store#add(Operation, ISourceController, boolean)} method + * {@link Store} method * within a transactional context. If any exception is thrown including * Runtime exception, the transaction is rolled back. * @@ -867,7 +843,7 @@ public class BrokerDatabase extends Abst * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument + * @param tu the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an @@ -1288,12 +1264,8 @@ public class BrokerDatabase extends Abst return store.allocateStoreTracking(); } - public Dispatcher getDispatcher() { - return dispatcher; - } - - public void setDispatcher(Dispatcher dispatcher) { - this.dispatcher = dispatcher; + public void setDispatchQueue(DispatchQueue queue) { + this.dispatcher = queue; } public Store getStore() { Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul 7 03:24:02 2010 @@ -22,9 +22,9 @@ import java.util.Collection; import java.util.HashMap; import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext; +import org.apache.activemq.broker.store.QueueDescriptor; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.queue.QueueDescriptor; import org.apache.activemq.queue.SaveableQueueElement; import org.apache.activemq.util.buffer.AsciiBuffer; import org.apache.activemq.util.buffer.Buffer; Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Wed Jul 7 03:24:02 2010 @@ -22,9 +22,9 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import org.apache.activemq.broker.store.QueueDescriptor; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.broker.store.Store.QueueQueryResult; -import org.apache.activemq.dispatch.Dispatcher; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.PrioritySizeLimiter; import org.apache.activemq.flow.SizeLimiter; @@ -33,7 +33,6 @@ import org.apache.activemq.queue.IPartit import org.apache.activemq.queue.IQueue; import org.apache.activemq.queue.PartitionedQueue; import org.apache.activemq.queue.PersistencePolicy; -import org.apache.activemq.queue.QueueDescriptor; import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.RestoreListener; import org.apache.activemq.queue.SaveableQueueElement; @@ -44,6 +43,7 @@ import org.apache.activemq.util.Mapper; import org.apache.activemq.util.buffer.AsciiBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.fusesource.hawtdispatch.DispatchQueue; public class BrokerQueueStore implements QueueStore { @@ -52,7 +52,7 @@ public class BrokerQueueStore implements private static final boolean USE_PRIORITY_QUEUES = true; private BrokerDatabase database; - private Dispatcher dispatcher; + private DispatchQueue dispatchQueue; private static HashMap protocolHandlers = new HashMap(); private static final BrokerDatabase.MessageRecordMarshaller MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller() { @@ -226,8 +226,8 @@ public class BrokerQueueStore implements this.database = database; } - public void setDispatcher(Dispatcher dispatcher) { - this.dispatcher = dispatcher; + public void setDispatchQueue(DispatchQueue dispatchQueue) { + this.dispatchQueue = dispatchQueue; } public void loadQueues() throws Exception { @@ -380,7 +380,6 @@ public class BrokerQueueStore implements } }; queue = new ExclusivePersistentQueue(name, limiter); - queue.setDispatcher(dispatcher); queue.setStore(this); queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY); queue.setExpirationMapper(EXPIRATION_MAPPER); @@ -409,7 +408,7 @@ public class BrokerQueueStore implements break; } case QueueDescriptor.SHARED_PRIORITY: { - PrioritySizeLimiter limiter = new PrioritySizeLimiter(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, Broker.MAX_PRIORITY); + PrioritySizeLimiter limiter = new PrioritySizeLimiter(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10); limiter.setPriorityMapper(PRIORITY_MAPPER); limiter.setSizeMapper(SIZE_MAPPER); SharedPriorityQueue queue = new SharedPriorityQueue(name, limiter); @@ -444,7 +443,6 @@ public class BrokerQueueStore implements } } ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE); - ret.setDispatcher(dispatcher); ret.setStore(this); ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY); ret.setExpirationMapper(EXPIRATION_MAPPER); Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul 7 03:24:02 2010 @@ -98,7 +98,6 @@ class TopicSubscription implements Broke String name = subscription.getResourceName(); IFlowLimiter limiter = new SizeLimiter(100, 50); ExclusiveQueue queue = new ExclusiveQueue(flow, name, limiter); - queue.setDispatcher(host.getBroker().getDispatcher()); queue.setDrain( new QueueDispatchTarget() { public void drain(MessageDelivery elem, ISourceController controller) { subscription.add(elem, controller); Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul 7 03:24:02 2010 @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext; import org.apache.activemq.apollo.broker.Transaction.TxOp; +import org.apache.activemq.broker.store.QueueDescriptor; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.broker.store.Store.QueueQueryResult; import org.apache.activemq.flow.ISourceController; @@ -31,7 +32,6 @@ import org.apache.activemq.flow.SizeLimi import org.apache.activemq.queue.ExclusivePersistentQueue; import org.apache.activemq.queue.IQueue; import org.apache.activemq.queue.PersistencePolicy; -import org.apache.activemq.queue.QueueDescriptor; import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.RestoreListener; import org.apache.activemq.queue.SaveableQueueElement; @@ -314,7 +314,6 @@ public class TransactionManager { } }; queue = new ExclusivePersistentQueue(name, limiter); - queue.setDispatcher(host.getBroker().getDispatcher()); queue.setStore(txStore); queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY); queue.setExpirationMapper(EXPIRATION_MAPPER); Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Wed Jul 7 03:24:02 2010 @@ -31,6 +31,8 @@ import org.apache.activemq.dispatch.inte import org.apache.activemq.queue.IQueue; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.buffer.AsciiBuffer; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; /** * @author chirino @@ -48,6 +50,8 @@ public class VirtualHost implements Serv private BrokerDatabase database; private TransactionManager txnManager; + private DispatchQueue dispatchQueue = Dispatch.createQueue("virtual-host"); + public VirtualHost() { this.router.setVirtualHost(this); } @@ -117,14 +121,14 @@ public class VirtualHost implements Serv database = new BrokerDatabase(store); } - database.setDispatcher(broker.getDispatcher()); + database.setDispatchQueue(broker.getDispatchQueue()); database.start(); router.setDatabase(database); //Recover queues: queueStore.setDatabase(database); - queueStore.setDispatcher(broker.getDispatcher()); + queueStore.setDispatchQueue(dispatchQueue); queueStore.loadQueues(); // Create Queue instances @@ -277,14 +281,14 @@ public class VirtualHost implements Serv /** * A destination has bean created * - * @param destination + * @param queue */ public void onCreate(Queue queue); /** * A destination has bean destroyed * - * @param destination + * @param queue */ public void onDestroy(Queue queue); Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Wed Jul 7 03:24:02 2010 @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -29,14 +30,18 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.pipe.Pipe; +import org.apache.activemq.transport.pipe.PipeTransport; import org.apache.activemq.transport.pipe.PipeTransportFactory; +import org.apache.activemq.transport.pipe.PipeTransportServer; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.URISupport; import org.apache.activemq.wireformat.WireFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import static org.apache.activemq.transport.TransportFactorySupport.configure; +import static org.apache.activemq.transport.TransportFactorySupport.verify; + /** * Implements the vm transport which behaves like the pipe transport except that * it can start embedded brokers up on demand. @@ -46,7 +51,8 @@ import org.apache.commons.logging.LogFac */ public class VMTransportFactory extends PipeTransportFactory { static final private Log LOG = LogFactory.getLog(VMTransportFactory.class); - + private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString(); + /** * This extension of the PipeTransportServer shuts down the broker * when all the connections are disconnected. @@ -58,10 +64,9 @@ public class VMTransportFactory extends private Broker broker; @Override - protected PipeTransport createClientTransport(Pipe pipe) { + protected PipeTransport createClientTransport() { refs.incrementAndGet(); - - return new PipeTransport(pipe) { + return new PipeTransport(this) { AtomicBoolean stopped = new AtomicBoolean(); @Override public void stop() throws Exception { @@ -90,20 +95,13 @@ public class VMTransportFactory extends } } - - private static final String DEFAULT_PIPE_NAME = Broker.DEFAULT_VIRTUAL_HOST_NAME.toString(); - - @Override - public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - // Wishing right now the options would have been passed to the createTransport(URI location, WireFormat wf) method so we did don't - // need to remove these here. - options.remove("create"); - options.remove("broker"); - return super.compositeConfigure(transport, format, options); + @Override + public TransportServer bind(URI uri) { + return new VmTransportServer(); } - - @Override - protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { + + @Override + public Transport connect(URI location) throws IOException { try { String brokerURI = null; @@ -147,7 +145,6 @@ public class VMTransportFactory extends // We want to use a vm transport server impl. VmTransportServer vmTransportServer = (VmTransportServer) TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")); vmTransportServer.setBroker(broker); - vmTransportServer.setWireFormatFactory(wf.getWireFormatFactory()); broker.addTransportServer(vmTransportServer); broker.start(); @@ -159,9 +156,8 @@ public class VMTransportFactory extends } PipeTransport transport = server.connect(); - transport.setWireFormat(wf); - return transport; - + return verify( configure(transport, options), options); + } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } catch (Exception e) { @@ -169,10 +165,4 @@ public class VMTransportFactory extends } } - - @Override - protected PipeTransportServer createTransportServer() { - return new VmTransportServer(); - } - } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul 7 03:24:02 2010 @@ -36,7 +36,7 @@ public class VMTransportTest { @Test() public void autoCreateBroker() throws Exception { - Transport connect = TransportFactory.compositeConnect(new URI("vm://test1?wireFormat=mock")); + Transport connect = TransportFactory.connect(new URI("vm://test1?wireFormat=mock")); connect.start(); assertNotNull(connect); connect.stop(); @@ -44,12 +44,12 @@ public class VMTransportTest { @Test(expected=IOException.class) public void noAutoCreateBroker() throws Exception { - TransportFactory.compositeConnect(new URI("vm://test2?create=false&wireFormat=mock")); + TransportFactory.connect(new URI("vm://test2?create=false&wireFormat=mock")); } @Test(expected=IllegalArgumentException.class) public void badOptions() throws Exception { - TransportFactory.compositeConnect(new URI("vm://test3?crazy-option=false&wireFormat=mock")); + TransportFactory.connect(new URI("vm://test3?crazy-option=false&wireFormat=mock")); } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Wed Jul 7 03:24:02 2010 @@ -476,7 +476,6 @@ public abstract class BrokerTestBase { consumer.setDestination(destination); consumer.setName("consumer" + (i + 1)); consumer.setTotalConsumerRate(totalConsumerRate); - consumer.setDispatcher(dispatcher); return consumer; } @@ -498,7 +497,6 @@ public abstract class BrokerTestBase { producer.setDestination(destination); producer.setMessageIdGenerator(msgIdGenerator); producer.setTotalProducerRate(totalProducerRate); - producer.setDispatcher(dispatcher); return producer; } @@ -508,7 +506,6 @@ public abstract class BrokerTestBase { Broker broker = new Broker(); broker.addTransportServer(TransportFactory.bind(new URI(bindURI))); broker.addConnectUri(connectUri); - broker.setDispatcher(dispatcher); broker.getDefaultVirtualHost().setStore(createStore(broker)); return broker; } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Wed Jul 7 03:24:02 2010 @@ -9,7 +9,6 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.flow.ISourceController; import org.apache.activemq.metric.MetricAggregator; import org.apache.activemq.metric.MetricCounter; -import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.TransportFactory; abstract public class RemoteConsumer extends Connection { @@ -29,11 +28,8 @@ abstract public class RemoteConsumer ext consumerRate.name("Consumer " + name + " Rate"); totalConsumerRate.add(consumerRate); - transport = TransportFactory.compositeConnect(uri); - if(transport instanceof DispatchableTransport) - { - schedualWait = true; - } + transport = TransportFactory.connect(uri); + schedualWait = true; initialize(); super.start(); setupSubscription(); @@ -46,12 +42,12 @@ abstract public class RemoteConsumer ext protected void messageReceived(final ISourceController controller, final MessageDelivery elem) { if( schedualWait ) { if (thinkTime > 0) { - getDispatcher().getGlobalQueue().dispatchAfter(new Runnable(){ + dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){ public void run() { consumerRate.increment(); controller.elementDispatched(elem); } - }, thinkTime, TimeUnit.MILLISECONDS); + }); } else Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Wed Jul 7 03:24:02 2010 @@ -31,7 +31,6 @@ abstract public class RemoteProducer ext protected String property; protected MetricAggregator totalProducerRate; protected MessageDelivery next; - protected DispatchQueue dispatchQueue; protected Runnable dispatchTask; protected String filler; protected int payloadSize = 20; @@ -55,13 +54,12 @@ abstract public class RemoteProducer ext totalProducerRate.add(rate); - transport = TransportFactory.compositeConnect(uri); + transport = TransportFactory.connect(uri); initialize(); super.start(); setupProducer(); - dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD); dispatchTask = new Runnable(){ public void run() { dispatch(); Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Wed Jul 7 03:24:02 2010 @@ -26,9 +26,9 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.apollo.broker.MessageDelivery; import org.apache.activemq.broker.store.Store; import org.apache.activemq.broker.store.StoreFactory; -import org.apache.activemq.dispatch.Dispatcher; -import org.apache.activemq.dispatch.DispatcherConfig; import org.apache.activemq.queue.IQueue; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; /** * @author cmacnaug @@ -37,7 +37,7 @@ import org.apache.activemq.queue.IQueue; public class SharedQueueTest extends TestCase { - Dispatcher dispatcher; + DispatchQueue dispatchQueue; BrokerDatabase database; BrokerQueueStore queueStore; private static final boolean USE_KAHA_DB = true; @@ -46,8 +46,8 @@ public class SharedQueueTest extends Tes protected ArrayList> queues = new ArrayList>(); - protected Dispatcher createDispatcher() { - return DispatcherConfig.create("test", Runtime.getRuntime().availableProcessors()); + protected DispatchQueue createDispatcher() { + return Dispatch.createQueue(); } protected int consumerStartDelay = 0; @@ -63,20 +63,19 @@ public class SharedQueueTest extends Tes } protected void startServices() throws Exception { - dispatcher = createDispatcher(); - dispatcher.resume(); + dispatchQueue = createDispatcher(); + dispatchQueue.resume(); database = new BrokerDatabase(createStore()); - database.setDispatcher(dispatcher); + database.setDispatchQueue(dispatchQueue); database.start(); queueStore = new BrokerQueueStore(); queueStore.setDatabase(database); - queueStore.setDispatcher(dispatcher); queueStore.loadQueues(); } protected void stopServices() throws Exception { database.stop(); - dispatcher.release(); + dispatchQueue.release(); queues.clear(); } Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Jul 7 03:24:02 2010 @@ -89,6 +89,7 @@ import org.apache.activemq.management.St import org.apache.activemq.management.StatsImpl; import org.apache.activemq.state.CommandVisitorAdapter; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IdGenerator; @@ -148,7 +149,7 @@ public class ActiveMQConnection implemen private int sendTimeout =0; private boolean sendAcksAsync=true; - private final Transport transport; + private final ResponseCorrelator transport; private final IdGenerator clientIdGenerator; private final JMSStatsImpl factoryStats; private final JMSConnectionStatsImpl stats; @@ -196,7 +197,7 @@ public class ActiveMQConnection implemen */ protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { - this.transport = transport; + this.transport = new ResponseCorrelator(transport); this.clientIdGenerator = clientIdGenerator; this.factoryStats = factoryStats; @@ -1221,11 +1222,11 @@ public class ActiveMQConnection implemen } private void doAsyncSendPacket(Command command) throws JMSException { - try { +// try { this.transport.oneway(command); - } catch (IOException e) { - throw JMSExceptionSupport.create(e); - } +// } catch (IOException e) { +// throw JMSExceptionSupport.create(e); +// } } /** @@ -1820,21 +1821,21 @@ public class ActiveMQConnection implemen } } - public void transportInterupted() { + public void onDisconnected() { for (Iterator i = this.sessions.iterator(); i.hasNext();) { ActiveMQSession s = i.next(); s.clearMessagesInProgress(); } for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { TransportListener listener = iter.next(); - listener.transportInterupted(); + listener.onDisconnected(); } } - public void transportResumed() { + public void onConnected() { for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { TransportListener listener = iter.next(); - listener.transportResumed(); + listener.onConnected(); } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Jul 7 03:24:02 2010 @@ -238,7 +238,7 @@ public class ActiveMQConnectionFactory e */ protected Transport createTransport() throws JMSException { try { - return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR); + return TransportFactory.connect(brokerURL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul 7 03:24:02 2010 @@ -43,15 +43,10 @@ public class BrokerXml { private List transportServers = new ArrayList(); @XmlElement(name="connect-uri") private List connectUris = new ArrayList(); - @XmlElement(required = false) - private DispatcherXml dispatcher; public Broker createMessageBroker() throws Exception { Broker rc = new Broker(); - if( dispatcher!=null ) { - rc.setDispatcher(dispatcher.createDispatcher(this)); - } for (VirtualHostXml element : virtualHosts) { rc.addVirtualHost(element.createVirtualHost(this)); } @@ -98,13 +93,4 @@ public class BrokerXml { this.connectUris = connectUris; } - - public DispatcherXml getDispatcher() { - return dispatcher; - } - public void setDispatcher(DispatcherXml dispatcher) { - this.dispatcher = dispatcher; - } - - } Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul 7 03:24:02 2010 @@ -44,7 +44,6 @@ public class JAXBConfigTest extends Test LOG.info("Loading broker configuration from the classpath with URI: " + uri); Broker broker = BrokerFactory.createBroker(uri); - Dispatcher p = (Dispatcher)broker.getDispatcher(); // assertEquals(4, p.getSize()); // assertEquals("test dispatcher", p.getName()); Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml?rev=961062&r1=961061&r2=961062&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/pom.xml Wed Jul 7 03:24:02 2010 @@ -25,11 +25,11 @@ org.apache.activemq - activemq-kaha + activemq-hawtdb jar 6.0-SNAPSHOT - ActiveMQ :: ${artifactId} :: ${version} + ActiveMQ :: HawtDB @@ -40,8 +40,9 @@ - org.apache.activemq - kahadb + org.fusesource.hawtdb + hawtdb + ${hawtdb-version} false