qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [05/10] qpid-broker-j git commit: QPID-7531: [Java Broker, AMQP 1.0] Improve error handling when receiving unknown a transaction-id.
Date Thu, 05 Oct 2017 19:20:07 GMT
QPID-7531: [Java Broker, AMQP 1.0] Improve error handling when receiving unknown a transaction-id.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/876ddd6d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/876ddd6d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/876ddd6d

Branch: refs/heads/master
Commit: 876ddd6dd04246a0a2605ef709081f44d42661d1
Parents: a9c8725
Author: Lorenz Quack <lquack@apache.org>
Authored: Tue Oct 3 14:57:55 2017 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Thu Oct 5 12:18:29 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |  20 +-
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  39 +++-
 .../protocol/v1_0/ConsumerTarget_1_0.java       |  25 ++-
 .../protocol/v1_0/DeserializationFactories.java |   4 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      |  20 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  29 +--
 .../v1_0/StandardReceivingLinkEndpoint.java     |  14 +-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  14 +-
 .../v1_0/UnknownTransactionException.java       |  34 ++++
 .../type/codec/AMQPDescribedTypeRegistry.java   |   4 +-
 .../v1_0/type/transaction/TransactionError.java | 101 ++++++++++
 .../type/transaction/TransactionErrors.java     | 101 ----------
 .../qpid/tests/protocol/v1_0/Interaction.java   |   6 +
 .../v1_0/transaction/DischargeTest.java         |  65 ++++++-
 .../transaction/TransactionalTransferTest.java  | 191 ++++++++++++++++++-
 15 files changed, 514 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 1a91f95..04db60a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1901,15 +1901,27 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     @Override
     public ServerTransaction getTransaction(final int txnId)
     {
-        // TODO - bounds check
-        return _openTransactions[txnId];
+        try
+        {
+            return _openTransactions[txnId];
+        }
+        catch (ArrayIndexOutOfBoundsException e)
+        {
+            throw new UnknownTransactionException(txnId);
+        }
     }
 
     @Override
     public void removeTransaction(final int txnId)
     {
-        // TODO - bounds check
-        _openTransactions[txnId] = null;
+        try
+        {
+            _openTransactions[txnId] = null;
+        }
+        catch (ArrayIndexOutOfBoundsException e)
+        {
+            throw new UnknownTransactionException(txnId);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 09dcdb0..00264b7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -76,18 +77,16 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     {
         if(isAttached())
         {
-            if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
-                && ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
+            Error error = validateTransfer(transfer);
+            if (error != null)
             {
-                Error error = new Error(AmqpError.INVALID_FIELD,
-                                  "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
                 close(error);
                 return;
             }
 
             if (_currentDelivery == null)
             {
-                Error error = validateNewTransfer(transfer);
+                error = validateNewTransfer(transfer);
                 if (error != null)
                 {
                     close(error);
@@ -104,7 +103,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             }
             else
             {
-                Error error = validateSubsequentTransfer(transfer);
+                error = validateSubsequentTransfer(transfer);
                 if (error != null)
                 {
                     close(error);
@@ -136,7 +135,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
                         _unsettled.remove(_currentDelivery.getDeliveryTag());
                         getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
                     }
-                    Error error = receiveDelivery(_currentDelivery);
+                    error = receiveDelivery(_currentDelivery);
                     if (error != null)
                     {
                         close(error);
@@ -155,6 +154,31 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         }
     }
 
+    private Error validateTransfer(final Transfer transfer)
+    {
+        Error error = null;
+        if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+            && ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                              "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
+        }
+        else if (transfer.getState() instanceof TransactionalState)
+        {
+            final Binary txnId = ((TransactionalState) transfer.getState()).getTxnId();
+            try
+            {
+                getSession().getTransaction(txnId);
+            }
+            catch (UnknownTransactionException e)
+            {
+                error = new Error(TransactionError.UNKNOWN_ID,
+                                  String.format("Transfer has an unknown transaction-id '%s'.", txnId));
+            }
+        }
+        return error;
+    }
+
     private Error validateNewTransfer(final Transfer transfer)
     {
         Error error = null;
@@ -221,6 +245,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             error = new Error(AmqpError.INVALID_FIELD,
                               "Transfer \"message-format\" is set to different value than on previous transfer.");
         }
+
         return error;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 5c8cfd2..66a1fd3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -54,7 +54,9 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -267,12 +269,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                     state.setTxnId(_transactionId);
                     transfer.setState(state);
                 }
-                // TODO - need to deal with failure here
                 if (_acquires && _transactionId != null)
                 {
-                    ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);
-                    if (txn != null)
+                    try
                     {
+                        ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);
+
                         txn.addPostTransactionAction(new ServerTransaction.Action()
                         {
                             @Override
@@ -288,9 +290,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                             }
                         });
                     }
-                    else
+                    catch (UnknownTransactionException e)
                     {
-                        // TODO - deal with the case of an invalid txn id
+                        entry.release(consumer);
+                        getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
+                        return;
                     }
 
                 }
@@ -415,10 +419,15 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
             {
                 transactionId = ((TransactionalState)state).getTxnId();
                 outcome = ((TransactionalState)state).getOutcome();
-                txn = _linkEndpoint.getTransaction(transactionId);
-                if(txn == null)
+                try
+                {
+                    txn = _linkEndpoint.getTransaction(transactionId);
+                }
+                catch (UnknownTransactionException e)
                 {
-                    // TODO - invalid txn id supplied
+                    getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
+                    applyModifiedOutcome();
+                    return false;
                 }
             }
             else if (state instanceof Outcome)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
index b8b00a1..58d104f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -191,7 +191,7 @@ public class DeserializationFactories
                         {
                             try
                             {
-                                return TransactionErrors.valueOf(value);
+                                return TransactionError.valueOf(value);
                             }
                             catch (IllegalArgumentException e4)
                             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index fe10015..2b6a36b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
@@ -108,7 +109,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
     private void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
     {
-        // TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
+        // TODO QPID-7952: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
         _destination = destination;
         final Source source = getSource();
 
@@ -415,7 +416,21 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         Map<Symbol, Object> properties = flow.getProperties();
         if (properties != null)
         {
-             _transactionId = (Binary) properties.get(Symbol.valueOf("txn-id"));
+            final Binary transactionId = (Binary) properties.get(Symbol.valueOf("txn-id"));
+            if (transactionId != null)
+            {
+                try
+                {
+                    getSession().getTransaction(transactionId);
+                }
+                catch (UnknownTransactionException e)
+                {
+                    close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
+                    return;
+                }
+            }
+
+            _transactionId = transactionId;
         }
 
         if(receiverDeliveryCount == null)
@@ -526,7 +541,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
             // TODO: QPID-7845 : Resuming links is unsupported at the moment. Destroying link unconditionally.
             destroy();
-
             getConsumerTarget().updateNotifyWorkDesired();
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 77c2714..5308d18 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -871,8 +871,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     ServerTransaction getTransaction(Binary transactionId)
     {
-        // TODO - deal with the case where the txn id is invalid
-        return _connection.getTransaction(binaryToInteger(transactionId));
+        final int txnId;
+        try
+        {
+            txnId = transactionIdToInteger(transactionId);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new UnknownTransactionException(e.getMessage());
+        }
+        return _connection.getTransaction(txnId);
     }
 
     void remoteEnd(End end)
@@ -894,31 +902,30 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     }
 
-    Integer binaryToInteger(final Binary txnId)
+    static Integer transactionIdToInteger(final Binary txnId)
     {
-        if(txnId == null)
+        if (txnId == null)
         {
-            return null;
+            throw new UnknownTransactionException("'null' is not a valid transaction-id.");
         }
 
         byte[] data = txnId.getArray();
-        if(data.length > 4)
+        if (data.length > 4)
         {
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException("transaction-id cannot have more than 32-bit.");
         }
 
         int id = 0;
-        for(int i = 0; i < data.length; i++)
+        for (int i = 0; i < data.length; i++)
         {
             id <<= 8;
-            id |= ((int)data[i] & 0xff);
+            id |= ((int) data[i] & 0xff);
         }
 
         return id;
-
     }
 
-    Binary integerToBinary(final int txnId)
+    static Binary integerToTransactionId(final int txnId)
     {
         byte[] data = new byte[4];
         data[3] = (byte) (txnId & 0xff);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bb9fa92..c744437 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -53,11 +53,13 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -97,7 +99,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         @Override
         public void destinationRemoved(final MessageDestination destination)
         {
-            // TODO - we should probably schedule a link closure here!
+            // TODO - we should probably schedule a link closure here! (QPID-7541)
         }
 
         @Override
@@ -200,7 +202,15 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                 boolean setRollbackOnly = true;
                 if (transactionId != null)
                 {
-                    transaction = getSession().getTransaction(transactionId);
+                    try
+                    {
+                        transaction = getSession().getTransaction(transactionId);
+                    }
+                    catch (UnknownTransactionException e)
+                    {
+                        return new Error(TransactionError.UNKNOWN_ID,
+                                         String.format("transaction-id '%s' is unknown.", transactionId));
+                    }
                     if (!(transaction instanceof AutoCommitTransaction))
                     {
                         transaction.addPostTransactionAction(new ServerTransaction.Action()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 2f8887c..e063815 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
@@ -97,7 +97,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
 
                         session.incrementStartedTransactions();
 
-                        state.setTxnId(session.integerToBinary(txn.getId()));
+                        state.setTxnId(session.integerToTransactionId(txn.getId()));
                         updateDisposition(delivery.getDeliveryTag(), state, true);
 
                     }
@@ -162,12 +162,12 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
         ServerTransaction txn = null;
         try
         {
-            transactionId = getSession().binaryToInteger(transactionIdAsBinary);
+            transactionId = getSession().transactionIdToInteger(transactionIdAsBinary);
             txn = _createdTransactions.get(transactionId);
         }
-        catch (IllegalArgumentException e)
+        catch (UnknownTransactionException | IllegalArgumentException e)
         {
-           // pass
+           // handle error below
         }
 
         if(txn != null)
@@ -187,7 +187,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                 txn.rollback();
                 getSession().incrementRolledBackTransactions();
                 error = new Error();
-                error.setCondition(TransactionErrors.TRANSACTION_ROLLBACK);
+                error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
@@ -196,7 +196,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
         else
         {
             error = new Error();
-            error.setCondition(TransactionErrors.UNKNOWN_ID);
+            error.setCondition(TransactionError.UNKNOWN_ID);
             error.setDescription("Unknown transactionId " + transactionIdAsBinary.toString());
         }
         return error;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnknownTransactionException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnknownTransactionException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnknownTransactionException.java
new file mode 100644
index 0000000..bbe139a
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/UnknownTransactionException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+public class UnknownTransactionException extends RuntimeException
+{
+    public UnknownTransactionException(final String message)
+    {
+        super(message);
+    }
+
+    public UnknownTransactionException(final int transactionId)
+    {
+        super(String.format("Unknown transaction-id '%d'", transactionId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
index 29c136c..6f4d1e5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
@@ -46,7 +46,7 @@ import org.apache.qpid.server.protocol.v1_0.type.security.codec.SaslOutcomeConst
 import org.apache.qpid.server.protocol.v1_0.type.security.codec.SaslOutcomeWriter;
 import org.apache.qpid.server.protocol.v1_0.type.security.codec.SaslResponseConstructor;
 import org.apache.qpid.server.protocol.v1_0.type.security.codec.SaslResponseWriter;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.codec.CoordinatorConstructor;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.codec.CoordinatorWriter;
@@ -235,7 +235,7 @@ public class AMQPDescribedTypeRegistry implements DescribedTypeConstructorRegist
         DeclaredWriter.register(registry);
         TransactionalStateWriter.register(registry);
         RestrictedTypeValueWriter.register(registry,TxnCapability.class);
-        RestrictedTypeValueWriter.register(registry,TransactionErrors.class);
+        RestrictedTypeValueWriter.register(registry,TransactionError.class);
     }
 
     private static void registerSecurityWriters(final AMQPDescribedTypeRegistry registry)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionError.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionError.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionError.java
new file mode 100644
index 0000000..d5214bf
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionError.java
@@ -0,0 +1,101 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v1_0.type.transaction;
+
+
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+
+public class TransactionError implements ErrorCondition, RestrictedType<Symbol>
+{
+    private final Symbol _val;
+
+    public static final TransactionError UNKNOWN_ID =
+            new TransactionError(Symbol.valueOf("amqp:transaction:unknown-id"));
+
+    public static final TransactionError TRANSACTION_ROLLBACK =
+            new TransactionError(Symbol.valueOf("amqp:transaction:rollback"));
+
+    public static final TransactionError TRANSACTION_TIMEOUT =
+            new TransactionError(Symbol.valueOf("amqp:transaction:timeout"));
+
+    private TransactionError(Symbol val)
+    {
+        _val = val;
+    }
+
+    @Override
+    public Symbol getValue()
+    {
+        return _val;
+    }
+
+    @Override
+    public String toString()
+    {
+        if (this == UNKNOWN_ID)
+        {
+            return "unknown-id";
+        }
+
+        if (this == TRANSACTION_ROLLBACK)
+        {
+            return "transaction-rollback";
+        }
+
+        if (this == TRANSACTION_TIMEOUT)
+        {
+            return "transaction-timeout";
+        }
+
+        else
+        {
+            return String.valueOf(_val);
+        }
+    }
+
+    public static TransactionError valueOf(Object obj)
+    {
+        if (obj instanceof Symbol)
+        {
+            Symbol val = (Symbol) obj;
+
+            if (UNKNOWN_ID._val.equals(val))
+            {
+                return UNKNOWN_ID;
+            }
+
+            if (TRANSACTION_ROLLBACK._val.equals(val))
+            {
+                return TRANSACTION_ROLLBACK;
+            }
+
+            if (TRANSACTION_TIMEOUT._val.equals(val))
+            {
+                return TRANSACTION_TIMEOUT;
+            }
+        }
+
+        final String message = String.format("Cannot convert '%s' into 'transaction-error'", obj);
+        throw new IllegalArgumentException(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionErrors.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionErrors.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionErrors.java
deleted file mode 100644
index 1995eff..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionErrors.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol.v1_0.type.transaction;
-
-
-import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
-import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-
-public class TransactionErrors implements ErrorCondition, RestrictedType<Symbol>
-{
-    private final Symbol _val;
-
-    public static final TransactionErrors UNKNOWN_ID =
-            new TransactionErrors(Symbol.valueOf("amqp:transaction:unknown-id"));
-
-    public static final TransactionErrors TRANSACTION_ROLLBACK =
-            new TransactionErrors(Symbol.valueOf("amqp:transaction:rollback"));
-
-    public static final TransactionErrors TRANSACTION_TIMEOUT =
-            new TransactionErrors(Symbol.valueOf("amqp:transaction:timeout"));
-
-    private TransactionErrors(Symbol val)
-    {
-        _val = val;
-    }
-
-    @Override
-    public Symbol getValue()
-    {
-        return _val;
-    }
-
-    @Override
-    public String toString()
-    {
-        if (this == UNKNOWN_ID)
-        {
-            return "unknown-id";
-        }
-
-        if (this == TRANSACTION_ROLLBACK)
-        {
-            return "transaction-rollback";
-        }
-
-        if (this == TRANSACTION_TIMEOUT)
-        {
-            return "transaction-timeout";
-        }
-
-        else
-        {
-            return String.valueOf(_val);
-        }
-    }
-
-    public static TransactionErrors valueOf(Object obj)
-    {
-        if (obj instanceof Symbol)
-        {
-            Symbol val = (Symbol) obj;
-
-            if (UNKNOWN_ID._val.equals(val))
-            {
-                return UNKNOWN_ID;
-            }
-
-            if (TRANSACTION_ROLLBACK._val.equals(val))
-            {
-                return TRANSACTION_ROLLBACK;
-            }
-
-            if (TRANSACTION_TIMEOUT._val.equals(val))
-            {
-                return TRANSACTION_TIMEOUT;
-            }
-        }
-
-        final String message = String.format("Cannot convert '%s' into 'transaction-error'", obj);
-        throw new IllegalArgumentException(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index dbb8660..bb5c236 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -838,6 +838,12 @@ public class Interaction
         return this;
     }
 
+    public Interaction dispositionFirst(final UnsignedInteger deliveryId)
+    {
+        _disposition.setFirst(deliveryId);
+        return this;
+    }
+
     public Interaction disposition() throws Exception
     {
         sendPerformativeAndChainFuture(copyDisposition(_disposition), _sessionChannel);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index d1a47db..19ecd3b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -22,14 +22,19 @@ package org.apache.qpid.tests.protocol.v1_0.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -40,7 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
@@ -48,7 +53,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -104,7 +112,7 @@ public class DischargeTest extends BrokerAdminUsingTestBase
             assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class)));
             final Error error = ((Rejected) dischargeDisposition.getState()).getError();
             assertThat(error, is(notNullValue()));
-            assertThat(error.getCondition(), is(equalTo(TransactionErrors.UNKNOWN_ID)));
+            assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
         }
     }
 
@@ -147,7 +155,58 @@ public class DischargeTest extends BrokerAdminUsingTestBase
                                                                 .getLatestResponse(Detach.class);
             Error error = detachResponse.getError();
             assertThat(error, is(notNullValue()));
-            assertThat(error.getCondition(), is(equalTo(TransactionErrors.UNKNOWN_ID)));
+            assertThat(error.getCondition(), is(equalTo(TransactionError.UNKNOWN_ID)));
         }
     }
+
+    @Test
+    @Ignore("QPID-7950")
+    @SpecificationTest(section = "4.4.2",
+            description = "Transactional Retirement [...]"
+                          + " To associate an outcome with a transaction the controller sends a disposition"
+                          + " performative which sets the state of the delivery to a transactional-state with the"
+                          + " desired transaction identifier and the outcome to be applied upon a successful discharge.")
+    public void commitAfterDetach() throws Exception
+    {
+        assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
+
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test message");
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+
+                                                  .txnAttachCoordinatorLink(txnState)
+                                                  .txnDeclare(txnState)
+
+                                                  .attachRole(Role.RECEIVER)
+                                                  .attachHandle(UnsignedInteger.ONE)
+                                                  .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                  .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                  .attach().consumeResponse(Attach.class)
+
+                                                  .flowIncomingWindow(UnsignedInteger.ONE)
+                                                  .flowLinkCredit(UnsignedInteger.ONE)
+                                                  .flowHandleFromLinkHandle()
+                                                  .flow()
+
+                                                  .receiveDelivery()
+                                                  .getLatestDelivery();
+            assertThat(transfers, is(notNullValue()));
+            assertThat(transfers, is(not(empty())));
+            final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId();
+            interaction.detach().consumeResponse(Detach.class)
+                       .dispositionFirst(deliveryId)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition()
+                       .txnDischarge(txnState, false);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/876ddd6d/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 64094aa..1496d13 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -21,9 +21,11 @@
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -34,25 +36,32 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 {
@@ -214,6 +223,44 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "4.4.1",
+            description = "If the transaction controller wishes to associate an outgoing transfer with a transaction,"
+                          + " it MUST set the state of the transfer with a transactional-state carrying the appropriate"
+                          + " transaction identifier.")
+    public void sendTransactionalPostingTransferFailsDueToUnknownTransactionId() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Response<?> response = interaction.negotiateProtocol().consumeResponse()
+                                              .open().consumeResponse(Open.class)
+                                              .begin().consumeResponse(Begin.class)
+
+                                              .txnAttachCoordinatorLink(txnState)
+                                              .txnDeclare(txnState)
+
+                                              .attachRole(Role.SENDER)
+                                              .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                              .attachHandle(linkHandle)
+                                              .attach().consumeResponse(Attach.class)
+                                              .consumeResponse(Flow.class)
+
+                                              .transferHandle(linkHandle)
+                                              .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                              .transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
+                                              .transfer()
+                                              .consumeResponse()
+                                              .getLatestResponse();
+
+            assertUnknownTransactionIdError(response);
+        }
+    }
+
+
+    @Test
     @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
                                                         + "wish to associate the outcome of a delivery with a transaction.")
     public void receiveTransactionalRetirementReceiverSettleFirst() throws Exception
@@ -314,6 +361,61 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...]"
+                                                        + " To associate an outcome with a transaction the controller"
+                                                        + " sends a disposition performative which sets the state"
+                                                        + " of the delivery to a transactional-state with the desired"
+                                                        + " transaction identifier and the outcome to be applied"
+                                                        + " upon a successful discharge.")
+    public void receiveTransactionalRetirementDispositionFailsDueToUnknownTransactionId() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+
+                                                  .txnAttachCoordinatorLink(txnState)
+                                                  .txnDeclare(txnState)
+
+                                                  .attachRole(Role.RECEIVER)
+                                                  .attachHandle(UnsignedInteger.ONE)
+                                                  .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                  .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                  .attach().consumeResponse(Attach.class)
+
+                                                  .flowIncomingWindow(UnsignedInteger.ONE)
+                                                  .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                  .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                  .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                                  .flowLinkCredit(UnsignedInteger.ONE)
+                                                  .flowHandleFromLinkHandle()
+                                                  .flow()
+
+                                                  .receiveDelivery()
+                                                  .getLatestDelivery();
+
+            UnsignedInteger deliveryId = transfers.get(0).getDeliveryId();
+            assertThat(deliveryId, is(notNullValue()));
+
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Response<?> response = interaction.dispositionSettled(true)
+                                              .dispositionRole(Role.RECEIVER)
+                                              .dispositionTransactionalState(integerToBinary(Integer.MAX_VALUE),
+                                                                             new Accepted())
+                                              .dispositionFirst(deliveryId)
+                                              .disposition()
+                                              .consumeResponse().getLatestResponse();
+            assertUnknownTransactionIdError(response);
+        }
+    }
+
     @Ignore("TODO disposition is currently not being sent by Broker")
     @Test
     @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
@@ -435,7 +537,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
     }
 
     @Test
