qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] qpid-jms git commit: QPIDJMS-108 Refactoring the AMQP Provider resource management code.
Date Fri, 11 Sep 2015 18:00:17 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master b3f954619 -> 7be321e74


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 96f4719..1123b97 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -31,58 +31,30 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpConsumerBuilder;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
 import org.apache.qpid.proton.engine.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
+public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> implements AmqpResourceParent {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
 
     private final AmqpConnection connection;
-    private final AmqpTransactionContext txContext;
+    private AmqpTransactionContext txContext;
 
     private final Map<JmsConsumerId, AmqpConsumer> consumers = new HashMap<JmsConsumerId, AmqpConsumer>();
 
-    public AmqpSession(AmqpConnection connection, JmsSessionInfo info) {
-        super(info, connection.getProtonConnection().session());
+    public AmqpSession(AmqpConnection connection, JmsSessionInfo info, Session session) {
+        super(info, session);
         this.connection = connection;
-
-        this.resource.getSessionId().setProviderHint(this);
-        if (this.resource.isTransacted()) {
-            txContext = new AmqpTransactionContext(this);
-        } else {
-            txContext = null;
-        }
-    }
-
-    @Override
-    public void opened() {
-        if (this.txContext != null) {
-            this.txContext.open(openRequest);
-        } else {
-            super.opened();
-        }
-    }
-
-    @Override
-    protected void doOpen() {
-        long outgoingWindow = getProvider().getSessionOutgoingWindow();
-
-        Session session = this.getEndpoint();
-        session.setIncomingCapacity(Integer.MAX_VALUE);
-        if (outgoingWindow >= 0) {
-            session.setOutgoingWindow(outgoingWindow);
-        }
-
-        this.connection.addSession(this);
-
-        super.doOpen();
+        getResourceInfo().getSessionId().setProviderHint(this);
     }
 
     @Override
     protected void doClose() {
-        this.connection.removeSession(this);
+        connection.removeChildResource(this);
         super.doClose();
     }
 
@@ -108,20 +80,20 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
         }
     }
 
-    public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
-        AmqpProducer producer = null;
-
-        if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported()) {
-            LOG.debug("Creating AmqpFixedProducer for: {}", producerInfo.getDestination());
-            producer = new AmqpFixedProducer(this, producerInfo);
-        } else {
+    public void createProducer(JmsProducerInfo producerInfo, AsyncResult request) {
+        if (producerInfo.getDestination() == null && !getConnection().getProperties().isAnonymousRelaySupported()) {
             LOG.debug("Creating an AmqpAnonymousFallbackProducer");
-            producer = new AmqpAnonymousFallbackProducer(this, producerInfo);
-        }
 
-        producer.setPresettle(connection.isPresettleProducers());
+            AmqpProducer producer = new AmqpAnonymousFallbackProducer(this, producerInfo);
+            producer.setPresettle(getConnection().isPresettleProducers());
 
-        return producer;
+            // No producer is created yet so this is always successful.
+            request.onSuccess();
+        } else {
+            LOG.debug("Creating AmqpFixedProducer for: {}", producerInfo.getDestination());
+            AmqpProducerBuilder builder = new AmqpProducerBuilder(this, producerInfo);
+            builder.buildResource(request);
+        }
     }
 
     public AmqpProducer getProducer(JmsProducerInfo producerInfo) {
@@ -136,10 +108,9 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
         return null;
     }
 
