activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6359
Date Thu, 14 Jul 2016 20:30:05 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x c1a299866 -> b547e4613


https://issues.apache.org/jira/browse/AMQ-6359

Allow a receiver link to enable consumer options on the subscription
such as exclusive and retroactive using options encoded on the address
(cherry picked from commit a35e8dc8a28768ddf7f14c29cb41fdc6e2e8a605)

Conflicts:
	activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b547e461
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b547e461
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b547e461

Branch: refs/heads/activemq-5.13.x
Commit: b547e46131baf31f1cad6c79b4a5444ae82842fc
Parents: c1a2998
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Jul 14 16:08:34 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Jul 14 16:29:57 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSession.java    | 17 ++++
 .../activemq/transport/amqp/JMSClientTest.java  | 81 ++++++++++++++++++++
 2 files changed, 98 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b547e461/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 20a8b9f..bb1436c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -46,6 +46,7 @@ import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Target;
@@ -313,6 +314,22 @@ public class AmqpSession implements AmqpResource {
 
             int senderCredit = protonSender.getRemoteCredit();
 
+            // Allows the options on the destination to configure the consumerInfo
+            if (destination.getOptions() != null) {
+                Map<String, Object> options = IntrospectionSupport.extractProperties(
+                    new HashMap<String, Object>(destination.getOptions()), "consumer.");
+                IntrospectionSupport.setProperties(consumerInfo, options);
+                if (options.size() > 0) {
+                    String msg = "There are " + options.size()
+                        + " consumer options that couldn't be set on the consumer."
+                        + " Check the options are spelled correctly."
+                        + " Unknown parameters=[" + options + "]."
+                        + " This consumer cannot be started.";
+                    LOG.warn(msg);
+                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
+                }
+            }
+
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(destination);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b547e461/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e4b8d7a..3583656 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -55,8 +55,10 @@ import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Test;
 import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
@@ -1177,6 +1179,85 @@ public class JMSClientTest extends JMSClientTestSupport {
         }
     }
 
+    @Test(timeout = 60000)
+    public void testZeroPrefetchWithTwoConsumers() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+        producer.send(session.createTextMessage("Msg2"));
+
+        // now lets receive it
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer1.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+        answer = (TextMessage)consumer2.receive(5000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", answer.getText(), "Msg2");
+
+        answer = (TextMessage)consumer2.receiveNoWait();
+        assertNull("Should have not received a message!", answer);
+    }
+
+    @Test(timeout=30000)
+    public void testRetroactiveConsumerSupported() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.retroactive=true");
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertNotNull(queueView);
+        assertEquals(1, queueView.getSubscriptions().length);
+
+        SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+        assertTrue(subscriber.isRetroactive());
+
+        consumer.close();
+    }
+
+    @Test(timeout=30000)
+    public void testExclusiveConsumerSupported() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.exclusive=true");
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertNotNull(queueView);
+        assertEquals(1, queueView.getSubscriptions().length);
+
+        SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
+        assertTrue(subscriber.isExclusive());
+
+        consumer.close();
+    }
+
+    @Test(timeout=30000)
+    public void testUnpplicableDestinationOption() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName() + "?consumer.unknoen=true");
+        try {
+            session.createConsumer(queue);
+            fail("Should have failed to create consumer");
+        } catch (JMSException jmsEx) {
+        }
+    }
+
     protected void receiveMessages(MessageConsumer consumer) throws Exception {
         for (int i = 0; i < 10; i++) {
             Message message = consumer.receive(1000);


Mime
View raw message