activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1167582 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/StompSubscription.java test/java/org/apache/activemq/transport/stomp/StompTest.java
Date Sat, 10 Sep 2011 19:48:54 GMT
Author: tabish
Date: Sat Sep 10 19:48:54 2011
New Revision: 1167582

URL: http://svn.apache.org/viewvc?rev=1167582&view=rev
Log:
fix for https://issues.apache.org/jira/browse/AMQ-3493

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1167582&r1=1167581&r2=1167582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Sat Sep 10 19:48:54 2011
@@ -113,7 +113,11 @@ public class StompSubscription {
             }
         }
 
-        unconsumedMessage.clear();
+        if (!unconsumedMessage.isEmpty()) {
+            MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+            protocolConverter.getStompTransport().sendToActiveMQ(ack);
+            unconsumedMessage.clear();
+        }
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@@ -129,7 +133,11 @@ public class StompSubscription {
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            if (transactionId == null) {
+                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            } else {
+                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+            }
             int count = 0;
             for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
 
@@ -138,20 +146,16 @@ public class StompSubscription {
                 MessageId id = (MessageId)entry.getKey();
                 MessageDispatch msg = (MessageDispatch)entry.getValue();
 
-                if (ack.getFirstMessageId() == null) {
-                    ack.setFirstMessageId(id);
-                }
-
                 if (transactionId != null) {
                     if (!unconsumedMessage.contains(msg)) {
                         unconsumedMessage.add(msg);
+                        count++;
                     }
                 } else {
                     iter.remove();
+                    count++;
                 }
 
-                count++;
-
                 if (id.equals(msgId)) {
                     ack.setLastMessageId(id);
                     break;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1167582&r1=1167581&r2=1167582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Sat Sep 10 19:48:54 2011
@@ -36,6 +36,7 @@ import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -43,6 +44,7 @@ import org.apache.activemq.CombinationTe
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.slf4j.Logger;
@@ -55,7 +57,6 @@ public class StompTest extends Combinati
     protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
     protected String jmsUri = "vm://localhost";
 
-
     private BrokerService broker;
     private StompConnection stompConnection = new StompConnection();
     private Connection connection;
@@ -1398,6 +1399,8 @@ public class StompTest extends Combinati
         stompConnection.ack(frame5, "tx3");
         stompConnection.commit("tx3");
 
+        waitForFrameToTakeEffect();
+
         stompDisconnect();
     }
 
@@ -1464,7 +1467,6 @@ public class StompTest extends Combinati
         TextMessage message = (TextMessage)consumer.receive(5000);
         assertNotNull(message);
         assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
-
     }
 
     public void testJMSXUserIDIsSetInStompMessage() throws Exception {
@@ -1493,10 +1495,8 @@ public class StompTest extends Combinati
         headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed");
         headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed");
 
-
         stompConnection.connect("system", "manager");
 
-
         stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
 
         stompConnection.subscribe("/queue/" + getQueueName());
@@ -1511,7 +1511,6 @@ public class StompTest extends Combinati
         assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
         assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
         assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
-
     }
 
     public void testExpire() throws Exception {
@@ -1559,30 +1558,28 @@ public class StompTest extends Combinati
         assertNotNull(stompMessage);
         assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
     }
-    
+
     public void testReceiptNewQueue() throws Exception {
-    	
+
         String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
-        
+
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
-        
+
         StompFrame receipt = stompConnection.receive();
         assertTrue(receipt.getAction().startsWith("RECEIPT"));
         assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
 
-
         frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
-        
+
         receipt = stompConnection.receive();
         assertTrue(receipt.getAction().startsWith("RECEIPT"));
         assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
@@ -1598,6 +1595,51 @@ public class StompTest extends Combinati
         stompConnection.sendFrame(frame);
     }
 
+    public void testTransactedClientAckBrokerStats() throws Exception {
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        sendMessage(getName());
+        sendMessage(getName());
+
+        stompConnection.begin("tx1");
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame message = stompConnection.receive();
+        assertTrue(message.getAction().equals("MESSAGE"));
+        stompConnection.ack(message, "tx1");
+
+        message = stompConnection.receive();
+        assertTrue(message.getAction().equals("MESSAGE"));
+        stompConnection.ack(message, "tx1");
+
+        stompConnection.commit("tx1");
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        QueueViewMBean queueView = getProxyToQueue(getQueueName());
+        assertEquals(2, queueView.getDispatchCount());
+        assertEquals(2, queueView.getDequeueCount());
+        assertEquals(0, queueView.getQueueSize());
+    }
+
+    private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":Type=Queue,Destination=" + name
+                + ",BrokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;



Mime
View raw message