activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r801916 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/StompSubscription.java test/java/org/apache/activemq/transport/stomp/StompTest.java
Date Fri, 07 Aug 2009 09:18:24 GMT
Author: dejanb
Date: Fri Aug  7 09:18:23 2009
New Revision: 801916

URL: http://svn.apache.org/viewvc?rev=801916&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2280 - stomp transactions and multiple
destinations

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Fri Aug  7 09:18:23 2009
@@ -123,18 +123,19 @@
     
     synchronized void onStompCommit(TransactionId transactionId) {
     	// ack all messages
-        MessageAck ack = new MessageAck();
-        ack.setDestination(consumerInfo.getDestination());
-        ack.setConsumerId(consumerInfo.getConsumerId());
-        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-        ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
-        ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
-        ack.setMessageCount(unconsumedMessage.size());
-        ack.setTransactionId(transactionId);
-        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-        // clear lists
-    	unconsumedMessage.clear();
-    	dispatchedMessage.clear();
+    	if (!unconsumedMessage.isEmpty()) {
+    		MessageAck ack = new MessageAck();
+    		ack.setDestination(consumerInfo.getDestination());
+    		ack.setConsumerId(consumerInfo.getConsumerId());
+    		ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+    		ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+    		ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+    		ack.setMessageCount(unconsumedMessage.size());
+    		ack.setTransactionId(transactionId);
+    		protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+    		// clear lists
+    		unconsumedMessage.clear();
+    	}
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId)
{
@@ -169,9 +170,8 @@
                 if (transactionId != null) {
                 	if (!unconsumedMessage.contains(msg))
                 		unconsumedMessage.add(msg);
-                } else {
-                	iter.remove();
                 }
+                iter.remove();
                 
                 
                 count++;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=801916&r1=801915&r2=801916&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Aug  7 09:18:23 2009
@@ -998,7 +998,38 @@
         
         stompDisconnect();
     	
-    }       
+    }
+    
+    public void testTransactionsWithMultipleDestinations() throws Exception {
+
+    	stompConnection.connect("system", "manager");
+    	
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("activemq.prefetchSize", "1");
+        headers.put("activemq.exclusive", "true");
+    	
+    	stompConnection.subscribe("/queue/test1", "client", headers);
+    	
+    	stompConnection.begin("ID:tx1");
+    	
+    	headers.clear();
+    	headers.put("receipt", "ID:msg1");
+    	stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
+    	
+    	stompConnection.commit("ID:tx1");
+    	
+    	// make sure connection is active after commit
+    	Thread.sleep(1000);
+    	stompConnection.send("/queue/test1", "another message");
+    	
+    	StompFrame frame = stompConnection.receive(500);
+    	System.out.println(frame);
+    	assertNotNull(frame);
+    	
+    	
+    	stompConnection.disconnect();
+    }
+    
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;



Mime
View raw message