activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6305
Date Tue, 05 Jul 2016 19:55:42 GMT
https://issues.apache.org/jira/browse/AMQ-6305

Refactor credit handling and drain state tracking to ensure we stay in
sync with the remote state and always answer drain requests.  Start
adding some more tests around drain to the interop suite.
(cherry picked from commit 8448cf1cb886b242b54235b091259acbf43c2108)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8916beea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8916beea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8916beea

Branch: refs/heads/activemq-5.13.x
Commit: 8916beea4228e5432c948629e252d363c63cfea3
Parents: 508c12d
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jun 1 18:30:31 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jul 5 19:55:02 2016 +0000

----------------------------------------------------------------------
 .../amqp/protocol/AmqpAbstractLink.java         |   4 +-
 .../transport/amqp/protocol/AmqpConnection.java |   1 +
 .../transport/amqp/protocol/AmqpSender.java     |  86 +++---
 .../transport/amqp/JMSClientTestSupport.java    |  40 +++
 .../amqp/JMSClientTransactionTest.java          | 102 ++++++-
 .../transport/amqp/JMSQueueBrowserTest.java     | 245 +++++++++++++++-
 .../amqp/client/AmqpAbstractResource.java       |  13 +
 .../transport/amqp/client/AmqpConnection.java   |  32 ++-
 .../transport/amqp/client/AmqpReceiver.java     | 277 ++++++++++++++++++-
 .../transport/amqp/client/AmqpResource.java     |  12 +
 .../transport/amqp/client/AmqpSender.java       |   4 +-
 .../transport/amqp/client/AmqpSession.java      |  17 +-
 .../amqp/client/AmqpTransactionContext.java     |   6 +-
 .../amqp/client/util/NoOpAsyncResult.java       |  40 +++
 .../amqp/interop/AmqpReceiverDrainTest.java     | 154 +++++++++++
 .../amqp/interop/AmqpReceiverTest.java          |  28 --
 .../src/test/resources/log4j.properties         |   4 +-
 17 files changed, 969 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
