Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 35800 invoked from network); 7 Jul 2010 03:46:19 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:46:19 -0000 Received: (qmail 33295 invoked by uid 500); 7 Jul 2010 03:46:19 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33230 invoked by uid 500); 7 Jul 2010 03:46:17 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 33223 invoked by uid 99); 7 Jul 2010 03:46:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:46:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:46:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 80E7223889DD; Wed, 7 Jul 2010 03:45:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961077 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-stomp/src/test/scala/org/apache/activemq/apollo... Date: Wed, 07 Jul 2010 03:45:18 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707034518.80E7223889DD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 03:45:17 2010 New Revision: 961077 URL: http://svn.apache.org/viewvc?rev=961077&view=rev Log: transport tweaks to get perf up Removed: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:45:17 2010 @@ -22,6 +22,7 @@ import _root_.java.lang.{String} import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer} import _root_.org.fusesource.hawtdispatch._ import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ +import org.apache.activemq.transport.Transport trait DeliveryProducer { def collocate(queue:DispatchQueue):Unit @@ -384,11 +385,21 @@ case class Delivery ( // } //} +trait DeliverySink { + def full:Boolean + def send(delivery:Delivery):Unit +} + +class TransportDeliverySink(val transport:Transport) extends DeliverySink { + def full:Boolean = transport.isFull + def send(delivery:Delivery) = transport.oneway(delivery.message, delivery) +} + /** * * @author Hiram Chirino */ -class DeliveryBuffer(var maxSize:Int=1024*32) { +class DeliveryBuffer(var maxSize:Int=1024*32) extends DeliverySink { var deliveries = new LinkedList[Delivery]() private var size = 0 @@ -425,7 +436,7 @@ class DeliveryBuffer(var maxSize:Int=102 } -class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) { +class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink { private var overflow = new LinkedList[Delivery]() @@ -433,7 +444,7 @@ class DeliveryOverflowBuffer(val deliver while( !overflow.isEmpty && !full ) { val delivery = overflow.removeFirst delivery.release - send_to_delivery_queue(delivery) + send_to_delivery_buffer(delivery) } } @@ -444,11 +455,11 @@ class DeliveryOverflowBuffer(val deliver delivery.retain overflow.addLast(delivery) } else { - send_to_delivery_queue(delivery) + send_to_delivery_buffer(delivery) } } - protected def send_to_delivery_queue(value:Delivery) = { + protected def send_to_delivery_buffer(value:Delivery) = { var delivery = Delivery(value) delivery.setDisposer(^{ drainOverflow @@ -461,7 +472,7 @@ class DeliveryOverflowBuffer(val deliver } -class DeliverySessionManager(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained { +class DeliverySessionManager(val delivery_buffer:DeliverySink, val queue:DispatchQueue) extends BaseRetained { var sessions = List[SessionServer]() @@ -524,7 +535,7 @@ class DeliverySessionManager(val deliver override def full = credits <= 0 - override protected def send_to_delivery_queue(value:Delivery) = { + override protected def send_to_delivery_buffer(value:Delivery) = { var delivery = Delivery(value) delivery.setDisposer(^{ // This is called from the server/consumer thread Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:45:17 2010 @@ -28,7 +28,6 @@ import AsciiBuffer._ import Stomp._ import BufferConversions._ import StompFrameConstants._ -import org.apache.activemq.transport.CompletionCallback import java.io.IOException @@ -90,31 +89,17 @@ class StompProtocolHandler extends Proto } } - val outboundChannel = new DeliveryBuffer + var outboundChannel:TransportDeliverySink = null var closed = false var consumer:SimpleConsumer = null var producerRoute:DeliveryProducerRoute=null var host:VirtualHost = null - outboundChannel.eventHandler = ^{ - var delivery = outboundChannel.receive - while( delivery!=null ) { - connection.transport.oneway(delivery.message, new CompletionCallback() { - def onCompletion() = { - outboundChannel.ack(delivery) - } - def onFailure(e:Exception) = { - connection.onFailure(e) - } - }); - delivery = outboundChannel.receive - } - } - private def queue = connection.dispatchQueue override def onTransportConnected() = { + outboundChannel = new TransportDeliverySink(connection.transport) connection.broker.runtime.getDefaultVirtualHost( queue.wrap { (host)=> this.host=host @@ -172,7 +157,7 @@ class StompProtocolHandler extends Proto def on_stomp_connect(headers:HeaderMap) = { - connection.transport.oneway(StompFrame(Responses.CONNECTED)) + connection.transport.oneway(StompFrame(Responses.CONNECTED), null) } def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = { @@ -267,7 +252,7 @@ class StompProtocolHandler extends Proto private def die(msg:String) = { info("Shutting connection down due to: "+msg) connection.transport.suspendRead - connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg))) + connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null) ^ { connection.stop() } ->: queue Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:45:17 2010 @@ -74,8 +74,19 @@ class StompWireFormat extends WireFormat ByteBuffer.wrap(Array(x)); } + def marshal(command:Any, os:DataOutput) = { + marshal(command.asInstanceOf[StompFrame], os) + } + + def marshal(command:Any):Buffer= { val frame = command.asInstanceOf[StompFrame] + val os = new DataByteArrayOutputStream(frame.size); + marshal(frame, os) + os.toBuffer + } + + def marshal(frame:StompFrame, os:DataOutput) = { frame.action.writeTo(os) os.write(NEWLINE) @@ -102,15 +113,6 @@ class StompWireFormat extends WireFormat END_OF_FRAME_BUFFER.writeTo(os) } - def marshal(command:Any):Buffer= { - val frame = command.asInstanceOf[StompFrame] - // make a little bigger since size can be an estimate and we want to avoid - // a capacity re-size. - val os = new DataByteArrayOutputStream(frame.size + 100); - marshal(frame, os) - os.toBuffer - } - def unmarshal(packet:Buffer) = { throw new UnsupportedOperationException } Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:45:17 2010 @@ -21,7 +21,6 @@ import _root_.org.apache.activemq.apollo import _root_.org.apache.activemq.apollo.broker.perf._ import _root_.org.apache.activemq.apollo.stomp._ -import _root_.org.apache.activemq.transport.CompletionCallback import _root_.org.apache.activemq.util.buffer._ import collection.mutable.{ListBuffer, HashMap} @@ -29,6 +28,7 @@ import AsciiBuffer._ import Stomp._ import _root_.org.apache.activemq.apollo.stomp.StompFrame import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ +import org.fusesource.hawtdispatch.BaseRetained object StompBrokerPerfTest { def main(args:Array[String]) = { @@ -55,7 +55,7 @@ class StompRemoteConsumer extends Remote } var frame = StompFrame(Stomp.Commands.CONNECT); - transport.oneway(frame); + transport.oneway(frame, null); var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination) @@ -63,7 +63,7 @@ class StompRemoteConsumer extends Remote headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO) frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers); - transport.oneway(frame); + transport.oneway(frame, null); } def onTransportCommand(command:Object) = { @@ -99,36 +99,35 @@ class StompRemoteProducer extends Remote var stompDestination:AsciiBuffer = null - val send_next:CompletionCallback = new CompletionCallback() { - def onCompletion() = { + def send_next:Unit = { + var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil + headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination); + if (property != null) { + headers ::= (ascii(property), ascii(property)); + } +// var p = this.priority; +// if (priorityMod > 0) { +// p = if ((counter % priorityMod) == 0) { 0 } else { priority } +// } + + var content = ascii(createPayload()); + val frame = StompFrame(Stomp.Commands.SEND, headers, content) + val delivery = new BaseRetained() + delivery.setDisposer(^{ rate.increment(); val task = ^ { if( !stopping ) { - - var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil - headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination); - if (property != null) { - headers ::= (ascii(property), ascii(property)); - } -// var p = this.priority; -// if (priorityMod > 0) { -// p = if ((counter % priorityMod) == 0) { 0 } else { priority } -// } - - var content = ascii(createPayload()); - transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next) + send_next } - } + } if( thinkTime > 0 ) { dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task) } else { dispatchQueue << task } - } - def onFailure(error:Exception) = { - println("stopping due to: "+error); - stop - } + }) + transport.oneway(frame, delivery) + delivery.release } override def setupProducer() = { @@ -137,7 +136,8 @@ class StompRemoteProducer extends Remote } else { stompDestination = ascii("/topic/"+destination.getName().toString()); } - transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next); + transport.oneway(StompFrame(Stomp.Commands.CONNECT), null); + } def onTransportCommand(command:Object) = { Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 03:45:17 2010 @@ -36,7 +36,7 @@ object StompLoadClient { import StompLoadClient._ implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value) - var producerSleep = 1000*1000000; + var producerSleep = 0; var consumerSleep = 0; var producers = 1; var consumers = 1; Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:45:17 2010 @@ -16,7 +16,6 @@ */ package org.apache.activemq.transport.tcp; -import org.apache.activemq.transport.CompletionCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.buffer.Buffer; @@ -25,6 +24,7 @@ import org.apache.activemq.wireformat.Wi import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; import org.fusesource.hawtdispatch.DispatchSource; +import org.fusesource.hawtdispatch.Retained; import java.io.EOFException; import java.io.IOException; @@ -32,6 +32,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.LinkedList; import java.util.Map; @@ -40,7 +41,7 @@ import static org.apache.activemq.transp /** * An implementation of the {@link Transport} interface using raw tcp/ip - * + * * @author Hiram Chirino */ public class TcpTransport implements Transport { @@ -75,20 +76,21 @@ public class TcpTransport implements Tra private DispatchSource writeSource; final LinkedList outbound = new LinkedList(); - int maxOutbound = 1024*32; + int outboundSize = 0; + int maxOutbound = 1024 * 32; ByteBuffer outbound_frame; protected boolean useLocalHost = true; - int READ_BUFFFER_SIZE = 1024*32; - ByteBuffer readBuffer = ByteBuffer.allocate(1024*32); + int READ_BUFFFER_SIZE = 1024 * 32; + ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 32); static final class OneWay { final Buffer buffer; - final CompletionCallback callback; + final Retained retained; - public OneWay(Buffer buffer, CompletionCallback callback) { - this.callback = callback; + public OneWay(Buffer buffer, Retained retained) { + this.retained = retained; this.buffer = buffer; } } @@ -111,11 +113,11 @@ public class TcpTransport implements Tra } public void setDispatchQueue(DispatchQueue queue) { - if( dispatchQueue!=null ) { + if (dispatchQueue != null) { dispatchQueue.release(); } this.dispatchQueue = queue; - if( dispatchQueue!=null ) { + if (dispatchQueue != null) { dispatchQueue.retain(); } } @@ -127,18 +129,18 @@ public class TcpTransport implements Tra if (listener == null) { throw new IllegalArgumentException("listener is not set"); } - if( transportState!=CREATED ) { + if (transportState != CREATED) { throw new IllegalStateException("can only be started from the created stae"); } - transportState=RUNNING; - + transportState = RUNNING; + unmarshalSession = wireformat.createUnmarshalSession(); - if( socketState == CONNECTING ) { + if (socketState == CONNECTING) { channel = SocketChannel.open(); } channel.configureBlocking(false); - if( socketState == CONNECTING ) { + if (socketState == CONNECTING) { if (localLocation != null) { InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); @@ -152,7 +154,7 @@ public class TcpTransport implements Tra final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue); connectSource.setEventHandler(new Runnable() { public void run() { - if( transportState==RUNNING ) { + if (transportState == RUNNING) { try { socketState = CONNECTED; channel.finishConnect(); @@ -190,7 +192,7 @@ public class TcpTransport implements Tra } readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue); - readSource.setEventHandler(new Runnable(){ + readSource.setEventHandler(new Runnable() { public void run() { try { drainInbound(); @@ -201,12 +203,12 @@ public class TcpTransport implements Tra }); writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue); - writeSource.setEventHandler(new Runnable(){ + writeSource.setEventHandler(new Runnable() { public void run() { - if( transportState==RUNNING ) { + if (transportState == RUNNING) { // once the outbound is drained.. we can suspend getting // write events. - if( drainOutbound() ) { + if (drainOutbound()) { writeSource.suspend(); } } @@ -219,33 +221,31 @@ public class TcpTransport implements Tra public void stop() throws Exception { - if( readSource!=null ) { + if (readSource != null) { readSource.release(); readSource = null; } - if( writeSource!=null ) { + if (writeSource != null) { writeSource.release(); writeSource = null; } setDispatchQueue(null); - transportState=DISPOSED; + transportState = DISPOSED; } - @Deprecated - public void oneway(Object command) { - oneway(command, null); + + public boolean isFull() { + return outboundSize >= maxOutbound; } - public void oneway(Object command, CompletionCallback callback) { + public void oneway(Object command, Retained retained) { assert Dispatch.getCurrentQueue() == dispatchQueue; try { - if( socketState != CONNECTED ) { - throw new IllegalStateException("Not connected."); - } - } catch (IllegalStateException e) { - if( callback!=null ) { - callback.onFailure(e); + if (socketState != CONNECTED) { + throw new IOException("Not connected."); } + } catch (IOException e) { + listener.onTransportFailure(e); } // Marshall the command. @@ -253,79 +253,95 @@ public class TcpTransport implements Tra try { buffer = wireformat.marshal(command); } catch (IOException e) { - callback.onFailure(e); + listener.onTransportFailure(e); return; } - outbound.add(new OneWay(buffer, callback)); + OneWay oneway; + if (retained!=null && isFull() ) { + // retaining blocks the sender it is released. + retained.retain(); + oneway = new OneWay(buffer, retained); + } else { + oneway = new OneWay(buffer, null); + } + outbound.add(oneway); + outboundSize += buffer.length; // wait for write ready events if this write // cannot be drained. - if( outbound.size()==1 && !drainOutbound() ) { + if (outbound.size() == 1 && !drainOutbound()) { writeSource.resume(); } } /** - * @retruns true if the outbound has been drained of all objects and there are no in progress writes. - */ + * @retruns true if the outbound has been drained of all objects and there are no in progress writes. + */ private boolean drainOutbound() { try { - - while(socketState == CONNECTED) { - - // if we have a pending write that is being sent over the socket... - if( outbound_frame!=null ) { - - channel.write(outbound_frame); - if( outbound_frame.remaining() != 0 ) { - return false; - } else { - outbound_frame = null; - } - } else { + while (socketState == CONNECTED) { - // marshall all the available frames.. - ByteArrayOutputStream buffer = new ByteArrayOutputStream(maxOutbound << 2); - OneWay oneWay = outbound.poll(); - - while( oneWay!=null) { - buffer.write(oneWay.buffer); - if( oneWay.callback!=null ) { - oneWay.callback.onCompletion(); - } - if( buffer.size() < maxOutbound ) { - oneWay = outbound.poll(); + // if we have a pending write that is being sent over the socket... + if (outbound_frame != null) { + + channel.write(outbound_frame); + if (outbound_frame.remaining() != 0) { + return false; } else { - oneWay = null; + outbound_frame = null; } - } - - if( buffer.size()==0 ) { - // the source is now drained... - return true; } else { - outbound_frame = buffer.toBuffer().toByteBuffer(); + + // marshall all the available frames.. + OneWay oneWay = outbound.poll(); + + int size = 0; + ArrayList buffers = new ArrayList(outbound.size()); + while (oneWay != null) { + size+=oneWay.buffer.length; + buffers.add(oneWay.buffer); + if (oneWay.retained != null) { + oneWay.retained.release(); + } + if (size < maxOutbound) { + oneWay = outbound.poll(); + } else { + oneWay = null; + } + } + + if (size == 0) { + // the source is now drained... + return true; + } else { + // Make the write just one big buffer. + outboundSize -= size; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(size); + for (Buffer b : buffers) { + buffer.write(b); + } + outbound_frame = buffer.toBuffer().toByteBuffer(); + } } - } } } catch (IOException e) { listener.onTransportFailure(e); } - - return outbound.isEmpty() && outbound_frame==null; + + return outbound.isEmpty() && outbound_frame == null; } private void drainInbound() throws IOException { - if( transportState==DISPOSED || readSource.isSuspended() ) { + if (transportState == DISPOSED || readSource.isSuspended()) { return; } - while( true ) { + while (true) { // do we need to read in more data??? if (unmarshalSession.getEndPos() == readBuffer.position()) { @@ -359,12 +375,12 @@ public class TcpTransport implements Tra } } - Object command=unmarshalSession.unmarshal(readBuffer); - if( command!=null ) { + Object command = unmarshalSession.unmarshal(readBuffer); + if (command != null) { listener.onTransportCommand(command); // the transport may be suspended after processing a command. - if( transportState==DISPOSED || readSource.isSuspended() ) { + if (transportState == DISPOSED || readSource.isSuspended()) { return; } } @@ -391,14 +407,15 @@ public class TcpTransport implements Tra public void resumeRead() { readSource.resume(); } - - public void reconnect(URI uri, CompletionCallback callback) { + + public void reconnect(URI uri) { throw new UnsupportedOperationException(); } public TransportListener getTransportListener() { return listener; } + public void setTransportListener(TransportListener listener) { this.listener = listener; } @@ -406,6 +423,7 @@ public class TcpTransport implements Tra public WireFormat getWireformat() { return wireformat; } + public void setWireformat(WireFormat wireformat) { this.wireformat = wireformat; } @@ -417,6 +435,7 @@ public class TcpTransport implements Tra public boolean isDisposed() { return transportState == DISPOSED; } + public boolean isFaultTolerant() { return false; } @@ -677,6 +696,7 @@ public class TcpTransport implements Tra // this.minmumWireFormatVersion = minmumWireFormatVersion; // } // + public boolean isUseLocalHost() { return useLocalHost; } Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul 7 03:45:17 2010 @@ -22,6 +22,7 @@ import java.net.URI; import org.apache.activemq.Service; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Retained; /** * Represents an abstract connection. It can be a client side or server side connection. @@ -30,18 +31,16 @@ import org.fusesource.hawtdispatch.Dispa */ public interface Transport extends Service { - @Deprecated - void oneway(Object command); + + boolean isFull(); /** - * A one way asynchronous send. Once the command is transmitted the callback - * is invoked. + * A one way asynchronous send. * * @param command - * @param callback * @throws IOException */ - void oneway(Object command, CompletionCallback callback); + void oneway(Object command, Retained retained); /** * Returns the current transport listener @@ -121,6 +120,6 @@ public interface Transport extends Servi * @param uri * @throws IOException on failure of if not supported */ - void reconnect(URI uri, CompletionCallback callback); + void reconnect(URI uri); } Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:45:17 2010 @@ -21,6 +21,7 @@ import java.net.URI; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Retained; /** * @version $Revision: 1.5 $ @@ -105,16 +106,14 @@ public class TransportFilter implements return next.toString(); } - @Deprecated - public void oneway(Object command) { - oneway(command, null); + public void oneway(Object command, Retained retained) { + next.oneway(command, retained); } - public void oneway(Object command, CompletionCallback callback) { - next.oneway(command, callback); + public boolean isFull() { + return next.isFull(); } - public void onTransportFailure(IOException error) { transportListener.onTransportFailure(error); } @@ -154,8 +153,8 @@ public class TransportFilter implements return next.isConnected(); } - public void reconnect(URI uri, CompletionCallback callback) { - next.reconnect(uri, callback); + public void reconnect(URI uri) { + next.reconnect(uri); } public WireFormat getWireformat() { Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961077&r1=961076&r2=961077&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:45:17 2010 @@ -16,15 +16,11 @@ */ package org.apache.activemq.transport.pipe; -import org.apache.activemq.transport.CompletionCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.buffer.Buffer; import org.apache.activemq.wireformat.WireFormat; -import org.fusesource.hawtdispatch.CustomDispatchSource; -import org.fusesource.hawtdispatch.Dispatch; -import org.fusesource.hawtdispatch.DispatchQueue; -import org.fusesource.hawtdispatch.EventAggregators; +import org.fusesource.hawtdispatch.*; import java.io.EOFException; import java.io.IOException; @@ -139,11 +135,11 @@ public class PipeTransport implements Tr static final class OneWay { final Object command; - final CompletionCallback callback; + final Retained retained; - public OneWay(Object command, CompletionCallback callback) { - this.callback = callback; + public OneWay(Object command, Retained retained) { this.command = command; + this.retained = retained; } } @@ -151,34 +147,34 @@ public class PipeTransport implements Tr int outbound = 0; int maxOutbound = 100; - @Deprecated - public void oneway(Object command) { - oneway(command, null); + public boolean isFull() { + return outbound >= maxOutbound; } - public void oneway(Object command, CompletionCallback callback) { + public void oneway(Object command, Retained retained) { if( !connected ) { throw new IllegalStateException("Not connected."); } - if( outbound < maxOutbound ) { - transmit(command, callback); + if( isFull() && retained!=null) { + retained.retain(); + inbound.add(new OneWay(command, retained)); } else { - inbound.add(new OneWay(command, callback)); + transmit(command, null); } } private void drainInbound() { - while( outbound < maxOutbound && !inbound.isEmpty() ) { + while( !isFull() && !inbound.isEmpty() ) { OneWay oneWay = inbound.poll(); - transmit(oneWay.command, oneWay.callback); + transmit(oneWay.command, oneWay.retained); } } - private void transmit(Object command, CompletionCallback callback) { + private void transmit(Object command, Retained retained) { outbound++; peer.dispatchSource.merge(command); - if( callback!=null ) { - callback.onCompletion(); + if( retained!=null ) { + retained.release(); } } @@ -200,7 +196,7 @@ public class PipeTransport implements Tr public void resumeRead() { dispatchSource.resume(); } - public void reconnect(URI uri, CompletionCallback callback) { + public void reconnect(URI uri) { throw new UnsupportedOperationException(); }