activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r804943 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/
Date Mon, 17 Aug 2009 11:43:35 GMT
Author: dejanb
Date: Mon Aug 17 11:43:34 2009
New Revision: 804943

URL: http://svn.apache.org/viewvc?rev=804943&view=rev
Log:
additional fix for https://issues.apache.org/activemq/browse/AMQ-1807 - fixing test case and
making it work

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Aug 17 11:43:34 2009
@@ -236,7 +236,6 @@
                                                 dequeueCounter++;
                                                 dispatched.remove(node);
                                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                                                prefetchExtension--;
                                             }
                                         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Mon Aug 17 11:43:34 2009
@@ -361,11 +361,7 @@
         }
         for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator();
iter.hasNext();) {
             StompSubscription sub = iter.next();
-            try {
-            	sub.onStompAbort(activemqTx);
-            } catch (Exception e) {
-            	throw new ProtocolException("Transaction abort failed", false, e);
-            }
+            sub.onStompAbort(activemqTx);
         }
 
         TransactionInfo tx = new TransactionInfo();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Mon Aug 17 11:43:34 2009
@@ -188,15 +188,6 @@
     }
     
     public void abort(String transaction) throws Exception {
-    	// discard all content on the wire before
-    	// aborting the transaction
-    	try {
-    		StompFrame discarded = this.receive(100);
-    		while (discarded != null) {
-    			discarded = this.receive(100);
-    		}
-    	} catch (Exception e) {    		
-    	}
     	HashMap<String, String> headers = new HashMap<String, String>();
     	headers.put("transaction", transaction);
     	StompFrame frame = new StompFrame("ABORT", headers);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Mon Aug 17 11:43:34 2009
@@ -99,43 +99,20 @@
         protocolConverter.getTransportFilter().sendToStomp(command);
     }
     
-    synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException
{
-    	//ack all unacked messages
-    	for (MessageDispatch md : dispatchedMessage.values()) {
-    		if (!unconsumedMessage.contains(md)) {
-    	        MessageAck ack = new MessageAck();
-    	        ack.setDestination(consumerInfo.getDestination());
-    	        ack.setConsumerId(consumerInfo.getConsumerId());
-    	        ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
-    	        ack.setFirstMessageId(md.getMessage().getMessageId());
-    	        ack.setLastMessageId(md.getMessage().getMessageId());
-    	        ack.setMessageCount(1);
-    	        ack.setTransactionId(transactionId);
-    	        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-    	        unconsumedMessage.add(md);
-    		}
-    	}
-    	// redeliver all unconsumed messages
-    	for (MessageDispatch md : unconsumedMessage) {
-    		onMessageDispatch(md);
-    	}
+    synchronized void onStompAbort(TransactionId transactionId) {
+    	unconsumedMessage.clear();
     }
     
     synchronized void onStompCommit(TransactionId transactionId) {
-    	// ack all messages
-    	if (!unconsumedMessage.isEmpty()) {
-    		MessageAck ack = new MessageAck();
-    		ack.setDestination(consumerInfo.getDestination());
-    		ack.setConsumerId(consumerInfo.getConsumerId());
-    		ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-    		ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
-    		ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
-    		ack.setMessageCount(unconsumedMessage.size());
-    		ack.setTransactionId(transactionId);
-    		protocolConverter.getTransportFilter().sendToActiveMQ(ack);
-    		// clear lists
-    		unconsumedMessage.clear();
+    	for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Entry)iter.next();
+            MessageId id = (MessageId)entry.getKey();
+            MessageDispatch msg = (MessageDispatch)entry.getValue();
+            if (unconsumedMessage.contains(msg)) {
+            	iter.remove();
+            }
     	}
+    	unconsumedMessage.clear();
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId)
{
@@ -151,11 +128,7 @@
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-        	if (transactionId != null) {
-        		ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
-        	} else {
-        		ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-        	}
+        	ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
             int count = 0;
             for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();)
{
 
@@ -168,10 +141,12 @@
                 }
                 
                 if (transactionId != null) {
-                	if (!unconsumedMessage.contains(msg))
+                	if (!unconsumedMessage.contains(msg)) {
                 		unconsumedMessage.add(msg);
+                	}
+                } else {
+                	iter.remove();
                 }
-                iter.remove();
                 
                 
                 count++;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=804943&r1=804942&r2=804943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Mon Aug 17 11:43:34 2009
@@ -977,24 +977,24 @@
         
         stompConnection.begin("tx2");
         
+        // Previously delivered message need to get re-acked...
+        stompConnection.ack(frame, "tx2");
+        stompConnection.ack(frame1, "tx2");
+        
         StompFrame frame3 = stompConnection.receive();
-        assertEquals(frame3.getBody(), "message 1");
+        assertEquals(frame3.getBody(), "message 3");
         stompConnection.ack(frame3, "tx2");
         
         StompFrame frame4 = stompConnection.receive();
-        assertEquals(frame4.getBody(), "message 2");
+        assertEquals(frame4.getBody(), "message 4");
         stompConnection.ack(frame4, "tx2");
         
-        StompFrame frame5 = stompConnection.receive();
-        assertEquals(frame5.getBody(), "message 3");        
-        stompConnection.ack(frame5, "tx2");
-        
         stompConnection.commit("tx2");
         
         stompConnection.begin("tx3");
-        StompFrame frame6 = stompConnection.receive();
-        assertEquals(frame6.getBody(), "message 4");
-        stompConnection.ack(frame6, "tx3");
+        StompFrame frame5 = stompConnection.receive();
+        assertEquals(frame5.getBody(), "message 5");
+        stompConnection.ack(frame5, "tx3");
         stompConnection.commit("tx3");
         
         stompDisconnect();



Mime
View raw message