index d4fe301..798e356 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
@@ -142,7 +142,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
     }
 
     /**
-     * Shorcut method to hand off an ActiveMQ Command to the broker and assign
+     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
      * a ResponseHandler to deal with any reply from the broker.
      *
      * @param command
@@ -153,7 +153,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
     }
 
     /**
-     * Shorcut method to hand off an ActiveMQ Command to the broker and assign
+     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
      * a ResponseHandler to deal with any reply from the broker.
      *
      * @param command

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 45b67ab..2997d7f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -173,6 +173,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
         this.protonTransport.bind(this.protonConnection);
         this.protonTransport.setChannelMax(CHANNEL_MAX);
+        this.protonTransport.setEmitFlowEventOnSend(false);
 
         this.protonConnection.collect(eventCollector);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 8cf7033..6c30828 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -81,7 +81,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final ConsumerInfo consumerInfo;
     private final boolean presettle;
 
-    private int currentCredit;
     private boolean draining;
     private long lastDeliveredSequenceId;
 
@@ -101,7 +100,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
         super(session, endpoint);
 
-        this.currentCredit = endpoint.getRemoteCredit();
         this.consumerInfo = consumerInfo;
         this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
     }
@@ -120,7 +118,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         if (!isClosed() && isOpened()) {
             RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
             removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-            sendToActiveMQ(removeCommand, null);
+            sendToActiveMQ(removeCommand);
 
             session.unregisterSender(getConsumerId());
         }
@@ -133,7 +131,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         if (!isClosed() && isOpened()) {
             RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
             removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-            sendToActiveMQ(removeCommand, null);
+            sendToActiveMQ(removeCommand);
 
             if (consumerInfo.isDurable()) {
                 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
@@ -141,7 +139,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 rsi.setSubscriptionName(getEndpoint().getName());
                 rsi.setClientId(session.getConnection().getClientId());
 
-                sendToActiveMQ(rsi, null);
+                sendToActiveMQ(rsi);
             }
 
             session.unregisterSender(getConsumerId());
@@ -152,17 +150,13 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     @Override
     public void flow() throws Exception {
-        int updatedCredit = getEndpoint().getCredit();
-
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
-                      currentCredit, draining, getEndpoint().getDrain(),
+            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
+                      draining, getEndpoint().getDrain(),
                       getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
         }
 
-        if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
-            currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
-            draining = true;
+        if (getEndpoint().getDrain() && !draining) {
 
             // Revert to a pull consumer.
             ConsumerControl control = new ConsumerControl();
@@ -170,35 +164,42 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             control.setDestination(getDestination());
             control.setPrefetch(0);
 
-            LOG.trace("Flow: Pull case -> consumer control with prefetch (0)");
+            LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
+
+            sendToActiveMQ(control);
+
+            if (endpoint.getCredit() > 0) {
+                draining = true;
 
-            sendToActiveMQ(control, null);
+                // Now request dispatch of the drain amount, we request immediate
+                // timeout and an completion message regardless so that we can know
+                // when we should marked the link as drained.
+                MessagePull pullRequest = new MessagePull();
+                pullRequest.setConsumerId(getConsumerId());
+                pullRequest.setDestination(getDestination());
+                pullRequest.setTimeout(-1);
+                pullRequest.setAlwaysSignalDone(true);
+                pullRequest.setQuantity(endpoint.getCredit());
 
-            // Now request dispatch of the drain amount, we request immediate
-            // timeout and an completion message regardless so that we can know
-            // when we should marked the link as drained.
-            MessagePull pullRequest = new MessagePull();
-            pullRequest.setConsumerId(getConsumerId());
-            pullRequest.setDestination(getDestination());
-            pullRequest.setTimeout(-1);
-            pullRequest.setAlwaysSignalDone(true);
-            pullRequest.setQuantity(currentCredit);
+                LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
 
-            LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit);
+                sendToActiveMQ(pullRequest);
+            } else {
+                LOG.trace("Pull case -> sending any Queued messages and marking drained");
 
-            sendToActiveMQ(pullRequest, null);
-        } else if (updatedCredit != currentCredit) {
-            currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
+                pumpOutbound();
+                getEndpoint().drained();
+                session.pumpProtonToSocket();
+            }
+        } else {
             ConsumerControl control = new ConsumerControl();
             control.setConsumerId(getConsumerId());
             control.setDestination(getDestination());
-            control.setPrefetch(currentCredit);
+            control.setPrefetch(getEndpoint().getCredit());
 
-            LOG.trace("Flow: update -> consumer control with prefetch (0)");
+            LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
 
-            sendToActiveMQ(control, null);
-        } else {
-            LOG.trace("Flow: no credit change -> no broker updates needed");
+            sendToActiveMQ(control);
         }
     }
 
@@ -415,14 +416,29 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                     // It's the end of browse signal in response to a MessagePull
                     getEndpoint().drained();
                     draining = false;
-                    currentCredit = 0;
                 } else {
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
-                                  getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(),
+                        LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
+                                  getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
                                   getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
                     }
 
+                    if (draining && getEndpoint().getCredit() == 0) {
+                        LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
+                        getEndpoint().drained();
+                        draining = false;
+                    } else {
+                        LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.",
+                                  getEndpoint().getName(), getEndpoint().getCredit());
+
+                        ConsumerControl control = new ConsumerControl();
+                        control.setConsumerId(getConsumerId());
+                        control.setDestination(getDestination());
+                        control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1));
+
+                        sendToActiveMQ(control);
+                    }
+
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
                     jms.setReadOnlyBody(true);
                     final EncodedMessage amqp = outboundTransformer.transform(jms);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
index eebceb0..d855c6b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -85,6 +86,45 @@ public class JMSClientTestSupport extends AmqpTestSupport {
         return amqpURI;
     }
 
+    protected URI getAmqpURI() {
+        return getAmqpURI("");
+    }
+
+    protected URI getAmqpURI(String uriOptions) {
+
+        boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
+
+        String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
+
+        if (uriOptions != null && !uriOptions.isEmpty()) {
+            if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {
+                uriOptions = uriOptions.substring(1);
+            }
+        } else {
+            uriOptions = "";
+        }
+
+        if (useSSL) {
+            amqpURI += "?transport.verifyHost=false";
+        }
+
+        if (!uriOptions.isEmpty()) {
+            if (useSSL) {
+                amqpURI += "&" + uriOptions;
+            } else {
+                amqpURI += "?" + uriOptions;
+            }
+        }
+
+        URI result = getBrokerURI();
+        try {
+            result = new URI(amqpURI);
+        } catch (URISyntaxException e) {
+        }
+
+        return result;
+    }
+
     protected Connection createConnection() throws JMSException {
         return createConnection(name.toString(), false);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 560edda..e979714 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -30,6 +35,7 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -186,13 +192,107 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
         assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
         SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
         assertNotNull(subscription);
+        LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
         assertTrue(subscription.getPrefetchSize() > 0);
 
         for (int i = 1; i <= MSG_COUNT; i++) {
             LOG.info("Trying to receive message: {}", i);
             TextMessage message = (TextMessage) consumer.receive(1000);
-            assertNotNull("Message " + i + "should be available", message);
+            assertNotNull("Message " + i + " should be available", message);
             assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
         }
+
+        session.commit();
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueTXRollbackAndCommitAsyncConsumer() throws Exception {
+        final int MSG_COUNT = 3;
+
+        final AtomicInteger counter = new AtomicInteger();
+
+        connection = createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue destination = session.createQueue(getDestinationName());
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Received Message {}", message.getJMSMessageID());
+                } catch (JMSException e) {
+                }
+                counter.incrementAndGet();
+            }
+        });
+
+        int msgIndex = 0;
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Sending message: {} to rollback", msgIndex++);
+            TextMessage message = session.createTextMessage("Rolled back Message: " + msgIndex);
+            message.setIntProperty("MessageSequence", msgIndex);
+            producer.send(message);
+        }
+
+        LOG.info("ROLLBACK of sent message here:");
+        session.rollback();
+
+        assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
+
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Sending message: {} to commit", msgIndex++);
+            TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
+            message.setIntProperty("MessageSequence", msgIndex);
+            producer.send(message);
+        }
+
+        LOG.info("COMMIT of sent message here:");
+        session.commit();
+
+        assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
+        SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
+        assertNotNull(subscription);
+        LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
+        assertTrue(subscription.getPrefetchSize() > 0);
+
+        assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return counter.get() == MSG_COUNT;
+            }
+        }));
+
+        LOG.info("COMMIT of first received batch here:");
+        session.commit();
+
+        assertTrue(subscription.getPrefetchSize() > 0);
+        for (int i = 1; i <= MSG_COUNT; i++) {
+            LOG.info("Sending message: {} to commit", msgIndex++);
+            TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
+            message.setIntProperty("MessageSequence", msgIndex);
+            producer.send(message);
+        }
+
+        LOG.info("COMMIT of next sent message batch here:");
+        session.commit();
+
+        LOG.info("WAITING -> for next three messages to arrive:");
+
+        assertTrue(subscription.getPrefetchSize() > 0);
+        assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Read {} messages so far", counter.get());
+                return counter.get() == MSG_COUNT * 2;
+            }
+        }));
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java
index dfcc108..070bb20 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java
@@ -19,13 +19,18 @@ package org.apache.activemq.transport.amqp;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
@@ -45,12 +50,12 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
 
     @Test(timeout = 60000)
-    @Repeat(repetitions = 1)
+    @Repeat(repetitions = 5)
     public void testBrowseAllInQueueZeroPrefetch() throws Exception {
 
         final int MSG_COUNT = 5;
 
-        JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0");
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
         connection = cf.createConnection();
         connection.start();
 
@@ -78,6 +83,242 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
         assertEquals(5, count);
     }
 
+    @Test(timeout = 40000)
+    public void testCreateQueueBrowser() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        session.createConsumer(queue).close();
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(0, proxy.getQueueSize());
+    }
+
+    @Test(timeout = 40000)
+    public void testNoMessagesBrowserHasNoElements() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        session.createConsumer(queue).close();
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(0, proxy.getQueueSize());
+
+        Enumeration<?> enumeration = browser.getEnumeration();
+        assertFalse(enumeration.hasMoreElements());
+    }
+
+    @Test(timeout=30000)
+    public void testBroseOneInQueue() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("hello"));
+        producer.close();
+
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        while (enumeration.hasMoreElements()) {
+            Message m = (Message) enumeration.nextElement();
+            assertTrue(m instanceof TextMessage);
+            LOG.debug("Browsed message {} from Queue {}", m, queue);
+        }
+
+        browser.close();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(5000);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 5)
+    public void testBrowseAllInQueue() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(name.getMethodName(), 5, false);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(5, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+            TimeUnit.MILLISECONDS.sleep(50);
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(5, count);
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 5)
+    public void testBrowseAllInQueuePrefetchOne() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=1"));
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(name.getMethodName(), 5, false);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(5, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(5, count);
+    }
+
+    @Test(timeout = 40000)
+    public void testBrowseAllInQueueTxSession() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(name.getMethodName(), 5, false);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(5, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(5, count);
+    }
+
+    @Test(timeout = 40000)
+    public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
+
+        connection = cf.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(name.getMethodName(), 5, false);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(5, proxy.getQueueSize());
+
+        // Send some TX work but don't commit.
+        MessageProducer txProducer = session.createProducer(queue);
+        for (int i = 0; i < 5; ++i) {
+            txProducer.send(session.createMessage());
+        }
+
+        assertEquals(5, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(5, count);
+
+        browser.close();
+
+        // Now check that all browser work did not affect the session transaction.
+        assertEquals(5, proxy.getQueueSize());
+        session.commit();
+        assertEquals(10, proxy.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testBrowseAllInQueueSmallPrefetch() throws Exception {
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=5"));
+
+        connection = cf.createConnection();
+        connection.start();
+
+        final int MSG_COUNT = 30;
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(getDestinationName());
+        sendMessages(name.getMethodName(), MSG_COUNT, false);
+
+        QueueViewMBean proxy = getProxyToQueue(getDestinationName());
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+        QueueBrowser browser = session.createBrowser(queue);
+        assertNotNull(browser);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int count = 0;
+        while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+        }
+        assertFalse(enumeration.hasMoreElements());
+        assertEquals(MSG_COUNT, count);
+    }
+
     @Override
     protected boolean isUseOpenWireConnector() {
         return true;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 9d02027..e17a0c9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -152,6 +152,19 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
         connection.fireClientException(error);
     }
 
+    @Override
+    public void locallyClosed(AmqpConnection connection, Exception error) {
+        if (endpoint != null) {
+            // TODO: if this is a producer/consumer link then we may only be detached,
+            // rather than fully closed, and should respond appropriately.
+            endpoint.close();
+        }
+
+        LOG.info("Resource {} was locally closed", this);
+
+        connection.fireClientException(error);
+    }
+
     public E getEndpoint() {
         return this.endpoint;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 6c35e4c..00488b8 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -17,9 +17,6 @@
 package org.apache.activemq.transport.amqp.client;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.net.URI;
@@ -39,8 +36,10 @@ import org.apache.activemq.transport.InactivityIOException;
 import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
 import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
 import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IdGenerator;
+import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Collector;
@@ -54,10 +53,16 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+
 public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
 
+    private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
+
     private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
     // NOTE: Limit default channel max to signed short range to deal with
     //       brokers that don't currently handle the unsigned range well.
@@ -66,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
 
     public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
     public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
+    public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
 
     private final ScheduledExecutorService serializer;
     private final AtomicBoolean closed = new AtomicBoolean();
@@ -95,6 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private int channelMax = DEFAULT_CHANNEL_MAX;
     private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
     private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
+    private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
 
     public AmqpConnection(NettyTransport transport, String username, String password) {
         setEndpoint(Connection.Factory.create());
@@ -150,7 +157,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                     authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
                     open(future);
 
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(future);
                 }
             });
 
@@ -190,7 +197,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                             request.onSuccess();
                         }
 
-                        pumpToProtonTransport();
+                        pumpToProtonTransport(request);
                     } catch (Exception e) {
                         LOG.debug("Caught exception while closing proton connection");
                     }
@@ -241,7 +248,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                 session.setEndpoint(getEndpoint().session());
                 session.setStateInspector(getStateInspector());
                 session.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -355,6 +362,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         this.closeTimeout = closeTimeout;
     }
 
+    public long getDrainTimeout() {
+        return drainTimeout;
+    }
+
+    public void setDrainTimeout(long drainTimeout) {
+        this.drainTimeout = drainTimeout;
+    }
+
     public List<Symbol> getOfferedCapabilities() {
         return offeredCapabilities;
     }
@@ -439,6 +454,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     }
 
     void pumpToProtonTransport() {
+        pumpToProtonTransport(NOOP_REQUEST);
+    }
+
+    void pumpToProtonTransport(AsyncResult request) {
         try {
             boolean done = false;
             while (!done) {
@@ -454,6 +473,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
             }
         } catch (IOException e) {
             fireClientException(e);
+            request.onFailure(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 87aa36a..77a529d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -26,14 +26,17 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.InvalidDestinationException;
 
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -74,6 +77,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     private boolean presettle;
     private boolean noLocal;
 
+    private AsyncResult pullRequest;
+    private AsyncResult stopRequest;
+
     /**
      * Create a new receiver instance.
      *
@@ -133,7 +139,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                 public void run() {
                     checkClosed();
                     close(request);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                 }
             });
 
@@ -156,7 +162,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                 public void run() {
                     checkClosed();
                     detach(request);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                 }
             });
 
@@ -223,6 +229,108 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
+     * Request a remote peer send a Message to this client waiting until one arrives.
+     *
+     * @return the pulled AmqpMessage or null if none was pulled from the remote.
+     *
+     * @throws IOException if an error occurs
+     */
+    public AmqpMessage pull() throws IOException {
+        return pull(-1, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Request a remote peer send a Message to this client using an immediate drain request.
+     *
+     * @return the pulled AmqpMessage or null if none was pulled from the remote.
+     *
+     * @throws IOException if an error occurs
+     */
+    public AmqpMessage pullImmediate() throws IOException {
+        return pull(0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Request a remote peer send a Message to this client.
+     *
+     *   {@literal timeout < 0} then it should remain open until a message is received.
+     *   {@literal timeout = 0} then it returns a message or null if none available
+     *   {@literal timeout > 0} then it should remain open for timeout amount of time.
+     *
+     * The timeout value when positive is given in milliseconds.
+     *
+     * @param timeout
+     *        the amount of time to tell the remote peer to keep this pull request valid.
+     * @param unit
+     *        the unit of measure that the timeout represents.
+     *
+     * @return the pulled AmqpMessage or null if none was pulled from the remote.
+     *
+     * @throws IOException if an error occurs
+     */
+    public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+
+                long timeoutMills = unit.toMillis(timeout);
+
+                try {
+                    LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills);
+                    if (timeoutMills < 0) {
+                        // Wait until message arrives. Just give credit if needed.
+                        if (getEndpoint().getCredit() == 0) {
+                            LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+                            getEndpoint().flow(1);
+                        }
+
+                        // Await the message arrival
+                        pullRequest = request;
+                    } else if (timeoutMills == 0) {
+                        // If we have no credit then we need to issue some so that we can
+                        // try to fulfill the request, then drain down what is there to
+                        // ensure we consume what is available and remove all credit.
+                        if (getEndpoint().getCredit() == 0){
+                            LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+                            getEndpoint().flow(1);
+                        }
+
+                        // Drain immediately and wait for the message(s) to arrive,
+                        // or a flow indicating removal of the remaining credit.
+                        stop(request);
+                    } else if (timeoutMills > 0) {
+                        // If we have no credit then we need to issue some so that we can
+                        // try to fulfill the request, then drain down what is there to
+                        // ensure we consume what is available and remove all credit.
+                        if (getEndpoint().getCredit() == 0) {
+                            LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+                            getEndpoint().flow(1);
+                        }
+
+                        // Wait for the timeout for the message(s) to arrive, then drain if required
+                        // and wait for remaining message(s) to arrive or a flow indicating
+                        // removal of the remaining credit.
+                        stopOnSchedule(timeoutMills, request);
+                    }
+
+                    session.pumpToProtonTransport(request);
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+
+        return prefetch.poll();
+    }
+
+
+    /**
      * Controls the amount of credit given to the receiver link.
      *
      * @param credit
@@ -240,7 +348,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                 checkClosed();
                 try {
                     getEndpoint().flow(credit);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception e) {
                     request.onFailure(e);
@@ -269,7 +377,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                 checkClosed();
                 try {
                     getEndpoint().drain(credit);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception e) {
                     request.onFailure(e);
@@ -281,6 +389,31 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
+     * Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
+     *
+     * @throws IOException if an error occurs while sending the drain.
+     */
+    public void stop() throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    stop(request);
+                    session.pumpToProtonTransport(request);
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
      * Accepts a message that was dispatched under the given Delivery instance.
      *
      * @param delivery
@@ -318,7 +451,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                             delivery.settle();
                         }
                     }
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception e) {
                     request.onFailure(e);
@@ -360,7 +493,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                         disposition.setDeliveryFailed(deliveryFailed);
                         delivery.disposition(disposition);
                         delivery.settle();
-                        session.pumpToProtonTransport();
+                        session.pumpToProtonTransport(request);
                     }
                     request.onSuccess();
                 } catch (Exception e) {
@@ -397,7 +530,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                     if (!delivery.isSettled()) {
                         delivery.disposition(Released.getInstance());
                         delivery.settle();
-                        session.pumpToProtonTransport();
+                        session.pumpToProtonTransport(request);
                     }
                     request.onSuccess();
                 } catch (Exception e) {
@@ -454,6 +587,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         this.noLocal = noLocal;
     }
 
