activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5630 - add rejectDurableConsumers boolen attribute - when true, requests to create durable subscriptions will fail with a JMSException - not allowed
Date Tue, 03 Mar 2015 15:11:31 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 4fe2bd534 -> 741e3aad3


https://issues.apache.org/jira/browse/AMQ-5630 - add rejectDurableConsumers boolen attribute
- when true, requests to create durable subscriptions will fail with a JMSException - not
allowed


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/741e3aad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/741e3aad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/741e3aad

Branch: refs/heads/master
Commit: 741e3aad3eb455220759afdd8001311727083882
Parents: 4fe2bd5
Author: gtully <gary.tully@gmail.com>
Authored: Tue Mar 3 13:29:36 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Mar 3 13:30:10 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   9 ++
 .../activemq/broker/region/TopicRegion.java     |   3 +
 .../activemq/BrokerDurableRejectedTest.java     | 124 +++++++++++++++++++
 3 files changed, 136 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index b250d32..f37381c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -249,6 +249,7 @@ public class BrokerService implements Service {
 
     private boolean restartAllowed = true;
     private boolean restartRequested = false;
+    private boolean rejectDurableConsumers = false;
 
     private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
 
@@ -3053,4 +3054,12 @@ public class BrokerService implements Service {
     public void incrementTotalConnections() {
         this.totalConnections.incrementAndGet();
     }
+
+    public boolean isRejectDurableConsumers() {
+        return rejectDurableConsumers;
+    }
+
+    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
+        this.rejectDurableConsumers = rejectDurableConsumers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index 80088d7..d0e15cd 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -109,6 +109,9 @@ public class TopicRegion extends AbstractRegion {
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
         if (info.isDurable()) {
+            if (broker.getBrokerService().isRejectDurableConsumers()) {
+                throw new JMSException("Durable Consumers are not allowed");
+            }
             ActiveMQDestination destination = info.getDestination();
             if (!destination.isPattern()) {
                 // Make sure the destination is created.

http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java
new file mode 100755
index 0000000..e8b75ef
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+
+public class BrokerDurableRejectedTest extends TestSupport {
+
+    protected Connection connection;
+    protected Session consumeSession;
+    protected Destination consumerDestination;
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm:(broker:(stomp://localhost:0)?persistent=false&rejectDurableConsumers=true)");
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+
+        connection.setClientID(getClass().getName());
+
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        consumerDestination = consumeSession.createTopic("TestDurableRejected");
+        connection.start();
+    }
+
+    public void testDurableTopicConsumerJms() throws Exception {
+
+        consumeSession.createConsumer(consumerDestination);
+        try {
+
+            consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
+            fail("Expect not allowed jms exception on durable creation");
+
+        } catch (JMSException expected) {
+            assertTrue("expected exception", expected.getMessage().contains("not allowed"));
+        }
+    }
+
+    public void testDurableTopicConsumerStomp() throws Exception {
+
+        // verify stomp ok in this case
+        StompConnection stompConnection = new StompConnection();
+        stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort());
+
+        // connect
+        String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        // subscribe
+        frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName()
+ "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+        assertTrue("contains expected message -" + frame, frame.contains("not allowed"));
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+
+    public void testDurableTopicConsumerStompWithReceipt() throws Exception {
+
+        // verify stomp ok in this case
+        StompConnection stompConnection = new StompConnection();
+        stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort());
+
+        // connect
+        String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        // subscribe
+        frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName()
+ "\nreceipt:1\n"
+                + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+        assertTrue("contains expected message -" + frame, frame.contains("not allowed"));
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+}


Mime
View raw message