activemq-users mailing list archives

From "gustav.mauer" <gustav.ma...@iona.com>
Subject Re: Stomp protocol problems
Date Wed, 30 Jan 2008 13:31:45 GMT

I assume you only want to ACK a message once the consumer/worker thread has
processed it? I have also been wondering what the best pattern is for that.
Is the correct way to proceed to create a listener for each worker thread?
And if one listener thread is busy, will the request be delivered to one of
the idle listener threads, so that multiple messages can be worked on in
parallel in the server?
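
To make the question concrete, here is a minimal sketch of the cumulative ACK behaviour discussed in this thread. It is a toy model only and does not use the ActiveMQ or STOMP APIs: SubscriptionModel, dispatch, ack and pending are hypothetical names, and the insertion-ordered map is an assumption about how the dispatched messages are tracked. It mirrors the loop in the quoted onStompMessageAck and shows why one subscription per worker would keep each worker's ACK from covering another worker's messages.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not ActiveMQ code) of one subscription with cumulative ACK. */
class SubscriptionModel {
    // Assumption: dispatched messages are kept in dispatch (insertion) order.
    private final Map<String, String> dispatched = new LinkedHashMap<>();

    /** Records that a message was delivered to this subscription. */
    synchronized void dispatch(String messageId) {
        dispatched.put(messageId, messageId);
    }

    /** Acks everything dispatched up to and including messageId; returns the acked ids. */
    synchronized List<String> ack(String messageId) {
        List<String> acked = new ArrayList<>();
        if (!dispatched.containsKey(messageId)) {
            return acked; // unknown id: nothing is acked
        }
        for (Iterator<String> it = dispatched.keySet().iterator(); it.hasNext();) {
            String id = it.next();
            it.remove();
            acked.add(id);
            if (id.equals(messageId)) {
                break;
            }
        }
        return acked;
    }

    /** Number of dispatched messages not yet acked. */
    synchronized int pending() {
        return dispatched.size();
    }
}

class CumulativeAckDemo {
    public static void main(String[] args) {
        // One shared subscription: acking m3 first also acks m1 and m2.
        SubscriptionModel shared = new SubscriptionModel();
        shared.dispatch("m1");
        shared.dispatch("m2");
        shared.dispatch("m3");
        System.out.println(shared.ack("m3")); // prints [m1, m2, m3]

        // One subscription per worker: B's ack leaves A's message pending.
        SubscriptionModel workerA = new SubscriptionModel();
        SubscriptionModel workerB = new SubscriptionModel();
        workerA.dispatch("m1");
        workerB.dispatch("m2");
        System.out.println(workerB.ack("m2")); // prints [m2]
        System.out.println(workerA.pending()); // prints 1
    }
}
```

Whether the broker actually hands new messages to an idle subscription rather than a busy one is not covered by this model; that part of the question is still open.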


Sebastjan Trepca wrote:
> 
> Yes, sorry for not RTFM, it's not very logical though, to ACK all
> messages when you ack one.
> 
> So what are my options then? Stomp protocol doesn't seem to support
> sessions:
> 
> "The session-id header is a unique identifier for this session (though
> it isn't actually used yet).
> "
> Should I create a separate transaction for each message and group them
> with that?
> 
> I currently have only one listener that fills a local queue that gets
> processed by consumer threads. Should I create a listener/consumer for
> each thread?
> 
> Thanks, Sebastjan
> 
> On 1/30/08, gustav.mauer <gustav.mauer@iona.com> wrote:
>>
>> I am under the impression this is correct behaviour. See for example:
>> http://java.sun.com/products/jms/tutorial/  page 63
>>
>>
>> Sebastjan Trepca wrote:
>> >
>> > Hi,
>> >
>> > I'm using ActiveMQ (both versions are affected) with the STOMP protocol
>> > and noticed a problem with it. At least I hope it's a problem.
>> >
>> > When you ACK a message through STOMP, all messages up to the right one
>> > get ACKed too.
>> >
>> > If we check the code:
>> >
>> >     protected void onStompAck(StompFrame command) throws ProtocolException {
>> >         checkConnected();
>> >
>> >         // TODO: acking with just a message id is very bogus
>> >         // since the same message id could have been sent to 2 different
>> >         // subscriptions on the same stomp connection. For example, when
>> >         // 2 subs are created on the same topic.
>> >
>> >         Map<String, String> headers = command.getHeaders();
>> >         String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
>> >         if (messageId == null) {
>> >             throw new ProtocolException("ACK received without a message-id to acknowledge!");
>> >         }
>> >
>> >         TransactionId activemqTx = null;
>> >         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
>> >         if (stompTx != null) {
>> >             activemqTx = transactions.get(stompTx);
>> >             if (activemqTx == null) {
>> >                 throw new ProtocolException("Invalid transaction id: " + stompTx);
>> >             }
>> >         }
>> >
>> >         boolean acked = false;
>> >         for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
>> >             StompSubscription sub = iter.next();
>> >             MessageAck ack = sub.onStompMessageAck(messageId);
>> >             if (ack != null) {
>> >                 ack.setTransactionId(activemqTx);
>> >                 sendToActiveMQ(ack, createResponseHandler(command));
>> >                 acked = true;
>> >                 break;
>> >             }
>> >         }
>> >
>> >         if (!acked) {
>> >             throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
>> >         }
>> >
>> >     }
>> >
>> >
>> >     synchronized MessageAck onStompMessageAck(String messageId) {
>> >
>> >         if (!dispatchedMessage.containsKey(messageId)) {
>> >             return null;
>> >         }
>> >
>> >         MessageAck ack = new MessageAck();
>> >         ack.setDestination(consumerInfo.getDestination());
>> >         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
>> >         ack.setConsumerId(consumerInfo.getConsumerId());
>> >
>> >         int count = 0;
>> >         for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
>> >
>> >             Map.Entry entry = (Entry)iter.next();
>> >             String id = (String)entry.getKey();
>> >             MessageId msgid = (MessageId)entry.getValue();
>> >
>> >             if (ack.getFirstMessageId() == null) {
>> >                 ack.setFirstMessageId(msgid);
>> >             }
>> >
>> >             iter.remove();
>> >             count++;
>> >
>> >             if (id.equals(messageId)) {
>> >                 ack.setLastMessageId(msgid);
>> >                 break;
>> >             }
>> >
>> >         }
>> >
>> >         ack.setMessageCount(count);
>> >         return ack;
>> >     }
>> >
>> > It loops through all messages and ACKs them until it finds the correct
>> > message id. So when you send a few messages and the last one is
>> > processed first, all previous ones get ACKed?
>> >
>> > Is this a feature of STOMP protocol or is it a bug?
>> >
>> > Thanks, Sebastjan
>> >
>> >
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15177216.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 

-- 
View this message in context: http://www.nabble.com/Stomp-protocol-problems-tp15144123s2354p15182191.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