+    public long getDrainTimeout() {
+        return session.getConnection().getDrainTimeout();
+    }
+
     //----- Internal implementation ------------------------------------------//
 
     @Override
@@ -604,6 +741,15 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                     LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                     incoming = null;
                 }
+            } else {
+                // We have exhausted the locally queued messages on this link.
+                // Check if we tried to stop and have now run out of credit.
+                if (getEndpoint().getRemoteCredit() <= 0) {
+                    if (stopRequest != null) {
+                        stopRequest.onSuccess();
+                        stopRequest = null;
+                    }
+                }
             }
         } while (incoming != null);
 
@@ -624,6 +770,35 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         // Store reference to envelope in delivery context for recovery
         incoming.setContext(amqpMessage);
         prefetch.add(amqpMessage);
+
+        // We processed a message, signal completion
+        // of a message pull request if there is one.
+        if (pullRequest != null) {
+            pullRequest.onSuccess();
+            pullRequest = null;
+        }
+    }
+
+    @Override
+    public void processFlowUpdates(AmqpConnection connection) throws IOException {
+        if (pullRequest != null || stopRequest != null) {
+            Receiver receiver = getEndpoint();
+            if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) {
+                if (pullRequest != null) {
+                    pullRequest.onSuccess();
+                    pullRequest = null;
+                }
+
+                if (stopRequest != null) {
+                    stopRequest.onSuccess();
+                    stopRequest = null;
+                }
+            }
+        }
+
+        LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit());
+
+        super.processFlowUpdates(connection);
     }
 
     protected Message decodeIncomingMessage(Delivery incoming) {
@@ -661,6 +836,61 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         }
     }
 
