activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mark Gellings (JIRA)" <jira+amq...@apache.org>
Subject [jira] Commented: (AMQNET-194) Async error occurred: javax.jms.JMSException: Unmatched acknowledege when Acknowledgemode of Transactional used
Date Thu, 08 Oct 2009 21:29:54 GMT

    [ https://issues.apache.org/activemq/browse/AMQNET-194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=54675#action_54675
] 

Mark Gellings commented on AMQNET-194:
--------------------------------------

Checking back on the original version of this class, the logic has been exactly the same up
to the latest version in source control except changing to use a proper sync object. http://fisheye6.atlassian.com/browse/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?r=707747#l493

>From further testing of mine the command ids in requestMap are not getting removed. The
below line is where they should be getting removed yet I'm not ever getting a command back
to remove a request map with the consumer. *Producer works fine.

Further down t.onResponses(); is never invoked so the "removeTransactionState" method on the
ConnectionState never gets invoked. So bottom line transactions never get removed and on a
restore all the command in the transactionstate get sent again.

So here's the problem. Since we're never removing command ids from the requestMap this is
why we eventually get the "An item with the same key has already been added." exception. And
since we are never removing command ids we also are not removing transactionstate. When the
exception occurs it starts the infinite loop.

Flow through the code marked with "-->".