-    @SpecificationTest(section = "4.4.2", description = "Transactional Acquisition[...]In the case of the flow frame,"
+    @SpecificationTest(section = "4.4.3", description = "Transactional Acquisition[...]In the case of the flow frame,"
                                                         + " the transactional work is not necessarily directly"
                                                         + " initiated or entirely determined when the flow frame"
                                                         + " arrives at the resource, but can in fact occur at some "
@@ -495,4 +597,87 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
         }
     }
+
+    @Test
+    @Ignore("QPID-7951")
+    @SpecificationTest(section = "4.4.3", description = "Transactional Acquisition[...]"
+                                                        + " the resource associates an additional piece of state with"
+                                                        + " outgoing link endpoints, a txn-id that identifies"
+                                                        + " the transaction with which acquired messages"
+                                                        + " will be associated. This state is determined by"
+                                                        + " the controller by specifying a txn-id entry in the"
+                                                        + " properties map of the flow frame.")
+    public void receiveTransactionalAcquisitionFlowFailsDueToUnknownTransactionId() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Response<?> response = interaction.negotiateProtocol()
+                                              .consumeResponse()
+                                              .open()
+                                              .consumeResponse(Open.class)
+                                              .begin()
+                                              .consumeResponse(Begin.class)
+
+                                              .txnAttachCoordinatorLink(txnState)
+                                              .txnDeclare(txnState)
+
+                                              .attachRole(Role.RECEIVER)
+                                              .attachHandle(UnsignedInteger.ONE)
+                                              .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                              .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                              .attach()
+                                              .consumeResponse(Attach.class)
+
+                                              .flowIncomingWindow(UnsignedInteger.ONE)
+                                              .flowLinkCredit(UnsignedInteger.ONE)
+                                              .flowHandleFromLinkHandle()
+                                              .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
+                                                                                       integerToBinary(Integer.MAX_VALUE)))
+                                              .flow()
+                                              .consumeResponse()
+                                              .getLatestResponse();
+
+            assertUnknownTransactionIdError(response);
+        }
+    }
+
+    private void assertUnknownTransactionIdError(final Response<?> response)
+    {
+        assertThat(response, is(notNullValue()));
+        final Object body = response.getBody();
+        assertThat(body, is(notNullValue()));
+        Error error = null;
+        if (body instanceof Close)
+        {
+            error = ((Close) body).getError();
+        }
+        else if (body instanceof End)
+        {
+            error = ((End) body).getError();
+        }
+        else if (body instanceof Detach)
+        {
+            error = ((Detach) body).getError();
+        }
+        else
+        {
+            fail(String.format("Unexpected response %s", body.getClass().getSimpleName()));
+        }
+
+        assertThat(error, is(notNullValue()));
+        assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
+    }
+
+    Binary integerToBinary(final int txnId)
+    {
+        byte[] data = new byte[4];
+        data[3] = (byte) (txnId & 0xff);
+        data[2] = (byte) ((txnId & 0xff00) >> 8);
+        data[1] = (byte) ((txnId & 0xff0000) >> 16);
+        data[0] = (byte) ((txnId & 0xff000000) >> 24);
+        return new Binary(data);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message