activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1233860 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/
Date Fri, 20 Jan 2012 10:59:06 GMT
Author: dejanb
Date: Fri Jan 20 10:59:05 2012
New Revision: 1233860

URL: http://svn.apache.org/viewvc?rev=1233860&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3670 - fix stomp nack support

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1233860&r1=1233859&r2=1233860&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri Jan 20 10:59:05 2012
@@ -256,7 +256,7 @@ public class ProtocolConverter {
 
         checkConnected();
 
-        if (this.version.equals(Stomp.V1_1)) {
+        if (this.version.equals(Stomp.V1_0)) {
             throw new ProtocolException("NACK received but connection is in v1.0 mode.");
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1233860&r1=1233859&r2=1233860&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Jan 20 10:59:05 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import org.apache.activemq.command.*;
+
+import javax.jms.JMSException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -23,17 +26,6 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.TransactionId;
-
 /**
  * Keeps track of the STOMP subscription so that acking is correctly done.
  *
@@ -197,7 +189,7 @@ public class StompSubscription {
         }
         dispatchedMessage.remove(msgId);
 
-        return null;
+        return ack;
     }
 
     public String getAckMode() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1233860&r1=1233859&r2=1233860&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Fri Jan 20 10:59:05 2012
@@ -16,20 +16,6 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
@@ -39,6 +25,12 @@ import org.apache.activemq.command.Activ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.*;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.*;
+import java.util.concurrent.TimeUnit;
+
 public class Stomp11Test extends CombinationTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
@@ -650,4 +642,60 @@ public class Stomp11Test extends Combina
         stompConnection.sendFrame(frame);
     }
 
+    public void testNackMessage() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                "login: system\n" +
+                "passcode: manager\n" +
+                "accept-version:1.1\n" +
+                "host:localhost\n" +
+                "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\npersistent:true\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        // nack it
+        frame = "NACK\n" + "subscription:12345\n" + "message-id:" +
+                received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        //consume it from dlq
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+                "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        StompFrame receivedDLQ = stompConnection.receive(200);
+        assertEquals(receivedDLQ.getHeaders().get("message-id"), received.getHeaders().get("message-id"));
+
+        frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+                received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
 }



Mime
View raw message