activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6044
Date Fri, 20 Nov 2015 22:54:30 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 4a27b7237 -> 272fb2b97


https://issues.apache.org/jira/browse/AMQ-6044

Add support for transactions to the test client.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/272fb2b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/272fb2b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/272fb2b9

Branch: refs/heads/master
Commit: 272fb2b97300362f2aaabf78763d62e95d3becaf
Parents: 4a27b72
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Nov 20 17:48:14 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Nov 20 17:48:14 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpConnection.java   |   9 +
 .../transport/amqp/client/AmqpReceiver.java     |  34 ++-
 .../amqp/client/AmqpRedirectedException.java    |  61 +++++
 .../transport/amqp/client/AmqpSender.java       |  41 ++-
 .../transport/amqp/client/AmqpSession.java      |  69 ++++-
 .../transport/amqp/client/AmqpSupport.java      | 194 ++++++++++++++
 .../amqp/client/AmqpTransactionContext.java     | 258 ++++++++++++++++++
 .../amqp/client/AmqpTransactionCoordinator.java | 259 +++++++++++++++++++
 .../amqp/client/AmqpTransactionId.java          |  97 +++++++
 .../amqp/client/util/ClientFuture.java          |  33 ++-
 .../util/ClientFutureSynchronization.java       |  30 +++
 .../amqp/interop/AmqpTransactionTest.java       | 181 +++++++++++++
 12 files changed, 1236 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 7ac7b00..523fa2a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -71,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicLong sessionIdGenerator = new AtomicLong();
+    private final AtomicLong txIdGenerator = new AtomicLong();
     private final Collector protonCollector = new CollectorImpl();
     private final NettyTransport transport;
     private final Transport protonTransport = Transport.Factory.create();
@@ -429,6 +430,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         return getEndpoint();
     }
 
