activemq-dev mailing list archives

From Manuel Teira Paz <mte...@tid.es>
Subject Re: tcp and nio transport considerations
Date Mon, 22 Sep 2008 16:45:01 GMT
First of all, sorry for insisting on this issue, but I really need some
help. After some digging, this is where I have got to:

- Given that threads block on socket write operations, I tried to
detach those operations from the "ActiveMQ Transport: tcp://blablabla"
threads, hoping to keep those threads from blocking in a socketWrite
operation.
- To achieve that, I declared a per-instance Executor created with
Executors.newSingleThreadExecutor, so that commands are still
processed in order. In the TcpTransport run() method, I have
something like:

while (!isStopped()) {
    doRun();
}

And in the doRun method, omitting all the exception handling:

final Object command = readCommand();
if (consumerExecutor != null) {
    consumerExecutor.execute(new Runnable() {
        public void run() {
            doConsume(command);
        }
    });
} else {
    doConsume(command);
}
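
The ordering assumption behind Executors.newSingleThreadExecutor can be
checked in isolation. The following is a minimal sketch, not ActiveMQ
code: the class OrderedHandoff and the method consumeInOrder are
hypothetical names, and plain integers stand in for commands.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedHandoff {

    // Hands `count` stand-in commands to a single worker thread and returns
    // them in the order in which the worker actually processed them.
    public static List<Integer> consumeInOrder(int count) throws InterruptedException {
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int command = i; // stand-in for the object returned by readCommand()
            consumerExecutor.execute(() -> processed.add(command));
        }
        consumerExecutor.shutdown(); // no new tasks; queued ones still run
        if (!consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not finish in time");
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeInOrder(10));
    }
}
```

A single-thread executor runs tasks sequentially in submission order, so
the hand-off itself should not reorder commands. It only changes when
each command is processed relative to the next read.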

When the consumerExecutor member is null, everything works as expected,
in the current non-detached fashion. However, setting consumerExecutor
leads to a lot of wire format errors. For example, when connecting from
a client that uses the original ActiveMQ libraries:

[DEBUG][2008-09-22 17:59:06,363][ActiveMQ Transport Server: 
tcp://localhost:61666][org.apache.activemq.transport.tcp.TcpTransport] 
Creating TcpTransport(SO_SNDBUF: 49152), SO_RCVBUF: 57344)
[DEBUG][2008-09-22 17:59:06,405][ActiveMQ Transport Server: 
tcp://localhost:61666][org.apache.activemq.transport.WireFormatNegotiator] 
Sending: WireFormatInfo { version=2, 
properties={TightEncodingEnabled=true, CacheSize=1024, 
TcpNoDelayEnabled=true, SizePrefixDisabled=false, 
StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, 
magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,454][Tcp Transport 
Consumer][org.apache.activemq.transport.tcp.TcpTransport] Consuming 
command WireFormatInfo { version=2, 
properties={TightEncodingEnabled=true, CacheSize=1024, 
TcpNoDelayEnabled=true, SizePrefixDisabled=false, 
StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, 
magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,549][ActiveMQ Transport: 
tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection.Transport] 
Transport failed: java.io.IOException: Unknown data type: 115
java.io.IOException: Unknown data type: 115
        at 
org.apache.activemq.openwire.OpenWireFormat.looseUnmarshalNestedObject(OpenWireFormat.java:443)
        at 
org.apache.activemq.openwire.v2.BaseDataStreamMarshaller.looseUnmarsalNestedObject(BaseDataStreamMarshaller.java:436)
        at 
org.apache.activemq.openwire.v2.ConnectionInfoMarshaller.looseUnmarshal(ConnectionInfoMarshaller.java:154)
        at 
org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:349)
        at 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
        at 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:196)
        at 
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:175)
        at 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:156)
        at java.lang.Thread.run(Unknown Source)
[DEBUG][2008-09-22 17:59:06,580][ActiveMQ Transport: 
tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection] 
Stopping connection: /127.0.0.1:61078
[DEBUG][2008-09-22 17:59:06,584][ActiveMQ Transport: 
tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection] 
Stopped connection: /127.0.0.1:61078
[DEBUG][2008-09-22 17:59:06,577][Tcp Transport 
Consumer][org.apache.activemq.transport.WireFormatNegotiator] Received 
WireFormat: WireFormatInfo { version=2, 
properties={TightEncodingEnabled=true, CacheSize=1024, 
TcpNoDelayEnabled=true, SizePrefixDisabled=false, 
StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, 
magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,585][Tcp Transport 
Consumer][org.apache.activemq.transport.WireFormatNegotiator] 
tcp:///127.0.0.1:61078 before negotiation: OpenWireFormat{version=2, 
cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, 
sizePrefixDisabled=false}
[DEBUG][2008-09-22 17:59:06,585][Tcp Transport 
Consumer][org.apache.activemq.transport.WireFormatNegotiator] 
tcp:///127.0.0.1:61078 after negotiation: OpenWireFormat{version=2, 
cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, 
sizePrefixDisabled=false}


Here "Tcp Transport Consumer" is the name given to the thread of the
consumerExecutor executor.

Similar effects are seen on the client side, just with a different data type:

Exception in thread "main" javax.jms.JMSException: Unknown data type: 68
        at 
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:46)
        at 
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1181)
        at 
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1263)
        at 
org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:449)
        at test.JMSThreads.main(JMSTest.java:73)
Caused by: java.io.IOException: Unknown data type: 68
        at 
org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:342)
        at 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
        at 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
        at 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
        at java.lang.Thread.run(Unknown Source)



So it seems that processing commands in a detached fashion is not
possible. But why? To narrow it down, I put the following code in
doRun(), which is called from the TcpTransport run() method:

    protected void doRun() throws IOException {
        try {
            final Object command = readCommand();
            log.debug("READ A SECOND COMMAND");
            Object command2 = readCommand();

Trying to read a second command from the data stream leads to the same
result as before:

[DEBUG][2008-09-22 18:21:05,420][ActiveMQ Transport: 
tcp:///127.0.0.1:61172][org.apache.activemq.transport.tcp.TcpTransport] 
READ A SECOND COMMAND
[DEBUG][2008-09-22 18:21:05,467][ActiveMQ Transport: 
tcp:///127.0.0.1:61172][org.apache.activemq.broker.TransportConnection.Transport] 
Transport failed: java.io.IOException: Unknown data type: 115
java.io.IOException: Unknown data type: 115
        at 
org.apache.activemq.openwire.OpenWireFormat.looseUnmarshalNestedObject(OpenWireFormat.java:443)
        at 
org.apache.activemq.openwire.v2.BaseDataStreamMarshaller.looseUnmarsalNestedObject(BaseDataStreamMarshaller.java:436)
        at 
org.apache.activemq.openwire.v2.ConnectionInfoMarshaller.looseUnmarshal(ConnectionInfoMarshaller.java:154)
        at 
org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:349)
        at 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
        at 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:198)
        at 
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:177)
        at 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:156)
        at java.lang.Thread.run(Unknown Source)


What is actually reading from the stream? Shouldn't it wait for the next
command to be available?
I'm a little lost with this, so please shed some light on this problem.
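
One possible reading of the logs above, offered as an assumption rather
than a confirmed diagnosis: the WireFormatNegotiator lines show the
format changing from tightEncodingEnabled=false to
tightEncodingEnabled=true only once the WireFormatInfo has been
consumed, and the failing stack goes through looseUnmarshal. If the
next command is read before the first one has been consumed, it would
be decoded with the pre-negotiation settings. The toy decoder below
illustrates that kind of failure. It is not ActiveMQ code: the class
name, the type bytes and the framing are all invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class NegotiatedDecoderSketch {
    static final int INFO = 1; // hypothetical type byte: switches the encoding
    static final int DATA = 2; // hypothetical type byte: carries one value

    // Invented wire: INFO(tight=1), DATA(7), DATA(9), with DATA sent in tight form.
    static final byte[] WIRE = {1, 1, 2, 7, 2, 9};

    private final DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(WIRE));
    private boolean tight = false; // decoder state, changed only by consume()

    // Decodes one command using the CURRENT state; returns {type, value}.
    int[] readCommand() throws IOException {
        int type = in.readUnsignedByte();
        switch (type) {
            case INFO:
                return new int[] {INFO, in.readUnsignedByte()};
            case DATA: // tight: 1-byte payload, loose: 4-byte payload
                return new int[] {DATA, tight ? in.readUnsignedByte() : in.readInt()};
            default:
                throw new IOException("Unknown data type: " + type);
        }
    }

    // Applies a command: INFO updates the decoder state, DATA is collected.
    void consume(int[] command, List<Integer> out) {
        if (command[0] == INFO) {
            tight = command[1] == 1;
        } else {
            out.add(command[1]);
        }
    }

    // Read one command, consume it, then read the next: state is always current.
    public static List<Integer> readThenConsume() throws IOException {
        NegotiatedDecoderSketch d = new NegotiatedDecoderSketch();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            d.consume(d.readCommand(), out);
        }
        return out;
    }

    // Read two commands before consuming the first: the second is decoded
    // while `tight` is still false, so the decoder misreads the stream.
    public static List<Integer> readAhead() throws IOException {
        NegotiatedDecoderSketch d = new NegotiatedDecoderSketch();
        List<Integer> out = new ArrayList<>();
        int[] first = d.readCommand();
        int[] second = d.readCommand(); // fails here with an IOException
        d.consume(first, out);
        d.consume(second, out);
        return out;
    }
}
```

In this sketch readThenConsume() decodes both values, while readAhead()
fails on the second read because the payload length no longer matches
what the sender wrote. Whether the same mechanism explains the errors
reported here would need to be confirmed against the OpenWire code.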


Desperately,

Manuel.



Manuel Teira Paz wrote:
> Filip Hanik - Dev Lists wrote:
>   
>> it might help setting transport.soTimeout and that should cause the
>> socketWrite to back out, if it gets stuck.
>>
>>     
> AFAIK, soTimeout only applies to reads, not to writes. At
> least that's what I've understood from the Java API docs.
>
>   
>> what you're explaining is a common problem with many AMQ installations,
>> and yes, the correct way to fix it would be to remove the locks that are
>> in the stack trace
>>
>>
>>     
> Those locks are actually used to avoid using the transport at the same
> time from different threads/sessions involved in a given connection. I
> don't think they're the problem.
>
> The problem is IMHO that the Tcp Transport threads are the ones writing
> acks, so if they block in those write attempts, nobody reads from the
> socket any more, and we end up with more threads blocked trying to
> write to that socket.
>
> I've tried to move doConsume into a different thread. An easy
> change, but it doesn't work. I don't know why, but subsequent
> readCommand attempts failed with EOF exceptions. I can't see what
> difference it makes to the readCommand call whether doConsume runs
> in the same thread or in a different one.
> Any hints?
>
> In the meantime, as a temporary workaround, I'm going to increase the
> send and receive buffers of the socket to try to avoid the effect.
>
>
> Regards
>
> Manuel.
>
>   
>> Filip
>>
>> Manuel Teira Paz wrote:
>>
>>     
>>> Filip Hanik - Dev Lists wrote:
>>>
>>>       
>>>> hi Manuel,
>>>> I may not be understanding your theory completely, but if I do, I'd have
>>>> to disagree with parts of your assessment,
>>>>
>>>> the problem you describe doesn't really have anything to do with
>>>> blocking vs non blocking IO. instead its the implementation on top of
>>>> the socket API.
>>>>
>>>> taking a simple java program, you can read and write from blocking
>>>> sockets simultaneously.
>>>>
>>>>
>>>>         
>>> Hello Filip, and thanks for your comments. Actually, yes, you must be
>>> able to read and write simultaneously on a given socket. Sorry for
>>> not being clear enough in my explanation. I've taken a deeper look
>>> into the details, and the actual problem is that all my
>>> consumer threads get blocked, one trying to write to the socket, and
>>> the others trying to acquire the MutexTransport lock in the transport
>>> filter chain. What they are actually trying to do is to ack some
>>> already sent messages. The stack looks like this:
>>>
>>> "Session(recv,TaskManagerQueue)#56" prio=10 tid=0x00b4a6d8 nid=0x6a
>>> runnable [0xeb9fe000..0xeb9ffaa8]
>>> at java.net.SocketOutputStream.socketWrite0(Native Method)
>>> at java.net.SocketOutputStream.socketWrite(Unknown Source)
>>> at java.net.SocketOutputStream.write(Unknown Source)
>>> at
>>> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
>>>
>>> at java.io.DataOutputStream.flush(Unknown Source)
>>> at
>>> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
>>>
>>> at
>>> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
>>>
>>> at
>>> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
>>>
>>> at
>>> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
>>>
>>> at
>>> org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:392)
>>>
>>> - locked <0x2abab308> (a java.lang.Object)
>>> at
>>> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
>>>
>>> - locked <0x2aba90f0> (a java.lang.Object)
>>> at
>>> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>>>
>>> at
>>> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
>>>
>>> at
>>> org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
>>> at
>>> org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)
>>>
>>>
>>> That is the thread holding the lock, and it never returns from
>>> java.net.SocketOutputStream.socketWrite0. What I understand from that
>>> stack is that it is trying to ack some messages before actually
>>> consuming the message, so sending through the socket is involved. The
>>> thread is runnable, but since the socket buffer is full, it cannot
>>> continue.
>>>
>>> The other consumers sharing that connection show this stack:
>>>
>>> waiting for monitor entry [0xec0ff000..0xec0ffa28]
>>> at
>>> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:46)
>>>
>>> - waiting to lock <0x2aba90f0> (a java.lang.Object)
>>> at
>>> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>>>
>>> at
>>> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
>>>
>>> at
>>> org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
>>> at
>>> org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
>>>
>>> at
>>> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)
>>>
>>>
>>> In this situation, nobody is able to consume from the socket, since
>>> all the consumers are blocked trying to ack some messages. So it looks
>>> to me like an implicit deadlock.
>>>
>>> On the other side, the ActiveMQ Transport threads (which I think are
>>> responsible for reading from the socket) seem to be stuck in a
>>> similar situation (this is from a different stack dump):
>>>
>>> "ActiveMQ Transport: tcp:///127.0.0.1:25047" daemon prio=10
>>> tid=0x003764c0 nid=0x30 runnable [0xef3fe000..0xef3ffca8]
>>> at java.net.SocketOutputStream.socketWrite0(Native Method)
>>> at java.net.SocketOutputStream.socketWrite(Unknown Source)
>>> at java.net.SocketOutputStream.write(Unknown Source)
>>> at
>>> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
>>>
>>> at java.io.DataOutputStream.flush(Unknown Source)
>>> at
>>> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
>>>
>>> at
>>> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
>>>
>>> at
>>> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
>>>
>>> at
>>> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
>>>
>>> at
>>> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
>>>
>>> - locked <0x2ab72b20> (a java.lang.Object)
>>> at
>>> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1138)
>>>
>>> at
>>> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:805)
>>>
>>> at
>>> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:770)
>>>
>>> at
>>> org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:404)
>>>
>>> at
>>> org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:172)
>>>
>>> at
>>> org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:369)
>>>
>>> at
>>> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:204)
>>>
>>> - locked <0x2af14b48> (a
>>> org.apache.activemq.broker.region.QueueSubscription)
>>> at
>>> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:299)
>>>
>>> at
>>> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:402)
>>>
>>> at
>>> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
>>>
>>> at
>>> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>>> at
>>> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>>> at
>>> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:88)
>>>
>>> at
>>> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:506)
>>>
>>> at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
>>> at
>>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
>>>
>>> at
>>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
>>>
>>> at
>>> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
>>>
>>> at
>>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:133)
>>>
>>> at
>>> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
>>>
>>> at
>>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>>>
>>> at
>>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
>>> at java.lang.Thread.run(Unknown Source)
>>>
>>>
>>> So, would non-blocking IO be a solution for this? After a deeper look,
>>> I'm afraid it wouldn't. The only actual difference from this point of
>>> view is that transport.nio.NIOOutputStream.flush() writes the data
>>> in chunks, as big as the socket allows, but it doesn't give up until
>>> all the data has been written. We wouldn't be stuck in the socketWrite0
>>> call; instead, the "while (remaining > 0)" loop in NIOOutputStream
>>> would repeat forever, and since the other consumers sharing the
>>> connection are also trying to write to the socket to ack some
>>> message, there would be no chance to free the socket buffer.
>>>
>>> If I'm not mistaken, since both "ActiveMQ Transport" and "Session"
>>> threads need to write to the socket, this could lead to a
>>> situation where everybody is trying to do so, ending up with some
>>> threads blocked in the socketWrite0 call, and others waiting
>>> for the MutexTransport writeLock to proceed.
>>>
>>> After this new review, thanks to Filip, I'm afraid that NIO is not
>>> going to be a solution (unfortunately I was not able to reproduce the
>>> hang in our labs). So, the questions are:
>>>
>>> -Do you agree with this new analysis?
>>> -Is this design still present in the 5.x releases?
>>>
>>> I'm not sure whether using different connections for consumers and
>>> producers would be a valid workaround. Perhaps it will just mitigate
>>> the situation, as more sockets get involved and so, more effective
>>> buffer space gets used, but I would like to be sure that this is not
>>> going to happen, since it leads to service unavailability situations
>>> in the product.
>>>
>>> Best regards and thanks a lot for your feedback. Looking forward to
>>> more. :)
>>>
>>> -
>>> Manuel.
>>>
>>>
>>>
>>>
>>>
>>>       
>>     
>
>   