+    private void stop(final AsyncResult request) {
+        Receiver receiver = getEndpoint();
+        if (receiver.getRemoteCredit() <= 0) {
+            if (receiver.getQueued() == 0) {
+                // We have no remote credit and all the deliveries have been processed.
+                request.onSuccess();
+            } else {
+                // There are still deliveries to process, wait for them to be.
+                stopRequest = request;
+            }
+        } else {
+            // TODO: We don't actually want the additional messages that could be sent while
+            // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
+            // of drain if it was supported. We would first need to understand what happens
+            // if we reduce credit below the number of messages already in-flight before
+            // the peer sees the update.
+            stopRequest = request;
+            receiver.drain(0);
+
+            if (getDrainTimeout() > 0) {
+                // If the remote doesn't respond we will close the consumer and break any
+                // blocked receive or stop calls that are waiting.
+                final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.trace("Consumer {} drain request timed out", this);
+                        Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+                        locallyClosed(session.getConnection(), cause);
+                        stopRequest.onFailure(cause);
+                        session.pumpToProtonTransport(stopRequest);
+                    }
+                }, getDrainTimeout(), TimeUnit.MILLISECONDS);
+
+                stopRequest = new ScheduledRequest(future, stopRequest);
+            }
+        }
+    }
+
+    private void stopOnSchedule(long timeout, final AsyncResult request) {
+        LOG.trace("Receiver {} scheduling stop", this);
+        // We need to drain the credit if no message(s) arrive to use it.
+        final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+            @Override
+            public void run() {
+                LOG.trace("Receiver {} running scheduled stop", this);
+                if (getEndpoint().getRemoteCredit() != 0) {
+                    stop(request);
+                    session.pumpToProtonTransport(request);
+                }
+            }
+        }, timeout, TimeUnit.MILLISECONDS);
+
+        stopRequest = new ScheduledRequest(future, request);
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + "{ address = " + address + "}";
@@ -685,4 +915,37 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
 
     void postRollback() {
     }