+    String getConnectionId() {
+        return this.connectionId;
+    }
+
+    AmqpTransactionId getNextTransactionId() {
+        return new AmqpTransactionId(connectionId + ":" + txIdGenerator.incrementAndGet());
+    }
+
     void pumpToProtonTransport() {
         try {
             boolean done = false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 9d139e6..87aa36a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -34,6 +34,7 @@ import javax.jms.InvalidDestinationException;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -44,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -301,10 +303,22 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                 checkClosed();
                 try {
                     if (!delivery.isSettled()) {
-                        delivery.disposition(Accepted.getInstance());
-                        delivery.settle();
-                        session.pumpToProtonTransport();
+                        if (session.isInTransaction()) {
+                            Binary txnId = session.getTransactionId().getRemoteTxId();
+                            if (txnId != null) {
+                                TransactionalState txState = new TransactionalState();
+                                txState.setOutcome(Accepted.getInstance());
+                                txState.setTxnId(txnId);
+                                delivery.disposition(txState);
+                                delivery.settle();
+                                session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
+                            }
+                        } else {
+                            delivery.disposition(Accepted.getInstance());
+                            delivery.settle();
+                        }
                     }
+                    session.pumpToProtonTransport();
                     request.onSuccess();
                 } catch (Exception e) {
                     request.onFailure(e);
@@ -657,4 +671,18 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
             throw new IllegalStateException("Receiver is already closed");
         }
     }
+
+    //----- Internal Transaction state callbacks -----------------------------//
+
+    void preCommit() {
+    }
+
+    void preRollback() {
+    }
+
+    void postCommit() {
+    }
+
+    void postRollback() {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
new file mode 100644
index 0000000..f44a654
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+/**
+ * {@link IOException} derivative that defines that the remote peer has requested that this
+ * connection be redirected to some alternative peer.
+ */
+public class AmqpRedirectedException extends IOException {
+
+    private static final long serialVersionUID = 5872211116061710369L;
+
+    private final String hostname;
+    private final String networkHost;
+    private final int port;
+
+    public AmqpRedirectedException(String reason, String hostname, String networkHost, int port) {
+        super(reason);
+
+        this.hostname = hostname;
+        this.networkHost = networkHost;
+        this.port = port;
+    }
+
+    /**
+     * @return the host name of the container being redirected to.
+     */
+    public String getHostname() {
+        return hostname;
+    }
+
+    /**
+     * @return the DNS host name or IP address of the peer this connection is being redirected to.
+     */
+    public String getNetworkHost() {
+        return networkHost;
+    }
+
+    /**
+     * @return the port number on the peer this connection is being redirected to.
+     */
+    public int getPort() {
+        return port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index cfaaa4c..35fe56a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -29,14 +29,18 @@ import javax.jms.InvalidDestinationException;
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -316,20 +320,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     }
 
     private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
-
         LOG.trace("Producer sending message: {}", message);
 
-        byte[] tag = tagGenerator.getNextTag();
         Delivery delivery = null;
-
         if (presettle) {
             delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
         } else {
+            byte[] tag = tagGenerator.getNextTag();
             delivery = getEndpoint().delivery(tag, 0, tag.length);
         }
 
         delivery.setContext(request);
 
+        if (session.isInTransaction()) {
+            Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+            TransactionalState state = new TransactionalState();
+            state.setTxnId(amqpTxId);
+            delivery.disposition(state);
+        }
+
         encodeAndSend(message.getWrappedMessage(), delivery);
 
         if (presettle) {
@@ -390,26 +399,38 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             }
 
             AsyncResult request = (AsyncResult) delivery.getContext();
+            Exception deliveryError = null;
 
             if (outcome instanceof Accepted) {
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
-                tagGenerator.returnTag(delivery.getTag());
                 if (request != null && !request.isComplete()) {
                     request.onSuccess();
                 }
             } else if (outcome instanceof Rejected) {
-                Exception remoteError = getRemoteError();
                 LOG.trace("Outcome of delivery was rejected: {}", delivery);
-                tagGenerator.returnTag(delivery.getTag());
+                ErrorCondition remoteError = ((Rejected) outcome).getError();
+                if (remoteError == null) {
+                    remoteError = getEndpoint().getRemoteCondition();
+                }
+
+                deliveryError = AmqpSupport.convertToException(remoteError);
+            } else if (outcome instanceof Released) {
+                LOG.trace("Outcome of delivery was released: {}", delivery);
+                deliveryError = new IOException("Delivery failed: released by receiver");
+            } else if (outcome instanceof Modified) {
+                LOG.trace("Outcome of delivery was modified: {}", delivery);
+                deliveryError = new IOException("Delivery failed: failure at remote");
+            }
+
+            if (deliveryError != null) {
                 if (request != null && !request.isComplete()) {
-                    request.onFailure(remoteError);
+                    request.onFailure(deliveryError);
                 } else {
-                    connection.fireClientException(getRemoteError());
+                    connection.fireClientException(deliveryError);
                 }
-            } else if (outcome != null) {
-                LOG.warn("Message send updated with unsupported outcome: {}", outcome);
             }
 
+            tagGenerator.returnTag(delivery.getTag());
             delivery.settle();
             toRemove.add(delivery);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 62414bf..c0b097c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -36,6 +36,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
 
     private final AmqpConnection connection;
     private final String sessionId;
+    private final AmqpTransactionContext txContext;
 
     /**
      * Create a new session instance.
@@ -48,6 +49,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     public AmqpSession(AmqpConnection connection, String sessionId) {
         this.connection = connection;
         this.sessionId = sessionId;
+        this.txContext = new AmqpTransactionContext(this);
     }
 
     /**
@@ -363,7 +365,59 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
         return new UnmodifiableSession(getEndpoint());
     }
 
-    //----- Internal getters used from the child AmqpResource classes --------//
+    public boolean isInTransaction() {
+        return txContext.isInTransaction();
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpSession { " + sessionId + " }";
+    }
+
+    //----- Session Transaction Methods --------------------------------------//
+
+    /**
+     * Starts a new transaction associated with this session.
+     *
+     * @throws Exception if an error occurs starting a new Transaction.
+     */
+    public void begin() throws Exception {
+        if (txContext.isInTransaction()) {
+            throw new javax.jms.IllegalStateException("Session already has an active transaction");
+        }
+
+        txContext.begin();
+    }
+
+    /**
+     * Commit the current transaction associated with this session.
+     *
+     * @throws Exception if an error occurs committing the Transaction.
+     */
+    public void commit() throws Exception {
+        if (!txContext.isInTransaction()) {
+            throw new javax.jms.IllegalStateException(
+                "Commit called on Session that does not have an active transaction");
+        }
+
+        txContext.commit();
+    }
+
+    /**
+     * Roll back the current transaction associated with this session.
+     *
+     * @throws Exception if an error occurs rolling back the Transaction.
+     */
+    public void rollback() throws Exception {
+        if (!txContext.isInTransaction()) {
+            throw new javax.jms.IllegalStateException(
+                "Rollback called on Session that does not have an active transaction");
+        }
+
+        txContext.rollback();
+    }
+
+    //----- Internal access used to manage resources -------------------------//
 
     ScheduledExecutorService getScheduler() {
         return connection.getScheduler();
@@ -377,6 +431,14 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
         connection.pumpToProtonTransport();
     }
 
+    AmqpTransactionId getTransactionId() {
+        return txContext.getTransactionId();
+    }
+
+    AmqpTransactionContext getTransactionContext() {
+        return txContext;
+    }
+
     //----- Private implementation details -----------------------------------//
 
     @Override
@@ -410,9 +472,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
             throw new IllegalStateException("Session is already closed");
         }
     }
-
-    @Override
-    public String toString() {
-        return "AmqpSession { " + sessionId + " }";
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
new file mode 100644
index 0000000..9aca5b8
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+
+public class AmqpSupport {
+
+    // Symbols used for connection capabilities
+    public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
+    public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+
+    // Symbols used to announce connection error information
+    public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+    public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+    public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+    // Symbols used to announce connection redirect ErrorCondition 'info'
+    public static final Symbol PORT = Symbol.valueOf("port");
+    public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+    public static final Symbol OPEN_HOSTNAME = Symbol.valueOf("hostname");
+
+    // Symbols used for connection properties
+    public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+    public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+
+    public static final Symbol PRODUCT = Symbol.valueOf("product");
+    public static final Symbol VERSION = Symbol.valueOf("version");
+    public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+    // Symbols used for receivers.
+    public static final Symbol COPY = Symbol.getSymbol("copy");
+    public static final Symbol NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
+    public static final Symbol SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+
+    // Delivery states
+    public static final Rejected REJECTED = new Rejected();
+    public static final Modified MODIFIED_FAILED = new Modified();
+    public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified();
+
+    // Temporary Destination constants
+    public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+    public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
+    public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
+
+    //----- Static initializer -----------------------------------------------//
+
+    static {
+        MODIFIED_FAILED.setDeliveryFailed(true);
+
+        MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true);
+        MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true);
+    }
+
+    //----- Utility Methods --------------------------------------------------//
+
+    /**
+     * Given an ErrorCondition instance create a new Exception that best matches
+     * the error type.
+     *
+     * @param errorCondition
+     *      The ErrorCondition returned from the remote peer.
+     *
+     * @return a new Exception instance that best matches the ErrorCondition value.
+     */
+    public static Exception convertToException(ErrorCondition errorCondition) {
+        Exception remoteError = null;
+
+        if (errorCondition != null && errorCondition.getCondition() != null) {
+            Symbol error = errorCondition.getCondition();
+            String message = extractErrorMessage(errorCondition);
+
+            if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+                remoteError = new JMSSecurityException(message);
+            } else if (error.equals(AmqpError.RESOURCE_LIMIT_EXCEEDED)) {
+                remoteError = new ResourceAllocationException(message);
+            } else if (error.equals(AmqpError.NOT_FOUND)) {
+                remoteError = new InvalidDestinationException(message);
+            } else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
+                remoteError = new TransactionRolledBackException(message);
+            } else if (error.equals(ConnectionError.REDIRECT)) {
+                remoteError = createRedirectException(error, message, errorCondition);
+            } else if (error.equals(AmqpError.INVALID_FIELD)) {
+                Map<?, ?> info = errorCondition.getInfo();
+                if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) {
+                    remoteError = new InvalidClientIDException(message);
+                } else {
+                    remoteError = new JMSException(message);
+                }
+            } else {
+                remoteError = new JMSException(message);
+            }
+        } else {
+            remoteError = new JMSException("Unknown error from remote peer");
+        }
+
+        return remoteError;
+    }
+
+    /**
+     * Attempt to read and return the embedded error message in the given ErrorCondition
+     * object.  If no message can be extracted a generic message is returned.
+     *
+     * @param errorCondition
+     *      The ErrorCondition to extract the error message from.
+     *
+     * @return an error message extracted from the given ErrorCondition.
+     */
+    public static String extractErrorMessage(ErrorCondition errorCondition) {
+        String message = "Received error from remote peer without description";
+        if (errorCondition != null) {
+            if (errorCondition.getDescription() != null && !errorCondition.getDescription().isEmpty()) {
+                message = errorCondition.getDescription();
+            }
+
+            Symbol condition = errorCondition.getCondition();
+            if (condition != null) {
+                message = message + " [condition = " + condition + "]";
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * When a redirect type exception is received this method is called to create the
+     * appropriate redirect exception type containing the error details needed.
+     *
+     * @param error
+     *        the Symbol that defines the redirection error type.
+     * @param message
+     *        the basic error message that should used or amended for the returned exception.
+     * @param condition
+     *        the ErrorCondition that describes the redirection.
+     *
+     * @return an Exception that captures the details of the redirection error.
+     */
+    public static Exception createRedirectException(Symbol error, String message, ErrorCondition condition) {
+        Exception result = null;
+        Map<?, ?> info = condition.getInfo();
+
+        if (info == null) {
+            result = new IOException(message + " : Redirection information not set.");
+        } else {
+            String hostname = (String) info.get(OPEN_HOSTNAME);
+
+            String networkHost = (String) info.get(NETWORK_HOST);
+            if (networkHost == null || networkHost.isEmpty()) {
+                result = new IOException(message + " : Redirection information not set.");
+            }
+
+            int port = 0;
+            try {
+                port = Integer.valueOf(info.get(PORT).toString());
+            } catch (Exception ex) {
+                result = new IOException(message + " : Redirection information not set.");
+            }
+
+            result = new AmqpRedirectedException(message, hostname, networkHost, port);
+        }
+
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
new file mode 100644
index 0000000..64f854b
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.ClientFutureSynchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines a context under which resources in a given session
+ * will operate inside transaction scoped boundaries.
+ */
+public class AmqpTransactionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
+
+    private final AmqpSession session;
+    private final Set<AmqpReceiver> txReceivers = new LinkedHashSet<AmqpReceiver>();
+
+    private AmqpTransactionCoordinator coordinator;
+    private AmqpTransactionId transactionId;
+
+    public AmqpTransactionContext(AmqpSession session) {
+        this.session = session;
+    }
+
+    /**
+     * Begins a new transaction scoped to the target session.
+     *
+     * @param txId
+     *      The transaction Id to use for this new transaction.
+     *
+     * @throws Exception if an error occurs while starting the transaction.
+     */
+    public void begin() throws Exception {
+        if (transactionId != null) {
+            throw new IOException("Begin called while a TX is still Active.");
+        }
+
+        final AmqpTransactionId txId = session.getConnection().getNextTransactionId();
+        final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                transactionId = txId;
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+                transactionId = null;
+            }
+        });
+
+        LOG.info("Attempting to Begin TX:[{}]", txId);
+
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                if (coordinator == null || coordinator.isClosed()) {
+                    LOG.info("Creating new Coordinator for TX:[{}]", txId);
+                    coordinator = new AmqpTransactionCoordinator(session);
+                    coordinator.open(new AsyncResult() {
+
+                        @Override
+                        public void onSuccess() {
+                            try {
+                                LOG.info("Attempting to declare TX:[{}]", txId);
+                                coordinator.declare(txId, request);
+                            } catch (Exception e) {
+                                request.onFailure(e);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Throwable result) {
+                            request.onFailure(result);
+                        }
+
+                        @Override
+                        public boolean isComplete() {
+                            return request.isComplete();
+                        }
+                    });
+                } else {
+                    try {
+                        LOG.info("Attempting to declare TX:[{}]", txId);
+                        coordinator.declare(txId, request);
+                    } catch (Exception e) {
+                        request.onFailure(e);
+                    }
+                }
+
+                session.pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Commit this transaction which then ends the lifetime of the transacted operation.
+     *
+     * @throws Exception if an error occurs while performing the commit
+     */
+    public void commit() throws Exception {
+        if (transactionId == null) {
+            throw new IllegalStateException("Commit called with no active Transaction.");
+        }
+
+        preCommit();
+
+        final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                transactionId = null;
+                postCommit();
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+                transactionId = null;
+                postCommit();
+            }
+        });
+
+        LOG.debug("Commit on TX[{}] initiated", transactionId);
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Attempting to commit TX:[{}]", transactionId);
+                    coordinator.discharge(transactionId, request, true);
+                    session.pumpToProtonTransport();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Rollback any transacted work performed under the current transaction.
+     *
+     * @throws Exception if an error occurs during the rollback operation.
+     */
+    public void rollback() throws Exception {
+        if (transactionId == null) {
+            throw new IllegalStateException("Rollback called with no active Transaction.");
+        }
+
+        preRollback();
+
+        final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
+
+            @Override
+            public void onPendingSuccess() {
+                transactionId = null;
+                postRollback();
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+                transactionId = null;
+                postRollback();
+            }
+        });
+
+        LOG.debug("Rollback on TX[{}] initiated", transactionId);
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Attempting to roll back TX:[{}]", transactionId);
+                    coordinator.discharge(transactionId, request, false);
+                    session.pumpToProtonTransport();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    //----- Internal access to context properties ----------------------------//
+
+    AmqpTransactionCoordinator getCoordinator() {
+        return coordinator;
+    }
+
+    AmqpTransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    void registerTxConsumer(AmqpReceiver consumer) {
+        txReceivers.add(consumer);
+    }
+
+    //----- Transaction pre / post completion --------------------------------//
+
+    private void preCommit() {
+        for (AmqpReceiver receiver : txReceivers) {
+            receiver.preCommit();
+        }
+    }
+
+    private void preRollback() {
+        for (AmqpReceiver receiver : txReceivers) {
+            receiver.preRollback();
+        }
+    }
+
+    private void postCommit() {
+        for (AmqpReceiver receiver : txReceivers) {
+            receiver.postCommit();
+        }
+
+        txReceivers.clear();
+    }
+
+    private void postRollback() {
+        for (AmqpReceiver receiver : txReceivers) {
+            receiver.postRollback();
+        }
+
+        txReceivers.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..8ce1f09
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the AMQP Transaction coordinator link used by the transaction context
+ * of a session to control the lifetime of a given transaction.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+    private final byte[] OUTBOUND_BUFFER = new byte[64];
+
+    private final AmqpSession session;
+    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
+
+    private List<Delivery> pendingDeliveries = new LinkedList<Delivery>();
+    private Map<AmqpTransactionId, AsyncResult> pendingRequests = new HashMap<AmqpTransactionId, AsyncResult>();
+
+    public AmqpTransactionCoordinator(AmqpSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+        try {
+            Iterator<Delivery> deliveries = pendingDeliveries.iterator();
+            while (deliveries.hasNext()) {
+                Delivery pendingDelivery = deliveries.next();
+                if (!pendingDelivery.remotelySettled()) {
+                    continue;
+                }
+
+                DeliveryState state = pendingDelivery.getRemoteState();
+                AmqpTransactionId txId = (AmqpTransactionId) pendingDelivery.getContext();
+                AsyncResult pendingRequest = pendingRequests.get(txId);
+
+                if (pendingRequest == null) {
+                    throw new IllegalStateException("Pending tx operation with no pending request");
+                }
+
+                if (state instanceof Declared) {
+                    LOG.debug("New TX started: {}", txId.getTxId());
+                    Declared declared = (Declared) state;
+                    txId.setRemoteTxId(declared.getTxnId());
+                    pendingRequest.onSuccess();
+                } else if (state instanceof Rejected) {
+                    LOG.debug("Last TX request failed: {}", txId.getTxId());
+                    Rejected rejected = (Rejected) state;
+                    Exception cause = AmqpSupport.convertToException(rejected.getError());
+                    JMSException failureCause = null;
+                    if (txId.isCommit()) {
+                        failureCause = new TransactionRolledBackException(cause.getMessage());
+                    } else {
+                        failureCause = new JMSException(cause.getMessage());
+                    }
+
+                    pendingRequest.onFailure(failureCause);
+                } else {
+                    LOG.debug("Last TX request succeeded: {}", txId.getTxId());
+                    pendingRequest.onSuccess();
+                }
+
+                // Clear state data
+                pendingDelivery.settle();
+                pendingRequests.remove(txId.getTxId());
+                deliveries.remove();
+            }
+
+            super.processDeliveryUpdates(connection);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    public void declare(AmqpTransactionId txId, AsyncResult request) throws Exception {
+        if (txId.getRemoteTxId() != null) {
+            throw new IllegalStateException("Declar called while a TX is still Active.");
+        }
+
+        if (isClosed()) {
+            request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed"));
+            return;
+        }
+
+        Message message = Message.Factory.create();
+        Declare declare = new Declare();
+        message.setBody(new AmqpValue(declare));
+
+        Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(txId);
+
+        // Store away for completion
+        pendingDeliveries.add(pendingDelivery);
+        pendingRequests.put(txId, request);
+
+        sendTxCommand(message);
+    }
+
+    public void discharge(AmqpTransactionId txId, AsyncResult request, boolean commit) throws Exception {
+
+        if (isClosed()) {
+            Exception failureCause = null;
+
+            if (commit) {
+                failureCause = new TransactionRolledBackException("Transaction inbout: Coordinator remotely closed");
+            } else {
+                failureCause = new JMSException("Rollback cannot complete: Coordinator remotely closed");
+            }
+
+            request.onFailure(failureCause);
+            return;
+        }
+
+        // Store the context of this action in the transaction ID for later completion.
+        txId.setState(commit ? AmqpTransactionId.COMMIT_MARKER : AmqpTransactionId.ROLLBACK_MARKER);
+
+        Message message = Message.Factory.create();
+        Discharge discharge = new Discharge();
+        discharge.setFail(!commit);
+        discharge.setTxnId((Binary) txId.getRemoteTxId());
+        message.setBody(new AmqpValue(discharge));
+
+        Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(txId);
+
+        // Store away for completion
+        pendingDeliveries.add(pendingDelivery);
+        pendingRequests.put(txId, request);
+
+        sendTxCommand(message);
+    }
+
+    //----- Base class overrides ---------------------------------------------//
+
+    @Override
+    public void remotelyClosed(AmqpConnection connection) {
+
+        Exception txnError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
+
+        // Alert any pending operation that the link failed to complete the pending
+        // begin / commit / rollback operation.
+        for (AsyncResult pendingRequest : pendingRequests.values()) {
+            pendingRequest.onFailure(txnError);
+        }
+
+        // Purge linkages to pending operations.
+        pendingDeliveries.clear();
+        pendingRequests.clear();
+
+        // Override the base class version because we do not want to propagate
+        // an error up to the client if remote close happens as that is an
+        // acceptable way for the remote to indicate the discharge could not
+        // be applied.
+
+        if (getEndpoint() != null) {
+            getEndpoint().close();
+            getEndpoint().free();
+        }
+
+        LOG.debug("Transaction Coordinator link {} was remotely closed", getEndpoint());
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void sendTxCommand(Message message) throws IOException {
+        int encodedSize = 0;
+        byte[] buffer = OUTBOUND_BUFFER;
+        while (true) {
+            try {
+                encodedSize = message.encode(buffer, 0, buffer.length);
+                break;
+            } catch (BufferOverflowException e) {
+                buffer = new byte[buffer.length * 2];
+            }
+        }
+
+        Sender sender = getEndpoint();
+        sender.send(buffer, 0, encodedSize);
+        sender.advance();
+    }
+
+
+    @Override
+    protected void doOpen() {
+        Coordinator coordinator = new Coordinator();
+        coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
+        Source source = new Source();
+
+        String coordinatorName = "qpid-jms:coordinator:" + session.getConnection().getConnectionId();
+
+        Sender sender = session.getEndpoint().sender(coordinatorName);
+        sender.setSource(source);
+        sender.setTarget(coordinator);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(sender);
+
+        super.doOpen();
+    }
+
+    @Override
+    protected void doOpenInspection() {
+        // TODO
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        // TODO
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
new file mode 100644
index 0000000..7f39a7e
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+
+/**
+ * Wrapper For Transaction state in identification
+ */
+public class AmqpTransactionId {
+
+    public static final int DECLARE_MARKER = 1;
+    public static final int ROLLBACK_MARKER = 2;
+    public static final int COMMIT_MARKER = 3;
+
+    private final String txId;
+    private Binary remoteTxId;
+    private int state = DECLARE_MARKER;
+
+    public AmqpTransactionId(String txId) {
+        this.txId = txId;
+    }
+
+    public boolean isDeclare() {
+        return state == DECLARE_MARKER;
+    }
+
+    public boolean isCommit() {
+        return state == COMMIT_MARKER;
+    }
+
+    public boolean isRollback() {
+        return state == ROLLBACK_MARKER;
+    }
+
+    public void setState(int state) {
+        this.state = state;
+    }
+
+    public String getTxId() {
+        return txId;
+    }
+
+    public Binary getRemoteTxId() {
+        return remoteTxId;
+    }
+
+    public void setRemoteTxId(Binary remoteTxId) {
+        this.remoteTxId = remoteTxId;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((txId == null) ? 0 : txId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        AmqpTransactionId other = (AmqpTransactionId) obj;
+        if (txId == null) {
+            if (other.txId != null) {
+                return false;
+            }
+        } else if (!txId.equals(other.txId)) {
+            return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
index 01ececb..f181b2e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
@@ -19,21 +19,24 @@ package org.apache.activemq.transport.amqp.client.util;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Asynchronous Client Future class.
  */
-public class ClientFuture extends WrappedAsyncResult {
+public class ClientFuture implements AsyncResult {
 
-    protected final CountDownLatch latch = new CountDownLatch(1);
-    protected Throwable error;
+    private final AtomicBoolean completer = new AtomicBoolean();
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final ClientFutureSynchronization synchronization;
+    private volatile Throwable error;
 
     public ClientFuture() {
-        super(null);
+        this(null);
     }
 
-    public ClientFuture(AsyncResult watcher) {
-        super(watcher);
+    public ClientFuture(ClientFutureSynchronization synchronization) {
+        this.synchronization = synchronization;
     }
 
     @Override
@@ -43,15 +46,23 @@ public class ClientFuture extends WrappedAsyncResult {
 
     @Override
     public void onFailure(Throwable result) {
-        error = result;
-        latch.countDown();
-        super.onFailure(result);
+        if (completer.compareAndSet(false, true)) {
+            error = result;
+            if (synchronization != null) {
+                synchronization.onPendingFailure(error);
+            }
+            latch.countDown();
+        }
     }
 
     @Override
     public void onSuccess() {
-        latch.countDown();
-        super.onSuccess();
+        if (completer.compareAndSet(false, true)) {
+            if (synchronization != null) {
+                synchronization.onPendingSuccess();
+            }
+            latch.countDown();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
new file mode 100644
index 0000000..934b305
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Synchronization callback interface used to execute state updates
+ * or similar tasks in the thread context where the associated
+ * ProviderFuture is managed.
+ */
+public interface ClientFutureSynchronization {
+
+    void onPendingSuccess();
+
+    void onPendingFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/272fb2b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
new file mode 100644
index 0000000..a998290
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test various aspects of Transaction support.
+ */
+public class AmqpTransactionTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 30000)
+    public void testBeginAndCommitTransaction() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        assertNotNull(session);
+
+        session.begin();
+        assertTrue(session.isInTransaction());
+        session.commit();
+
+        connection.close();
+    }
+
+    @Test(timeout = 30000)
+    public void testBeginAndRollbackTransaction() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        assertNotNull(session);
+
+        session.begin();
+        assertTrue(session.isInTransaction());
+        session.rollback();
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToQueueWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        session.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertEquals(0, queue.getQueueSize());
+
+        session.commit();
+
+        assertEquals(1, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToQueueWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        session.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertEquals(0, queue.getQueueSize());
+
+        session.rollback();
+
+        assertEquals(0, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        session.begin();
+
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        session.commit();
+
+        assertEquals(0, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        session.begin();
+
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        session.rollback();
+
+        assertEquals(1, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+}


Mime
View raw message