Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5550B7F5B for ; Sat, 10 Sep 2011 19:49:17 +0000 (UTC) Received: (qmail 19824 invoked by uid 500); 10 Sep 2011 19:49:17 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 19758 invoked by uid 500); 10 Sep 2011 19:49:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19751 invoked by uid 99); 10 Sep 2011 19:49:16 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2011 19:49:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2011 19:49:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C681E2388900 for ; Sat, 10 Sep 2011 19:48:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1167582 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/StompSubscription.java test/java/org/apache/activemq/transport/stomp/StompTest.java Date: Sat, 10 Sep 2011 19:48:54 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110910194854.C681E2388900@eris.apache.org> Author: tabish Date: Sat Sep 10 19:48:54 2011 New Revision: 1167582 URL: http://svn.apache.org/viewvc?rev=1167582&view=rev Log: fix for https://issues.apache.org/jira/browse/AMQ-3493 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1167582&r1=1167581&r2=1167582&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Sat Sep 10 19:48:54 2011 @@ -113,7 +113,11 @@ public class StompSubscription { } } - unconsumedMessage.clear(); + if (!unconsumedMessage.isEmpty()) { + MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + protocolConverter.getStompTransport().sendToActiveMQ(ack); + unconsumedMessage.clear(); + } } synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { @@ -129,7 +133,11 @@ public class StompSubscription { ack.setConsumerId(consumerInfo.getConsumerId()); if (ackMode == CLIENT_ACK) { - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + if (transactionId == null) { + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + } else { + ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); + } int count = 0; for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { @@ -138,20 +146,16 @@ public class StompSubscription { MessageId id = (MessageId)entry.getKey(); MessageDispatch msg = (MessageDispatch)entry.getValue(); - if (ack.getFirstMessageId() == null) { - ack.setFirstMessageId(id); - } - if (transactionId != null) { if (!unconsumedMessage.contains(msg)) { unconsumedMessage.add(msg); + count++; } } else { iter.remove(); + count++; } - count++; - if (id.equals(msgId)) { ack.setLastMessageId(id); break; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1167582&r1=1167581&r2=1167582&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Sat Sep 10 19:48:54 2011 @@ -36,6 +36,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; @@ -43,6 +44,7 @@ import org.apache.activemq.CombinationTe import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; @@ -55,7 +57,6 @@ public class StompTest extends Combinati protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; protected String jmsUri = "vm://localhost"; - private BrokerService broker; private StompConnection stompConnection = new StompConnection(); private Connection connection; @@ -1398,6 +1399,8 @@ public class StompTest extends Combinati stompConnection.ack(frame5, "tx3"); stompConnection.commit("tx3"); + waitForFrameToTakeEffect(); + stompDisconnect(); } @@ -1464,7 +1467,6 @@ public class StompTest extends Combinati TextMessage message = (TextMessage)consumer.receive(5000); assertNotNull(message); assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID)); - } public void testJMSXUserIDIsSetInStompMessage() throws Exception { @@ -1493,10 +1495,8 @@ public class StompTest extends Combinati headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed"); headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed"); - stompConnection.connect("system", "manager"); - stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); stompConnection.subscribe("/queue/" + getQueueName()); @@ -1511,7 +1511,6 @@ public class StompTest extends Combinati assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED)); assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION)); assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID)); - } public void testExpire() throws Exception { @@ -1559,30 +1558,28 @@ public class StompTest extends Combinati assertNotNull(stompMessage); assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); } - + public void testReceiptNewQueue() throws Exception { - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + StompFrame receipt = stompConnection.receive(); assertTrue(receipt.getAction().startsWith("RECEIPT")); assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id")); - frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + receipt = stompConnection.receive(); assertTrue(receipt.getAction().startsWith("RECEIPT")); assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id")); @@ -1598,6 +1595,51 @@ public class StompTest extends Combinati stompConnection.sendFrame(frame); } + public void testTransactedClientAckBrokerStats() throws Exception { + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + sendMessage(getName()); + sendMessage(getName()); + + stompConnection.begin("tx1"); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + stompConnection.commit("tx1"); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + waitForFrameToTakeEffect(); + + QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertEquals(2, queueView.getDispatchCount()); + assertEquals(2, queueView.getDequeueCount()); + assertEquals(0, queueView.getQueueSize()); + } + + private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":Type=Queue,Destination=" + name + + ",BrokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;