-    public AmqpConsumer createConsumer(JmsConsumerInfo consumerInfo) {
-        AmqpConsumer result = new AmqpConsumer(this, consumerInfo);
-        result.setPresettle(connection.isPresettleConsumers());
-        return result;
+    public void createConsumer(JmsConsumerInfo consumerInfo, AsyncResult request) {
+        AmqpConsumerBuilder builder = new AmqpConsumerBuilder(this, consumerInfo);
+        builder.buildResource(request);
     }
 
     public AmqpConsumer getConsumer(JmsConsumerInfo consumerInfo) {
@@ -150,11 +121,11 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
         if (consumerId.getProviderHint() instanceof AmqpConsumer) {
             return (AmqpConsumer) consumerId.getProviderHint();
         }
-        return this.consumers.get(consumerId);
+        return consumers.get(consumerId);
     }
 
     public AmqpTransactionContext getTransactionContext() {
-        return this.txContext;
+        return txContext;
     }
 
     /**
@@ -169,7 +140,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
-        if (!this.resource.isTransacted()) {
+        if (!getResourceInfo().isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -185,7 +156,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void commit(AsyncResult request) throws Exception {
-        if (!this.resource.isTransacted()) {
+        if (!getResourceInfo().isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -201,7 +172,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void rollback(AsyncResult request) throws Exception {
-        if (!this.resource.isTransacted()) {
+        if (!getResourceInfo().isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -227,12 +198,28 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
         return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS);
     }
 
-    void addResource(AmqpConsumer consumer) {
-        consumers.put(consumer.getConsumerId(), consumer);
+    @Override
+    public void addChildResource(AmqpResource resource) {
+        // delegate to the connection if the type is not managed here.
+        if (resource instanceof AmqpConsumer) {
+            AmqpConsumer consumer = (AmqpConsumer) resource;
+            consumers.put(consumer.getConsumerId(), consumer);
+        } else if (resource instanceof AmqpTransactionContext) {
+            txContext = (AmqpTransactionContext) resource;
+        } else {
+            connection.addChildResource(resource);
+        }
     }
 
-    void removeResource(AmqpConsumer consumer) {
-        consumers.remove(consumer.getConsumerId());
+    @Override
+    public void removeChildResource(AmqpResource resource) {
+        // delegate to the connection if the type is not managed here.
+        if (resource instanceof AmqpConsumer) {
+            AmqpConsumer consumer = (AmqpConsumer) resource;
+            consumers.remove(consumer.getConsumerId());
+        } else {
+            connection.removeChildResource(resource);
+        }
     }
 
     /**
@@ -246,7 +233,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
      */
     public boolean containsSubscription(String subscriptionName) {
         for (AmqpConsumer consumer : consumers.values()) {
-            if (subscriptionName.equals(consumer.getJmsResource().getSubscriptionName())) {
+            if (subscriptionName.equals(consumer.getResourceInfo().getSubscriptionName())) {
                 return true;
             }
         }
@@ -266,27 +253,27 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
     }
 
     public AmqpProvider getProvider() {
-        return this.connection.getProvider();
+        return connection.getProvider();
     }
 
     public AmqpConnection getConnection() {
-        return this.connection;
+        return connection;
     }
 
     public JmsSessionId getSessionId() {
-        return this.resource.getSessionId();
+        return getResourceInfo().getSessionId();
     }
 
     public Session getProtonSession() {
-        return this.getEndpoint();
+        return getEndpoint();
     }
 
     boolean isTransacted() {
-        return this.resource.isTransacted();
+        return getResourceInfo().isTransacted();
     }
 
     boolean isAsyncAck() {
-        return this.resource.isSendAcksAsync() || isTransacted();
+        return getResourceInfo().isSendAcksAsync() || isTransacted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index f23eb69..9e5356d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -26,6 +26,7 @@ import javax.jms.JMSSecurityException;
 
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -54,6 +55,29 @@ public class AmqpSupport {
     public static final Symbol VERSION = Symbol.valueOf("version");
     public static final Symbol PLATFORM = Symbol.valueOf("platform");
 
+    // Symbols used for receivers.
+    public static final Symbol COPY = Symbol.getSymbol("copy");
+    public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
+    public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+    public static final Modified MODIFIED_FAILED = new Modified();
+    public static final Modified MODIFIED_UNDELIVERABLE = new Modified();
+
+    // Temporary Destination constants
+    public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+    public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
+    public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
+
+    //----- Static initializer -----------------------------------------------//
+
+    static {
+        MODIFIED_FAILED.setDeliveryFailed(true);
+
+        MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
+        MODIFIED_UNDELIVERABLE.setUndeliverableHere(true);
+    }
+
+    //----- Utility Methods --------------------------------------------------//
+
     /**
      * Given an ErrorCondition instance create a new JMSException that best matches
      * the error type.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 2759e7a..ce87f94 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -16,23 +16,8 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsTemporaryDestination;
-import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Sender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Manages a Temporary Destination linked to a given Connection.
@@ -48,116 +33,32 @@ import org.slf4j.LoggerFactory;
  */
 public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryDestination, Sender> {
 
-    public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
-    private static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
-    private static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
-
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class);
-
     private final AmqpConnection connection;
     private final AmqpSession session;
 
-    public AmqpTemporaryDestination(AmqpSession session, JmsTemporaryDestination destination) {
-        super(destination);
+    public AmqpTemporaryDestination(AmqpSession session, JmsTemporaryDestination destination, Sender endpoint) {
+        super(destination, endpoint);
+
         this.session = session;
         this.connection = session.getConnection();
     }
 
     @Override
-    public void opened() {
-
-        // Once our producer is opened we can read the updated name from the target address.
-        String oldDestinationName = resource.getName();
-        String destinationName = getEndpoint().getRemoteTarget().getAddress();
-
-        this.resource.setName(destinationName);
-
-        LOG.trace("Updated temp destination to: {} from: {}", resource, oldDestinationName);
-
-        this.connection.addTemporaryDestination(this);
-
-        super.opened();
-    }
-
-    @Override
-    protected void doOpen() {
-        // Form a link name, use the local generated name with a prefix to aid debugging
-        String localDestinationName = resource.getName();
-        String senderLinkName = null;
-        if (resource.isQueue()) {
-            senderLinkName = "qpid-jms:" + TEMP_QUEUE_CREATOR + localDestinationName;
-        } else {
-            senderLinkName = "qpid-jms:" + TEMP_TOPIC_CREATOR + localDestinationName;
-        }
-
-        // Just use a bare Source, this is a producer which
-        // wont send anything and the link name is unique.
-        Source source = new Source();
-
-        Target target = new Target();
-        target.setDynamic(true);
-        target.setDurable(TerminusDurability.NONE);
-        target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-
-        // Set the dynamic node lifetime-policy
-        Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
-        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
-        target.setDynamicNodeProperties(dynamicNodeProperties);
-
-        // Set the capability to indicate the node type being created
-        if (resource.isQueue()) {
-            target.setCapabilities(AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY);
-        } else {
-            target.setCapabilities(AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY);
-        }
-
-        Sender sender = session.getProtonSession().sender(senderLinkName);
-        sender.setSource(source);
-        sender.setTarget(target);
-        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
-        setEndpoint(sender);
-
-        super.doOpen();
-    }
-
-    @Override
-    protected void doOpenCompletion() {
-        // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
-        if (target != null) {
-            super.doOpenCompletion();
-        } else {
-            // No link terminus was created, the peer will now detach/close us.
-        }
-    }
-
-    @Override
     protected void doClose() {
-        this.connection.removeTemporaryDestination(this);
-
+        connection.removeChildResource(this);
         super.doClose();
     }
 
     public AmqpConnection getConnection() {
-        return this.connection;
+        return connection;
     }
 
     public AmqpSession getSession() {
-        return this.session;
-    }
-
-    public Sender getProtonSender() {
-        return getEndpoint();
-    }
-
-    public JmsDestination getJmsDestination() {
-        return this.resource;
+        return session;
     }
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + " { " + resource + "}";
+        return getClass().getSimpleName() + " { " + getResourceInfo() + "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index c150bb5..486aef5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -31,15 +31,10 @@ import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.apache.qpid.proton.amqp.transaction.Declare;
 import org.apache.qpid.proton.amqp.transaction.Declared;
 import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transaction.TxnCapability;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.message.Message;
@@ -68,13 +63,18 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
     private AsyncResult pendingRequest;
 
     /**
-     * Creates a new AmqpTransaction instance.
+     * Creates a new AmqpTransactionContext instance.
      *
      * @param session
-     *        The session that owns this transaction
+     *        The session that owns this transaction context.
+     * @param resourceInfo
+     *        The resourceInfo that defines this transaction context.
+     * @param sender
+     *        The local sender endpoint for this transaction context.
      */
-    public AmqpTransactionContext(AmqpSession session) {
-        super(session.getJmsResource());
+    public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo, Sender sender) {
+        super(resourceInfo, sender);
+
         this.session = session;
     }
 
@@ -128,25 +128,6 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         }
     }
 
-    @Override
-    protected void doOpen() {
-        Coordinator coordinator = new Coordinator();
-        coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
-        Source source = new Source();
-
-        String coordinatorName = "qpid-jms:coordinator:" + resource.getSessionId().toString();
-
-        Sender sender = session.getProtonSession().sender(coordinatorName);
-        sender.setSource(source);
-        sender.setTarget(coordinator);
-        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
-        setEndpoint(sender);
-
-        super.doOpen();
-    }
-
     public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
         if (current != null) {
             throw new IOException("Begin called while a TX is still Active.");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
new file mode 100644
index 0000000..f4c290f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.Session;
+
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.jms.util.MetaDataSupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpConnection instance.
+ */
+public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, AmqpProvider, JmsConnectionInfo, Connection> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectionBuilder.class);
+
+    public AmqpConnectionBuilder(AmqpProvider parent, JmsConnectionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    public void buildResource(final AsyncResult request) {
+
+        AsyncResult connectionRequest = new AsyncResult() {
+
+            @Override
+            public void onSuccess() {
+                // Create a Session for this connection that is used for Temporary Destinations
+                // and perhaps later on management and advisory monitoring.
+                JmsSessionInfo sessionInfo = new JmsSessionInfo(getResourceInfo(), -1);
+                sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
+
+                final AmqpConnectionSessionBuilder builder = new AmqpConnectionSessionBuilder(getResource(), sessionInfo);
+                builder.buildResource(new AsyncResult() {
+
+                    @Override
+                    public boolean isComplete() {
+                        return builder.getResource().isOpen();
+                    }
+
+                    @Override
+                    public void onSuccess() {
+                        LOG.debug("{} is now open: ", getResource());
+                        request.onSuccess();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable result) {
+                        LOG.debug("AMQP Connection Session failed to open.");
+                        request.onFailure(result);
+                    }
+                });
+            }
+
+            @Override
+            public void onFailure(Throwable result) {
+                request.onFailure(result);
+            }
+
+            @Override
+            public boolean isComplete() {
+                return getResource().isOpen();
+            }
+        };
+
+        super.buildResource(connectionRequest);
+    }
+
+    @Override
+    protected void handleOpened(AmqpProvider provider) throws IOException {
+        // Initialize the connection properties so that the state of the remote can
+        // be determined, this allows us to check for close pending.
+        getResource().getProperties().initialize(
+            getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
+
+        super.handleOpened(provider);
+    }
+
+    @Override
+    protected Connection createEndpoint(JmsConnectionInfo resourceInfo) {
+        String hostname = getParent().getVhost();
+        if (hostname == null) {
+            hostname = getParent().getRemoteURI().getHost();
+        } else if (hostname.isEmpty()) {
+            hostname = null;
+        }
+
+        Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
+        props.put(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME);
+        props.put(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION);
+        props.put(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS);
+
+        Connection connection = getParent().getProtonConnection();
+        connection.setHostname(hostname);
+        connection.setContainer(resourceInfo.getClientId());
+        connection.setDesiredCapabilities(new Symbol[] { SOLE_CONNECTION_CAPABILITY });
+        connection.setProperties(props);
+
+        return connection;
+    }
+
+    @Override
+    protected AmqpConnection createResource(AmqpProvider parent, JmsConnectionInfo resourceInfo, Connection endpoint) {
+        return new AmqpConnection(parent, resourceInfo, endpoint);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        return getResource().getProperties().isConnectionOpenFailed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionSessionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionSessionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionSessionBuilder.java
new file mode 100644
index 0000000..d87b0eb
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionSessionBuilder.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConnectionSession;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpConnectionSession instance.
+ */
+public class AmqpConnectionSessionBuilder extends AmqpSessionBuilder {
+
+    public AmqpConnectionSessionBuilder(AmqpConnection parent, JmsSessionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected AmqpSession createResource(AmqpConnection parent, JmsSessionInfo resourceInfo, Session endpoint) {
+        return new AmqpConnectionSession(parent, resourceInfo, endpoint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
new file mode 100644
index 0000000..50dece3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.COPY;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_NO_LOCAL_SYMBOL;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsNoLocalType;
+import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsSelectorType;
+import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpConsumer instance.
+ */
+public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpSession, JmsConsumerInfo, Receiver> {
+
+    public AmqpConsumerBuilder(AmqpSession parent, JmsConsumerInfo consumerInfo) {
+        super(parent, consumerInfo);
+    }
+
+    @Override
+    protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) {
+        JmsDestination destination = resourceInfo.getDestination();
+        String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+
+        Source source = new Source();
+        source.setAddress(subscription);
+        Target target = new Target();
+
+        configureSource(source);
+
+        String receiverName = "qpid-jms:receiver:" + resourceInfo.getConsumerId() + ":" + subscription;
+        if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
+            // In the case of Durable Topic Subscriptions the client must use the same
+            // receiver name which is derived from the subscription name property.
+            receiverName = resourceInfo.getSubscriptionName();
+        }
+
+        Receiver receiver = getParent().getProtonSession().receiver(receiverName);
+        receiver.setSource(source);
+        receiver.setTarget(target);
+        if (getParent().getConnection().isPresettleConsumers() || resourceInfo.isBrowser()) {
+            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        return receiver;
+    }
+
+    @Override
+    protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
+        return new AmqpConsumer(parent, resourceInfo, endpoint);
+    }
+
+    @Override
+    protected Exception getOpenAbortException() {
+        // Verify the attach response contained a non-null Source
+        org.apache.qpid.proton.amqp.transport.Source source = endpoint.getRemoteSource();
+        if (source != null) {
+            return super.getOpenAbortException();
+        } else {
+            // No link terminus was created; the peer has detached/closed us, so create an InvalidDestinationException.
+            return new InvalidDestinationException("Link creation was refused");
+        }
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        org.apache.qpid.proton.amqp.transport.Source source = endpoint.getRemoteSource();
+
+        // When no link terminus was created, the peer will now detach/close us; otherwise
+        // we need to validate the returned remote source prior to open completion.
+        return source == null;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void configureSource(Source source) {
+        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+        Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
+                                          Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
+
+        if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setDistributionMode(COPY);
+        } else {
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        }
+
+        if (resourceInfo.isBrowser()) {
+            source.setDistributionMode(COPY);
+        }
+
+        Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(resourceInfo.getDestination());
+        if(typeCapability != null) {
+            source.setCapabilities(typeCapability);
+        }
+
+        source.setOutcomes(outcomes);
+        source.setDefaultOutcome(MODIFIED_FAILED);
+
+        if (resourceInfo.isNoLocal()) {
+            filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
+        }
+
+        if (resourceInfo.getSelector() != null && !resourceInfo.getSelector().trim().equals("")) {
+            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resourceInfo.getSelector()));
+        }
+
+        if (!filters.isEmpty()) {
+            source.setFilter(filters);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
new file mode 100644
index 0000000..29a4740
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpFixedProducer;
+import org.apache.qpid.jms.provider.amqp.AmqpProducer;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpProducer instance.
+ */
+public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpSession, JmsProducerInfo, Sender> {
+
+    public AmqpProducerBuilder(AmqpSession parent, JmsProducerInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected Sender createEndpoint(JmsProducerInfo resourceInfo) {
+        JmsDestination destination = resourceInfo.getDestination();
+        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+
+        Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL };
+        String sourceAddress = resourceInfo.getProducerId().toString();
+        Source source = new Source();
+        source.setAddress(sourceAddress);
+        source.setOutcomes(outcomes);
+        // TODO: default outcome. Accepted normally, Rejected for transaction controller?
+
+        Target target = new Target();
+        target.setAddress(targetAddress);
+        Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
+        if (typeCapability != null) {
+            target.setCapabilities(typeCapability);
+        }
+
+        String senderName = "qpid-jms:sender:" + sourceAddress + ":" + targetAddress;
+
+        Sender sender = getParent().getProtonSession().sender(senderName);
+        sender.setSource(source);
+        sender.setTarget(target);
+        if (getParent().getConnection().isPresettleProducers()) {
+            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        return sender;
+    }
+
+    @Override
+    protected AmqpProducer createResource(AmqpSession parent, JmsProducerInfo resourceInfo, Sender endpoint) {
+        return new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+
+        // When no link terminus was created, the peer will now detach/close us; otherwise
+        // we need to validate the returned remote target prior to open completion.
+        return target == null;
+    }
+
+    @Override
+    protected Exception getOpenAbortException() {
+        // Verify the attach response contained a non-null target
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+        if (target != null) {
+            return super.getOpenAbortException();
+        } else {
+            // No link terminus was created, the peer has detach/closed us, create IDE.
+            return new InvalidDestinationException("Link creation was refused");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
new file mode 100644
index 0000000..eed2b20
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+import org.apache.qpid.jms.provider.amqp.AmqpResource;
+import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base for all AmqpResource builders.
+ */
+public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class);
+
+    protected AsyncResult request;
+    protected TARGET resource;
+    protected ENDPOINT endpoint;
+    protected final PARENT parent;
+    protected final INFO resourceInfo;
+
+    public AmqpResourceBuilder(PARENT parent, INFO resourceInfo) {
+        this.parent = parent;
+        this.resourceInfo = resourceInfo;
+    }
+
+    /**
+     * Called to initiate the process of building the resource type that is
+     * managed by this builder.  The resource is created and the open process
+     * occurs asynchronously.  If the resource is successfully opened it will
+     * be added to its parent resource for use.
+     *
+     * @param request
+     *      The request that initiated the resource creation.
+     */
+    public void buildResource(AsyncResult request) {
+        this.request = request;
+
+        // Create the local end of the managed resource.
+        endpoint = createEndpoint(resourceInfo);
+        endpoint.setContext(this);
+        endpoint.open();
+
+        // Create the resource object now
+        resource = createResource(parent, resourceInfo, endpoint);
+    }
+
+    //----- Event handlers ---------------------------------------------------//
+
+    @Override
+    public void processRemoteOpen(AmqpProvider provider) throws IOException {
+        handleOpened(provider);
+    }
+
+    @Override
+    public void processRemoteClose(AmqpProvider provider) throws IOException {
+        handleClosed(provider);
+    }
+
+    @Override
+    public void processRemoteDetach(AmqpProvider provider) throws IOException {
+        // No implementation needed here for this event.
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+        // No implementation needed here for this event.
+    }
+
+    @Override
+    public void processFlowUpdates(AmqpProvider provider) throws IOException {
+        // No implementation needed here for this event.
+    }
+
+    //----- Standard open and close handlers ---------------------------------//
+
+    protected void handleOpened(AmqpProvider provider) throws IOException {
+
+        if (isClosePending()) {
+            return;
+        }
+
+        if (isOpenedEndpointValid()) {
+            afterOpened();
+
+            getEndpoint().setContext(resource);
+            getParent().addChildResource(resource);
+            getRequest().onSuccess();
+        } else {
+            getEndpoint().close();
+            getEndpoint().free();
+
+            // TODO: Perhaps the validate method should throw an exception so that we
+            // can return a specific error message to the create initiator.
+            getRequest().onFailure(new IOException("Failed to open requested endpoint"));
+        }
+    }
+
+    protected void handleClosed(AmqpProvider provider) throws IOException {
+        // If the resource being built is closed during the creation process
+        // then this is always an error.
+
+        Exception openError;
+        if (hasRemoteError()) {
+            openError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+        } else {
+            openError = getOpenAbortException();
+        }
+
+        LOG.warn("Open of resource:({}) failed: {}", resourceInfo, openError.getMessage());
+
+        // This resource is now terminated.
+        getEndpoint().close();
+        getEndpoint().free();
+
+        getRequest().onFailure(openError);
+    }
+
+    //----- Implementation methods used to customize the build process -------//
+
+    /**
+     * Given the resource information provided create and configure the local endpoint
+     * whose open phase is managed by this builder.
+     *
+     * @return a new endpoint to be managed.
+     */
+    protected abstract ENDPOINT createEndpoint(INFO resourceInfo);
+
+    /**
+     * Create the managed resource instance.
+     *
+     * @param parent
+     *      The parent of the newly created resource.
+     * @param resourceInfo
+     *      The resource information used to configure the resource.
+     * @param endpoint
+     *      The local endpoint for the managed resource to wrap.
+     *
+     * @return the resource instance whose open life-cycle is managed by this builder.
+     */
+    protected abstract TARGET createResource(PARENT parent, INFO resourceInfo, ENDPOINT endpoint);
+
+    /**
+     * If the resource was opened but its current state indicates a close is pending
+     * then we do not need to proceed further into the resource creation process.  Each
+     * endpoint builder must implement this and examine the opened endpoint to determine
+     * if a close frame will follow the open.
+     *
+     * @return true if the resource state indicates it will be immediately closed.
+     */
+    protected abstract boolean isClosePending();
+
+    /**
+     * Following the open of the endpoint, implementations of this method should validate
+     * that the endpoint properties match what was requested.
+     *
+     * @return true if the endpoint is valid based on what was requested.
+     */
+    protected boolean isOpenedEndpointValid() {
+        return true;
+    }
+
+    /**
+     * Called once an endpoint has been opened and validated to give the subclasses a
+     * place to perform any follow-on processing or setup steps before the operation
+     * is deemed to have been completed and success is signalled.
+     */
+    protected void afterOpened() {
+        // Nothing to do here.
+    }
+
+    protected boolean hasRemoteError() {
+        return getEndpoint().getRemoteCondition().getCondition() != null;
+    }
+
+    /**
+     * When aborting the open operation and there isn't an error condition
+     * provided by the peer, the returned exception will be used instead.
+     * A subclass may override this method to provide alternative behavior.
+     */
+    protected Exception getOpenAbortException() {
+        return new IOException("Open failed unexpectedly.");
+    }
+
+    //----- Public access methods for the managed resources ------------------//
+
+    public ENDPOINT getEndpoint() {
+        return endpoint;
+    }
+
+    public AsyncResult getRequest() {
+        return request;
+    }
+
+    public TARGET getResource() {
+        return resource;
+    }
+
+    public PARENT getParent() {
+        return parent;
+    }
+
+    public INFO getResourceInfo() {
+        return resourceInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
new file mode 100644
index 0000000..439f08c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpSession instance.
+ */
+public class AmqpSessionBuilder extends AmqpResourceBuilder<AmqpSession, AmqpConnection, JmsSessionInfo, Session> {
+
+    public AmqpSessionBuilder(AmqpConnection parent, JmsSessionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    public void buildResource(final AsyncResult request) {
+
+        AsyncResult opened = request;
+
+        if (getResourceInfo().isTransacted()) {
+            opened = new AsyncResult() {
+
+                @Override
+                public void onSuccess() {
+                    AmqpTransactionContextBuilder builder = new AmqpTransactionContextBuilder(getResource(), getResourceInfo());
+                    builder.buildResource(request);
+                }
+
+                @Override
+                public void onFailure(Throwable result) {
+                    request.onFailure(result);
+                }
+
+                @Override
+                public boolean isComplete() {
+                    return request.isComplete();
+                }
+            };
+        }
+
+        super.buildResource(opened);
+    }
+
+    @Override
+    protected Session createEndpoint(JmsSessionInfo resourceInfo) {
+        long outgoingWindow = getParent().getProvider().getSessionOutgoingWindow();
+
+        Session session = getParent().getProtonConnection().session();
+        session.setIncomingCapacity(Integer.MAX_VALUE);
+        if (outgoingWindow >= 0) {
+            session.setOutgoingWindow(outgoingWindow);
+        }
+
+        return session;
+    }
+
+    @Override
+    protected AmqpSession createResource(AmqpConnection parent, JmsSessionInfo resourceInfo, Session endpoint) {
+        return new AmqpSession(parent, resourceInfo, endpoint);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
new file mode 100644
index 0000000..93e0f91
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TEMP_QUEUE_CREATOR;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TEMP_TOPIC_CREATOR;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.JmsTemporaryDestination;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
+import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpTemporaryDestination instance.
+ */
+public class AmqpTemporaryDestinationBuilder extends AmqpResourceBuilder<AmqpTemporaryDestination, AmqpSession, JmsTemporaryDestination, Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestinationBuilder.class);
+
+    public AmqpTemporaryDestinationBuilder(AmqpSession parent, JmsTemporaryDestination resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected Sender createEndpoint(JmsTemporaryDestination resourceInfo) {
+        // Form a link name, use the local generated name with a prefix to aid debugging
+        String localDestinationName = resourceInfo.getName();
+        String senderLinkName = null;
+        if (resourceInfo.isQueue()) {
+            senderLinkName = "qpid-jms:" + TEMP_QUEUE_CREATOR + localDestinationName;
+        } else {
+            senderLinkName = "qpid-jms:" + TEMP_TOPIC_CREATOR + localDestinationName;
+        }
+
+        // Just use a bare Source, this is a producer which
+        // won't send anything and the link name is unique.
+        Source source = new Source();
+
+        Target target = new Target();
+        target.setDynamic(true);
+        target.setDurable(TerminusDurability.NONE);
+        target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+        // Set the dynamic node lifetime-policy
+        Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
+        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        target.setDynamicNodeProperties(dynamicNodeProperties);
+
+        // Set the capability to indicate the node type being created
+        if (resourceInfo.isQueue()) {
+            target.setCapabilities(AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY);
+        } else {
+            target.setCapabilities(AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY);
+        }
+
+        Sender sender = getParent().getProtonSession().sender(senderLinkName);
+        sender.setSource(source);
+        sender.setTarget(target);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        return sender;
+    }
+
+    @Override
+    protected AmqpTemporaryDestination createResource(AmqpSession parent, JmsTemporaryDestination resourceInfo, Sender endpoint) {
+        return new AmqpTemporaryDestination(getParent(), getResourceInfo(), endpoint);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+
+        // When no link terminus was created, the peer will now detach/close us; otherwise
+        // we need to validate the returned remote target prior to open completion.
+        return target == null;
+    }
+
+    @Override
+    protected void afterOpened() {
+        // Once our sender is opened we can read the updated name from the target address.
+        String oldDestinationName = resourceInfo.getName();
+        String destinationName = getEndpoint().getRemoteTarget().getAddress();
+
+        resourceInfo.setName(destinationName);
+
+        LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
new file mode 100644
index 0000000..7398ec1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpTransactionContext;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpTransactionContext instance.
+ */
+public class AmqpTransactionContextBuilder extends AmqpResourceBuilder<AmqpTransactionContext, AmqpSession, JmsSessionInfo, Sender> {
+
+    public AmqpTransactionContextBuilder(AmqpSession parent, JmsSessionInfo resourceInfo) {
+        super(parent, resourceInfo);
+    }
+
+    @Override
+    protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
+        Coordinator coordinator = new Coordinator();
+        coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
+        Source source = new Source();
+
+        String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getSessionId().toString();
+
+        Sender sender = getParent().getProtonSession().sender(coordinatorName);
+        sender.setSource(source);
+        sender.setTarget(coordinator);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        return sender;
+    }
+
+    @Override
+    protected AmqpTransactionContext createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Sender endpoint) {
+        return new AmqpTransactionContext(parent, resourceInfo, endpoint);
+    }
+
+    @Override
+    protected boolean isClosePending() {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+
+        // When no link terminus was created, the peer will now detach/close us; otherwise
+        // we need to validate the returned remote target prior to open completion.
+        return target == null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsNoLocalType.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsNoLocalType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsNoLocalType.java
new file mode 100644
index 0000000..e042f50
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsNoLocalType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.filters;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpJmsNoLocalType implements DescribedType {
+
+    public static final AmqpJmsNoLocalType NO_LOCAL = new AmqpJmsNoLocalType();
+
+    private final String noLocal;
+
+    public AmqpJmsNoLocalType() {
+        this.noLocal = "NoLocalFilter{}";
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000003L);
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.noLocal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsSelectorType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsSelectorType.java
new file mode 100644
index 0000000..3b10ad5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/filters/AmqpJmsSelectorType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.filters;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ */
+public class AmqpJmsSelectorType implements DescribedType {
+
+    private final String selector;
+
+    public AmqpJmsSelectorType(String selector) {
+        this.selector = selector;
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000004L);
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.selector;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpJmsSelectorType{" + selector + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index a3b2b15..3e03295 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -99,7 +99,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test(timeout = 20000)
+    @Test //(timeout = 20000)
     public void testCreateTransactedSession() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index ce48fdd..9fae2d1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.jms.test.testpeer;
 
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -38,7 +39,6 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
-import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode;
 import org.apache.qpid.jms.test.testpeer.basictypes.Role;
@@ -714,7 +714,7 @@ public class TestAmqpPeer implements AutoCloseable
         targetMatcher.withDynamic(equalTo(true));
         targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
         targetMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));
-        targetMatcher.withDynamicNodeProperties(hasEntry(equalTo(AmqpTemporaryDestination.DYNAMIC_NODE_LIFETIME_POLICY), new DeleteOnCloseMatcher()));
+        targetMatcher.withDynamicNodeProperties(hasEntry(equalTo(DYNAMIC_NODE_LIFETIME_POLICY), new DeleteOnCloseMatcher()));
         targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
 
         final AttachMatcher attachMatcher = new AttachMatcher()

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7be321e7/qpid-jms-client/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/log4j.properties b/qpid-jms-client/src/test/resources/log4j.properties
index 11c98c6..c5c11d1 100644
--- a/qpid-jms-client/src/test/resources/log4j.properties
+++ b/qpid-jms-client/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out, stdout
+log4j.rootLogger=TRACE, out, stdout
 
 log4j.logger.org.apache.qpid.jms=DEBUG
 log4j.logger.org.apache.qpid.jms.provider=TRACE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message