qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-380 Implement JMS ConnectionConsumer functionality
Date Fri, 20 Apr 2018 18:44:18 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 8dd97074a -> a4fa85a02


QPIDJMS-380 Implement JMS ConnectionConsumer functionality

Add support for JMS ConnectionConsumer and add tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a4fa85a0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a4fa85a0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a4fa85a0

Branch: refs/heads/master
Commit: a4fa85a02149c97a27259c41b227915bd11a99c8
Parents: 8dd9707
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Sep 14 17:12:48 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Apr 20 14:35:40 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 205 +++++--
 .../apache/qpid/jms/JmsConnectionConsumer.java  | 289 ++++++++++
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  15 +-
 .../java/org/apache/qpid/jms/JmsSession.java    | 103 +++-
 .../jms/message/JmsInboundMessageDispatch.java  |  10 +
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |  20 +
 .../qpid/jms/policy/JmsPrefetchPolicy.java      |   2 +-
 .../qpid/jms/provider/amqp/AmqpConnection.java  |   9 +
 .../provider/amqp/AmqpConnectionSession.java    |  16 +
 .../qpid/jms/provider/amqp/AmqpProvider.java    |   9 +-
 .../org/apache/qpid/jms/JmsConnectionTest.java  |  38 --
 .../org/apache/qpid/jms/JmsSessionTest.java     |   5 -
 .../ConnectionConsumerIntegrationTest.java      | 540 +++++++++++++++++++
 .../failover/FailoverIntegrationTest.java       |  71 +++
 14 files changed, 1217 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 623a580..1ef72f7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -55,6 +55,7 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -80,6 +81,9 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.apache.qpid.jms.util.PriorityMessageQueue;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
@@ -92,7 +96,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);
 
-    private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<JmsSessionId, JmsSession>();
+    private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<>();
+    private final Map<JmsConsumerId, JmsConnectionConsumer> connectionConsumers = new ConcurrentHashMap<>();
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean closing = new AtomicBoolean();
@@ -105,15 +110,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     private JmsMessageFactory messageFactory;
     private Provider provider;
 
-    private final Set<JmsConnectionListener> connectionListeners =
-        new CopyOnWriteArraySet<JmsConnectionListener>();
-    private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations =
-        new ConcurrentHashMap<JmsTemporaryDestination, JmsTemporaryDestination>();
+    private final Set<JmsConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
+    private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations = new ConcurrentHashMap<>();
     private final AtomicLong sessionIdGenerator = new AtomicLong();
     private final AtomicLong tempDestIdGenerator = new AtomicLong();
     private final AtomicLong transactionIdGenerator = new AtomicLong();
-
-    private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>();
+    private final AtomicLong connectionConsumerIdGenerator = new AtomicLong();
+    private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>();
 
     protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
 
@@ -205,6 +208,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                     session.shutdown();
                 }
 
+                for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+                    connectionConsumer.shutdown();
+                }
+
                 if (isConnected() && !isFailed()) {
                     ProviderFuture request = new ProviderFuture();
                     requests.put(request, request);
@@ -274,6 +281,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             session.shutdown(cause);
         }
 
+        for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+            connectionConsumer.shutdown();
+        }
+
         if (isConnected() && !isFailed() && !closing.get()) {
             destroyResource(connectionInfo);
         }
@@ -345,9 +356,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         createJmsConnection();
         if (started.compareAndSet(false, true)) {
             try {
-                for (JmsSession s : sessions.values()) {
-                    s.start();
+                for (JmsSession session : sessions.values()) {
+                    session.start();
                 }
+
+                for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+                    connectionConsumer.start();
+                 }
             } catch (Exception e) {
                 throw JmsExceptionSupport.create(e);
             }
@@ -379,9 +394,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         }
 
         if (started.compareAndSet(true, false)) {
-            synchronized(sessions) {
-                for (JmsSession s : sessions.values()) {
-                    s.stop();
+            synchronized (sessions) {
+                for (JmsSession session : sessions.values()) {
+                    session.stop();
+                }
+            }
+
+            synchronized (connectionConsumers) {
+                for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+                    connectionConsumer.stop();
                 }
             }
         }
@@ -391,48 +412,95 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, false, true);
     }
 
     @Override
     public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, true);
     }
 
     @Override
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
-                                                              String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, false);
     }
 
     @Override
-    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
-                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, null, false, false);
     }
 
     @Override
-    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
-                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, null, false, false);
     }
 
     @Override
-    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
-                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
         createJmsConnection();