+
+    //----- Inner classes used in message pull operations --------------------//
+
+    protected static final class ScheduledRequest implements AsyncResult {
+
+        private final ScheduledFuture<?> sheduledTask;
+        private final AsyncResult origRequest;
+
+        public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
+            this.sheduledTask = completionTask;
+            this.origRequest = origRequest;
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            sheduledTask.cancel(false);
+            origRequest.onFailure(cause);
+        }
+
+        @Override
+        public void onSuccess() {
+            boolean cancelled = sheduledTask.cancel(false);
+            if (cancelled) {
+                // Signal completion. Otherwise wait for the scheduled task to do it.
+                origRequest.onSuccess();
+            }
+        }
+
+        @Override
+        public boolean isComplete() {
+            return origRequest.isComplete();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
index af68c2d..95ff2d6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -92,6 +92,18 @@ public interface AmqpResource extends AmqpEventSink {
     void remotelyClosed(AmqpConnection connection);
 
     /**
+     * Called to indicate that the local end has become closed but the resource
+     * was not awaiting a close.  This could happen during an open request where
+     * the remote does not set an error condition or during normal operation.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     * @param error
+     *        The error that triggered the local close of this resource.
+     */
+    void locallyClosed(AmqpConnection connection, Exception error);
+
+    /**
      * Sets the failed state for this Resource and triggers a failure signal for
      * any pending ProduverRequest.
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 35fe56a..f9d6435 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -135,7 +135,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             public void run() {
                 try {
                     doSend(message, sendRequest);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(sendRequest);
                 } catch (Exception e) {
                     sendRequest.onFailure(e);
                     session.getConnection().fireClientException(e);
@@ -165,7 +165,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
                 public void run() {
                     checkClosed();
                     close(request);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index c0b097c..e327534 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
 import org.apache.qpid.proton.amqp.messaging.Source;
@@ -92,7 +93,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 sender.setStateInspector(getStateInspector());
                 sender.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -124,7 +125,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 sender.setStateInspector(getStateInspector());
                 sender.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -195,7 +196,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 receiver.setStateInspector(getStateInspector());
                 receiver.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -227,7 +228,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 receiver.setStateInspector(getStateInspector());
                 receiver.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -308,7 +309,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 receiver.setStateInspector(getStateInspector());
                 receiver.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -345,7 +346,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
                 checkClosed();
                 receiver.setStateInspector(getStateInspector());
                 receiver.open(request);
-                pumpToProtonTransport();
+                pumpToProtonTransport(request);
             }
         });
 
@@ -427,8 +428,8 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
         return connection.getProtonConnection();
     }
 
-    void pumpToProtonTransport() {
-        connection.pumpToProtonTransport();
+    void pumpToProtonTransport(AsyncResult request) {
+        connection.pumpToProtonTransport(request);
     }
 
     AmqpTransactionId getTransactionId() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
index 64f854b..cf5ef26 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -111,7 +111,7 @@ public class AmqpTransactionContext {
                     }
                 }
 
-                session.pumpToProtonTransport();
+                session.pumpToProtonTransport(request);
             }
         });
 
@@ -153,7 +153,7 @@ public class AmqpTransactionContext {
                 try {
                     LOG.info("Attempting to commit TX:[{}]", transactionId);
                     coordinator.discharge(transactionId, request, true);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                 } catch (Exception e) {
                     request.onFailure(e);
                 }
@@ -198,7 +198,7 @@ public class AmqpTransactionContext {
                 try {
                     LOG.info("Attempting to roll back TX:[{}]", transactionId);
                     coordinator.discharge(transactionId, request, false);
-                    session.pumpToProtonTransport();
+                    session.pumpToProtonTransport(request);
                 } catch (Exception e) {
                     request.onFailure(e);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java
new file mode 100644
index 0000000..9d59eac
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Simple NoOp implementation used when the result of the operation does not matter.
+ */
+public class NoOpAsyncResult implements AsyncResult {
+
+    public final static NoOpAsyncResult INSTANCE = new NoOpAsyncResult();
+
+    @Override
+    public void onFailure(Throwable result) {
+
+    }
+
+    @Override
+    public void onSuccess() {
+
+    }
+
+    @Override
+    public boolean isComplete() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
new file mode 100644
index 0000000..8379bfb
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Tests various behaviors of broker side drain support.
+ */
+public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testReceiverCanDrainMessages() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.drain(MSG_COUNT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+        }
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testPullWithNoMessageGetDrained() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        receiver.flow(10);
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+        assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+        assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testPullOneFromRemote() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+        AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+        receiver.close();
+
+        assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
+        assertEquals(1, queueView.getDispatchCount());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleZeroResultPulls() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        receiver.flow(10);
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+        assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+        assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+        assertNull(receiver.pull(1, TimeUnit.SECONDS));
+        assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+        assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+        connection.close();
+    }
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 1502bda..1226f3e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -342,34 +342,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testReceiverCanDrainMessages() throws Exception {
-        int MSG_COUNT = 20;
-        sendMessages(getTestName(), MSG_COUNT, false);
-
-        AmqpClient client = createAmqpClient();
-        AmqpConnection connection = client.connect();
-        AmqpSession session = connection.createSession();
-
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
-
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(MSG_COUNT, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
-
-        receiver.drain(MSG_COUNT);
-        for (int i = 0; i < MSG_COUNT; ++i) {
-            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
-            message.accept();
-        }
-        receiver.close();
-
-        assertEquals(0, queueView.getQueueSize());
-
-        connection.close();
-    }
-
-    @Test(timeout = 60000)
     public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
         AmqpClient client = createAmqpClient();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8916beea/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index be0efab..5208b3f 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp=INFO
 log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
 log4j.logger.org.fusesource=INFO
 
@@ -30,7 +30,7 @@ log4j.logger.org.apache.qpid.jms.provider=INFO
 log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
 log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
 
-# Console will only display warnnings
+# Console will only display warnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n


Mime
View raw message