qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [2/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled
Date Fri, 30 Jun 2017 16:19:54 GMT
QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/737c5280
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/737c5280
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/737c5280

Branch: refs/heads/master
Commit: 737c52807080b2e54fa6f4b419c0086df375e2bc
Parents: 72ed1aa
Author: Alex Rudyy <orudyy@apache.org>
Authored: Fri Jun 30 17:15:04 2017 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Fri Jun 30 17:15:04 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  82 +-
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  60 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      |   3 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  |   6 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |  25 +-
 .../protocol/v1_0/framing/FrameHandler.java     |   1 +
 .../v1_0/framing/OversizeFrameException.java    |   1 +
 .../protocol/v1_0/type/messaging/Accepted.java  |  30 +-
 .../protocol/v1_0/type/messaging/Modified.java  |  47 +-
 .../protocol/v1_0/type/messaging/Received.java  |  24 +-
 .../protocol/v1_0/type/messaging/Rejected.java  |  40 +-
 .../protocol/v1_0/type/messaging/Released.java  |  29 +-
 .../v1_0/type/transaction/Declared.java         |  31 +-
 .../type/transaction/TransactionalState.java    |  25 +-
 .../server/protocol/v1_0/Session_1_0Test.java   |  10 +
 .../tests/protocol/v1_0/FrameTransport.java     |   2 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |  77 +-
 .../apache/qpid/tests/protocol/v1_0/Utils.java  |  48 ++
 .../v1_0/messaging/MultiTransferTest.java       |  91 +--
 .../protocol/v1_0/messaging/TransferTest.java   |  42 +-
 .../transaction/TransactionalTransferTest.java  |   1 -
 .../transport/link/ResumeDeliveriesTest.java    | 766 +++++++++++++++++++
 .../qpid/systest/prefetch/ZeroPrefetchTest.java |   8 +-
 23 files changed, 1124 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 55432c9..5410766 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -38,8 +41,11 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
@@ -49,6 +55,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
+    private static final int FRAME_HEADER_SIZE = 8;
+
     private final Link_1_0<S, T> _link;
     private final Session_1_0 _session;
 
@@ -66,6 +74,9 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     private volatile Map<Symbol, Object> _properties;
     private volatile State _state = State.ATTACH_RECVD;
 
+    protected boolean _remoteIncompleteUnsettled;
+    protected boolean _localIncompleteUnsettled;
+
     protected enum State
     {
         DETACHED,
@@ -129,7 +140,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         _receivingSettlementMode = attach.getRcvSettleMode();
         _properties = initProperties(attach);
         _state = State.ATTACH_RECVD;
-
+        _remoteIncompleteUnsettled = Boolean.TRUE.equals(attach.getIncompleteUnsettled());
         if (getRole() == Role.RECEIVER)
         {
             getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
@@ -306,6 +317,8 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
             attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
         }
 
+        attachToSend = handleOversizedUnsettledMapIfNecessary(attachToSend);
+
         switch (_state)
         {
             case DETACHED:
@@ -322,6 +335,73 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
 
     }
 
+    private Attach handleOversizedUnsettledMapIfNecessary(final Attach attachToSend)
+    {
+        final AMQPDescribedTypeRegistry describedTypeRegistry = getSession().getConnection().getDescribedTypeRegistry();
+        final ValueWriter<Attach> valueWriter = describedTypeRegistry.getValueWriter(attachToSend);
+        if (valueWriter.getEncodedSize() + 8 > getSession().getConnection().getMaxFrameSize())
+        {
+            _localIncompleteUnsettled = true;
+            attachToSend.setIncompleteUnsettled(true);
+            final int targetSize = getSession().getConnection().getMaxFrameSize();
+            int lowIndex = 0;
+            Map<Binary, DeliveryState> localUnsettledMap = attachToSend.getUnsettled();
+            if (localUnsettledMap == null)
+            {
+                localUnsettledMap = Collections.emptyMap();
+            }
+            int highIndex = localUnsettledMap.size();
+            int currentIndex = (highIndex - lowIndex) / 2;
+            int oldIndex;
+            HashMap<Binary, DeliveryState> unsettledMap = null;
+            int totalSize;
+            do
+            {
+                HashMap<Binary, DeliveryState> partialUnsettledMap = new HashMap<>(currentIndex);
+                final Iterator<Map.Entry<Binary, DeliveryState>> iterator = localUnsettledMap.entrySet().iterator();
+                for (int i = 0; i < currentIndex; ++i)
+                {
+                    final Map.Entry<Binary, DeliveryState> entry = iterator.next();
+                    partialUnsettledMap.put(entry.getKey(), entry.getValue());
+                }
+                attachToSend.setUnsettled(partialUnsettledMap);
+                totalSize = describedTypeRegistry.getValueWriter(attachToSend).getEncodedSize() + FRAME_HEADER_SIZE;
+                if (totalSize > targetSize)
+                {
+                    highIndex = currentIndex;
+                }
+                else if (totalSize < targetSize)
+                {
+                    lowIndex = currentIndex;
+                    unsettledMap = partialUnsettledMap;
+                }
+                else
+                {
+                    lowIndex = highIndex = currentIndex;
+                    unsettledMap = partialUnsettledMap;
+                }
+
+                oldIndex = currentIndex;
+                currentIndex = lowIndex + (highIndex - lowIndex) / 2;
+            }
+            while (oldIndex != currentIndex);
+
+            if (unsettledMap == null || unsettledMap.isEmpty())
+            {
+                final End endWithError = new End();
+                endWithError.setError(new Error(AmqpError.FRAME_SIZE_TOO_SMALL, "Cannot fit a single unsettled delivery into Attach frame."));
+                getSession().end(endWithError);
+            }
+
+            attachToSend.setUnsettled(unsettledMap);
+        }
+        else
+        {
+            _localIncompleteUnsettled = false;
+        }
+        return attachToSend;
+    }
+
     public void detach()
     {
         detach(null, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index e87fffb..0b78622 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -117,16 +117,8 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             {
                 _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
             }
-            else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
-            {
-                final Error error = new Error(AmqpError.ILLEGAL_STATE,
-                                              String.format("Resumed transfer with delivery tag '%s' is not found.",
-                                                            _currentDelivery.getDeliveryTag()));
-                close(error);
-                return;
-            }
 
-            if (_currentDelivery.isAborted())
+            if (_currentDelivery.isAborted() || (_currentDelivery.getResume() && !_unsettled.containsKey(_currentDelivery.getDeliveryTag())))
             {
                 _unsettled.remove(_currentDelivery.getDeliveryTag());
                 getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
@@ -176,14 +168,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             error = new Error(AmqpError.INVALID_FIELD,
                                     "Transfer \"delivery-tag\" is required for a new delivery.");
         }
-        else if (_unsettled.containsKey(transfer.getDeliveryTag()))
+        else if (!Boolean.TRUE.equals(transfer.getResume()))
         {
-            error = new Error(AmqpError.ILLEGAL_STATE,
-                              String.format("Delivery-tag '%s' is used by another unsettled delivery."
-                                            + " The delivery-tag MUST be unique amongst all deliveries that"
-                                            + " could be considered unsettled by either end of the link.",
-                                            transfer.getDeliveryTag()));
+            if (_unsettled.containsKey(transfer.getDeliveryTag()))
+            {
+                error = new Error(AmqpError.ILLEGAL_STATE,
+                                  String.format("Delivery-tag '%s' is used by another unsettled delivery."
+                                                + " The delivery-tag MUST be unique amongst all deliveries that"
+                                                + " could be considered unsettled by either end of the link.",
+                                                transfer.getDeliveryTag()));
+            }
+            else if (_localIncompleteUnsettled || _remoteIncompleteUnsettled)
+            {
+                error = new Error(AmqpError.ILLEGAL_STATE,
+                                  "Cannot accept new deliveries while incomplete-unsettled is true.");
+            }
         }
+
         return error;
     }
 
@@ -297,20 +298,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         sendFlowConditional();
     }
 
-    @Override
-    public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
-    {
-        super.receiveDeliveryState(deliveryTag, state, settled);
-        if(_creditWindow)
-        {
-            if(Boolean.TRUE.equals(settled))
-            {
-                setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
-                sendFlowConditional();
-            }
-        }
-    }
-
     SectionDecoder getSectionDecoder()
     {
         return _sectionDecoder;
@@ -321,11 +308,11 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     {
         super.settle(deliveryTag);
         _unsettled.remove(deliveryTag);
-        if(_creditWindow)
+        if (_creditWindow)
         {
-             sendFlowConditional();
+            setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+            sendFlowConditional();
         }
-
     }
 
     public void flowStateChanged()
@@ -341,13 +328,10 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         }
         finally
         {
-            if (close)
+            if (_currentDelivery != null)
             {
-                if (_currentDelivery != null)
-                {
-                    _currentDelivery.discard();
-                    _currentDelivery = null;
-                }
+                _currentDelivery.discard();
+                _currentDelivery = null;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index e6763eb..a78f7f0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -644,11 +644,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         Map<Binary, DeliveryState> remoteUnsettled =
                 attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
 
+        final boolean isUnsettledComplete = !Boolean.TRUE.equals(attach.getIncompleteUnsettled());
         for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
             final MessageInstance queueEntry = entry.getValue().getMessageInstance();
-            if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+            if (!remoteUnsettled.containsKey(deliveryTag) && isUnsettledComplete)
             {
                 queueEntry.setRedelivered();
                 queueEntry.release(oldConsumer);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 65a5265..ba06376 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -311,7 +311,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         try
         {
             List<QpidByteBuffer> payload = xfr.getPayload();
-            final long remaining = QpidByteBufferUtils.remaining(payload);
+            final long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
             int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
 
             if(payload != null && payloadSent < remaining && payloadSent >= 0)
@@ -320,11 +320,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
                 Transfer secondTransfer = new Transfer();
 
-                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
                 secondTransfer.setHandle(xfr.getHandle());
-                secondTransfer.setSettled(xfr.getSettled());
+                secondTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
                 secondTransfer.setState(xfr.getState());
-                secondTransfer.setMessageFormat(xfr.getMessageFormat());
                 secondTransfer.setPayload(payload);
 
                 sendTransfer(secondTransfer, endpoint, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 048fa20..447e7cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -270,8 +271,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
     @Override
     protected void remoteDetachedPerformDetach(Detach detach)
     {
-        if(!TerminusDurability.UNSETTLED_STATE.equals(getDurability()) ||
-           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        final TerminusExpiryPolicy expiryPolicy = getTarget().getExpiryPolicy();
+        if((detach != null && Boolean.TRUE.equals(detach.getClosed()))
+           || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
+           || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+           || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
             close();
         }
@@ -332,6 +336,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             }
             target.setCapabilities(targetCapabilities.toArray(new Symbol[targetCapabilities.size()]));
         }
+        target.setExpiryPolicy(attachTarget.getExpiryPolicy());
 
         final ReceivingDestination destination = getSession().getReceivingDestination(getLink(), target);
 
@@ -341,17 +346,19 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         setCapabilities(targetCapabilities);
         setDestination(destination);
 
-        Map remoteUnsettled = attach.getUnsettled();
-        Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
-        for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
+        if (!Boolean.TRUE.equals(attach.getIncompleteUnsettled()))
         {
-            Binary deliveryTag = entry.getKey();
-            if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+            Map remoteUnsettled = attach.getUnsettled();
+            Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+            for (Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
             {
-                _unsettled.remove(deliveryTag); // todo: removal is based on assumption that remote unsettled map is complete
+                Binary deliveryTag = entry.getKey();
+                if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+                {
+                    _unsettled.remove(deliveryTag);
+                }
             }
         }
-
         getLink().setTermini(source, target);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index c5e0334..581c3c5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -59,6 +59,7 @@ public class FrameHandler implements ProtocolHandler
     {
         try
         {
+            LOGGER.debug("RECV {} bytes", in.remaining());
             Error frameParsingError = null;
             int size;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
index e4a94bf..a77e587 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
@@ -26,6 +26,7 @@ public class OversizeFrameException extends RuntimeException
 
     public OversizeFrameException(final AMQFrame frame, final int size)
     {
+        super("Tried to send frame of size: " + String.valueOf(size));
         _frame = frame;
         _size = size;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
index fb9f446..abf8509 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
@@ -23,31 +23,25 @@
 
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
+public class Accepted implements Outcome
+{
+    public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Accepted
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-      public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
-
-      @Override
-      public Symbol getSymbol()
-      {
-          return ACCEPTED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return ACCEPTED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Accepted{");
-        final int origLength = builder.length();
 
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
index 43264a9..41d2fa7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
@@ -24,21 +24,18 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
-
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Modified
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
+public class Modified implements Outcome
+{
+    public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
 
-      public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
-
-      @CompositeTypeField
-      private Boolean _deliveryFailed;
+    @CompositeTypeField
+    private Boolean _deliveryFailed;
 
     @CompositeTypeField
     private Boolean _undeliverableHere;
@@ -76,39 +73,39 @@ public class Modified
         _messageAnnotations = messageAnnotations;
     }
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return MODIFIED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return MODIFIED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Modified{");
         final int origLength = builder.length();
 
-        if(_deliveryFailed != null)
+        if (_deliveryFailed != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("deliveryFailed=").append(_deliveryFailed);
         }
 
-        if(_undeliverableHere != null)
+        if (_undeliverableHere != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("undeliverableHere=").append(_undeliverableHere);
         }
 
-        if(_messageAnnotations != null)
+        if (_messageAnnotations != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -118,6 +115,4 @@ public class Modified
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
index 3ad8cb7..86d9554 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
@@ -24,13 +24,13 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Received
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState
-  {
-
+public class Received implements DeliveryState
+{
     @CompositeTypeField(mandatory = true)
     private UnsignedInteger _sectionNumber;
 
@@ -63,18 +63,18 @@ public class Received
         StringBuilder builder = new StringBuilder("Received{");
         final int origLength = builder.length();
 
-        if(_sectionNumber != null)
+        if (_sectionNumber != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("sectionNumber=").append(_sectionNumber);
         }
 
-        if(_sectionOffset != null)
+        if (_sectionOffset != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -84,6 +84,4 @@ public class Received
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
index 5f5ca24..e877a56 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
@@ -24,21 +24,17 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
-
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 
+public class Rejected implements Outcome
+{
+    public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Rejected
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-
-      public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
-
-      @CompositeTypeField
-      private Error _error;
+    @CompositeTypeField
+    private Error _error;
 
     public Error getError()
     {
@@ -50,21 +46,21 @@ public class Rejected
         _error = error;
     }
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return REJECTED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return REJECTED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Rejected{");
         final int origLength = builder.length();
 
-        if(_error != null)
+        if (_error != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -74,6 +70,4 @@ public class Rejected
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
index 4ba74f4..945c7ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
@@ -24,23 +24,20 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
+public class Released implements Outcome
+{
+    public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
 
-public class Released
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-
-      public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
-
-      @Override
-      public Symbol getSymbol()
-      {
-          return RELEASED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return RELEASED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Released{");
@@ -49,6 +46,4 @@ public class Released
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
index 17c4e62..a9b9343 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
@@ -24,22 +24,17 @@
 package org.apache.qpid.server.protocol.v1_0.type.transaction;
 
 
-
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-public class Declared
-  implements DeliveryState, Outcome
-  {
-
-
-      public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
+public class Declared implements Outcome
+{
+    public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
 
-      @CompositeTypeField(mandatory = true)
-      private Binary _txnId;
+    @CompositeTypeField(mandatory = true)
+    private Binary _txnId;
 
     public Binary getTxnId()
     {
@@ -57,9 +52,9 @@ public class Declared
         StringBuilder builder = new StringBuilder("Declared{");
         final int origLength = builder.length();
 
-        if(_txnId != null)
+        if (_txnId != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -71,9 +66,9 @@ public class Declared
     }
 
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return DECLARED_SYMBOL;
-      }
-  }
+    @Override
+    public Symbol getSymbol()
+    {
+        return DECLARED_SYMBOL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
index 1f2d29b..20db2a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
@@ -24,14 +24,13 @@
 package org.apache.qpid.server.protocol.v1_0.type.transaction;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class TransactionalState
-  implements DeliveryState
-  {
-
-
+public class TransactionalState implements DeliveryState
+{
     @CompositeTypeField(mandatory = true)
     private Binary _txnId;
 
@@ -64,18 +63,18 @@ public class TransactionalState
         StringBuilder builder = new StringBuilder("TransactionalState{");
         final int origLength = builder.length();
 
-        if(_txnId != null)
+        if (_txnId != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("txnId=").append(_txnId);
         }
 
-        if(_outcome != null)
+        if (_outcome != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -85,6 +84,4 @@ public class TransactionalState
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 4a22d8e..468fe15 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -79,6 +80,13 @@ import org.apache.qpid.test.utils.QpidTestCase;
 
 public class Session_1_0Test extends QpidTestCase
 {
+    private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                                      .registerTransportLayer()
+                                                                                                      .registerMessagingLayer()
+                                                                                                      .registerTransactionLayer()
+                                                                                                      .registerSecurityLayer()
+                                                                                                      .registerExtensionSoleconnLayer();
+
     private static final String TOPIC_NAME = "testTopic";
     private static final String QUEUE_NAME = "testQueue";
     private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
@@ -721,6 +729,8 @@ public class Session_1_0Test extends QpidTestCase
         when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
         when(connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        when(connection.getDescribedTypeRegistry()).thenReturn(DESCRIBED_TYPE_REGISTRY);
+        when(connection.getMaxFrameSize()).thenReturn(512);
         final ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
         when(connection.doOnIOThreadAsync(runnableCaptor.capture())).thenAnswer(new Answer<ListenableFuture<Void>>()
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 9aeb4e9..8746941 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -62,7 +62,7 @@ public class FrameTransport implements AutoCloseable
     public static final long RESPONSE_TIMEOUT = 6000;
     private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
 
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100);
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
 
     private final EventLoopGroup _workerGroup;
     private final InetSocketAddress _brokerAddress;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 008b016..3a6b083 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -228,15 +228,15 @@ public class Interaction
         return this;
     }
 
-    public Interaction openChannelMax(UnsignedShort channelMax)
+    public Interaction openMaxFrameSize(final UnsignedInteger maxFrameSize)
     {
-        _open.setChannelMax(channelMax);
+        _open.setMaxFrameSize(maxFrameSize);
         return this;
     }
 
-    public Interaction openMaxFrameSize(UnsignedInteger maxFrameSize)
+    public Interaction openChannelMax(UnsignedShort channelMax)
     {
-        _open.setMaxFrameSize(maxFrameSize);
+        _open.setChannelMax(channelMax);
         return this;
     }
 
@@ -397,6 +397,18 @@ public class Interaction
         return this;
     }
 
+    public Interaction attachUnsettled(final Map<Binary, DeliveryState> unsettled)
+    {
+        _attach.setUnsettled(unsettled);
+        return this;
+    }
+
+    public Interaction attachIncompleteUnsettled(final Boolean incompleteUnsettled)
+    {
+        _attach.setIncompleteUnsettled(incompleteUnsettled);
+        return this;
+    }
+
     public Interaction attach() throws Exception
     {
         sendPerformativeAndChainFuture(_attach, _sessionChannel);
@@ -501,28 +513,33 @@ public class Interaction
         return this;
     }
 
+    public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _transfer.setDeliveryId(deliveryId);
+        return this;
+    }
+
     public Interaction transferDeliveryTag(final Binary deliveryTag)
     {
         _transfer.setDeliveryTag(deliveryTag);
         return this;
     }
 
-    public Interaction transferState(final DeliveryState state)
+    public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
     {
-        _transfer.setState(state);
+        _transfer.setMessageFormat(messageFormat);
         return this;
     }
 
-    public Interaction transferTransactionalState(final Binary transactionalId)
+    public Interaction transferSettled(final Boolean settled)
     {
-        TransactionalState transactionalState = new TransactionalState();
-        transactionalState.setTxnId(transactionalId);
-        return transferState(transactionalState);
+        _transfer.setSettled(settled);
+        return this;
     }
 
-    public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+    public Interaction transferMore(final Boolean more)
     {
-        _transfer.setDeliveryId(deliveryId);
+        _transfer.setMore(more);
         return this;
     }
 
@@ -532,25 +549,32 @@ public class Interaction
         return this;
     }
 
-    public Interaction transferMore(final Boolean more)
+    public Interaction transferState(final DeliveryState state)
     {
-        _transfer.setMore(more);
+        _transfer.setState(state);
         return this;
     }
 
-    public Interaction transferAborted(final Boolean aborted)
+    public Interaction transferTransactionalState(final Binary transactionalId)
     {
-        _transfer.setAborted(aborted);
+        TransactionalState transactionalState = new TransactionalState();
+        transactionalState.setTxnId(transactionalId);
+        return transferState(transactionalState);
+    }
+
+    public Interaction transferResume(final Boolean resume)
+    {
+        _transfer.setResume(resume);
         return this;
     }
 
-    public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
+    public Interaction transferAborted(final Boolean aborted)
     {
-        _transfer.setMessageFormat(messageFormat);
+        _transfer.setAborted(aborted);
         return this;
     }
 
-    public Interaction setPayloadOnTransfer(final List<QpidByteBuffer> payload)
+    public Interaction transferPayload(final List<QpidByteBuffer> payload)
     {
         _transfer.setPayload(payload);
         return this;
@@ -558,11 +582,11 @@ public class Interaction
 
     public Interaction transferPayloadData(final Object payload)
     {
-        setPayloadOnTransfer(_transfer, payload);
+        transferPayload(_transfer, payload);
         return this;
     }
 
-    private void setPayloadOnTransfer(final Transfer transfer, final Object payload)
+    private void transferPayload(final Transfer transfer, final Object payload)
     {
         AmqpValue amqpValue = new AmqpValue(payload);
         final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
@@ -575,12 +599,6 @@ public class Interaction
         }
     }
 
-    public Interaction transferSettled(final Boolean settled)
-    {
-        _transfer.setSettled(settled);
-        return this;
-    }
-
     public Interaction transfer() throws Exception
     {
         sendPerformativeAndChainFuture(_transfer, _sessionChannel);
@@ -648,7 +666,7 @@ public class Interaction
     public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
     {
         Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        setPayloadOnTransfer(transfer, new Declare());
+        transferPayload(transfer, new Declare());
         sendPerformativeAndChainFuture(transfer, _sessionChannel);
         consumeResponse(Disposition.class);
         Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
@@ -668,7 +686,7 @@ public class Interaction
         discharge.setFail(failed);
 
         Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        setPayloadOnTransfer(transfer, discharge);
+        transferPayload(transfer, discharge);
         sendPerformativeAndChainFuture(transfer, _sessionChannel);
 
         Disposition declareTransactionDisposition = null;
@@ -818,6 +836,7 @@ public class Interaction
 
     public Interaction receiveDelivery() throws Exception
     {
+        sync();
         _latestDelivery = receiveAllTransfers();
         return this;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 446c112..f0373f5 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -25,8 +25,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -87,4 +91,48 @@ public class Utils
             return interaction.getDecodedLatestDelivery();
         }
     }
+
+    public static  QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
+    {
+        MessageEncoder messageEncoder = new MessageEncoder();
+        final Header header = new Header();
+        messageEncoder.setHeader(header);
+        messageEncoder.addData(messageContent);
+        List<QpidByteBuffer> payload = messageEncoder.getPayload();
+        long size = QpidByteBufferUtils.remaining(payload);
+
+        QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
+        int chunkSize = (int) size / numberOfParts;
+        int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
+        for (int i = 0; i < numberOfParts; i++)
+        {
+            result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
+        }
+
+        int currentBufferIndex = 0;
+        for (QpidByteBuffer p : payload)
+        {
+            final int limit = p.limit();
+
+            while (p.hasRemaining())
+            {
+                QpidByteBuffer currentBuffer = result[currentBufferIndex];
+                if (currentBuffer.hasRemaining())
+                {
+                    int length = Math.min(p.remaining(), currentBuffer.remaining());
+                    p.limit(p.position() + length);
+                    currentBuffer.put(p.slice());
+                    p.position(p.position() + length);
+                    p.limit(limit);
+                }
+
+                if (!currentBuffer.hasRemaining())
+                {
+                    currentBuffer.flip();
+                    currentBufferIndex++;
+                }
+            }
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 2f83a13..1c09bb6 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.isOneOf;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.After;
@@ -39,11 +38,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
@@ -57,10 +54,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 import org.apache.qpid.tests.protocol.v1_0.Response;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class MultiTransferTest extends ProtocolTestBase
 {
@@ -99,7 +96,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 2);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
 
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -114,14 +111,14 @@ public class MultiTransferTest extends ProtocolTestBase
                                                  .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                  .attach().consumeResponse(Attach.class)
                                                  .consumeResponse(Flow.class)
-                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                                                 .transferPayload(Collections.singletonList(payloads[0]))
                                                  .transferDeliveryId(deliveryId)
                                                  .transferDeliveryTag(deliveryTag)
                                                  .transferMore(true)
                                                  .transfer()
                                                  .sync()
                                                  .transferMore(false)
-                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                                                 .transferPayload(Collections.singletonList(payloads[1]))
                                                  .transfer()
                                                  .consumeResponse()
                                                  .getLatestResponse(Disposition.class);
@@ -141,7 +138,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 4);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 4);
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
 
@@ -158,25 +155,25 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transferPayload(Collections.singletonList(payloads[0]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(null)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                       .transferPayload(Collections.singletonList(payloads[1]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(null)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[2]))
+                       .transferPayload(Collections.singletonList(payloads[2]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(null)
                        .transferDeliveryTag(null)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[3]))
+                       .transferPayload(Collections.singletonList(payloads[3]))
                        .transfer()
                        .consumeResponse();
 
@@ -200,7 +197,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 2);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
 
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -215,13 +212,13 @@ public class MultiTransferTest extends ProtocolTestBase
                        .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transferPayload(Collections.singletonList(payloads[0]))
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
                        .transfer()
                        .sync()
-                       .setPayloadOnTransfer(null)
+                       .transferPayload(null)
                        .transferMore(null)
                        .transferAborted(true)
                        .transfer();
@@ -237,8 +234,8 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
-            QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+            QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+            QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
 
             UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
             UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
@@ -275,7 +272,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+                       .transferPayload(Collections.singletonList(messagePayload1[0]))
                        .transfer()
                        .sync()
 
@@ -283,7 +280,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+                       .transferPayload(Collections.singletonList(messagePayload2[0]))
                        .transfer()
                        .sync()
 
@@ -291,7 +288,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[1]))
+                       .transferPayload(Collections.singletonList(messagePayload1[1]))
                        .transfer()
                        .sync()
 
@@ -299,7 +296,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[1]))
+                       .transferPayload(Collections.singletonList(messagePayload2[1]))
                        .transfer()
                        .sync();
 
@@ -328,8 +325,8 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
-            QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+            QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+            QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
 
             Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
             Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
@@ -352,62 +349,18 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+                       .transferPayload(Collections.singletonList(messagePayload1[0]))
                        .transfer()
                        .sync()
 
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+                       .transferPayload(Collections.singletonList(messagePayload2[0]))
                        .transfer()
                        .sync();
 
             interaction.consumeResponse(Detach.class, End.class, Close.class);
         }
     }
-
-    private QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
-    {
-        MessageEncoder messageEncoder = new MessageEncoder();
-        final Header header = new Header();
-        messageEncoder.setHeader(header);
-        messageEncoder.addData(messageContent);
-        List<QpidByteBuffer> payload = messageEncoder.getPayload();
-        long size = QpidByteBufferUtils.remaining(payload);
-
-        QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
-        int chunkSize = (int) size / numberOfParts;
-        int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
-        for (int i = 0; i < numberOfParts; i++)
-        {
-            result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
-        }
-
-        int currentBufferIndex = 0;
-        for (QpidByteBuffer p : payload)
-        {
-            final int limit = p.limit();
-
-            while (p.hasRemaining())
-            {
-                QpidByteBuffer currentBuffer = result[currentBufferIndex];
-                if (currentBuffer.hasRemaining())
-                {
-                    int length = Math.min(p.remaining(), currentBuffer.remaining());
-                    p.limit(p.position() + length);
-                    currentBuffer.put(p.slice());
-                    p.position(p.position() + length);
-                    p.limit(limit);
-                }
-
-                if (!currentBuffer.hasRemaining())
-                {
-                    currentBuffer.flip();
-                    currentBufferIndex++;
-                }
-            }
-        }
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index a5f23e2..85baddf 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,10 +21,8 @@
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -294,7 +292,7 @@ public class TransferTest extends ProtocolTestBase
                                                                                    Rejected.REJECTED_SYMBOL)
                                                              .attach().consumeResponse(Attach.class)
                                                              .consumeResponse(Flow.class)
-                                                             .setPayloadOnTransfer(messageEncoder.getPayload())
+                                                             .transferPayload(messageEncoder.getPayload())
                                                              .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                              .transfer()
                                                              .consumeResponse()
@@ -339,7 +337,7 @@ public class TransferTest extends ProtocolTestBase
                                                   .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                   .attach().consumeResponse(Attach.class)
                                                   .consumeResponse(Flow.class)
-                                                  .setPayloadOnTransfer(messageEncoder.getPayload())
+                                                  .transferPayload(messageEncoder.getPayload())
                                                   .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                   .transfer()
                                                   .consumeResponse()
@@ -780,6 +778,7 @@ public class TransferTest extends ProtocolTestBase
             interaction.transferDeliveryId(UnsignedInteger.ZERO)
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData("test")
+                       .transferSettled(true)
                        .transfer()
                        .sync()
 
@@ -789,41 +788,6 @@ public class TransferTest extends ProtocolTestBase
                        .transfer()
                        .sync();
 
-            boolean firstSettled = false, secondSettled = false;
-            do
-            {
-                interaction.consumeResponse();
-                Response<?> response = interaction.getLatestResponse();
-                assertThat(response, is(notNullValue()));
-
-                Object body = response.getBody();
-
-                if (body instanceof Disposition)
-                {
-                    Disposition disposition = (Disposition) body;
-                    assertThat(disposition.getSettled(), is(equalTo(true)));
-                    assertThat(disposition.getFirst(),
-                               anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE)));
-                    assertThat(disposition.getLast(),
-                               anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE), nullValue()));
-
-                    if (UnsignedInteger.ZERO.equals(disposition.getFirst()))
-                    {
-                        firstSettled = true;
-                    }
-                    if (UnsignedInteger.ONE.equals(disposition.getFirst())
-                        || UnsignedInteger.ONE.equals(disposition.getLast()))
-                    {
-                        secondSettled = true;
-                    }
-                }
-                else if (!(body instanceof Flow))
-                {
-                    fail("Unexpected response " + body);
-                }
-            }
-            while (!firstSettled || !secondSettled);
-
             transport.doCloseConnection();
 
             assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 7168c47..6fe0b55 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message