-        throw new JMSException("Not supported");
+
+        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, null, false, false);
     }
 
+    private ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, String subscriptionName, boolean durable, boolean shared) throws JMSException {
+        JmsDestination jmsDestination = JmsMessageTransformation.transformDestination(this, destination);
+
+        int configuredPrefetch = getPrefetchPolicy().getConfiguredPrefetch((JmsSession) null, jmsDestination, durable, false);
+
+        final MessageQueue messageQueue;
+
+        if (isLocalMessagePriority()) {
+            messageQueue = new PriorityMessageQueue();
+        } else {
+            messageQueue = new FifoMessageQueue(configuredPrefetch);
+        }
+
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue);
+        consumerInfo.setExplicitClientID(isExplicitClientID());
+        consumerInfo.setSelector(messageSelector);
+        consumerInfo.setDurable(durable);
+        consumerInfo.setSubscriptionName(subscriptionName);
+        consumerInfo.setShared(shared);
+        consumerInfo.setDestination(jmsDestination);
+        consumerInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
+        consumerInfo.setNoLocal(false);
+        consumerInfo.setBrowser(false);
+        consumerInfo.setPrefetchSize(configuredPrefetch);
+        consumerInfo.setRedeliveryPolicy(getRedeliveryPolicy().copy());
+        consumerInfo.setLocalMessageExpiry(isLocalMessageExpiry());
+        consumerInfo.setPresettle(false);
+        consumerInfo.setDeserializationPolicy(getDeserializationPolicy().copy());
+        consumerInfo.setMaxMessages(maxMessages);
+        consumerInfo.setConnectionConsumer(true);
+
+        JmsConnectionConsumer consumer = new JmsConnectionConsumer(this, consumerInfo, messageQueue, sessionPool);
+
+        try {
+            consumer.init();
+            if (started.get()) {
+                consumer.start();
+            }
+            return consumer;
+        } catch (JMSException jmsEx) {
+            consumer.close();
+            throw jmsEx;
+        }
+     }
+
     @Override
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
@@ -497,6 +565,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         sessions.put(sessionInfo.getId(), session);
     }
 
+    protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) throws JMSException {
+        connectionConsumers.remove(consumerInfo.getId());
+    }
+
+    protected void addConnectionConsumer(JmsConsumerInfo consumerInfo, JmsConnectionConsumer consumer) {
+        connectionConsumers.put(consumerInfo.getId(), consumer);
+    }
+
     private void createJmsConnection() throws JMSException {
         if (isConnected() || closed.get()) {
             return;
@@ -589,6 +665,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         return new JmsTransactionId(connectionInfo.getId(), transactionIdGenerator.incrementAndGet());
     }
 
+    protected JmsConsumerId getNextConnectionConsumerId() {
+        return new JmsConsumerId(connectionInfo.getId().toString(), -1, connectionConsumerIdGenerator.incrementAndGet());
+    }
+
     protected synchronized boolean isExplicitClientID() {
         return connectionInfo.isExplicitClientID();
     }
@@ -1106,6 +1186,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         JmsMessageDispatcher dispatcher = sessions.get(envelope.getConsumerId().getParentId());
         if (dispatcher != null) {
             dispatcher.onInboundMessage(envelope);
+        } else {
+            dispatcher = connectionConsumers.get(envelope.getConsumerId());
+            if (dispatcher != null) {
+                dispatcher.onInboundMessage(envelope);
+            }
         }
 
         // Run the application callbacks on the connection executor to allow the provider to
@@ -1176,6 +1261,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             request.sync();
         }
 
+        for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+            JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
+            if (consumerInfo.isOpen()) {
+                request = new ProviderFuture();
+                provider.create(consumerInfo, request);
+                request.sync();
+            }
+        }
+
         for (JmsSession session : sessions.values()) {
             session.onConnectionRecovery(provider);
         }
@@ -1188,6 +1282,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         setMessageFactory(provider.getMessageFactory());
         connectionInfo.setConnectedURI(provider.getRemoteURI());
 
+        for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+            JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
+            if (consumerInfo.isOpen()) {
+                ProviderFuture request = new ProviderFuture();
+                provider.start(consumerInfo, request);
+                request.sync();
+            }
+        }
+
         for (JmsSession session : sessions.values()) {
             session.onConnectionRecovered(provider);
         }
@@ -1317,14 +1420,22 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                     }
                 }
             } else if (resource instanceof JmsConsumerInfo) {
-                JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
-                JmsSession session = sessions.get(parentId);
-                if (session != null) {
-                    JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+                JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
+                if (consumerInfo.isConnectionConsumer()) {
+                    JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
                     if (consumer != null) {
                         consumer.setFailureCause(cause);
                     }
-                }
+                } else {
+                    JmsSessionId parentId = consumerInfo.getParentId();
+                    JmsSession session = sessions.get(parentId);
+                    if (session != null) {
+                        JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+                        if (consumer != null) {
+                            consumer.setFailureCause(cause);
+                        }
+                    }
+                 }
             }
 
             executor.execute(new Runnable() {
@@ -1351,16 +1462,32 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                             }
                         }
                     } else if (resource instanceof JmsConsumerInfo) {
-                        JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
-                        JmsSession session = sessions.get(parentId);
-                        if (session != null) {
-                            JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
-                            if (consumer != null) {
-                                for (JmsConnectionListener listener : connectionListeners) {
-                                    listener.onConsumerClosed(consumer, cause);
+                        JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
+                        if (consumerInfo.isConnectionConsumer()) {
+                            JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
+                             if (consumer != null) {
+                                try {
+                                    if (consumer != null) {
+                                        consumer.shutdown(cause);
+                                    }
+                                } catch (Throwable error) {
+                                    LOG.trace("Ignoring exception thrown during cleanup of closed connection consumer", error);
                                 }
+
+                                onAsyncException(new JMSException("Connection Consumer remotely closed").initCause(cause));
                             }
-                        }
+                        } else {
+                            JmsSessionId parentId = consumerInfo.getParentId();
+                            JmsSession session = sessions.get(parentId);
+                            if (session != null) {
+                                JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
+                                if (consumer != null) {
+                                    for (JmsConnectionListener listener : connectionListeners) {
+                                        listener.onConsumerClosed(consumer, cause);
+                                    }
+                                 }
+                             }
+                         }
                     } else {
                         LOG.info("A JMS resource has been remotely closed: {}", resource);
                     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
new file mode 100644
index 0000000..862b959
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS Connection Consumer implementation.
+ */
+public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDispatcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionConsumer.class);
+
+    private static final long DEFAULT_DISPATCH_RETRY_DELAY = 1000;
+
+    private final JmsConnection connection;
+    private final JmsConsumerInfo consumerInfo;
+    private final ServerSessionPool sessionPool;
+    private final MessageQueue messageQueue;
+
+    private final Lock stateLock = new ReentrantLock();
+    private final Lock dispatchLock = new ReentrantLock();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+    private final ScheduledThreadPoolExecutor dispatcher;
+
+    public JmsConnectionConsumer(JmsConnection connection, JmsConsumerInfo consumerInfo, MessageQueue messageQueue, ServerSessionPool sessionPool) throws JMSException {
+        this.connection = connection;
+        this.consumerInfo = consumerInfo;
+        this.sessionPool = sessionPool;
+        this.messageQueue = messageQueue;
+        this.dispatcher = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName(this.getClass().getSimpleName() + ":(" + consumerInfo.getId() + ")");
+                return serial;
+            }
+        });
+
+        // Ensure a timely shutdown for consumer close.
+        dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+        connection.addConnectionConsumer(consumerInfo, this);
+        try {
+            connection.createResource(consumerInfo);
+        } catch (JMSException jmse) {
+            connection.removeConnectionConsumer(consumerInfo);
+            throw jmse;
+        }
+    }
+
+    public JmsConnectionConsumer init() throws JMSException {
+        getConnection().startResource(consumerInfo);
+        return this;
+    }
+
+    @Override
+    public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+        envelope.setConsumerInfo(consumerInfo);
+
+        stateLock.lock();
+        try {
+            if (envelope.isEnqueueFirst()) {
+                this.messageQueue.enqueueFirst(envelope);
+            } else {
+                this.messageQueue.enqueue(envelope);
+            }
+
+            if (messageQueue.isRunning()) {
+                try {
+                    dispatcher.execute(() -> deliverNextPending());
+                } catch (RejectedExecutionException rje) {
+                    LOG.debug("Rejected on attempt to queue message dispatch", rje);
+                }
+            }
+        } finally {
+            stateLock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (!closed.get()) {
+            doClose();
+        }
+    }
+
+    /**
+     * Called to initiate shutdown of consumer resources and request that the remote
+     * peer remove the registered producer.
+     *
+     * @throws JMSException if an error occurs during the consumer close operation.
+     */
+    protected void doClose() throws JMSException {
+        shutdown();
+        this.connection.destroyResource(consumerInfo);
+    }
+
+    protected void shutdown() throws JMSException {
+        shutdown(null);
+    }
+
+    protected void shutdown(Throwable cause) throws JMSException {
+        if (closed.compareAndSet(false, true)) {
+            dispatchLock.lock();
+            try {
+                failureCause.set(cause);
+                consumerInfo.setState(ResourceState.CLOSED);
+                connection.removeConnectionConsumer(consumerInfo);
+                stop(true);
+                dispatcher.shutdown();
+                try {
+                    dispatcher.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    LOG.trace("ConnectionConsumer shutdown of dispatcher was interupted");
+                }
+            } finally {
+                dispatchLock.unlock();
+            }
+        }
+    }
+
+    public void start() {
+        stateLock.lock();
+        try {
+            if (!messageQueue.isRunning()) {
+                this.messageQueue.start();
+                this.dispatcher.execute(new BoundedMessageDeliverTask(messageQueue.size()));
+            }
+        } finally {
+            stateLock.unlock();
+        }
+    }
+
+    public void stop() {
+        stop(false);
+    }
+
+    private void stop(boolean closeMessageQueue) {
+        dispatchLock.lock();
+        stateLock.lock();
+        try {
+            if (closeMessageQueue) {
+                this.messageQueue.close();
+            } else {
+                this.messageQueue.stop();
+            }
+        } finally {
+            stateLock.unlock();
+            dispatchLock.unlock();
+        }
+    }
+
+    @Override
+    public ServerSessionPool getServerSessionPool() throws JMSException {
+        checkClosed();
+        return sessionPool;
+    }
+
+    JmsConnection getConnection() {
+        return connection;
+    }
+
+    JmsConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    void setFailureCause(Throwable failureCause) {
+        this.failureCause.set(failureCause);
+    }
+
+    Throwable getFailureCause() {
+        return failureCause.get();
+    }
+
+    @Override
+    public String toString() {
+        return "JmsConnectionConsumer { id=" + consumerInfo.getId() + " }";
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (closed.get()) {
+            IllegalStateException jmsEx = null;
+
+            if (getFailureCause() == null) {
+                jmsEx = new IllegalStateException("The ConnectionConsumer is closed");
+            } else {
+                jmsEx = new IllegalStateException("The ConnectionConsumer was closed due to an unrecoverable error.");
+                jmsEx.initCause(getFailureCause());
+            }
+
+            throw jmsEx;
+        }
+    }
+
+    private boolean deliverNextPending() {
+        if (messageQueue.isRunning() && !messageQueue.isEmpty()) {
+            dispatchLock.lock();
+
+            try {
+                ServerSession serverSession = getServerSessionPool().getServerSession();
+                if (serverSession == null) {
+                    // There might not be an available session so queue a task to try again
+                    // and hope that by then one is available in the pool.
+                    dispatcher.schedule(new BoundedMessageDeliverTask(messageQueue.size()), DEFAULT_DISPATCH_RETRY_DELAY, TimeUnit.MILLISECONDS);
+                    return false;
+                }
+
+                Session session = serverSession.getSession();
+
+                JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
+
+                if (session instanceof JmsSession) {
+                    ((JmsSession) session).enqueueInSession(envelope);
+                } else {
+                    LOG.warn("ServerSession provided an onknown JMS Session type to this connection consumer: {}", session);
+                }
+
+                serverSession.start();
+            } catch (JMSException e) {
+                connection.onAsyncException(e);
+                stop(true);
+            } finally {
+                dispatchLock.unlock();
+            }
+        }
+
+        return !messageQueue.isEmpty();
+    }
+
+    private final class BoundedMessageDeliverTask implements Runnable {
+
+        private final int deliveryCount;
+
+        public BoundedMessageDeliverTask(int deliveryCount) {
+            this.deliveryCount = deliveryCount;
+        }
+
+        @Override
+        public void run() {
+            int current = 0;
+
+            while (messageQueue.isRunning() && current++ < deliveryCount) {
+                if (!deliverNextPending()) {
+                    return;  // Another task already drained the queue.
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 9bafd02..7a21e73 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -328,7 +328,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
                     performPullIfRequired(timeout, false);
-                } else if (redeliveryExceeded(envelope)) {
+                } else if (session.redeliveryExceeded(envelope)) {
                     LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                     applyRedeliveryPolicyOutcome(envelope);
                     if (timeout > 0) {
@@ -356,15 +356,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
-    protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
-        LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount());
-
-        JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
-        return redeliveryPolicy != null &&
-               redeliveryPolicy.getMaxRedeliveries(getDestination()) >= 0 &&
-               redeliveryPolicy.getMaxRedeliveries(getDestination()) < envelope.getRedeliveryCount();
-    }
-
     protected void checkClosed() throws IllegalStateException {
         if (closed.get()) {
             IllegalStateException jmsEx = null;
@@ -470,6 +461,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
      */
     @Override
     public void onInboundMessage(final JmsInboundMessageDispatch envelope) {
+        envelope.setConsumerInfo(consumerInfo);
+
         lock.lock();
         try {
             if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
@@ -720,7 +713,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                 if (consumeExpiredMessage(envelope)) {
                     LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                     doAckExpired(envelope);
-                } else if (redeliveryExceeded(envelope)) {
+                } else if (session.redeliveryExceeded(envelope)) {
                     LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                     applyRedeliveryPolicyOutcome(envelope);
                 } else {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 192cfc8..6152991 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms;
 
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
 import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -92,6 +94,8 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.selector.SelectorParser;
 import org.apache.qpid.jms.selector.filter.FilterException;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.NoOpExecutor;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
 import org.slf4j.Logger;
@@ -113,6 +117,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
     private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
     private MessageListener messageListener;
+    private final MessageQueue sessionQueue = new FifoMessageQueue(16);
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean started = new AtomicBoolean();
     private final JmsSessionInfo sessionInfo;
@@ -193,7 +198,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     @Override
     public void setMessageListener(MessageListener listener) throws JMSException {
-        checkClosed();
+        if (listener != null) {
+            checkClosed();
+        }
+
         this.messageListener = listener;
     }
 
@@ -253,17 +261,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     }
 
     @Override
-    public void run() {
-        try {
-            checkClosed();
-        } catch (IllegalStateException e) {
-            throw new RuntimeException(e);
-        }
-
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void close() throws JMSException {
         checkIsDeliveryThread();
         checkIsCompletionThread();
@@ -721,6 +718,55 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return connection.createTemporaryTopic();
     }
 
+    //----- Session dispatch support -----------------------------------------//
+
+    @Override
+    public void run() {
+        try {
+            checkClosed();
+        } catch (IllegalStateException ex) {
+            throw new RuntimeException(ex);
+        }
+
+        JmsInboundMessageDispatch envelope = null;
+        while ((envelope = sessionQueue.dequeueNoWait()) != null) {
+            try {
+                JmsMessage copy = null;
+
+                if (envelope.getMessage().isExpired()) {
+                    LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
+                    acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+                } else if (redeliveryExceeded(envelope)) {
+                    LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
+                    JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
+                    acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
+                } else {
+                    boolean deliveryFailed = false;
+
+                    copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
+
+                    clearSessionRecovered();
+
+                    try {
+                        messageListener.onMessage(copy);
+                    } catch (RuntimeException rte) {
+                        deliveryFailed = true;
+                    }
+
+                    if (!isSessionRecovered()) {
+                        if (!deliveryFailed) {
+                            acknowledge(envelope, ACK_TYPE.ACCEPTED);
+                        } else {
+                            acknowledge(envelope, ACK_TYPE.RELEASED);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                getConnection().onException(e);
+            }
+        }
+    }
+
     //----- Session Implementation methods -----------------------------------//
 
     protected void add(JmsMessageConsumer consumer) throws JMSException {
@@ -921,8 +967,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
-    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+    JmsInboundMessageDispatch acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
         transactionContext.acknowledge(connection, envelope, ackType);
+        return envelope;
     }
 
     /**
@@ -1061,6 +1108,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             for (JmsMessageConsumer consumer : consumers.values()) {
                 consumer.start();
             }
+
+            sessionQueue.start();
         }
     }
 
@@ -1071,6 +1120,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             consumer.stop();
         }
 
+        sessionQueue.stop();
+
         synchronized (sessionInfo) {
             if (deliveryExecutor != null) {
                 deliveryExecutor.shutdown();
@@ -1280,6 +1331,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
+    boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
+        LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount());
+
+        JmsConsumerInfo consumerInfo = envelope.getConsumerInfo();
+
+        JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
+        return redeliveryPolicy != null &&
+               redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) >= 0 &&
+               redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) < envelope.getRedeliveryCount();
+    }
+
     //----- Event handlers ---------------------------------------------------//
 
     @Override
@@ -1354,16 +1416,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         if (id == null) {
             this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage()));
         }
-        if (messageListener != null) {
-            messageListener.onMessage(envelope.getMessage());
-        } else {
-            JmsMessageConsumer consumer = consumers.get(id);
-            if (consumer != null) {
-                consumer.onInboundMessage(envelope);
-            }
+
+        JmsMessageConsumer consumer = consumers.get(id);
+        if (consumer != null) {
+            consumer.onInboundMessage(envelope);
         }
     }
 
+    void enqueueInSession(JmsInboundMessageDispatch envelope) {
+        sessionQueue.enqueue(envelope);
+    }
+
     //----- Asynchronous Send Helpers ----------------------------------------//
 
     private final class FailOrCompleteAsyncCompletionsTask implements Runnable {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index fbb5a1d..e55426f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.message;
 
 import org.apache.qpid.jms.meta.JmsAbstractResourceId;
 import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
 
 /**
  * Envelope used to deliver incoming messages to their targeted consumer.
@@ -31,6 +32,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
     private boolean enqueueFirst;
     private boolean delivered;
 
+    private transient JmsConsumerInfo consumerInfo;
     private transient String stringView;
 
     public JmsInboundMessageDispatch(long sequence) {
@@ -83,6 +85,14 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
         return redeliveryCount;
     }
 
+    public JmsConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    public void setConsumerInfo(JmsConsumerInfo consumerInfo) {
+        this.consumerInfo = consumerInfo;
+    }
+
     @Override
     public String toString() {
         if (stringView == null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 74256ff..b2c1d07 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -38,6 +38,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
     private int acknowledgementMode;
     private boolean localMessageExpiry;
     private boolean presettle;
+    private boolean connectionConsumer;
+    private int maxMessages;
     private volatile boolean listener;
     private final MessageQueue messageQueue;
 
@@ -76,6 +78,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
         info.redeliveryPolicy = getRedeliveryPolicy().copy();
         info.deserializationPolicy = getDeserializationPolicy().copy();
         info.listener = listener;
+        info.connectionConsumer = connectionConsumer;
+        info.maxMessages = maxMessages;
     }
 
     public int getPrefetchedMessageCount() {
@@ -225,6 +229,22 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
         this.presettle = presettle;
     }
 
+    public boolean isConnectionConsumer() {
+        return connectionConsumer;
+    }
+
+    public void setConnectionConsumer(boolean connectionConsumer) {
+        this.connectionConsumer = connectionConsumer;
+    }
+
+    public int getMaxMessages() {
+        return maxMessages;
+    }
+
+    public void setMaxMessages(int maxMessages) {
+        this.maxMessages = maxMessages;
+    }
+
     @Override
     public String toString() {
         return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
index efc49de..2834c60 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
@@ -36,7 +36,7 @@ public interface JmsPrefetchPolicy {
      * Returns the prefetch value to use when creating a MessageConsumer instance.
      *
      * @param session
-     *      the Session that own the MessageConsumer being created.
+     *      the Session that own the MessageConsumer being created. (null for a ConnectionConsumer).
      * @param destination
      *      the Destination that the consumer will be subscribed to.
      * @param durable

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index c93107b..0c43e43 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -177,6 +177,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     /**
+     * Retrieves the AmqpConnectionSession owned by this AmqpConnection.
+     *
+     * @return the AmqpConnectionSession owned by this AmqpConnection.
+     */
+    public AmqpConnectionSession getConnectionSession() {
+        return connectionSession;
+    }
+
+    /**
      * @return true if anonymous producers should be cached or closed on send complete.
      */
     public boolean isAnonymousProducerCache() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 5c0b53f..beb5a0b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -85,11 +85,27 @@ public class AmqpConnectionSession extends AmqpSession {
     }
 
     @Override
+    public void addChildResource(AmqpResource resource) {
+        // When a Connection Consumer is created the Connection is doing so
+        // without a known session to associate it with, we link up the consumer
+        // to this session by adding this session as the provider hint on the
+        // consumer's parent session ID.
+        if (resource instanceof AmqpConsumer) {
+            AmqpConsumer consumer = (AmqpConsumer) resource;
+            consumer.getConsumerId().getParentId().setProviderHint(this);
+        }
+
+        super.addChildResource(resource);
+    }
+
+    @Override
     public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
         List<AsyncResult> pending = new ArrayList<>(pendingUnsubs.values());
         for (AsyncResult unsubscribeRequest : pending) {
             unsubscribeRequest.onFailure(cause);
         }
+
+        super.handleResourceClosure(provider, cause);
     }
 
     private static final class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 916e87c..176a3a0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -382,7 +382,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
                         @Override
                         public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
-                            AmqpSession session = connection.getSession(consumerInfo.getParentId());
+                            final AmqpSession session;
+
+                            if (consumerInfo.isConnectionConsumer()) {
+                                session = connection.getConnectionSession();
+                            } else {
+                                session = connection.getSession(consumerInfo.getParentId());
+                            }
+
                             session.createConsumer(consumerInfo, request);
                         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 9cf74c4..7577338 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -305,42 +305,4 @@ public class JmsConnectionTest {
         int minor = metaData.getProviderMinorVersion();
         assertTrue("Expected non-zero provider major(" + major + ") / minor(" + minor +") version.", (major + minor) != 0);
     }
-
-    //----- Currently these are unimplemented, these will fail after that ----//
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateConnectionConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createConnectionConsumer((JmsDestination) new JmsTopic(), "", null, 1);
-    }
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateConnectionTopicConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createConnectionConsumer(new JmsTopic(), "", null, 1);
-    }
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateConnectionQueueConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createConnectionConsumer(new JmsQueue(), "", null, 1);
-    }
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateDurableConnectionQueueConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createDurableConnectionConsumer(new JmsTopic(), "", "", null, 1);
-    }
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateSharedConnectionConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1);
-    }
-
-    @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateSharedDurableConnectionConsumer() throws Exception {
-        connection = new JmsConnection(connectionInfo, provider);
-        connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index 6dbb661..034ea59 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -364,11 +364,6 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
     public void testSessionRunFailsWhenSessionIsClosed() throws Exception {
         JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        try {
-            session.run();
-            fail("Not implemented");
-        } catch (UnsupportedOperationException usoe) {}
-
         session.close();
 
         try {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
new file mode 100644
index 0000000..934b385
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for expected behaviors of JMS Connection Consumer implementation.
+ */
+public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionConsumerIntegrationTest.class);
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testCreateConnectionConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool();
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            // No additional Begin calls as there's no Session created for a Connection Consumer
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectionConsumerDispatchesToSessionConnectionSratedBeforeCreate() throws Exception {
+        doTestConnectionConsumerDispatchesToSession(true);
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectionConsumerDispatchesToSessionConnectionSratedAfterCreate() throws Exception {
+        doTestConnectionConsumerDispatchesToSession(false);
+    }
+
+    private void doTestConnectionConsumerDispatchesToSession(boolean startBeforeCreate) throws Exception {
+        final CountDownLatch messageArrived = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            if (startBeforeCreate) {
+                connection.start();
+            }
+
+            testPeer.expectBegin();
+
+            // Create a session for our ServerSessionPool to use
+            Session session = connection.createSession();
+            session.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    messageArrived.countDown();
+                }
+            });
+            JmsServerSession serverSession = new JmsServerSession(session);
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            if (!startBeforeCreate) {
+                connection.start();
+            }
+
+            assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testNonStartedConnectionConsumerDoesNotDispatch() throws Exception {
+        final CountDownLatch messageArrived = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+
+            // Create a session for our ServerSessionPool to use
+            Session session = connection.createSession();
+            session.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    messageArrived.countDown();
+                }
+            });
+            JmsServerSession serverSession = new JmsServerSession(session);
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertFalse("Message Arrived unexpectedly", messageArrived.await(500, TimeUnit.MILLISECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDispositionThatIsReleasedAndSettled();
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testQueuedMessagesAreDrainedToServerSession() throws Exception {
+        final int MESSAGE_COUNT = 10;
+        final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT);
+        final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    messagesDispatched.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+
+            // Create a session for our ServerSessionPool to use
+            Session session = connection.createSession();
+            session.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    messagesArrived.countDown();
+                }
+            });
+
+            JmsServerSession serverSession = new JmsServerSession(session);
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT);
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                testPeer.expectDispositionThatIsAcceptedAndSettled();
+            }
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS));
+            assertEquals(MESSAGE_COUNT, messagesArrived.getCount());
+
+            connection.start();
+
+            assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConsumerRecoversAfterSessionPoolReturnsNullSession() throws Exception {
+        final int MESSAGE_COUNT = 10;
+        final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT);
+        final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    messagesDispatched.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+
+            // Create a session for our ServerSessionPool to use
+            Session session = connection.createSession();
+            session.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    messagesArrived.countDown();
+                }
+            });
+
+            JmsServerSession serverSession = new JmsServerSession(session);
+            JmsServerSessionPoolFirstAttemptGetsNull sessionPool = new JmsServerSessionPoolFirstAttemptGetsNull(serverSession);
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT);
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                testPeer.expectDispositionThatIsAcceptedAndSettled();
+            }
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS));
+            assertEquals(MESSAGE_COUNT, messagesArrived.getCount());
+
+            connection.start();
+
+            assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testRemotelyCloseConnectionConsumer() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch connectionError = new CountDownLatch(1);
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool();
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    connectionError.countDown();
+                }
+            });
+
+            // Create a consumer, then remotely end it afterwards.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            // Verify the consumer gets marked closed
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    try {
+                        consumer.getServerSessionPool();
+                    } catch (IllegalStateException jmsise) {
+                        LOG.debug("Error reported from consumer.getServerSessionPool()", jmsise);
+                        if (jmsise.getCause() != null) {
+                            String message = jmsise.getCause().getMessage();
+                            return message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
+                                   message.contains(BREAD_CRUMB);
+                        } else {
+                            return false;
+                        }
+                    }
+                    return false;
+                }
+            }, 10000, 10));
+
+            assertTrue("Consumer closed callback didn't trigger", connectionError.await(5, TimeUnit.SECONDS));
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
+        final CountDownLatch exceptionFired = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    exceptionFired.countDown();
+                }
+            });
+
+            connection.start();
+
+            JmsFailingServerSessionPool sessionPool = new JmsFailingServerSessionPool();
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDispositionThatIsReleasedAndSettled();
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnExceptionFiredOnServerSessionFailure() throws Exception {
+        final CountDownLatch exceptionFired = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    exceptionFired.countDown();
+                }
+            });
+
+            connection.start();
+
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool(new JmsFailingServerSession());
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS));
+
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDispositionThatIsReleasedAndSettled();
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    //----- Internal ServerSessionPool ---------------------------------------//
+
+    private class JmsFailingServerSessionPool implements ServerSessionPool {
+
+        public JmsFailingServerSessionPool() {
+        }
+
+        @Override
+        public ServerSession getServerSession() throws JMSException {
+            throw new JMSException("Something is wrong with me");
+        }
+    }
+
+    private class JmsServerSessionPool implements ServerSessionPool {
+
+        private JmsServerSession serverSession;
+
+        public JmsServerSessionPool() {
+            this.serverSession = new JmsServerSession();
+        }
+
+        public JmsServerSessionPool(JmsServerSession serverSession) {
+            this.serverSession = serverSession;
+        }
+
+        @Override
+        public ServerSession getServerSession() throws JMSException {
+            return serverSession;
+        }
+    }
+
+    private class JmsServerSessionPoolFirstAttemptGetsNull implements ServerSessionPool {
+
+        private volatile boolean firstAttempt = true;
+        private JmsServerSession serverSession;
+
+        public JmsServerSessionPoolFirstAttemptGetsNull(JmsServerSession serverSession) {
+            this.serverSession = serverSession;
+        }
+
+        @Override
+        public ServerSession getServerSession() throws JMSException {
+            if (firstAttempt) {
+                firstAttempt = false;
+                return null;
+            } else {
+                return serverSession;
+            }
+        }
+    }
+
+    private class JmsServerSession implements ServerSession {
+
+        private final Session session;
+        private final ExecutorService runner = Executors.newSingleThreadExecutor();
+
+        public JmsServerSession() {
+            this.session = null;
+        }
+
+        public JmsServerSession(Session session) {
+            this.session = session;
+        }
+
+        @Override
+        public Session getSession() throws JMSException {
+            return session;
+        }
+
+        @Override
+        public void start() throws JMSException {
+            runner.execute(() -> {
+                session.run();
+            });
+        }
+    }
+
+    private class JmsFailingServerSession extends JmsServerSession {
+
+        public JmsFailingServerSession() {
+        }
+
+        @Override
+        public Session getSession() throws JMSException {
+            throw new JMSException("Something is wrong with me");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a4fa85a0/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 7f9cec5..63756b6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -47,6 +47,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
+import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
@@ -56,6 +57,7 @@ import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsQueue;
 import org.apache.qpid.jms.JmsResourceNotFoundException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -76,6 +78,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1844,6 +1847,74 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testConnectionConsumerRecreatedAfterReconnect() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            ServerSessionPool sessionPool = Mockito.mock(ServerSessionPool.class);
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlow();
+            originalPeer.dropAfterLastHandler();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            Queue queue = new JmsQueue("myQueue");
+            connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow();
+            finalPeer.expectClose();
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            // Shut it down
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message