activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5666
Date Wed, 18 Mar 2015 19:00:18 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 20832f1f1 -> 4228e3d3e


https://issues.apache.org/jira/browse/AMQ-5666

Tests around durable subscription lookup and reattach.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4228e3d3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4228e3d3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4228e3d3

Branch: refs/heads/master
Commit: 4228e3d3e812436585ea7e7746f8eb42a7a45493
Parents: 20832f1
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Mar 18 14:59:49 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Mar 18 15:00:11 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   |   1 +
 .../transport/amqp/client/AmqpReceiver.java     |   7 +-
 .../transport/amqp/client/AmqpSession.java      |  37 +++++
 .../amqp/interop/AmqpDurableReceiverTest.java   | 150 ++++++++++++++-----
 4 files changed, 156 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4228e3d3/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 5a73a25..d1e9f5a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -1460,6 +1460,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     source.setAddress(destination.getQualifiedName());
                     source.setDurable(TerminusDurability.UNSETTLED_STATE);
                     source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+                    source.setDistributionMode(COPY);
                 } else {
                     consumerContext.closed = true;
                     sender.setSource(null);

http://git-wip-us.apache.org/repos/asf/activemq/blob/4228e3d3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 1290d27..585ba93 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -89,6 +89,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
      *        The unique ID assigned to this receiver.
      */
     public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+
+        if (address != null && address.isEmpty()) {
+            throw new IllegalArgumentException("Address cannot be empty.");
+        }
+
         this.session = session;
         this.address = address;
         this.receiverId = receiverId;
@@ -449,7 +454,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         Source source = userSpecifiedSource;
         Target target = new Target();
 
-        if (userSpecifiedSource == null) {
+        if (userSpecifiedSource == null && address != null) {
             source = new Source();
             source.setAddress(address);
             configureSource(source);

http://git-wip-us.apache.org/repos/asf/activemq/blob/4228e3d3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 8b039b6..3b2a3d1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -268,6 +268,43 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create a receiver instance using the given address that creates a durable subscription.
+     *
+     * @param subscriptionName
+     *        the name of the subscription that should be queried for on the remote..
+     *
+     * @return a newly created receiver that is ready for use if the subscription exists.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver lookupSubscription(String subscriptionName) throws Exception {
+        checkClosed();
+
+        if (subscriptionName == null || subscriptionName.isEmpty()) {
+            throw new IllegalArgumentException("subscription name must not be null or empty.");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, (String) null, getNextReceiverId());
+        receiver.setSubscriptionName(subscriptionName);
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                receiver.setStateInspector(getStateInspector());
+                receiver.open(request);
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
      * @return this session's parent AmqpConnection.
      */
     public AmqpConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4228e3d3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 7fd6080..028991d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -16,9 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.interop;
 
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.TimeUnit;
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -26,7 +27,10 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.apache.activemq.util.Wait;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Test;
 
 /**
@@ -52,20 +56,37 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
 
         final BrokerViewMBean brokerView = getProxyToBroker();
 
-        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testDetachedDurableReceiverRemainsActive() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerView.getDurableTopicSubscribers().length == 1;
-            }
+        receiver.detach();
 
-        }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
 
         connection.close();
     }
 
     @Test(timeout = 60000)
-    public void testDetachedDurableReceiverRemainsActive() throws Exception {
+    public void testCloseDurableReceiverRemovesSubscription() throws Exception {
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.createConnection();
@@ -77,31 +98,54 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
 
         final BrokerViewMBean brokerView = getProxyToBroker();
 
-        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+
+        receiver.close();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReattachToDurableNode() throws Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerView.getDurableTopicSubscribers().length == 1;
-            }
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
 
-        }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
         receiver.detach();
 
-        assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerView.getInactiveDurableTopicSubscribers().length == 1;
-            }
+        receiver.close();
 
-        }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
         connection.close();
     }
 
     @Test(timeout = 60000)
-    public void testCloseDurableReceiverRemovesSubscription() throws Exception {
+    public void testLookupExistingSubscription() throws Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.createConnection();
@@ -111,28 +155,58 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         AmqpSession session = connection.createSession();
         AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
 
-        final BrokerViewMBean brokerView = getProxyToBroker();
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.detach();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
 
-        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+        receiver = session.lookupSubscription(getTestName());
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerView.getDurableTopicSubscribers().length == 1;
-            }
+        assertNotNull(receiver);
 
-        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+        Receiver protonReceiver = receiver.getReceiver();
+        assertNotNull(protonReceiver.getRemoteSource());
+        Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+        assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+        assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+        assertEquals(COPY, remoteSource.getDistributionMode());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
         receiver.close();
 
-        assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testLookupNonExistingSubscription() throws Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerView.getDurableTopicSubscribers().length == 0 &&
-                       brokerView.getInactiveDurableTopicSubscribers().length == 0;
-            }
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
 
-        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+        try {
+            session.lookupSubscription(getTestName());
+            fail("Should throw an exception since there is not subscription");
+        } catch (Exception e) {
+            LOG.info("Error on lookup: {}", e.getMessage());
+        }
 
         connection.close();
     }


Mime
View raw message