public void onCommand(ITransport sender, Command command)
{
if(command != null)
{
if(command.IsResponse)
{
Object oo = null;
lock(((ICollection) requestMap).SyncRoot)
{
int v = ((Response) command).CorrelationId;
try
{
oo = requestMap[v];
--> requestMap.Remove(v);
}
catch
{
}
}

Tracked t = oo as Tracked;
if(t != null)
{
--> t.onResponses();
}
}

public class Tracked : Response
{
private ThreadSimulator runnable = null;

public Tracked(ThreadSimulator runnable)
{
this.runnable = runnable;
}

--> public void onResponses()
{
if(runnable != null)
{
--> runnable.Run();
runnable = null;
}
}

private class RemoveTransactionAction : ThreadSimulator
{
private TransactionInfo info;
private ConnectionStateTracker cst;

public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
{
this.info = info;
this.cst = aCst;
}

public override void Run()
{
ConnectionId connectionId = info.ConnectionId;
ConnectionState cs = cst.connectionStates[connectionId];
--> cs.removeTransactionState(info.TransactionId);
}
}

public TransactionState removeTransactionState(TransactionId id)
{
TransactionState ret = transactions[id];
--> (this never happens) transactions.Remove(id);
return ret;
}

To simulate the problem I'm seeing, add the following lines to this code:

public void onCommand(ITransport sender, Command command)
{
if(command != null)
{
if(command.IsResponse)
{
Object oo = null;
lock(((ICollection) requestMap).SyncRoot)
{
int v = ((Response) command).CorrelationId;
try
{
oo = requestMap[v];
            --> Console.WriteLine("Removing {0}. Is {0} in requestMap? {1}", v, requestMap.ContainsKey(v));
requestMap.Remove(v);
}
catch
{
}
}

Tracked t = oo as Tracked;
if(t != null)
{
t.onResponses();
}
}

if(!initialized)
{
if(command.IsBrokerInfo)
{
BrokerInfo info = (BrokerInfo) command;
BrokerInfo[] peers = info.PeerBrokerInfos;
if(peers != null)
{
for(int i = 0; i < peers.Length; i++)
{
String brokerString = peers[i].BrokerURL;
add(brokerString);
}
}

initialized = true;
}
}
}

this.Command(sender, command);


      --> Console.WriteLine("Request map count is now {0}", requestMap.Count);
}

Comment out the following lines in the OnMessage event.

public static void OnMessage(IMessage message)
        {
            try
            {
               // Console.WriteLine("LISTENER Message:");
               // Console.WriteLine("LISTENER " + message);

> Async error occurred: javax.jms.JMSException: Unmatched acknowledege when Acknowledgemode
of Transactional used
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-194
>                 URL: https://issues.apache.org/activemq/browse/AMQNET-194
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ, NMS
>    Affects Versions: 1.0.0, 1.1.0
>         Environment: ActiveMQ 5.2 as Windows Service on Windows Server 2008 64-bit. 
Client producer and consumer on Windows XP
>            Reporter: Mark Gellings
>            Assignee: Timothy Bish
>             Fix For: 1.2.0
>
>         Attachments: activemq.xml, NativeNMSConsumerAndProducer.zip
>
>
> See http://www.nabble.com/Async-error-occurred--%3E-ActiveMQ-5.2-and-NMS-td25474605.html
for a primer to this issue.
> After troubleshooting this,  it appears that if a producer sends 65535+ messages to the
broker that the consumer gets into an infinite loop after consuming 65535 messages.  
> I will be attaching a test case for this.  Run a producer and a consumer, send 100,000
messages and the test case fails at 65535 messages consumed exactly.
> What appears to be happening is that the NMS provider (or ActiveMQ 5.2) is hitting the
upper bound of a ushort and then starting all back over again sending message acknowledgements
from message #1.  This message was already dispatched though and the broker lot shows an error
that the message is not in the dispatched-list .
> The consumer shows a message acknowledgement going out.
> These errors repeat infinitely it seems.  Once the 65535th message errors out then it
starts all over again at the 1st message.
> Notice the ProducerSequenceId growing beyond 65535.  I can't seem to avoid this error,
even if I reinstantiate the producer sporadically.
> At the beginning of the source code I have included a loop which shows the behavior of
incrementing a ushort beyond 65535.  The variable resets to the lower bound as expected.
> [DEBUG] Apache.NMS.Tracer - Sending Ack: MessageAck[ Destination=queue://testmonday150
TransactionId=LocalTransactionId[ Value=30558 ConnectionId=ConnectionId[ Value=2b6ae5a1-c0e1-4523-8d9c-4143cc9d74e8
] ] ConsumerId=ConsumerId[ ConnectionId=2b6ae5a1-c0e1-4523-8d9c-4143cc9d74e8 SessionId=1 Value=1
] AckType=2 FirstMessageId=MessageId[ ProducerId=ProducerId[ ConnectionId=9e256437-df18-44e5-9cb2-b7f3feaebcfd
Value=1 SessionId=1 ] ProducerSequenceId=30558 BrokerSequenceId=80280 ] LastMessageId=MessageId[
ProducerId=ProducerId[ ConnectionId=9e256437-df18-44e5-9cb2-b7f3feaebcfd Value=1 SessionId=1
] ProducerSequenceId=30558 BrokerSequenceId=80280 ] MessageCount=1 ]
>  ERROR Service                        -
>  Async error occurred: javax.jms.JMSException: Unmatched acknowledege: MessageAc
> k {commandId = 26140, responseRequired = false, ackType = 2, consumerId = 41b092
> 97-da4b-4d11-9ed7-09d344825740:1:1, firstMessageId = 4128971e-c69b-4892-8be5-d99
> 868045881:1:1:30558, lastMessageId = 4128971e-c69b-4892-8be5-d99868045881:1:1:30
> 558, destination = queue://testwed924, transactionId = TX:41b09297-da4b-4d11-9ed
> 7-09d344825740:30558, messageCount = 1}; Could not find Message-ID 4128971e-c69b
> -4892-8be5-d99868045881:1:1:30558 in dispatched-list (start of ack)
> INFO   | jvm 1    | 2009/10/07 14:03:27 | javax.jms.JMSException: Unmatched ackn
> owledege: MessageAck {commandId = 26140, responseRequired = false, ackType = 2,
> consumerId = 41b09297-da4b-4d11-9ed7-09d344825740:1:1, firstMessageId = 4128971e
> -c69b-4892-8be5-d99868045881:1:1:30558, lastMessageId = 4128971e-c69b-4892-8be5-
> d99868045881:1:1:30558, destination = queue://testwed924, transactionId = TX:41b
> 09297-da4b-4d11-9ed7-09d344825740:30558, messageCount = 1}; Could not find Messa
> ge-ID 4128971e-c69b-4892-8be5-d99868045881:1:1:30558 in dispatched-list (start o
> f ack)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.re
> gion.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:4
> 38)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.re
> gion.PrefetchSubscription.acknowledge(PrefetchSubscription.java:188)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.re
> gion.AbstractRegion.acknowledge(AbstractRegion.java:373)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.re
> gion.RegionBroker.acknowledge(RegionBroker.java:462)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Tr
> ansactionBroker.acknowledge(TransactionBroker.java:194)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Br
> okerFilter.acknowledge(BrokerFilter.java:74)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Br
> okerFilter.acknowledge(BrokerFilter.java:74)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Br
> okerFilter.acknowledge(BrokerFilter.java:74)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Br
> okerFilter.acknowledge(BrokerFilter.java:74)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Mu
> tableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Tr
> ansportConnection.processMessageAck(TransportConnection.java:456)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.command.M
> essageAck.visit(MessageAck.java:205)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Tr
> ansportConnection.service(TransportConnection.java:305)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.broker.Tr
> ansportConnection$1.onCommand(TransportConnection.java:179)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .TransportFilter.onCommand(TransportFilter.java:68)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .InactivityMonitor.onCommand(InactivityMonitor.java:206)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .TransportSupport.doConsume(TransportSupport.java:84)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .tcp.TcpTransport.doRun(TcpTransport.java:203)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at org.apache.activemq.transport
> .tcp.TcpTransport.run(TcpTransport.java:185)
> INFO   | jvm 1    | 2009/10/07 14:03:27 |       at java.lang.Thread.run(Unknown
> Source)

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message