Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5043107EA for ; Mon, 2 Mar 2015 21:12:50 +0000 (UTC) Received: (qmail 74313 invoked by uid 500); 2 Mar 2015 21:12:50 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 74269 invoked by uid 500); 2 Mar 2015 21:12:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 74260 invoked by uid 99); 2 Mar 2015 21:12:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Mar 2015 21:12:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A8BEE0FC6; Mon, 2 Mar 2015 21:12:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5622 Date: Mon, 2 Mar 2015 21:12:50 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master bb83bf574 -> 4fe2bd534 https://issues.apache.org/jira/browse/AMQ-5622 Fix some issues with STOMP v1.2 protocol support. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fe2bd53 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fe2bd53 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fe2bd53 Branch: refs/heads/master Commit: 4fe2bd534a6b8b256790ce148dbad21f704e86e5 Parents: bb83bf5 Author: Timothy Bish Authored: Mon Mar 2 16:12:44 2015 -0500 Committer: Timothy Bish Committed: Mon Mar 2 16:12:44 2015 -0500 ---------------------------------------------------------------------- .../transport/stomp/ProtocolConverter.java | 6 +- .../activemq/transport/stomp/Stomp12Test.java | 107 ++++++++++++++++++- 2 files changed, 106 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4fe2bd53/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index d366962..7ec53b1 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -557,7 +557,7 @@ public class ProtocolConverter { String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); - if (this.version.equals(Stomp.V1_1) && subscriptionId == null) { + if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { throw new ProtocolException("SUBSCRIBE received without a subscription id!"); } @@ -675,7 +675,7 @@ public class ProtocolConverter { } String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); - if (this.version.equals(Stomp.V1_1) && subscriptionId == null) { + if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); } @@ -686,7 +686,7 @@ public class ProtocolConverter { // check if it is a durable subscription String durable = command.getHeaders().get("activemq.subscriptionName"); String clientId = durable; - if (this.version.equals(Stomp.V1_1)) { + if (!this.version.equals(Stomp.V1_0)) { clientId = connectionInfo.getClientId(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/4fe2bd53/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java index 1ccdbd8..cec51ee 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; @@ -319,9 +320,13 @@ public class Stomp12Test extends StompTestSupport { frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - try { - Thread.sleep(400); - } catch (InterruptedException e){} + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); // reconnect and send some messages to the offline subscribers and then try to get // them after subscribing again. @@ -411,7 +416,7 @@ public class Stomp12Test extends StompTestSupport { assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "receipt:1" + "id:12345\n\n" + Stomp.NULL; + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; stompConnection.sendFrame(unsub); StompFrame stompFrame = stompConnection.receive(); @@ -466,4 +471,98 @@ public class Stomp12Test extends StompTestSupport { String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } + + @Test(timeout = 60000) + public void testDurableSubAndUnSub() throws Exception { + BrokerViewMBean view = getProxyToBroker(); + + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "client-id:durableSubTest\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String frame = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + frame); + + assertTrue(frame.startsWith("CONNECTED")); + assertEquals(view.getDurableTopicSubscribers().length, 0); + + // subscribe to destination durably + frame = "SUBSCRIBE\n" + + "destination:/topic/" + getQueueName() + "1" + "\n" + + "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" + + "activemq.subscriptionName:test1\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame receipt = stompConnection.receive(); + LOG.debug("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + assertEquals("1", receipt.getHeaders().get("receipt-id")); + assertEquals(view.getDurableTopicSubscribers().length, 1); + + frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + stompConnection.close(); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); + + stompConnect(); + stompConnection.sendFrame(connectFrame); + frame = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + frame); + assertTrue(frame.startsWith("CONNECTED")); + assertEquals(view.getDurableTopicSubscribers().length, 0); + assertEquals(view.getInactiveDurableTopicSubscribers().length, 1); + + // unsubscribe from topic + frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" + + "id:durablesub-1\n" + "receipt:3\n" + + "activemq.subscriptionName:test1\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + receipt = stompConnection.receive(); + LOG.debug("Broker sent: " + frame); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + assertEquals("3", receipt.getHeaders().get("receipt-id")); + + assertEquals(view.getInactiveDurableTopicSubscribers().length, 0); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + @Test(timeout = 60000) + public void testSubscribeWithNoId() throws Exception { + + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } }