qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/2] qpid-jms git commit: QPIDJMS-108 Refactoring the AMQP Provider resource management code.
Date Fri, 11 Sep 2015 18:00:18 GMT
QPIDJMS-108 Refactoring the AMQP Provider resource management code.



Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7be321e7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7be321e7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7be321e7

Branch: refs/heads/master
Commit: 7be321e7452d1e334819c1624e3d3e5d82294b47
Parents: b3f9546
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Sep 11 14:00:01 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Sep 11 14:00:01 2015 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpAbstractResource.java | 178 ++++----------
 .../amqp/AmqpAnonymousFallbackProducer.java     |  72 +++---
 .../qpid/jms/provider/amqp/AmqpConnection.java  | 162 +++----------
 .../provider/amqp/AmqpConnectionSession.java    |  60 +++--
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 169 ++------------
 .../qpid/jms/provider/amqp/AmqpEventSink.java   |  79 +++++++
 .../jms/provider/amqp/AmqpFixedProducer.java    |  79 +------
 .../jms/provider/amqp/AmqpJmsNoLocalType.java   |  44 ----
 .../jms/provider/amqp/AmqpJmsSelectorType.java  |  47 ----
 .../qpid/jms/provider/amqp/AmqpProducer.java    |  11 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 112 ++++++---
 .../qpid/jms/provider/amqp/AmqpResource.java    | 126 +---------
 .../jms/provider/amqp/AmqpResourceParent.java   |  43 ++++
 .../qpid/jms/provider/amqp/AmqpSession.java     | 121 +++++-----
 .../qpid/jms/provider/amqp/AmqpSupport.java     |  24 ++
 .../provider/amqp/AmqpTemporaryDestination.java | 113 +--------
 .../provider/amqp/AmqpTransactionContext.java   |  37 +--
 .../amqp/builders/AmqpConnectionBuilder.java    | 140 ++++++++++++
 .../builders/AmqpConnectionSessionBuilder.java  |  38 +++
 .../amqp/builders/AmqpConsumerBuilder.java      | 156 +++++++++++++
 .../amqp/builders/AmqpProducerBuilder.java      | 104 +++++++++
 .../amqp/builders/AmqpResourceBuilder.java      | 229 +++++++++++++++++++
 .../amqp/builders/AmqpSessionBuilder.java       |  85 +++++++
 .../AmqpTemporaryDestinationBuilder.java        | 118 ++++++++++
 .../builders/AmqpTransactionContextBuilder.java |  68 ++++++
 .../amqp/filters/AmqpJmsNoLocalType.java        |  44 ++++
 .../amqp/filters/AmqpJmsSelectorType.java       |  47 ++++
 .../integration/ConnectionIntegrationTest.java  |   2 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |   4 +-
 .../src/test/resources/log4j.properties         |   2 +-
 30 files changed, 1529 insertions(+), 985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 29bff50..336a296 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -37,61 +37,24 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
 
-    protected AsyncResult openRequest;
     protected AsyncResult closeRequest;
 
-    private E endpoint;
-    protected R resource;
-
-    /**
-     * Creates a new instance with the JmsResource provided, and sets the Endpoint to null.
-     *
-     * @param resource
-     *        The JmsResource instance that this AmqpResource is managing.
-     */
-    public AmqpAbstractResource(R resource) {
-        this(resource, null);
-    }
+    private final E endpoint;
+    private final R resourceInfo;
 
     /**
      * Creates a new instance with the JmsResource provided, and sets the Endpoint to the given value.
      *
-     * @param resource
+     * @param resourceInfo
      *        The JmsResource instance that this AmqpResource is managing.
      * @param endpoint
      *        The Proton Endpoint instance that this object maps to.
      */
-    public AmqpAbstractResource(R resource, E endpoint) {
-        this.resource = resource;
-        setEndpoint(endpoint);
-    }
-
-    @Override
-    public void open(AsyncResult request) {
-        this.openRequest = request;
-        doOpen();
-        getEndpoint().setContext(this);
-    }
-
-    @Override
-    public boolean isOpen() {
-        return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
-    }
-
-    @Override
-    public boolean isAwaitingOpen() {
-        return this.openRequest != null;
-    }
-
-    @Override
-    public void opened() {
-        if (this.openRequest != null) {
-            this.openRequest.onSuccess();
-            this.openRequest = null;
-        }
+    public AmqpAbstractResource(R resourceInfo, E endpoint) {
+        this.resourceInfo = resourceInfo;
+        this.endpoint = endpoint;
     }
 
-    @Override
     public void close(AsyncResult request) {
         // If already closed signal success or else the caller might never get notified.
         if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
@@ -111,18 +74,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         doClose();
     }
 
-    @Override
-    public boolean isClosed() {
-        return getEndpoint().getLocalState() == EndpointState.CLOSED;
-    }
-
-    @Override
-    public boolean isAwaitingClose() {
-        return this.closeRequest != null;
-    }
-
-    @Override
-    public void closed() {
+    public void resourceClosed() {
         endpoint.close();
         endpoint.free();
 
@@ -132,25 +84,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         }
     }
 
-    @Override
-    public void failed(Exception cause) {
-        if (openRequest != null) {
-            if (endpoint != null) {
-                // TODO: if this is a producer/consumer link then we may only be detached,
-                // rather than fully closed, and should respond appropriately.
-                endpoint.close();
-            }
-            openRequest.onFailure(cause);
-            openRequest = null;
-        }
-
-        if (closeRequest != null) {
-            closeRequest.onFailure(cause);
-            closeRequest = null;
-        }
-    }
-
-    @Override
     public void remotelyClosed(AmqpProvider provider) {
         Exception error = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
 
@@ -160,25 +93,46 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
             endpoint.close();
         }
 
-        LOG.info("Resource {} was remotely closed", getJmsResource());
+        LOG.info("Resource {} was remotely closed", getResourceInfo());
 
-        if (getJmsResource() instanceof JmsConnectionInfo) {
+        if (getResourceInfo() instanceof JmsConnectionInfo) {
             provider.fireProviderException(error);
         } else {
-            provider.fireResourceRemotelyClosed(getJmsResource(), error);
+            provider.fireResourceRemotelyClosed(getResourceInfo(), error);
         }
     }
 
+    /**
+     * Perform the close operation on the managed endpoint.  A subclass may
+     * override this method to provide additional close actions or alter the
+     * standard close path such as endpoint detach etc.
+     */
+    protected void doClose() {
+        getEndpoint().close();
+    }
+
+    //----- Access methods ---------------------------------------------------//
+
     public E getEndpoint() {
         return this.endpoint;
     }
 
-    public void setEndpoint(E endpoint) {
-        this.endpoint = endpoint;
+    public R getResourceInfo() {
+        return this.resourceInfo;
+    }
+
+    //----- Endpoint state access methods ------------------------------------//
+
+    public boolean isOpen() {
+        return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
     }
 
-    public R getJmsResource() {
-        return this.resource;
+    public boolean isClosed() {
+        return getEndpoint().getLocalState() == EndpointState.CLOSED;
+    }
+
+    public boolean isAwaitingClose() {
+        return this.closeRequest != null;
     }
 
     public EndpointState getLocalState() {
@@ -195,20 +149,18 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
         return getEndpoint().getRemoteState();
     }
 
-    protected boolean hasRemoteError() {
-        return getEndpoint().getRemoteCondition().getCondition() != null;
-    }
+    //----- AmqpResource implementation --------------------------------------//
 
     @Override
-    public void processRemoteOpen(AmqpProvider provider) throws IOException {
-        doOpenCompletion();
+    public final void processRemoteOpen(AmqpProvider provider) throws IOException {
+        // Open is handled by the resource builder
     }
 
     @Override
     public void processRemoteDetach(AmqpProvider provider) throws IOException {
         if (isAwaitingClose()) {
             LOG.debug("{} is now closed: ", this);
-            closed();
+            resourceClosed();
         } else {
             remotelyClosed(provider);
         }
@@ -218,18 +170,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
     public void processRemoteClose(AmqpProvider provider) throws IOException {
         if (isAwaitingClose()) {
             LOG.debug("{} is now closed: ", this);
-            closed();
-        } else if (isAwaitingOpen()) {
-            // Error on Open, create exception and signal failure.
-            LOG.warn("Open of {} failed: ", this);
-            Exception openError;
-            if (hasRemoteError()) {
-                openError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
-            } else {
-                openError = getOpenAbortException();
-            }
-
-            failed(openError);
+            resourceClosed();
         } else {
             remotelyClosed(provider);
         }
@@ -237,46 +178,11 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
 
     @Override
     public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+        // Nothing to be done here, subclasses can override as needed.
     }
 
     @Override
     public void processFlowUpdates(AmqpProvider provider) throws IOException {
-    }
-
-    /**
-     * Perform the open operation on the managed endpoint.  A subclass may
-     * override this method to provide additional open actions or configuration
-     * updates.
-     */
-    protected void doOpen() {
-        getEndpoint().open();
-    }
-
-    /**
-     * Complete the open operation on the managed endpoint. A subclass may
-     * override this method to provide additional verification actions or configuration
-     * updates.
-     */
-    protected void doOpenCompletion() {
-        LOG.debug("{} is now open: ", this);
-        opened();
-    }
-
-    /**
-     * When aborting the open operation, and there isn't an error condition,
-     * provided by the peer, the returned exception will be used instead.
-     * A subclass may override this method to provide alternative behavior.
-     */
-    protected Exception getOpenAbortException() {
-        return new IOException("Open failed unexpectedly.");
-    }
-
-    /**
-     * Perform the close operation on the managed endpoint.  A subclass may
-     * override this method to provide additional close actions or alter the
-     * standard close path such as endpoint detach etc.
-     */
-    protected void doClose() {
-        getEndpoint().close();
+        // Nothing to be done here, subclasses can override as needed.
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index f34f5f9..a541b91 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -27,6 +27,7 @@ import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
 import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.jms.util.LRUCache;
 import org.apache.qpid.proton.engine.EndpointState;
@@ -81,14 +82,13 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
             // We open a Fixed Producer instance with the target destination.  Once it opens
             // it will trigger the open event which will in turn trigger the send event.
-            producer = new AmqpFixedProducer(session, info);
-            producer.setPresettle(isPresettle());
-            AnonymousOpenRequest open = new AnonymousOpenRequest(request, producer, envelope);
-            producer.open(open);
+            // If caching is disabled the created producer will be closed immediately.
+            AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
+            builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
 
             if (connection.isAnonymousProducerCache()) {
                 // Cache it in hopes of not needing to create large numbers of producers.
-                producerCache.put(envelope.getDestination(), producer);
+                producerCache.put(envelope.getDestination(), builder.getResource());
             }
 
             return true;
@@ -98,13 +98,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     }
 
     @Override
-    public void open(AsyncResult request) {
-        // Trigger an immediate open, we don't talk to the Broker until
-        // a send occurs so we must not let the client block.
-        request.onSuccess();
-    }
-
-    @Override
     public void close(AsyncResult request) {
         // Trigger an immediate close, the internal producers that are currently in the cache
         for (AmqpProducer producer : producerCache.values()) {
@@ -135,12 +128,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
     private abstract class AnonymousRequest extends WrappedAsyncResult {
 
-        protected final AmqpProducer producer;
         protected final JmsOutboundMessageDispatch envelope;
 
-        public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) {
+        public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope) {
             super(sendResult);
-            this.producer = producer;
             this.envelope = envelope;
         }
 
@@ -153,36 +144,51 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId());
             super.onFailure(result);
         }
+
+        public abstract AmqpProducer getProducer();
     }
 
-    private final class AnonymousOpenRequest extends AnonymousRequest {
+    private final class AnonymousSendRequest extends AnonymousRequest {
 
-        public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) {
-            super(sendResult, producer, envelope);
+        private final AmqpProducerBuilder producerBuilder;
+
+        public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
+            super(sendResult, envelope);
+
+            this.producerBuilder = producerBuilder;
         }
 
         @Override
         public void onSuccess() {
             LOG.trace("Open phase of anonymous send complete: {} ", getProducerId());
-            AnonymousSendRequest send = new AnonymousSendRequest(this);
+            AnonymousSendCompleteRequest send = new AnonymousSendCompleteRequest(this);
             try {
-                producer.send(envelope, send);
+                getProducer().send(envelope, send);
             } catch (Exception e) {
                 super.onFailure(e);
             }
         }
+
+        @Override
+        public AmqpProducer getProducer() {
+            return producerBuilder.getResource();
+        }
     }
 
-    private final class AnonymousSendRequest extends AnonymousRequest {
+    private final class AnonymousSendCompleteRequest extends AnonymousRequest {
 
-        public AnonymousSendRequest(AnonymousOpenRequest open) {
-            super(open.getWrappedRequest(), open.producer, open.envelope);
+        private final AmqpProducer producer;
+
+        public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
+            super(open.getWrappedRequest(), open.envelope);
+
+            this.producer = open.getProducer();
         }
 
         @Override
         public void onFailure(Throwable result) {
             // Ensure that cache get purged of any failed producers.
-            AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getJmsResource().getDestination());
+            AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getResourceInfo().getDestination());
             super.onFailure(result);
         }
 
@@ -196,12 +202,21 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
                 super.onSuccess();
             }
         }
+
+        @Override
+        public AmqpProducer getProducer() {
+            return producer;
+        }
     }
 
     private final class AnonymousCloseRequest extends AnonymousRequest {
 
-        public AnonymousCloseRequest(AnonymousSendRequest send) {
-            super(send.getWrappedRequest(), send.producer, send.envelope);
+        private final AmqpProducer producer;
+
+        public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) {
+            super(sendComplete.getWrappedRequest(), sendComplete.envelope);
+
+            this.producer = sendComplete.getProducer();
         }
 
         @Override
@@ -209,6 +224,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
             super.onSuccess();
         }
+
+        @Override
+        public AmqpProducer getProducer() {
+            return producer;
+        }
     }
 
     private final class CloseRequest implements AsyncResult {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index ebec59a..b3da9a5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -16,18 +16,13 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
-
 import java.net.URI;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsTemporaryDestination;
@@ -35,15 +30,14 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpSessionBuilder;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpTemporaryDestinationBuilder;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
-import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.jms.util.MetaDataSupport;
-import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Connection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Connection> {
+public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Connection> implements AmqpResourceParent {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
 
@@ -53,66 +47,34 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     private final Map<JmsSessionId, AmqpSession> sessions = new HashMap<JmsSessionId, AmqpSession>();
     private final Map<JmsDestination, AmqpTemporaryDestination> tempDests = new HashMap<JmsDestination, AmqpTemporaryDestination>();
     private final AmqpProvider provider;
-    private AmqpSaslAuthenticator authenticator;
-    private final AmqpConnectionSession connectionSession;
     private final AmqpConnectionProperties properties;
+    private AmqpConnectionSession connectionSession;
 
     private boolean objectMessageUsesAmqpTypes = false;
     private boolean anonymousProducerCache = false;
     private int anonymousProducerCacheSize = 10;
 
-    public AmqpConnection(AmqpProvider provider, Connection protonConnection, AmqpSaslAuthenticator authenticator, JmsConnectionInfo info) {
+    public AmqpConnection(AmqpProvider provider, JmsConnectionInfo info, Connection protonConnection) {
         super(info, protonConnection);
 
         this.provider = provider;
         this.remoteURI = provider.getRemoteURI();
         this.amqpMessageFactory = new AmqpJmsMessageFactory(this);
-        this.authenticator = authenticator;
 
-        this.resource.getConnectionId().setProviderHint(this);
+        getResourceInfo().getConnectionId().setProviderHint(this);
 
         // Create connection properties initialized with defaults from the JmsConnectionInfo
         this.properties = new AmqpConnectionProperties(info);
-
-        // Create a Session for this connection that is used for Temporary Destinations
-        // and perhaps later on management and advisory monitoring.
-        JmsSessionInfo sessionInfo = new JmsSessionInfo(this.resource, -1);
-        sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
-
-        this.connectionSession = new AmqpConnectionSession(this, sessionInfo);
     }
 
-    @Override
-    protected void doOpen() {
-        String hostname = provider.getVhost();
-        if(hostname == null) {
-            hostname = remoteURI.getHost();
-        } else if (hostname.isEmpty()) {
-            hostname = null;
-        }
-
-        Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
-        props.put(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME);
-        props.put(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION);
-        props.put(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS);
-
-        Connection conn = getEndpoint();
-        conn.setHostname(hostname);
-        conn.setContainer(resource.getClientId());
-        conn.setDesiredCapabilities(new Symbol[] { SOLE_CONNECTION_CAPABILITY });
-        conn.setProperties(props);
-
-        super.doOpen();
+    public void createSession(JmsSessionInfo sessionInfo, AsyncResult request) {
+        AmqpSessionBuilder builder = new AmqpSessionBuilder(this, sessionInfo);
+        builder.buildResource(request);
     }
 
-    public AmqpSession createSession(JmsSessionInfo sessionInfo) {
-        AmqpSession session = new AmqpSession(this, sessionInfo);
-        return session;
-    }
-
-    public AmqpTemporaryDestination createTemporaryDestination(JmsTemporaryDestination destination) {
-        AmqpTemporaryDestination temporary = new AmqpTemporaryDestination(connectionSession, destination);
-        return temporary;
+    public void createTemporaryDestination(JmsTemporaryDestination destination, AsyncResult request) {
+        AmqpTemporaryDestinationBuilder builder = new AmqpTemporaryDestinationBuilder(connectionSession, destination);
+        builder.buildResource(request);
     }
 
     public AmqpTemporaryDestination getTemporaryDestination(JmsTemporaryDestination destination) {
@@ -132,93 +94,39 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     @Override
-    protected void doOpenCompletion() {
-        properties.initialize(getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
-
-        // If the remote reports that the connection attempt failed then we can assume a
-        // close follows so do nothing and wait so a proper error can be constructed from
-        // the information in the remote close.
-        if (!properties.isConnectionOpenFailed()) {
-            connectionSession.open(new AsyncResult() {
-
-                @Override
-                public boolean isComplete() {
-                    return connectionSession.isOpen();
-                }
-
-                @Override
-                public void onSuccess() {
-                    LOG.debug("{} is now open: ", AmqpConnection.this);
-                    opened();
-                }
-
-                @Override
-                public void onFailure(Throwable result) {
-                    LOG.debug("AMQP Connection Session failed to open.");
-                    failed(IOExceptionSupport.create(result));
-                }
-            });
+    public void addChildResource(AmqpResource resource) {
+        if (resource instanceof AmqpConnectionSession) {
+            connectionSession = (AmqpConnectionSession) resource;
+        } else if (resource instanceof AmqpSession) {
+            AmqpSession session = (AmqpSession) resource;
+            sessions.put(session.getSessionId(), session);
+        } else if (resource instanceof AmqpTemporaryDestination) {
+            AmqpTemporaryDestination tempDest = (AmqpTemporaryDestination) resource;
+            tempDests.put(tempDest.getResourceInfo(), tempDest);
         }
     }
 
-    public void processSaslAuthentication() {
-        if (authenticator == null) {
-            return;
-        }
-
-        try {
-            if (authenticator.authenticate()) {
-                authenticator = null;
-            }
-        } catch (JMSSecurityException ex) {
-            try {
-                // TODO: this is a hack to stop Proton sending the open(+close) frame(s)
-                org.apache.qpid.proton.engine.Transport t = getEndpoint().getTransport();
-                t.close_head();
-            } finally {
-                failed(ex);
-            }
+    @Override
+    public void removeChildResource(AmqpResource resource) {
+        if (resource instanceof AmqpSession) {
+            AmqpSession session = (AmqpSession) resource;
+            sessions.remove(session.getSessionId());
+        } else if (resource instanceof AmqpTemporaryDestination) {
+            AmqpTemporaryDestination tempDest = (AmqpTemporaryDestination) resource;
+            tempDests.remove(tempDest.getResourceInfo());
         }
     }
 
-    void addTemporaryDestination(AmqpTemporaryDestination destination) {
-        tempDests.put(destination.getJmsDestination(), destination);
-    }
-
-    void removeTemporaryDestination(AmqpTemporaryDestination destination) {
-        tempDests.remove(destination.getJmsDestination());
-    }
-
-    void addSession(AmqpSession session) {
-        this.sessions.put(session.getSessionId(), session);
-    }
-
-    void removeSession(AmqpSession session) {
-        this.sessions.remove(session.getSessionId());
-    }
-
-    public JmsConnectionInfo getConnectionInfo() {
-        return this.resource;
-    }
-
     public Connection getProtonConnection() {
-        return this.getEndpoint();
+        return getEndpoint();
     }
 
     public URI getRemoteURI() {
-        return this.remoteURI;
-    }
-
-    public String getUsername() {
-        return this.resource.getUsername();
-    }
-
-    public String getPassword() {
-        return this.resource.getPassword();
+        return remoteURI;
     }
 
     public AmqpProvider getProvider() {
-        return this.provider;
+        return provider;
     }
 
     public String getQueuePrefix() {
@@ -249,7 +157,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         if (sessionId.getProviderHint() instanceof AmqpSession) {
             return (AmqpSession) sessionId.getProviderHint();
         }
-        return this.sessions.get(sessionId);
+        return sessions.get(sessionId);
     }
 
     /**
@@ -318,7 +226,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
      * @return the AMQP based JmsMessageFactory for this Connection.
      */
     public AmqpJmsMessageFactory getAmqpMessageFactory() {
-        return this.amqpMessageFactory;
+        return amqpMessageFactory;
     }
 
     /**
@@ -353,6 +261,6 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
 
     @Override
     public String toString() {
-        return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }";
+        return "AmqpConnection { " + getResourceInfo().getConnectionId() + " }";
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 935c9a4..2181e5d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -25,10 +25,12 @@ import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.NoOpAsyncResult;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +51,11 @@ public class AmqpConnectionSession extends AmqpSession {
      *        the connection that owns this session.
      * @param info
      *        the <code>JmsSessionInfo</code> for the Session to create.
+     * @param session
+     *        the Proton session instance that this resource wraps.
      */
-    public AmqpConnectionSession(AmqpConnection connection, JmsSessionInfo info) {
-        super(connection, info);
+    public AmqpConnectionSession(AmqpConnection connection, JmsSessionInfo info, Session session) {
+        super(connection, info, session);
     }
 
     /**
@@ -63,47 +67,70 @@ public class AmqpConnectionSession extends AmqpSession {
      *        the request that awaits the completion of this action.
      */
     public void unsubscribe(String subscriptionName, AsyncResult request) {
-        DurableSubscriptionReattach subscriber = new DurableSubscriptionReattach(getJmsResource(), subscriptionName);
-        DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(subscriber, request);
+        DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), subscriptionName);
+        DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(builder, request);
         pendingUnsubs.put(subscriptionName, subscribeRequest);
 
         LOG.debug("Attempting remove of subscription: {}", subscriptionName);
-        subscriber.open(subscribeRequest);
+        builder.buildResource(subscribeRequest);
     }
 
     private class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {
+
+        public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver) {
+            super(resource, receiver);
+        }
+
+        public String getSubscriptionName() {
+            return getEndpoint().getName();
+        }
+    }
+
+    private final class DurableSubscriptionReattachBuilder extends AmqpResourceBuilder<DurableSubscriptionReattach, AmqpSession, JmsSessionInfo, Receiver> {
+
         private final String subscriptionName;
 
-        public DurableSubscriptionReattach(JmsSessionInfo resource, String subscriptionName) {
-            super(resource, AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName));
+        public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String subscriptionName) {
+            super(parent, resourceInfo);
+
             this.subscriptionName = subscriptionName;
         }
 
         @Override
-        protected void doOpen() {
-            Receiver receiver = getEndpoint();
+        protected Receiver createEndpoint(JmsSessionInfo resourceInfo) {
+            Receiver receiver = getParent().getProtonSession().receiver(subscriptionName);
             receiver.setTarget(new Target());
             receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
             receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-            super.doOpen();
+
+            return receiver;
         }
 
-        public String getSubscriptionName() {
-            return subscriptionName;
+        @Override
+        protected DurableSubscriptionReattach createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Receiver endpoint) {
+            return new DurableSubscriptionReattach(resourceInfo, endpoint);
+        }
+
+        @Override
+        protected boolean isClosePending() {
+            // When no link terminus was created, the peer will now detach/close us; otherwise
+            // we need to validate the returned remote source prior to open completion.
+            return endpoint.getRemoteSource() == null;
         }
     }
 
     private class DurableSubscriptionReattachRequest extends WrappedAsyncResult {
 
-        private final DurableSubscriptionReattach subscriber;
+        private final DurableSubscriptionReattachBuilder subscriberBuilder;
 
-        public DurableSubscriptionReattachRequest(DurableSubscriptionReattach subscriber, AsyncResult originalRequest) {
+        public DurableSubscriptionReattachRequest(DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) {
             super(originalRequest);
-            this.subscriber = subscriber;
+            this.subscriberBuilder = subscriberBuilder;
         }
 
         @Override
         public void onSuccess() {
+            DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
             LOG.trace("Reattached to subscription: {}", subscriber.getSubscriptionName());
             pendingUnsubs.remove(subscriber.getSubscriptionName());
             if (subscriber.getEndpoint().getRemoteSource() != null) {
@@ -116,9 +143,10 @@ public class AmqpConnectionSession extends AmqpSession {
 
         @Override
         public void onFailure(Throwable result) {
+            DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
             LOG.trace("Failed to reattach to subscription: {}", subscriber.getSubscriptionName());
             pendingUnsubs.remove(subscriber.getSubscriptionName());
-            subscriber.closed();
+            subscriber.resourceClosed();
             super.onFailure(result);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 58dfe26..62971a2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_UNDELIVERABLE;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -23,14 +25,12 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
@@ -41,24 +41,13 @@ import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderListener;
-import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
@@ -72,22 +61,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
-    protected static final Symbol COPY = Symbol.getSymbol("copy");
-    protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
-    protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
-
     private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
 
-    private static final Modified MODIFIED_FAILED = new Modified();
-    private static final Modified MODIFIED_UNDELIVERABLE = new Modified();
-
-    static {
-        MODIFIED_FAILED.setDeliveryFailed(true);
-
-        MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
-        MODIFIED_UNDELIVERABLE.setUndeliverableHere(true);
-    }
-
     protected final AmqpSession session;
     protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
@@ -96,12 +71,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
     protected final AtomicLong incomingSequence = new AtomicLong(0);
 
-    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
-        super(info);
+    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
+        super(info, receiver);
+
         this.session = session;
 
         // Add a shortcut back to this Consumer for quicker lookups
-        this.resource.getConsumerId().setProviderHint(this);
+        getResourceInfo().getConsumerId().setProviderHint(this);
     }
 
     /**
@@ -162,110 +138,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     @Override
-    protected void doOpen() {
-        JmsDestination destination = resource.getDestination();
-        String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
-
-        Source source = new Source();
-        source.setAddress(subscription);
-        Target target = new Target();
-
-        configureSource(source);
-
-        String receiverName = "qpid-jms:receiver:" + getConsumerId() + ":" + subscription;
-        if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
-            // In the case of Durable Topic Subscriptions the client must use the same
-            // receiver name which is derived from the subscription name property.
-            receiverName = resource.getSubscriptionName();
-        }
-
-        Receiver receiver = session.getProtonSession().receiver(receiverName);
-        receiver.setSource(source);
-        receiver.setTarget(target);
-        if (isPresettle()) {
-            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
-        } else {
-            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        }
-        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
-        setEndpoint(receiver);
-
-        super.doOpen();
-    }
-
-    @Override
-    protected void doOpenCompletion() {
-        // Verify the attach response contained a non-null Source
-        org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
-        if (source != null) {
-            super.doOpenCompletion();
-        } else {
-            // No link terminus was created, the peer will now detach/close us.
-        }
-    }
-
-    @Override
-    protected Exception getOpenAbortException() {
-        // Verify the attach response contained a non-null Source
-        org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
-        if (source != null) {
-            return super.getOpenAbortException();
-        } else {
-            // No link terminus was created, the peer has detach/closed us, create IDE.
-            return new InvalidDestinationException("Link creation was refused");
-        }
-    }
-
-    @Override
-    public void opened() {
-        this.session.addResource(this);
-        super.opened();
-    }
-
-    @Override
-    public void closed() {
-        this.session.removeResource(this);
-        super.closed();
-    }
-
-    protected void configureSource(Source source) {
-        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
-        Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
-                                          Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
-
-        if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
-            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-            source.setDurable(TerminusDurability.UNSETTLED_STATE);
-            source.setDistributionMode(COPY);
-        } else {
-            source.setDurable(TerminusDurability.NONE);
-            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-        }
-
-        if (resource.isBrowser()) {
-            source.setDistributionMode(COPY);
-        }
-
-        Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(resource.getDestination());
-        if(typeCapability != null) {
-            source.setCapabilities(typeCapability);
-        }
-
-        source.setOutcomes(outcomes);
-        source.setDefaultOutcome(MODIFIED_FAILED);
-
-        if (resource.isNoLocal()) {
-            filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
-        }
-
-        if (resource.getSelector() != null && !resource.getSelector().trim().equals("")) {
-            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resource.getSelector()));
-        }
-
-        if (!filters.isEmpty()) {
-            source.setFilter(filters);
-        }
+    public void resourceClosed() {
+        this.session.removeChildResource(this);
+        super.resourceClosed();
     }
 
     /**
@@ -278,7 +153,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * would already have been given for these so we just need to settle them.
      */
     public void acknowledge() {
-        LOG.trace("Session Acknowledge for consumer: {}", resource.getConsumerId());
+        LOG.trace("Session Acknowledge for consumer: {}", getResourceInfo().getConsumerId());
         for (Delivery delivery : delivered.values()) {
             delivery.disposition(Accepted.getInstance());
             delivery.settle();
@@ -326,7 +201,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
             LOG.debug("Consumed Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
-                if (session.isTransacted() && !resource.isBrowser()) {
+                if (session.isTransacted() && !getResourceInfo().isBrowser()) {
                     Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                     if (txnId != null) {
                         TransactionalState txState = new TransactionalState();
@@ -359,13 +234,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * or we are draining then we never send credit here.
      */
     private void sendFlowIfNeeded() {
-        if (resource.getPrefetchSize() == 0 || isDraining()) {
+        if (getResourceInfo().getPrefetchSize() == 0 || isDraining()) {
             return;
         }
 
         int currentCredit = getEndpoint().getCredit();
-        if (currentCredit <= resource.getPrefetchSize() * 0.2) {
-            int newCredit = resource.getPrefetchSize() - currentCredit;
+        if (currentCredit <= getResourceInfo().getPrefetchSize() * 0.2) {
+            int newCredit = getResourceInfo().getPrefetchSize() - currentCredit;
             LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), newCredit);
             getEndpoint().flow(newCredit);
         }
@@ -377,7 +252,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * @throws Exception if an error occurs while performing the recover.
      */
     public void recover() throws Exception {
-        LOG.debug("Session Recover for consumer: {}", resource.getConsumerId());
+        LOG.debug("Session Recover for consumer: {}", getResourceInfo().getConsumerId());
         Collection<JmsInboundMessageDispatch> values = delivered.keySet();
         ArrayList<JmsInboundMessageDispatch> envelopes = new ArrayList<JmsInboundMessageDispatch>(values);
         ListIterator<JmsInboundMessageDispatch> reverseIterator = envelopes.listIterator(values.size());
@@ -422,7 +297,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 // try to fulfill the pull request, otherwise we want to drain down
                 // what is there to ensure we consume everything available.
                 if (getEndpoint().getCredit() == 0) {
-                    getEndpoint().drain(resource.getPrefetchSize() > 0 ? resource.getPrefetchSize() : 1);
+                    getEndpoint().drain(getResourceInfo().getPrefetchSize() > 0 ? getResourceInfo().getPrefetchSize() : 1);
                 } else {
                     getEndpoint().drain(0);
                 }
@@ -513,7 +388,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
         envelope.setMessage(message);
-        envelope.setConsumerId(resource.getConsumerId());
+        envelope.setConsumerId(getResourceInfo().getConsumerId());
         // Store link to delivery in the hint for use in acknowledge requests.
         envelope.setProviderHint(incoming);
         envelope.setMessageId(message.getFacade().getProviderMessageIdObject());
@@ -541,7 +416,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     @Override
     protected void doClose() {
-        if (resource.isDurable()) {
+        if (getResourceInfo().isDurable()) {
             getEndpoint().detach();
         } else {
             getEndpoint().close();
@@ -557,11 +432,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     public JmsConsumerId getConsumerId() {
-        return this.resource.getConsumerId();
+        return this.getResourceInfo().getConsumerId();
     }
 
     public JmsDestination getDestination() {
-        return this.resource.getDestination();
+        return this.getResourceInfo().getDestination();
     }
 
     public Receiver getProtonReceiver() {
@@ -569,7 +444,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     public boolean isPresettle() {
-        return presettle || resource.isBrowser();
+        return presettle || getResourceInfo().isBrowser();
     }
 
     public boolean isDraining() {
@@ -582,7 +457,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     @Override
     public String toString() {
-        return "AmqpConsumer { " + this.resource.getConsumerId() + " }";
+        return "AmqpConsumer { " + getResourceInfo().getConsumerId() + " }";
     }
 
     protected void deliveryFailed(Delivery incoming) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
new file mode 100644
index 0000000..75c1056
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+/**
+ * Interface used by classes that want to process AMQP events sent from
+ * the transport layer.
+ */
+public interface AmqpEventSink {
+
+    /**
+     * Event handler for remote peer open of this resource.
+     *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteOpen(AmqpProvider provider) throws IOException;
+
+    /**
+     * Event handler for remote peer detach of this resource.
+     *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteDetach(AmqpProvider provider) throws IOException;
+
+    /**
+     * Event handler for remote peer close of this resource.
+     *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteClose(AmqpProvider provider) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Delivery related event has been triggered
+     * for the given endpoint.
+     *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processDeliveryUpdates(AmqpProvider provider) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Flow related event has been triggered
+     * for the given endpoint.
+     *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processFlowUpdates(AmqpProvider provider) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 3f4ccb4..c8ec42b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -23,29 +23,21 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
-import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.message.Message;
@@ -72,6 +64,10 @@ public class AmqpFixedProducer extends AmqpProducer {
         super(session, info);
     }
 
+    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
+        super(session, info, sender);
+    }
+
     @Override
     public void close(AsyncResult request) {
         // If any sends are held we need to wait for them to complete.
@@ -247,76 +243,17 @@ public class AmqpFixedProducer extends AmqpProducer {
         super.processDeliveryUpdates(provider);
     }
 
-    @Override
-    protected void doOpen() {
-        JmsDestination destination = resource.getDestination();
-        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
-
-        Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL };
-        String sourceAddress = getProducerId().toString();
-        Source source = new Source();
-        source.setAddress(sourceAddress);
-        source.setOutcomes(outcomes);
-        //TODO: default outcome. Accepted normally, Rejected for transaction controller?
-
-        Target target = new Target();
-        target.setAddress(targetAddress);
-        Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
-        if (typeCapability != null) {
-            target.setCapabilities(typeCapability);
-        }
-
-        String senderName = "qpid-jms:sender:" + sourceAddress + ":" + targetAddress;
-
-        Sender sender = session.getProtonSession().sender(senderName);
-        sender.setSource(source);
-        sender.setTarget(target);
-        if (presettle) {
-            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
-        } else {
-            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        }
-        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
-        setEndpoint(sender);
-
-        super.doOpen();
-    }
-
-    @Override
-    protected void doOpenCompletion() {
-        // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
-        if (target != null) {
-            super.doOpenCompletion();
-        } else {
-            // No link terminus was created, the peer will now detach/close us.
-        }
-    }
-
-    @Override
-    protected Exception getOpenAbortException() {
-        // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
-        if (target != null) {
-            return super.getOpenAbortException();
-        } else {
-            // No link terminus was created, the peer has detach/closed us, create IDE.
-            return new InvalidDestinationException("Link creation was refused");
-        }
-    }
-
     public AmqpSession getSession() {
-        return this.session;
+        return session;
     }
 
     public Sender getProtonSender() {
-        return this.getEndpoint();
+        return getEndpoint();
     }
 
     @Override
     public boolean isAnonymous() {
-        return this.resource.getDestination() == null;
+        return getResourceInfo().getDestination() == null;
     }
 
     @Override
@@ -326,7 +263,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     public boolean isPresettle() {
-        return this.presettle;
+        return presettle;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
deleted file mode 100644
index 24c6286..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-/**
- * A Described Type wrapper for JMS no local option for MessageConsumer.
- */
-public class AmqpJmsNoLocalType implements DescribedType {
-
-    public static final AmqpJmsNoLocalType NO_LOCAL = new AmqpJmsNoLocalType();
-
-    private final String noLocal;
-
-    public AmqpJmsNoLocalType() {
-        this.noLocal = "NoLocalFilter{}";
-    }
-
-    @Override
-    public Object getDescriptor() {
-        return UnsignedLong.valueOf(0x0000468C00000003L);
-    }
-
-    @Override
-    public Object getDescribed() {
-        return this.noLocal;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
deleted file mode 100644
index c6c2601..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-/**
- * A Described Type wrapper for JMS selector values.
- */
-public class AmqpJmsSelectorType implements DescribedType {
-
-    private final String selector;
-
-    public AmqpJmsSelectorType(String selector) {
-        this.selector = selector;
-    }
-
-    @Override
-    public Object getDescriptor() {
-        return UnsignedLong.valueOf(0x0000468C00000004L);
-    }
-
-    @Override
-    public Object getDescribed() {
-        return this.selector;
-    }
-
-    @Override
-    public String toString() {
-        return "AmqpJmsSelectorType{" + selector + "}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index a27820b..7fb8d53 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -36,12 +36,17 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
     protected boolean presettle;
 
     public AmqpProducer(AmqpSession session, JmsProducerInfo info) {
-        super(info);
+        this(session, info, null);
+    }
+
+    public AmqpProducer(AmqpSession session, JmsProducerInfo info, Sender endpoint) {
+        super(info, endpoint);
+
         this.session = session;
         this.connection = session.getConnection();
 
         // Add a shortcut back to this Producer for quicker lookup.
-        this.resource.getProducerId().setProviderHint(this);
+        getResourceInfo().getProducerId().setProviderHint(this);
     }
 
     /**
@@ -69,7 +74,7 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
      * @return the JmsProducerId that was assigned to this AmqpProducer.
      */
     public JmsProducerId getProducerId() {
-        return this.resource.getProducerId();
+        return getResourceInfo().getProducerId();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index bce9282..648ad40 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
 
 import org.apache.qpid.jms.JmsTemporaryDestination;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -57,6 +58,7 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
 import org.apache.qpid.jms.transports.SSLTransport;
 import org.apache.qpid.jms.transports.TransportFactory;
 import org.apache.qpid.jms.transports.TransportListener;
@@ -87,7 +89,7 @@ import org.slf4j.LoggerFactory;
  * All work within this Provider is serialized to a single Thread.  Any asynchronous exceptions
  * will be dispatched from that Thread and all in-bound requests are handled there as well.
  */
-public class AmqpProvider implements Provider, TransportListener {
+public class AmqpProvider implements Provider, TransportListener , AmqpResourceParent {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProvider.class);
 
@@ -102,6 +104,7 @@ public class AmqpProvider implements Provider, TransportListener {
 
     private ProviderListener listener;
     private AmqpConnection connection;
+    private AmqpSaslAuthenticator authenticator;
     private org.apache.qpid.jms.transports.Transport transport;
     private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE;
     private String vhost;
@@ -125,6 +128,7 @@ public class AmqpProvider implements Provider, TransportListener {
     private final ScheduledExecutorService serializer;
     private final Transport protonTransport = Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
+    private final Connection protonConnection = Connection.Factory.create();
 
     private AsyncResult connectionOpenRequest;
     private ScheduledFuture<?> nextIdleTimeoutCheck;
@@ -253,22 +257,19 @@ public class AmqpProvider implements Provider, TransportListener {
                     resource.visit(new JmsResourceVistor() {
                         @Override
                         public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
-                            AmqpSession session = connection.createSession(sessionInfo);
-                            session.open(request);
+                            connection.createSession(sessionInfo, request);
                         }
 
                         @Override
                         public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
                             AmqpSession session = connection.getSession(producerInfo.getParentId());
-                            AmqpProducer producer = session.createProducer(producerInfo);
-                            producer.open(request);
+                            session.createProducer(producerInfo, request);
                         }
 
                         @Override
                         public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
                             AmqpSession session = connection.getSession(consumerInfo.getParentId());
-                            AmqpConsumer consumer = session.createConsumer(consumerInfo);
-                            consumer.open(request);
+                            session.createConsumer(consumerInfo, request);
                         }
 
                         @Override
@@ -278,17 +279,15 @@ public class AmqpProvider implements Provider, TransportListener {
                             sendTimeout = connectionInfo.getSendTimeout();
                             requestTimeout = connectionInfo.getRequestTimeout();
 
-                            Connection protonConnection = Connection.Factory.create();
-
                             if (getMaxFrameSize() > 0) {
                                 protonTransport.setMaxFrameSize(getMaxFrameSize());
                             }
+
                             protonTransport.setChannelMax(getChannelMax());
                             protonTransport.setIdleTimeout(idleTimeout);
                             protonTransport.bind(protonConnection);
                             protonConnection.collect(protonCollector);
 
-                            AmqpSaslAuthenticator authenticator = null;
                             if (saslLayer) {
                                 Sasl sasl = protonTransport.sasl();
                                 sasl.client();
@@ -305,7 +304,7 @@ public class AmqpProvider implements Provider, TransportListener {
                                 authenticator = new AmqpSaslAuthenticator(sasl, connectionInfo, getLocalPrincipal(), saslMechanisms);
                             }
 
-                            connection = new AmqpConnection(AmqpProvider.this, protonConnection, authenticator, connectionInfo);
+                            AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
                             AsyncResult wrappedOpenRequest = new AsyncResult() {
                                 @Override
                                 public void onSuccess() {
@@ -326,14 +325,13 @@ public class AmqpProvider implements Provider, TransportListener {
 
                             connectionOpenRequest = wrappedOpenRequest;
 
-                            connection.open(wrappedOpenRequest);
+                            builder.buildResource(wrappedOpenRequest);
                         }
 
                         @Override
                         public void processDestination(JmsTemporaryDestination destination) throws Exception {
                             if (destination.isTemporary()) {
-                                AmqpTemporaryDestination temporary = connection.createTemporaryDestination(destination);
-                                temporary.open(request);
+                                connection.createTemporaryDestination(destination, request);
                             } else {
                                 request.onSuccess();
                             }
@@ -705,6 +703,7 @@ public class AmqpProvider implements Provider, TransportListener {
                 // Process the state changes from the latest data and then answer back
                 // any pending updates to the Broker.
                 processUpdates();
+                LOG.info("Pumping proton transport");
                 pumpToProtonTransport();
             }
         });
@@ -727,7 +726,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     if (!closed.get()) {
                         fireProviderException(error);
                         if (connection != null) {
-                            connection.closed();
+                            connection.resourceClosed();
                         }
                     }
                 }
@@ -752,7 +751,7 @@ public class AmqpProvider implements Provider, TransportListener {
                         fireProviderException(new IOException("Transport connection remotely closed."));
                         //TODO: close the proton transport as well/instead?
                         if (connection != null) {
-                            connection.closed();
+                            connection.resourceClosed();
                         }
                     }
                 }
@@ -768,43 +767,43 @@ public class AmqpProvider implements Provider, TransportListener {
                     LOG.trace("New Proton Event: {}", protonEvent.getType());
                 }
 
-                AmqpResource amqpResource = null;
+                AmqpEventSink amqpEventSink = null;
                 switch (protonEvent.getType()) {
                     case CONNECTION_REMOTE_CLOSE:
-                        amqpResource = (AmqpResource) protonEvent.getConnection().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case CONNECTION_REMOTE_OPEN:
-                        amqpResource = (AmqpResource) protonEvent.getConnection().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case SESSION_REMOTE_CLOSE:
-                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case SESSION_REMOTE_OPEN:
-                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case LINK_REMOTE_CLOSE:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case LINK_REMOTE_DETACH:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteDetach(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteDetach(this);
                         break;
                     case LINK_REMOTE_OPEN:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case LINK_FLOW:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processFlowUpdates(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processFlowUpdates(this);
                         break;
                     case DELIVERY:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processDeliveryUpdates(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processDeliveryUpdates(this);
                         break;
                     default:
                         break;
@@ -814,15 +813,32 @@ public class AmqpProvider implements Provider, TransportListener {
             }
 
             // We have to do this to pump SASL bytes in as SASL is not event driven yet.
-            if (connection != null) {
-                connection.processSaslAuthentication();
-            }
+            processSaslAuthentication();
         } catch (Exception ex) {
             LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex);
             fireProviderException(ex);
         }
     }
 
+    private void processSaslAuthentication() {
+        if (authenticator == null) {
+            return;
+        }
+
+        try {
+            if (authenticator.authenticate()) {
+                authenticator = null;
+            }
+        } catch (JMSSecurityException ex) {
+            try {
+                org.apache.qpid.proton.engine.Transport t = protonConnection.getTransport();
+                t.close_head();
+            } finally {
+                fireProviderException(ex);
+            }
+        }
+    }
+
     protected boolean pumpToProtonTransport() {
         return pumpToProtonTransport(NOOP_REQUEST);
     }
@@ -898,6 +914,18 @@ public class AmqpProvider implements Provider, TransportListener {
         }
     }
 
+    @Override
+    public void addChildResource(AmqpResource resource) {
+        if (resource instanceof AmqpConnection) {
+            this.connection = (AmqpConnection) resource;
+        }
+    }
+
+    @Override
+    public void removeChildResource(AmqpResource resource) {
+        // No need to remove resources
+    }
+
     //---------- Property Setters and Getters --------------------------------//
 
     @Override
@@ -1086,6 +1114,14 @@ public class AmqpProvider implements Provider, TransportListener {
         return remoteURI;
     }
 
+    public Transport getProtonTransport() {
+        return protonTransport;
+    }
+
+    public Connection getProtonConnection() {
+        return protonConnection;
+    }
+
     ScheduledExecutorService getScheduler() {
         return this.serializer;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
index e1b7148..da558ab 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -16,136 +16,12 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.io.IOException;
-
-import org.apache.qpid.jms.provider.AsyncResult;
-
 /**
  * AmqpResource specification.
  *
  * All AMQP types should implement this interface to allow for control of state
  * and configuration details.
  */
-public interface AmqpResource {
-
-    /**
-     * Perform all the work needed to open this resource and store the request
-     * until such time as the remote peer indicates the resource has become active.
-     *
-     * @param request
-     *        The initiating request that triggered this open call.
-     */
-    void open(AsyncResult request);
-
-    /**
-     * @return if the resource has moved to the opened state on the remote.
-     */
-    boolean isOpen();
-
-    /**
-     * @return true if the resource is awaiting the remote end to signal opened.
-     */
-    boolean isAwaitingOpen();
-
-    /**
-     * Called to indicate that this resource is now remotely opened.  Once opened a
-     * resource can start accepting incoming requests.
-     */
-    void opened();
-
-    /**
-     * Perform all work needed to close this resource and store the request
-     * until such time as the remote peer indicates the resource has been closed.
-     *
-     * @param request
-     *        The initiating request that triggered this close call.
-     */
-    void close(AsyncResult request);
-
-    /**
-     * @return if the resource has moved to the closed state on the remote.
-     */
-    boolean isClosed();
-
-    /**
-     * @return true if the resource is awaiting the remote end to signal closed.
-     */
-    boolean isAwaitingClose();
-
-    /**
-     * Called to indicate that this resource is now remotely closed.  Once closed a
-     * resource can not accept any incoming requests.
-     */
-    void closed();
-
-    /**
-     * Called to indicate that the remote end has become closed but the resource
-     * was not awaiting an open/close completion.
-     *
-     * @param provider
-     *        a reference to the AMQP provider to use to send the remote close event.
-     */
-    void remotelyClosed(AmqpProvider provider);
-
-    /**
-     * Sets the failed state for this Resource and triggers a failure signal for
-     * any pending ProduverRequest.
-     *
-     * @param cause
-     *        The Exception that triggered the failure.
-     */
-    void failed(Exception cause);
-
-    /**
-     * Event handler for remote peer open of this resource.
-     *
-     * @param provider
-     *        the AmqpProvider instance for easier access to fire events.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteOpen(AmqpProvider provider) throws IOException;
-
-    /**
-     * Event handler for remote peer detach of this resource.
-     *
-     * @param provider
-     *        the AmqpProvider instance for easier access to fire events.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteDetach(AmqpProvider provider) throws IOException;
-
-    /**
-     * Event handler for remote peer close of this resource.
-     *
-     * @param provider
-     *        the AmqpProvider instance for easier access to fire events.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteClose(AmqpProvider provider) throws IOException;
-
-    /**
-     * Called when the Proton Engine signals an Delivery related event has been triggered
-     * for the given endpoint.
-     *
-     * @param provider
-     *        the AmqpProvider instance for easier access to fire events.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processDeliveryUpdates(AmqpProvider provider) throws IOException;
-
-    /**
-     * Called when the Proton Engine signals an Flow related event has been triggered
-     * for the given endpoint.
-     *
-     * @param provider
-     *        the AmqpProvider instance for easier access to fire events.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processFlowUpdates(AmqpProvider provider) throws IOException;
+public interface AmqpResource extends AmqpEventSink {
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
new file mode 100644
index 0000000..6486719
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+/**
+ * Interface for any object that will manage the lifetime of AmqpResource
+ * based object instances.
+ */
+public interface AmqpResourceParent {
+
+    /**
+     * Adds the given resource as a child of this resource so that its
+     * lifetime becomes managed by that of its parent.
+     *
+     * @param resource
+     *      The AmqpResource that is a child of this one.
+     */
+    void addChildResource(AmqpResource resource);
+
+    /**
+     * Removes the given resource from the registered child resources
+     * managed by this one.
+     *
+     * @param resource
+     *      The AmqpResource that is no longer a child of this one.
+     */
+    void removeChildResource(AmqpResource resource);
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message